Apache Flink

Tacnode is a high-performance, distributed database compatible with the PostgreSQL protocol; it supports standard SQL syntax and the PostgreSQL ecosystem toolchain. This article explains how to write data to Tacnode efficiently using Apache Flink, focusing on two approaches: the DataStream API and Flink SQL. It also covers how to read CDC (Change Data Capture) incremental changes via Apache Flink.

Environment Preparation

Version Compatibility

  • Flink version: 1.14+ (1.16+ recommended)
  • JDBC driver: PostgreSQL JDBC driver (42.5+ recommended)

Dependency Configuration

Add the PostgreSQL JDBC driver dependency to your Flink project:

<!-- Maven configuration -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>
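
The DataStream and SQL examples below also rely on the Flink JDBC connector. If your Flink distribution does not already bundle it, add it as a dependency as well; the coordinates below assume the externalized connector used with Flink 1.16+, and the version shown is illustrative and should match your Flink release.

<!-- Maven configuration (version is illustrative; match it to your Flink release) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.2-1.17</version>
</dependency>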

Data Type Mapping

Flink SQL Type      Tacnode Type
BOOLEAN             BOOLEAN
TINYINT             SMALLINT
SMALLINT            SMALLINT
INT                 INTEGER
BIGINT              BIGINT
FLOAT               REAL
DOUBLE              DOUBLE PRECISION
DECIMAL(p,s)        NUMERIC(p,s)
VARCHAR(n)          VARCHAR(n)
CHAR(n)             CHAR(n)
DATE                DATE
TIME                TIME
TIMESTAMP           TIMESTAMP
ARRAY               ARRAY
MAP                 JSONB
ROW                 JSONB
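
For example (table and column names are illustrative), a Flink sink column declared as DECIMAL(10,2) maps to NUMERIC(10,2) in Tacnode, and TIMESTAMP(3) maps to TIMESTAMP:

-- Flink sink table (illustrative)
CREATE TABLE orders_sink (
  id INT,
  amount DECIMAL(10,2),
  created TIMESTAMP(3)
) WITH (...);

-- Matching Tacnode table (illustrative)
CREATE TABLE orders (
  id INTEGER,
  amount NUMERIC(10,2),
  created TIMESTAMP
);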

Writing with the DataStream API

Using the JDBC Sink

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Define the data stream (source implementation omitted)
DataStream<User> userStream = env.addSource(...);

// Configure the JDBC connection to Tacnode
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://tacnode-host:5432/dbname")
    .withDriverName("org.postgresql.Driver")
    .withUsername("your-username")
    .withPassword("your-password")
    .build();

// Create the JDBC sink: one parameterized INSERT per record, batched by the connector
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    jdbcOpts));

env.execute("Tacnode Sink Job");
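
The examples in this section assume a simple User POJO with the getters referenced above; a minimal sketch:

// Plain POJO used by the JDBC sink examples (illustrative)
public class User {
    private int id;
    private String name;
    private int age;

    public User() {}  // no-arg constructor, required for Flink's POJO handling

    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public int getAge() { return age; }
}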

Batch Write Optimization

JdbcExecutionOptions execOpts = new JdbcExecutionOptions.Builder()
    .withBatchSize(1000)              // Records per batch
    .withBatchIntervalMs(200)         // Batch interval (ms)
    .withMaxRetries(3)                // Max retry attempts
    .build();
 
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));
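
The JDBC sink retries failed batches, so writes are at-least-once; making the statement idempotent avoids duplicate rows on retry. A sketch using an upsert, assuming id is the primary key of users and that Tacnode accepts PostgreSQL's INSERT ... ON CONFLICT syntax:

userStream.addSink(JdbcSink.sink(
    // Upsert so that a retried batch overwrites rows instead of duplicating them
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?) " +
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));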

Writing with Flink SQL

Registering the Tacnode Catalog

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// Register Tacnode Catalog
tableEnv.executeSql(
    "CREATE CATALOG Tacnode WITH (" +
    "  'type'='jdbc'," +
    "  'default-database'='dbname'," +
    "  'username'='your-username'," +
    "  'password'='your-password'," +
    "  'base-url'='jdbc:postgresql://tacnode-host:5432'" +
    ")");
 
// Set active catalog
tableEnv.useCatalog("Tacnode");
// Register the Kafka source as a temporary table (the JDBC catalog only exposes
// existing Tacnode tables and does not support creating new ones)
tableEnv.executeSql(
    "CREATE TEMPORARY TABLE source_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_topic'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'json'" +
    ")");
 
// Perform insertion
tableEnv.executeSql(
    "INSERT INTO users " +
    "SELECT id, name, age FROM source_table");

Alternatively, submit via the Flink SQL client or web UI:

CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://tacnode-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'your-username',
 'password' = 'your-password',
 'driver' = 'org.postgresql.Driver',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '1s',
 'sink.max-retries' = '3'
);
 
INSERT INTO users_sink
SELECT * FROM source_table;

Common Configuration Parameters

Parameter Name                  Description                     Recommended Value
sink.buffer-flush.max-rows      Max number of rows per batch    1000-5000
sink.buffer-flush.interval      Batch flush interval            1s
sink.max-retries                Max retry attempts              3
sink.parallelism                Sink parallelism                Number of Tacnode nodes
connection.max-retry-timeout    Connection timeout              30s
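
These options go in the sink table's WITH clause. For example, pinning the sink parallelism (the value 4 is illustrative; the table above suggests matching the number of Tacnode nodes):

CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://tacnode-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'your-username',
 'password' = 'your-password',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '1s',
 'sink.max-retries' = '3',
 'sink.parallelism' = '4'
);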

For more configuration options, see: Apache Flink JDBC SQL Connector.

Consuming CDC with the postgres-cdc Connector

Flink retrieves CDC data from Tacnode using the postgres-cdc connector.

Preconditions

  • Logical replication must be enabled on the database (wal_level = logical)
  • The Flink runtime and Tacnode must be network-accessible to each other
  • The database user requires the REPLICATION privilege, plus permission to create replication slots and publications if they are not created in advance

See change-data-capture for details.
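
A quick way to verify the first and third preconditions from a SQL client, assuming Tacnode exposes the standard PostgreSQL commands and catalogs:

-- Logical decoding must be enabled
SHOW wal_level;   -- expected: logical

-- The connecting user needs the replication attribute
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = current_user;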

Publication & Slot Resource Management

Create publications in advance. A publication, as defined by the PostgreSQL protocol, controls server-side event publishing and filtering: specific tables or events can be selected for publishing, and events from partitioned tables can be aggregated to their parent table. If no publication is created explicitly, the Flink CDC connector automatically generates a default one named dbz_publication that subscribes to all tables and all change events; this forces filtering on the consumer side, lowers efficiency, and increases network overhead. Specify the publication in the Flink connector by setting 'debezium.publication.name' = 'pub-name'. Multiple Flink jobs can share the same publication.
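
A minimal sketch of pre-creating a publication for a single table (the publication, schema, and table names are illustrative; standard PostgreSQL syntax):

-- Publish only the tables this job needs instead of relying on the catch-all dbz_publication
CREATE PUBLICATION flink_cdc_pub FOR TABLE schemaname.tablename;

-- Then reference it in the Flink source table:
--   'debezium.publication.name' = 'flink_cdc_pub'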

Slots track CDC consumption progress (the current LSN). Each consumer should use its own slot so that it maintains an independent consumption state; accordingly, each Flink job should use a distinct slot. It is recommended to pre-create slots with the noexport_snapshot parameter for efficient storage usage. Example:

SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);

Manually clean up unused slots after stopping a Flink consumer job to release pinned storage:

SELECT pg_drop_replication_slot('flink_cdc_slot');

Slot state is updated when Flink checkpoints complete: each checkpoint records the LSN confirmed as consumed, the slot then advances, and older LSNs are no longer available. If checkpoints do not run, the slot keeps old WAL segments pinned, wasting storage. CDC consumption can only resume from the latest completed checkpoint; historical checkpoints cannot be selected, so regardless of rollback attempts a job always restarts from its most recent successful checkpoint.
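
Because the slot only advances when a checkpoint completes, checkpointing must be enabled for CDC jobs. In the Flink SQL client this can be done with a SET statement (the interval is illustrative):

SET 'execution.checkpointing.interval' = '60s';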

The following example demonstrates reading from a Source table using CDC, then writing results into a Sink table.

CREATE TEMPORARY TABLE source_cdc (
        ins_time timestamp(6) NULL,
        store_id string NULL,
        store_code string NOT NULL,
        store_name string NULL,
        time_section_no bigint NULL,
        stat_date string NOT NULL,
        PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'host-url',
  'port' = '5432',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'database-name' = 'dbname',
  'schema-name' = 'schemaname',
  'table-name' = 'tablename',
  -- It is recommended to manually create the slot in the database and handle its lifecycle. Without manual creation, Flink CDC automatically creates the slot at startup; automatically created slots lack the NOEXPORT_SNAPSHOT property and may use extra storage.
  'slot.name' = 'flink_cdc_slot',
  -- Decoding plugin required; only pgoutput is supported.
  'decoding.plugin.name' = 'pgoutput',
  -- Currently only supports 'never', which syncs only real-time changes from the slot’s creation, without performing a full snapshot. Full data reading is not supported in the current engine.
  'debezium.snapshot.mode' = 'never',
  -- When using pgoutput decoding, a specific publication can be assigned (optional). If omitted, dbz_publication is automatically created, subscribing to all table updates.
  'debezium.publication.name' = 'pub-name'
);
 
CREATE TEMPORARY TABLE sink_kafka(
        store_id string NULL,
        cnt bigint NOT NULL,
        PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'servers',
  'key.format' = 'json',
  'key.fields' = 'store_id',
  'value.format' =  'debezium-json',
  'value.debezium-json.schema-include' = 'false'
);
 
INSERT INTO sink_kafka
SELECT store_id, count(*) as cnt
FROM source_cdc GROUP BY store_id;

Monitoring and Operations

Monitoring SQL Jobs

Monitor with Flink Web UI:

  • Write throughput (records/s)
  • Operator backpressure status
  • Checkpoint state

Logging Configuration

# log4j.properties
log4j.logger.org.apache.flink.table.runtime=INFO
log4j.logger.org.postgresql=INFO

Troubleshooting

Type Mapping Issues

Symptom: Write failures because of field type mismatch.

Resolution:

  1. Explicitly specify type mapping in DDL:
CREATE TABLE Tacnode_sink (
  id INT,
  name VARCHAR(100),        -- Set length explicitly
  create_time TIMESTAMP(3)
) WITH (...);
  2. Use CAST to convert types:
INSERT INTO Tacnode_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;

Performance Bottlenecks

Symptom: Low SQL job throughput.

Resolution:

  1. Increase parallelism
  2. Tune batch parameters
  3. Check Tacnode cluster load
  4. Optimize SQL query logic
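
A sketch of steps 1 and 2 in the Flink SQL client (values are illustrative and should be validated against the actual workload):

-- Raise overall job parallelism
SET 'parallelism.default' = '8';

Batch parameters such as 'sink.buffer-flush.max-rows' (for example, raising it to 5000) and 'sink.buffer-flush.interval' are adjusted in the sink table's WITH clause shown earlier.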