
Change Data Capture (CDC)

Tacnode provides a logical streaming replication method that streams committed data writes and deletes to external consumers via SQL. This feature supports use cases such as Change Data Capture (CDC) and data auditing.

There are two methods to use the streaming replication protocol:

  • Use the pg_logical_slot_* functions to retrieve data via SQL, typically for testing with limited data volumes.
  • Consume the complete change stream through the Logical Replication Protocol.

Prerequisites

To enable the CDC feature at the database level, set WAL_LEVEL to logical (higher than the default replica level). After changing WAL_LEVEL, re-establish the connection for the change to take effect.

ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;
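After reconnecting, the new setting can be verified; a minimal check, assuming Tacnode supports the standard PostgreSQL SHOW command:

```sql
-- Reconnect first, then confirm logical decoding is enabled
SHOW WAL_LEVEL;
```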

Get data changes through pg_logical

Example:

postgres=# -- Create a slot called 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |         lsn
-----------------+---------------------
 regression_slot | 1698640579589000000
(1 row)
 
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active |     restart_lsn     | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 1698640579588460626 | 0
(1 row)
 
postgres=# -- No changes can be seen yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
 
postgres=# -- DDL is not replicated, so all you would see here are empty transactions, which are skipped
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
 
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
 
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |     xid     |                          data
-------------------+-------------+---------------------------------------------------------
 1792C8F7/7B0A3F65 | -1611778953 | BEGIN
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
 
postgres=# INSERT INTO data(data) VALUES('3');
 
postgres=# -- You can also look at the change stream without consuming the changes.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- The following call to pg_logical_slot_peek_changes() returns the same changes again.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- When a slot is no longer needed, drop it to stop consuming server resources
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
--------------------------
 
(1 row)

Several concepts are outlined below:

  • Logical Decoding

Data written in real-time to the database is stored in the internal format of the Write Ahead Log (WAL), which is not user-friendly. Logical decoding transforms WAL into a format that is understandable to clients. For instance, test_decoding converts real-time data into an SQL-like format.

  • Replication Slot

Each replication slot serves as a real-time data stream subscribed to by a database. A single database can create multiple replication slots, each independently maintaining its consumption progress.
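For illustration, this independence can be seen by creating two slots on the same database; draining one does not advance the other (the slot names here are placeholders):

```sql
-- Two independent subscriptions to the same database's change stream
SELECT * FROM pg_create_logical_replication_slot('slot_a', 'test_decoding');
SELECT * FROM pg_create_logical_replication_slot('slot_b', 'test_decoding');

-- Consuming slot_a advances only slot_a's position;
-- slot_b still returns the same pending changes afterwards
SELECT * FROM pg_logical_slot_get_changes('slot_a', NULL, NULL);
SELECT * FROM pg_logical_slot_peek_changes('slot_b', NULL, NULL);
```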

  • Decoding Plugins

The output plugin defines the output format of a data stream. Currently, Tacnode supports two plugins: test_decoding and pgoutput.

The output format for test_decoding resembles SQL text.

In contrast, pgoutput is a binary format adhering to the PostgreSQL logical replication protocol, offering superior performance but requiring the client to handle decoding.

For further details on the encoding protocol, refer to Logical Replication Message Formats.
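As a sketch of consuming pgoutput directly from SQL (assuming Tacnode follows PostgreSQL conventions here; the publication and slot names are placeholders), note that the binary variants of the slot functions are needed because pgoutput emits binary messages:

```sql
-- Publication covering the tables to stream (hypothetical name)
CREATE PUBLICATION my_pub FOR TABLE data;

-- Slot that decodes changes with pgoutput
SELECT * FROM pg_create_logical_replication_slot('binary_slot', 'pgoutput');

-- pgoutput requires proto_version and publication_names,
-- and the *_binary_changes variant for its binary output
SELECT * FROM pg_logical_slot_peek_binary_changes(
    'binary_slot', NULL, NULL,
    'proto_version', '1', 'publication_names', 'my_pub');
```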

Streaming Replication Protocol

In addition to the functions shown in the previous examples, which are convenient for testing, you can consume changes more efficiently through the dedicated streaming replication protocol.

Typically, PostgreSQL client drivers for various programming languages provide wrappers for this protocol. Alternatively, you can connect using the standard method and implement the protocol manually. For details, see Streaming Replication Protocol.
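For illustration, the core protocol commands look like the following when issued over a connection opened in replication mode (e.g. psql "dbname=postgres replication=database"); this assumes PostgreSQL's replication grammar, and the slot and publication names are placeholders:

```sql
-- These commands run on a replication connection, not a regular session
CREATE_REPLICATION_SLOT proto_slot LOGICAL pgoutput;

-- Stream changes starting from the slot's retained WAL position
START_REPLICATION SLOT proto_slot LOGICAL 0/0
    (proto_version '1', publication_names 'my_pub');
```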

Flink can access Tacnode's CDC data via the postgres-cdc connector. It's essential to create slots in the database in advance.
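For example, the slot named by the connector's 'slot.name' option could be created ahead of time; a minimal sketch with a placeholder slot name, using pgoutput to match 'decoding.plugin.name':

```sql
-- Pre-create the slot the Flink postgres-cdc connector will attach to
SELECT * FROM pg_create_logical_replication_slot('slotname', 'pgoutput');
```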

In this example, the source table is read through CDC, and the results are written to the sink table.

CREATE TEMPORARY TABLE jdbc_source (
        ins_time timestamp(6) NULL,
        store_id string NULL,
        store_code string NOT NULL,
        store_name string NULL,
        time_section_no bigint NULL,
        stat_date string NOT NULL,
        PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'host-url',
  'port' = '5432',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'database-name' = 'dbname',
  'schema-name' = 'schemaname',
  'table-name' = 'tablename',
  'slot.name' = 'slotname',
  'scan.startup.mode' = 'latest-offset',
  'scan.incremental.snapshot.enabled' = 'true',
  'decoding.plugin.name' = 'pgoutput'
);
 
CREATE TEMPORARY TABLE jdbc_sink(
        store_id string NULL,
        cnt bigint NOT NULL,
        PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/dbname',
  'table-name' = 'schema.table_name',
  'username' = 'xxxx',
  'password' = 'xxxx'
);
 
INSERT INTO jdbc_sink
SELECT store_id, count(*) as cnt
FROM jdbc_source GROUP BY store_id;
