FastAPI AI Kit ships a complete SSE (Server-Sent Events) streaming layer for LLM responses. Token-by-token streaming works with OpenAI, Anthropic, and any OpenAI-compatible provider.
How SSE works
The server keeps an HTTP connection open and pushes events as they arrive:
Client                       Server
│── POST /v1/chat ───────────────▶│
│                                 │ ← LLM starts streaming
│◀──── data: {"delta":"Hello"} ───│
│◀──── data: {"delta":"!"} ───────│
│◀──── data: {"delta":" How"} ────│
│◀──── data: [DONE] ──────────────│
│── connection closed ────────────│
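The wire format in the diagram can be sketched in a few lines of Python. The helper names `format_sse` and `iter_sse_data` are illustrative and not part of the kit, and the parser only handles single-line `data:` fields like the ones shown on this page.

```python
import json
from typing import Iterable, Iterator


def format_sse(payload: dict) -> str:
    """Serialize one payload as an SSE frame: a data line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the payload of each data line, stopping at the [DONE] sentinel."""
    for line in lines:
        if not line.startswith("data:"):
            continue  # blank separator lines, comments, other SSE fields
        data = line[len("data:"):]
        if data.startswith(" "):
            data = data[1:]  # SSE strips a single leading space after the colon
        if data == "[DONE]":
            return
        yield data


if __name__ == "__main__":
    wire = format_sse({"delta": "Hello"}) + format_sse({"delta": "!"}) + "data: [DONE]\n\n"
    for raw in iter_sse_data(wire.splitlines()):
        print(json.loads(raw)["delta"], end="")
    print()
```

Each frame ends with a blank line, which is what tells the client that one event is complete.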
Streaming chat endpoint
# POST /v1/chat with streaming
import json

import httpx


def stream_chat(message: str, api_key: str) -> None:
    with httpx.stream(
        "POST",
        "https://api.example.com/v1/chat",
        headers={"X-API-Key": api_key},
        json={"message": message, "stream": True},
        timeout=60,
    ) as response:
        response.raise_for_status()  # fail fast on 4xx/5xx before reading the body
        for line in response.iter_lines():
            # SSE payload lines start with "data: "; skip blank separator lines
            if line.startswith("data: "):
                data = line[6:]
                if data == "[DONE]":  # end-of-stream sentinel
                    break
                chunk = json.loads(data)
                print(chunk["delta"], end="", flush=True)
Using EventSource in the browser
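// The native EventSource API only issues GET requests and cannot send custom
// headers, so X-API-Key cannot be set here. This sketch assumes a GET route and
// cookie-based auth (or a same-origin proxy that adds the header).
const evtSource = new EventSource(
  `/api/v1/chat?message=${encodeURIComponent(message)}`,
  { withCredentials: true }
);
evtSource.onmessage = (event) => {
  // The stream ends with a literal [DONE] sentinel, which is not valid JSON
  if (event.data === "[DONE]") {
    evtSource.close();
    return;
  }
  const data = JSON.parse(event.data);
  if (data.done) {
    evtSource.close();
    return;
  }
  appendToChat(data.delta);
};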
How the kit handles streaming
The streaming endpoint in the kit:
@router.post("/v1/chat/stream")
@require_api_key()
async def stream_chat(
    body: ChatRequest,
    key: APIKey = Depends(get_api_key),
):
    async def event_generator():
        total_tokens = 0
        try:
            async for chunk in llm.stream(
                messages=body.messages,
                model=body.model,
            ):
                total_tokens += chunk.tokens
                yield f"data: {chunk.model_dump_json()}\n\n"
            yield "data: [DONE]\n\n"
        except asyncio.CancelledError:
            # Client disconnected: clean up and stop
            pass
        finally:
            # Record usage regardless of how stream ended
            if total_tokens > 0:
                await meter.record(key.id, total_tokens, key.stripe_customer_id)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable Nginx buffering
        },
    )
Token tracking during streaming
Tokens accumulate across chunks. The kit tallies them and records usage after the stream completes or the client disconnects:
# Chunk format
{
    "delta": "Hello",       # Token text
    "tokens": 1,            # Tokens in this chunk
    "model": "gpt-4o",
    "done": false
}
# Final chunk
{
    "delta": "",
    "tokens": 0,
    "total_tokens": 142,    # Total for this response
    "done": true
}
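As a rough illustration of how a client might consume this chunk format, the sketch below joins the text, sums the per-chunk token counts, and returns the `total_tokens` value reported by the final chunk. `summarize_stream` is a hypothetical helper, not a kit API, and it assumes each payload is a JSON string shaped like the chunks above.

```python
import json
from typing import Iterable, Optional, Tuple


def summarize_stream(payloads: Iterable[str]) -> Tuple[str, int, Optional[int]]:
    """Return (full text, tokens counted client-side, total reported by server)."""
    text_parts = []
    counted = 0
    reported = None
    for raw in payloads:
        chunk = json.loads(raw)
        text_parts.append(chunk.get("delta", ""))
        counted += chunk.get("tokens", 0)
        if chunk.get("done"):
            # Only the final chunk carries total_tokens in the format above
            reported = chunk.get("total_tokens")
            break
    return "".join(text_parts), counted, reported


if __name__ == "__main__":
    sample = [
        '{"delta": "Hello", "tokens": 1, "model": "gpt-4o", "done": false}',
        '{"delta": "!", "tokens": 1, "model": "gpt-4o", "done": false}',
        '{"delta": "", "tokens": 0, "total_tokens": 2, "done": true}',
    ]
    print(summarize_stream(sample))
```

If the stream is cut off before the final chunk, `reported` stays `None` while `counted` still reflects the tokens received so far.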
Abort handling
When a client disconnects mid-stream, asyncio.CancelledError is raised. The kit catches this and still records partial usage:
except asyncio.CancelledError:
    # Stream aborted by client
    if total_tokens > 0:
        asyncio.create_task(
            meter.record(key.id, total_tokens, key.stripe_customer_id)
        )
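The abort path can be reproduced without a web server. The sketch below is not the kit's code: `fake_llm_stream` and `record_usage` are hypothetical stand-ins for the LLM client and `meter.record()`, and it uses the `finally` form of cleanup. Cancelling the consuming task plays the role of the client disconnecting.

```python
import asyncio


async def fake_llm_stream():
    """Hypothetical token source: yields (delta, tokens) every 10 ms."""
    for delta in ["Hello", "!", " How", " are", " you", "?"]:
        await asyncio.sleep(0.01)
        yield delta, 1


async def record_usage(recorded, key_id, tokens):
    """Hypothetical stand-in for meter.record(): stores usage in a list."""
    recorded.append((key_id, tokens))


async def event_generator(key_id, recorded):
    """Emit SSE frames and record usage however the stream ends."""
    total_tokens = 0
    try:
        async for delta, tokens in fake_llm_stream():
            total_tokens += tokens
            yield f"data: {delta}\n\n"
        yield "data: [DONE]\n\n"
    finally:
        # Runs on normal completion and when CancelledError propagates
        if total_tokens > 0:
            await record_usage(recorded, key_id, total_tokens)


async def main():
    received, recorded = [], []
    three_frames = asyncio.Event()

    async def consume():
        async for frame in event_generator("key_123", recorded):
            received.append(frame)
            if len(received) == 3:
                three_frames.set()

    task = asyncio.create_task(consume())
    await three_frames.wait()
    task.cancel()  # simulate the client disconnecting mid-stream
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received, recorded


if __name__ == "__main__":
    frames, usage = asyncio.run(main())
    print(len(frames), usage)
```

The consumer is cancelled after three frames, and the `finally` block still records the three tokens streamed up to that point.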
Infrastructure notes
Nginx / load balancers
Disable proxy buffering for SSE to work correctly:
location /v1/chat/stream {
    proxy_pass http://api;
    proxy_buffering off;
    proxy_cache off;
}
The kit sets X-Accel-Buffering: no automatically, which Nginx respects.
Railway / Render
Both support SSE natively — no additional configuration needed.
Vercel
Vercel's serverless functions have a 10-second streaming timeout on hobby plans. Use a dedicated API service for streaming endpoints if you deploy your frontend to Vercel.
