Developing UDFs with PL/Python

1. PL/Python Extension Overview

PL/Python is a loadable procedural language that lets you write database User-Defined Functions (UDFs) and stored procedures in Python. You can use the Python ecosystem directly inside the database for complex data processing, analysis, and machine learning inference without moving data out of the database, which improves development efficiency and reduces data movement overhead.

Core Value:

  • Extensibility: Go beyond the built-in SQL functions to implement arbitrarily complex logic.
  • Flexibility: Leverage Python's vast ecosystem (such as NumPy, Pandas, Scikit-learn, Requests, etc.).
  • Convenience: Data processing logic is closer to the data, reducing data movement and improving development efficiency.

In Tacnode, the supported PL/Python extensions include:

  • plpython3u: Untrusted language extension supporting Python 3. The u stands for "untrusted," meaning it has fewer security sandbox restrictions, making it powerful but requiring careful use.
  • plpythonvec3u: Untrusted language extension for Python 3 that supports vectorized execution. It is designed to substantially improve performance when processing rows in batches.

Note: Using PL/Python requires binding a UDF Server first and keeping that UDF Server running.

2. Built-in Python Library Support

To improve development convenience and performance, the system comes pre-installed with the following commonly used Python libraries that you can directly import and use in PL/Python UDFs without additional installation:

2.1 Database Connection and Operations

  • psycopg2: PostgreSQL adapter for connecting to and operating on PostgreSQL databases from Python
  • SQLAlchemy: Python SQL toolkit and Object-Relational Mapper (ORM), providing efficient data access patterns

2.2 Data Processing and Analysis

  • numpy: Fundamental scientific computing library providing high-performance multidimensional array objects and mathematical functions
  • pandas: Powerful data analysis and manipulation tool providing DataFrame data structures
  • SciPy: Scientific computing library based on numpy, providing mathematical, scientific, and engineering computing functions
  • Scikit-learn: Machine learning library providing various classification, regression, and clustering algorithms

2.3 Security and Encryption

  • pycryptodome: Encryption toolkit providing various encryption algorithm implementations
  • cryptography: Cryptographic library providing encryption recipes and primitives
  • passlib: Password hashing library supporting multiple password hashing schemes
  • pyOpenSSL: OpenSSL bindings providing SSL/TLS functionality
  • base58: Base58 encoding/decoding library commonly used for cryptocurrency address encoding

2.4 Network and Cloud Services

  • Requests: HTTP library for sending HTTP requests and calling REST APIs
  • boto3: AWS SDK for Python for interacting with AWS services

2.5 Utilities and Tools

  • pytest: Testing framework for writing and running test cases
  • pendulum: Date-time library providing more intuitive date-time operation interfaces

These pre-installed libraries cover common needs ranging from data processing and machine learning to encryption and cloud service integration, so you can quickly build complex in-database processing logic.

3. Enabling PL/Python Support

To use PL/Python, you first need to enable the corresponding extension in the target database.

3.1 Enable plpython3u

Use the CREATE EXTENSION command to enable plpython3u. Since it's an untrusted language, superuser privileges are required.

-- Connect to target database and execute
CREATE EXTENSION IF NOT EXISTS plpython3u;

3.2 Enable plpythonvec3u

-- Execute as a superuser (plpythonvec3u is also an untrusted language)
CREATE EXTENSION IF NOT EXISTS plpythonvec3u;

4. Writing PL/Python UDFs with plpython3u

A PL/Python UDF is an SQL function definition whose body is a Python script.

4.1 Basic Structure

The basic syntax for creating a PL/Python UDF is:

CREATE OR REPLACE FUNCTION function_name (argument_list)
RETURNS return_type
AS $$
  # Python script goes here
  # Function logic
  return result  # Return result
$$ LANGUAGE plpython3u;

4.2 Data Type Mapping

PL/Python automatically converts between SQL and Python data types:

SQL Type                   Python Type
TEXT                       str
INTEGER                    int
FLOAT8                     float
BOOL                       bool
ARRAY (e.g., INTEGER[])    list (e.g., [int, int, ...])
JSON/JSONB                 str (JSON text; the examples below parse it with json.loads and return json.dumps(...))
NULL                       None
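The JSON-handling examples in Section 6 parse JSONB arguments with json.loads and return json.dumps(...) strings. A minimal sketch of that pattern as plain Python functions, so the logic can be tried outside the database. The names parse_json_arg and summarize_order are hypothetical, and accepting an already-parsed dict/list in addition to JSON text is a defensive assumption, not documented Tacnode behavior:

```python
import json

def parse_json_arg(value):
    """Return Python objects for a JSON/JSONB argument, or None for SQL NULL."""
    if value is None:
        return None
    # JSON text (str) is parsed; an already-parsed dict/list is passed through
    if isinstance(value, str):
        return json.loads(value)
    return value

def summarize_order(order_json):
    """Hypothetical UDF body: count the items in an order and total their prices."""
    order = parse_json_arg(order_json)
    if order is None:
        return None
    items = order.get("items", [])
    total = sum(item.get("price", 0) for item in items)
    # Return JSON text, as the examples in this guide do for RETURNS JSONB
    return json.dumps({"item_count": len(items), "total": total})
```

Inside a UDF declared with a JSONB parameter, the body would simply call parse_json_arg on that parameter before working with it.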

4.3 Basic Examples

4.3.1 String Processing Function

CREATE OR REPLACE FUNCTION capitalize_name(name TEXT)
RETURNS TEXT
AS $$
  if name is None:
      return None
  return name.title()  # Convert name to title case
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT capitalize_name('john doe');  -- Returns 'John Doe'

4.3.2 Numerical Calculation and Array Processing

CREATE OR REPLACE FUNCTION calculate_average(arr float8[])
RETURNS float8
AS $$
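  # Ignore NULL elements (passed as None); return 0.0 for a NULL or empty array
  values = [x for x in arr if x is not None] if arr else []
  if not values:
      return 0.0
  return sum(values) / len(values)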
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT calculate_average(ARRAY[1.0, 2.5, 3.7, 4.1]);  -- Returns 2.825

4.3.3 Returning Arrays

CREATE FUNCTION return_py_int_array()
RETURNS int[]
AS $$
  return [1, 11, 21, 31]
$$ LANGUAGE plpython3u;
 
SELECT return_py_int_array(); -- Returns {1,11,21,31}

5. Writing PL/Python UDFs with plpythonvec3u

5.1 Vectorized Execution and plpythonvec3u

plpythonvec3u is an extension designed to significantly improve PL/Python UDF performance when processing batch data.

Core Concept: Vectorized Execution

  • Traditional UDF (plpython3u): The Python function is called once for each row in the query result set. Processing 1 million rows therefore means 1 million separate calls into the Python interpreter, each handling a single row, and the per-call overhead (function calls and inter-process communication) adds up quickly.

  • Vectorized UDF (plpythonvec3u): Receives a batch of rows at once (such as an array or matrix) and performs vectorized or batch processing operations within the Python function. This greatly reduces the number of function calls and inter-process communication overhead, and can better utilize the vectorized operation advantages of Python scientific computing libraries (such as NumPy, Pandas).
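The difference in call counts can be illustrated with a toy simulation in plain Python. This is not how Tacnode actually dispatches UDF calls; run_row_at_a_time, run_batched, and the batch size of 1024 are hypothetical, chosen only to show that per-row invocation grows with the number of rows while batched invocation grows with the number of batches:

```python
from typing import Callable, List, Optional

def run_row_at_a_time(rows: List[Optional[str]], fn: Callable) -> tuple:
    """Call fn once per row, the way a plpython3u function is invoked."""
    calls = 0
    out = []
    for row in rows:
        calls += 1
        out.append(fn(row))
    return out, calls

def run_batched(rows: List[Optional[str]], fn: Callable, batch_size: int) -> tuple:
    """Call fn once per batch of rows, the way a vectorized UDF receives input."""
    calls = 0
    out = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        calls += 1
        result = fn(batch)
        # A batch function must return exactly one result per input row
        assert len(result) == len(batch)
        out.extend(result)
    return out, calls

def upper_one(s):
    return None if s is None else s.upper()

def upper_batch(batch):
    return [upper_one(s) for s in batch]

rows = ["row %d" % i for i in range(10_000)]
_, single_calls = run_row_at_a_time(rows, upper_one)
_, batch_calls = run_batched(rows, upper_batch, batch_size=1024)
print(single_calls, batch_calls)  # 10000 10
```

Both drivers produce the same results; only the number of function invocations differs.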

Expected Advantages of plpythonvec3u

  1. Dramatically Improve Performance: Reduce function call and process switching overhead, especially suitable for aggregation, transformation, and machine learning inference scenarios that need to process large numbers of rows.
  2. Better Hardware Utilization: Batch-oriented libraries such as NumPy can make better use of modern CPU SIMD instructions and, for some operations, multiple cores than row-at-a-time Python code.
  3. Seamless Integration with Scientific Computing Libraries: A batch of values can be converted to a NumPy array or Pandas DataFrame in one step, so the optimized batch computation functions of these libraries apply directly.

5.2 Usage Examples

-- Standard single-row processing function
CREATE OR REPLACE FUNCTION process_text_single(text_input text)
RETURNS text
AS $$
    import re
    if text_input is None:
        return None
    result = text_input.upper()
    result = re.sub(r'\s+', '_', result)
    return f"SINGLE_PROCESSED: {result}"
$$ LANGUAGE plpython3u;
 
-- Usage: Function is called separately for each row
SELECT id, original_text, process_text_single(original_text) as processed
FROM (
    VALUES
    (1, 'hello world'),
    (2, 'python function'),
    (3, 'database processing')
) AS t(id, original_text);

-- Same function signature, but internally receives batch data
CREATE OR REPLACE FUNCTION process_text_batch(text_input text)
RETURNS text
AS $$
    import re
    results = []
    for item in text_input:  # text_input is ['hello world', 'python function', 'database processing']
        if item is not None:
            processed = item.upper()
            processed = re.sub(r'\s+', '_', processed)
            results.append(f"BATCH_PROCESSED: {processed}")
        else:
            results.append(None)
    
    return results  # Return results list
$$ LANGUAGE plpythonvec3u;
 
-- Usage is identical; internally, rows are processed in batches
SELECT id, original_text, process_text_batch(original_text) as processed
FROM (
    VALUES
    (1, 'hello world'),
    (2, 'python function'),
    (3, 'database processing')
) AS t(id, original_text);
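Because a vectorized function sees a whole batch, per-call setup such as compiling a regular expression runs once per batch rather than once per row. A sketch of the process_text_batch body rewritten as a plain Python function with a list comprehension, so it can be tested outside the database; treating the argument as a Python list follows the comment in the example above:

```python
import re
from typing import List, Optional

def process_text_batch(text_input: List[Optional[str]]) -> List[Optional[str]]:
    # Compiled once per batch instead of once per row
    whitespace = re.compile(r"\s+")
    # One output per input, in the same order; NULL (None) stays NULL
    return [
        None if item is None
        else f"BATCH_PROCESSED: {whitespace.sub('_', item.upper())}"
        for item in text_input
    ]
```

Keeping the output aligned with the input (same length, same order) is what lets the database match each result back to its row.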

6. Built-in Python Library UDF Usage Examples

6.1 NumPy UDF Array Statistical Analysis Example

CREATE OR REPLACE FUNCTION numpy_array_stats(arr float8[])
RETURNS JSONB
AS $$
    import numpy as np
    import json
    
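    # Ignore NULL array elements, which PL/Python passes as None
    clean = [x for x in arr if x is not None] if arr is not None else []
    if not clean:
        return json.dumps({"error": "Empty array provided"})
    
    np_array = np.array(clean, dtype=float)
    
    stats = {
        "mean": float(np.mean(np_array)),
        "median": float(np.median(np_array)),
        # np.std and np.var use the population formulas by default (ddof=0)
        "std_dev": float(np.std(np_array)),
        "variance": float(np.var(np_array)),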
        "min": float(np.min(np_array)),
        "max": float(np.max(np_array)),
        "sum": float(np.sum(np_array)),
        "percentile_25": float(np.percentile(np_array, 25)),
        "percentile_75": float(np.percentile(np_array, 75))
    }
    
    return json.dumps(stats)
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT numpy_array_stats(ARRAY[1.0, 2.5, 3.7, 4.1, 5.8, 6.2, 7.9]);
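For cross-checking the NumPy output, or for small arrays where NumPy is unnecessary, roughly the same statistics can be computed with Python's standard statistics module. A sketch, assuming NumPy's defaults: np.std/np.var use the population formulas (ddof=0), matching statistics.pstdev/pvariance, and np.percentile's default linear interpolation matches statistics.quantiles(..., method="inclusive") for the 25th and 75th percentiles. The name array_stats is illustrative:

```python
import statistics
from typing import List, Optional

def array_stats(arr: Optional[List[Optional[float]]]) -> dict:
    """Standard-library counterpart of numpy_array_stats; NULL elements are skipped."""
    data = sorted(x for x in (arr or []) if x is not None)
    if not data:
        return {"error": "Empty array provided"}
    if len(data) > 1:
        # 'inclusive' interpolates linearly between order statistics,
        # like np.percentile's default method
        q1, _, q3 = statistics.quantiles(data, n=4, method="inclusive")
    else:
        q1 = q3 = data[0]
    return {
        "mean": statistics.fmean(data),
        "median": statistics.median(data),
        "std_dev": statistics.pstdev(data),      # population, like np.std (ddof=0)
        "variance": statistics.pvariance(data),  # population, like np.var (ddof=0)
        "min": data[0],
        "max": data[-1],
        "sum": sum(data),
        "percentile_25": q1,
        "percentile_75": q3,
    }
```

Results can differ from NumPy in the last few floating-point digits because the summation order differs.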

6.2 Pandas UDF Data Cleaning and Transformation Example

CREATE OR REPLACE FUNCTION pandas_data_cleanup(data_json JSONB)
RETURNS JSONB
AS $$
    import pandas as pd
    import json
    from datetime import datetime
    
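    # Parse the input into a list of records. JSONB may arrive as JSON text (str),
    # so parse it with json.loads; an already-parsed list is used as-is.
    records = json.loads(data_json) if isinstance(data_json, str) else data_json
    df = pd.DataFrame(records)
    
    # Data cleaning operations
    # 1. Handle missing values
    df.fillna({'age': df['age'].mean(), 'salary': df['salary'].median()}, inplace=True)
    
    # 2. Data type conversion
    df['join_date'] = pd.to_datetime(df['join_date'])
    df['age'] = df['age'].astype(int)
    
    # 3. Create new features
    current_year = datetime.now().year
    df['years_of_service'] = current_year - df['join_date'].dt.year
    
    # 4. Data filtering (.copy() makes the filtered frame independent,
    #    avoiding pandas' SettingWithCopyWarning on the assignment below)
    df = df[df['salary'] > 30000].copy()
    
    # 5. Convert Timestamp to string for JSON serialization
    df['join_date'] = df['join_date'].dt.strftime('%Y-%m-%d')
    
    # 6. Data aggregation. NumPy scalars (e.g., int64 counts) are not
    #    JSON-serializable, so convert them to built-in Python types.
    summary = {
        'total_employees': len(df),
        'average_salary': float(df['salary'].mean()),
        'max_salary': float(df['salary'].max()),
        'department_counts': {k: int(v) for k, v in df['department'].value_counts().items()},
        # to_json emits plain JSON; json.loads turns it back into built-in types
        'cleaned_data': json.loads(df.to_json(orient='records'))
    }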
    
    return json.dumps(summary)
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT pandas_data_cleanup('[
    {"name": "John", "age": 30, "salary": 50000, "department": "IT", "join_date": "2018-05-15"},
    {"name": "Jane", "age": null, "salary": 45000, "department": "HR", "join_date": "2020-02-20"},
    {"name": "Bob", "age": 35, "salary": null, "department": "IT", "join_date": "2019-11-10"}
]'::jsonb);
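Values pulled out of pandas objects are often NumPy scalars (for example, the counts from value_counts()), and json.dumps rejects them with a TypeError. One common remedy is a default hook for json.dumps. The sketch below is a hypothetical helper, to_json_compatible, that relies on the .tolist() method NumPy scalars and arrays (and pandas Series) provide, plus handling for dates and Decimal values; it does not import NumPy, so it can be used in any UDF:

```python
import json
from datetime import date, datetime
from decimal import Decimal

def to_json_compatible(obj):
    """default= hook for json.dumps covering common non-JSON types."""
    # NumPy scalars/arrays and pandas Series expose .tolist(), which
    # returns built-in Python values (or lists of them)
    tolist = getattr(obj, "tolist", None)
    if callable(tolist):
        return tolist()
    # datetime is a subclass of date, so this branch covers both
    if isinstance(obj, date):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return float(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
```

In a UDF body, the final line would become return json.dumps(summary, default=to_json_compatible).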

6.3 Cryptography UDF Password Hash Verification Example

CREATE OR REPLACE FUNCTION hash_password(password TEXT)
RETURNS TEXT
AS $$
    from passlib.hash import bcrypt
    
    try:
        # Check if password is empty
        if password is None or password.strip() == '':
            return None  # None is returned to SQL as NULL
            
        # Hash password using bcrypt algorithm
        hashed_password = bcrypt.hash(password)
        return hashed_password
        
    except Exception:
        # Return NULL if exception occurs
        return None  # None is returned to SQL as NULL
        
$$ LANGUAGE plpython3u;
 
CREATE OR REPLACE FUNCTION verify_password(password TEXT, hashed_password TEXT)
RETURNS BOOLEAN
AS $$
    from passlib.hash import bcrypt
    
    try:
        # Check if input parameters are valid
        if password is None or password.strip() == '':
            return False
            
        if hashed_password is None or hashed_password.strip() == '':
            return False
            
        # Validate hash format (basic check)
        if not (hashed_password.startswith('$2b$') or
                hashed_password.startswith('$2a$') or
                hashed_password.startswith('$2y$')):
            return False
            
        # Verify if password matches hash
        return bcrypt.verify(password, hashed_password)
        
    except Exception:
        # Return false for any exception
        return False
        
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT hash_password('your_key_123');
SELECT verify_password('your_key_123', '$2b$12$ah/---');
 
-- Test exception scenarios
SELECT verify_password('', '$2b$12$ah---'); -- Empty password
SELECT verify_password('your_key_123', ''); -- Empty hash
SELECT verify_password('your_key_123', 'invalid_hash_format'); -- Invalid hash format
SELECT verify_password(NULL, '$2b$12$...'); -- NULL password
SELECT verify_password('your_key_123', NULL); -- NULL hash

6.4 Requests Exchange Rate Query Example

CREATE OR REPLACE FUNCTION get_exchange_rate(base_currency TEXT, target_currency TEXT)
RETURNS JSONB
AS $$
    import requests
    import json
    
    # Use free exchange rate API (example)
    url = f'https://api.exchangerate-api.com/v4/latest/{base_currency}'
    
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        
        rates_data = response.json()
        
        if target_currency in rates_data['rates']:
            result = {
                "base_currency": base_currency,
                "target_currency": target_currency,
                "exchange_rate": rates_data['rates'][target_currency],
                "last_updated": rates_data['date']
            }
            return json.dumps(result)
        else:
            return json.dumps({"error": f"Currency {target_currency} not found"})
            
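    except requests.exceptions.RequestException as e:
        return json.dumps({"error": f"Exchange rate request failed: {str(e)}"})
    except (ValueError, KeyError) as e:
        # Malformed or unexpected response body
        return json.dumps({"error": f"Unexpected exchange rate response: {str(e)}"})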
$$ LANGUAGE plpython3u;
 
-- Usage example
SELECT get_exchange_rate('USD', 'EUR');
SELECT get_exchange_rate('EUR', 'JPY');

7. Security and Production Environment Considerations

  1. Untrusted Language: Both plpython3u and plpythonvec3u are untrusted languages: powerful but high-risk. Never grant untrusted users permission to create or execute these functions.

  2. Module Installation: UDFs can import only the pre-installed libraries listed in Section 2 and the Python standard library; manually installing additional third-party libraries is not supported.

  3. Error Handling: Use try-except blocks in Python code to handle expected failures (such as network errors) and return a meaningful value, so that an uncaught exception does not abort the entire query.

  4. Resource Management: Complex Python functions may consume significant CPU and memory resources. Monitor database resource usage to ensure UDFs don't affect overall database stability.

  5. Code Quality: Ensure written Python code is robust, efficient, and thoroughly tested, especially handling edge cases (such as empty input, NULL values).
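Because a PL/Python function body is ordinary Python, one way to test edge cases is to keep the logic in a plain function, exercise it with pytest (listed among the pre-installed libraries) during development, and paste the body into CREATE FUNCTION once it passes. A sketch using the calculate_average logic from Section 4.3.2, extended to skip NULL elements; the function and test names are illustrative:

```python
import math

def calculate_average(arr):
    """Body of the calculate_average UDF: NULL/empty array -> 0.0, NULL elements skipped."""
    values = [x for x in arr if x is not None] if arr else []
    if not values:
        return 0.0
    return sum(values) / len(values)

def test_regular_values():
    assert math.isclose(calculate_average([1.0, 2.5, 3.7, 4.1]), 2.825)

def test_empty_and_null_inputs():
    assert calculate_average([]) == 0.0
    assert calculate_average(None) == 0.0

def test_null_elements_are_skipped():
    assert calculate_average([2.0, None, 4.0]) == 3.0
```

Saved as, for example, test_udfs.py, these tests run with the pytest command.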

Advanced Examples

Machine Learning Integration

CREATE OR REPLACE FUNCTION predict_customer_churn(features float8[])
RETURNS JSONB
AS $$
    import json
    
    # Placeholder scoring rule for demonstration only; no model is loaded here.
    # In production, load a properly trained model (for example, a
    # scikit-learn estimator) and call its predict_proba method instead.
    
    if not features or len(features) < 4:
        return json.dumps({"error": "Insufficient features"})
    # Simple rule-based prediction for demonstration
    score = sum(features) / len(features)
    
    if score > 0.7:
        prediction = "high_risk"
        probability = min(0.95, score)
    elif score > 0.4:
        prediction = "medium_risk"
        probability = score * 0.6
    else:
        prediction = "low_risk"
        probability = max(0.05, score * 0.3)
    
    result = {
        "prediction": prediction,
        "probability": round(probability, 3),
        "features_processed": len(features)
    }
    
    return json.dumps(result)
$$ LANGUAGE plpython3u;
 
-- Usage
SELECT predict_customer_churn(ARRAY[0.8, 0.6, 0.9, 0.7]);
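The example above uses a placeholder rule rather than a model. A common alternative for in-database inference is to train a model offline and ship only its parameters to the UDF. A sketch of scoring with a logistic-regression model in plain Python; WEIGHTS and BIAS are made-up placeholders, not trained values, and the 0.7/0.4 cut-offs simply mirror the example above:

```python
import math
from typing import List

# Hypothetical parameters exported from an offline-trained logistic regression
WEIGHTS = [1.2, -0.4, 2.0, 0.8]
BIAS = -1.5

def churn_probability(features: List[float]) -> float:
    """sigmoid(w . x + b) for one customer's feature vector."""
    if len(features) != len(WEIGHTS):
        raise ValueError(f"expected {len(WEIGHTS)} features, got {len(features)}")
    z = sum(w * x for w, x in zip(WEIGHTS, features)) + BIAS
    # Numerically stable sigmoid: avoid overflow in math.exp for large |z|
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)

def risk_bucket(p: float) -> str:
    if p > 0.7:
        return "high_risk"
    if p > 0.4:
        return "medium_risk"
    return "low_risk"
```

Inside the UDF, the body would call churn_probability(features) and risk_bucket on the result, then return both via json.dumps.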

Time Series Analysis

CREATE OR REPLACE FUNCTION analyze_time_series(data_points float8[], window_size int DEFAULT 7)
RETURNS JSONB
AS $$
    import numpy as np
    import json
    
    # The parameter is named data_points because VALUES is an SQL keyword
    # that PostgreSQL does not accept as a function parameter name.
    if window_size is None or window_size < 1:
        return json.dumps({"error": "window_size must be a positive integer"})
    if not data_points or len(data_points) < window_size:
        return json.dumps({"error": "Insufficient data points"})
    
    np_values = np.array(data_points, dtype=float)
    
    # Calculate simple moving averages over each full window
    moving_avg = []
    for i in range(window_size - 1, len(np_values)):
        window = np_values[i - window_size + 1:i + 1]
        moving_avg.append(float(np.mean(window)))
    
    # Detect the trend by comparing the first and last moving averages
    if moving_avg[-1] > moving_avg[0]:
        trend = "increasing"
    elif moving_avg[-1] < moving_avg[0]:
        trend = "decreasing"
    else:
        trend = "stable"
    
    # Volatility as the population standard deviation of the raw values
    volatility = float(np.std(np_values))
    
    result = {
        "moving_averages": moving_avg,
        "trend": trend,
        "volatility": round(volatility, 4),
        "min_value": float(np.min(np_values)),
        "max_value": float(np.max(np_values)),
        "total_points": len(data_points)
    }
    
    return json.dumps(result)
$$ LANGUAGE plpython3u;
 
-- Usage
SELECT analyze_time_series(ARRAY[100, 105, 103, 108, 112, 109, 115, 118, 114, 120]);
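The moving-average loop above recomputes each window's mean from scratch, which costs O(n * window_size). A sketch of the same simple moving average in O(n) using a running sum, in plain Python; the name moving_average is illustrative, and for long float series the running sum can accumulate small rounding differences compared with recomputing each window:

```python
from typing import List, Sequence

def moving_average(values: Sequence[float], window_size: int) -> List[float]:
    """Simple moving average over every full window, via a running sum."""
    if window_size < 1:
        raise ValueError("window_size must be >= 1")
    if len(values) < window_size:
        return []
    window_sum = sum(values[:window_size])
    averages = [window_sum / window_size]
    for i in range(window_size, len(values)):
        # Slide the window: add the newest point and drop the oldest one
        window_sum += values[i] - values[i - window_size]
        averages.append(window_sum / window_size)
    return averages
```

With NumPy, np.convolve(x, np.ones(w) / w, mode='valid') produces the same moving averages in a single vectorized call.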

This guide covers developing PL/Python UDFs in Tacnode, from basic concepts and vectorized execution to machine learning integration.