Tacnode offers comprehensive logical replication capabilities and supports Change Data Capture (CDC). This documentation explains how to capture data changes efficiently using two core concepts: publications and replication slots.
Publication
Defines the tables and change types (INSERT, UPDATE, DELETE) to be replicated from the database. A publication can cover specific tables or all tables.
Replication Slot
Server-side mechanism that tracks the consumer’s replication progress, ensuring that Write-Ahead Log (WAL) entries are not deleted before consumers acknowledge receipt of changes. Guarantees at-least-once delivery for consumers.
Other Concepts
Logical Decoding
Converts database Write-Ahead Log (WAL), typically stored in an internal format, into a client-readable format in real time as data is written to the database.
Decoding Plugins
Determine the output format of the data stream. Tacnode supports two output plugins: test_decoding and pgoutput. test_decoding outputs a SQL-like textual format intended for testing and validation. pgoutput provides a more performant binary format that complies with the PostgreSQL logical replication protocol; decoding is required on the client side. Flink CDC uses the pgoutput format.
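To make the textual format concrete, the following is a minimal sketch of a parser for row-change lines, assuming Tacnode emits the same line layout as PostgreSQL's stock test_decoding plugin (`table schema.name: OPERATION: column[type]:value ...`). The function name `parse_test_decoding` is hypothetical, and the sketch does not distinguish the old-key and new-tuple parts of an UPDATE.

```python
import re

# Matches one "name[type]:value" column; quoted values may contain spaces
# and doubled single quotes.
COLUMN_RE = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")
# Matches the "table schema.name: OPERATION:" prefix of a row change.
CHANGE_RE = re.compile(r"^table (\S+): (INSERT|UPDATE|DELETE): ?(.*)$")


def parse_test_decoding(line):
    """Parse one row-change line; return None for BEGIN/COMMIT and other lines."""
    match = CHANGE_RE.match(line.strip())
    if match is None:
        return None
    table, operation, rest = match.groups()
    columns = {}
    for name, type_name, raw in COLUMN_RE.findall(rest):
        if raw.startswith("'"):
            # Strip the outer quotes and unescape doubled quotes.
            value = raw[1:-1].replace("''", "'")
        elif raw == "null":
            value = None
        else:
            value = raw  # left as text; callers can cast using type_name
        columns[name] = (type_name, value)
    return {"table": table, "operation": operation, "columns": columns}
```

Because the values stay as text, a consumer would still need to cast them using the reported type name. For production pipelines the pgoutput format remains the better fit.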
To enable the CDC feature at the database level, set WAL_LEVEL to logical, which is a higher level than the default, replica. The change only takes effect on new connections, so reconnect after changing WAL_LEVEL.
-- Check the current setting
SHOW wal_level;
-- Enable logical replication for the database
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;
A publication is the core mechanism for subscribing to table changes in Tacnode. It defines a set of table changes that subscribers can consume for real-time data synchronization and distribution.
-- Create a Publication for specific tables
CREATE PUBLICATION my_publication FOR TABLE table1, table2;
-- Create a Publication for all tables
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
-- Publish changes through the parent table
ALTER PUBLICATION partitioned_table_pub SET (publish_via_partition_root = true);
Create a Publication for specific operation types
-- Publish only INSERT operations
CREATE PUBLICATION insert_only_pub FOR TABLE table1 WITH (publish = 'insert');
-- Publish INSERT and UPDATE operations
CREATE PUBLICATION insert_update_pub FOR TABLE table1, table2 WITH (publish = 'insert, update');
-- Publish all operations (default)
CREATE PUBLICATION all_ops_pub FOR TABLE table1 WITH (publish = 'insert, update, delete, truncate');
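When publications are created from application code, it can help to validate the operation list before sending the statement. The sketch below builds a CREATE PUBLICATION statement like the ones above. The helper names `quote_ident` and `create_publication_sql` are hypothetical, and the sketch assumes unqualified table names (a schema-qualified name such as public.users would need each part quoted separately).

```python
VALID_OPERATIONS = ("insert", "update", "delete", "truncate")


def quote_ident(name):
    """Double-quote an SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_publication_sql(name, tables, operations=VALID_OPERATIONS):
    """Build a CREATE PUBLICATION statement for the given tables and operations."""
    if not tables:
        raise ValueError("at least one table is required")
    unknown = [op for op in operations if op not in VALID_OPERATIONS]
    if unknown:
        raise ValueError(f"unsupported operations: {unknown}")
    # Keep a canonical order and drop duplicates.
    publish = ", ".join(op for op in VALID_OPERATIONS if op in operations)
    if not publish:
        raise ValueError("at least one operation is required")
    table_list = ", ".join(quote_ident(t) for t in tables)
    return (
        f"CREATE PUBLICATION {quote_ident(name)} "
        f"FOR TABLE {table_list} WITH (publish = '{publish}');"
    )
```

For example, `create_publication_sql("insert_update_pub", ["table1", "table2"], ["update", "insert"])` yields a statement equivalent to the second example above, with the identifiers double-quoted.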
Delete a Publication within the database where it was created.
-- Check dependencies before deletion
-- List active slots whose consumers may still be using the Publication
SELECT slot_name FROM pg_replication_slots WHERE active_pid IS NOT NULL AND confirmed_flush_lsn IS NOT NULL;
-- Delete a Publication
DROP PUBLICATION my_publication;
-- Create a logical replication slot.
-- With noexport_snapshot set to true, snapshot export is not supported and disk usage is minimized.
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput', false, true);
-- List all replication slots
SELECT * FROM pg_replication_slots;
-- View replication slot lag details
SELECT slot_name,
       confirmed_flush_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)
FROM pg_replication_slots;
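The lag reported by pg_wal_lsn_diff is a byte distance between two LSNs. As a rough illustration of the arithmetic, and assuming Tacnode uses PostgreSQL's textual LSN format (two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit position), a client can compute the same difference itself. The function names here are hypothetical.

```python
def lsn_to_int(lsn):
    """Convert a textual LSN such as '0/16B3748' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lsn_diff(current, confirmed):
    """Return the number of WAL bytes between two textual LSNs."""
    return lsn_to_int(current) - lsn_to_int(confirmed)


print(lsn_diff("0/2000", "0/1000"))  # 4096
```

This is useful when a consumer tracks its own confirmed position and wants to report lag without issuing another query.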
Slots can be removed from the database where they were created.
It is recommended to check the slot status before dropping.
-- Check if slot is active
SELECT active FROM pg_replication_slots WHERE slot_name = 'my_slot';
-- Check for connected consumers
SELECT r.pid, r.usename, r.application_name, r.client_addr,
       s.slot_name, r.state, r.sent_lsn, r.flush_lsn, s.confirmed_flush_lsn
FROM pg_stat_replication r
JOIN pg_replication_slots s ON r.pid = s.active_pid
WHERE s.slot_name = 'my_slot';
-- Safely drop replication slot
SELECT pg_drop_replication_slot('my_slot');
Apache Flink supports consuming CDC events using the official postgres-cdc connector. It can subscribe to a specified publication and, during checkpoint execution, confirm and update the LSN within the slot. For detailed usage, see Apache Flink Integration.
For more configuration options of the postgres-cdc connector, refer to the Postgres CDC Connector documentation.
-- Create example tables
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Create a publication:
-- Create a publication with specified tables
CREATE PUBLICATION flink_cdc_pub FOR TABLE users, orders;
Create a replication slot:
-- Create a dedicated replication slot for Flink
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
-- Verify slot creation
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';
-- Check slot status and lag
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_pretty,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS restart_lag_pretty
FROM pg_replication_slots
WHERE slot_name = 'flink_cdc_slot';
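A monitoring job could evaluate the rows returned by the status query and flag slots that need attention. The following is only a sketch: the function name `find_unhealthy_slots`, the row shape (dictionaries keyed by the slot_name, active, and lag_bytes columns), and the threshold value are all assumptions chosen for illustration.

```python
def find_unhealthy_slots(rows, max_lag_bytes):
    """Return (slot_name, reasons) for slots that are inactive or lagging.

    rows: iterable of dicts with the keys slot_name, active, and lag_bytes,
    mirroring the columns selected by the status query.
    """
    problems = []
    for row in rows:
        reasons = []
        if not row["active"]:
            reasons.append("inactive")
        lag = row["lag_bytes"]
        if lag is not None and lag > max_lag_bytes:
            reasons.append("lagging")
        if reasons:
            problems.append((row["slot_name"], reasons))
    return problems


# Hypothetical sample rows; real values would come from the query result.
sample = [
    {"slot_name": "flink_cdc_slot", "active": True, "lag_bytes": 1024},
    {"slot_name": "old_slot", "active": False, "lag_bytes": 5_000_000_000},
]
print(find_unhealthy_slots(sample, max_lag_bytes=1_000_000_000))
```

An appropriate threshold depends on write volume and available storage, so it is best chosen from observed lag in normal operation.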
Handle slot stalling. If the slot is inactive (active = false) and no consumer is using it:
-- Advance the slot to the current WAL position.
-- Note: this skips any changes the consumer has not yet received.
SELECT pg_replication_slot_advance('flink_cdc_slot', pg_current_wal_lsn());
-- If the slot is no longer needed, drop and recreate it
SELECT pg_drop_replication_slot('flink_cdc_slot');
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
When jobs complete or require reconfiguration, follow these steps to clean up resources:
Stop the Flink job. Ensure the Flink job has fully stopped and no longer uses the CDC connection.
Delete the replication slot:
-- Safely drop the slot
SELECT pg_drop_replication_slot('flink_cdc_slot');
-- Verify the slot has been deleted
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';
Delete the publication:
-- Drop the publication
DROP PUBLICATION flink_cdc_pub;
-- Verify the publication has been deleted
SELECT * FROM pg_publication WHERE pubname = 'flink_cdc_pub';
SQL functions are a convenient way to test the export of replicated data. For continuous consumption, a more efficient approach is to use the dedicated streaming replication protocol.
PostgreSQL drivers for most programming languages, such as the JDBC driver for Java, typically provide wrappers for this protocol. Alternatively, you can open a standard connection and handle the protocol messages yourself. For protocol details, see Streaming Replication Protocol.
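For readers handling the protocol manually, the sketch below shows the two messages a consumer deals with most often, following the layouts in the PostgreSQL streaming replication protocol: decoding an XLogData message ('w') and building a Standby status update ('r') that acknowledges progress. It assumes Tacnode follows the same wire format. The function names are hypothetical, and the bytes shown are the payload of a CopyData message, with connection handling left out.

```python
import struct
from datetime import datetime, timezone

# Protocol timestamps count microseconds since 2000-01-01 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def parse_xlogdata(message):
    """Split an XLogData payload into (start_lsn, end_lsn, server_clock, data)."""
    if message[:1] != b"w":
        raise ValueError("not an XLogData message")
    # Header: 1-byte tag, then two unsigned and one signed 64-bit big-endian ints.
    start_lsn, end_lsn, clock = struct.unpack("!QQq", message[1:25])
    return start_lsn, end_lsn, clock, message[25:]


def build_standby_status(written, flushed, applied, now=None, reply=False):
    """Build a Standby status update payload acknowledging the given LSNs."""
    if now is None:
        now = datetime.now(timezone.utc)
    delta = now - PG_EPOCH
    micros = (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds
    # Tag 'r', three LSNs, client clock, and a flag requesting an immediate reply.
    return struct.pack("!cQQQqB", b"r", written, flushed, applied, micros, int(reply))
```

The flushed LSN in the status update is what lets the server advance the slot, so a consumer should report a position only after it has durably processed the changes up to it.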