Apache Flink

Apache Flink is a powerful framework for stateful computations over data streams. With Tacnode's PostgreSQL compatibility, you can seamlessly integrate Flink for real-time data processing, batch operations, and Change Data Capture (CDC) workflows.

This guide covers both the DataStream API and Flink SQL approaches, along with best practices for production deployments.

Prerequisites

Environment Requirements

  • Flink Version: 1.14+ (1.16+ recommended for enhanced features)
  • JDBC Driver: PostgreSQL JDBC driver 42.5+
  • Java: JDK 8+ (JDK 11+ recommended)

Maven Dependencies

Add the required dependencies to your Flink project:

<dependencies>
    <!-- PostgreSQL JDBC Driver -->
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.5.0</version>
    </dependency>
    
    <!-- Flink JDBC Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.16.0</version>
    </dependency>
    
    <!-- Flink PostgreSQL CDC Connector (for CDC use cases) -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>

Data Type Mapping

Understanding type compatibility between Flink SQL and Tacnode ensures smooth data operations:

| Flink SQL Type | Tacnode Type | Notes |
|---|---|---|
| BOOLEAN | BOOLEAN | Direct mapping |
| TINYINT | SMALLINT | Promoted to SMALLINT |
| SMALLINT | SMALLINT | Direct mapping |
| INT | INTEGER | Direct mapping |
| BIGINT | BIGINT | Direct mapping |
| FLOAT | REAL | Direct mapping |
| DOUBLE | DOUBLE PRECISION | Direct mapping |
| DECIMAL(p,s) | NUMERIC(p,s) | Precision preserved |
| VARCHAR(n) | VARCHAR(n) | Length preserved |
| CHAR(n) | CHAR(n) | Length preserved |
| DATE | DATE | Direct mapping |
| TIME | TIME | Direct mapping |
| TIMESTAMP | TIMESTAMP | Direct mapping |
| ARRAY | ARRAY | Complex type support |
| MAP | JSONB | Serialized as JSON |
| ROW | JSONB | Serialized as JSON |
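
The MAP and ROW mappings above rely on JSON serialization. As a minimal sketch outside the Flink connector itself, the snippet below shows one way to hand a map value to a JSONB column through the PostgreSQL driver; the user_events_raw table, its columns, and the Jackson dependency are assumptions made for illustration.

import com.fasterxml.jackson.databind.ObjectMapper;  // assumed to be on the classpath
import org.postgresql.util.PGobject;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Map;

public class JsonbWriteSketch {
    public static void writeProperties(long userId, Map<String, String> properties) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://your-tacnode-host:5432/your-database",
                "your-username", "your-password");
             PreparedStatement ps = conn.prepareStatement(
                "INSERT INTO user_events_raw (user_id, properties) VALUES (?, ?)")) {

            // Serialize the map to JSON and wrap it in a PGobject of type jsonb,
            // so the driver sends a properly typed value instead of plain text
            PGobject jsonb = new PGobject();
            jsonb.setType("jsonb");
            jsonb.setValue(mapper.writeValueAsString(properties));

            ps.setLong(1, userId);
            ps.setObject(2, jsonb);
            ps.executeUpdate();
        }
    }
}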

DataStream API Integration

Basic JDBC Sink

Use the DataStream API for fine-grained control over data processing:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Timestamp;
 
public class TacnodeDataStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Configure your data source
        DataStream<User> userStream = env.addSource(new UserSourceFunction());
        
        // Configure Tacnode connection
        JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://your-tacnode-host:5432/your-database")
            .withDriverName("org.postgresql.Driver")
            .withUsername("your-username")
            .withPassword("your-password")
            .build();
        
        // Create JDBC Sink with prepared statement
        userStream.addSink(JdbcSink.sink(
            "INSERT INTO users (id, name, email, created_at) VALUES (?, ?, ?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET " +
            "name = EXCLUDED.name, email = EXCLUDED.email",
            (ps, user) -> {
                ps.setLong(1, user.getId());
                ps.setString(2, user.getName());
                ps.setString(3, user.getEmail());
                ps.setTimestamp(4, Timestamp.valueOf(user.getCreatedAt()));
            },
            connectionOptions
        ));
        
        env.execute("Tacnode DataStream Job");
    }
}

Optimized Batch Writing

For high-throughput scenarios, configure batch parameters:

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 
JdbcExecutionOptions executionOptions = new JdbcExecutionOptions.Builder()
    .withBatchSize(1000)                    // Records per batch
    .withBatchIntervalMs(200)               // Maximum wait time
    .withMaxRetries(3)                      // Retry attempts
    .build();
 
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setLong(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getEmail());
    },
    executionOptions,
    connectionOptions
));

Flink SQL Integration

Catalog Registration

Register Tacnode as a catalog for seamless table access:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
 
public class TacnodeSQLExample {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .build();
            
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        
        // Register Tacnode catalog
        tableEnv.executeSql(
            "CREATE CATALOG tacnode_catalog WITH (" +
            "  'type' = 'jdbc'," +
            "  'default-database' = 'your_database'," +
            "  'username' = 'your_username'," +
            "  'password' = 'your_password'," +
            "  'base-url' = 'jdbc:postgresql://your-tacnode-host:5432'" +
            ")"
        );
        
        // Set as default catalog
        tableEnv.useCatalog("tacnode_catalog");
        
        // Now you can directly reference Tacnode tables
        tableEnv.executeSql("SELECT * FROM users LIMIT 10").print();
    }
}

Table Definitions and Data Insertion

Define source and sink tables using DDL:

-- Define a Kafka source table
CREATE TABLE user_events (
    user_id BIGINT,
    event_type STRING,
    event_time TIMESTAMP(3),
    properties MAP<STRING, STRING>,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink_consumer',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
 
-- Define Tacnode sink table
CREATE TABLE user_analytics (
    user_id BIGINT,
    event_count BIGINT,
    last_event_time TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://your-tacnode-host:5432/analytics',
    'table-name' = 'user_analytics',
    'username' = 'your_username',
    'password' = 'your_password',
    'driver' = 'org.postgresql.Driver',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s',
    'sink.max-retries' = '3'
);
 
-- Perform real-time aggregation and insert
INSERT INTO user_analytics
SELECT 
    user_id,
    COUNT(*) as event_count,
    MAX(event_time) as last_event_time
FROM user_events
GROUP BY user_id;

Configuration Parameters

Optimize performance with these connector parameters:

| Parameter | Description | Recommended Value |
|---|---|---|
| sink.buffer-flush.max-rows | Maximum rows per batch | 1000-5000 |
| sink.buffer-flush.interval | Flush interval | 1s-5s |
| sink.max-retries | Maximum retry attempts | 3 |
| sink.parallelism | Sink operator parallelism | Match node count |
| connection.max-retry-timeout | Connection timeout | 30s |
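
As a sketch of how these parameters fit together, the Java snippet below registers a JDBC sink table with the tuning options applied; the table name, key column, credentials, and the chosen values are placeholders to adapt to your workload rather than universal recommendations.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TunedSinkDefinition {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a sink table that applies the batching, retry, and parallelism options above
        tableEnv.executeSql(
            "CREATE TABLE user_analytics_tuned (" +
            "  user_id BIGINT," +
            "  event_count BIGINT," +
            "  PRIMARY KEY (user_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:postgresql://your-tacnode-host:5432/analytics'," +
            "  'table-name' = 'user_analytics'," +
            "  'username' = 'your_username'," +
            "  'password' = 'your_password'," +
            "  'sink.buffer-flush.max-rows' = '5000'," +
            "  'sink.buffer-flush.interval' = '2s'," +
            "  'sink.max-retries' = '3'," +
            "  'sink.parallelism' = '4'," +
            "  'connection.max-retry-timeout' = '30s'" +
            ")");
    }
}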

Change Data Capture (CDC)

Prerequisites for CDC

Before implementing CDC, ensure your Tacnode setup supports logical replication:

  1. Logical replication enabled (wal_level = logical)
  2. Network connectivity between Flink and Tacnode
  3. User permissions for replication and slot management

Refer to the Change Data Capture guide for detailed setup instructions.
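
Before wiring up the CDC connector, it can help to verify the first and third requirements from the Flink side. The following is a minimal preflight sketch over plain JDBC, assuming Tacnode exposes wal_level and pg_roles the same way PostgreSQL does; host, database, and credentials are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcPreflightCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://your-tacnode-host:5432/your_database",
                "your_username", "your_password");
             Statement stmt = conn.createStatement()) {

            // 1. Logical decoding requires wal_level = logical
            try (ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
                rs.next();
                System.out.println("wal_level = " + rs.getString(1));
            }

            // 2. The connecting user needs the replication privilege for slot management
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT rolreplication FROM pg_roles WHERE rolname = current_user")) {
                rs.next();
                System.out.println("replication privilege = " + rs.getBoolean(1));
            }
        }
    }
}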

Publication and Slot Management

Create Publications for efficient event filtering:

-- Create a publication for specific tables
CREATE PUBLICATION user_changes FOR TABLE users, user_profiles;
 
-- Or create for all tables (less efficient)
CREATE PUBLICATION all_changes FOR ALL TABLES;

Create Replication Slots for tracking consumption progress:

-- Create a slot with no export snapshot for efficiency
SELECT pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
 
-- View existing slots
SELECT * FROM pg_replication_slots;

Clean up unused slots to prevent storage bloat:

-- Drop unused slots
SELECT pg_drop_replication_slot('flink_cdc_slot');

CDC Source Implementation

Here's a complete example using the postgres-cdc connector:

-- Source table with CDC
CREATE TABLE user_changes_source (
    user_id BIGINT,
    username STRING,
    email STRING,
    status STRING,
    created_at TIMESTAMP(6),
    updated_at TIMESTAMP(6),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'your-tacnode-host',
    'port' = '5432',
    'username' = 'your_username',
    'password' = 'your_password',
    'database-name' = 'your_database',
    'schema-name' = 'public',
    'table-name' = 'users',
    
    -- Use pre-created slot for better resource management
    'slot.name' = 'flink_cdc_slot',
    
    -- Required decoding plugin
    'decoding.plugin.name' = 'pgoutput',
    
    -- Snapshot mode: 'never' for real-time only, 'initial' for full + incremental
    'debezium.snapshot.mode' = 'never',
    
    -- Use specific publication (optional but recommended)
    'debezium.publication.name' = 'user_changes'
);
 
-- Sink table for processed changes
CREATE TABLE user_activity_log (
    user_id BIGINT,
    change_type STRING,
    old_values STRING,
    new_values STRING,
    change_timestamp TIMESTAMP(6)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_activity_log',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',
    'key.fields' = 'user_id',
    'value.format' = 'debezium-json',
    'value.debezium-json.schema-include' = 'false'
);
 
-- Process CDC events
INSERT INTO user_activity_log
SELECT 
    user_id,
    'UPDATE' as change_type,   -- Hard-coded label for illustration
    'old_data' as old_values,  -- Placeholders; update before/after images arrive as separate changelog rows
    'new_data' as new_values,
    CURRENT_TIMESTAMP as change_timestamp
FROM user_changes_source;

Monitoring and Performance

Key Metrics to Monitor

  • Throughput: Records processed per second (see the counter sketch after this list)
  • Latency: End-to-end processing time
  • Backpressure: Operator processing capacity
  • Checkpoint Success Rate: State consistency indicators
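
For throughput, the built-in numRecordsIn/numRecordsOut operator metrics usually suffice, but a custom counter can isolate a single stage. The pass-through function below is a minimal sketch of that approach; the metric name tacnodeRecordsSeen and where you attach it in the pipeline are illustrative choices, not part of the connector.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Pass-through map that counts every record flowing toward the Tacnode sink
public class ThroughputCountingMap<T> extends RichMapFunction<T, T> {
    private transient Counter recordsSeen;

    @Override
    public void open(Configuration parameters) {
        recordsSeen = getRuntimeContext().getMetricGroup().counter("tacnodeRecordsSeen");
    }

    @Override
    public T map(T value) {
        recordsSeen.inc();
        return value;   // records are forwarded unchanged
    }
}

Attaching it just before the sink, for example userStream.map(new ThroughputCountingMap<>()) in the DataStream job above, makes the counter visible in the Flink web UI and any configured metrics reporter.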

Logging Configuration

Configure detailed logging for troubleshooting:

# log4j.properties
rootLogger.level = INFO
 
# Flink JDBC connector logging
logger.jdbc.name = org.apache.flink.connector.jdbc
logger.jdbc.level = DEBUG
 
# PostgreSQL driver logging
logger.postgres.name = org.postgresql
logger.postgres.level = INFO
 
# CDC connector logging
logger.cdc.name = com.ververica.cdc
logger.cdc.level = DEBUG

Troubleshooting

Common Type Mapping Issues

Problem: Schema mismatch errors during writes.

Solution: Explicitly define types in DDL:

CREATE TABLE orders_sink (
    order_id BIGINT,
    customer_name VARCHAR(255),           -- Specify length
    order_total DECIMAL(10,2),           -- Specify precision
    order_date TIMESTAMP(3),             -- Specify timestamp precision
    metadata_json STRING                 -- Use STRING for JSON data
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://your-tacnode-host:5432/orders',
    'table-name' = 'orders'
);

Performance Optimization

Problem: Low throughput or high latency.

Solutions:

  1. Increase parallelism:

    SET 'parallelism.default' = '8';
  2. Optimize batch parameters:

    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s'
  3. Use upsert writes for deduplication by declaring a PRIMARY KEY on the sink table, so the JDBC connector issues INSERT ... ON CONFLICT ... DO UPDATE instead of plain appends:

    -- With PRIMARY KEY (...) NOT ENFORCED declared on target_table,
    -- this INSERT runs in upsert mode rather than append mode
    INSERT INTO target_table 
    SELECT * FROM source_table;

CDC-Specific Issues

Problem: Replication slot lag or storage growth.

Solutions:

  1. Monitor slot lag:

    SELECT slot_name, confirmed_flush_lsn, restart_lsn 
    FROM pg_replication_slots;
  2. Ensure regular checkpointing:

    env.enableCheckpointing(60000); // Checkpoint every minute
  3. Clean up unused slots:

    SELECT pg_drop_replication_slot('unused_slot_name');

Best Practices

Production Deployment

  1. Resource Planning: Size Flink cluster based on expected throughput
  2. Fault Tolerance: Enable checkpointing with appropriate intervals (see the sketch after this list)
  3. Monitoring: Implement comprehensive metrics collection
  4. Security: Use SSL connections and credential management
  5. Testing: Validate data quality and performance under load
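
As a sketch of items 2 and 4 above, the snippet below enables exactly-once checkpointing and requires TLS on the JDBC connection. The intervals and the sslmode value are assumptions to adapt to your environment, and in production the password should come from a secret store rather than source code.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProductionSetupSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fault tolerance: exactly-once checkpoints every 60s with a pause between them
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
        env.getCheckpointConfig().setCheckpointTimeout(120_000);

        // Security: require TLS on the JDBC connection via PostgreSQL driver URL parameters
        JdbcConnectionOptions secureOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://your-tacnode-host:5432/your-database?ssl=true&sslmode=verify-full")
            .withDriverName("org.postgresql.Driver")
            .withUsername("your-username")
            .withPassword("your-password")   // prefer injecting this from a secret manager
            .build();

        // Pass secureOptions to JdbcSink.sink(...) exactly as in the earlier examples
    }
}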

CDC Best Practices

  1. Pre-create slots with noexport_snapshot for efficiency
  2. Use specific publications to reduce network overhead
  3. Monitor slot lag to prevent storage issues
  4. Plan for schema evolution in your processing logic

This integration enables powerful real-time analytics and data processing workflows while leveraging Tacnode's distributed architecture and PostgreSQL compatibility.