SQLAlchemy Development Guide
SQLAlchemy is one of the most widely used Python SQL toolkits and ORMs, providing both high-level ORM patterns and low-level Core access to the database. This guide shows how to combine SQLAlchemy with Tacnode's PostgreSQL compatibility to build robust, scalable Python applications.
Prerequisites
Before starting, ensure you have:
- Python 3.7+ (Python 3.9+ recommended)
- pip for package management
- A Tacnode database with appropriate access credentials
Database Setup
Create your database structure in Tacnode:
-- Create the database
CREATE DATABASE example;
-- Connect to the database and create a table
\c example
CREATE TABLE customer (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
);
-- The UNIQUE constraint on email already creates an index; add one for filtering on is_active
CREATE INDEX idx_customer_active ON customer(is_active);
Project Setup
Installing Dependencies
Install SQLAlchemy and the PostgreSQL driver:
# Core dependencies
pip install SQLAlchemy psycopg2-binary
# Optional but recommended dependencies
pip install alembic # Database migrations
pip install sqlalchemy-utils # Additional utilities
pip install python-dotenv # Environment variables
For async applications, also install:
pip install asyncpg "sqlalchemy[asyncio]"
Environment Configuration
Create a .env file for your database credentials:
# Database Configuration
DB_HOST=your-cluster.tacnode.io
DB_PORT=5432
DB_NAME=example
DB_USER=your_username
DB_PASSWORD=your_password
DB_SSL_MODE=require
# Application Configuration
DEBUG=False
LOG_LEVEL=INFO
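With python-dotenv installed, these values can be loaded into the process environment before any database code reads them; a minimal sketch, assuming the .env file sits in the working directory:
# check_env.py -- minimal sketch; assumes the .env file above is in the working directory
import os
from dotenv import load_dotenv

load_dotenv()  # populate os.environ from .env
print(os.getenv('DB_HOST'), os.getenv('DB_PORT'))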
Model Definition
Modern SQLAlchemy 2.0 Style
Create your models using SQLAlchemy's modern declarative syntax:
# models.py
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""Base class for all database models."""
pass
class Customer(Base):
"""Customer model representing the customer table."""
__tablename__ = "customer"
    # Primary key (primary key columns are indexed automatically)
    id: Mapped[int] = mapped_column(primary_key=True)
# Required fields
name: Mapped[str] = mapped_column(String(255), nullable=False)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
# Optional fields
phone: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False
)
def __repr__(self) -> str:
return f"<Customer(id={self.id}, name='{self.name}', email='{self.email}')>"
def to_dict(self) -> dict:
"""Convert model to dictionary."""
return {
'id': self.id,
'name': self.name,
'email': self.email,
'phone': self.phone,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
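The model can be exercised in memory before any database work; a quick sketch (the values are hypothetical):
# model_demo.py -- sketch only; constructs a Customer without touching the database
from models import Customer

customer = Customer(name='Ada Lovelace', email='ada@example.com')
print(customer)            # uses __repr__; id is None until the row is persisted
print(customer.to_dict())  # timestamps stay None until the row is flushed to the database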
Database Configuration
Connection Management
Create a database configuration module:
# database.py
import os
from typing import Optional
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from models import Base
class DatabaseConfig:
"""Database configuration and connection management."""
def __init__(self):
self.host = os.getenv('DB_HOST', 'localhost')
        self.port = int(os.getenv('DB_PORT', '5432'))
self.database = os.getenv('DB_NAME', 'example')
self.username = os.getenv('DB_USER', 'postgres')
self.password = os.getenv('DB_PASSWORD', '')
self.ssl_mode = os.getenv('DB_SSL_MODE', 'prefer')
self._engine: Optional[Engine] = None
self._session_factory: Optional[sessionmaker] = None
@property
def database_url(self) -> str:
"""Generate database URL."""
return (
f"postgresql://{self.username}:{self.password}@"
f"{self.host}:{self.port}/{self.database}?sslmode={self.ssl_mode}"
)
@property
def engine(self) -> Engine:
"""Get or create database engine."""
if self._engine is None:
self._engine = create_engine(
self.database_url,
# Connection pool settings
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
# Performance settings
echo=os.getenv('DEBUG', 'False').lower() == 'true',
                future=True  # redundant on SQLAlchemy 2.x, where 2.0-style behavior is the default
)
return self._engine
@property
def session_factory(self) -> sessionmaker:
"""Get or create session factory."""
if self._session_factory is None:
self._session_factory = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False
)
return self._session_factory
def get_session(self) -> Session:
"""Get a new database session."""
return self.session_factory()
def create_tables(self):
"""Create all tables."""
Base.metadata.create_all(bind=self.engine)
def drop_tables(self):
"""Drop all tables."""
Base.metadata.drop_all(bind=self.engine)
# Global database instance
db_config = DatabaseConfig()
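A short connectivity check for this module, sketched under the assumption that the environment variables above have already been loaded:
# check_db.py -- hedged sketch; verifies the engine can reach Tacnode
from sqlalchemy import text
from database import db_config

db_config.create_tables()                 # creates any missing tables defined on Base
with db_config.get_session() as session:  # Session supports the context-manager protocol
    print(session.execute(text("SELECT 1")).scalar())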
Data Access Layer
Repository Pattern
Implement a repository pattern for clean data access:
# repositories.py
from typing import List, Optional
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from models import Customer
from database import db_config
class CustomerRepository:
"""Repository for Customer operations."""
def __init__(self, session: Optional[Session] = None):
self.session = session or db_config.get_session()
self._should_close_session = session is None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._should_close_session:
self.session.close()
def create(self, customer_data: dict) -> Customer:
"""Create a new customer."""
customer = Customer(**customer_data)
self.session.add(customer)
self.session.commit()
self.session.refresh(customer)
return customer
def get_by_id(self, customer_id: int) -> Optional[Customer]:
"""Get customer by ID."""
stmt = select(Customer).where(Customer.id == customer_id)
return self.session.scalar(stmt)
def get_by_email(self, email: str) -> Optional[Customer]:
"""Get customer by email."""
stmt = select(Customer).where(Customer.email == email)
return self.session.scalar(stmt)
def get_all(self, active_only: bool = True) -> List[Customer]:
"""Get all customers."""
stmt = select(Customer)
if active_only:
            stmt = stmt.where(Customer.is_active.is_(True))
return list(self.session.scalars(stmt))
def get_by_name_pattern(self, pattern: str) -> List[Customer]:
"""Get customers by name pattern."""
stmt = select(Customer).where(Customer.name.ilike(f"%{pattern}%"))
return list(self.session.scalars(stmt))
def update(self, customer_id: int, update_data: dict) -> Optional[Customer]:
"""Update customer."""
stmt = (
update(Customer)
.where(Customer.id == customer_id)
.values(**update_data)
.returning(Customer)
)
result = self.session.execute(stmt)
self.session.commit()
return result.scalar_one_or_none()
def delete(self, customer_id: int) -> bool:
"""Delete customer."""
stmt = delete(Customer).where(Customer.id == customer_id)
result = self.session.execute(stmt)
self.session.commit()
return result.rowcount > 0
def soft_delete(self, customer_id: int) -> bool:
"""Soft delete customer (mark as inactive)."""
result = self.update(customer_id, {'is_active': False})
return result is not None
def bulk_create(self, customers_data: List[dict]) -> List[Customer]:
"""Bulk create customers."""
customers = [Customer(**data) for data in customers_data]
self.session.add_all(customers)
self.session.commit()
for customer in customers:
self.session.refresh(customer)
return customers
CRUD Operations Examples
Complete Application Example
# main.py
import os
from dotenv import load_dotenv
from repositories import CustomerRepository
from database import db_config
# Load environment variables
load_dotenv()
def demonstrate_crud_operations():
"""Demonstrate comprehensive CRUD operations."""
# Initialize database
db_config.create_tables()
try:
# CREATE operations
print("=== Creating Customers ===")
create_customers()
# READ operations
print("\n=== Reading Customers ===")
read_customers()
# UPDATE operations
print("\n=== Updating Customer ===")
update_customer()
# Advanced queries
print("\n=== Advanced Queries ===")
advanced_queries()
# DELETE operations
print("\n=== Deleting Customer ===")
delete_customer()
# Final state
print("\n=== Final State ===")
show_final_state()
except Exception as e:
print(f"Error occurred: {e}")
raise
def create_customers():
"""Create sample customers."""
with CustomerRepository() as repo:
# Create individual customers
customer1 = repo.create({
'name': 'Jacob Emily',
'email': 'jacob.emily@tacnode.io',
'phone': '+1-555-0101'
})
print(f"Created: {customer1}")
customer2 = repo.create({
'name': 'Michael Emma',
'email': 'michael.emma@tacnode.io',
'phone': '+1-555-0102'
})
print(f"Created: {customer2}")
# Bulk create
bulk_data = [
{
'name': 'Sarah Wilson',
'email': 'sarah.wilson@tacnode.io',
'phone': '+1-555-0103'
},
{
'name': 'David Chen',
'email': 'david.chen@tacnode.io',
'phone': '+1-555-0104'
}
]
bulk_customers = repo.bulk_create(bulk_data)
print(f"Bulk created {len(bulk_customers)} customers")
def read_customers():
"""Demonstrate various read operations."""
with CustomerRepository() as repo:
# Get all customers
all_customers = repo.get_all()
print(f"Total customers: {len(all_customers)}")
for customer in all_customers:
print(f" - {customer.name} ({customer.email})")
# Get by ID
customer = repo.get_by_id(1)
if customer:
print(f"Customer #1: {customer.name}")
# Get by email
customer = repo.get_by_email('jacob.emily@tacnode.io')
if customer:
print(f"Found by email: {customer.name}")
def update_customer():
"""Demonstrate update operations."""
with CustomerRepository() as repo:
# Update customer
updated_customer = repo.update(2, {
'email': 'michael.emma@gmail.com',
'phone': '+1-555-9999'
})
if updated_customer:
print(f"Updated customer: {updated_customer.name} -> {updated_customer.email}")
def advanced_queries():
"""Demonstrate advanced query operations."""
with CustomerRepository() as repo:
# Search by name pattern
customers = repo.get_by_name_pattern('Emma')
print(f"Customers with 'Emma' in name: {len(customers)}")
for customer in customers:
print(f" - {customer.name}")
def delete_customer():
"""Demonstrate delete operations."""
with CustomerRepository() as repo:
# Soft delete
success = repo.soft_delete(3)
if success:
print("Soft deleted customer #3")
# Hard delete
success = repo.delete(4)
if success:
print("Hard deleted customer #4")
def show_final_state():
"""Show final state of customers."""
with CustomerRepository() as repo:
active_customers = repo.get_all(active_only=True)
all_customers = repo.get_all(active_only=False)
print(f"Active customers: {len(active_customers)}")
print(f"Total customers: {len(all_customers)}")
for customer in all_customers:
status = "Active" if customer.is_active else "Inactive"
print(f" - {customer.name} ({status})")
if __name__ == '__main__':
demonstrate_crud_operations()
Async SQLAlchemy Support
For high-performance applications, use async SQLAlchemy:
# async_database.py
from typing import AsyncGenerator, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from models import Base, Customer
class AsyncDatabaseConfig:
    def __init__(self):
        # In production, build this URL from environment variables as in database.py;
        # note the +asyncpg driver in the scheme
        self.database_url = "postgresql+asyncpg://user:password@host:port/database"
self.engine = create_async_engine(
self.database_url,
echo=True,
future=True
)
self.session_factory = async_sessionmaker(
bind=self.engine,
class_=AsyncSession,
autoflush=False,
autocommit=False
)
async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
async with self.session_factory() as session:
yield session
# Async repository example
class AsyncCustomerRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, customer_data: dict) -> Customer:
customer = Customer(**customer_data)
self.session.add(customer)
await self.session.commit()
await self.session.refresh(customer)
return customer
async def get_by_id(self, customer_id: int) -> Optional[Customer]:
stmt = select(Customer).where(Customer.id == customer_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
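A hedged end-to-end sketch driving the async classes above; module and class names follow the listing, and the connection URL must be adjusted first:
# async_main.py -- usage sketch for AsyncDatabaseConfig and AsyncCustomerRepository
import asyncio
from async_database import AsyncDatabaseConfig, AsyncCustomerRepository

async def main():
    config = AsyncDatabaseConfig()
    async with config.session_factory() as session:
        repo = AsyncCustomerRepository(session)
        customer = await repo.create({
            'name': 'Async User',
            'email': 'async.user@tacnode.io'
        })
        print(customer)
    await config.engine.dispose()  # release pooled connections cleanly

if __name__ == '__main__':
    asyncio.run(main())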
Sample Output
When you run the main application, you'll see output similar to:
=== Creating Customers ===
Created: <Customer(id=1, name='Jacob Emily', email='jacob.emily@tacnode.io')>
Created: <Customer(id=2, name='Michael Emma', email='michael.emma@tacnode.io')>
Bulk created 2 customers
=== Reading Customers ===
Total customers: 4
- Jacob Emily (jacob.emily@tacnode.io)
- Michael Emma (michael.emma@tacnode.io)
- Sarah Wilson (sarah.wilson@tacnode.io)
- David Chen (david.chen@tacnode.io)
Customer #1: Jacob Emily
Found by email: Jacob Emily
=== Updating Customer ===
Updated customer: Michael Emma -> michael.emma@gmail.com
=== Advanced Queries ===
Customers with 'Emma' in name: 1
- Michael Emma
=== Deleting Customer ===
Soft deleted customer #3
Hard deleted customer #4
=== Final State ===
Active customers: 2
Total customers: 3
- Jacob Emily (Active)
- Michael Emma (Active)
- Sarah Wilson (Inactive)
Best Practices
Performance Optimization
- Use Connection Pooling:
  engine = create_engine(
      database_url,
      pool_size=20,
      max_overflow=30,
      pool_pre_ping=True
  )
- Lazy Loading and Eager Loading:
  # Eager-load relationships to avoid N+1 queries
  # (requires from sqlalchemy.orm import selectinload; Customer.orders is illustrative)
  stmt = select(Customer).options(selectinload(Customer.orders))
- Batch Operations:
  # Use bulk operations for better performance
  session.bulk_insert_mappings(Customer, customers_data)
Security Best Practices
- Use Environment Variables:
  # Never hardcode credentials
  DATABASE_URL = os.getenv('DATABASE_URL')
- Input Validation:
  from pydantic import BaseModel, EmailStr

  class CustomerCreate(BaseModel):
      name: str
      email: EmailStr
- Parameterized Queries (SQLAlchemy handles this automatically):
  # This is safe from SQL injection
  stmt = select(Customer).where(Customer.email == user_input)
This guide demonstrates how to build robust, scalable Python applications using SQLAlchemy with Tacnode. Tacnode's PostgreSQL compatibility lets you keep standard SQLAlchemy drivers, models, and query patterns while gaining the benefits of a distributed, cloud-native database architecture.