Apache Flink
Learn how to integrate Apache Flink with Tacnode for real-time stream processing, batch operations, and Change Data Capture (CDC) workflows.
Apache Flink is a powerful framework for stateful computations over data streams. With Tacnode’s PostgreSQL compatibility, you can seamlessly integrate Flink for real-time data processing, batch operations, and Change Data Capture (CDC) workflows.
This guide covers both the DataStream API and Flink SQL approaches, along with best practices for production deployments.
Prerequisites
Environment Requirements
- Flink Version: 1.14+ (1.16+ recommended for enhanced features)
- JDBC Driver: PostgreSQL JDBC driver 42.5+
- Java: JDK 8+ (JDK 11+ recommended)
Maven Dependencies
Add the required dependencies to your Flink project:
<dependencies>
    <!-- PostgreSQL JDBC Driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.5.0</version>
    </dependency>

    <!-- Flink JDBC Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.16.0</version>
    </dependency>

    <!-- Flink PostgreSQL CDC Connector (for CDC use cases) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>
Data Type Mapping
Understanding type compatibility between Flink SQL and Tacnode ensures smooth data operations:
| Flink SQL Type | Tacnode Type | Notes |
|---|---|---|
| BOOLEAN | BOOLEAN | Direct mapping |
| TINYINT | SMALLINT | Promoted to SMALLINT |
| SMALLINT | SMALLINT | Direct mapping |
| INT | INTEGER | Direct mapping |
| BIGINT | BIGINT | Direct mapping |
| FLOAT | REAL | Direct mapping |
| DOUBLE | DOUBLE PRECISION | Direct mapping |
| DECIMAL(p,s) | NUMERIC(p,s) | Precision preserved |
| VARCHAR(n) | VARCHAR(n) | Length preserved |
| CHAR(n) | CHAR(n) | Length preserved |
| DATE | DATE | Direct mapping |
| TIME | TIME | Direct mapping |
| TIMESTAMP | TIMESTAMP | Direct mapping |
| ARRAY | ARRAY | Complex type support |
| MAP | JSONB | Serialized as JSON |
| ROW | JSONB | Serialized as JSON |
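As a concrete illustration, the sketch below defines a Flink JDBC sink whose columns exercise several of these mappings; DECIMAL, TIMESTAMP, and MAP columns land in NUMERIC, TIMESTAMP, and JSONB columns on the Tacnode side. The table name, schema, and connection values are placeholders rather than part of any later example:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TypeMappingSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Flink-side sink definition: per the mapping table above, MAP columns are
        // serialized as JSON, which a JSONB column on the Tacnode table can store.
        tableEnv.executeSql(
                "CREATE TABLE device_metrics_sink (" +
                "  device_id BIGINT," +
                "  reading   DECIMAL(10,2)," +       // -> NUMERIC(10,2)
                "  recorded  TIMESTAMP(3)," +        // -> TIMESTAMP
                "  tags      MAP<STRING, STRING>" +  // -> JSONB (serialized as JSON)
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:postgresql://your-tacnode-host:5432/your-database'," +
                "  'table-name' = 'device_metrics'," +
                "  'username' = 'your_username'," +
                "  'password' = 'your_password'" +
                ")");
    }
}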
DataStream API Integration
Basic JDBC Sink
Use the DataStream API for fine-grained control over data processing:
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;

public class TacnodeDataStreamExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure your data source (User and UserSourceFunction are application-specific)
        DataStream<User> userStream = env.addSource(new UserSourceFunction());

        // Configure the Tacnode connection
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://your-tacnode-host:5432/your-database")
                .withDriverName("org.postgresql.Driver")
                .withUsername("your-username")
                .withPassword("your-password")
                .build();

        // Create a JDBC sink with a prepared statement (upsert on the primary key)
        userStream.addSink(JdbcSink.sink(
                "INSERT INTO users (id, name, email, created_at) VALUES (?, ?, ?, ?) " +
                "ON CONFLICT (id) DO UPDATE SET " +
                "name = EXCLUDED.name, email = EXCLUDED.email",
                (ps, user) -> {
                    ps.setLong(1, user.getId());
                    ps.setString(2, user.getName());
                    ps.setString(3, user.getEmail());
                    ps.setTimestamp(4, Timestamp.valueOf(user.getCreatedAt()));
                },
                connectionOptions
        ));

        env.execute("Tacnode DataStream Job");
    }
}
Optimized Batch Writing
For high-throughput scenarios, configure batch parameters:
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
        .withBatchSize(1000)        // Records per batch
        .withBatchIntervalMs(200)   // Maximum wait time before a flush
        .withMaxRetries(3)          // Retry attempts on failure
        .build();

userStream.addSink(JdbcSink.sink(
        "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
        (ps, user) -> {
            ps.setLong(1, user.getId());
            ps.setString(2, user.getName());
            ps.setString(3, user.getEmail());
        },
        executionOptions,
        connectionOptions
));
Flink SQL Integration
Catalog Registration
Register Tacnode as a catalog for seamless table access:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TacnodeSQLExample {

    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the Tacnode catalog
        tableEnv.executeSql(
                "CREATE CATALOG tacnode_catalog WITH (" +
                "  'type' = 'jdbc'," +
                "  'default-database' = 'your_database'," +
                "  'username' = 'your_username'," +
                "  'password' = 'your_password'," +
                "  'base-url' = 'jdbc:postgresql://your-tacnode-host:5432'" +
                ")"
        );

        // Set it as the default catalog
        tableEnv.useCatalog("tacnode_catalog");

        // Now you can reference Tacnode tables directly
        tableEnv.executeSql("SELECT * FROM users LIMIT 10").print();
    }
}
Table Definitions and Data Insertion
Define source and sink tables using DDL:
-- Define a Kafka source table
CREATE TABLE user_events (
user_id BIGINT,
event_type STRING,
event_time TIMESTAMP(3),
properties MAP<STRING, STRING>,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink_consumer',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
-- Define Tacnode sink table
CREATE TABLE user_analytics (
user_id BIGINT,
event_count BIGINT,
last_event_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://your-tacnode-host:5432/analytics',
'table-name' = 'user_analytics',
'username' = 'your_username',
'password' = 'your_password',
'driver' = 'org.postgresql.Driver',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '1s',
'sink.max-retries' = '3'
);
-- Perform real-time aggregation and insert
INSERT INTO user_analytics
SELECT
user_id,
COUNT(*) as event_count,
MAX(event_time) as last_event_time
FROM user_events
GROUP BY user_id;
Configuration Parameters
Optimize performance with these connector parameters:
| Parameter | Description | Recommended Value |
|---|---|---|
| sink.buffer-flush.max-rows | Maximum rows per batch | 1000-5000 |
| sink.buffer-flush.interval | Flush interval | 1s-5s |
| sink.max-retries | Maximum retry attempts | 3 |
| sink.parallelism | Sink operator parallelism | Match node count |
| connection.max-retry-timeout | Connection timeout | 30s |
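The same options can also be applied programmatically through the Table API's TableDescriptor instead of DDL. The following is a minimal sketch; the schema, table name, and connection values are placeholders:
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

public class SinkTuningSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The same tuning knobs as in the table above, set on a TableDescriptor.
        TableDescriptor tacnodeSink = TableDescriptor.forConnector("jdbc")
                .schema(Schema.newBuilder()
                        .column("user_id", DataTypes.BIGINT().notNull())
                        .column("event_count", DataTypes.BIGINT())
                        .primaryKey("user_id")
                        .build())
                .option("url", "jdbc:postgresql://your-tacnode-host:5432/analytics")
                .option("table-name", "user_analytics")
                .option("username", "your_username")
                .option("password", "your_password")
                .option("sink.buffer-flush.max-rows", "1000")
                .option("sink.buffer-flush.interval", "1s")
                .option("sink.max-retries", "3")
                .option("sink.parallelism", "4")
                .build();

        tableEnv.createTemporaryTable("user_analytics_tuned", tacnodeSink);
    }
}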
Change Data Capture (CDC)
Prerequisites for CDC
Before implementing CDC, ensure your Tacnode setup supports logical replication:
- Enable logical replication (wal_level = logical)
- Network connectivity between Flink and Tacnode
- User permissions for replication and slot management
Refer to the Change Data Capture guide for detailed setup instructions.
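As a quick sanity check before submitting a CDC job, you can confirm the wal_level setting over plain JDBC. This sketch assumes Tacnode exposes the standard PostgreSQL current_setting() function through its compatibility layer; the host, database, and credentials are placeholders:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcPreflightCheck {

    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://your-tacnode-host:5432/your_database";
        try (Connection conn = DriverManager.getConnection(url, "your_username", "your_password");
             Statement stmt = conn.createStatement();
             // current_setting() is standard PostgreSQL; assumed available on Tacnode.
             ResultSet rs = stmt.executeQuery("SELECT current_setting('wal_level')")) {
            rs.next();
            String walLevel = rs.getString(1);
            if (!"logical".equals(walLevel)) {
                throw new IllegalStateException(
                        "wal_level is '" + walLevel + "', but CDC requires 'logical'");
            }
            System.out.println("Logical replication is enabled; CDC prerequisites look good.");
        }
    }
}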
Publication and Slot Management
Create Publications for efficient event filtering:
-- Create a publication for specific tables
CREATE PUBLICATION user_changes FOR TABLE users, user_profiles;
-- Or create for all tables (less efficient)
CREATE PUBLICATION all_changes FOR ALL TABLES;
Create Replication Slots for tracking consumption progress:
-- Create a slot with no export snapshot for efficiency
SELECT pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
-- View existing slots
SELECT * FROM pg_replication_slots;
Clean up unused slots to prevent storage bloat:
-- Drop unused slots
SELECT pg_drop_replication_slot('flink_cdc_slot');
CDC Source Implementation
Here’s a complete example using the postgres-cdc connector:
-- Source table with CDC
CREATE TABLE user_changes_source (
user_id BIGINT,
username STRING,
email STRING,
status STRING,
created_at TIMESTAMP(6),
updated_at TIMESTAMP(6),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'your-tacnode-host',
'port' = '5432',
'username' = 'your_username',
'password' = 'your_password',
'database-name' = 'your_database',
'schema-name' = 'public',
'table-name' = 'users',
-- Use pre-created slot for better resource management
'slot.name' = 'flink_cdc_slot',
-- Required decoding plugin
'decoding.plugin.name' = 'pgoutput',
-- Snapshot mode: 'never' for real-time only, 'initial' for full + incremental
'debezium.snapshot.mode' = 'never',
-- Use specific publication (optional but recommended)
'debezium.publication.name' = 'user_changes'
);
-- Sink table for processed changes
CREATE TABLE user_activity_log (
user_id BIGINT,
change_type STRING,
old_values STRING,
new_values STRING,
change_timestamp TIMESTAMP(6)
) WITH (
'connector' = 'kafka',
'topic' = 'user_activity_log',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'key.fields' = 'user_id',
'value.format' = 'debezium-json',
'value.debezium-json.schema-include' = 'false'
);
-- Process CDC events
INSERT INTO user_activity_log
SELECT
user_id,
'UPDATE' as change_type,
'old_data' as old_values, -- placeholder: substitute the before-image fields you track
'new_data' as new_values, -- placeholder: substitute the after-image fields you track
CURRENT_TIMESTAMP as change_timestamp
FROM user_changes_source;
Monitoring and Performance
Key Metrics to Monitor
- Throughput: Records processed per second
- Latency: End-to-end processing time
- Backpressure: Operator processing capacity
- Checkpoint Success Rate: State consistency indicators (a configuration sketch follows this list)
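Checkpoint success rate is only meaningful if checkpointing is configured deliberately. The sketch below shows one reasonable starting configuration; the intervals are illustrative and should be tuned to your workload:
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once checkpoints every 60s keep the JDBC/CDC pipeline recoverable
        // and give the checkpoint-success-rate metric something meaningful to report.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        config.setMinPauseBetweenCheckpoints(30_000);  // breathing room between checkpoints
        config.setCheckpointTimeout(120_000);          // fail slow checkpoints instead of hanging
        config.setTolerableCheckpointFailureNumber(3); // tolerate transient failures
    }
}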
Logging Configuration
Configure detailed logging for troubleshooting:
# log4j.properties
rootLogger.level = INFO
# Flink JDBC connector logging
logger.jdbc.name = org.apache.flink.connector.jdbc
logger.jdbc.level = DEBUG
# PostgreSQL driver logging
logger.postgres.name = org.postgresql
logger.postgres.level = INFO
# CDC connector logging
logger.cdc.name = com.ververica.cdc
logger.cdc.level = DEBUG
Troubleshooting
Common Type Mapping Issues
Problem: Schema mismatch errors during writes.
Solution: Explicitly define types in DDL:
CREATE TABLE orders_sink (
order_id BIGINT,
customer_name VARCHAR(255), -- Specify length
order_total DECIMAL(10,2), -- Specify precision
order_date TIMESTAMP(3), -- Specify timestamp precision
metadata_json STRING -- Use STRING for JSON data
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://your-tacnode-host:5432/orders',
'table-name' = 'orders'
);
Performance Optimization
Problem: Low throughput or high latency.
Solutions:
- Increase parallelism: SET 'parallelism.default' = '8';
- Optimize batch parameters: 'sink.buffer-flush.max-rows' = '5000', 'sink.buffer-flush.interval' = '2s'
- Use upsert writes for deduplication: declare a PRIMARY KEY (NOT ENFORCED) on the JDBC sink table so INSERT statements are applied as upserts instead of appending duplicate rows
CDC-Specific Issues
Problem: Replication slot lag or storage growth.
Solutions:
- Monitor slot lag (a standalone monitoring sketch follows this list): SELECT slot_name, confirmed_flush_lsn, restart_lsn FROM pg_replication_slots;
- Ensure regular checkpointing, since the CDC source only confirms consumed WAL positions on successful checkpoints: env.enableCheckpointing(60000); // Checkpoint every minute
- Clean up unused slots: SELECT pg_drop_replication_slot('unused_slot_name');
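If you want slot lag checked continuously rather than ad hoc, the query above can be wrapped in a small standalone monitor. pg_current_wal_lsn() and pg_wal_lsn_diff() are standard PostgreSQL functions and are assumed here to be available through Tacnode's compatibility layer; the connection values are placeholders:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SlotLagMonitorSketch {

    public static void main(String[] args) throws Exception {
        String url = "jdbc:postgresql://your-tacnode-host:5432/your_database";
        // Bytes of WAL between the current write position and what each slot has confirmed.
        String sql =
                "SELECT slot_name, " +
                "       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes " +
                "FROM pg_replication_slots";
        try (Connection conn = DriverManager.getConnection(url, "your_username", "your_password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                System.out.printf("slot=%s lag_bytes=%d%n",
                        rs.getString("slot_name"), rs.getLong("lag_bytes"));
            }
        }
    }
}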
Best Practices
Production Deployment
- Resource Planning: Size Flink cluster based on expected throughput
- Fault Tolerance: Enable checkpointing with appropriate intervals
- Monitoring: Implement comprehensive metrics collection
- Security: Use SSL connections and credential management (see the connection sketch after this list)
- Testing: Validate data quality and performance under load
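For the security item, one common approach is to enable SSL through the PostgreSQL JDBC driver's standard URL parameters and to pull credentials from the environment rather than hard-coding them. The sketch below is illustrative; whether your Tacnode endpoint enforces SSL, and which certificate it presents, is deployment-specific, so treat the values as placeholders:
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

public class SecureConnectionSketch {

    public static void main(String[] args) {
        // ssl, sslmode, and sslrootcert are standard PostgreSQL JDBC driver parameters;
        // the certificate path and environment variable names are placeholders.
        JdbcConnectionOptions secureOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://your-tacnode-host:5432/your-database"
                        + "?ssl=true&sslmode=verify-full&sslrootcert=/path/to/root.crt")
                .withDriverName("org.postgresql.Driver")
                // Inject credentials from a secret store or environment variables
                // instead of embedding them in job code.
                .withUsername(System.getenv("TACNODE_USER"))
                .withPassword(System.getenv("TACNODE_PASSWORD"))
                .build();
    }
}
The resulting secureOptions object can be used wherever the earlier examples build JdbcConnectionOptions.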
CDC Best Practices
- Pre-create slots (with no exported snapshot) for efficiency
- Use specific publications to reduce network overhead
- Monitor slot lag to prevent storage issues
- Plan for schema evolution in your processing logic
This integration enables powerful real-time analytics and data processing workflows while leveraging Tacnode’s distributed architecture and PostgreSQL compatibility.