Export to Kafka
Learn how to export Tacnode CDC (Change Data Capture) events to Kafka for downstream integration and real-time data processing. Complete configuration guide with format options and best practices.
Tacnode DataSync supports exporting CDC (Change Data Capture) events to Kafka clusters, enabling seamless integration with downstream systems and real-time data processing workflows.
Export Formats
Data samples in this guide are based on the following example table:
CREATE TABLE tx(
  hash text,
  maker text,
  ts timestamp,
  volume numeric(160,60),
  PRIMARY KEY(hash)
);
KV JSON Format
Outputs only the updated row data, making it ideal for frontend state display and state-synchronization scenarios.
Example Output:
// Executing SQL: INSERT INTO tx VALUES('0496f994-3c42-4cf3-86f0-7faf20699bd5', '840', NOW(), 230.33037109759)
// Output content:
{
  "hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",
  "maker": "840",
  "ts": "2025-09-09 16:56:56.449000",
  "volume": 230.33037109759
}
Use Cases:
- Frontend state synchronization
- Real-time dashboard updates
- Simple event streaming
- Microservice state propagation
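Because each KV JSON message carries the full row after the change, keyed state synchronization reduces to an upsert. A minimal sketch of a downstream consumer's apply step, assuming the example `tx` table (primary key `hash`); the message body matches the sample output above:

```python
import json

# In-memory state keyed by the table's primary key ("hash" in the
# example tx table). Each KV JSON message is the full row after the
# change, so applying a message is a plain upsert.
state = {}

def apply_kv_event(raw_message: str) -> None:
    """Upsert a KV JSON CDC event into the local state store."""
    row = json.loads(raw_message)
    state[row["hash"]] = row

# Simulated message matching the example output above.
apply_kv_event(
    '{"hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",'
    ' "maker": "840", "ts": "2025-09-09 16:56:56.449000",'
    ' "volume": 230.33037109759}'
)
print(state["0496f994-3c42-4cf3-86f0-7faf20699bd5"]["maker"])  # prints 840
```

In a real consumer the same function would be called once per Kafka record; deletes are not representable in this format, which is why Maxwell JSON (below) exists.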
Maxwell JSON Format
Maxwell is a CDC JSON format that fully records all INSERT/UPDATE/DELETE operations.
Field Descriptions
| Field | Description |
|---|---|
| database | Database where the change occurred |
| table | Table where the change occurred |
| type | Operation type (insert/update/delete) |
| ts | Change timestamp (Unix timestamp) |
| xid | Transaction ID of the change |
| commit | Whether the transaction is committed |
| data | Row data after the change |
| old | Previous row data (for updates) |
Data Examples
INSERT Operation:
// Executing: insert into tx VALUES(gen_random_uuid(), (random() * 1024)::bigint::text, now(), random() * 1024);
{
  "database": "test",
  "table": "tx",
  "type": "insert",
  "ts": 1757417323,
  "xid": 616699495,
  "commit": true,
  "data": {
    "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
    "maker": "503",
    "ts": "2025-09-09 19:28:43.484600",
    "volume": 505.356811461980000000000000000000000000000000000000000000000000
  }
}
UPDATE Operation:
// Executing: UPDATE tx SET volume = 1024 WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
  "database": "test",
  "table": "tx",
  "type": "update",
  "ts": 1757417442,
  "xid": -1527714998,
  "commit": true,
  "data": {
    "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
    "maker": "503",
    "ts": "2025-09-09 19:28:43.484600",
    "volume": 1024.000000000000000000000000000000000000000000000000000000000000
  },
  "old": {
    // "old" outputs only primary keys by default; modify the subscription config to output complete old values
    "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
    "maker": null,
    "ts": null,
    "volume": null
  }
}
DELETE Operation:
// Executing: DELETE FROM tx WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
  "database": "test",
  "table": "tx",
  "type": "delete",
  "ts": 1757417562,
  "xid": -448725205,
  "commit": true,
  "data": {
    "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
    "maker": null,
    "ts": null,
    "volume": null
  }
}
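A downstream consumer can replay these three operation types into a local copy of the table by dispatching on `type`. A minimal sketch, assuming the example `tx` table whose primary key is `hash`:

```python
import json

# Replay Maxwell events into a dict mirroring the source table.
# Assumes the example tx table, whose primary key is "hash".
def apply_maxwell_event(table_state: dict, raw_message: str) -> None:
    event = json.loads(raw_message)
    key = event["data"]["hash"]
    if event["type"] in ("insert", "update"):
        table_state[key] = event["data"]   # "data" is the row after the change
    elif event["type"] == "delete":
        table_state.pop(key, None)         # "data" carries the deleted row's key

rows = {}
insert_event = json.dumps({
    "database": "test", "table": "tx", "type": "insert",
    "ts": 1757417323, "xid": 616699495, "commit": True,
    "data": {"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
             "maker": "503", "volume": 505.36},
})
delete_event = json.dumps({
    "database": "test", "table": "tx", "type": "delete",
    "ts": 1757417562, "xid": -448725205, "commit": True,
    "data": {"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
             "maker": None, "volume": None},
})
apply_maxwell_event(rows, insert_event)
apply_maxwell_event(rows, delete_event)
print(rows)  # {} -- the insert was replayed, then the delete removed it
```

Real consumers would additionally check `database`/`table` for routing and may need the full `old` image for updates (see the subscription-config note above).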
Export Job Configuration
1. Create Export Task
- Log into the Tacnode console
- Navigate to “Data Sync” → “Data Export” → “Data Export Jobs”
2. Select Target Type
When creating a task, select Kafka as the output target:
3. Configure Connection Information
Source Configuration
- Select the Tacnode instance you want to sync from on the left side
- Click “Test Connection” to test the connection
- A successful test will display “Connection Successful”
Sink Configuration (Kafka)
- Fill in the target Kafka connection information on the right side
- Network Configuration:
  - First resolve network connectivity between DataSync and the target Kafka
  - Bootstrap Server: Enter the Kafka server address and port
  - For CDC push over the public network, adjust Kafka's `advertised.listeners` property to avoid connection issues
- Security Protocol: Supports "PLAINTEXT", "SASL_PLAINTEXT", and "SASL_SSL"; choose based on your requirements
  - PLAINTEXT: Plain-text transmission, suitable for internal networks
  - SASL_PLAINTEXT: SASL authentication with plain-text transmission, for scenarios requiring authentication but not encryption
  - SASL_SSL: SASL authentication with SSL-encrypted transmission, for public networks or high-security requirements
- Click "Test Connection" after configuration
- A successful test will display "Connection Successful"
Example Configuration:
kafka_config:
  bootstrap_servers: "kafka-cluster:9092"
  security_protocol: "SASL_SSL"
  sasl_mechanism: "PLAIN"
  sasl_username: "your_username"
  sasl_password: "your_password"
  ssl_config:
    ssl_ca_location: "/path/to/ca-cert"
    ssl_certificate_location: "/path/to/client-cert"
    ssl_key_location: "/path/to/client-key"
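Before creating the job, it can help to sanity-check the sink settings against the rules above (SASL protocols need credentials, SASL_SSL needs certificate paths). A minimal sketch; the dict keys mirror the example configuration, not a Tacnode API:

```python
# Allowed values come from the Security Protocol options listed above.
ALLOWED_PROTOCOLS = {"PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL"}

def validate_kafka_config(cfg: dict) -> list:
    """Return a list of human-readable problems (empty means OK)."""
    problems = []
    if not cfg.get("bootstrap_servers"):
        problems.append("bootstrap_servers is required")
    proto = cfg.get("security_protocol", "PLAINTEXT")
    if proto not in ALLOWED_PROTOCOLS:
        problems.append(f"unknown security_protocol: {proto}")
    elif proto.startswith("SASL") and not (
        cfg.get("sasl_username") and cfg.get("sasl_password")
    ):
        problems.append("SASL protocols need sasl_username/sasl_password")
    if proto == "SASL_SSL" and "ssl_config" not in cfg:
        problems.append("SASL_SSL needs ssl_config (CA, cert, key paths)")
    return problems

cfg = {
    "bootstrap_servers": "kafka-cluster:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_username": "your_username",
    "sasl_password": "your_password",
    "ssl_config": {"ssl_ca_location": "/path/to/ca-cert"},
}
print(validate_kafka_config(cfg))  # prints []
```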
4. Select Sync Objects
Choose the schemas and tables you want to synchronize:
Optional: Sync Materialized Views (MV)
If you need to sync materialized views, check the Mview category in the configuration.
Object Selection Examples:
-- Query available schemas
SELECT schema_name FROM information_schema.schemata
WHERE schema_name NOT IN ('information_schema', 'pg_catalog');
-- Query tables in specific schema
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'your_schema' AND table_type = 'BASE TABLE';
-- Query materialized views
SELECT schemaname, matviewname
FROM pg_matviews
WHERE schemaname = 'your_schema';
5. Output Configuration
Sync Mode
Three sync mode options are available:
- FULL: Reads the current data snapshot of the entire table, writes it to Kafka, then exits the task. Suitable for one-time data migration scenarios.
- INCREMENTAL: Only reads incremental CDC events and continuously writes them to Kafka. Suitable for real-time data sync scenarios.
- FULL + INCREMENTAL: First reads the current data snapshot of the table, then continuously pushes all CDC JSON starting from the snapshot point. Suitable for scenarios requiring both historical data and real-time sync.
Mode Selection Guidelines:
# Use case examples
migration_scenario:
  mode: "FULL"
  description: "One-time historical data migration"

real_time_scenario:
  mode: "INCREMENTAL"
  description: "Real-time change tracking only"

complete_sync_scenario:
  mode: "FULL + INCREMENTAL"
  description: "Complete data replication with ongoing sync"
Other Configuration Items
- Topic: Kafka topic name to write to (must be pre-created)
- Zone ID: Task timezone, defaults to UTC
Topic Management:
# Create Kafka topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic tacnode-cdc-events \
--partitions 3 \
--replication-factor 2
# Verify topic creation
kafka-topics.sh --list \
--bootstrap-server localhost:9092
# Describe topic configuration
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic tacnode-cdc-events
6. Advanced Parameter Configuration
Timezone Handling
Zone ID is a JDK-based timezone handling parameter. Since the Maxwell format doesn’t have a timestamptz type, PostgreSQL’s timestamptz to Maxwell conversion will format according to the configured timezone and then remove timezone information. Other fields follow Maxwell standards and remain consistent with Maxwell tools.
Valid Zone ID values reference Java timezone identifiers, commonly used ones include:
- UTC: Coordinated Universal Time (default)
- Asia/Shanghai: China Standard Time
- America/New_York: US Eastern Time
- Europe/London: UK Time
- Japan: Japan Time
Example Timezone Configuration:
{
  "export_config": {
    "zone_id": "Asia/Shanghai",
    "timestamp_format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "timezone_conversion": {
      "source_timezone": "UTC",
      "target_timezone": "Asia/Shanghai",
      "preserve_original": false
    }
  }
}
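The timestamptz handling described above (render the instant in the configured Zone ID, then drop the offset so the value fits Maxwell's zone-less timestamp format) can be sketched with standard IANA zone names:

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

# Sketch of the described conversion: shift the instant into the
# configured Zone ID, then format without any offset, matching the
# zone-less timestamps in the Maxwell examples above.
def to_maxwell_timestamp(ts_utc: datetime, zone_id: str) -> str:
    local = ts_utc.astimezone(ZoneInfo(zone_id))
    return local.strftime("%Y-%m-%d %H:%M:%S.%f")

ts = datetime(2025, 9, 9, 16, 56, 56, 449000, tzinfo=timezone.utc)
print(to_maxwell_timestamp(ts, "UTC"))            # 2025-09-09 16:56:56.449000
print(to_maxwell_timestamp(ts, "Asia/Shanghai"))  # 2025-09-10 00:56:56.449000
```

Note that the same instant renders as different wall-clock strings per Zone ID, which is why downstream consumers must agree on the task's configured timezone.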
Other Parameter Descriptions
| Parameter | Description | Default |
|---|---|---|
| output_binlog_position | Whether to output binlog position information (PG position) | false |
| output_server_id | Whether to output server ID (mock implementation) | false |
| output_thread_id | Whether to output thread ID (mock implementation) | false |
| output_schema_id | Schema change incremental ID | false |
| output_primary_keys | Whether to output primary key values | true |
| output_primary_key_columns | Whether to output primary key column names | false |
| output_push_timestamp | Whether to output send timestamp | false |
| kafka_key_format | Kafka key generation method | "primary_key" |
| producer_partition_by | Partitioning strategy | "key" |
| producer_partition_columns | Partition column names (for column-based partitioning) | null |
| producer_partition_by_fallback | Fallback method when the partitioning strategy fails | "random" |
| kafka_partition_hash | Partition hash algorithm | "murmur2" |
Advanced Configuration Example:
{
  "advanced_config": {
    "output_primary_keys": true,
    "output_push_timestamp": true,
    "kafka_key_format": "table_primary_key",
    "producer_config": {
      "partition_by": "column",
      "partition_columns": ["hash"],
      "partition_by_fallback": "round_robin",
      "hash_algorithm": "murmur2"
    },
    "performance_tuning": {
      "batch_size": 1000,
      "linger_ms": 100,
      "compression_type": "snappy",
      "acks": "all"
    }
  }
}
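The point of key-based partitioning is that every event for the same primary key hashes to the same partition, preserving per-key ordering. A hedged sketch of that mapping; Kafka's default producer uses murmur2, and MD5 here is only a stand-in to illustrate the idea, not the exact algorithm:

```python
import hashlib

# Hash the Kafka key onto a partition index. Deterministic, so the
# same key always selects the same partition.
def pick_partition(key: bytes, num_partitions: int) -> int:
    digest = hashlib.md5(key).digest()          # stand-in for murmur2
    return int.from_bytes(digest[:4], "big") % num_partitions

# With producer_partition_by = "key", all events for one primary key
# land on one partition, so consumers see that row's changes in order.
p1 = pick_partition(b"5dc0ac66-6f92-4aa9-8672-4312b3666c15", 3)
p2 = pick_partition(b"5dc0ac66-6f92-4aa9-8672-4312b3666c15", 3)
print(p1 == p2)  # True
```

Column-based partitioning (`partition_by: "column"` with `partition_columns`) works the same way, but hashes the chosen column values instead of the whole key.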
Monitoring and Troubleshooting
Export Job Monitoring
-- Monitor export job status
SELECT
  job_id,
  job_name,
  source_instance,
  target_kafka_cluster,
  sync_mode,
  status,
  records_exported,
  last_export_time,
  error_message
FROM datasync_export_jobs
WHERE target_type = 'kafka'
ORDER BY last_export_time DESC;

-- Monitor partition-level metrics
SELECT
  topic,
  partition,
  current_offset,
  lag,
  throughput_per_sec
FROM kafka_partition_metrics
WHERE topic = 'tacnode-cdc-events';
Common Issues and Solutions
Connection Issues:
# Test Kafka connectivity
telnet kafka-broker 9092
# Verify Kafka cluster health
kafka-broker-api-versions.sh --bootstrap-server kafka-broker:9092
# Check topic permissions
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--list --topic tacnode-cdc-events
Performance Optimization:
# Producer optimization
producer_config:
  batch_size: 16384
  linger_ms: 5
  compression_type: "snappy"
  buffer_memory: 33554432
  max_request_size: 1048576

# Consumer optimization (for downstream)
consumer_config:
  fetch_min_bytes: 1024
  fetch_max_wait_ms: 500
  max_partition_fetch_bytes: 1048576
Best Practices
- Topic Design:
  - Use an appropriate partition count based on throughput requirements
  - Set an adequate replication factor for durability
  - Configure retention policies based on downstream consumption patterns
- Security Configuration:
  - Use SSL/TLS for data in transit
  - Implement proper authentication and authorization
  - Regularly rotate credentials
- Performance Optimization:
  - Tune batch sizes based on latency requirements
  - Monitor and adjust partition assignments
  - Use compression for large messages
- Error Handling:
  - Implement dead letter queues for failed messages
  - Set up alerting for export job failures
  - Configure retry policies for transient failures
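The dead-letter-queue and retry recommendations combine into one small pattern: retry a send a bounded number of times, then divert the message to a DLQ rather than dropping it. A hedged sketch; `send_fn` stands in for a real producer call and the DLQ is just a list here:

```python
# Retry-then-DLQ pattern: bounded retries for transient failures,
# with failed messages preserved for later inspection or replay.
def send_with_dlq(message: str, send_fn, dlq: list, max_retries: int = 3) -> bool:
    for _ in range(max_retries):
        try:
            send_fn(message)
            return True
        except Exception:
            continue                     # transient failure: retry
    dlq.append(message)                  # exhausted retries: dead-letter it
    return False

dlq = []
def always_fails(msg):
    raise ConnectionError("broker unreachable")

ok = send_with_dlq('{"type": "insert"}', always_fails, dlq)
print(ok, len(dlq))  # False 1
```

In production the DLQ would typically be a separate Kafka topic, and the failure branch would also emit a metric so alerting can fire on export-job failures.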
This configuration enables robust, scalable CDC event streaming from Tacnode to Kafka, supporting diverse real-time data processing and integration scenarios.