Export to Kafka

Tacnode DataSync supports exporting CDC (Change Data Capture) events to Kafka clusters, enabling integration with downstream systems and real-time data processing workflows.

Export Formats

Data samples in this guide are based on the following example table:

CREATE TABLE tx(
    hash text,
    maker text,
    ts timestamp,
    volume numeric(160,60),
    PRIMARY KEY(hash)
);

KV JSON Format

The KV JSON format outputs only the updated row data, making it well suited to frontend state display and state synchronization scenarios.

Example Output:

// Executing SQL: INSERT INTO tx VALUES('0496f994-3c42-4cf3-86f0-7faf20699bd5', '840', NOW(), 230.33037109759)
// Output content:
{
    "hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",
    "maker": "840",
    "ts": "2025-09-09 16:56:56.449000",
    "volume": 230.33037109759
}

Use Cases:

  • Frontend state synchronization
  • Real-time dashboard updates
  • Simple event streaming
  • Microservice state propagation
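
As an illustration of the state-synchronization use case, the sketch below consumes KV JSON events and keeps an in-memory copy of the tx table keyed by the primary key. It assumes the kafka-python client; the topic name and broker address are placeholders reused from examples elsewhere in this guide.

import json

from kafka import KafkaConsumer  # kafka-python client (assumed)

# Consume KV JSON events and maintain an in-memory view keyed by the primary key.
consumer = KafkaConsumer(
    "tacnode-cdc-events",                    # assumed topic name
    bootstrap_servers="kafka-cluster:9092",  # assumed broker address
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
)

state = {}  # hash -> latest row image

for message in consumer:
    row = message.value
    state[row["hash"]] = row                 # last write wins per primary key
    print(f"tx {row['hash']} -> volume {row['volume']}")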

Maxwell JSON Format

Maxwell is a CDC JSON format (originally produced by the Maxwell daemon for MySQL) that records every INSERT, UPDATE, and DELETE operation in full.

Field Descriptions

| Field | Description |
| --- | --- |
| database | Database where the change operation occurred |
| table | Table where the change operation occurred |
| type | Operation type (insert/update/delete) |
| ts | Change timestamp (Unix timestamp) |
| xid | Transaction ID of the change |
| commit | Whether the transaction is committed |
| data | Row data after the change |
| old | Previous row data (for updates) |

Data Examples

INSERT Operation:

// Executing: insert into tx VALUES(gen_random_uuid(), (random() * 1024)::bigint::text, now(), random() * 1024);
{
    "database": "test",
    "table": "tx",
    "type": "insert",
    "ts": 1757417323,
    "xid": 616699495,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 505.356811461980000000000000000000000000000000000000000000000000
    }
}

UPDATE Operation:

// Executing: UPDATE tx SET volume = 1024 WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "update",
    "ts": 1757417442,
    "xid": -1527714998,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 1024.000000000000000000000000000000000000000000000000000000000000
    },
    "old": {
        // By default, "old" contains only primary key values; modify the subscription config to output complete old values
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}

DELETE Operation:

// Executing: DELETE FROM tx WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "delete",
    "ts": 1757417562,
    "xid": -448725205,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}
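
The sketch below shows one way a downstream consumer might apply Maxwell events to a local replica of tx, dispatching on the type field. It assumes the kafka-python client and the placeholder topic and broker names from this guide; delete events are applied using the primary key carried in data.

import json

from kafka import KafkaConsumer  # kafka-python client (assumed)

consumer = KafkaConsumer(
    "tacnode-cdc-events",                    # assumed topic name
    bootstrap_servers="kafka-cluster:9092",  # assumed broker address
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

replica = {}  # local copy of tx, keyed by the primary key "hash"

for message in consumer:
    event = message.value
    if event.get("table") != "tx":
        continue
    key = event["data"]["hash"]
    if event["type"] in ("insert", "update"):
        replica[key] = event["data"]         # apply the new row image
    elif event["type"] == "delete":
        replica.pop(key, None)               # drop the deleted row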

Export Job Configuration

1. Create Export Task

  1. Log into the Tacnode console
  2. Navigate to "Data Sync" → "Data Export" → "Data Export Jobs"

2. Select Target Type

When creating a task, select Kafka as the output target:

3. Configure Connection Information

Source Configuration

  1. On the left side, select the Tacnode instance you want to sync from
  2. Click "Test Connection" to verify connectivity
  3. A successful test will display "Connection Successful"

Sink Configuration (Kafka)

  1. Fill in the target Kafka connection information on the right side

  2. Network Configuration:

    • Ensure network connectivity between DataSync and the target Kafka cluster first
    • Bootstrap Server: enter the Kafka server address and port
    • When pushing CDC over the public network, adjust Kafka's advertised.listeners property to avoid connection issues
  3. Security Protocol: "PLAINTEXT", "SASL_PLAINTEXT", and "SASL_SSL" are supported; choose based on your requirements

    • PLAINTEXT: Plain text transmission, suitable for internal networks
    • SASL_PLAINTEXT: SASL authentication + plain text transmission, suitable for scenarios requiring authentication but not encryption
    • SASL_SSL: SASL authentication + SSL encrypted transmission, suitable for public networks or high-security requirements
  4. Click "Test Connection" after configuration

  5. A successful test will display "Connection Successful"

Example Configuration:

kafka_config:
  bootstrap_servers: "kafka-cluster:9092"
  security_protocol: "SASL_SSL"
  sasl_mechanism: "PLAIN"
  sasl_username: "your_username"
  sasl_password: "your_password"
  ssl_config:
    ssl_ca_location: "/path/to/ca-cert"
    ssl_certificate_location: "/path/to/client-cert"
    ssl_key_location: "/path/to/client-key"
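
For a quick end-to-end check of the same settings from a client machine, a sketch along these lines can be used. It assumes the kafka-python client; the credentials and certificate paths are the placeholders from the YAML above.

from kafka import KafkaConsumer  # kafka-python client (assumed)

# The option names below mirror the YAML above; adjust them to your client library.
consumer = KafkaConsumer(
    bootstrap_servers="kafka-cluster:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="your_username",
    sasl_plain_password="your_password",
    ssl_cafile="/path/to/ca-cert",
    ssl_certfile="/path/to/client-cert",
    ssl_keyfile="/path/to/client-key",
)

# Listing topics forces a metadata round trip, so it fails fast on network,
# TLS, or authentication problems.
print(consumer.topics())
consumer.close()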

4. Select Sync Objects

Choose the schemas and tables you want to synchronize:

Optional: Sync Materialized Views (MV)

If you need to sync materialized views, check the Mview category in the configuration.

Object Selection Examples:

-- Query available schemas
SELECT schema_name FROM information_schema.schemata 
WHERE schema_name NOT IN ('information_schema', 'pg_catalog');
 
-- Query tables in specific schema
SELECT table_name FROM information_schema.tables 
WHERE table_schema = 'your_schema' AND table_type = 'BASE TABLE';
 
-- Query materialized views
SELECT schemaname, matviewname 
FROM pg_matviews 
WHERE schemaname = 'your_schema';

5. Output Configuration

Sync Mode

Three sync mode options are available:

  1. FULL: Reads the current data snapshot of the entire table, writes to Kafka, then exits the task. Suitable for one-time data migration scenarios.

  2. INCREMENTAL: Only reads incremental CDC events and continuously writes to Kafka. Suitable for real-time data sync scenarios.

  3. FULL + INCREMENTAL: First reads the current data snapshot of the table, then continuously pushes all CDC JSON starting from the snapshot point. Suitable for scenarios requiring both historical data and real-time sync.

Mode Selection Guidelines:

# Use case examples
migration_scenario:
  mode: "FULL"
  description: "One-time historical data migration"
  
real_time_scenario:
  mode: "INCREMENTAL" 
  description: "Real-time change tracking only"
  
complete_sync_scenario:
  mode: "FULL + INCREMENTAL"
  description: "Complete data replication with ongoing sync"

Other Configuration Items

  • Topic: Kafka topic name to write to (must be pre-created)
  • Zone ID: Task timezone, defaults to UTC

Topic Management:

# Create Kafka topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic tacnode-cdc-events \
  --partitions 3 \
  --replication-factor 2
 
# Verify topic creation
kafka-topics.sh --list \
  --bootstrap-server localhost:9092
 
# Describe topic configuration
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic tacnode-cdc-events

6. Advanced Parameter Configuration

Timezone Handling

Zone ID is a JDK-style timezone parameter. Because the Maxwell format has no timestamptz type, timestamptz values are formatted in the configured timezone during conversion and the timezone information is then removed. Other fields follow the Maxwell standard and remain consistent with Maxwell tooling.
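
To make the conversion concrete, here is a small illustration in plain Python (not the exporter itself) of how a UTC timestamptz value ends up as a zone-local timestamp string with the offset removed when Zone ID is set to Asia/Shanghai.

from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # Python 3.9+

# A timestamptz value is an absolute instant; assume it is 08:56:56.449 UTC.
instant = datetime(2025, 9, 9, 8, 56, 56, 449000, tzinfo=timezone.utc)

# With Zone ID = Asia/Shanghai, the exporter renders the instant in that zone
# and then drops the offset, yielding a plain Maxwell-style timestamp string.
local = instant.astimezone(ZoneInfo("Asia/Shanghai"))
print(local.strftime("%Y-%m-%d %H:%M:%S.%f"))  # 2025-09-09 16:56:56.449000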

Valid Zone ID values reference Java timezone identifiers, commonly used ones include:

  • UTC: Coordinated Universal Time (default)
  • Asia/Shanghai: China Standard Time
  • America/New_York: US Eastern Time
  • Europe/London: UK Time
  • Japan: Japan Time

Example Timezone Configuration:

{
  "export_config": {
    "zone_id": "Asia/Shanghai",
    "timestamp_format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "timezone_conversion": {
      "source_timezone": "UTC",
      "target_timezone": "Asia/Shanghai",
      "preserve_original": false
    }
  }
}

Other Parameter Descriptions

| Parameter | Description | Default |
| --- | --- | --- |
| output_binlog_position | Whether to output binlog position information (PG position) | false |
| output_server_id | Whether to output the server ID (mock implementation) | false |
| output_thread_id | Whether to output the thread ID (mock implementation) | false |
| output_schema_id | Whether to output the incremental schema change ID | false |
| output_primary_keys | Whether to output primary key values | true |
| output_primary_key_columns | Whether to output primary key column names | false |
| output_push_timestamp | Whether to output the send timestamp | false |
| kafka_key_format | Kafka key generation method | "primary_key" |
| producer_partition_by | Partitioning strategy | "key" |
| producer_partition_columns | Partition column names (for column-based partitioning) | null |
| producer_partition_by_fallback | Fallback method when the partitioning strategy fails | "random" |
| kafka_partition_hash | Partition hash algorithm | "murmur2" |

Advanced Configuration Example:

{
  "advanced_config": {
    "output_primary_keys": true,
    "output_push_timestamp": true,
    "kafka_key_format": "table_primary_key",
    "producer_config": {
      "partition_by": "column",
      "partition_columns": ["hash"],
      "partition_by_fallback": "round_robin",
      "hash_algorithm": "murmur2"
    },
    "performance_tuning": {
      "batch_size": 1000,
      "linger_ms": 100,
      "compression_type": "snappy",
      "acks": "all"
    }
  }
}
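
To see why key-based partitioning matters, the sketch below mimics the effect of these defaults from the producer side. It is an illustration using the kafka-python client, not the exporter's internal implementation: events that share a primary key share a message key, and therefore a partition, which preserves per-row ordering.

import json

from kafka import KafkaProducer  # kafka-python client (assumed)

producer = KafkaProducer(
    bootstrap_servers="kafka-cluster:9092",     # assumed broker address
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "database": "test",
    "table": "tx",
    "type": "update",
    "data": {"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15", "volume": 1024},
}

# The primary key value becomes the message key; the client's default
# partitioner hashes it (murmur2 in many clients) to pick the partition.
producer.send("tacnode-cdc-events", key=event["data"]["hash"], value=event)
producer.flush()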

Monitoring and Troubleshooting

Export Job Monitoring

-- Monitor export job status
SELECT 
    job_id,
    job_name,
    source_instance,
    target_kafka_cluster,
    sync_mode,
    status,
    records_exported,
    last_export_time,
    error_message
FROM datasync_export_jobs 
WHERE target_type = 'kafka'
ORDER BY last_export_time DESC;
 
-- Monitor partition-level metrics
SELECT 
    topic,
    partition,
    current_offset,
    lag,
    throughput_per_sec
FROM kafka_partition_metrics
WHERE topic = 'tacnode-cdc-events';
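
Export lag can also be checked from the Kafka side. The sketch below compares end offsets with a downstream group's committed offsets using the kafka-python client; the topic and consumer group names are illustrative placeholders.

from kafka import KafkaConsumer, TopicPartition  # kafka-python client (assumed)

consumer = KafkaConsumer(
    bootstrap_servers="kafka-cluster:9092",      # assumed broker address
    group_id="cdc-processor",                    # hypothetical downstream group
    enable_auto_commit=False,
)

topic = "tacnode-cdc-events"
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# End offsets reflect what the export job has written; committed offsets
# reflect how far the downstream consumer group has read.
end_offsets = consumer.end_offsets(partitions)
for tp in partitions:
    committed = consumer.committed(tp) or 0
    print(f"partition={tp.partition} lag={end_offsets[tp] - committed}")

consumer.close()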

Common Issues and Solutions

Connection Issues:

# Test Kafka connectivity
telnet kafka-broker 9092
 
# Verify Kafka cluster health
kafka-broker-api-versions.sh --bootstrap-server kafka-broker:9092
 
# Check topic permissions
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --list --topic tacnode-cdc-events

Performance Optimization:

# Producer optimization
producer_config:
  batch_size: 16384
  linger_ms: 5
  compression_type: "snappy"
  buffer_memory: 33554432
  max_request_size: 1048576
  
# Consumer optimization (for downstream)
consumer_config:
  fetch_min_bytes: 1024
  fetch_max_wait_ms: 500
  max_partition_fetch_bytes: 1048576

Best Practices

  1. Topic Design:

    • Use appropriate partition count based on throughput requirements
    • Set adequate replication factor for durability
    • Configure retention policies based on downstream consumption patterns
  2. Security Configuration:

    • Use SSL/TLS for data in transit
    • Implement proper authentication and authorization
    • Regularly rotate credentials
  3. Performance Optimization:

    • Tune batch sizes based on latency requirements
    • Monitor and adjust partition assignments
    • Use compression for large messages
  4. Error Handling:

    • Implement dead letter queues for failed messages (a minimal sketch follows this list)
    • Set up alerting for export job failures
    • Configure retry policies for transient failures
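
As a starting point for the error-handling practices above, here is a minimal dead-letter-queue sketch. It assumes the kafka-python client; the topic, group, and DLQ names are illustrative. Failed events are redirected to a separate topic so the main stream keeps flowing.

import json

from kafka import KafkaConsumer, KafkaProducer  # kafka-python client (assumed)

consumer = KafkaConsumer(
    "tacnode-cdc-events",                        # assumed topic name
    bootstrap_servers="kafka-cluster:9092",      # assumed broker address
    group_id="cdc-processor",                    # hypothetical consumer group
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=False,
)
producer = KafkaProducer(
    bootstrap_servers="kafka-cluster:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def process(event):
    """Downstream business logic; raises on failure."""
    ...

for message in consumer:
    try:
        process(message.value)
    except Exception:
        # Park the failed event on a dead-letter topic instead of blocking the
        # main stream; a separate job can inspect and replay it later.
        producer.send("tacnode-cdc-events.dlq", message.value)
    consumer.commit()                            # commit only after handling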

This configuration enables robust, scalable CDC event streaming from Tacnode to Kafka, supporting a wide range of real-time data processing and integration scenarios.