DocsGuidesEcosystemClientSqlalchemy

SQLAlchemy Development Guide

Learn how to build Python applications with SQLAlchemy and Tacnode, featuring modern ORM patterns, async support, and comprehensive CRUD examples.

SQLAlchemy is the most popular Python ORM framework, providing both high-level ORM patterns and low-level database access. This guide demonstrates how to leverage SQLAlchemy’s powerful features with Tacnode’s PostgreSQL compatibility to build robust, scalable Python applications.

Prerequisites

Before starting, ensure you have:

  • Python 3.7+ (Python 3.9+ recommended)
  • pip for package management
  • A Tacnode database with appropriate access credentials

Database Setup

Create your database structure in Tacnode:

-- Create the database
CREATE DATABASE example;

-- Connect to the database and create a table
\c example;

CREATE TABLE customer (
    id BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    phone VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
);

-- Create an index for better query performance
CREATE INDEX idx_customer_email ON customer(email);
CREATE INDEX idx_customer_active ON customer(is_active);

Project Setup

Installing Dependencies

Install SQLAlchemy and the PostgreSQL driver:

# Core dependencies
pip install SQLAlchemy psycopg2-binary

# Optional but recommended dependencies
pip install alembic          # Database migrations
pip install sqlalchemy-utils # Additional utilities
pip install python-dotenv   # Environment variables

For async applications, also install:

pip install asyncpg sqlalchemy[asyncio]

Environment Configuration

Create a .env file for your database credentials:

# Database Configuration
DB_HOST=your-cluster.tacnode.io
DB_PORT=5432
DB_NAME=example
DB_USER=your_username
DB_PASSWORD=your_password
DB_SSL_MODE=require

# Application Configuration
DEBUG=False
LOG_LEVEL=INFO

Model Definition

Modern SQLAlchemy 2.0 Style

Create your models using SQLAlchemy’s modern declarative syntax:

# models.py
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    """Base class for all database models."""
    pass

class Customer(Base):
    """Customer model representing the customer table."""
    __tablename__ = "customer"

    # Primary key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Required fields
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    
    # Optional fields
    phone: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), 
        server_default=func.now(),
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), 
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False
    )

    def __repr__(self) -> str:
        return f"<Customer(id={self.id}, name='{self.name}', email='{self.email}')>"

    def to_dict(self) -> dict:
        """Convert model to dictionary."""
        return {
            'id': self.id,
            'name': self.name,
            'email': self.email,
            'phone': self.phone,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None
        }

Database Configuration

Connection Management

Create a database configuration module:

# database.py
import os
from typing import Optional
from sqlalchemy import create_engine, Engine
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import QueuePool
from models import Base

class DatabaseConfig:
    """Database configuration and connection management."""
    
    def __init__(self):
        self.host = os.getenv('DB_HOST', 'localhost')
        self.port = int(os.getenv('DB_PORT', 5432))
        self.database = os.getenv('DB_NAME', 'example')
        self.username = os.getenv('DB_USER', 'postgres')
        self.password = os.getenv('DB_PASSWORD', '')
        self.ssl_mode = os.getenv('DB_SSL_MODE', 'prefer')
        
        self._engine: Optional[Engine] = None
        self._session_factory: Optional[sessionmaker] = None
    
    @property
    def database_url(self) -> str:
        """Generate database URL."""
        return (
            f"postgresql://{self.username}:{self.password}@"
            f"{self.host}:{self.port}/{self.database}?sslmode={self.ssl_mode}"
        )
    
    @property
    def engine(self) -> Engine:
        """Get or create database engine."""
        if self._engine is None:
            self._engine = create_engine(
                self.database_url,
                # Connection pool settings
                poolclass=QueuePool,
                pool_size=10,
                max_overflow=20,
                pool_pre_ping=True,
                pool_recycle=3600,
                # Performance settings
                echo=os.getenv('DEBUG', 'False').lower() == 'true',
                future=True  # Enable SQLAlchemy 2.0 style
            )
        return self._engine
    
    @property
    def session_factory(self) -> sessionmaker:
        """Get or create session factory."""
        if self._session_factory is None:
            self._session_factory = sessionmaker(
                bind=self.engine,
                autocommit=False,
                autoflush=False
            )
        return self._session_factory
    
    def get_session(self) -> Session:
        """Get a new database session."""
        return self.session_factory()
    
    def create_tables(self):
        """Create all tables."""
        Base.metadata.create_all(bind=self.engine)
    
    def drop_tables(self):
        """Drop all tables."""
        Base.metadata.drop_all(bind=self.engine)

# Global database instance
db_config = DatabaseConfig()

Data Access Layer

Repository Pattern

Implement a repository pattern for clean data access:

# repositories.py
from typing import List, Optional
from sqlalchemy import select, update, delete
from sqlalchemy.orm import Session
from models import Customer
from database import db_config

class CustomerRepository:
    """Repository for Customer operations."""
    
    def __init__(self, session: Optional[Session] = None):
        self.session = session or db_config.get_session()
        self._should_close_session = session is None
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._should_close_session:
            self.session.close()
    
    def create(self, customer_data: dict) -> Customer:
        """Create a new customer."""
        customer = Customer(**customer_data)
        self.session.add(customer)
        self.session.commit()
        self.session.refresh(customer)
        return customer
    
    def get_by_id(self, customer_id: int) -> Optional[Customer]:
        """Get customer by ID."""
        stmt = select(Customer).where(Customer.id == customer_id)
        return self.session.scalar(stmt)
    
    def get_by_email(self, email: str) -> Optional[Customer]:
        """Get customer by email."""
        stmt = select(Customer).where(Customer.email == email)
        return self.session.scalar(stmt)
    
    def get_all(self, active_only: bool = True) -> List[Customer]:
        """Get all customers."""
        stmt = select(Customer)
        if active_only:
            stmt = stmt.where(Customer.is_active == True)
        return list(self.session.scalars(stmt))
    
    def get_by_name_pattern(self, pattern: str) -> List[Customer]:
        """Get customers by name pattern."""
        stmt = select(Customer).where(Customer.name.ilike(f"%{pattern}%"))
        return list(self.session.scalars(stmt))
    
    def update(self, customer_id: int, update_data: dict) -> Optional[Customer]:
        """Update customer."""
        stmt = (
            update(Customer)
            .where(Customer.id == customer_id)
            .values(**update_data)
            .returning(Customer)
        )
        result = self.session.execute(stmt)
        self.session.commit()
        return result.scalar_one_or_none()
    
    def delete(self, customer_id: int) -> bool:
        """Delete customer."""
        stmt = delete(Customer).where(Customer.id == customer_id)
        result = self.session.execute(stmt)
        self.session.commit()
        return result.rowcount > 0
    
    def soft_delete(self, customer_id: int) -> bool:
        """Soft delete customer (mark as inactive)."""
        result = self.update(customer_id, {'is_active': False})
        return result is not None
    
    def bulk_create(self, customers_data: List[dict]) -> List[Customer]:
        """Bulk create customers."""
        customers = [Customer(**data) for data in customers_data]
        self.session.add_all(customers)
        self.session.commit()
        for customer in customers:
            self.session.refresh(customer)
        return customers

CRUD Operations Examples

Complete Application Example

# main.py
import os
from dotenv import load_dotenv
from repositories import CustomerRepository
from database import db_config

# Load environment variables
load_dotenv()

def demonstrate_crud_operations():
    """Demonstrate comprehensive CRUD operations."""
    
    # Initialize database
    db_config.create_tables()
    
    try:
        # CREATE operations
        print("=== Creating Customers ===")
        create_customers()
        
        # READ operations
        print("\n=== Reading Customers ===")
        read_customers()
        
        # UPDATE operations
        print("\n=== Updating Customer ===")
        update_customer()
        
        # Advanced queries
        print("\n=== Advanced Queries ===")
        advanced_queries()
        
        # DELETE operations
        print("\n=== Deleting Customer ===")
        delete_customer()
        
        # Final state
        print("\n=== Final State ===")
        show_final_state()
        
    except Exception as e:
        print(f"Error occurred: {e}")
        raise

def create_customers():
    """Create sample customers."""
    with CustomerRepository() as repo:
        # Create individual customers
        customer1 = repo.create({
            'name': 'Jacob Emily',
            'email': 'jacob.emily@tacnode.io',
            'phone': '+1-555-0101'
        })
        print(f"Created: {customer1}")
        
        customer2 = repo.create({
            'name': 'Michael Emma',
            'email': 'michael.emma@tacnode.io',
            'phone': '+1-555-0102'
        })
        print(f"Created: {customer2}")
        
        # Bulk create
        bulk_data = [
            {
                'name': 'Sarah Wilson',
                'email': 'sarah.wilson@tacnode.io',
                'phone': '+1-555-0103'
            },
            {
                'name': 'David Chen',
                'email': 'david.chen@tacnode.io',
                'phone': '+1-555-0104'
            }
        ]
        bulk_customers = repo.bulk_create(bulk_data)
        print(f"Bulk created {len(bulk_customers)} customers")

def read_customers():
    """Demonstrate various read operations."""
    with CustomerRepository() as repo:
        # Get all customers
        all_customers = repo.get_all()
        print(f"Total customers: {len(all_customers)}")
        for customer in all_customers:
            print(f"  - {customer.name} ({customer.email})")
        
        # Get by ID
        customer = repo.get_by_id(1)
        if customer:
            print(f"Customer #1: {customer.name}")
        
        # Get by email
        customer = repo.get_by_email('jacob.emily@tacnode.io')
        if customer:
            print(f"Found by email: {customer.name}")

def update_customer():
    """Demonstrate update operations."""
    with CustomerRepository() as repo:
        # Update customer
        updated_customer = repo.update(2, {
            'email': 'michael.emma@gmail.com',
            'phone': '+1-555-9999'
        })
        if updated_customer:
            print(f"Updated customer: {updated_customer.name} -> {updated_customer.email}")

def advanced_queries():
    """Demonstrate advanced query operations."""
    with CustomerRepository() as repo:
        # Search by name pattern
        customers = repo.get_by_name_pattern('Emma')
        print(f"Customers with 'Emma' in name: {len(customers)}")
        for customer in customers:
            print(f"  - {customer.name}")

def delete_customer():
    """Demonstrate delete operations."""
    with CustomerRepository() as repo:
        # Soft delete
        success = repo.soft_delete(3)
        if success:
            print("Soft deleted customer #3")
        
        # Hard delete
        success = repo.delete(4)
        if success:
            print("Hard deleted customer #4")

def show_final_state():
    """Show final state of customers."""
    with CustomerRepository() as repo:
        active_customers = repo.get_all(active_only=True)
        all_customers = repo.get_all(active_only=False)
        
        print(f"Active customers: {len(active_customers)}")
        print(f"Total customers: {len(all_customers)}")
        
        for customer in all_customers:
            status = "Active" if customer.is_active else "Inactive"
            print(f"  - {customer.name} ({status})")

if __name__ == '__main__':
    demonstrate_crud_operations()

Async SQLAlchemy Support

For high-performance applications, use async SQLAlchemy:

# async_database.py
import asyncio
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from models import Base

class AsyncDatabaseConfig:
    def __init__(self):
        self.database_url = "postgresql+asyncpg://user:password@host:port/database"
        self.engine = create_async_engine(
            self.database_url,
            echo=True,
            future=True
        )
        self.session_factory = async_sessionmaker(
            bind=self.engine,
            class_=AsyncSession,
            autoflush=False,
            autocommit=False
        )
    
    async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
        async with self.session_factory() as session:
            yield session

# Async repository example
class AsyncCustomerRepository:
    def __init__(self, session: AsyncSession):
        self.session = session
    
    async def create(self, customer_data: dict) -> Customer:
        customer = Customer(**customer_data)
        self.session.add(customer)
        await self.session.commit()
        await self.session.refresh(customer)
        return customer
    
    async def get_by_id(self, customer_id: int) -> Optional[Customer]:
        stmt = select(Customer).where(Customer.id == customer_id)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

Sample Output

When you run the main application, you’ll see output similar to:

=== Creating Customers ===
Created: <Customer(id=1, name='Jacob Emily', email='jacob.emily@tacnode.io')>
Created: <Customer(id=2, name='Michael Emma', email='michael.emma@tacnode.io')>
Bulk created 2 customers

=== Reading Customers ===
Total customers: 4
  - Jacob Emily (jacob.emily@tacnode.io)
  - Michael Emma (michael.emma@tacnode.io)
  - Sarah Wilson (sarah.wilson@tacnode.io)
  - David Chen (david.chen@tacnode.io)
Customer #1: Jacob Emily
Found by email: Jacob Emily

=== Updating Customer ===
Updated customer: Michael Emma -> michael.emma@gmail.com

=== Advanced Queries ===
Customers with 'Emma' in name: 1
  - Michael Emma

=== Deleting Customer ===
Soft deleted customer #3
Hard deleted customer #4

=== Final State ===
Active customers: 2
Total customers: 3
  - Jacob Emily (Active)
  - Michael Emma (Active)
  - Sarah Wilson (Inactive)

Best Practices

Performance Optimization

  1. Use Connection Pooling:

    engine = create_engine(
        database_url,
        pool_size=20,
        max_overflow=30,
        pool_pre_ping=True
    )
  2. Lazy Loading and Eager Loading:

    # Eager loading relationships
    stmt = select(Customer).options(selectinload(Customer.orders))
  3. Batch Operations:

    # Use bulk operations for better performance
    session.bulk_insert_mappings(Customer, customers_data)

Security Best Practices

  1. Use Environment Variables:

    # Never hardcode credentials
    DATABASE_URL = os.getenv('DATABASE_URL')
  2. Input Validation:

    from pydantic import BaseModel, EmailStr
    
    class CustomerCreate(BaseModel):
        name: str
        email: EmailStr
  3. Parameterized Queries (SQLAlchemy handles this automatically):

    # This is safe from SQL injection
    stmt = select(Customer).where(Customer.email == user_input)

This comprehensive guide demonstrates how to build robust, scalable Python applications using SQLAlchemy with Tacnode. The PostgreSQL compatibility ensures full feature support while providing the benefits of a distributed, cloud-native database architecture.