PL/Python UDFs
Write database functions in Python with access to NumPy, Pandas, scikit-learn, and other popular libraries for data processing, ML inference, and more.
Overview
PL/Python lets you write User-Defined Functions (UDFs) in Python that run directly inside your database. Instead of pulling data out to process it elsewhere, you can bring Python’s powerful ecosystem—NumPy, Pandas, scikit-learn—right to your data.
Why use PL/Python?
- Go beyond SQL: Implement any logic that’s awkward or impossible in pure SQL
- Use Python libraries: NumPy for math, Pandas for data wrangling, scikit-learn for ML
- Skip the data transfer: Process data where it lives instead of shipping it around
Tacnode supports two PL/Python extensions:
- plpython3u — Standard Python 3 support. The u means "untrusted"—it can access the filesystem and network, so use it carefully.
- plpythonvec3u — Vectorized execution for batch processing. Same capabilities as above, but processes multiple rows at once for better performance.
PL/Python requires a running UDF Server. Make sure you’ve configured and started one before creating functions.
Pre-installed Libraries
These libraries are available out of the box—just import them in your UDF code (a quick sketch follows the list):
Data & ML
- numpy — Arrays, linear algebra, math functions
- pandas — DataFrames for data manipulation
- scipy — Scientific computing and statistics
- scikit-learn — Machine learning algorithms
Database
- psycopg2 — PostgreSQL database adapter
- SQLAlchemy — SQL toolkit and ORM
Security
- cryptography — Encryption primitives
- passlib — Password hashing (bcrypt, argon2, etc.)
- pycryptodome — Low-level crypto algorithms
- pyOpenSSL — SSL/TLS support
- base58 — Base58 encoding (common in crypto)
Network & Cloud
- requests — HTTP client
- boto3 — AWS SDK
Utilities
- pendulum — Better datetime handling
- pytest — Testing framework
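To confirm an import works end to end, here's a minimal sketch using pendulum (days_between is an illustrative name, not part of Tacnode):
CREATE OR REPLACE FUNCTION days_between(start_date TEXT, end_date TEXT)
RETURNS INTEGER
AS $$
import pendulum
# SQL NULLs arrive as None
if start_date is None or end_date is None:
    return None
# Parse ISO-8601 strings and return the whole-day difference
return (pendulum.parse(end_date) - pendulum.parse(start_date)).in_days()
$$ LANGUAGE plpython3u;
SELECT days_between('2024-01-01', '2024-03-01'); -- 60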
Getting Started
Enable the extension in your database (requires superuser):
-- Standard PL/Python
CREATE EXTENSION IF NOT EXISTS plpython3u;
-- Vectorized PL/Python (for batch processing)
CREATE EXTENSION IF NOT EXISTS plpythonvec3u;
Writing Your First UDF
Here’s the basic structure:
CREATE OR REPLACE FUNCTION my_function(arg1 TYPE, arg2 TYPE)
RETURNS return_type
AS $$
# Your Python code here
return result
$$ LANGUAGE plpython3u;
Type Mapping
SQL types automatically convert to Python types (a quick demonstration follows the table):
| SQL Type | Python Type |
|---|---|
| TEXT | str |
| INTEGER | int |
| FLOAT8 | float |
| BOOL | bool |
| INTEGER[] | list of int |
| JSON/JSONB | dict or list |
| NULL | None |
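To see the mapping in action, here's a small sketch (type_demo is an illustrative name):
CREATE OR REPLACE FUNCTION type_demo(vals INTEGER[], flag BOOL)
RETURNS TEXT
AS $$
# vals arrives as a Python list of int, flag as a Python bool;
# a SQL NULL in either position would arrive as None
return f"vals is {type(vals).__name__}, flag is {type(flag).__name__}"
$$ LANGUAGE plpython3u;
SELECT type_demo(ARRAY[1, 2, 3], true); -- 'vals is list, flag is bool'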
Simple Examples
Title case a string:
CREATE OR REPLACE FUNCTION title_case(name TEXT)
RETURNS TEXT
AS $$
if name is None:
    return None
return name.title()
$$ LANGUAGE plpython3u;
SELECT title_case('john doe'); -- 'John Doe'
Calculate an average:
CREATE OR REPLACE FUNCTION avg_array(numbers float8[])
RETURNS float8
AS $$
if not numbers:
    return 0.0
return sum(numbers) / len(numbers)
$$ LANGUAGE plpython3u;
SELECT avg_array(ARRAY[1.0, 2.5, 3.7, 4.1]); -- 2.825
Return an array:
CREATE FUNCTION fibonacci_sequence(n int)
RETURNS int[]
AS $$
if n <= 0:
    return []
fib = [0, 1]
while len(fib) < n:
    fib.append(fib[-1] + fib[-2])
return fib[:n]
$$ LANGUAGE plpython3u;
SELECT fibonacci_sequence(8); -- {0,1,1,2,3,5,8,13}
Vectorized Execution
Standard UDFs process one row at a time. For a million rows, that’s a million function calls with all the overhead that entails.
plpythonvec3u changes this—your function receives batches of rows and returns batches of results. The SQL syntax stays the same, but internally you’re processing arrays:
-- Standard: called once per row
CREATE OR REPLACE FUNCTION clean_text(input text)
RETURNS text
AS $$
import re
# A SQL NULL arrives as None; guard before calling string methods
if input is None:
    return None
return re.sub(r'\s+', ' ', input.strip().lower())
$$ LANGUAGE plpython3u;
-- Vectorized: called once with all rows
CREATE OR REPLACE FUNCTION clean_text_fast(input text)
RETURNS text
AS $$
import re
results = []
for item in input:  # input is a list of all values
    if item is not None:
        results.append(re.sub(r'\s+', ' ', item.strip().lower()))
    else:
        results.append(None)
return results
$$ LANGUAGE plpythonvec3u;
Both are called the same way:
SELECT clean_text(message) FROM logs;
SELECT clean_text_fast(message) FROM logs;
The vectorized version is faster because it:
- Makes fewer cross-process calls
- Lets you use NumPy/Pandas batch operations (see the sketch below)
- Takes advantage of CPU SIMD instructions
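For example, a numeric UDF can hand the entire batch to NumPy in one call. This is a sketch assuming the same list-in, list-out convention shown above; log1p_fast and the payments table are illustrative names:
CREATE OR REPLACE FUNCTION log1p_fast(value float8)
RETURNS float8
AS $$
import numpy as np
# value is the whole batch as a Python list; None marks SQL NULLs
arr = np.array([v if v is not None else np.nan for v in value], dtype=float)
out = np.log1p(arr)  # one vectorized call instead of one Python call per row
return [None if np.isnan(x) else float(x) for x in out]
$$ LANGUAGE plpythonvec3u;
SELECT log1p_fast(amount) FROM payments;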
Real-World Examples
Statistical Analysis with NumPy
CREATE OR REPLACE FUNCTION array_stats(arr float8[])
RETURNS JSONB
AS $$
import numpy as np
import json
if not arr:
    return json.dumps({"error": "empty array"})
data = np.array(arr)
return json.dumps({
    "count": len(data),
    "mean": float(np.mean(data)),
    "median": float(np.median(data)),
    "std": float(np.std(data)),
    "min": float(np.min(data)),
    "max": float(np.max(data)),
    "p25": float(np.percentile(data, 25)),
    "p75": float(np.percentile(data, 75))
})
$$ LANGUAGE plpython3u;
SELECT array_stats(ARRAY[1.0, 2.5, 3.7, 4.1, 5.8, 6.2, 7.9]);
Data Cleaning with Pandas
CREATE OR REPLACE FUNCTION clean_employee_data(raw_data JSONB)
RETURNS JSONB
AS $$
import pandas as pd
import json
from datetime import datetime
# raw_data may arrive as a JSON string or an already-parsed object
data = json.loads(raw_data) if isinstance(raw_data, str) else raw_data
df = pd.DataFrame(data)
# Fill missing values
df['age'] = df['age'].fillna(df['age'].mean())
df['salary'] = df['salary'].fillna(df['salary'].median())
# Parse dates and calculate tenure
df['join_date'] = pd.to_datetime(df['join_date'])
df['tenure_years'] = datetime.now().year - df['join_date'].dt.year
# Filter out low salaries
df = df[df['salary'] > 30000]
# Format dates for JSON output
df['join_date'] = df['join_date'].dt.strftime('%Y-%m-%d')
return json.dumps({
    'employee_count': len(df),
    'avg_salary': float(df['salary'].mean()),
    'by_department': df['department'].value_counts().to_dict(),
    'records': df.to_dict('records')
})
$$ LANGUAGE plpython3u;
SELECT clean_employee_data('[
{"name": "Alice", "age": 30, "salary": 50000, "department": "Engineering", "join_date": "2020-03-15"},
{"name": "Bob", "age": null, "salary": 45000, "department": "Sales", "join_date": "2021-07-01"}
]'::jsonb);
Password Hashing
CREATE OR REPLACE FUNCTION hash_password(password TEXT)
RETURNS TEXT
AS $$
from passlib.hash import bcrypt
if not password or not password.strip():
    return None
return bcrypt.hash(password)
$$ LANGUAGE plpython3u;
CREATE OR REPLACE FUNCTION check_password(password TEXT, hash TEXT)
RETURNS BOOLEAN
AS $$
from passlib.hash import bcrypt
if not password or not hash:
    return False
# Verify it's actually a bcrypt hash
if not hash.startswith(('$2a$', '$2b$', '$2y$')):
    return False
try:
    return bcrypt.verify(password, hash)
except Exception:
    return False
$$ LANGUAGE plpython3u;
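Usage is the same as any other function (the password literal is just an example):
SELECT hash_password('s3cret!');
SELECT check_password('s3cret!', hash_password('s3cret!')); -- true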
Calling External APIs
CREATE OR REPLACE FUNCTION get_exchange_rate(from_currency TEXT, to_currency TEXT)
RETURNS JSONB
AS $$
import requests
import json
url = f'https://api.exchangerate-api.com/v4/latest/{from_currency}'
try:
    resp = requests.get(url, timeout=5)
    resp.raise_for_status()
    data = resp.json()
    if to_currency not in data['rates']:
        return json.dumps({"error": f"Unknown currency: {to_currency}"})
    return json.dumps({
        "from": from_currency,
        "to": to_currency,
        "rate": data['rates'][to_currency],
        "date": data['date']
    })
except requests.RequestException as e:
    return json.dumps({"error": str(e)})
$$ LANGUAGE plpython3u;
SELECT get_exchange_rate('USD', 'EUR');
ML Scoring
CREATE OR REPLACE FUNCTION score_churn_risk(features float8[])
RETURNS JSONB
AS $$
import numpy as np
import json
if not features or len(features) < 4:
    return json.dumps({"error": "Need at least 4 features"})
# In production, you'd load a trained model here
# This is a simplified scoring example
# Cast to a plain float: numpy's float64 is not JSON-serializable
score = float(np.mean(features))
if score > 0.7:
    risk = "high"
    prob = min(0.95, score)
elif score > 0.4:
    risk = "medium"
    prob = score * 0.7
else:
    risk = "low"
    prob = score * 0.3
return json.dumps({
    "risk_level": risk,
    "probability": round(prob, 3),
    "features_used": len(features)
})
$$ LANGUAGE plpython3u;
SELECT score_churn_risk(ARRAY[0.8, 0.6, 0.9, 0.7]);
Time Series Analysis
CREATE OR REPLACE FUNCTION analyze_trend(vals float8[], window_size int DEFAULT 7)
RETURNS JSONB
AS $$
import numpy as np
import json
if not vals or len(vals) < window_size:
    return json.dumps({"error": f"Need at least {window_size} data points"})
data = np.array(vals)
# Calculate moving average
moving_avg = []
for i in range(window_size - 1, len(data)):
    moving_avg.append(float(np.mean(data[i - window_size + 1:i + 1])))
# Determine trend direction
if len(moving_avg) >= 2:
    trend = "up" if moving_avg[-1] > moving_avg[0] else "down"
else:
    trend = "flat"
return json.dumps({
    "trend": trend,
    "volatility": round(float(np.std(data)), 4),
    "min": float(np.min(data)),
    "max": float(np.max(data)),
    "moving_averages": moving_avg,
    "data_points": len(vals)
})
$$ LANGUAGE plpython3u;
SELECT analyze_trend(ARRAY[100, 105, 103, 108, 112, 109, 115, 118, 114, 120]);
Security Considerations
PL/Python runs with significant privileges. Keep these points in mind:
- Restrict access — Only grant CREATE FUNCTION to users you trust. These functions can access the filesystem and network.
- Use built-in libraries only — You can't install additional packages. Stick to what's pre-installed.
- Handle errors gracefully — Wrap risky operations in try/except so one bad input doesn't crash your query (see the sketch after this list).
- Watch resource usage — Complex Python code can consume significant CPU and memory. Monitor your UDF performance.
- Validate inputs — Always check for NULL and empty values before processing.
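To make the validation and error-handling points concrete, here's a minimal template (safe_divide is an illustrative name):
CREATE OR REPLACE FUNCTION safe_divide(a float8, b float8)
RETURNS float8
AS $$
# Validate inputs first: SQL NULLs arrive as None
if a is None or b is None:
    return None
try:
    return a / b
except ZeroDivisionError:
    # Return NULL instead of failing the whole query
    return None
$$ LANGUAGE plpython3u;
SELECT safe_divide(10, 0); -- NULL instead of an error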