FastAPI AI Kit uses PostgreSQL with SQLAlchemy 2.0 (async) and Alembic for database migrations.

Connection setup

# .env
DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/dbname

Connection pool settings in app/config.py:

DATABASE_POOL_SIZE=10       # Active connections
DATABASE_MAX_OVERFLOW=20    # Temporary overflow
DATABASE_POOL_TIMEOUT=30    # Seconds to wait for connection
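
As a rough illustration of how these three values usually reach SQLAlchemy, the sketch below reads them from the environment and maps them onto the pool_size, max_overflow, and pool_timeout arguments of create_async_engine. The helper names pool_kwargs_from_env and build_engine are hypothetical; the kit's actual app/config.py may wire this differently.

```python
import os
from collections.abc import Mapping


def pool_kwargs_from_env(env: Mapping[str, str] = os.environ) -> dict[str, int]:
    """Map the DATABASE_POOL_* variables onto SQLAlchemy engine keyword arguments.

    Defaults mirror the values shown above. Hypothetical helper, not the kit's code.
    """
    return {
        "pool_size": int(env.get("DATABASE_POOL_SIZE", "10")),
        "max_overflow": int(env.get("DATABASE_MAX_OVERFLOW", "20")),
        "pool_timeout": int(env.get("DATABASE_POOL_TIMEOUT", "30")),
    }


def build_engine(env: Mapping[str, str] = os.environ):
    """Create the async engine; requires sqlalchemy and asyncpg to be installed."""
    # Imported lazily so the pure helper above stays usable without SQLAlchemy.
    from sqlalchemy.ext.asyncio import create_async_engine

    return create_async_engine(env["DATABASE_URL"], **pool_kwargs_from_env(env))
```

With the values above, one engine holds up to 10 pooled connections and may open 20 more under load, so at most 30 connections per process. A caller that cannot obtain a connection within 30 seconds gets a pool timeout error.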

Alembic migrations

Common commands

# Apply all pending migrations
alembic upgrade head

# Roll back last migration
alembic downgrade -1

# Create a new migration
alembic revision --autogenerate -m "add_user_tier_column"

# View migration history
alembic history --verbose

# Check current revision
alembic current

Migrations run automatically on startup

The kit's start command runs alembic upgrade head before starting the server:

# Dockerfile CMD
CMD sh -c "alembic upgrade head && uvicorn app.main:app ..."

Defining models

Use SQLAlchemy 2.0 Mapped annotations:

# app/models/my_model.py
import uuid
from datetime import datetime, timezone

from sqlalchemy import DateTime, String
from sqlalchemy.dialects.postgresql import JSONB  # JSONB is PostgreSQL-specific
from sqlalchemy.orm import Mapped, mapped_column

from app.db import Base


class MyModel(Base):
    __tablename__ = "my_models"

    id: Mapped[uuid.UUID] = mapped_column(
        primary_key=True,
        default=uuid.uuid4,
    )
    name: Mapped[str] = mapped_column(String(255))
    data: Mapped[dict] = mapped_column(JSONB, default=dict)
    # datetime.utcnow is deprecated since Python 3.12; store a timezone-aware UTC timestamp
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )

After defining the model, generate a migration:

alembic revision --autogenerate -m "create_my_models"
alembic upgrade head

Async database sessions

Use the get_db dependency in route handlers:

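import uuid

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_db
from app.models.my_model import MyModel

router = APIRouter()  # assumed; use the router your module already defines


@router.get("/v1/items/{item_id}")
async def get_item(
    item_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
):
    # Primary-key lookup; returns None when no row matches
    item = await db.get(MyModel, item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item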

The get_db dependency handles commit, rollback, and session close automatically.
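
What that typically looks like can be sketched as follows. This is a minimal sketch of the commit/rollback/close pattern, not the kit's actual get_db; make_get_db and session_factory are hypothetical names, and in a real app the factory would usually be an async_sessionmaker bound to the engine.

```python
from collections.abc import AsyncIterator, Callable
from typing import Any


def make_get_db(session_factory: Callable[[], Any]) -> Callable[[], AsyncIterator[Any]]:
    """Build a FastAPI-style dependency around any async session factory.

    `session_factory()` must return an async context manager whose value has
    awaitable `commit()` and `rollback()` methods (e.g. SQLAlchemy's AsyncSession).
    """

    async def get_db() -> AsyncIterator[Any]:
        # Leaving the `async with` block closes the session.
        async with session_factory() as session:
            try:
                yield session             # the route handler runs here
                await session.commit()    # handler finished without raising
            except Exception:
                await session.rollback()  # undo partial work, then re-raise
                raise

    return get_db
```

Because FastAPI re-raises handler exceptions at the yield point, a failing request (including one that raises HTTPException) ends in a rollback rather than a commit under this pattern.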

Querying

from sqlalchemy import select

# Get by ID
user = await db.get(User, user_id)

# Query with filters
result = await db.execute(
    select(User)
    .where(User.email == email)
    .where(User.is_active.is_(True))
)
user = result.scalar_one_or_none()

# Query with ordering and limit
result = await db.execute(
    select(Document)
    .where(Document.collection == "company-kb")
    .order_by(Document.created_at.desc())
    .limit(10)
)
docs = result.scalars().all()

pgvector for embeddings

The initial migration enables the pgvector extension in the database. Use the Vector column type in models:

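import uuid

from pgvector.sqlalchemy import Vector
from sqlalchemy.orm import Mapped, mapped_column

from app.db import Base


class Document(Base):
    __tablename__ = "documents"
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    content: Mapped[str]
    # 1536 dimensions; must match the output size of the embedding model in use
    embedding: Mapped[list[float]] = mapped_column(Vector(1536))
    collection: Mapped[str] = mapped_column(index=True)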

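# Similarity search: nearest neighbours by cosine distance (smaller = more similar)
# query_embedding: list[float] of length 1536, produced by the same embedding model
result = await db.execute(
    select(Document)
    .order_by(Document.embedding.cosine_distance(query_embedding))
    .limit(5)
)
docs = result.scalars().all()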
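
For intuition about what that ordering means: pgvector's cosine distance is 1 minus the cosine similarity, so 0 means two vectors point the same way and 2 means they point in opposite directions. The pure-Python sketch below reproduces the metric on toy vectors. It is illustrative only: the local cosine_distance helper and the sample data are made up, and in the kit the database computes the distance itself.

```python
import math
from collections.abc import Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Return 1 - cos(angle between a and b), the metric the query above sorts by."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for zero vectors")
    return 1.0 - dot / (norm_a * norm_b)


# Toy 2-dimensional "embeddings"; real ones would have 1536 dimensions.
query = [1.0, 0.0]
docs = {"same": [2.0, 0.0], "orthogonal": [0.0, 3.0], "opposite": [-1.0, 0.0]}

# Smallest distance first, mirroring the ORDER BY in the similarity search.
ranked = sorted(docs, key=lambda name: cosine_distance(query, docs[name]))
print(ranked)  # ['same', 'orthogonal', 'opposite']
```

Note that vector length does not matter: "same" is twice as long as the query yet has distance 0, because only direction is compared.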

Testing with a real database

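# conftest.py
import pytest_asyncio

from app.db import AsyncSessionLocal  # assumed location of the session factory


# Async fixtures need pytest-asyncio (or asyncio_mode = "auto" with plain @pytest.fixture)
@pytest_asyncio.fixture
async def db():
    async with AsyncSessionLocal() as session:
        yield session
        await session.rollback()  # Discard uncommitted changes after each test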

Use a dedicated test database; see TEST_DATABASE_URL in .env.example.
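
One way to enforce that separation is a small guard that resolves the test URL and refuses to fall back to the main database. The helper name resolve_test_database_url is hypothetical; only TEST_DATABASE_URL and DATABASE_URL come from the kit's configuration.

```python
import os
from collections.abc import Mapping


def resolve_test_database_url(env: Mapping[str, str] = os.environ) -> str:
    """Return TEST_DATABASE_URL, failing loudly instead of touching the main database."""
    test_url = env.get("TEST_DATABASE_URL")
    if not test_url:
        raise RuntimeError("TEST_DATABASE_URL is not set; refusing to run tests")
    if test_url == env.get("DATABASE_URL"):
        raise RuntimeError("TEST_DATABASE_URL must differ from DATABASE_URL")
    return test_url
```

Calling this once in conftest.py, before the test engine is created, turns a misconfigured environment into an immediate error rather than test data written to the wrong database.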