PL/Python UDFs

Write database functions in Python with access to NumPy, Pandas, scikit-learn, and other popular libraries for data processing, ML inference, and more.

Overview

PL/Python lets you write User-Defined Functions (UDFs) in Python that run directly inside your database. Instead of pulling data out to process it elsewhere, you can bring Python’s powerful ecosystem—NumPy, Pandas, scikit-learn—right to your data.

Why use PL/Python?

  • Go beyond SQL: Implement any logic that’s awkward or impossible in pure SQL
  • Use Python libraries: NumPy for math, Pandas for data wrangling, scikit-learn for ML
  • Skip the data transfer: Process data where it lives instead of shipping it around

Tacnode supports two PL/Python extensions:

  • plpython3u — Standard Python 3 support. The u means “untrusted”—it can access the filesystem and network, so use it carefully.
  • plpythonvec3u — Vectorized execution for batch processing. Same capabilities as above, but processes multiple rows at once for better performance.

PL/Python requires a running UDF Server. Make sure you’ve configured and started one before creating functions.

Pre-installed Libraries

These libraries are available out of the box—just import them in your UDF code:

Data & ML

  • numpy — Arrays, linear algebra, math functions
  • pandas — DataFrames for data manipulation
  • scipy — Scientific computing and statistics
  • scikit-learn — Machine learning algorithms

Database

  • psycopg2 — PostgreSQL database adapter
  • SQLAlchemy — SQL toolkit and ORM

Security

  • cryptography — Encryption primitives
  • passlib — Password hashing (bcrypt, argon2, etc.)
  • pycryptodome — Low-level crypto algorithms
  • pyOpenSSL — SSL/TLS support
  • base58 — Base58 encoding (common in crypto)

Network & Cloud

  • requests — HTTP client
  • boto3 — AWS SDK

Utilities

  • pendulum — Better datetime handling
  • pytest — Testing framework
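
To confirm the environment has what you expect, a small UDF can report library versions. This is a minimal sketch; it assumes the extension is already enabled (see Getting Started below):

CREATE OR REPLACE FUNCTION library_versions()
RETURNS TEXT
AS $$
    # Import a few of the bundled libraries and report their versions
    import numpy, pandas, sklearn
    return f"numpy={numpy.__version__}, pandas={pandas.__version__}, scikit-learn={sklearn.__version__}"
$$ LANGUAGE plpython3u;

SELECT library_versions();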

Getting Started

Enable the extension in your database (requires superuser):

-- Standard PL/Python
CREATE EXTENSION IF NOT EXISTS plpython3u;

-- Vectorized PL/Python (for batch processing)
CREATE EXTENSION IF NOT EXISTS plpythonvec3u;
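
To verify the extensions are enabled, you can query the extension catalog (this assumes Tacnode exposes the standard pg_extension view):

SELECT extname FROM pg_extension WHERE extname LIKE 'plpython%';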

Writing Your First UDF

Here’s the basic structure:

CREATE OR REPLACE FUNCTION my_function(arg1 TYPE, arg2 TYPE)
RETURNS return_type
AS $$
    # Your Python code here
    return result
$$ LANGUAGE plpython3u;

Type Mapping

SQL types automatically convert to Python types:

SQL Type       Python Type
TEXT           str
INTEGER        int
FLOAT8         float
BOOL           bool
INTEGER[]      list of int
JSON/JSONB     dict or list
NULL           None
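
As a quick illustration of the mapping, here's a minimal sketch showing that an INTEGER argument arrives as a Python int and SQL NULL arrives as None:

CREATE OR REPLACE FUNCTION describe_value(v INTEGER)
RETURNS TEXT
AS $$
    if v is None:  # SQL NULL arrives as Python None
        return 'got NULL'
    return f'got {type(v).__name__} = {v}'
$$ LANGUAGE plpython3u;

SELECT describe_value(42);    -- 'got int = 42'
SELECT describe_value(NULL);  -- 'got NULL'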

Simple Examples

Title case a string:

CREATE OR REPLACE FUNCTION title_case(name TEXT)
RETURNS TEXT
AS $$
    if name is None:
        return None
    return name.title()
$$ LANGUAGE plpython3u;

SELECT title_case('john doe');  -- 'John Doe'

Calculate an average:

CREATE OR REPLACE FUNCTION avg_array(numbers float8[])
RETURNS float8
AS $$
    if not numbers:
        return 0.0
    return sum(numbers) / len(numbers)
$$ LANGUAGE plpython3u;

SELECT avg_array(ARRAY[1.0, 2.5, 3.7, 4.1]);  -- 2.825

Return an array:

CREATE FUNCTION fibonacci_sequence(n int)
RETURNS int[]
AS $$
    if n <= 0:
        return []
    fib = [0, 1]
    while len(fib) < n:
        fib.append(fib[-1] + fib[-2])
    return fib[:n]
$$ LANGUAGE plpython3u;

SELECT fibonacci_sequence(8);  -- {0,1,1,2,3,5,8,13}

Vectorized Execution

Standard UDFs process one row at a time. For a million rows, that’s a million function calls with all the overhead that entails.

plpythonvec3u changes this—your function receives batches of rows and returns batches of results. The SQL syntax stays the same, but internally you’re processing arrays:

-- Standard: called once per row
CREATE OR REPLACE FUNCTION clean_text(input text)
RETURNS text
AS $$
    import re
    if input is None:
        return None
    return re.sub(r'\s+', ' ', input.strip().lower())
$$ LANGUAGE plpython3u;

-- Vectorized: called once with all rows
CREATE OR REPLACE FUNCTION clean_text_fast(input text)
RETURNS text
AS $$
    import re
    results = []
    for item in input:  # input is a list of all values
        if item is not None:
            results.append(re.sub(r'\s+', ' ', item.strip().lower()))
        else:
            results.append(None)
    return results
$$ LANGUAGE plpythonvec3u;

Both are called the same way:

SELECT clean_text(message) FROM logs;
SELECT clean_text_fast(message) FROM logs;

The vectorized version is faster because it:

  • Makes fewer cross-process calls
  • Lets you use NumPy/Pandas batch operations (see the sketch after this list)
  • Takes advantage of CPU SIMD instructions
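
For instance, here's a hedged sketch of the second point: the whole batch becomes one NumPy array, and a single vectorized call replaces a Python-level loop (the payments table and amount column are hypothetical):

CREATE OR REPLACE FUNCTION log1p_fast(x float8)
RETURNS float8
AS $$
    import numpy as np
    # x is the full batch; None marks SQL NULLs
    arr = np.array([v if v is not None else np.nan for v in x], dtype=float)
    out = np.log1p(arr)  # one vectorized call over the whole batch
    # Return one result per input value, mapping NaN back to None
    return [None if np.isnan(v) else float(v) for v in out]
$$ LANGUAGE plpythonvec3u;

SELECT log1p_fast(amount) FROM payments;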

Real-World Examples

Statistical Analysis with NumPy

CREATE OR REPLACE FUNCTION array_stats(arr float8[])
RETURNS JSONB
AS $$
    import numpy as np
    import json

    if not arr:
        return json.dumps({"error": "empty array"})

    data = np.array(arr)

    return json.dumps({
        "count": len(data),
        "mean": float(np.mean(data)),
        "median": float(np.median(data)),
        "std": float(np.std(data)),
        "min": float(np.min(data)),
        "max": float(np.max(data)),
        "p25": float(np.percentile(data, 25)),
        "p75": float(np.percentile(data, 75))
    })
$$ LANGUAGE plpython3u;

SELECT array_stats(ARRAY[1.0, 2.5, 3.7, 4.1, 5.8, 6.2, 7.9]);

Data Cleaning with Pandas

CREATE OR REPLACE FUNCTION clean_employee_data(raw_data JSONB)
RETURNS JSONB
AS $$
    import pandas as pd
    import json
    from datetime import datetime

    # JSONB may arrive as a parsed dict/list or as a JSON string depending on
    # the type mapping in use, so handle both
    records = raw_data if isinstance(raw_data, (list, dict)) else json.loads(raw_data)
    df = pd.DataFrame(records)

    # Fill missing values (assignment form avoids pandas' deprecated
    # chained inplace fillna)
    df['age'] = df['age'].fillna(df['age'].mean())
    df['salary'] = df['salary'].fillna(df['salary'].median())

    # Parse dates and calculate tenure
    df['join_date'] = pd.to_datetime(df['join_date'])
    df['tenure_years'] = datetime.now().year - df['join_date'].dt.year

    # Filter out low salaries
    df = df[df['salary'] > 30000]

    # Format dates for JSON output
    df['join_date'] = df['join_date'].dt.strftime('%Y-%m-%d')

    return json.dumps({
        'employee_count': len(df),
        'avg_salary': float(df['salary'].mean()),
        'by_department': df['department'].value_counts().to_dict(),
        'records': df.to_dict('records')
    })
$$ LANGUAGE plpython3u;

SELECT clean_employee_data('[
    {"name": "Alice", "age": 30, "salary": 50000, "department": "Engineering", "join_date": "2020-03-15"},
    {"name": "Bob", "age": null, "salary": 45000, "department": "Sales", "join_date": "2021-07-01"}
]'::jsonb);

Password Hashing

CREATE OR REPLACE FUNCTION hash_password(password TEXT)
RETURNS TEXT
AS $$
    from passlib.hash import bcrypt

    if not password or not password.strip():
        return None

    return bcrypt.hash(password)
$$ LANGUAGE plpython3u;

CREATE OR REPLACE FUNCTION check_password(password TEXT, hash TEXT)
RETURNS BOOLEAN
AS $$
    from passlib.hash import bcrypt

    if not password or not hash:
        return False

    # Verify it's actually a bcrypt hash
    if not hash.startswith(('$2a$', '$2b$', '$2y$')):
        return False

    try:
        return bcrypt.verify(password, hash)
    except Exception:
        return False
$$ LANGUAGE plpython3u;
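
Used together (the users table and password_hash column here are hypothetical):

SELECT hash_password('s3cret-pw');  -- '$2b$12$...'
SELECT check_password('s3cret-pw', password_hash) FROM users;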

Calling External APIs

CREATE OR REPLACE FUNCTION get_exchange_rate(from_currency TEXT, to_currency TEXT)
RETURNS JSONB
AS $$
    import requests
    import json

    url = f'https://api.exchangerate-api.com/v4/latest/{from_currency}'

    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
        data = resp.json()

        if to_currency not in data['rates']:
            return json.dumps({"error": f"Unknown currency: {to_currency}"})

        return json.dumps({
            "from": from_currency,
            "to": to_currency,
            "rate": data['rates'][to_currency],
            "date": data['date']
        })
    except requests.RequestException as e:
        return json.dumps({"error": str(e)})
$$ LANGUAGE plpython3u;

SELECT get_exchange_rate('USD', 'EUR');

ML Scoring

CREATE OR REPLACE FUNCTION score_churn_risk(features float8[])
RETURNS JSONB
AS $$
    import numpy as np
    import json

    if not features or len(features) < 4:
        return json.dumps({"error": "Need at least 4 features"})

    # In production, you'd load a trained model here
    # This is a simplified scoring example

    score = np.mean(features)

    if score > 0.7:
        risk = "high"
        prob = min(0.95, score)
    elif score > 0.4:
        risk = "medium"
        prob = score * 0.7
    else:
        risk = "low"
        prob = score * 0.3

    return json.dumps({
        "risk_level": risk,
        "probability": round(prob, 3),
        "features_used": len(features)
    })
$$ LANGUAGE plpython3u;

SELECT score_churn_risk(ARRAY[0.8, 0.6, 0.9, 0.7]);
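
To score with a real trained model instead, one approach is to unpickle it from disk and cache it. This is a hedged sketch: the model path is hypothetical, and it assumes the runtime exposes standard PL/Python's SD dictionary for caching state between calls:

CREATE OR REPLACE FUNCTION score_churn_model(features float8[])
RETURNS float8
AS $$
    import pickle

    # Load once and cache in SD so subsequent calls skip the disk read
    if 'model' not in SD:
        with open('/models/churn_model.pkl', 'rb') as f:  # hypothetical path
            SD['model'] = pickle.load(f)

    # Probability of the positive (churn) class
    return float(SD['model'].predict_proba([features])[0][1])
$$ LANGUAGE plpython3u;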

Time Series Analysis

-- Note: the parameter names avoid VALUES and WINDOW, which are reserved words in SQL
CREATE OR REPLACE FUNCTION analyze_trend(series float8[], window_size int DEFAULT 7)
RETURNS JSONB
AS $$
    import numpy as np
    import json

    if not series or len(series) < window_size:
        return json.dumps({"error": f"Need at least {window_size} data points"})

    data = np.array(series)

    # Calculate moving average
    moving_avg = []
    for i in range(window_size - 1, len(data)):
        moving_avg.append(float(np.mean(data[i - window_size + 1:i + 1])))

    # Determine trend direction
    if len(moving_avg) >= 2:
        if moving_avg[-1] > moving_avg[0]:
            trend = "up"
        elif moving_avg[-1] < moving_avg[0]:
            trend = "down"
        else:
            trend = "flat"
    else:
        trend = "flat"

    return json.dumps({
        "trend": trend,
        "volatility": round(float(np.std(data)), 4),
        "min": float(np.min(data)),
        "max": float(np.max(data)),
        "moving_averages": moving_avg,
        "data_points": len(series)
    })
$$ LANGUAGE plpython3u;

SELECT analyze_trend(ARRAY[100, 105, 103, 108, 112, 109, 115, 118, 114, 120]);

Security Considerations

PL/Python runs with significant privileges. Keep these points in mind:

  1. Restrict access — Only grant CREATE FUNCTION to users you trust. These functions can access the filesystem and network.

  2. Use built-in libraries only — You can’t install additional packages. Stick to what’s pre-installed.

  3. Handle errors gracefully — Wrap risky operations in try/except so one bad input doesn’t crash your query.

  4. Watch resource usage — Complex Python code can consume significant CPU and memory. Monitor your UDF performance.

  5. Validate inputs — Always check for NULL and empty values before processing (see the sketch below).
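
A defensive UDF typically combines points 3 and 5. A minimal sketch:

CREATE OR REPLACE FUNCTION safe_parse_int(raw TEXT)
RETURNS INTEGER
AS $$
    # Validate inputs first (point 5)
    if raw is None or not raw.strip():
        return None
    # Wrap the risky conversion (point 3)
    try:
        return int(raw.strip())
    except ValueError:
        return None
$$ LANGUAGE plpython3u;

SELECT safe_parse_int(' 42 ');   -- 42
SELECT safe_parse_int('oops');   -- NULL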