FastAPI · LLM · Streaming · SSE · OpenAI

Adding streaming LLM responses with Server-Sent Events in FastAPI

How to implement real-time streaming chat responses using SSE in FastAPI, with token counting and proper error handling.

FastAPI AI Kit Team · 3 min read

Nobody wants to stare at a loading spinner for 10 seconds while your LLM generates a response. Streaming is how you give users immediate feedback — and it's simpler to implement than most tutorials suggest.

This post covers the full implementation: the FastAPI endpoint, the OpenAI streaming call, token counting, and the client-side code that reads the stream.

Why Server-Sent Events over WebSockets?

For LLM streaming, SSE is almost always the better choice:

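  • Unidirectional: the client sends one request, the server streams the response. LLM chat fits this perfectly.
  • HTTP-native: SSE works over standard HTTP/1.1 and HTTP/2, so most proxies, CDNs, and load balancers pass it through. The main exception is response buffering, which some reverse proxies such as nginx enable by default and which has to be turned off for streamed responses.
  • Reconnection built-in: the browser EventSource API automatically reconnects on drop. EventSource only issues GET requests, though, so a POST endpoint read with fetch has to handle retries itself.
  • Simpler: no WebSocket upgrade, no keep-alive pings, no connection state machine.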

WebSockets make sense when you need bidirectional real-time communication (think collaborative editing or live games). For streaming text, SSE is cleaner.

The FastAPI streaming endpoint

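import json

from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse

from app.core.security import require_api_key
from app.services.llm_service import LLMService
from app.schemas.chat import ChatRequest

# get_llm_service, get_api_key, and meter are defined elsewhere in the app;
# import them from wherever your project keeps its dependencies.

router = APIRouter()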

@router.post("/v1/chat/stream")
@require_api_key()
async def stream_chat(
    body: ChatRequest,
    llm: LLMService = Depends(get_llm_service),
    key = Depends(get_api_key),
):
    async def event_generator():
        total_tokens = 0
        try:
            async for chunk in llm.stream_chat(body.messages):
                total_tokens += chunk.token_count
                data = json.dumps({
                    "delta": chunk.content,
                    "finish_reason": chunk.finish_reason,
                })
                yield f"data: {data}\n\n"
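        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
        finally:
            # Record usage on every exit path, including client disconnects.
            # Never yield here: if the generator is being closed, a yield
            # inside finally raises RuntimeError.
            await meter.record(key.id, total_tokens)
        # Reached after normal completion or a handled error, not on disconnect
        yield "data: [DONE]\n\n"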

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Two things to note:

  1. X-Accel-Buffering: no. This header tells nginx not to buffer the response. Without it, nginx holds the upstream output in its buffers and forwards it in large blocks, or only once the response completes, so tokens no longer arrive as they are generated.
  2. Token counting in the finally block. Usage is always recorded, even if the client disconnects mid-stream.
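
The `data: ...` framing used in the endpoint is easy to get subtly wrong once a payload contains a newline, because each line of an event needs its own `data:` field. A minimal sketch of a formatting helper follows; the name `format_sse` and its `event` parameter are assumptions for illustration, not part of FastAPI or the kit.

```python
import json
from typing import Any, Optional


def format_sse(payload: Any, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event frame.

    Strings are sent as-is; anything else is JSON-encoded. Every line of the
    payload gets its own `data:` field, and a blank line terminates the event.
    """
    text = payload if isinstance(payload, str) else json.dumps(payload)
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    for line in text.split("\n"):
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"
```

With a helper like this, the generator body could yield `format_sse({"delta": chunk.content})` instead of building the string by hand.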

Streaming from the OpenAI SDK

The OpenAI Python SDK makes streaming straightforward with stream=True:

# app/services/providers/openai_provider.py
from typing import AsyncGenerator

async def stream_chat(
    self,
    messages: list[ChatMessage],
    model: str = "gpt-4o",
) -> AsyncGenerator[ChunkResponse, None]:
    stream = await self.client.chat.completions.create(
        model=model,
        messages=[m.model_dump() for m in messages],
        stream=True,
    )

    async for event in stream:
        # Usage-only chunks (sent when include_usage is set) carry no choices
        if not event.choices:
            continue
        choice = event.choices[0]
        delta = choice.delta.content or ""
        # The closing chunk has empty content but carries finish_reason,
        # so emit it as well
        if delta or choice.finish_reason:
            yield ChunkResponse(
                content=delta,
                finish_reason=choice.finish_reason,
                # OpenAI doesn't return per-chunk token counts;
                # this is a rough estimate from the whitespace-separated word count
                token_count=len(delta.split()),
            )

For accurate token counts on streamed responses, OpenAI returns total usage in the final chunk when you pass stream_options={"include_usage": True}. We handle this in the kit automatically.
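
As an illustration of that final usage chunk, here is a hedged sketch of how a consumer might pick it up. With `include_usage` enabled, the last chunk has an empty `choices` list and a populated `usage` object, while earlier chunks have `usage` set to null. The `collect_stream` helper and the `fake_stream` stand-in (built from `SimpleNamespace` objects so the example runs without a network call) are assumptions, not the kit's code.

```python
import asyncio
from types import SimpleNamespace as NS
from typing import AsyncIterator, Optional, Tuple


async def collect_stream(stream: AsyncIterator) -> Tuple[str, Optional[int]]:
    """Concatenate deltas and pick up the usage total if one arrives."""
    parts = []
    completion_tokens = None
    async for event in stream:
        # With include_usage, the last chunk has usage set and no choices.
        if getattr(event, "usage", None) is not None:
            completion_tokens = event.usage.completion_tokens
        if not event.choices:
            continue
        parts.append(event.choices[0].delta.content or "")
    return "".join(parts), completion_tokens


async def fake_stream():
    """Stand-in for the SDK stream; chunk shapes are simplified."""
    for text in ["Hel", "lo", None]:
        yield NS(choices=[NS(delta=NS(content=text))], usage=None)
    yield NS(choices=[], usage=NS(completion_tokens=2))


text, tokens = asyncio.run(collect_stream(fake_stream()))
print(text, tokens)  # prints "Hello 2"
```

When the provider reports a total like this, it can replace the per-chunk estimate before usage is recorded.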

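Client-side: reading the stream

The native browser EventSource API only issues GET requests and cannot set custom headers, so it cannot call a POST endpoint that expects an X-API-Key header. Use fetch and read the response body as a stream instead:

// messages: the chat history array expected by ChatRequest
const response = await fetch("/v1/chat/stream", {
  method: "POST",
  headers: { "Content-Type": "application/json", "X-API-Key": apiKey },
  body: JSON.stringify({ messages }),
});
if (!response.ok) throw new Error(`Stream request failed: ${response.status}`);

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

read: while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // SSE events are separated by a blank line
  const events = buffer.split("\n\n");
  buffer = events.pop(); // keep the trailing partial event
  for (const raw of events) {
    if (!raw.startsWith("data: ")) continue;
    const data = raw.slice(6);
    if (data === "[DONE]") break read;
    const { delta, error } = JSON.parse(data);
    if (error) {
      console.error("Stream error:", error);
      break read;
    }
    // Append delta to your UI
    appendToChat(delta);
  }
}
await reader.cancel();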

For React apps, wrap this in a custom hook. The kit ships a useStreamingChat hook for Next.js and React frontends.
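
Outside the browser, for example in an integration test or a script, the same stream can be read with a small incremental parser. The sketch below handles only the `data:` fields this endpoint emits and is not a full implementation of the SSE specification; `SSEParser` is a hypothetical name.

```python
import codecs
from typing import List


class SSEParser:
    """Incrementally split a byte stream into SSE data payloads."""

    def __init__(self) -> None:
        # An incremental decoder copes with multi-byte characters that are
        # split across network chunks.
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> List[str]:
        """Return the data payload of every event completed by this chunk."""
        self._buffer += self._decoder.decode(chunk)
        # Everything before the last blank line is complete; keep the rest.
        *complete, self._buffer = self._buffer.split("\n\n")
        payloads = []
        for raw in complete:
            data_lines = []
            for line in raw.split("\n"):
                if line.startswith("data: "):
                    data_lines.append(line[6:])
                elif line.startswith("data:"):
                    data_lines.append(line[5:])
            if data_lines:
                payloads.append("\n".join(data_lines))
        return payloads
```

Feeding it the raw bytes from any HTTP client's streaming response yields the JSON strings and the `[DONE]` sentinel in order, regardless of where the chunk boundaries fall.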

Handling disconnects gracefully

When a client disconnects mid-stream, you want to stop the LLM call to avoid wasting tokens. FastAPI's Request object exposes an is_disconnected() method:

from fastapi import Request

@router.post("/v1/chat/stream")
async def stream_chat(request: Request, body: ChatRequest, ...):
    async def event_generator():
        async for chunk in llm.stream_chat(body.messages):
            # Stop pulling chunks once the client has gone away
            if await request.is_disconnected():
                break
            yield f"data: {json.dumps({'delta': chunk.content})}\n\n"

The check runs once per chunk, so the loop stops at the next chunk after the user closes the tab or navigates away, which keeps wasted API credits to a minimum.
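
Breaking out of the loop stops this generator from reading further chunks, but the upstream generator only runs its cleanup once it is closed. One way to make that explicit is to close it in a `finally` block (on Python 3.10+, `contextlib.aclosing` does the same). The sketch below uses stand-ins so it runs on its own: `fake_llm_stream` and `FakeRequest` are hypothetical replacements for the provider stream and the request object.

```python
import asyncio

state = {"upstream_closed": False, "produced": 0}


async def fake_llm_stream():
    """Stand-in for llm.stream_chat(); tracks how much work was done."""
    try:
        for word in ["a", "b", "c", "d"]:
            state["produced"] += 1
            yield word
    finally:
        # A real provider would release its HTTP connection here.
        state["upstream_closed"] = True


class FakeRequest:
    """Reports a disconnect after a fixed number of checks."""

    def __init__(self, disconnect_after: int) -> None:
        self._remaining = disconnect_after

    async def is_disconnected(self) -> bool:
        self._remaining -= 1
        return self._remaining < 0


async def event_generator(request):
    upstream = fake_llm_stream()
    try:
        async for chunk in upstream:
            if await request.is_disconnected():
                break
            yield f"data: {chunk}\n\n"
    finally:
        # Close the upstream generator now instead of waiting for the
        # event loop to finalize it later.
        await upstream.aclose()


async def main():
    return [frame async for frame in event_generator(FakeRequest(2))]


frames = asyncio.run(main())
```

Here two frames are delivered, the third chunk is produced but dropped, and the fourth is never requested. Whether the provider request is actually cancelled still depends on the provider generator releasing its connection during its own cleanup.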

Takeaway

Streaming LLM responses with SSE in FastAPI is about 50 lines of code. The hard parts — token tracking, nginx buffering, disconnect handling — are edge cases that trip up most implementations. FastAPI AI Kit ships with all of this pre-built and tested, so you can focus on building the product around the chat endpoint, not the endpoint itself.
