FastAPI AI Kit uses Celery with Redis as the broker for background job processing. Long-running operations (document ingestion, batch LLM processing, email sending) run in dedicated worker processes, so they never block API request handling.
Architecture
HTTP Request → FastAPI → Enqueue task → Return job_id
                              ↓
                       Redis (broker) → Celery Worker → Process → Update DB → Done
                                                                      ↓
                       HTTP Polling → GET /v1/jobs/{id} → Return status/result
Running workers
Local development (docker-compose)
The included docker-compose.yml starts a worker automatically:
docker-compose up -d
# Starts: db, redis, api, worker
Manual worker start
celery -A app.worker worker --loglevel=info --concurrency=2
Beat scheduler (cron jobs)
celery -A app.worker beat --loglevel=info
Beat handles recurring tasks like usage-to-Stripe metering flushes.
Built-in tasks
Document ingestion
Triggered automatically by POST /v1/rag/ingest for large files.
Usage metering flush
Runs every 5 minutes, reporting buffered token counts to Stripe.
Cleanup jobs
Daily cleanup of expired sessions, old job records, and orphaned uploads.
Writing your own tasks
# app/tasks/my_tasks.py
from app.worker import celery_app
from app.db import SyncSession
from app.models import MyModel


@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # seconds
)
def process_my_data(self, item_id: str, user_id: str):
    """Example task with retry logic."""
    try:
        with SyncSession() as db:
            item = db.get(MyModel, item_id)
            # ... process item ...
            db.commit()
    except Exception as exc:
        raise self.retry(exc=exc)
Note: Celery tasks use sync sessions (SyncSession), not async. Workers run as separate processes (Celery's default pool is prefork) with no running asyncio event loop, so async sessions cannot be awaited inside a task.
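A fixed 60-second delay retries at a constant pace. When the failure comes from a struggling upstream service, a growing delay is often gentler. The sketch below computes a capped exponential backoff; `backoff_delay` and its `base` and `cap` parameters are hypothetical helpers, not part of the kit.

```python
def backoff_delay(retries: int, base: int = 60, cap: int = 900) -> int:
    """Return the delay in seconds before the next retry attempt.

    retries is the number of retries already made (0 on the first failure),
    which Celery exposes as self.request.retries inside a bound task.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    # Double the delay on each attempt, but never wait longer than cap.
    return min(cap, base * 2 ** retries)


# Delays for the three retries allowed by max_retries=3
delays = [backoff_delay(n) for n in range(3)]
print(delays)
```

Inside the task above, this would be used as `raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))`. Celery also offers declarative options (`autoretry_for`, `retry_backoff`, `retry_backoff_max`, `retry_jitter`) that may suit simple cases without a helper.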
Enqueueing tasks from FastAPI
from fastapi import Depends

from app.tasks.my_tasks import process_my_data

# router, ProcessRequest, APIKey, and get_api_key come from your application code


@router.post("/v1/process")
async def process(
    body: ProcessRequest,
    key: APIKey = Depends(get_api_key),
):
    # Enqueue the task; .delay() returns immediately with an AsyncResult
    task = process_my_data.delay(
        item_id=str(body.item_id),
        user_id=str(key.owner_id),
    )
    return {"job_id": task.id, "status": "queued"}
Checking job status
GET /v1/jobs/{job_id}
# Response
{
  "job_id": "uuid",
  "status": "processing",  # pending | processing | completed | failed
  "progress": {
    "current": 45,
    "total": 142
  },
  "result": null,          # Set when completed
  "error": null,           # Set when failed
  "created_at": "2024-11-15T10:30:00Z",
  "completed_at": null
}
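A client typically polls this endpoint until the status becomes `completed` or `failed`. The following is a minimal sketch of such a loop, not code shipped with the kit. `wait_for_job` and its parameters are hypothetical, and the HTTP call is injected as `fetch_status` so the helper works with any client library; the fake responses, including the `chunks` result, are made up for illustration.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(
    fetch_status: Callable[[str], Dict[str, Any]],
    job_id: str,
    interval: float = 2.0,
    timeout: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job reaches a terminal status or the timeout elapses.

    fetch_status is any callable returning the parsed JSON body of
    GET /v1/jobs/{job_id}.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']} after {timeout}s")
        sleep(interval)


# Fake fetcher standing in for the real HTTP call
_responses = iter([
    {"job_id": "abc", "status": "pending", "result": None},
    {"job_id": "abc", "status": "processing", "result": None},
    {"job_id": "abc", "status": "completed", "result": {"chunks": 142}},
])
final = wait_for_job(lambda _id: next(_responses), "abc", sleep=lambda s: None)
print(final["status"])
```

Passing `sleep` as a parameter keeps the loop testable without real waiting; in production the default `time.sleep` applies.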
Progress updates
Update task state from within a task:
@celery_app.task(bind=True)
def process_batch(self, items: list):
    total = len(items)
    for i, item in enumerate(items, start=1):
        process_item(item)
        # Report progress every 10 items and once more on the final item
        if i % 10 == 0 or i == total:
            self.update_state(
                state="PROGRESS",
                meta={"current": i, "total": total},
            )
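The `meta` dictionary passed to `update_state` is what a status endpoint can surface as `progress`. The sketch below shows one way a task's state and meta could be translated into the job response shape. The mapping table and the `job_view` helper are assumptions for illustration; the kit's own `/v1/jobs/{id}` endpoint may read from the database instead.

```python
from typing import Any, Dict, Optional

# Assumed mapping from Celery task states to the API's status values.
STATE_TO_STATUS = {
    "PENDING": "pending",
    "RECEIVED": "pending",
    "STARTED": "processing",
    "PROGRESS": "processing",  # custom state set via update_state
    "RETRY": "processing",
    "SUCCESS": "completed",
    "FAILURE": "failed",
}


def job_view(job_id: str, state: str, info: Any) -> Dict[str, Any]:
    """Build a status payload from a task's state and its info/meta value."""
    progress: Optional[Dict[str, int]] = None
    if state == "PROGRESS" and isinstance(info, dict):
        progress = {"current": info["current"], "total": info["total"]}
    return {
        "job_id": job_id,
        "status": STATE_TO_STATUS.get(state, "pending"),
        "progress": progress,
        "result": info if state == "SUCCESS" else None,
        "error": str(info) if state == "FAILURE" else None,
    }


view = job_view("abc", "PROGRESS", {"current": 45, "total": 142})
print(view["status"], view["progress"])
```

With a result backend configured, `AsyncResult(job_id, app=celery_app)` exposes the same two inputs as `.state` and `.info`.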
Periodic tasks (beat)
Add entries to celery_app.conf.beat_schedule:
# app/worker.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "flush-usage-to-stripe": {
        "task": "app.tasks.billing.flush_usage",
        "schedule": 300,  # seconds, i.e. every 5 minutes
    },
    "cleanup-expired-sessions": {
        "task": "app.tasks.cleanup.expire_sessions",
        "schedule": crontab(hour=3, minute=0),  # daily at 03:00 (UTC unless timezone is set)
    },
}
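A schedule entry also accepts a `timedelta`, which makes the unit explicit, and an `options` dictionary that is forwarded to `apply_async`. The sketch below rewrites the five-minute flush that way. The `expires` value is an assumption chosen for illustration, not a setting taken from the kit.

```python
from datetime import timedelta

# The five-minute usage flush, written with a timedelta instead of raw seconds.
beat_schedule = {
    "flush-usage-to-stripe": {
        "task": "app.tasks.billing.flush_usage",
        "schedule": timedelta(minutes=5),
        # Assumption: discard a run that is still queued when the next one is due.
        "options": {"expires": 300},
    },
}

print(beat_schedule["flush-usage-to-stripe"]["schedule"].total_seconds())
```

The dictionary is assigned to `celery_app.conf.beat_schedule` in the same way as a schedule that uses plain numbers.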
Configuration
# .env
REDIS_URL=redis://localhost:6379
CELERY_WORKER_CONCURRENCY=2 # Child processes per worker
CELERY_TASK_ALWAYS_EAGER=false # Set to true for testing
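How the kit loads these variables is not shown here, so the following is only a sketch of one plausible translation into Celery's lowercase setting names. The `celery_settings` helper is hypothetical, and reusing `REDIS_URL` as the result backend is an assumption.

```python
import os
from typing import Any, Dict, Mapping


def celery_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Translate the .env variables above into Celery setting names."""
    redis_url = env.get("REDIS_URL", "redis://localhost:6379")
    eager = env.get("CELERY_TASK_ALWAYS_EAGER", "false").strip().lower()
    return {
        "broker_url": redis_url,
        "result_backend": redis_url,  # assumption: results live in the same Redis
        "worker_concurrency": int(env.get("CELERY_WORKER_CONCURRENCY", "2")),
        "task_always_eager": eager in ("1", "true", "yes"),
    }


settings = celery_settings({
    "REDIS_URL": "redis://localhost:6379",
    "CELERY_WORKER_CONCURRENCY": "4",
    "CELERY_TASK_ALWAYS_EAGER": "True",
})
print(settings["worker_concurrency"], settings["task_always_eager"])
```

The resulting dictionary could then be applied with `celery_app.conf.update(celery_settings())`.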
Testing tasks
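Enable eager mode (Celery's task_always_eager setting) in tests so that .delay() runs the task synchronously in the test process instead of sending it to Redis:
# conftest.py
import pytest
from app.worker import celery_app

@pytest.fixture(autouse=True)
def eager_celery():
    previous = celery_app.conf.task_always_eager
    celery_app.conf.task_always_eager = True  # run tasks inline
    yield
    celery_app.conf.task_always_eager = previous  # restore after each test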
