Change Data Capture (CDC)
Master real-time data synchronization with Tacnode's Change Data Capture capabilities. Learn publications, replication slots, and best practices for streaming data changes.
Change Data Capture (CDC) enables real-time tracking and streaming of data modifications in your Tacnode database. This guide covers essential concepts and practical implementation for robust data synchronization across your systems.
Overview
Change Data Capture is a technique that identifies and captures changes made to data in a database, then delivers those changes in real time to downstream systems. This enables:
- Real-time data synchronization between systems
- Event-driven architectures based on data changes
- Low-latency loading of data warehouses
- Audit trails and compliance monitoring
CDC Architecture in Tacnode
| Component | Purpose | Key Features |
|---|---|---|
| Publications | Define what data to replicate | Table selection, operation filtering |
| Replication Slots | Track consumer progress | At-least-once delivery, WAL retention |
| Logical Decoding | Convert WAL to readable format | Real-time processing, multiple formats |
| Decoding Plugins | Output format control | test_decoding, pgoutput |
Core Concepts
Publications define the scope of data replication by specifying which tables to monitor and which operations to capture (INSERT, UPDATE, DELETE, TRUNCATE).
Replication Slots are server-side mechanisms that track consumer progress, ensure WAL entries aren’t deleted prematurely, and guarantee at-least-once delivery.
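At-least-once delivery means a consumer can receive the same transaction again, for example after a reconnect. Below is a minimal Python sketch of one common way to handle that: remember the commit LSN of the last applied transaction and skip anything at or below it. It relies on logical decoding emitting transactions in commit order. The `IdempotentConsumer` class and the event shape are hypothetical and are not part of any Tacnode API.

```python
from dataclasses import dataclass, field


@dataclass
class IdempotentConsumer:
    """Hypothetical consumer that applies each committed transaction once.

    Logical decoding emits transactions in commit order, so commit LSNs only
    increase; anything at or below the last applied LSN is a redelivery.
    """

    last_applied_lsn: int = 0
    applied: list = field(default_factory=list)

    def handle_transaction(self, commit_lsn: int, changes: list) -> bool:
        if commit_lsn <= self.last_applied_lsn:
            # Redelivered after a reconnect: already applied, so skip it.
            return False
        self.applied.extend(changes)
        # A real consumer would persist this LSN atomically with the applied
        # changes and then report it to the server as its flushed position.
        self.last_applied_lsn = commit_lsn
        return True


consumer = IdempotentConsumer()
deliveries = [
    (100, ["INSERT order 1"]),
    (200, ["UPDATE order 1", "INSERT order 2"]),
    (200, ["UPDATE order 1", "INSERT order 2"]),  # same transaction delivered twice
    (300, ["DELETE order 2"]),
]
for lsn, changes in deliveries:
    consumer.handle_transaction(lsn, changes)
print(consumer.applied)  # ['INSERT order 1', 'UPDATE order 1', 'INSERT order 2', 'DELETE order 2']
```

Deduplicating per transaction rather than per change matters. All changes in one transaction share the same commit LSN.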
Logical Decoding transforms the internal WAL format into a client-readable stream in real-time.
Decoding Plugins: Tacnode supports test_decoding (SQL-like text for testing) and pgoutput (binary format for production).
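The following Python sketch shows what "binary format" means for pgoutput. It decodes two message types, Begin ('B') and Commit ('C'), using the field layout documented for PostgreSQL's logical replication protocol. It assumes Tacnode's pgoutput uses the same layout. It also assumes you already have the raw message bytes taken from the replication stream; opening a replication connection is outside the scope of this sketch.

```python
import struct
from datetime import datetime, timedelta, timezone

# pgoutput timestamps are microseconds since 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN in the familiar X/Y hexadecimal form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def decode_message(msg: bytes) -> dict:
    """Decode a pgoutput Begin ('B') or Commit ('C') message (big-endian fields)."""
    kind = msg[:1]
    if kind == b"B":
        # Int64 final LSN of the transaction, Int64 commit timestamp, Int32 xid.
        final_lsn, ts, xid = struct.unpack("!QqI", msg[1:21])
        return {
            "type": "begin",
            "final_lsn": format_lsn(final_lsn),
            "commit_time": PG_EPOCH + timedelta(microseconds=ts),
            "xid": xid,
        }
    if kind == b"C":
        # Int8 flags (unused), Int64 commit LSN, Int64 end LSN, Int64 commit timestamp.
        _flags, commit_lsn, end_lsn, ts = struct.unpack("!BQQq", msg[1:26])
        return {
            "type": "commit",
            "commit_lsn": format_lsn(commit_lsn),
            "end_lsn": format_lsn(end_lsn),
            "commit_time": PG_EPOCH + timedelta(microseconds=ts),
        }
    raise ValueError(f"message type {kind!r} is not handled by this sketch")


# Build a Begin message by hand to exercise the decoder.
begin = struct.pack("!cQqI", b"B", 0x16B3748, 1_000_000, 742)
decoded = decode_message(begin)
print(decoded["type"], decoded["final_lsn"], decoded["xid"])  # begin 0/16B3748 742
```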
Prerequisites and Setup
Configure your database for logical replication:
-- Check current WAL level
SHOW wal_level;
-- Set WAL level to logical (requires connection restart)
ALTER DATABASE your_database SET WAL_LEVEL TO LOGICAL;
For comprehensive change tracking, set replica identity:
-- Set replica identity to FULL for existing tables
ALTER TABLE your_table_name REPLICA IDENTITY FULL;
REPLICA IDENTITY FULL writes the entire old row to the WAL for every UPDATE and DELETE. This increases WAL size, but consumers receive complete before-images and can identify the affected row even when the table has no primary key.
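To show why the full before-image matters for keyless tables, here is a minimal Python sketch of a consumer that keeps an in-memory copy of such a table. It locates the target row by comparing every column of the old row. The row dictionaries and column names are hypothetical stand-ins for decoded change events.

```python
def apply_update(rows: list, old_row: dict, new_row: dict) -> bool:
    """Apply an UPDATE event to a copy of a table that has no primary key.

    With REPLICA IDENTITY FULL the event carries the complete old row, so the
    target row can be found by comparing every column.
    """
    for i, row in enumerate(rows):
        if row == old_row:
            rows[i] = dict(new_row)
            return True
    return False  # no match: the copy has diverged from the source


def apply_delete(rows: list, old_row: dict) -> bool:
    """Remove one row that matches the full old-row image."""
    for i, row in enumerate(rows):
        if row == old_row:
            del rows[i]
            return True
    return False


# A keyless table copy; the column names are illustrative only.
rows = [{"sku": "A-1", "qty": 1}, {"sku": "B-2", "qty": 2}]
apply_update(rows, {"sku": "B-2", "qty": 2}, {"sku": "B-2", "qty": 5})
apply_delete(rows, {"sku": "A-1", "qty": 1})
print(rows)  # [{'sku': 'B-2', 'qty': 5}]
```

When several rows are identical, the sketch modifies only the first match, so each event affects exactly one row.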
Working with Publications
Publications define the data change subscription scope for your CDC setup.
Creating Publications
-- Publication for specific tables
CREATE PUBLICATION user_changes_pub FOR TABLE users, profiles;
-- Publication for all tables in database
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
-- Monitor only specific operations
CREATE PUBLICATION inserts_only_pub
FOR TABLE orders
WITH (publish = 'insert');
-- Partitioned table handling
CREATE PUBLICATION partitioned_data_pub FOR TABLE sales_data;
ALTER PUBLICATION partitioned_data_pub SET (publish_via_partition_root = true);
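The Python sketch below is a toy model of the filtering rules used in these examples, following PostgreSQL's documented semantics:
- The `publish` list restricts which operations are sent.
- Partitions of a published partitioned table are included.
- `publish_via_partition_root` decides whether their changes carry the root's name or the partition's name.

The server applies these rules itself; the model only makes them concrete. The partition name `sales_data_2024` is hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Publication:
    """Toy model of a publication's filtering rules (illustration only)."""

    tables: set
    publish: str = "insert, update, delete, truncate"  # PostgreSQL's default
    publish_via_partition_root: bool = False

    def operations(self) -> set:
        return {op.strip() for op in self.publish.split(",") if op.strip()}


def route(pub: Publication, table: str, op: str, parent: Optional[str] = None) -> Optional[str]:
    """Return the table name a change is published under, or None if it is filtered out.

    `parent` is the partitioned table that `table` belongs to, if any.
    """
    if op not in pub.operations():
        return None
    if table in pub.tables:
        return table
    if parent is not None and parent in pub.tables:
        # Partitions of a published partitioned table are included; the option
        # decides whether changes carry the root's name or the partition's.
        return parent if pub.publish_via_partition_root else table
    return None


orders_pub = Publication({"orders"}, publish="insert")
print(route(orders_pub, "orders", "insert"), route(orders_pub, "orders", "update"))  # orders None
sales_pub = Publication({"sales_data"}, publish_via_partition_root=True)
print(route(sales_pub, "sales_data_2024", "insert", parent="sales_data"))  # sales_data
```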
Managing Publications
-- Add/remove tables
ALTER PUBLICATION user_changes_pub ADD TABLE user_preferences;
ALTER PUBLICATION user_changes_pub DROP TABLE profiles;
-- Monitor publications
SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete
FROM pg_publication;
-- Drop publication
DROP PUBLICATION user_changes_pub;
Working with Replication Slots
Replication slots ensure reliable change delivery and prevent data loss.
Creating Replication Slots
-- Create logical replication slot with pgoutput plugin
SELECT * FROM pg_create_logical_replication_slot(
'app_cdc_slot', -- slot name
'pgoutput', -- plugin
false, -- temporary slot (false = persistent)
true -- no export snapshot (minimizes disk usage)
);
Monitoring Replication Slots
-- List all replication slots
SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
-- Check replication lag
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_size
FROM pg_replication_slots
WHERE slot_name = 'app_cdc_slot';
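The lag query subtracts two LSNs. An LSN printed as `X/Y` is a 64-bit WAL position whose two hexadecimal halves are the upper and lower 32 bits. Here is a small Python sketch of the same arithmetic, useful when a consumer logs the positions it has processed. The size formatter is a simple approximation and does not reproduce `pg_size_pretty` rounding exactly; the LSN values are made up.

```python
def parse_lsn(text: str) -> int:
    """Convert an LSN such as '0/16B3748' into a 64-bit WAL position."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lsn_diff(current: str, confirmed: str) -> int:
    """Bytes of WAL between two LSNs, the same arithmetic as pg_wal_lsn_diff."""
    return parse_lsn(current) - parse_lsn(confirmed)


def human_size(num_bytes: int) -> str:
    """Readable size in 1024-based units; rounding differs from pg_size_pretty."""
    size = float(num_bytes)
    for unit in ("bytes", "kB", "MB", "GB"):
        if abs(size) < 1024:
            return f"{size:.0f} {unit}"
        size /= 1024
    return f"{size:.0f} TB"


# Hypothetical values copied from pg_current_wal_lsn() and confirmed_flush_lsn.
lag = lsn_diff("0/3000060", "0/1000000")
print(lag, human_size(lag))  # 33554528 32 MB
```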
Managing Replication Slots
-- Manually advance the slot position (use with caution: changes before the new position are skipped and never delivered)
SELECT pg_replication_slot_advance('app_cdc_slot', pg_current_wal_lsn());
-- Drop a replication slot: first confirm it is not in use (active = false)
SELECT active FROM pg_replication_slots WHERE slot_name = 'app_cdc_slot';
SELECT pg_drop_replication_slot('app_cdc_slot');
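Advancing a slot calls for caution because consumers never receive the changes that were skipped. The toy Python model below makes that concrete. `ToySlot` is hypothetical and not a Tacnode interface. Following PostgreSQL's documented behavior for `pg_replication_slot_advance`, the model also refuses to move a slot backwards.

```python
from dataclasses import dataclass, field


@dataclass
class ToySlot:
    """In-memory stand-in for a logical replication slot (hypothetical)."""

    confirmed_flush_lsn: int
    pending: list = field(default_factory=list)  # (lsn, change) not yet consumed

    def advance(self, target_lsn: int) -> list:
        """Move the slot forward and return the changes consumers will never see."""
        if target_lsn <= self.confirmed_flush_lsn:
            return []  # a slot is not moved backwards
        skipped = [change for lsn, change in self.pending if lsn <= target_lsn]
        self.pending = [(lsn, change) for lsn, change in self.pending if lsn > target_lsn]
        self.confirmed_flush_lsn = target_lsn
        return skipped


slot = ToySlot(100, [(150, "INSERT order 1"), (180, "UPDATE order 1"), (220, "INSERT order 2")])
print(slot.advance(200))  # ['INSERT order 1', 'UPDATE order 1']
```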
Practical Examples
E-commerce Order Tracking Setup
-- Create sample tables
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
total_amount DECIMAL(10,2) NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Set replica identity and create CDC components
ALTER TABLE customers REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;
CREATE PUBLICATION ecommerce_cdc_pub
FOR TABLE customers, orders
WITH (publish = 'insert, update, delete');
SELECT * FROM pg_create_logical_replication_slot(
'ecommerce_cdc_slot', 'pgoutput', false, true
);
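With REPLICA IDENTITY FULL on `orders`, UPDATE and DELETE events carry the old row. A downstream aggregate can therefore back out the previous values before adding the new ones. The Python sketch below maintains a running `total_amount` per `status`. The event shape (an operation name plus `old`/`new` row dictionaries) is a hypothetical stand-in for decoded pgoutput messages.

```python
from collections import defaultdict
from decimal import Decimal


def apply_order_change(totals, op, old=None, new=None):
    """Keep SUM(total_amount) per status up to date from orders change events.

    With REPLICA IDENTITY FULL, UPDATE and DELETE events include the complete
    old row, so the previous status and amount can be backed out.
    """
    if op in ("UPDATE", "DELETE"):
        totals[old["status"]] -= old["total_amount"]
    if op in ("INSERT", "UPDATE"):
        totals[new["status"]] += new["total_amount"]


totals = defaultdict(Decimal)  # missing statuses start at Decimal('0')
order = {"id": 1, "customer_id": 7, "total_amount": Decimal("40.00"), "status": "pending"}
apply_order_change(totals, "INSERT", new=order)
apply_order_change(totals, "UPDATE", old=order, new={**order, "status": "shipped"})
print(dict(totals))  # {'pending': Decimal('0.00'), 'shipped': Decimal('40.00')}
```

Using `Decimal` mirrors the `DECIMAL(10,2)` column and avoids floating-point drift in the running totals.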
Testing CDC with pg_logical_slot Functions
-- Create test slot with test_decoding plugin
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'test_decoding');
-- Create test table and make changes
CREATE TABLE cdc_test (
id SERIAL PRIMARY KEY,
data TEXT,
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
BEGIN;
INSERT INTO cdc_test (data) VALUES ('First record');
INSERT INTO cdc_test (data) VALUES ('Second record');
UPDATE cdc_test SET data = 'Modified first record' WHERE id = 1;
COMMIT;
-- View and consume captured changes (pg_logical_slot_peek_changes returns the same rows without consuming them)
SELECT lsn, xid, data
FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Clean up
SELECT pg_drop_replication_slot('test_slot');
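Each `data` value returned by test_decoding is a line of text, in the form `table schema.name: OP: column[type]:value ...`. Here is a minimal Python sketch that turns such lines into dictionaries. It handles only the plain column list that `cdc_test` produces with its default replica identity. Lines that contain `old-key:`/`new-tuple:` sections (for example, with REPLICA IDENTITY FULL) are not separated, and unusual column names or array types may not parse.

```python
import re

# A data line from test_decoding looks like:
#   table public.cdc_test: INSERT: id[integer]:1 data[text]:'First record'
CHANGE_RE = re.compile(
    r"^table (?P<schema>[^.]+)\.(?P<table>[^:]+): (?P<op>INSERT|UPDATE|DELETE): (?P<cols>.*)$"
)
# name[type]:value, where value is a quoted literal ('' escapes a quote) or a bare token.
COLUMN_RE = re.compile(r"(\w+)\[([^\]]+)\]:('(?:[^']|'')*'|\S+)")


def parse_value(raw: str):
    """Unquote a literal; the bare token null becomes None."""
    if raw.startswith("'"):
        return raw[1:-1].replace("''", "'")
    return None if raw == "null" else raw


def parse_change(line: str):
    """Parse one test_decoding line; returns None for BEGIN/COMMIT markers."""
    if line.startswith(("BEGIN", "COMMIT")):
        return None
    match = CHANGE_RE.match(line)
    if match is None:
        raise ValueError(f"unrecognized line: {line!r}")
    columns = {name: parse_value(raw) for name, _type, raw in COLUMN_RE.findall(match["cols"])}
    return {"schema": match["schema"], "table": match["table"], "op": match["op"], "columns": columns}


for line in ["BEGIN", "table public.cdc_test: INSERT: id[integer]:1 data[text]:'First record'", "COMMIT"]:
    change = parse_change(line)
    if change:
        print(change["op"], change["columns"])  # INSERT {'id': '1', 'data': 'First record'}
```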
Integration with Apache Flink
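Apache Flink integrates with Tacnode CDC through the postgres-cdc connector, which reads changes from a logical replication slot. Prepare the source table, publication, and slot as follows: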
-- Prepare tables for Flink CDC
CREATE TABLE sales_transactions (
id SERIAL PRIMARY KEY,
customer_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
quantity INTEGER NOT NULL,
unit_price DECIMAL(10,2) NOT NULL,
transaction_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Set replica identity and create CDC components
ALTER TABLE sales_transactions REPLICA IDENTITY FULL;
CREATE PUBLICATION flink_sales_pub FOR TABLE sales_transactions;
SELECT * FROM pg_create_logical_replication_slot('flink_sales_slot', 'pgoutput', false, true);
-- Monitor Flink CDC slot health
SELECT
slot_name,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'flink_sales_slot';
For detailed Flink CDC configuration and examples, refer to the Apache Flink Integration documentation.
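Slot health checks like the one above are often run on a schedule and turned into alerts. The Python sketch below assumes rows shaped like the output of the replication-lag query in Monitoring Replication Slots: `(slot_name, active, lag_bytes)`. The function name and the 1 GiB default threshold are arbitrary example choices, not Tacnode recommendations.

```python
def slot_alerts(rows, max_lag_bytes=1024 ** 3):
    """Return warnings for slots that are inactive or lagging.

    `rows` are (slot_name, active, lag_bytes) tuples; lag_bytes may be None.
    The 1 GiB default threshold is an arbitrary example value.
    """
    alerts = []
    for slot_name, active, lag_bytes in rows:
        if not active:
            alerts.append(f"{slot_name}: no consumer connected; WAL is retained until one reconnects")
        if lag_bytes is not None and lag_bytes > max_lag_bytes:
            alerts.append(f"{slot_name}: lag of {lag_bytes} bytes exceeds {max_lag_bytes}")
    return alerts


rows = [("flink_sales_slot", True, 4096), ("app_cdc_slot", False, 2 * 1024 ** 3)]
for alert in slot_alerts(rows):
    print(alert)
```

Inactive slots are flagged even when their lag is still small, because the WAL they pin keeps growing until a consumer reconnects or the slot is dropped.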
Troubleshooting Common Issues
Slot Lag Issues
- Check consumer application health and processing capacity
- Monitor lag with:
SELECT pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) FROM pg_replication_slots WHERE slot_name = 'your_slot';
- If the pending changes are no longer needed, advance the slot position (consumers will never receive the skipped changes):
SELECT pg_replication_slot_advance('your_slot', pg_current_wal_lsn());
Consumer Connection Problems
- Verify network connectivity and authentication
- Check active connections:
SELECT r.application_name, r.client_addr, s.slot_name FROM pg_stat_replication r RIGHT JOIN pg_replication_slots s ON r.pid = s.active_pid;
Missing Change Events
- Verify table is included in publication:
SELECT tablename FROM pg_publication_tables WHERE pubname = 'your_publication';
- Check which operation types are enabled:
SELECT pubinsert, pubupdate, pubdelete FROM pg_publication WHERE pubname = 'your_publication';
- Ensure the table's replica identity supports UPDATE and DELETE events (a primary key, or REPLICA IDENTITY FULL)
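The checklist above can be folded into one function that takes the results of those queries. This Python sketch is a hypothetical helper. For replica identity it uses the letter codes of PostgreSQL's `pg_class.relreplident` column: 'd' default, 'n' nothing, 'f' full, 'i' index. It assumes Tacnode exposes the same catalog values.

```python
def diagnose_missing_event(table, op, published_tables, enabled_ops, relreplident, has_primary_key):
    """List likely reasons a change on `table` is not reaching consumers.

    published_tables: tablename values from pg_publication_tables.
    enabled_ops: operations whose pub* flag is true, e.g. {"insert", "update"}.
    relreplident: pg_class.relreplident ('d' default, 'n' nothing, 'f' full, 'i' index).
    """
    reasons = []
    if table not in published_tables:
        reasons.append(f"{table} is not in the publication")
    if op not in enabled_ops:
        reasons.append(f"{op} is not enabled for the publication")
    # DEFAULT identity relies on the primary key; NOTHING provides no identity at all.
    no_identity = relreplident == "n" or (relreplident == "d" and not has_primary_key)
    if op in ("update", "delete") and no_identity:
        reasons.append(f"{table} has no usable replica identity for {op}")
    return reasons


print(diagnose_missing_event("orders", "update", {"orders"}, {"insert"}, "d", False))
```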
Security Considerations
Access Control
-- Create dedicated CDC user
CREATE USER cdc_user WITH REPLICATION;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
Network Security
- Use SSL/TLS for replication connections
- Implement firewall rules for CDC consumers
- Consider VPN for cross-region replication
- Monitor connection attempts and failures
By following these guidelines and best practices, you’ll build reliable CDC solutions that scale with your data and business needs.
For specific integration patterns, refer to: