Change Data Capture (CDC)

Tacnode supports logical replication, which lets you capture data changes through Change Data Capture (CDC). This page explains how to capture changes efficiently using two core concepts: publications and replication slots.

Core Concepts

  • Publication

Defines which tables and which change types (INSERT, UPDATE, DELETE, TRUNCATE) are replicated from the database. A publication can cover specific tables or all tables.

  • Replication Slot

Server-side mechanism that tracks the consumer’s replication progress, ensuring that Write-Ahead Log (WAL) entries are not deleted before consumers acknowledge receipt of changes. Guarantees at-least-once delivery for consumers.
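Because slots guarantee at-least-once rather than exactly-once delivery, a consumer that reconnects may receive transactions it already processed but had not yet acknowledged. A minimal Python sketch of one common remedy, an idempotent consumer, follows. It assumes transactions arrive in commit order, each tagged with its commit LSN as an integer; `IdempotentApplier` is a hypothetical name, not a Tacnode API.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentApplier:
    """Applies each transaction at most once, keyed by its commit LSN.

    Hypothetical helper: in a real consumer, last_applied_lsn must be
    persisted atomically with the applied data so it survives restarts.
    """

    last_applied_lsn: int = 0
    applied: list = field(default_factory=list)

    def apply(self, commit_lsn: int, changes: list) -> bool:
        # Commit LSNs increase in commit order, so a value at or below the
        # last applied one identifies a redelivered transaction.
        if commit_lsn <= self.last_applied_lsn:
            return False
        self.applied.extend(changes)
        self.last_applied_lsn = commit_lsn
        return True


applier = IdempotentApplier()
applier.apply(100, ["insert users id=1"])  # True: first delivery
applier.apply(100, ["insert users id=1"])  # False: redelivered, skipped
```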

Other Concepts

  • Logical Decoding

Converts the Write-Ahead Log (WAL), which is stored in an internal format, into a client-readable format in real time as data is written to the database.

  • Decoding Plugins

Determine the output format of the data stream. Tacnode supports two output plugins: test_decoding and pgoutput. test_decoding produces a SQL-like text format intended for testing and validation. pgoutput produces a more efficient binary format that follows the PostgreSQL logical replication protocol and must be decoded by the client. Flink CDC uses pgoutput.

For more information on encoding protocols, see Logical Replication Message Formats.
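Because pgoutput messages must be decoded by the client, the following Python sketch shows what that involves for three message types: Begin ('B'), Commit ('C') and Insert ('I'). It assumes logical replication protocol version 1 and non-streamed transactions, following PostgreSQL's Logical Replication Message Formats. Relation, Update, Delete and other messages are not handled, and the function names are illustrative.

```python
import struct
from datetime import datetime, timedelta, timezone

# pgoutput timestamps count microseconds since 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)
# Sentinel for an unchanged TOASTed column whose value was not sent.
UNCHANGED_TOAST = object()


def _pg_time(micros: int) -> datetime:
    return PG_EPOCH + timedelta(microseconds=micros)


def _tuple_data(buf: bytes, pos: int) -> list:
    """Decode a TupleData section that starts at buf[pos]."""
    (ncols,) = struct.unpack_from("!h", buf, pos)
    pos += 2
    columns = []
    for _ in range(ncols):
        kind = buf[pos:pos + 1]
        pos += 1
        if kind == b"n":  # NULL
            columns.append(None)
        elif kind == b"u":  # unchanged TOASTed value
            columns.append(UNCHANGED_TOAST)
        elif kind == b"t":  # text value: Int32 length, then the bytes
            (length,) = struct.unpack_from("!i", buf, pos)
            pos += 4
            columns.append(buf[pos:pos + length].decode("utf-8"))
            pos += length
        else:
            raise ValueError(f"unsupported column kind {kind!r}")
    return columns


def decode_message(buf: bytes) -> dict:
    """Decode a pgoutput Begin, Commit or (non-streamed) Insert message."""
    tag = buf[:1]
    if tag == b"B":  # final LSN, commit timestamp, xid
        final_lsn, ts, xid = struct.unpack_from("!QqI", buf, 1)
        return {"type": "begin", "final_lsn": final_lsn,
                "commit_time": _pg_time(ts), "xid": xid}
    if tag == b"C":  # flags, commit LSN, end LSN, commit timestamp
        _flags, commit_lsn, end_lsn, ts = struct.unpack_from("!BQQq", buf, 1)
        return {"type": "commit", "commit_lsn": commit_lsn,
                "end_lsn": end_lsn, "commit_time": _pg_time(ts)}
    if tag == b"I":  # relation OID, 'N' marker, new tuple
        relid, marker = struct.unpack_from("!Ic", buf, 1)
        if marker != b"N":
            raise ValueError("expected new-tuple marker 'N'")
        return {"type": "insert", "relation_oid": relid,
                "columns": _tuple_data(buf, 6)}
    raise ValueError(f"unhandled message type {tag!r}")
```

The relation OID in an Insert refers to an earlier Relation message, which a full decoder would cache to map column positions to names.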

Prerequisites

To enable CDC for a database, set wal_level to logical; the default level, replica, does not record enough information for logical decoding. The new setting applies only to new connections, so reconnect after changing it.

-- Check the current setting
SHOW wal_level;
 
-- Enable logical decoding for the database, then reconnect
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;

Publication

A publication is the core mechanism for subscribing to table changes in Tacnode. It defines a set of table changes that subscribers, such as a Flink CDC job, can consume for real-time data synchronization and distribution.

Create Publication

Create a Publication

-- Create a Publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE table1, table2;
 
-- Create a Publication for all tables
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
 
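-- For an existing publication that includes a partitioned table, publish
-- changes made to its partitions as changes to the parent (root) table
ALTER PUBLICATION partitioned_table_pub SET (publish_via_partition_root = true);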

Create a Publication for specific operation types

-- Publish only INSERT operations
CREATE PUBLICATION insert_only_pub FOR TABLE table1 WITH (publish = 'insert');
 
-- Publish INSERT and UPDATE operations
CREATE PUBLICATION insert_update_pub FOR TABLE table1, table2 WITH (publish = 'insert, update');
 
-- Publish all operations (default)
CREATE PUBLICATION all_ops_pub FOR TABLE table1 WITH (publish = 'insert, update, delete, truncate');
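If publications are created from application code, the publish option can be validated before the statement is sent. The Python sketch below is a hypothetical helper (`create_publication_sql` is not part of any Tacnode client) that builds statements like the ones above. To sidestep identifier quoting rules, it accepts only lowercase unquoted names.

```python
import re

VALID_OPS = ("insert", "update", "delete", "truncate")
_NAME = re.compile(r"[a-z_][a-z0-9_]*")
# Table names may be schema-qualified, e.g. public.orders.
_TABLE = re.compile(r"[a-z_][a-z0-9_]*(\.[a-z_][a-z0-9_]*)?")


def create_publication_sql(name, tables=None, ops=VALID_OPS):
    """Build CREATE PUBLICATION; tables=None publishes ALL TABLES."""
    if not _NAME.fullmatch(name):
        raise ValueError(f"unsupported publication name: {name!r}")
    if tables is not None:
        if not tables:
            raise ValueError("tables must be None or a non-empty list")
        for table in tables:
            if not _TABLE.fullmatch(table):
                raise ValueError(f"unsupported table name: {table!r}")
    unknown = set(ops) - set(VALID_OPS)
    if unknown or not ops:
        raise ValueError(f"invalid publish operations: {sorted(unknown)}")
    target = "ALL TABLES" if tables is None else "TABLE " + ", ".join(tables)
    # Emit operations in a fixed order, without duplicates.
    publish = ", ".join(op for op in VALID_OPS if op in ops)
    return f"CREATE PUBLICATION {name} FOR {target} WITH (publish = '{publish}');"
```

For example, `create_publication_sql("insert_update_pub", ["table1", "table2"], ["update", "insert"])` yields the INSERT and UPDATE statement shown above.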

Alter Publication

Add tables to a Publication

ALTER PUBLICATION my_publication ADD TABLE table3, table4;

Remove tables from a Publication

ALTER PUBLICATION my_publication DROP TABLE table1;

Change the operations published

ALTER PUBLICATION my_publication SET (publish = 'insert, update');

Check Publication

List all Publications

SELECT * FROM pg_publication;

View details of a specific Publication

SELECT * FROM pg_publication WHERE pubname = 'my_publication';

Drop Publication

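A publication can only be dropped from the database in which it was created. Before dropping it, check whether any consumer is still streaming from it.

-- Check dependencies before deletion: list active logical slots.
-- pg_replication_slots does not record which publication a consumer uses,
-- so match these slots against each consumer's configuration.
SELECT slot_name, plugin, active_pid FROM pg_replication_slots WHERE slot_type = 'logical' AND active;
 
-- Delete the Publication
DROP PUBLICATION my_publication;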

Replication Slot

Create Slot

-- Create a logical replication slot that uses pgoutput. With noexport_snapshot set to true, no snapshot is exported, which minimizes disk usage.
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput', false, true);

Update Slot

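In some cases, such as when a slot has accumulated a backlog that is no longer needed, you may need to advance the slot position (LSN) manually. Changes before the new position are skipped and are never delivered to the consumer: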

-- Advance slot to the current WAL position
SELECT pg_replication_slot_advance('my_slot', pg_current_wal_lsn());

Check Slot

-- List all replication slots
SELECT * FROM pg_replication_slots;
 
-- View replication slot lag details
SELECT
    slot_name,
    confirmed_flush_lsn,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
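pg_wal_lsn_diff returns the number of WAL bytes between two positions. To compute the same lag on the client, note that an LSN in PostgreSQL's X/Y text form is a 64-bit position whose high and low 32-bit halves are written in hexadecimal. The Python sketch below also accepts plain decimal integers, because the slot listings in the pg_logical example on this page print LSNs that way; treating both forms as the same 64-bit value is an assumption.

```python
def parse_lsn(text: str) -> int:
    """Return an LSN as a 64-bit integer.

    Accepts PostgreSQL's 'X/Y' form (high and low 32 bits in hex) and, as an
    assumption, the plain decimal form seen in this page's slot listings.
    """
    text = text.strip()
    if "/" in text:
        high, low = text.split("/")
        high_val, low_val = int(high, 16), int(low, 16)
        if high_val > 0xFFFFFFFF or low_val > 0xFFFFFFFF:
            raise ValueError(f"LSN half out of range: {text!r}")
        return (high_val << 32) | low_val
    return int(text)


def format_lsn(value: int) -> str:
    """Format a 64-bit LSN in PostgreSQL's 'X/Y' form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def lsn_diff(a: str, b: str) -> int:
    """WAL bytes from b to a, analogous to pg_wal_lsn_diff(a, b)."""
    return parse_lsn(a) - parse_lsn(b)
```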

Drop Slot

A slot can only be dropped from the database in which it was created.

Before dropping a slot, check that it is inactive and that no consumer is connected to it.

-- Check if slot is active
SELECT active FROM pg_replication_slots WHERE slot_name = 'my_slot';
 
-- Check for connected consumers
SELECT
    r.pid,
    r.usename,
    r.application_name,
    r.client_addr,
    s.slot_name,
    r.state,
    r.sent_lsn,
    r.flush_lsn,
    s.confirmed_flush_lsn
FROM pg_stat_replication r
JOIN pg_replication_slots s ON r.pid = s.active_pid
WHERE s.slot_name = 'my_slot';

-- Drop the slot once it is inactive and no consumers are connected
SELECT pg_drop_replication_slot('my_slot');

Apache Flink can consume CDC events through the official postgres-cdc connector. The connector subscribes to a specified publication and, as part of checkpointing, confirms the processed LSN to the slot so that the slot's position advances. For detailed usage, see Apache Flink Integration.
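Checkpoint-driven confirmation can be sketched as follows: progress is captured when a checkpoint starts and is confirmed to the slot only after that checkpoint completes, so a failure before completion replays changes instead of losing them. This Python sketch is a simplified illustration of the pattern, not the connector's code; `confirm_lsn` is a hypothetical callback standing in for whatever client call reports the flushed LSN.

```python
class CheckpointedConsumer:
    """Confirms slot progress only after a checkpoint completes (sketch)."""

    def __init__(self, confirm_lsn):
        self._confirm_lsn = confirm_lsn  # hypothetical: reports flushed LSN
        self._processed_lsn = 0          # highest commit LSN processed
        self._pending = {}               # checkpoint id -> LSN at snapshot

    def on_transaction(self, commit_lsn: int) -> None:
        self._processed_lsn = max(self._processed_lsn, commit_lsn)

    def snapshot(self, checkpoint_id: int) -> None:
        # Capture progress when the checkpoint starts; do not confirm yet.
        self._pending[checkpoint_id] = self._processed_lsn

    def notify_complete(self, checkpoint_id: int) -> None:
        # A completed checkpoint also covers any earlier pending ones.
        done = [cid for cid in self._pending if cid <= checkpoint_id]
        if done:
            self._confirm_lsn(max(self._pending.pop(cid) for cid in done))


confirmed = []
consumer = CheckpointedConsumer(confirmed.append)
consumer.on_transaction(100)
consumer.snapshot(1)
consumer.on_transaction(200)  # processed, but not covered by checkpoint 1
consumer.notify_complete(1)   # confirmed is now [100]
```

If the job fails after processing LSN 200 but before a later checkpoint completes, the slot still holds position 100, so the changes after it are delivered again.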

For more configuration options of the postgres-cdc connector, refer to the Postgres CDC Connector documentation.

Typical Example

Create test tables:

-- Create example tables
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Create a publication:

-- Create a publication with specified tables
CREATE PUBLICATION flink_cdc_pub FOR TABLE users, orders;

Create a replication slot:

-- Create a dedicated replication slot for Flink
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
 
-- Verify slot creation
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';

For Flink CDC job configuration, refer to Apache Flink Integration.

Monitor slot status:

-- Check slot status and lag
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_pretty,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS restart_lag_pretty
FROM pg_replication_slots
WHERE slot_name = 'flink_cdc_slot';

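Handle a stalled slot. An inactive slot (active = false) that no consumer is using still retains WAL, so its lag keeps growing. If its unconsumed changes are no longer needed, either advance it or drop and recreate it:

-- Advance the slot to the current WAL position so retained WAL can be released.
-- Changes before this position are skipped and will never be delivered.
SELECT pg_replication_slot_advance('flink_cdc_slot', pg_current_wal_lsn());
 
-- Alternatively, drop and recreate the slot (this also discards unconsumed changes)
SELECT pg_drop_replication_slot('flink_cdc_slot');
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);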
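A monitoring job might turn the active flag and lag_bytes returned by the slot status query into a suggested action. The Python sketch below is one hypothetical policy: `SlotStatus` and `slot_action` are illustrative names, and the 1 GiB threshold is an arbitrary example value. Because advancing or dropping a slot discards unconsumed changes, a "stalled" result should still be reviewed before acting on it.

```python
from dataclasses import dataclass


@dataclass
class SlotStatus:
    """One row of the slot status query (slot_name, active, lag_bytes)."""
    slot_name: str
    active: bool
    lag_bytes: int


def slot_action(status: SlotStatus, max_lag_bytes: int = 1 << 30) -> str:
    """Suggest a follow-up; 1 GiB is an arbitrary example threshold."""
    if status.active:
        # A consumer is attached; large lag means it is falling behind.
        return "ok" if status.lag_bytes <= max_lag_bytes else "consumer-lagging"
    if status.lag_bytes > max_lag_bytes:
        # No consumer and a large WAL backlog: review, then advance or drop.
        return "stalled"
    return "idle"
```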

When jobs complete or require reconfiguration, follow these steps to clean up resources:

  • Stop the Flink job. Ensure the Flink job has fully stopped and no longer uses the CDC connection.

  • Delete the replication slot:

-- Safely drop the slot
SELECT pg_drop_replication_slot('flink_cdc_slot');
 
-- Verify the slot has been deleted (the query should return no rows)
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';

  • Delete the publication:

-- Drop the publication
DROP PUBLICATION flink_cdc_pub;
 
-- Verify the publication has been deleted (the query should return no rows)
SELECT * FROM pg_publication WHERE pubname = 'flink_cdc_pub';

pg_logical Functions

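Use the pg_logical_slot_* functions with the test_decoding plugin to validate CDC events directly in the database. pg_logical_slot_get_changes returns changes and consumes them, advancing the slot; pg_logical_slot_peek_changes returns changes without consuming them.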

Example:

postgres=# -- Create a slot called 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |         lsn
-----------------+---------------------
 regression_slot | 1698640579589000000
(1 row)
 
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active |     restart_lsn     | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 1698640579588460626 | 0
(1 row)
 
postgres=# -- No changes can be seen yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
 
postgres=# -- DDL is not replicated and empty transactions are skipped, so nothing is returned
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
 
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
 
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |     xid     |                          data
-------------------+-------------+---------------------------------------------------------
 1792C8F7/7B0A3F65 | -1611778953 | BEGIN
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
 
postgres=# INSERT INTO data(data) VALUES('3');
 
postgres=# -- You can also look at the change stream without consuming the changes.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- The following call to pg_logical_slot_peek_changes() returns the same changes again.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- When a slot is no longer needed, drop it so that it stops consuming server resources
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------
 
(1 row)
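To check test_decoding output in a script rather than by eye, each data row can be parsed into table, operation, and column values. The Python sketch below handles rows shaped like the example above (INSERT rows plus BEGIN and COMMIT markers). Its regular expressions are assumptions about that text format and do not cover every case, such as UPDATE rows that carry old-key data or quoted column names; `parse_change` is an illustrative name.

```python
import re

_ROW = re.compile(
    r"table (?P<table>[^:]+): (?P<op>INSERT|UPDATE|DELETE|TRUNCATE): ?(?P<cols>.*)"
)
# name[type]:value, where value is a quoted string ('' escapes a quote)
# or a bare token such as 1 or null.
_COLUMN = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")


def parse_change(line: str):
    """Parse one test_decoding data row; BEGIN/COMMIT markers return None."""
    if line.split(" ", 1)[0] in ("BEGIN", "COMMIT"):
        return None
    match = _ROW.fullmatch(line)
    if match is None:
        raise ValueError(f"unrecognized row: {line!r}")
    values = {}
    for name, _type, raw in _COLUMN.findall(match["cols"]):
        if raw.startswith("'"):
            values[name] = raw[1:-1].replace("''", "'")
        elif raw == "null":
            values[name] = None
        else:
            values[name] = raw
    return match["table"], match["op"], values
```

Applied to the second row of the first pg_logical_slot_get_changes result, this returns `("public.data", "INSERT", {"id": "1", "data": "1"})`.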

Streaming Replication Protocol

The functions shown in the previous example are convenient for testing. For production consumers, the streaming replication protocol is a more efficient way to receive changes.

Many PostgreSQL client drivers provide wrappers for this protocol; for example, the PostgreSQL JDBC driver includes a replication API. Alternatively, you can open a replication connection and handle the protocol messages yourself. For protocol details, see Streaming Replication Protocol.
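When handling the protocol manually, a logical replication client mainly exchanges three message types inside CopyData: XLogData ('w') and primary keepalive ('k') from the server, and standby status update ('r') from the client. In PostgreSQL, the flush position in the status update is what advances the slot's confirmed_flush_lsn. The Python sketch below only builds and parses these message bodies, following the PostgreSQL streaming replication protocol; it opens no connection, and the function names are illustrative.

```python
import struct
import time

# Seconds between the Unix epoch and the PostgreSQL epoch (2000-01-01 UTC).
PG_EPOCH_OFFSET = 946_684_800


def pg_now_micros() -> int:
    """Current time in microseconds since 2000-01-01 UTC."""
    return int((time.time() - PG_EPOCH_OFFSET) * 1_000_000)


def standby_status_update(written, flushed, applied, reply=False, now=None) -> bytes:
    """Build the 'r' feedback message a client sends inside CopyData."""
    now = pg_now_micros() if now is None else now
    return struct.pack("!cQQQqB", b"r", written, flushed, applied, now, int(reply))


def parse_copy_data(payload: bytes) -> dict:
    """Split a server CopyData payload into an XLogData or keepalive record."""
    tag = payload[:1]
    if tag == b"w":  # start LSN, server WAL end, send time, then the data
        start, end, ts = struct.unpack_from("!QQq", payload, 1)
        return {"type": "xlogdata", "start_lsn": start, "server_wal_end": end,
                "send_time": ts, "data": payload[25:]}
    if tag == b"k":  # server WAL end, send time, reply-requested flag
        end, ts, reply = struct.unpack_from("!QqB", payload, 1)
        return {"type": "keepalive", "server_wal_end": end,
                "send_time": ts, "reply_requested": bool(reply)}
    raise ValueError(f"unexpected message type {tag!r}")
```

With pgoutput, the data field of each XLogData message carries one pgoutput message. A consumer would typically report as flushed only the LSN up to which it has durably processed changes, and answer keepalives that request a reply.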
