Change Data Capture (CDC)

Change Data Capture (CDC) enables real-time tracking and streaming of data modifications in your Tacnode database. This guide covers essential concepts and practical implementation for robust data synchronization across your systems.

Overview

Change Data Capture is a technique that identifies and captures changes made to data in a database, then delivers those changes in real time to downstream systems. This enables:

  • Real-time data synchronization between systems
  • Event-driven architectures based on data changes
  • Data warehousing with near-zero latency
  • Audit trails and compliance monitoring

CDC Architecture in Tacnode

Component          Purpose                         Key Features
Publications       Define what data to replicate   Table selection, operation filtering
Replication Slots  Track consumer progress         At-least-once delivery, WAL retention
Logical Decoding   Convert WAL to readable format  Real-time processing, multiple formats
Decoding Plugins   Output format control           test_decoding, pgoutput

Core Concepts

Publications define the scope of data replication by specifying which tables to monitor and which operations to capture (INSERT, UPDATE, DELETE, TRUNCATE).

Replication Slots are server-side mechanisms that track consumer progress, ensure WAL entries aren't deleted prematurely, and guarantee at-least-once delivery.
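
Because delivery is at-least-once, a consumer may receive the same transaction again after a reconnect or restart, so changes should be applied idempotently. The Python sketch below shows one possible approach: remember the highest commit LSN already applied and skip any transaction at or below it. The `IdempotentApplier` class and the `(key, value)` change shape are hypothetical names used for illustration, not part of any Tacnode client API, and the sketch assumes transactions arrive in commit order.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentApplier:
    """Applies whole transactions and ignores replays of ones already applied."""
    last_commit_lsn: int = -1
    state: dict = field(default_factory=dict)

    def apply_transaction(self, commit_lsn: int, changes: list) -> bool:
        # A redelivered transaction has a commit LSN we have already passed.
        if commit_lsn <= self.last_commit_lsn:
            return False
        for key, value in changes:
            self.state[key] = value  # upsert keyed by primary key
        self.last_commit_lsn = commit_lsn
        return True


if __name__ == "__main__":
    applier = IdempotentApplier()
    applier.apply_transaction(100, [(1, "pending")])
    applier.apply_transaction(110, [(1, "paid")])
    # Replay of the first transaction after a reconnect: skipped.
    print(applier.apply_transaction(100, [(1, "pending")]), applier.state)
```

Tracking the commit LSN rather than per-change LSNs is a deliberate choice here: changes from concurrent transactions can interleave in the WAL, while commits are delivered in order.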

Logical Decoding transforms the internal WAL format into a client-readable stream in real time.

Decoding Plugins: Tacnode supports test_decoding (SQL-like text for testing) and pgoutput (binary format for production).

Prerequisites and Setup

Configure your database for logical replication:

-- Check current WAL level
SHOW wal_level;
 
-- Set WAL level to logical (requires connection restart)
ALTER DATABASE your_database SET WAL_LEVEL TO LOGICAL;

For UPDATE and DELETE events to include the complete previous row rather than only the key columns, set the replica identity to FULL:

-- Set replica identity to FULL for existing tables
ALTER TABLE your_table_name REPLICA IDENTITY FULL;

Working with Publications

Publications define the data change subscription scope for your CDC setup.

Creating Publications

-- Publication for specific tables
CREATE PUBLICATION user_changes_pub FOR TABLE users, profiles;
 
-- Publication for all tables in database
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
 
-- Monitor only specific operations
CREATE PUBLICATION inserts_only_pub 
FOR TABLE orders 
WITH (publish = 'insert');
 
-- Partitioned table handling
CREATE PUBLICATION partitioned_data_pub FOR TABLE sales_data;
ALTER PUBLICATION partitioned_data_pub SET (publish_via_partition_root = true);

Managing Publications

-- Add/remove tables
ALTER PUBLICATION user_changes_pub ADD TABLE user_preferences;
ALTER PUBLICATION user_changes_pub DROP TABLE profiles;
 
-- Monitor publications
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication;
 
-- Drop publication
DROP PUBLICATION user_changes_pub;

Working with Replication Slots

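Replication slots ensure reliable change delivery and prevent data loss: the server retains WAL until the consumer confirms it has processed it, so an unused slot should be dropped rather than left to accumulate WAL.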

Creating Replication Slots

-- Create logical replication slot with pgoutput plugin
SELECT * FROM pg_create_logical_replication_slot(
    'app_cdc_slot',      -- slot name
    'pgoutput',          -- plugin
    false,               -- temporary slot (false = persistent)
    true                 -- no export snapshot (minimizes disk usage)
);

Monitoring Replication Slots

-- List all replication slots
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
 
-- Check replication lag
SELECT 
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_size
FROM pg_replication_slots
WHERE slot_name = 'app_cdc_slot';
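
To make the lag figure concrete, the Python sketch below reproduces the arithmetic behind an LSN difference on the client side. It assumes LSNs use the PostgreSQL text form `high/low`, two hexadecimal numbers that together form a 64-bit byte position. The function names `lsn_to_int` and `lag_bytes` are illustrative only.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert an LSN such as '16/B374D848' to its 64-bit byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL written but not yet confirmed by the consumer."""
    return lsn_to_int(current_lsn) - lsn_to_int(confirmed_flush_lsn)


if __name__ == "__main__":
    # 0x3000000 - 0x2000000 = 0x1000000 bytes, i.e. 16 MiB of unconfirmed WAL.
    print(lag_bytes("0/3000000", "0/2000000"))
```

A consumer that records the LSNs it has flushed can use the same calculation to report lag from its own side.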

Managing Replication Slots

-- Manually advance slot position (use with caution: changes before the new position are skipped and never delivered)
SELECT pg_replication_slot_advance('app_cdc_slot', pg_current_wal_lsn());
 
-- Drop replication slot (first confirm that no consumer is attached: active must be false)
SELECT active FROM pg_replication_slots WHERE slot_name = 'app_cdc_slot';
SELECT pg_drop_replication_slot('app_cdc_slot');

Practical Examples

E-commerce Order Tracking Setup

-- Create sample tables
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- Set replica identity and create CDC components
ALTER TABLE customers REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;
 
CREATE PUBLICATION ecommerce_cdc_pub 
FOR TABLE customers, orders
WITH (publish = 'insert, update, delete');
 
SELECT * FROM pg_create_logical_replication_slot(
    'ecommerce_cdc_slot', 'pgoutput', false, true
);

Testing CDC with Logical Decoding Functions

-- Create test slot with test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
 
-- Create test table and make changes
CREATE TABLE cdc_test (
    id SERIAL PRIMARY KEY,
    data TEXT,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
BEGIN;
INSERT INTO cdc_test (data) VALUES ('First record');
INSERT INTO cdc_test (data) VALUES ('Second record');
UPDATE cdc_test SET data = 'Modified first record' WHERE id = 1;
COMMIT;
 
-- View captured changes
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
-- Clean up
SELECT pg_drop_replication_slot('test_slot');
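
The `data` column returned by the query above is plain text. The Python sketch below shows one way a test harness might turn such lines into structured values. It assumes the standard PostgreSQL test_decoding layout (`table schema.name: OPERATION: column[type]:value ...`); Tacnode's exact output may differ, so treat the sample line and the `parse_change` helper as illustrative assumptions.

```python
import re

# Matches change lines such as:
#   table public.cdc_test: INSERT: id[integer]:1 data[text]:'First record'
_CHANGE = re.compile(r"^table (?P<table>\S+): (?P<op>[A-Z]+): ?(?P<cols>.*)$")
# Matches one column: name[type]:value, where value is quoted text or a bare token.
_COLUMN = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")


def parse_change(line: str):
    """Return (table, operation, {column: value}), or None for BEGIN/COMMIT lines."""
    match = _CHANGE.match(line)
    if match is None:
        return None
    columns = {}
    for name, _type, raw in _COLUMN.findall(match["cols"]):
        if raw.startswith("'"):
            raw = raw[1:-1].replace("''", "'")  # strip quotes, unescape ''
        columns[name] = raw  # values stay as strings; no type conversion
    return match["table"], match["op"], columns


if __name__ == "__main__":
    sample = "table public.cdc_test: INSERT: id[integer]:1 data[text]:'First record'"
    print(parse_change(sample))
```

The parser does not separate the old and new row images that UPDATE lines can carry, so it is best suited to quick checks on INSERT events.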

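Apache Flink Integration

Apache Flink can consume these change streams through its postgres-cdc connector. The statements below prepare a table, publication, and replication slot for a Flink job.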

-- Prepare tables for Flink CDC
CREATE TABLE sales_transactions (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    transaction_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- Set replica identity and create CDC components
ALTER TABLE sales_transactions REPLICA IDENTITY FULL;
 
CREATE PUBLICATION flink_sales_pub FOR TABLE sales_transactions;
 
SELECT * FROM pg_create_logical_replication_slot('flink_sales_slot', 'pgoutput', false, true);
 
-- Monitor Flink CDC slot health
SELECT 
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'flink_sales_slot';
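
On the Flink side, the job needs a source table definition that points at the slot and publication created above. The Python sketch below renders such a Flink SQL statement from a dictionary of options. The option keys follow the Flink CDC postgres-cdc connector as commonly documented, but should be verified against the connector version in use. The hostname, credentials, and database name are placeholders, and `flink_source_ddl` is a hypothetical helper.

```python
def flink_source_ddl(table: str, columns: dict, primary_key: str, options: dict) -> str:
    """Render a Flink SQL CREATE TABLE statement for a CDC source table."""
    column_lines = [f"  {name} {sql_type}" for name, sql_type in columns.items()]
    column_lines.append(f"  PRIMARY KEY ({primary_key}) NOT ENFORCED")
    # Values are inserted verbatim, so they must not contain single quotes.
    option_lines = [f"  '{key}' = '{value}'" for key, value in options.items()]
    return (
        f"CREATE TABLE {table} (\n"
        + ",\n".join(column_lines)
        + "\n) WITH (\n"
        + ",\n".join(option_lines)
        + "\n);"
    )


SALES_COLUMNS = {
    "id": "INT",
    "customer_id": "INT",
    "product_id": "INT",
    "quantity": "INT",
    "unit_price": "DECIMAL(10, 2)",
    "transaction_time": "TIMESTAMP(6)",
}

SALES_OPTIONS = {
    "connector": "postgres-cdc",
    "hostname": "tacnode.example.com",   # placeholder host
    "port": "5432",                      # placeholder port
    "username": "cdc_user",
    "password": "change-me",             # placeholder secret
    "database-name": "your_database",
    "schema-name": "public",
    "table-name": "sales_transactions",
    "slot.name": "flink_sales_slot",
    "decoding.plugin.name": "pgoutput",
    "debezium.publication.name": "flink_sales_pub",
}

if __name__ == "__main__":
    print(flink_source_ddl("sales_transactions", SALES_COLUMNS, "id", SALES_OPTIONS))
```

Keeping the options in one dictionary makes it easier to check that the slot and publication names match the ones created in the database.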

Troubleshooting Common Issues

Slot Lag Issues

  • Check consumer application health and processing capacity
  • Monitor lag with: SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) FROM pg_replication_slots WHERE slot_name = 'your_slot';
  • If the unconsumed changes can safely be discarded, advance the slot position (skipped changes are never delivered): SELECT pg_replication_slot_advance('slot_name', pg_current_wal_lsn());
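
The lag checks above can feed a simple alerting rule. The Python sketch below classifies a slot from its `active` flag and its lag in bytes. The thresholds (256 MiB and 1 GiB) and the `classify_slot` name are arbitrary assumptions for illustration and should be tuned to the WAL volume and storage budget of the deployment.

```python
def classify_slot(active: bool, lag_bytes: int,
                  warn_bytes: int = 256 * 1024**2,
                  critical_bytes: int = 1024**3) -> str:
    """Return 'ok', 'warning', or 'critical' for one replication slot."""
    if lag_bytes >= critical_bytes:
        return "critical"
    # An inactive slot keeps retaining WAL, so flag it even when lag is small.
    if not active or lag_bytes >= warn_bytes:
        return "warning"
    return "ok"


if __name__ == "__main__":
    print(classify_slot(active=True, lag_bytes=10 * 1024**2))
    print(classify_slot(active=False, lag_bytes=10 * 1024**2))
    print(classify_slot(active=True, lag_bytes=2 * 1024**3))
```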

Consumer Connection Problems

  • Verify network connectivity and authentication
  • Check active connections: SELECT r.application_name, r.client_addr, s.slot_name FROM pg_stat_replication r RIGHT JOIN pg_replication_slots s ON r.pid = s.active_pid;

Missing Change Events

  • Verify table is included in publication: SELECT tablename FROM pg_publication_tables WHERE pubname = 'your_publication';
  • Check operation types enabled: SELECT pubinsert, pubupdate, pubdelete FROM pg_publication WHERE pubname = 'your_publication';
  • Ensure replica identity is properly configured
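
The three checks above can be combined into one diagnostic. The Python sketch below takes values that the queries would return and lists the reasons an expected event might not arrive. The dictionary layout and the `missing_event_reasons` name are hypothetical, and the replica identity rule reflects standard PostgreSQL behaviour, which is assumed to carry over to Tacnode.

```python
def missing_event_reasons(publication: dict, table: str, operation: str,
                          replica_identity: str = "default") -> list:
    """List the reasons a change on `table` would not be delivered."""
    reasons = []
    # Mirrors the pg_publication_tables check.
    if not publication["all_tables"] and table not in publication["tables"]:
        reasons.append(f"table {table} is not in the publication")
    # Mirrors the pubinsert / pubupdate / pubdelete flags.
    if not publication["publish"].get(operation, False):
        reasons.append(f"{operation} is not published")
    # UPDATE and DELETE need a way to identify the old row.
    if operation in ("update", "delete") and replica_identity == "nothing":
        reasons.append("replica identity NOTHING cannot identify the old row")
    return reasons


if __name__ == "__main__":
    inserts_only = {
        "all_tables": False,
        "tables": {"orders"},
        "publish": {"insert": True, "update": False, "delete": False},
    }
    print(missing_event_reasons(inserts_only, "orders", "update"))
    print(missing_event_reasons(inserts_only, "customers", "insert"))
```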

Security Considerations

Access Control

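-- Create a dedicated CDC user with the replication attribute
CREATE USER cdc_user WITH REPLICATION;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- For a user that already exists, grant the attribute with ALTER USER instead
ALTER USER cdc_user REPLICATION;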

Network Security

  • Use SSL/TLS for replication connections
  • Implement firewall rules for CDC consumers
  • Consider VPN for cross-region replication
  • Monitor connection attempts and failures

By following these guidelines and best practices, you'll build reliable CDC solutions that scale with your data and business needs.

