
FastAPI AI Kit uses Celery + Redis for background job processing. Long-running operations — document ingestion, batch LLM processing, email sending — run in dedicated workers without blocking API threads.

Architecture

HTTP Request → FastAPI → Enqueue task → Return job_id
                              ↓
Redis (broker) → Celery Worker → Process → Update DB → Done
                                                ↓
HTTP Polling → GET /v1/jobs/{id} → Return status/result
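The sketch below is a toy, single-process model of the same request, worker and polling split. It uses only the standard library: `queue.Queue` stands in for the Redis broker and a dict stands in for the jobs table. The names `enqueue`, `worker` and `get_job` are illustrative and are not the kit's API.

```python
import queue
import threading
import uuid

# In-memory stand-ins: the queue plays the Redis broker, the dict plays the jobs table.
broker: queue.Queue = queue.Queue()
jobs: dict = {}
jobs_lock = threading.Lock()


def enqueue(payload: list) -> str:
    """API side: record the job, hand it to the broker, return its id immediately."""
    job_id = str(uuid.uuid4())
    with jobs_lock:
        jobs[job_id] = {"status": "pending", "result": None}
    broker.put((job_id, payload))
    return job_id


def worker() -> None:
    """Worker side: process jobs in order until a None sentinel arrives."""
    while True:
        item = broker.get()
        if item is None:
            break
        job_id, payload = item
        with jobs_lock:
            jobs[job_id]["status"] = "processing"
        result = sum(payload)  # stand-in for the real work
        with jobs_lock:
            jobs[job_id].update(status="completed", result=result)


def get_job(job_id: str) -> dict:
    """Polling side: roughly what GET /v1/jobs/{id} reads."""
    with jobs_lock:
        return dict(jobs[job_id])


if __name__ == "__main__":
    t = threading.Thread(target=worker)
    t.start()
    job_id = enqueue([1, 2, 3])  # returns before the work is done
    broker.put(None)  # ask the worker to stop once the queue drains
    t.join()
    print(get_job(job_id))  # {'status': 'completed', 'result': 6}
```

In the real setup the worker runs in a separate process and reads from Redis, but the contract is the same: the API only writes a job and returns its id, and clients learn the outcome by polling.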

Running workers

Local development (docker-compose)

The included docker-compose.yml starts a worker automatically:

docker-compose up -d
# Starts: db, redis, api, worker

Manual worker start

celery -A app.worker worker --loglevel=info --concurrency=2

Beat scheduler (cron jobs)

celery -A app.worker beat --loglevel=info

Beat handles recurring tasks like usage-to-Stripe metering flushes.

Built-in tasks

Document ingestion

Triggered automatically by POST /v1/rag/ingest for large files.
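The "for large files" routing can be sketched as a size check. Several parts of this sketch are assumptions made for illustration: the 5 MiB cutoff, the `choose_ingest_mode` helper, and the idea that smaller files are processed inline. The kit's actual threshold and code path may differ.

```python
from dataclasses import dataclass

# Hypothetical cutoff: the kit's real threshold for "large files" is not documented here.
LARGE_FILE_BYTES = 5 * 1024 * 1024  # 5 MiB, illustrative only


@dataclass
class IngestDecision:
    mode: str  # "inline" or "background"
    reason: str


def choose_ingest_mode(size_bytes: int, threshold: int = LARGE_FILE_BYTES) -> IngestDecision:
    """Decide whether an upload is processed in the request or handed to a worker."""
    if size_bytes < 0:
        raise ValueError("size_bytes must be non-negative")
    if size_bytes > threshold:
        return IngestDecision("background", f"{size_bytes} bytes exceeds {threshold}")
    return IngestDecision("inline", f"{size_bytes} bytes is within {threshold}")
```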

Usage metering flush

Runs every 5 minutes, reporting buffered token counts to Stripe.
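A flush like this implies a buffer that aggregates token counts between runs. The sketch below is a minimal thread-safe version that keeps a running total per customer. A `report` callback stands in for the real Stripe call. `UsageBuffer` and `report` are hypothetical names, and the kit may buffer in Redis or the database instead. This sketch drops a batch if `report` raises, so a production version would need to put failed counts back into the buffer.

```python
from collections import defaultdict
from threading import Lock
from typing import Callable


class UsageBuffer:
    """Accumulates token counts per customer between flushes (hypothetical helper)."""

    def __init__(self) -> None:
        self._counts: defaultdict = defaultdict(int)
        self._lock = Lock()

    def record(self, customer_id: str, tokens: int) -> None:
        with self._lock:
            self._counts[customer_id] += tokens

    def flush(self, report: Callable[[str, int], None]) -> int:
        """Swap out the buffer, report each customer's total, return how many were reported."""
        with self._lock:
            pending, self._counts = self._counts, defaultdict(int)
        for customer_id, tokens in pending.items():
            report(customer_id, tokens)  # stand-in for sending usage to Stripe
        return len(pending)
```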

Cleanup jobs

Daily cleanup of expired sessions, old job records, and orphaned uploads.
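Each cleanup comes down to "remove records older than a cutoff." The sketch below shows that selection for job records in plain Python. The 30-day window and the `JobRecord` shape are assumptions. A real task would express the same filter as a SQL `DELETE ... WHERE completed_at < :cutoff`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional

JOB_RETENTION = timedelta(days=30)  # hypothetical retention window


@dataclass
class JobRecord:
    id: str
    completed_at: Optional[datetime]  # None while the job is still running


def select_expired(
    records: List[JobRecord], now: datetime, retention: timedelta = JOB_RETENTION
) -> List[str]:
    """Return ids of finished jobs older than the retention window; unfinished jobs are kept."""
    cutoff = now - retention
    return [r.id for r in records if r.completed_at is not None and r.completed_at < cutoff]


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    rows = [JobRecord("a", now - timedelta(days=45)), JobRecord("b", None)]
    print(select_expired(rows, now))  # ['a']
```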

Writing your own tasks

# app/tasks/my_tasks.py
from app.worker import celery_app
from app.db import SyncSession
from app.models import MyModel

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # seconds
)
def process_my_data(self, item_id: str, user_id: str):
    """Example task with retry logic."""
    try:
        with SyncSession() as db:
            item = db.get(MyModel, item_id)
            # ... process item ...
            db.commit()
    except Exception as exc:
        raise self.retry(exc=exc)
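With `max_retries=3` and `default_retry_delay=60`, a task that keeps failing runs at most four times: the first attempt plus three retries, with a 60-second wait before each retry. Once retries are exhausted, `self.retry(exc=exc)` re-raises the original exception. The pure-Python simulation below models that fixed-delay policy without Celery, so the numbers are easy to check. `run_with_retries` is a hypothetical helper, and it records the delays instead of sleeping.

```python
from typing import Callable, List, Tuple, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T], max_retries: int = 3, retry_delay: int = 60
) -> Tuple[T, List[int]]:
    """Call fn, retrying on any exception with a fixed delay.

    Returns (result, delays), where delays lists the wait before each retry.
    Re-raises the last exception once max_retries retries have all failed.
    """
    delays: List[int] = []
    while True:
        try:
            return fn(), delays
        except Exception:
            if len(delays) >= max_retries:
                raise
            delays.append(retry_delay)  # a real worker would wait this long
```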

Note: Celery tasks use sync sessions (SyncSession), not async. Celery workers run as separate processes (the default prefork pool) outside any asyncio event loop, so task bodies are plain synchronous functions.
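Because a task body is ordinary synchronous code, you can still call an existing async helper from it with `asyncio.run`, which creates and closes a fresh event loop on each call. The sketch below uses a hypothetical `fetch_summary` coroutine. It assumes the helper does not reuse async resources created on a different event loop, such as an async database engine or a shared HTTP client. For database work, `SyncSession` remains the simpler choice.

```python
import asyncio


async def fetch_summary(item_id: str) -> str:
    """Hypothetical async helper, e.g. an async LLM or HTTP call."""
    await asyncio.sleep(0)  # placeholder for real awaitable I/O
    return f"summary:{item_id}"


def summarize_item(item_id: str) -> str:
    """Synchronous body, as it would appear inside a Celery task."""
    # asyncio.run starts a new event loop, runs the coroutine, then closes the loop.
    return asyncio.run(fetch_summary(item_id))
```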

Enqueueing tasks from FastAPI

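from fastapi import Depends
from app.tasks.my_tasks import process_my_data
# router, ProcessRequest, APIKey and get_api_key come from your own app modules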

@router.post("/v1/process")
async def process(
    body: ProcessRequest,
    key: APIKey = Depends(get_api_key),
):
    # Enqueue — returns immediately
    task = process_my_data.delay(
        item_id=str(body.item_id),
        user_id=str(key.owner_id),
    )
    return {"job_id": task.id, "status": "queued"}
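The `str(...)` conversions matter because task arguments are serialized before they reach Redis, and JSON is Celery's default task serializer. The standard-library check below shows the failure mode with a raw `uuid.UUID`, and shows that the string form round-trips cleanly. Depending on version, Kombu's JSON encoder may accept some extra types, but plain strings are always safe. `to_task_args` is a hypothetical helper and is not part of the kit.

```python
import json
import uuid
from typing import Any, Dict


def to_task_args(**kwargs: Any) -> Dict[str, Any]:
    """Convert UUIDs to strings so every argument is plain JSON."""
    return {k: str(v) if isinstance(v, uuid.UUID) else v for k, v in kwargs.items()}


def json_roundtrip(args: Dict[str, Any]) -> Dict[str, Any]:
    """Serialize and deserialize the way a JSON message payload would be."""
    return json.loads(json.dumps(args))
```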

Checking job status

GET /v1/jobs/{job_id}

# Response
{
    "job_id": "uuid",
    "status": "processing",   # pending | processing | completed | failed
    "progress": {
        "current": 45,
        "total": 142
    },
    "result": null,           # Set when completed
    "error": null,            # Set when failed
    "created_at": "2024-11-15T10:30:00Z",
    "completed_at": null
}
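The endpoint's four statuses are coarser than Celery's task states. The sketch below shows one plausible mapping, assuming the endpoint reads Celery's result backend; the kit may read a jobs table instead. `PROGRESS` is the custom state used for progress updates. Celery reports `PENDING` for any task id it does not recognize, so an unknown job id looks the same as a queued one unless you also check your own records.

```python
# Assumed mapping from Celery task states to the API's status values.
CELERY_TO_API_STATUS = {
    "PENDING": "pending",
    "RECEIVED": "pending",
    "STARTED": "processing",
    "PROGRESS": "processing",  # custom state set via update_state
    "RETRY": "processing",
    "SUCCESS": "completed",
    "FAILURE": "failed",
    "REVOKED": "failed",
}


def api_status(celery_state: str) -> str:
    """Map a Celery state name to pending | processing | completed | failed."""
    # Any other custom state is treated as still running.
    return CELERY_TO_API_STATUS.get(celery_state, "processing")
```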

Progress updates

Update task state from within a task:

@celery_app.task(bind=True)
def process_batch(self, items: list):
    total = len(items)
    for i, item in enumerate(items):
        process_item(item)  # your per-item logic

        # Report progress after every 10th item and after the last one
        done = i + 1
        if done % 10 == 0 or done == total:
            self.update_state(
                state="PROGRESS",
                meta={"current": done, "total": total},
            )
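To check which `current` values a progress loop actually reports, the helper below replays a reporting cadence without Celery. It assumes an update after every `every`-th item plus one final update at the end, so clients always see `current == total` before the task finishes. `progress_checkpoints` is a hypothetical name.

```python
from typing import List


def progress_checkpoints(total: int, every: int = 10) -> List[int]:
    """Return the 'current' values reported while processing `total` items."""
    if every <= 0:
        raise ValueError("every must be positive")
    reported: List[int] = []
    for i in range(total):
        done = i + 1  # items finished so far
        if done % every == 0 or done == total:
            reported.append(done)
    return reported
```

For the 142-item example in the status response, this yields 10, 20, ..., 140 and then 142.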

Periodic tasks (beat)

Add entries to celery_app.conf.beat_schedule:

# app/worker.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "flush-usage-to-stripe": {
        "task": "app.tasks.billing.flush_usage",
        "schedule": 300,  # Every 5 minutes (value in seconds)
    },
    "cleanup-expired-sessions": {
        "task": "app.tasks.cleanup.expire_sessions",
        "schedule": crontab(hour=3, minute=0),  # Daily at 03:00 in the app's timezone (UTC by default)
    },
}
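The two entries use different schedule types. A plain number is an interval in seconds. `crontab(hour=3, minute=0)` fires at a wall-clock time, which is UTC unless the app's `timezone` setting says otherwise. The stdlib sketch below computes both kinds of next-run time so the difference is easy to see. It is an illustration only and is not Celery's scheduler.

```python
from datetime import datetime, timedelta, timezone


def next_interval_run(last_run: datetime, every_seconds: int) -> datetime:
    """Interval schedule: the next run is a fixed number of seconds after the last one."""
    return last_run + timedelta(seconds=every_seconds)


def next_daily_run(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Daily crontab-style schedule: the next occurrence of hour:minute after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    now = datetime(2024, 11, 15, 10, 30, tzinfo=timezone.utc)
    print(next_interval_run(now, 300))  # 2024-11-15 10:35:00+00:00
    print(next_daily_run(now, 3))  # 2024-11-16 03:00:00+00:00
```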

Configuration

# .env
REDIS_URL=redis://localhost:6379
CELERY_WORKER_CONCURRENCY=2      # Concurrent task slots per worker (child processes with prefork)
CELERY_TASK_ALWAYS_EAGER=false   # Set to true for testing
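How these variables reach Celery depends on the kit's settings module, which is not shown here. The minimal stdlib sketch below parses them. It assumes the names map onto Celery's `broker_url`, `worker_concurrency` and `task_always_eager` settings; both the `load_celery_settings` helper and that mapping are assumptions.

```python
import os
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    """Accept common truthy spellings such as true/1/yes (case-insensitive)."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


def load_celery_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Translate environment variables into lowercase Celery setting names."""
    env = os.environ if env is None else env
    return {
        "broker_url": env.get("REDIS_URL", "redis://localhost:6379"),
        "worker_concurrency": int(env.get("CELERY_WORKER_CONCURRENCY", "2")),
        "task_always_eager": _as_bool(env.get("CELERY_TASK_ALWAYS_EAGER", "false")),
    }
```

The result can be applied with `celery_app.conf.update(load_celery_settings())`.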

Testing tasks

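In tests, turn on Celery's task_always_eager setting (CELERY_TASK_ALWAYS_EAGER in .env) so .delay() runs each task synchronously in the test process. The fixture also sets task_eager_propagates so that task exceptions fail the test, and it restores the previous values afterwards:

# conftest.py
import pytest
from app.worker import celery_app

@pytest.fixture(autouse=True)
def eager_celery():
    prev = celery_app.conf.task_always_eager, celery_app.conf.task_eager_propagates
    celery_app.conf.update(task_always_eager=True, task_eager_propagates=True)  # run .delay() inline
    yield
    celery_app.conf.update(task_always_eager=prev[0], task_eager_propagates=prev[1])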