SQLAlchemy Development Guide
Learn how to build Python applications with SQLAlchemy and Tacnode, featuring modern ORM patterns, async support, and comprehensive CRUD examples.
SQLAlchemy is the most popular Python ORM framework, providing both high-level ORM patterns and low-level database access. This guide demonstrates how to leverage SQLAlchemy’s powerful features with Tacnode’s PostgreSQL compatibility to build robust, scalable Python applications.
Prerequisites
Before starting, ensure you have:
- Python 3.7+ (Python 3.9+ recommended)
- pip for package management
- A Tacnode database with appropriate access credentials
Database Setup
Create your database structure in Tacnode:
-- Create the database
CREATE DATABASE example;
-- Connect to the database and create a table
\c example;
CREATE TABLE customer (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT TRUE
);
-- Create an index for better query performance
CREATE INDEX idx_customer_email ON customer(email);
CREATE INDEX idx_customer_active ON customer(is_active);
Project Setup
Installing Dependencies
Install SQLAlchemy and the PostgreSQL driver:
# Core dependencies
pip install SQLAlchemy psycopg2-binary
# Optional but recommended dependencies
pip install alembic # Database migrations
pip install sqlalchemy-utils # Additional utilities
pip install python-dotenv # Environment variables
For async applications, also install:
pip install asyncpg sqlalchemy[asyncio]
Environment Configuration
Create a .env file for your database credentials:
# Database Configuration
DB_HOST=your-cluster.tacnode.io
DB_PORT=5432
DB_NAME=example
DB_USER=your_username
DB_PASSWORD=your_password
DB_SSL_MODE=require
# Application Configuration
DEBUG=False
LOG_LEVEL=INFO
Model Definition
Modern SQLAlchemy 2.0 Style
Create your models using SQLAlchemy’s modern declarative syntax:
# models.py
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""Base class for all database models."""
pass
class Customer(Base):
"""Customer model representing the customer table."""
__tablename__ = "customer"
# Primary key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Required fields
name: Mapped[str] = mapped_column(String(255), nullable=False)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
# Optional fields
phone: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False
)
def __repr__(self) -> str:
return f"<Customer(id={self.id}, name='{self.name}', email='{self.email}')>"
def to_dict(self) -> dict:
"""Convert model to dictionary."""
return {
'id': self.id,
'name': self.name,
'email': self.email,
'phone': self.phone,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
Database Configuration
Connection Management
Create a database configuration module:
# database.py
import os
from typing import Optional
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from models import Base
class DatabaseConfig:
"""Database configuration and connection management."""
def __init__(self):
self.host = os.getenv('DB_HOST', 'localhost')
self.port = int(os.getenv('DB_PORT', 5432))
self.database = os.getenv('DB_NAME', 'example')
self.username = os.getenv('DB_USER', 'postgres')
self.password = os.getenv('DB_PASSWORD', '')
self.ssl_mode = os.getenv('DB_SSL_MODE', 'prefer')
self._engine: Optional[Engine] = None
self._session_factory: Optional[sessionmaker] = None
@property
def database_url(self) -> str:
"""Generate database URL."""
return (
f"postgresql://{self.username}:{self.password}@"
f"{self.host}:{self.port}/{self.database}?sslmode={self.ssl_mode}"
)
@property
def engine(self) -> Engine:
"""Get or create database engine."""
if self._engine is None:
self._engine = create_engine(
self.database_url,
# Connection pool settings
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
# Performance settings
echo=os.getenv('DEBUG', 'False').lower() == 'true',
future=True # Enable SQLAlchemy 2.0 style
)
return self._engine
@property
def session_factory(self) -> sessionmaker:
"""Get or create session factory."""
if self._session_factory is None:
self._session_factory = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False
)
return self._session_factory
def get_session(self) -> Session:
"""Get a new database session."""
return self.session_factory()
def create_tables(self):
"""Create all tables."""
Base.metadata.create_all(bind=self.engine)
def drop_tables(self):
"""Drop all tables."""
Base.metadata.drop_all(bind=self.engine)
# Global database instance
db_config = DatabaseConfig()
Data Access Layer
Repository Pattern
Implement a repository pattern for clean data access:
# repositories.py
from typing import List, Optional
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from models import Customer
from database import db_config
class CustomerRepository:
"""Repository for Customer operations."""
def __init__(self, session: Optional[Session] = None):
self.session = session or db_config.get_session()
self._should_close_session = session is None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._should_close_session:
self.session.close()
def create(self, customer_data: dict) -> Customer:
"""Create a new customer."""
customer = Customer(**customer_data)
self.session.add(customer)
self.session.commit()
self.session.refresh(customer)
return customer
def get_by_id(self, customer_id: int) -> Optional[Customer]:
"""Get customer by ID."""
stmt = select(Customer).where(Customer.id == customer_id)
return self.session.scalar(stmt)
def get_by_email(self, email: str) -> Optional[Customer]:
"""Get customer by email."""
stmt = select(Customer).where(Customer.email == email)
return self.session.scalar(stmt)
def get_all(self, active_only: bool = True) -> List[Customer]:
"""Get all customers."""
stmt = select(Customer)
if active_only:
stmt = stmt.where(Customer.is_active == True)
return list(self.session.scalars(stmt))
def get_by_name_pattern(self, pattern: str) -> List[Customer]:
"""Get customers by name pattern."""
stmt = select(Customer).where(Customer.name.ilike(f"%{pattern}%"))
return list(self.session.scalars(stmt))
def update(self, customer_id: int, update_data: dict) -> Optional[Customer]:
"""Update customer."""
stmt = (
update(Customer)
.where(Customer.id == customer_id)
.values(**update_data)
.returning(Customer)
)
result = self.session.execute(stmt)
self.session.commit()
return result.scalar_one_or_none()
def delete(self, customer_id: int) -> bool:
"""Delete customer."""
stmt = delete(Customer).where(Customer.id == customer_id)
result = self.session.execute(stmt)
self.session.commit()
return result.rowcount > 0
def soft_delete(self, customer_id: int) -> bool:
"""Soft delete customer (mark as inactive)."""
result = self.update(customer_id, {'is_active': False})
return result is not None
def bulk_create(self, customers_data: List[dict]) -> List[Customer]:
"""Bulk create customers."""
customers = [Customer(**data) for data in customers_data]
self.session.add_all(customers)
self.session.commit()
for customer in customers:
self.session.refresh(customer)
return customers
CRUD Operations Examples
Complete Application Example
# main.py
import os
from dotenv import load_dotenv
from repositories import CustomerRepository
from database import db_config
# Load environment variables
load_dotenv()
def demonstrate_crud_operations():
"""Demonstrate comprehensive CRUD operations."""
# Initialize database
db_config.create_tables()
try:
# CREATE operations
print("=== Creating Customers ===")
create_customers()
# READ operations
print("\n=== Reading Customers ===")
read_customers()
# UPDATE operations
print("\n=== Updating Customer ===")
update_customer()
# Advanced queries
print("\n=== Advanced Queries ===")
advanced_queries()
# DELETE operations
print("\n=== Deleting Customer ===")
delete_customer()
# Final state
print("\n=== Final State ===")
show_final_state()
except Exception as e:
print(f"Error occurred: {e}")
raise
def create_customers():
"""Create sample customers."""
with CustomerRepository() as repo:
# Create individual customers
customer1 = repo.create({
'name': 'Jacob Emily',
'email': 'jacob.emily@tacnode.io',
'phone': '+1-555-0101'
})
print(f"Created: {customer1}")
customer2 = repo.create({
'name': 'Michael Emma',
'email': 'michael.emma@tacnode.io',
'phone': '+1-555-0102'
})
print(f"Created: {customer2}")
# Bulk create
bulk_data = [
{
'name': 'Sarah Wilson',
'email': 'sarah.wilson@tacnode.io',
'phone': '+1-555-0103'
},
{
'name': 'David Chen',
'email': 'david.chen@tacnode.io',
'phone': '+1-555-0104'
}
]
bulk_customers = repo.bulk_create(bulk_data)
print(f"Bulk created {len(bulk_customers)} customers")
def read_customers():
"""Demonstrate various read operations."""
with CustomerRepository() as repo:
# Get all customers
all_customers = repo.get_all()
print(f"Total customers: {len(all_customers)}")
for customer in all_customers:
print(f" - {customer.name} ({customer.email})")
# Get by ID
customer = repo.get_by_id(1)
if customer:
print(f"Customer #1: {customer.name}")
# Get by email
customer = repo.get_by_email('jacob.emily@tacnode.io')
if customer:
print(f"Found by email: {customer.name}")
def update_customer():
"""Demonstrate update operations."""
with CustomerRepository() as repo:
# Update customer
updated_customer = repo.update(2, {
'email': 'michael.emma@gmail.com',
'phone': '+1-555-9999'
})
if updated_customer:
print(f"Updated customer: {updated_customer.name} -> {updated_customer.email}")
def advanced_queries():
"""Demonstrate advanced query operations."""
with CustomerRepository() as repo:
# Search by name pattern
customers = repo.get_by_name_pattern('Emma')
print(f"Customers with 'Emma' in name: {len(customers)}")
for customer in customers:
print(f" - {customer.name}")
def delete_customer():
"""Demonstrate delete operations."""
with CustomerRepository() as repo:
# Soft delete
success = repo.soft_delete(3)
if success:
print("Soft deleted customer #3")
# Hard delete
success = repo.delete(4)
if success:
print("Hard deleted customer #4")
def show_final_state():
"""Show final state of customers."""
with CustomerRepository() as repo:
active_customers = repo.get_all(active_only=True)
all_customers = repo.get_all(active_only=False)
print(f"Active customers: {len(active_customers)}")
print(f"Total customers: {len(all_customers)}")
for customer in all_customers:
status = "Active" if customer.is_active else "Inactive"
print(f" - {customer.name} ({status})")
if __name__ == '__main__':
demonstrate_crud_operations()
Async SQLAlchemy Support
For high-performance applications, use async SQLAlchemy:
# async_database.py
import asyncio
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from models import Base
class AsyncDatabaseConfig:
def __init__(self):
self.database_url = "postgresql+asyncpg://user:password@host:port/database"
self.engine = create_async_engine(
self.database_url,
echo=True,
future=True
)
self.session_factory = async_sessionmaker(
bind=self.engine,
class_=AsyncSession,
autoflush=False,
autocommit=False
)
async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
async with self.session_factory() as session:
yield session
# Async repository example
class AsyncCustomerRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def create(self, customer_data: dict) -> Customer:
customer = Customer(**customer_data)
self.session.add(customer)
await self.session.commit()
await self.session.refresh(customer)
return customer
async def get_by_id(self, customer_id: int) -> Optional[Customer]:
stmt = select(Customer).where(Customer.id == customer_id)
result = await self.session.execute(stmt)
return result.scalar_one_or_none()
Sample Output
When you run the main application, you’ll see output similar to:
=== Creating Customers ===
Created: <Customer(id=1, name='Jacob Emily', email='jacob.emily@tacnode.io')>
Created: <Customer(id=2, name='Michael Emma', email='michael.emma@tacnode.io')>
Bulk created 2 customers
=== Reading Customers ===
Total customers: 4
- Jacob Emily (jacob.emily@tacnode.io)
- Michael Emma (michael.emma@tacnode.io)
- Sarah Wilson (sarah.wilson@tacnode.io)
- David Chen (david.chen@tacnode.io)
Customer #1: Jacob Emily
Found by email: Jacob Emily
=== Updating Customer ===
Updated customer: Michael Emma -> michael.emma@gmail.com
=== Advanced Queries ===
Customers with 'Emma' in name: 1
- Michael Emma
=== Deleting Customer ===
Soft deleted customer #3
Hard deleted customer #4
=== Final State ===
Active customers: 2
Total customers: 3
- Jacob Emily (Active)
- Michael Emma (Active)
- Sarah Wilson (Inactive)
Best Practices
Performance Optimization
-
Use Connection Pooling:
engine = create_engine( database_url, pool_size=20, max_overflow=30, pool_pre_ping=True ) -
Lazy Loading and Eager Loading:
# Eager loading relationships stmt = select(Customer).options(selectinload(Customer.orders)) -
Batch Operations:
# Use bulk operations for better performance session.bulk_insert_mappings(Customer, customers_data)
Security Best Practices
-
Use Environment Variables:
# Never hardcode credentials DATABASE_URL = os.getenv('DATABASE_URL') -
Input Validation:
from pydantic import BaseModel, EmailStr class CustomerCreate(BaseModel): name: str email: EmailStr -
Parameterized Queries (SQLAlchemy handles this automatically):
# This is safe from SQL injection stmt = select(Customer).where(Customer.email == user_input)
This comprehensive guide demonstrates how to build robust, scalable Python applications using SQLAlchemy with Tacnode. The PostgreSQL compatibility ensures full feature support while providing the benefits of a distributed, cloud-native database architecture.