Tacnode provides logical streaming replication, which transmits executed data write and delete operations to external consumers as a stream that can be accessed via SQL. This feature serves purposes such as Change Data Capture (CDC) and data auditing.
There are two ways to use the streaming replication protocol:
Use the pg_logical_slot_* SQL functions (such as pg_logical_slot_get_changes) to retrieve changes. This is typically suited to testing with limited data volumes.
Consume the full change stream through the complete Logical Replication Protocol.
To enable the CDC feature at the database level, set WAL_LEVEL to logical (higher than the default replica level). Changing WAL_LEVEL requires re-establishing the connection for the change to take effect.
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;
postgres=# -- Create a slot called 'regression_slot' using the output plugin 'test_decoding'
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |         lsn
-----------------+---------------------
 regression_slot | 1698640579589000000
(1 row)

postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active |     restart_lsn     | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 1698640579588460626 | 0
(1 row)

postgres=# -- No changes can be seen yet
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE

postgres=# -- DDL is not replicated, so the only things you will see are transactions, skipping empty transactions
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)

postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;

postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |     xid     |                          data
-------------------+-------------+---------------------------------------------------------
 1792C8F7/7B0A3F65 | -1611778953 | BEGIN
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)

postgres=# INSERT INTO data(data) VALUES('3');

postgres=# -- You can also look at the change stream without consuming the changes.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)

postgres=# -- The following call to pg_logical_slot_peek_changes() returns the same changes again.
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)

postgres=# -- When a slot is no longer needed remember to destroy it to stop consuming server resources
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
-----------------------

(1 row)
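The lsn values in the change rows of the session use the hexadecimal X/Y text form that PostgreSQL uses for log positions, where the two halves are the high and low 32 bits of a 64-bit number. Client code often needs the integer form, so a small conversion helper can be sketched as follows. The function names lsn_to_int and int_to_lsn are hypothetical, and whether the decimal values shown for the slot itself share the same numeric space is an assumption this page does not confirm.

```python
def lsn_to_int(text):
    """Convert an 'X/Y' hexadecimal LSN string to a 64-bit integer."""
    hi, lo = text.split("/")
    # High half occupies the upper 32 bits, low half the lower 32 bits.
    return (int(hi, 16) << 32) | int(lo, 16)


def int_to_lsn(value):
    """Convert a 64-bit integer back to the 'X/Y' hexadecimal text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


# Round trip using an LSN taken from the session output.
position = lsn_to_int("1792C8F7/7B0A3F65")
text_again = int_to_lsn(position)
```

Comparing two positions as integers is then a plain numeric comparison, which is simpler than comparing the text forms.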
Several concepts are outlined below:
Logical Decoding
Data written in real-time to the database is stored in the internal format of the Write Ahead Log (WAL), which is not user-friendly. Logical decoding transforms WAL into a format that is understandable to clients. For instance, test_decoding converts real-time data into an SQL-like format.
Replication Slot
Each replication slot represents one stream of real-time changes from a database that a consumer subscribes to. A single database can have multiple replication slots, each maintaining its consumption progress independently.
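The independence of slots, and the difference between peeking and consuming, can be illustrated with a toy in-memory model. This is only a sketch for intuition: the ToySlot class and its list-based log are hypothetical and do not reflect how Tacnode stores or tracks changes internally.

```python
class ToySlot:
    """Toy model of a replication slot over a shared, append-only change log."""

    def __init__(self, log):
        self._log = log
        # A new slot only sees changes made after it was created.
        self._pos = len(log)

    def peek_changes(self):
        """Return pending changes without advancing this slot's progress."""
        return list(self._log[self._pos:])

    def get_changes(self):
        """Return pending changes and advance this slot's progress past them."""
        changes = self.peek_changes()
        self._pos += len(changes)
        return changes


log = ["INSERT 1"]          # written before any slot exists
slot_a = ToySlot(log)
slot_b = ToySlot(log)
log.append("INSERT 2")
log.append("INSERT 3")

peeked = slot_a.peek_changes()         # slot_a progress unchanged
consumed = slot_a.get_changes()        # advances slot_a only
after = slot_a.get_changes()           # nothing left for slot_a
still_pending = slot_b.peek_changes()  # slot_b is unaffected by slot_a
```

The two methods mirror the roles of pg_logical_slot_peek_changes and pg_logical_slot_get_changes in the session: peeking can be repeated, while getting moves the slot forward.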
Decoding Plugins
The output plugin defines the output format of a data stream. Currently, Tacnode supports two plugins: test_decoding and pgoutput.
The output format for test_decoding resembles SQL text.
In contrast, pgoutput is a binary format adhering to the PostgreSQL logical replication protocol, offering superior performance but requiring the client to handle decoding.
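Because test_decoding emits text, a consumer can turn each row into a structured record with a small parser. The sketch below handles only the simple forms that appear in the session on this page (BEGIN, COMMIT, and single-tuple change lines). The function name parse_test_decoding is hypothetical, and other forms, such as UPDATE lines that carry old keys, are not covered.

```python
import re

# "table <schema>.<table>: <OP>: <columns...>"
_HEADER = re.compile(
    r"^table (?P<schema>[^.]+)\.(?P<table>[^:]+): "
    r"(?P<op>INSERT|UPDATE|DELETE): (?P<cols>.*)$"
)
# "<name>[<type>]:<value>", where value is quoted text or a bare token
_COLUMN = re.compile(
    r"(?P<name>\w+)\[(?P<type>[^\]]+)\]:(?P<value>'(?:[^']|'')*'|\S+)"
)


def parse_test_decoding(line):
    """Parse one test_decoding output line into a dictionary."""
    line = line.strip()
    if line.startswith(("BEGIN", "COMMIT")):
        return {"op": line.split()[0]}
    match = _HEADER.match(line)
    if match is None:
        raise ValueError(f"unrecognized line: {line!r}")
    columns = {}
    for col in _COLUMN.finditer(match["cols"]):
        value = col["value"]
        if value.startswith("'"):
            # Strip the surrounding quotes and undo doubled-quote escaping.
            value = value[1:-1].replace("''", "'")
        columns[col["name"]] = (col["type"], value)
    return {
        "op": match["op"],
        "schema": match["schema"],
        "table": match["table"],
        "columns": columns,
    }


record = parse_test_decoding(
    "table public.data: INSERT: id[integer]:1 data[text]:'1'"
)
```

Values are kept as strings alongside their declared type, leaving any type conversion to the caller.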
The SQL functions demonstrated in the example above are convenient for testing streaming replication data export. A more effective approach is to use the dedicated streaming replication protocol.
PostgreSQL client drivers for various programming languages (for example, the JDBC driver for Java) typically provide wrappers for this protocol. Alternatively, you can connect using the standard method and manage the protocol manually. For details about the protocol, see Streaming Replication Protocol.
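As one illustration of a driver-level wrapper, the sketch below uses the replication support in the Python psycopg2 driver. It assumes that Tacnode accepts replication connections from psycopg2 the way a PostgreSQL server does, which this page does not confirm. The function names make_consumer and stream_changes are hypothetical, and the slot is assumed to exist already with the test_decoding plugin.

```python
def make_consumer(sink):
    """Build a consume_stream callback that stores payloads and acknowledges them."""

    def consume(msg):
        sink.append(msg.payload)
        # Report progress so the server can advance the slot's confirmed position.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

    return consume


def stream_changes(dsn, slot_name, sink):
    """Stream decoded changes from an existing slot into sink (blocks until stopped)."""
    # Imported here so the module loads even where psycopg2 is not installed.
    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect(
        dsn, connection_factory=psycopg2.extras.LogicalReplicationConnection
    )
    cur = conn.cursor()
    # decode=True delivers text payloads, which suits the test_decoding plugin.
    cur.start_replication(
        slot_name=slot_name,
        decode=True,
        options={"include-xids": "0", "skip-empty-xacts": "1"},
    )
    cur.consume_stream(make_consumer(sink))
```

A call such as stream_changes(dsn, "regression_slot", changes), with dsn being your own connection string and changes a list, would run until interrupted. For pgoutput, the payload is binary, so decode=True would not apply and the client would have to decode the messages itself.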