Tags: fastapi, celery, redis, background-jobs, async, production

Async Background Jobs with Celery and FastAPI

How to offload long-running LLM tasks to Celery workers in FastAPI — job queuing, status polling, result storage, and monitoring with Flower.

FastAPI AI Kit Team · 3 min read

LLM inference can take 30 seconds. Document ingestion can take minutes. Running these in your HTTP handlers kills concurrency and causes timeouts. Celery workers solve this — but wiring Celery into FastAPI correctly has several non-obvious pieces. This guide covers the complete setup.

Why you need background jobs for AI APIs

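When a FastAPI handler awaits an LLM call, the event loop stays free to serve other requests, so short calls are fine inline. Longer work still holds the connection open, and its result is lost if the client disconnects or a proxy times out. For workloads such as: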

  • Document processing pipelines (multiple LLM calls per document)
  • Batch embedding jobs
  • Email/notification sending after LLM processing
  • Long-running chains with multiple steps

...you want to return an immediate response with a job ID and process asynchronously.

Celery application setup

# app/worker.py
from celery import Celery
from app.config import settings

celery_app = Celery(
    "fastapikit",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    include=["app.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_expires=3600,  # Results expire in 1 hour
    worker_prefetch_multiplier=1,  # Each worker process reserves only one task at a time
    task_acks_late=True,  # Acknowledge after completion, not before
    task_reject_on_worker_lost=True,  # Requeue if worker crashes
)

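task_acks_late=True is critical for AI tasks: the message is acknowledged only after the task finishes, so a task interrupted mid-inference is redelivered rather than lost. It needs task_reject_on_worker_lost=True alongside it, as configured above; without that setting, Celery still acknowledges the message when the worker process is killed. Because a redelivered task runs again from the start, tasks must be idempotent. With the Redis broker, also keep task runtime below the visibility timeout (1 hour by default), or the task is redelivered while it is still running.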
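To make the difference between early and late acknowledgement concrete, here is a toy in-memory model. It is an illustration only and does not reflect how Redis or Celery implement delivery; `ToyBroker` and `run_once` are hypothetical names, and the "crash" is simulated by a flag.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a message broker (illustrative only)."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting for a worker
        self.unacked = []             # delivered but not yet acknowledged

    def get(self):
        msg = self.ready.popleft()
        self.unacked.append(msg)
        return msg

    def ack(self, msg):
        self.unacked.remove(msg)

    def requeue_unacked(self):
        """What the broker does once it notices the consumer is gone."""
        while self.unacked:
            self.ready.append(self.unacked.pop())


def run_once(broker, acks_late, crash):
    """Take one message and process it; return True if the task finished."""
    msg = broker.get()
    if not acks_late:
        broker.ack(msg)  # early ack: the broker forgets the message now
    if crash:
        broker.requeue_unacked()  # simulated worker death mid-task
        return False
    if acks_late:
        broker.ack(msg)  # late ack: only after the work is done
    return True


early = ToyBroker(["embed doc-1"])
run_once(early, acks_late=False, crash=True)  # message is gone for good

late = ToyBroker(["embed doc-1"])
run_once(late, acks_late=True, crash=True)    # message is back in the queue
```

After the simulated crash, `early.ready` is empty while `late.ready` still holds the message. The cost of the late-ack behavior is that a task can run more than once, which is why idempotent tasks matter.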

Job status tracking

Store job status in Postgres for persistence beyond Redis TTL:

# app/models/job.py
import uuid
from datetime import datetime

from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column

from app.models.base import Base  # assumed location of the declarative base


class Job(Base):
    __tablename__ = "jobs"

    id: Mapped[str] = mapped_column(primary_key=True)  # Celery task ID
    task_name: Mapped[str]
    status: Mapped[str] = mapped_column(default="pending")
    result: Mapped[dict | None] = mapped_column(JSONB)
    error: Mapped[str | None]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)  # naive UTC
    completed_at: Mapped[datetime | None]

    # Optional: link to API key for access control
    api_key_id: Mapped[uuid.UUID | None]

Writing tasks

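# app/tasks/document.py
from celery import current_task

from app.worker import celery_app
# SyncSession, storage, Document, split_into_chunks, sync_get_embedding, and
# update_job_status are project helpers; import them from wherever they live.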

@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, doc_id: str, collection: str, key_id: str):
    """Process a document for RAG ingestion."""
    try:
        update_job_status(self.request.id, "processing")
        
        with SyncSession() as db:
            text = storage.read(doc_id)
            chunks = split_into_chunks(text)
            
            for i, chunk in enumerate(chunks):
                embedding = sync_get_embedding(chunk)
                db.add(Document(
                    content=chunk,
                    embedding=embedding,
                    collection=collection,
                ))
                # Update progress periodically
                if i % 10 == 0:
                    current_task.update_state(
                        state="PROGRESS",
                        meta={"current": i, "total": len(chunks)},
                    )
            
            db.commit()
        
        update_job_status(
            self.request.id,
            "completed",
            result={"chunks": len(chunks), "doc_id": doc_id},
        )
        
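    except Exception as exc:
        # Mark the job failed only when no retries remain; otherwise it will run again.
        if self.request.retries >= self.max_retries:
            update_job_status(self.request.id, "failed", error=str(exc))
            raise
        update_job_status(self.request.id, "retrying", error=str(exc))
        raise self.retry(exc=exc)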
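The task calls `update_job_status`, which is not shown in this article. One possible shape is sketched below, with the signature inferred from the calls in the task. To keep the example runnable on its own, an in-memory SQLite table stands in for the Postgres `jobs` table; in the real project this would presumably go through `SyncSession` and the `Job` model instead. The `TERMINAL` set and the module-level `conn` are assumptions of this sketch.

```python
import json
import sqlite3
from datetime import datetime, timezone

# Assumption: an in-memory SQLite table stands in for the Postgres "jobs" table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE jobs (id TEXT PRIMARY KEY, status TEXT, result TEXT, "
    "error TEXT, completed_at TEXT)"
)

# Statuses after which the job will not change again.
TERMINAL = {"completed", "failed"}


def update_job_status(job_id, status, result=None, error=None):
    """Record a status change; stamp completed_at only on terminal states."""
    completed_at = (
        datetime.now(timezone.utc).isoformat() if status in TERMINAL else None
    )
    with conn:  # commits on success, rolls back if the statement raises
        conn.execute(
            "UPDATE jobs SET status = ?, result = ?, error = ?, completed_at = ? "
            "WHERE id = ?",
            (
                status,
                json.dumps(result) if result is not None else None,
                error,
                completed_at,
                job_id,
            ),
        )
```

Each call overwrites `result` and `error`, so a job that fails once and then succeeds on retry ends up with a clean record.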

Enqueueing from FastAPI endpoints

@router.post("/v1/documents/ingest")
async def ingest_document(
    file: UploadFile,
    collection: str,
    key: APIKey = Depends(get_api_key),
    db: AsyncSession = Depends(get_db),
):
    doc_id = await storage.upload(file)
    
    # Create job record before enqueueing
    task_id = str(uuid.uuid4())
    db.add(Job(id=task_id, task_name="process_document", api_key_id=key.id))
    await db.commit()
    
    process_document.apply_async(
        args=[doc_id, collection, str(key.id)],
        task_id=task_id,
        countdown=0,  # Start immediately
    )
    
    return {"job_id": task_id, "status": "pending"}  # matches the Job row's initial status
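One gap worth noting in the endpoint: the job row is committed before `apply_async` runs, so if the broker is unreachable and the enqueue raises, the row stays in its initial status indefinitely. A minimal sketch of one way to handle that follows. The `jobs` dict stands in for the jobs table and `publish` for `process_document.apply_async`; both are placeholders for illustration, as is the function name.

```python
import uuid


def create_and_enqueue(jobs: dict, publish) -> str:
    """Create a job record, then enqueue; mark the record failed if enqueue raises."""
    job_id = str(uuid.uuid4())
    jobs[job_id] = {"status": "pending", "error": None}
    try:
        publish(job_id)
    except Exception as exc:
        # Without this, the record would look like a job that is still waiting.
        jobs[job_id] = {"status": "failed", "error": f"enqueue failed: {exc}"}
        raise
    return job_id
```

Re-raising lets the API return an error to the caller while the stored record still tells the truth to anyone who polls it later.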

Status polling endpoint

@router.get("/v1/jobs/{job_id}")
async def get_job_status(
    job_id: str,
    key: APIKey = Depends(get_api_key),
    db: AsyncSession = Depends(get_db),
):
    job = await db.get(Job, job_id)
    
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    
    # Verify the requesting key owns this job
    if job.api_key_id != key.id:
        raise HTTPException(status_code=403, detail="Not authorized")
    
    # Check Celery for live progress if still processing
    if job.status == "processing":
        task = celery_app.AsyncResult(job_id)
        if task.state == "PROGRESS":
            return {
                "job_id": job_id,
                "status": "processing",
                "progress": task.info,
            }
    
    return {
        "job_id": job_id,
        "status": job.status,
        "result": job.result,
        "error": job.error,
        "created_at": job.created_at.isoformat(),
        "completed_at": job.completed_at.isoformat() if job.completed_at else None,
    }
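On the client side, the endpoint is typically called in a loop until the job reaches a terminal status. A minimal sketch with exponential backoff is below. `wait_for_job` and its parameters are hypothetical; `fetch_status` is any callable returning the parsed JSON body of `GET /v1/jobs/{job_id}`, injected so the loop is independent of the HTTP client in use.

```python
import time
from typing import Callable


def wait_for_job(
    fetch_status: Callable[[], dict],
    timeout: float = 300.0,
    initial_delay: float = 1.0,
    max_delay: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll until the job reaches a terminal status or the timeout elapses."""
    delay = initial_delay
    waited = 0.0
    while True:
        body = fetch_status()
        if body["status"] in ("completed", "failed"):
            return body
        if waited + delay > timeout:
            raise TimeoutError(f"job still {body['status']} after {waited:.0f}s")
        sleep(delay)
        waited += delay
        # Back off so long jobs do not hammer the API.
        delay = min(delay * 2, max_delay)
```

Doubling the delay up to a cap keeps short jobs responsive and long jobs cheap to watch. The `sleep` parameter exists so the loop can be exercised in tests without real waiting.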

Running workers

Local development via docker-compose:

# docker-compose.yml
services:
  worker:
    build: .
    command: celery -A app.worker worker --loglevel=info --concurrency=2
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - REDIS_URL=${REDIS_URL}
    depends_on:
      - redis
      - db

For production, run workers as a separate service on Railway or Render. Scale workers independently from your API servers.

FastAPI AI Kit ships this complete Celery setup — the celery app, job model, task decorator, status endpoint, and docker-compose worker service — all pre-configured.
