SQLAlchemy Development Guide

SQLAlchemy is a widely used Python SQL toolkit and ORM that provides both high-level ORM patterns and low-level database access through its Core API. This guide shows how to use SQLAlchemy with Tacnode through its PostgreSQL compatibility, covering model definition, connection management, a repository layer, and async access.

Prerequisites

Before starting, ensure you have:

  • Python 3.7+ (Python 3.9+ recommended)
  • pip for package management
  • A Tacnode database with appropriate access credentials

Database Setup

Create your database structure in Tacnode:

-- Create the database
CREATE DATABASE example;
 
-- Connect to the database and create a table
\c example;
 
CREATE TABLE customer (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    phone VARCHAR(20),
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
);
 
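-- Create indexes for better query performance
-- (in PostgreSQL the UNIQUE constraint on email already creates an index, so idx_customer_email is optional)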
CREATE INDEX idx_customer_email ON customer(email);
CREATE INDEX idx_customer_active ON customer(is_active);

Project Setup

Installing Dependencies

Install SQLAlchemy and the PostgreSQL driver:

# Core dependencies
pip install SQLAlchemy psycopg2-binary
 
# Optional but recommended dependencies
pip install alembic          # Database migrations
pip install sqlalchemy-utils # Additional utilities
pip install python-dotenv   # Environment variables

For async applications, also install:

# Quote the extras specifier so shells such as zsh do not expand the brackets
pip install asyncpg "sqlalchemy[asyncio]"

Environment Configuration

Create a .env file for your database credentials:

# Database Configuration
DB_HOST=your-cluster.tacnode.io
DB_PORT=5432
DB_NAME=example
DB_USER=your_username
DB_PASSWORD=your_password
DB_SSL_MODE=require

# Application Configuration
DEBUG=False
LOG_LEVEL=INFO
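
It can help to read and validate these settings in one place before any engine is created. The following is a minimal sketch using only the standard library; the `Settings` dataclass and the `load_settings` helper are hypothetical names (not part of SQLAlchemy or Tacnode), and the sketch assumes the variables are already present in the environment mapping, for example after calling `load_dotenv()`.

```python
import os
from dataclasses import dataclass
from typing import Mapping

# Keys that must be present and non-empty (an assumption for this sketch).
REQUIRED_KEYS = ("DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD")


@dataclass(frozen=True)
class Settings:
    host: str
    port: int
    name: str
    user: str
    password: str
    ssl_mode: str
    debug: bool


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build a Settings object, failing fast when a required key is missing."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return Settings(
        host=env["DB_HOST"],
        port=int(env.get("DB_PORT", "5432")),
        name=env["DB_NAME"],
        user=env["DB_USER"],
        password=env["DB_PASSWORD"],
        ssl_mode=env.get("DB_SSL_MODE", "require"),
        debug=env.get("DEBUG", "False").lower() == "true",
    )
```

Passing the mapping as a parameter keeps the helper easy to test with a plain dictionary instead of the real process environment.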

Model Definition

Modern SQLAlchemy 2.0 Style

Create your models using SQLAlchemy's modern declarative syntax:

# models.py
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, Boolean, DateTime, String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
 
class Base(DeclarativeBase):
    """Base class for all database models."""
    pass
 
class Customer(Base):
    """Customer model representing the customer table."""
    __tablename__ = "customer"
 
    # Primary key (BigInteger matches the BIGSERIAL column; a primary key is already indexed)
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    
    # Required fields
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    
    # Optional fields
    phone: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), 
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), 
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )
 
    def __repr__(self) -> str:
        return f"<Customer(id={self.id}, name='{self.name}', email='{self.email}')>"
 
    def to_dict(self) -> dict:
        """Convert model to dictionary."""
        return {
            'id': self.id,
            'name': self.name,
            'email': self.email,
            'phone': self.phone,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

Database Configuration

Connection Management

Create a database configuration module:

# database.py
import os
from typing import Optional
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from models import Base
 
class DatabaseConfig:
    """Database configuration and connection management."""
    
    def __init__(self):
        self.host = os.getenv('DB_HOST', 'localhost')
        self.port = int(os.getenv('DB_PORT', '5432'))
        self.database = os.getenv('DB_NAME', 'example')
        self.username = os.getenv('DB_USER', 'postgres')
        self.password = os.getenv('DB_PASSWORD', '')
        self.ssl_mode = os.getenv('DB_SSL_MODE', 'prefer')
        
        self._engine: Optional[Engine] = None
        self._session_factory: Optional[sessionmaker] = None
    
    @property
    def database_url(self) -> str:
        """Generate database URL."""
        return (
            f"postgresql://{self.username}:{self.password}@"
            f"{self.host}:{self.port}/{self.database}?sslmode={self.ssl_mode}"
        )
    
    @property
    def engine(self) -> Engine:
        """Get or create database engine."""
        if self._engine is None:
            self._engine = create_engine(
                self.database_url,
                # Connection pool settings
                poolclass=QueuePool,
                pool_size=10,
                max_overflow=20,
                pool_pre_ping=True,
                pool_recycle=3600,
                # Log emitted SQL when DEBUG=true (2.0-style behavior is the default in SQLAlchemy 2.0)
                echo=os.getenv('DEBUG', 'False').lower() == 'true'
            )
        return self._engine
    
    @property
    def session_factory(self) -> sessionmaker:
        """Get or create session factory."""
        if self._session_factory is None:
            self._session_factory = sessionmaker(
                bind=self.engine,
                autocommit=False,
                autoflush=False
            )
        return self._session_factory
    
    def get_session(self) -> Session:
        """Get a new database session."""
        return self.session_factory()
    
    def create_tables(self):
        """Create all tables."""
        Base.metadata.create_all(bind=self.engine)
    
    def drop_tables(self):
        """Drop all tables."""
        Base.metadata.drop_all(bind=self.engine)
 
# Global database instance
db_config = DatabaseConfig()
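
The `database_url` property above interpolates the username and password directly into the URL, which produces an unparseable URL if either value contains characters such as `@` or `/`. The following is a minimal sketch of percent-encoding them with the standard library; `build_database_url` is a hypothetical helper and the credentials are made up for illustration.

```python
from urllib.parse import quote_plus


def build_database_url(user: str, password: str, host: str, port: int,
                       database: str, ssl_mode: str = "require") -> str:
    """Return a PostgreSQL URL with the credentials percent-encoded."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}@"
        f"{host}:{port}/{database}?sslmode={ssl_mode}"
    )


url = build_database_url("app_user", "kx@jj5/g", "localhost", 5432, "example")
print(url)
# postgresql://app_user:kx%40jj5%2Fg@localhost:5432/example?sslmode=require
```

SQLAlchemy also provides `sqlalchemy.engine.URL.create()`, which accepts the unescaped parts as keyword arguments and can be passed to `create_engine()` in place of a string.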

Data Access Layer

Repository Pattern

Implement a repository pattern for clean data access:

# repositories.py
from typing import List, Optional
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from models import Customer
from database import db_config
 
class CustomerRepository:
    """Repository for Customer operations."""
    
    def __init__(self, session: Optional[Session] = None):
        self.session = session or db_config.get_session()
        self._should_close_session = session is None
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._should_close_session:
            self.session.close()
    
    def create(self, customer_data: dict) -> Customer:
        """Create a new customer."""
        customer = Customer(**customer_data)
        self.session.add(customer)
        self.session.commit()
        self.session.refresh(customer)
        return customer
    
    def get_by_id(self, customer_id: int) -> Optional[Customer]:
        """Get customer by ID."""
        stmt = select(Customer).where(Customer.id == customer_id)
        return self.session.scalar(stmt)
    
    def get_by_email(self, email: str) -> Optional[Customer]:
        """Get customer by email."""
        stmt = select(Customer).where(Customer.email == email)
        return self.session.scalar(stmt)
    
    def get_all(self, active_only: bool = True) -> List[Customer]:
        """Get all customers, ordered by ID for a stable result order."""
        stmt = select(Customer).order_by(Customer.id)
        if active_only:
            stmt = stmt.where(Customer.is_active.is_(True))
        return list(self.session.scalars(stmt))
    
    def get_by_name_pattern(self, pattern: str) -> List[Customer]:
        """Get customers by name pattern."""
        stmt = select(Customer).where(Customer.name.ilike(f"%{pattern}%"))
        return list(self.session.scalars(stmt))
    
    def update(self, customer_id: int, update_data: dict) -> Optional[Customer]:
        """Update customer."""
        stmt = (
            update(Customer)
            .where(Customer.id == customer_id)
            .values(**update_data)
            .returning(Customer)
        )
        result = self.session.execute(stmt)
        # Read the returned row before commit() ends the transaction
        customer = result.scalar_one_or_none()
        self.session.commit()
        return customer
    
    def delete(self, customer_id: int) -> bool:
        """Delete customer."""
        stmt = delete(Customer).where(Customer.id == customer_id)
        result = self.session.execute(stmt)
        self.session.commit()
        return result.rowcount > 0
    
    def soft_delete(self, customer_id: int) -> bool:
        """Soft delete customer (mark as inactive)."""
        result = self.update(customer_id, {'is_active': False})
        return result is not None
    
    def bulk_create(self, customers_data: List[dict]) -> List[Customer]:
        """Bulk create customers."""
        customers = [Customer(**data) for data in customers_data]
        self.session.add_all(customers)
        self.session.commit()
        for customer in customers:
            self.session.refresh(customer)
        return customers

CRUD Operations Examples

Complete Application Example

# main.py
from dotenv import load_dotenv
 
# Load environment variables first: database.py reads them when it is imported
load_dotenv()
 
from repositories import CustomerRepository  # noqa: E402
from database import db_config  # noqa: E402
 
def demonstrate_crud_operations():
    """Demonstrate comprehensive CRUD operations."""
    
    # Initialize database
    db_config.create_tables()
    
    try:
        # CREATE operations
        print("=== Creating Customers ===")
        create_customers()
        
        # READ operations
        print("\n=== Reading Customers ===")
        read_customers()
        
        # UPDATE operations
        print("\n=== Updating Customer ===")
        update_customer()
        
        # Advanced queries
        print("\n=== Advanced Queries ===")
        advanced_queries()
        
        # DELETE operations
        print("\n=== Deleting Customer ===")
        delete_customer()
        
        # Final state
        print("\n=== Final State ===")
        show_final_state()
        
    except Exception as e:
        print(f"Error occurred: {e}")
        raise
 
def create_customers():
    """Create sample customers."""
    with CustomerRepository() as repo:
        # Create individual customers
        customer1 = repo.create({
            'name': 'Jacob Emily',
            'email': 'jacob.emily@tacnode.io',
            'phone': '+1-555-0101'
        })
        print(f"Created: {customer1}")
        
        customer2 = repo.create({
            'name': 'Michael Emma',
            'email': 'michael.emma@tacnode.io',
            'phone': '+1-555-0102'
        })
        print(f"Created: {customer2}")
        
        # Bulk create
        bulk_data = [
            {
                'name': 'Sarah Wilson',
                'email': 'sarah.wilson@tacnode.io',
                'phone': '+1-555-0103'
            },
            {
                'name': 'David Chen',
                'email': 'david.chen@tacnode.io',
                'phone': '+1-555-0104'
            }
        ]
        bulk_customers = repo.bulk_create(bulk_data)
        print(f"Bulk created {len(bulk_customers)} customers")
 
def read_customers():
    """Demonstrate various read operations."""
    with CustomerRepository() as repo:
        # Get all customers
        all_customers = repo.get_all()
        print(f"Total customers: {len(all_customers)}")
        for customer in all_customers:
            print(f"  - {customer.name} ({customer.email})")
        
        # Get by ID
        customer = repo.get_by_id(1)
        if customer:
            print(f"Customer #1: {customer.name}")
        
        # Get by email
        customer = repo.get_by_email('jacob.emily@tacnode.io')
        if customer:
            print(f"Found by email: {customer.name}")
 
def update_customer():
    """Demonstrate update operations."""
    with CustomerRepository() as repo:
        # Update customer
        updated_customer = repo.update(2, {
            'email': 'michael.emma@gmail.com',
            'phone': '+1-555-9999'
        })
        if updated_customer:
            print(f"Updated customer: {updated_customer.name} -> {updated_customer.email}")
 
def advanced_queries():
    """Demonstrate advanced query operations."""
    with CustomerRepository() as repo:
        # Search by name pattern
        customers = repo.get_by_name_pattern('Emma')
        print(f"Customers with 'Emma' in name: {len(customers)}")
        for customer in customers:
            print(f"  - {customer.name}")
 
def delete_customer():
    """Demonstrate delete operations."""
    with CustomerRepository() as repo:
        # Soft delete
        success = repo.soft_delete(3)
        if success:
            print("Soft deleted customer #3")
        
        # Hard delete
        success = repo.delete(4)
        if success:
            print("Hard deleted customer #4")
 
def show_final_state():
    """Show final state of customers."""
    with CustomerRepository() as repo:
        active_customers = repo.get_all(active_only=True)
        all_customers = repo.get_all(active_only=False)
        
        print(f"Active customers: {len(active_customers)}")
        print(f"Total customers: {len(all_customers)}")
        
        for customer in all_customers:
            status = "Active" if customer.is_active else "Inactive"
            print(f"  - {customer.name} ({status})")
 
if __name__ == '__main__':
    demonstrate_crud_operations()

Async SQLAlchemy Support

For applications built on asyncio (for example, async web frameworks), use SQLAlchemy's asyncio extension with the asyncpg driver:

# async_database.py
import os
from typing import AsyncGenerator, Optional
from sqlalchemy import select
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from models import Customer
 
class AsyncDatabaseConfig:
    def __init__(self):
        # Build the URL from the same environment variables as the sync configuration
        self.database_url = URL.create(
            "postgresql+asyncpg",
            username=os.getenv('DB_USER', 'postgres'),
            password=os.getenv('DB_PASSWORD', ''),
            host=os.getenv('DB_HOST', 'localhost'),
            port=int(os.getenv('DB_PORT', '5432')),
            database=os.getenv('DB_NAME', 'example'),
        )
        self.engine = create_async_engine(
            self.database_url,
            echo=os.getenv('DEBUG', 'False').lower() == 'true'
        )
        self.session_factory = async_sessionmaker(
            bind=self.engine,
            autoflush=False,
            # Keep attributes loaded after commit; async code cannot reload them implicitly
            expire_on_commit=False
        )
    
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        async with self.session_factory() as session:
            yield session
 
# Async repository example
class AsyncCustomerRepository:
    def __init__(self, session: AsyncSession):
        self.session = session
    
    async def create(self, customer_data: dict) -> Customer:
        customer = Customer(**customer_data)
        self.session.add(customer)
        await self.session.commit()
        await self.session.refresh(customer)
        return customer
    
    async def get_by_id(self, customer_id: int) -> Optional[Customer]:
        stmt = select(Customer).where(Customer.id == customer_id)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

Sample Output

When you run the main application against an empty customer table (the example refers to customers by the IDs 1 through 4), you'll see output similar to:

=== Creating Customers ===
Created: <Customer(id=1, name='Jacob Emily', email='jacob.emily@tacnode.io')>
Created: <Customer(id=2, name='Michael Emma', email='michael.emma@tacnode.io')>
Bulk created 2 customers

=== Reading Customers ===
Total customers: 4
  - Jacob Emily (jacob.emily@tacnode.io)
  - Michael Emma (michael.emma@tacnode.io)
  - Sarah Wilson (sarah.wilson@tacnode.io)
  - David Chen (david.chen@tacnode.io)
Customer #1: Jacob Emily
Found by email: Jacob Emily

=== Updating Customer ===
Updated customer: Michael Emma -> michael.emma@gmail.com

=== Advanced Queries ===
Customers with 'Emma' in name: 1
  - Michael Emma

=== Deleting Customer ===
Soft deleted customer #3
Hard deleted customer #4

=== Final State ===
Active customers: 2
Total customers: 3
  - Jacob Emily (Active)
  - Michael Emma (Active)
  - Sarah Wilson (Inactive)

Best Practices

Performance Optimization

  1. Use Connection Pooling:

    engine = create_engine(
        database_url,
        pool_size=20,
        max_overflow=30,
        pool_pre_ping=True
    )
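  2. Choose Between Lazy and Eager Loading:

    # Eager-load a relationship to avoid one extra query per row
    # (Customer.orders is a hypothetical relationship, not defined in the model above)
    from sqlalchemy.orm import selectinload
    stmt = select(Customer).options(selectinload(Customer.orders))
  3. Batch Operations:

    # Insert many rows with one statement; bulk_insert_mappings() is a legacy API in SQLAlchemy 2.0
    from sqlalchemy import insert
    session.execute(insert(Customer), customers_data)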

Security Best Practices

  1. Use Environment Variables:

    # Never hardcode credentials
    DATABASE_URL = os.getenv('DATABASE_URL')
  2. Input Validation:

    # EmailStr requires the optional email-validator package: pip install "pydantic[email]"
    from pydantic import BaseModel, EmailStr
     
    class CustomerCreate(BaseModel):
        name: str
        email: EmailStr
  3. Parameterized Queries (SQLAlchemy handles this automatically):

    # This is safe from SQL injection
    stmt = select(Customer).where(Customer.email == user_input)
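
To see why such a statement is safe, it can help to compile it and look at the SQL text and the bound parameters separately. The following is a minimal sketch using a lightweight Core `table()` construct in place of the ORM model; the `user_input` value is a made-up injection attempt.

```python
from sqlalchemy import column, select, table

# Lightweight table description; no database connection is needed to compile.
customer = table("customer", column("id"), column("email"))

user_input = "x' OR '1'='1"
stmt = select(customer.c.id).where(customer.c.email == user_input)

compiled = stmt.compile()
sql_text = str(compiled)

print(sql_text)          # SQL with a named placeholder instead of the value
print(compiled.params)   # the value travels separately as a bound parameter
```

The input never becomes part of the SQL string, so the database driver treats it purely as data. This protection does not apply to strings you concatenate into `text()` yourself; use bound parameters there as well.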

This guide covered defining models, managing connections and sessions, implementing a repository layer, and using async sessions with SQLAlchemy on Tacnode. Because Tacnode is PostgreSQL-compatible, the examples rely only on SQLAlchemy's standard PostgreSQL dialect and drivers while the database itself provides a distributed, cloud-native architecture.