FastAPI AI Kit uses PostgreSQL with SQLAlchemy 2.0 (async) and Alembic for database migrations.
Connection setup
# .env
DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/dbname
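The async engine needs an async driver in the URL scheme, which is why the example uses postgresql+asyncpg. Some hosting providers hand out URLs that start with postgres:// or postgresql:// instead. The sketch below shows one way to normalize such a URL before passing it to SQLAlchemy; normalize_database_url is a hypothetical helper for illustration, not something the kit is known to ship.

```python
from urllib.parse import urlsplit, urlunsplit

ASYNC_SCHEME = "postgresql+asyncpg"


def normalize_database_url(url: str) -> str:
    """Return the URL with an asyncpg scheme (hypothetical helper, not part of the kit)."""
    parts = urlsplit(url)
    if parts.scheme in ("postgres", "postgresql"):
        # A plain PostgreSQL scheme does not name an async driver; switch to asyncpg.
        parts = parts._replace(scheme=ASYNC_SCHEME)
    elif parts.scheme != ASYNC_SCHEME:
        raise ValueError(f"unsupported database scheme: {parts.scheme!r}")
    return urlunsplit(parts)
```

Credentials, host, port, and database name pass through unchanged; only the scheme is rewritten.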
Connection pool settings in app/config.py:
DATABASE_POOL_SIZE=10 # Connections kept open in the pool
DATABASE_MAX_OVERFLOW=20 # Extra connections allowed beyond the pool size under load
DATABASE_POOL_TIMEOUT=30 # Seconds to wait for a free connection before raising an error
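As a rough illustration of how these three values relate, the sketch below reads them from the environment and turns them into the keyword arguments that SQLAlchemy's create_async_engine accepts (pool_size, max_overflow, pool_timeout). The helper names pool_kwargs and max_connections are assumptions made for this example; the kit's actual app/config.py may be organized differently.

```python
import os
from typing import Mapping


def pool_kwargs(env: Mapping[str, str] = os.environ) -> dict[str, int]:
    """Build engine pool arguments from environment values (hypothetical helper)."""
    return {
        "pool_size": int(env.get("DATABASE_POOL_SIZE", "10")),
        "max_overflow": int(env.get("DATABASE_MAX_OVERFLOW", "20")),
        "pool_timeout": int(env.get("DATABASE_POOL_TIMEOUT", "30")),
    }


def max_connections(kwargs: Mapping[str, int]) -> int:
    """Upper bound on simultaneous connections opened by one process."""
    return kwargs["pool_size"] + kwargs["max_overflow"]
```

With the defaults shown above, a single process can open up to 30 connections, so the PostgreSQL max_connections setting should leave room for that figure multiplied by the number of worker processes. The result would typically be passed along as create_async_engine(DATABASE_URL, **pool_kwargs()).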
Alembic migrations
Common commands
# Apply all pending migrations
alembic upgrade head
# Roll back last migration
alembic downgrade -1
# Create a new migration
alembic revision --autogenerate -m "add_user_tier_column"
# View migration history
alembic history --verbose
# Check current revision
alembic current
Migration runs automatically on startup
The kit's start command runs alembic upgrade head before starting the server:
# Dockerfile CMD
CMD sh -c "alembic upgrade head && uvicorn app.main:app ..."
Defining models
Use SQLAlchemy 2.0 Mapped annotations:
# app/models/my_model.py
import uuid
from datetime import datetime

from sqlalchemy import String
# JSONB is PostgreSQL-specific, so it lives in the dialect package
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from app.db import Base

class MyModel(Base):
    __tablename__ = "my_models"

    id: Mapped[uuid.UUID] = mapped_column(
        primary_key=True,
        default=uuid.uuid4,
    )
    name: Mapped[str] = mapped_column(String(255))
    data: Mapped[dict] = mapped_column(JSONB, default=dict)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
After defining the model, generate a migration:
alembic revision --autogenerate -m "create_my_models"
alembic upgrade head
Async database sessions
Use the get_db dependency in route handlers:
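import uuid

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_db
from app.models.my_model import MyModel

router = APIRouter()

@router.get("/v1/items/{item_id}")
async def get_item(
    item_id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
):
    # db.get() looks the row up by primary key and returns None if it is missing
    item = await db.get(MyModel, item_id)
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")
    return item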
The get_db dependency handles commit, rollback, and session close automatically.
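To make that lifecycle concrete, here is a minimal sketch of how a dependency of this kind is commonly written: commit when the handler finishes, roll back if it raises, and close the session either way. It is an assumption about the general pattern, not the kit's actual code. RecordingSession is a stand-in for AsyncSession so the example runs without a database, and make_get_db and run_request are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, Callable


class RecordingSession:
    """Stand-in for AsyncSession that records lifecycle calls (illustration only)."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def close(self) -> None:
        self.calls.append("close")


def make_get_db(session_factory: Callable[[], RecordingSession]):
    """Build a get_db-style dependency around any session factory (hypothetical)."""

    async def get_db() -> AsyncIterator[RecordingSession]:
        session = session_factory()
        try:
            yield session              # the route handler runs here
            await session.commit()     # handler returned normally
        except Exception:
            await session.rollback()   # handler (or commit) raised
            raise
        finally:
            await session.close()      # always release the connection

    return get_db


async def run_request(fail: bool) -> list[str]:
    """Drive the dependency for one simulated request and return the recorded calls."""
    session = RecordingSession()
    gen = make_get_db(lambda: session)()
    await gen.__anext__()  # setup: the session is handed to the handler
    try:
        if fail:
            await gen.athrow(RuntimeError("handler failed"))
        else:
            await gen.__anext__()  # teardown after a successful handler
    except (StopAsyncIteration, RuntimeError):
        pass
    return session.calls


print(asyncio.run(run_request(fail=False)))
print(asyncio.run(run_request(fail=True)))
```

In a real application the factory would be the project's async session maker instead of the recording stand-in. One consequence of this pattern is that handlers should not call commit themselves unless they need the data persisted before the response is returned.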
Querying
from sqlalchemy import select

# Get by ID (returns None if no row matches)
user = await db.get(User, user_id)

# Query with filters; chained .where() calls are combined with AND
result = await db.execute(
    select(User)
    .where(User.email == email)
    .where(User.is_active.is_(True))
)
user = result.scalar_one_or_none()

# Query with ordering and limit
result = await db.execute(
    select(Document)
    .where(Document.collection == "company-kb")
    .order_by(Document.created_at.desc())
    .limit(10)
)
docs = result.scalars().all()
pgvector for embeddings
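The pgvector extension is set up by the initial migration. Use its Vector type in models; the dimension passed to Vector must match the output size of the embedding model: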
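import uuid

from pgvector.sqlalchemy import Vector
from sqlalchemy import select
from sqlalchemy.orm import Mapped, mapped_column

from app.db import Base

class Document(Base):
    __tablename__ = "documents"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    content: Mapped[str]
    embedding: Mapped[list[float]] = mapped_column(Vector(1536))
    collection: Mapped[str] = mapped_column(index=True)

# Similarity search: the smallest cosine distance sorts first,
# so the closest matches to query_embedding come back first
result = await db.execute(
    select(Document)
    .order_by(Document.embedding.cosine_distance(query_embedding))
    .limit(5)
)
docs = result.scalars().all()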
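For intuition about what the similarity search orders by, cosine distance is 1 minus the cosine similarity of two vectors: 0 for vectors pointing the same way, 1 for orthogonal vectors, and 2 for opposite ones. The pure-Python sketch below reproduces that calculation and the "order by distance, take the first k" step on a toy in-memory collection. The function names and sample vectors are made up for illustration, and in the application the database does this work.

```python
import math
from typing import Mapping, Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity; assumes both vectors are non-zero and equally long."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query: Sequence[float], docs: Mapping[str, Sequence[float]], k: int) -> list[str]:
    """Return the names of the k documents closest to the query, nearest first."""
    return sorted(docs, key=lambda name: cosine_distance(query, docs[name]))[:k]


# Toy 2-dimensional "embeddings"; real ones would have 1536 dimensions.
docs = {
    "same_direction": [2.0, 0.0],
    "orthogonal": [0.0, 3.0],
    "opposite": [-1.0, 0.0],
}
print(top_k([1.0, 0.0], docs, k=2))
```

Because the measure depends only on direction, the vector [2.0, 0.0] is at distance 0 from the query [1.0, 0.0] despite having a different length.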
Testing with a real database
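# conftest.py
import pytest_asyncio  # async fixtures need pytest-asyncio

from app.db import AsyncSessionLocal  # assumed import path

@pytest_asyncio.fixture
async def db():
    async with AsyncSessionLocal() as session:
        yield session
        await session.rollback()  # Discard uncommitted changes after each test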
Use a dedicated test database; see TEST_DATABASE_URL in .env.example.
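One way to enforce that separation is a small guard that refuses to run the tests when the test URL is missing or identical to the application's URL. The sketch below is an assumption about how such a check could look; resolve_test_database_url is a hypothetical helper and is not known to exist in the kit.

```python
import os
from typing import Mapping


def resolve_test_database_url(env: Mapping[str, str] = os.environ) -> str:
    """Return TEST_DATABASE_URL, failing fast if it is unsafe to use (hypothetical)."""
    test_url = env.get("TEST_DATABASE_URL")
    if not test_url:
        raise RuntimeError("TEST_DATABASE_URL is not set")
    if test_url == env.get("DATABASE_URL"):
        # Tests roll back and may truncate data; never point them at the main database.
        raise RuntimeError("TEST_DATABASE_URL must differ from DATABASE_URL")
    return test_url
```

Calling this once at the top of conftest.py, before the engine is created, would stop a misconfigured test run before it touches any data.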
