FastAPI AI Kit ships a complete SSE (Server-Sent Events) streaming layer for LLM responses. Token-by-token streaming works with OpenAI, Anthropic, and any OpenAI-compatible provider.

How SSE works

The server keeps an HTTP connection open and pushes events as they arrive:

Client                              Server
  │── POST /v1/chat ───────────────▶│
  │                                  │ ← LLM starts streaming
  │◀──── data: {"delta":"Hello"} ───│
  │◀──── data: {"delta":"!"} ───────│
  │◀──── data: {"delta":" How"} ────│
  │◀──── data: [DONE] ──────────────│
  │── connection closed ────────────│
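The wire format in the diagram can be decoded with a few lines of code. The sketch below is a minimal illustration, not the kit's own parser: the function name `iter_sse_payloads` is hypothetical, and it assumes every event is a single `data:` line holding either JSON or the `[DONE]` sentinel (multi-line `data:` events are not handled).

```python
import json
from typing import Iterable, Iterator


def iter_sse_payloads(lines: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from SSE lines until the [DONE] sentinel."""
    for line in lines:
        # Blank lines separate events; lines starting with ":" are comments.
        if not line or line.startswith(":"):
            continue
        if not line.startswith("data:"):
            continue  # skip other SSE fields such as "event:" or "id:"
        data = line[len("data:"):]
        if data.startswith(" "):
            data = data[1:]  # one optional space follows the colon
        if data == "[DONE]":
            return
        yield json.loads(data)


# The same four events as in the diagram above.
wire = (
    'data: {"delta":"Hello"}\n\n'
    'data: {"delta":"!"}\n\n'
    'data: {"delta":" How"}\n\n'
    "data: [DONE]\n\n"
)
text = "".join(p["delta"] for p in iter_sse_payloads(wire.splitlines()))
print(text)
```

Each event is terminated by a blank line, which is why the server writes `\n\n` after every `data:` payload.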

Streaming chat endpoint

# POST /v1/chat with streaming
import json

import httpx

def stream_chat(message: str, api_key: str) -> None:
    with httpx.stream(
        "POST",
        "https://api.example.com/v1/chat",
        headers={"X-API-Key": api_key},
        json={"message": message, "stream": True},
        timeout=60,
    ) as response:
        response.raise_for_status()  # fail fast on 4xx/5xx instead of parsing an error body
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = line[6:]  # strip the "data: " prefix
                if data == "[DONE]":
                    break
                chunk = json.loads(data)
                print(chunk["delta"], end="", flush=True)

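Streaming in the browser

The native EventSource API only sends GET requests and cannot set custom headers such as X-API-Key, so this example reads the stream with fetch instead:

// Run inside an async function or an ES module
const response = await fetch("/api/v1/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json", "X-API-Key": apiKey },
    body: JSON.stringify({ message, stream: true }),
});

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let finished = false;

while (!finished) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop();  // keep any incomplete event for the next read
    for (const evt of events) {
        if (!evt.startsWith("data: ")) continue;
        const data = evt.slice(6);
        if (data === "[DONE]") { finished = true; break; }
        appendToChat(JSON.parse(data).delta);
    }
}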

How the kit handles streaming

The streaming endpoint in the kit:

import asyncio

from fastapi import Depends
from fastapi.responses import StreamingResponse

@router.post("/v1/chat/stream")
@require_api_key()
async def stream_chat(
    body: ChatRequest,
    key: APIKey = Depends(get_api_key),
):
    async def event_generator():
        total_tokens = 0
        try:
            async for chunk in llm.stream(
                messages=body.messages,
                model=body.model,
            ):
                total_tokens += chunk.tokens
                yield f"data: {chunk.model_dump_json()}\n\n"
            
            yield "data: [DONE]\n\n"
        except asyncio.CancelledError:
            # Client disconnected: re-raise so the cancellation propagates;
            # the finally block below still runs
            raise
        finally:
            # Record usage regardless of how stream ended
            if total_tokens > 0:
                await meter.record(key.id, total_tokens, key.stripe_customer_id)
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )
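To see the framing and the metering pattern without a running server, the sketch below reproduces the generator's shape in plain asyncio. It is an illustration under stated assumptions, not the kit's code: `fake_llm_stream` is a hypothetical stand-in for `llm.stream`, and the `recorded` list stands in for the metering backend.

```python
import asyncio
import json
from typing import AsyncIterator

recorded: list[tuple[str, int]] = []  # stand-in for the metering backend


async def fake_llm_stream() -> AsyncIterator[dict]:
    """Hypothetical stand-in for llm.stream(): yields chunk dicts."""
    for piece in ["Hello", "!", " How"]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield {"delta": piece, "tokens": 1}


async def event_generator(key_id: str) -> AsyncIterator[str]:
    total_tokens = 0
    try:
        async for chunk in fake_llm_stream():
            total_tokens += chunk["tokens"]
            # One SSE event: a "data:" line followed by a blank line.
            yield f"data: {json.dumps(chunk)}\n\n"
        yield "data: [DONE]\n\n"
    finally:
        # Runs on normal completion, on error, and on early close.
        if total_tokens > 0:
            recorded.append((key_id, total_tokens))


async def main() -> list[str]:
    return [frame async for frame in event_generator("key_123")]


frames = asyncio.run(main())
print(frames[-1], recorded)
```

Keeping the usage write in `finally` means there is one code path for metering, whichever way the stream ends.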

Token tracking during streaming

Tokens accumulate across chunks. The kit tallies them and records usage after the stream completes or the client disconnects:

# Chunk format
{
    "delta": "Hello",      # Token text
    "tokens": 1,           # Tokens in this chunk
    "model": "gpt-4o",
    "done": false
}

# Final chunk
{
    "delta": "",
    "tokens": 0,
    "total_tokens": 142,   # Total for this response
    "done": true
}
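On the client side, the two chunk shapes above can be folded into a final text and a token total. This is a minimal sketch: the `StreamChunk` dataclass and the `collect` helper are hypothetical names that mirror the fields shown above, and the choice to trust the server-reported `total_tokens` over the running tally is an assumption.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class StreamChunk:
    delta: str
    tokens: int
    done: bool = False
    model: Optional[str] = None
    total_tokens: Optional[int] = None


def collect(raw_chunks: list[str]) -> tuple[str, int]:
    """Join the deltas and return the token total for the response."""
    text, running = "", 0
    for raw in raw_chunks:
        chunk = StreamChunk(**json.loads(raw))
        text += chunk.delta
        running += chunk.tokens
        if chunk.done and chunk.total_tokens is not None:
            # Prefer the server-reported total when the final chunk carries one.
            running = chunk.total_tokens
    return text, running


raw = [
    '{"delta": "Hello", "tokens": 1, "model": "gpt-4o", "done": false}',
    '{"delta": "!", "tokens": 1, "model": "gpt-4o", "done": false}',
    '{"delta": "", "tokens": 0, "total_tokens": 2, "done": true}',
]
text, total = collect(raw)
print(text, total)
```

If the stream is cut off before the final chunk arrives, the running tally is still available as a lower bound.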

Abort handling

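When a client disconnects mid-stream, the server cancels the response task and asyncio.CancelledError is raised inside the event generator. The kit catches it and schedules the usage write as a separate task, so tokens streamed before the disconnect are still recorded: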

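except asyncio.CancelledError:
    # Stream aborted by client: schedule the usage write as its own task
    # so this cancellation does not interrupt it
    if total_tokens > 0:
        asyncio.create_task(
            meter.record(key.id, total_tokens, key.stripe_customer_id)
        )
    raise  # let the cancellation propagate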
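The abort path can be exercised locally by cancelling a task that is consuming a stream. The sketch below is an illustration only: `slow_stream` is a hypothetical endless token source, `task.cancel()` simulates the client disconnecting, and usage is appended to a list synchronously rather than scheduled with `asyncio.create_task` as in the snippet above.

```python
import asyncio
from typing import AsyncIterator

recorded: list[int] = []  # stand-in for the metering backend


async def slow_stream() -> AsyncIterator[int]:
    """Hypothetical LLM stream: one token every 10 ms, forever."""
    while True:
        await asyncio.sleep(0.01)
        yield 1


async def consume() -> None:
    total_tokens = 0
    try:
        async for tokens in slow_stream():
            total_tokens += tokens
    except asyncio.CancelledError:
        # Record what was streamed so far, then let the cancellation propagate.
        if total_tokens > 0:
            recorded.append(total_tokens)
        raise


async def main() -> bool:
    task = asyncio.create_task(consume())
    await asyncio.sleep(0.1)  # let a few chunks through
    task.cancel()             # simulates the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(was_cancelled, recorded)
```

Re-raising after recording keeps the task in the cancelled state, so the caller can still tell an aborted stream from one that finished.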

Infrastructure notes

Nginx / load balancers

Disable proxy buffering for SSE to work correctly:

location /v1/chat/stream {
    proxy_pass http://api;
    proxy_buffering off;
    proxy_cache off;
}

The kit sets X-Accel-Buffering: no automatically, which Nginx respects.

Railway / Render

Both support SSE natively — no additional configuration needed.

Vercel

Vercel's serverless functions have a 10-second streaming timeout on hobby plans. Use a dedicated API service for streaming endpoints if you deploy your frontend to Vercel.