Tags: rag, fastapi, pgvector, openai, production

Building a Production RAG Pipeline with FastAPI and pgvector

A complete walkthrough of building a retrieval-augmented generation pipeline: document ingestion, embedding, vector search, and LLM context injection — all in async FastAPI.

FastAPI AI Kit Team · 4 min read

Retrieval-Augmented Generation (RAG) is now table stakes for AI products. Users expect your AI to know about your specific documents — not just what GPT-4 learned in training. This post walks through building a production-ready RAG pipeline in FastAPI from scratch, covering the pieces that tutorials usually skip.

What we're building

We're building two things: a document ingestion API that accepts files, chunks and embeds them, and stores the vectors in Postgres via pgvector; and a query endpoint that retrieves the relevant chunks and injects them into the LLM prompt.

The final system handles:

  • Incremental re-ingestion without duplicates
  • Chunking strategy for different document types
  • Embedding caching to avoid redundant API calls
  • Async processing via Celery for large documents
  • Collection-level access control

Setting up pgvector

pgvector is a PostgreSQL extension that adds a vector type and similarity search operators. The extension has to be installed on the database server first; an Alembic migration then enables it and creates the table:

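# migrations/versions/001_add_pgvector.py
from alembic import op

# Alembic revision identifiers (placeholder values; use the ones Alembic generates)
revision = "001_add_pgvector"
down_revision = None

def upgrade():
    # Enables pgvector in this database; the extension must already be installed on the server
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")
    op.execute("""
        CREATE TABLE documents (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            content TEXT NOT NULL,
            -- SHA-256 hex digest of content; the unique constraint backs the idempotent upsert
            content_hash CHAR(64) NOT NULL UNIQUE,
            embedding vector(1536),
            metadata JSONB DEFAULT '{}',
            collection VARCHAR(255) NOT NULL,
            created_at TIMESTAMPTZ DEFAULT NOW()
        )
    """)
    # HNSW index for fast approximate search (requires pgvector 0.5.0 or later)
    op.execute("""
        CREATE INDEX ON documents
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

def downgrade():
    op.execute("DROP TABLE IF EXISTS documents")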

The HNSW index is crucial for performance. Without it, similarity search does a full table scan — fine for thousands of vectors, unusable for millions.
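To make concrete what the index is approximating, here is a minimal pure-Python sketch of exact search: it computes cosine distance (the quantity pgvector's `<=>` operator returns) against every row and sorts. The function names and the toy two-dimensional vectors are illustrative assumptions, not part of the pipeline, and the sketch assumes no zero vectors.

```python
import math

def cosine_distance(a: list[float], b: list[float]) -> float:
    """Cosine distance = 1 - cosine similarity. Assumes neither vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def exact_top_k(query: list[float], rows: list[tuple[str, list[float]]], k: int) -> list[str]:
    """Score every (id, embedding) row against the query: the full-scan behaviour."""
    scored = sorted(rows, key=lambda row: cosine_distance(query, row[1]))
    return [row_id for row_id, _ in scored[:k]]

# Toy data: "a" points the same way as the query, "c" almost, "b" is orthogonal.
rows = [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [0.9, 0.1])]
print(exact_top_k([1.0, 0.0], rows, k=2))  # ['a', 'c']
```

The work here grows linearly with the number of rows, which is why the scan stops being viable at scale. An HNSW index avoids scoring every row, at the cost of occasionally missing a true nearest neighbour.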

Document ingestion pipeline

The ingestion pipeline has four stages: parse, chunk, embed, upsert.

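# app/rag/ingestion.py
import hashlib

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.asyncio import AsyncSession

from app.embeddings import get_embedding
from app.models import Document
# Assumed module paths for the two helpers used below
from app.rag.chunking import split_into_chunks
from app.rag.parsing import parse_document

CHUNK_SIZE = 512  # tokens
CHUNK_OVERLAP = 64

async def ingest_document(
    source: str,
    collection: str,
    db: AsyncSession,
) -> int:
    """Returns the number of chunks submitted; duplicates are skipped by the database."""
    text = await parse_document(source)
    chunks = split_into_chunks(text, CHUNK_SIZE, CHUNK_OVERLAP)
    if not chunks:
        return 0  # an INSERT with an empty VALUES list would fail

    rows = []
    for chunk in chunks:
        # get_embedding caches results, so identical text is not re-embedded
        embedding = await get_embedding(chunk)
        rows.append({
            "content": chunk,
            "content_hash": hashlib.sha256(chunk.encode("utf-8")).hexdigest(),
            "embedding": embedding,
            "collection": collection,
            "metadata": {"source": source},
        })

    # Upsert by content hash: re-ingesting the same chunk is a no-op.
    # This relies on a unique constraint on documents.content_hash.
    await db.execute(
        insert(Document)
        .values(rows)
        .on_conflict_do_nothing(index_elements=["content_hash"])
    )
    await db.commit()
    return len(rows)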
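The idempotency argument is easier to see without a database. The sketch below mimics `INSERT ... ON CONFLICT (content_hash) DO NOTHING` with a plain dict keyed by the SHA-256 of the chunk text. The helper names and the dict standing in for the table are illustrative assumptions.

```python
import hashlib

def content_hash(chunk: str) -> str:
    """Stable key for a chunk: the SHA-256 hex digest of its UTF-8 bytes."""
    return hashlib.sha256(chunk.encode("utf-8")).hexdigest()

def insert_do_nothing(table: dict[str, str], chunks: list[str]) -> int:
    """Insert chunks whose hash is not present yet; return how many were actually added."""
    inserted = 0
    for chunk in chunks:
        key = content_hash(chunk)
        if key not in table:
            table[key] = chunk
            inserted += 1
    return inserted

table: dict[str, str] = {}
first = insert_do_nothing(table, ["alpha", "beta"])
second = insert_do_nothing(table, ["alpha", "beta", "gamma"])
print(first, second, len(table))  # 2 1 3
```

On the second run only the new chunk is stored, which is the behaviour the unique `content_hash` constraint gives the real table. Note that hashing the content alone treats identical text in two collections as one row; whether that is desirable depends on how collections are used.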

Chunking strategy

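Fixed-size chunking by character count is unreliable: embedding models limit input by tokens, not characters, and the characters-per-token ratio varies with language and content. Chunk on token boundaries using tiktoken: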

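import tiktoken

def split_into_chunks(
    text: str,
    max_tokens: int = 512,
    overlap: int = 64,
) -> list[str]:
    if not 0 <= overlap < max_tokens:
        # With overlap >= max_tokens the window would never advance
        raise ValueError("overlap must be non-negative and smaller than max_tokens")
    # cl100k_base is the encoding used by text-embedding-3-small
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunks.append(enc.decode(tokens[start:end]))
        if end == len(tokens):
            break  # last window reached; avoids a trailing chunk made only of overlap
        start += max_tokens - overlap
    return chunks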
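The window arithmetic is worth checking on its own, independent of the tokenizer. This sketch works on token counts only and returns the `(start, end)` span of each chunk. The function name `chunk_spans` is hypothetical, and it stops as soon as a window reaches the end of the text.

```python
def chunk_spans(n_tokens: int, max_tokens: int = 512, overlap: int = 64) -> list[tuple[int, int]]:
    """Return half-open (start, end) token spans for a sliding window with overlap."""
    if not 0 <= overlap < max_tokens:
        raise ValueError("overlap must be non-negative and smaller than max_tokens")
    step = max_tokens - overlap  # how far each window advances
    spans = []
    start = 0
    while start < n_tokens:
        end = min(start + max_tokens, n_tokens)
        spans.append((start, end))
        if end == n_tokens:
            break  # the final window already covers the tail
        start += step
    return spans

print(chunk_spans(1000))  # [(0, 512), (448, 960), (896, 1000)]
print(chunk_spans(512))   # [(0, 512)]
```

With the defaults, each window advances 448 tokens, so consecutive chunks share 64 tokens. A document of exactly 512 tokens yields a single chunk.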

For structured documents (PDFs, Markdown), split on semantic boundaries first (paragraphs, headings), then apply token limits within each section. This keeps context coherent within each chunk.
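One way to sketch the first pass: split on blank lines, then pack whole paragraphs into a chunk until the next one would exceed the limit. Both function names are hypothetical, and `count_tokens` is a whitespace word count used as a stand-in so the example runs without a tokenizer; in practice a real token counter would replace it.

```python
import re

def count_tokens(text: str) -> int:
    # Stand-in for a real tokenizer: counts whitespace-separated words.
    return len(text.split())

def split_on_paragraphs(text: str, max_tokens: int = 512) -> list[str]:
    """Pack whole paragraphs into chunks without exceeding max_tokens per chunk."""
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
    chunks: list[str] = []
    current: list[str] = []
    current_tokens = 0
    for para in paragraphs:
        n = count_tokens(para)
        # Close the current chunk if adding this paragraph would overflow it.
        if current and current_tokens + n > max_tokens:
            chunks.append("\n\n".join(current))
            current, current_tokens = [], 0
        current.append(para)
        current_tokens += n
    if current:
        chunks.append("\n\n".join(current))
    return chunks

doc = "# Title\n\none two three\n\nfour five six\n\nseven eight"
print(split_on_paragraphs(doc, max_tokens=6))
# ['# Title\n\none two three', 'four five six\n\nseven eight']
```

A single paragraph longer than the limit still comes out as one oversized chunk here; that is the case where the token-window splitter would be applied within the section.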

Query with context injection

The query pipeline retrieves the top-k most similar chunks and injects them into the LLM prompt:

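# app/rag/query.py
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db import get_db
from app.embeddings import get_embedding
from app.models import Document
# Assumed module paths for the LLM client wrapper and the response schema
from app.llm import llm
from app.schemas import RAGResponse

async def rag_query(
    question: str,
    collection: str,
    top_k: int = 5,
    # Depends() is resolved only when FastAPI calls this as a route handler or dependency
    db: AsyncSession = Depends(get_db),
) -> RAGResponse:
    # Embed the question
    query_embedding = await get_embedding(question)
    
    # Retrieve similar chunks using cosine distance
    # (cosine_distance comes from pgvector's SQLAlchemy Vector column type)
    results = await db.execute(
        select(Document)
        .where(Document.collection == collection)
        .order_by(Document.embedding.cosine_distance(query_embedding))
        .limit(top_k)
    )
    chunks = results.scalars().all()
    
    # Build context-augmented prompt
    context = "\n\n---\n\n".join(c.content for c in chunks)
    prompt = f"""Answer the question based on the context below.
If the answer is not in the context, say so explicitly.

Context:
{context}

Question: {question}"""
    
    response = await llm.chat(
        messages=[{"role": "user", "content": prompt}],
        track_tokens=True,
    )
    
    return RAGResponse(
        answer=response.content,
        sources=[c.metadata.get("source") for c in chunks],
        tokens=response.tokens,
    )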
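Prompt assembly is easier to test when it is pulled out of the endpoint into pure functions. The sketch below reproduces the prompt template from the query code and adds a small helper that removes repeated sources, since several top-k chunks often come from the same document. Both function names are hypothetical.

```python
def build_prompt(question: str, chunks: list[str]) -> str:
    """Join retrieved chunks with a visible separator and append the question."""
    context = "\n\n---\n\n".join(chunks)
    return (
        "Answer the question based on the context below.\n"
        "If the answer is not in the context, say so explicitly.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )

def unique_sources(metadatas: list[dict]) -> list[str]:
    """Collect each chunk's 'source' once, preserving retrieval order."""
    sources = [m["source"] for m in metadatas if m.get("source")]
    return list(dict.fromkeys(sources))  # dicts keep insertion order

prompt = build_prompt("What is pgvector?", ["Chunk one.", "Chunk two."])
print(prompt)
print(unique_sources([{"source": "a.pdf"}, {"source": "b.md"}, {"source": "a.pdf"}, {}]))
# ['a.pdf', 'b.md']
```

Keeping these as plain functions means the template can be unit-tested without a database or an LLM call.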

Handling large documents asynchronously

PDFs with hundreds of pages take too long to parse and embed inside a single HTTP request. Return a job ID immediately and do the work in a Celery worker:

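import asyncio

@router.post("/v1/rag/ingest", status_code=202)  # 202 Accepted: work continues in the background
async def ingest_endpoint(
    file: UploadFile,
    collection: str,
    key: APIKey = Depends(get_api_key),
):
    doc_id = await storage.upload(file)
    job = ingest_document_task.delay(doc_id, collection, key.id)
    return {"job_id": job.id, "status": "processing"}

@celery.task(bind=True)  # bind=True exposes the task instance as `self`
def ingest_document_task(self, doc_id: str, collection: str, key_id: str):
    # Celery tasks are synchronous, so run the async pipeline in its own event loop.
    async def run() -> int:
        # async_session_factory is an assumed helper that yields an AsyncSession
        async with async_session_factory() as db:
            return await ingest_document(doc_id, collection, db)

    chunks = asyncio.run(run())
    # Update job status for polling
    update_job_status(job_id=self.request.id, chunks=chunks)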
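The polling side needs somewhere to keep job state. Here is a minimal in-memory sketch of that bookkeeping, assuming three states (processing, completed, failed). The `JobStatus` shape, `create_job`, `fail_job`, and `get_job_status` are hypothetical, and a module-level dict stands in for whatever shared store (Redis or a database table) the worker and the API would both read in a real deployment.

```python
from dataclasses import asdict, dataclass
from typing import Optional

@dataclass
class JobStatus:
    status: str = "processing"
    chunks: Optional[int] = None
    error: Optional[str] = None

# Stand-in for a shared store; a dict only works within a single process.
_jobs: dict[str, JobStatus] = {}

def create_job(job_id: str) -> None:
    _jobs[job_id] = JobStatus()

def update_job_status(job_id: str, chunks: int) -> None:
    """Mark a job as finished and record how many chunks were ingested."""
    _jobs[job_id] = JobStatus(status="completed", chunks=chunks)

def fail_job(job_id: str, error: str) -> None:
    _jobs[job_id] = JobStatus(status="failed", error=error)

def get_job_status(job_id: str) -> dict:
    """What a polling endpoint could return; unknown IDs get an explicit status."""
    job = _jobs.get(job_id)
    return asdict(job) if job else {"status": "unknown", "chunks": None, "error": None}

create_job("job-1")
print(get_job_status("job-1"))  # {'status': 'processing', 'chunks': None, 'error': None}
update_job_status("job-1", chunks=42)
print(get_job_status("job-1"))  # {'status': 'completed', 'chunks': 42, 'error': None}
```

Recording failures explicitly matters as much as recording success: without a failed state, a client polling a crashed job would see "processing" indefinitely.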

Embedding caching

Embedding API calls are the expensive part of RAG ingestion. Cache them in Redis:

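from openai import AsyncOpenAI

from app.cache import cache

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

@cache(ttl=86400 * 30)  # 30-day cache
async def get_embedding(text: str) -> list[float]:
    # The cache decorator is assumed to derive the Redis key from the text
    # argument and to handle hits and misses.
    response = await client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    # Return the vector itself, not the whole API response object
    return response.data[0].embedding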

Re-ingesting unchanged documents then costs almost nothing in API calls: only new or modified chunks reach the embedding API, as long as their cache entries have not expired.

What FastAPI AI Kit includes

FastAPI AI Kit ships this entire pipeline pre-built: ingestion endpoint, Celery worker, pgvector models with HNSW index, embedding caching, and the query endpoint. The only thing you add is your documents and collection names.
