Usage-Based Billing with Stripe Metering in FastAPI
How to implement token-based usage metering with Stripe's metered billing in a FastAPI backend — from per-request tracking to webhook handling.
Usage-based pricing is the right model for AI APIs. Customers pay for what they consume, friction to start is low, and your revenue scales with their usage. But implementing token-based metering correctly is non-trivial — this post covers the full Stripe metering integration for a FastAPI AI API.
The metering architecture
Every LLM call produces a token count. That count needs to:
- Be recorded per API key (for your own cost tracking)
- Be associated with a Stripe customer (for billing)
- Be batched and reported to Stripe's metered billing API
Doing this synchronously on every request adds latency. The right approach is a three-layer pipeline: record locally, batch in Redis, flush to Stripe periodically.
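To make the pipeline concrete before wiring in Redis and Stripe, here is a minimal in-memory sketch of the same idea. InMemoryUsageBuffer, flush, and the report callback are hypothetical names used only for illustration; in the real pipeline the dictionary is replaced by Redis counters and the callback by a Stripe API call.

```python
from collections import defaultdict
from typing import Callable, Dict, Tuple

UsageKey = Tuple[str, str]  # (customer_id, api_key_id)


class InMemoryUsageBuffer:
    """Illustrative stand-in for the Redis layer: one counter per (customer, key)."""

    def __init__(self) -> None:
        self._totals: Dict[UsageKey, int] = defaultdict(int)

    def record(self, customer_id: str, api_key_id: str, tokens: int) -> None:
        # Per-request step: a cheap local increment, no call to Stripe
        self._totals[(customer_id, api_key_id)] += tokens

    def drain(self) -> Dict[UsageKey, int]:
        # Flush step: hand over the accumulated totals and start from empty
        drained, self._totals = dict(self._totals), defaultdict(int)
        return drained


def flush(buffer: InMemoryUsageBuffer, report: Callable[[str, str, int], None]) -> int:
    """Report one aggregated event per counter; returns the number of events sent."""
    batch = buffer.drain()
    for (customer_id, api_key_id), tokens in batch.items():
        report(customer_id, api_key_id, tokens)
    return len(batch)
```

However many requests arrive between flushes, each customer and key pair produces a single reported event per flush, which is what keeps Stripe off the request path.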
Stripe setup
Create a metered product in Stripe Dashboard:
- Product: "FastAPI AI Kit API Usage"
- Price: recurring, per unit, metered aggregation
- Billing meter: aggregate by sum of token_count events
Note the price ID: price_.... This goes in your .env as STRIPE_METERED_PRICE_ID.
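A misconfigured price ID tends to surface only when the first invoice is wrong, so it can be worth validating the environment at startup. The sketch below is one way to do that with the standard library; StripeSettings and load_stripe_settings are hypothetical names, and the price_ prefix check relies only on the ID format shown above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class StripeSettings:
    secret_key: str
    metered_price_id: str


def load_stripe_settings(env: Optional[Mapping[str, str]] = None) -> StripeSettings:
    """Read Stripe settings from the environment and fail fast on obvious mistakes."""
    env = os.environ if env is None else env
    required = ("STRIPE_SECRET_KEY", "STRIPE_METERED_PRICE_ID")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    price_id = env["STRIPE_METERED_PRICE_ID"]
    if not price_id.startswith("price_"):
        raise RuntimeError("STRIPE_METERED_PRICE_ID should look like price_...")
    return StripeSettings(
        secret_key=env["STRIPE_SECRET_KEY"],
        metered_price_id=price_id,
    )
```

Calling load_stripe_settings() once during application startup turns a silent billing misconfiguration into an immediate, readable error.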
Recording usage per request
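# app/billing/meter.py
from datetime import date

from sqlalchemy import update

from app.cache import redis_client  # async Redis client
# Assumed project modules; adjust the import paths to your layout.
from app.db import async_session_factory  # an async_sessionmaker bound to your engine
from app.models import APIKeyUsage


class UsageMeter:
    FLUSH_INTERVAL = 300  # 5 minutes

    async def record(
        self,
        api_key_id: str,
        tokens: int,
        customer_id: str,
    ):
        """Buffer usage in Redis; a periodic task flushes it to Stripe in batches."""
        event_key = f"usage:{customer_id}:{api_key_id}"
        await redis_client.incrby(event_key, tokens)
        # Safety TTL: a counter that is never flushed expires after two intervals
        await redis_client.expire(event_key, self.FLUSH_INTERVAL * 2)
        # Also write to Postgres for internal analytics
        await self._record_internal(api_key_id, tokens)

    async def _record_internal(self, api_key_id: str, tokens: int):
        # Assumes today's APIKeyUsage row already exists; otherwise the UPDATE matches no rows
        async with async_session_factory() as db:
            await db.execute(
                update(APIKeyUsage)
                .where(APIKeyUsage.api_key_id == api_key_id)
                .where(APIKeyUsage.date == date.today())
                .values(tokens=APIKeyUsage.tokens + tokens)
            )
            await db.commit()


meter = UsageMeter()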
Flushing to Stripe via Celery beat
A periodic task flushes buffered usage to Stripe:
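# app/tasks/billing.py
import logging
import time

import stripe
from celery.schedules import crontab

# Assumed project modules; adjust the import paths to your layout.
# This task is synchronous, so redis_client must be a synchronous redis.Redis
# client created with decode_responses=True, not the async client used in meter.py.
from app.cache import sync_redis_client as redis_client
from app.worker import celery

logger = logging.getLogger(__name__)
# stripe.api_key is assumed to be set at startup from STRIPE_SECRET_KEY


@celery.task
def flush_usage_to_stripe():
    """Runs every 5 minutes via Celery beat."""
    for key in redis_client.scan_iter(match="usage:*"):
        tokens = redis_client.getdel(key)  # Atomic get+delete (Redis 6.2+)
        if not tokens:
            continue
        _, customer_id, api_key_id = key.split(":")
        try:
            stripe.billing.MeterEvent.create(
                # Must match the event name configured on the Stripe billing meter
                event_name="api_tokens",
                payload={
                    "stripe_customer_id": customer_id,
                    "value": str(tokens),
                },
                timestamp=int(time.time()),
            )
        except stripe.error.StripeError:
            # Put the tokens back so the next run retries instead of losing usage
            redis_client.incrby(key, int(tokens))
            logger.exception("Failed to report usage for %s", key)


# celery beat schedule
celery.conf.beat_schedule = {
    "flush-usage": {
        "task": "app.tasks.billing.flush_usage_to_stripe",
        "schedule": crontab(minute="*/5"),
    },
}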
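Two details of the flush task are easy to get wrong: parsing the Redis key (which may arrive as bytes) and avoiding double counting when a request to Stripe is retried. The sketch below builds the keyword arguments for a meter event with a deterministic identifier. The helper names and the idea of a window_start timestamp are assumptions for illustration; to my knowledge Stripe's meter event API accepts an optional identifier that it uses for deduplication, but confirm this against the current API reference before relying on it.

```python
import hashlib
from typing import Tuple, Union


def usage_key(customer_id: str, api_key_id: str) -> str:
    """Build the Redis key used to buffer usage for one customer and API key."""
    return f"usage:{customer_id}:{api_key_id}"


def parse_usage_key(key: Union[str, bytes]) -> Tuple[str, str]:
    """Return (customer_id, api_key_id); accepts bytes from a non-decoding client."""
    if isinstance(key, bytes):
        key = key.decode()
    prefix, customer_id, api_key_id = key.split(":", 2)
    if prefix != "usage":
        raise ValueError(f"Not a usage key: {key!r}")
    return customer_id, api_key_id


def build_meter_event(key, tokens, window_start: int, event_name: str = "api_tokens") -> dict:
    """Keyword arguments for one meter event, with a deterministic identifier."""
    customer_id, api_key_id = parse_usage_key(key)
    # Same customer, key, and flush window always yield the same identifier,
    # so a retried request within that window can be recognized as a duplicate
    raw = f"{customer_id}:{api_key_id}:{window_start}".encode()
    return {
        "event_name": event_name,
        "payload": {"stripe_customer_id": customer_id, "value": str(int(tokens))},
        "timestamp": window_start,
        "identifier": hashlib.sha256(raw).hexdigest(),
    }
```

Under these assumptions the flush task would call stripe.billing.MeterEvent.create(**build_meter_event(key, tokens, window_start)), where window_start is the Unix timestamp at which the current flush began.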
Using the meter in route handlers
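import asyncio

# Keep references to in-flight tasks so they are not garbage-collected mid-run
_pending_tasks: set[asyncio.Task] = set()


@router.post("/v1/chat")
@require_api_key(tier=["basic", "pro"])
async def chat(
    body: ChatRequest,
    key: APIKey = Depends(get_api_key),
):
    response = await llm.chat(
        messages=body.messages,
        track_tokens=True,
    )
    # Scheduled in the background so metering does not delay the response
    task = asyncio.create_task(
        meter.record(
            api_key_id=str(key.id),
            tokens=response.tokens.total,
            customer_id=key.stripe_customer_id,
        )
    )
    _pending_tasks.add(task)
    task.add_done_callback(_pending_tasks.discard)
    return ChatResponse(
        reply=response.content,
        tokens=response.tokens,
    )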
Handling subscription lifecycle via webhooks
@router.post("/v1/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()
    sig_header = request.headers.get("Stripe-Signature")
    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except ValueError:
        # The body is not valid JSON
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    match event["type"]:  # match requires Python 3.10+
        case "customer.subscription.created":
            await handle_subscription_created(event["data"]["object"])
        case "customer.subscription.deleted":
            await handle_subscription_cancelled(event["data"]["object"])
        case "invoice.payment_failed":
            await handle_payment_failed(event["data"]["object"])
    # Acknowledge with a 2xx so Stripe does not retry the delivery
    return {"received": True}
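It helps to know roughly what the signature check does, because it explains why the handler must pass the raw request body rather than re-serialized JSON. The standard-library sketch below approximates Stripe's documented scheme: the Stripe-Signature header carries a timestamp (t) and one or more v1 signatures, each an HMAC-SHA256 of the timestamp, a dot, and the raw body. sign_payload and verify_signature are illustrative names; in production keep using stripe.Webhook.construct_event.

```python
import hashlib
import hmac
import time
from typing import Optional


def sign_payload(payload: bytes, secret: str, timestamp: int) -> str:
    """HMAC-SHA256 over b"<timestamp>.<raw body>", hex encoded."""
    signed = f"{timestamp}.".encode() + payload
    return hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()


def verify_signature(
    payload: bytes,
    header: str,
    secret: str,
    tolerance: int = 300,
    now: Optional[float] = None,
) -> bool:
    """Return True if any v1 signature matches and the timestamp is recent enough."""
    parts = [item.split("=", 1) for item in header.split(",") if "=" in item]
    timestamps = [value for name, value in parts if name == "t"]
    signatures = [value for name, value in parts if name == "v1"]
    if not timestamps or not signatures:
        return False
    try:
        timestamp = int(timestamps[0])
    except ValueError:
        return False
    now = time.time() if now is None else now
    if now - timestamp > tolerance:
        return False  # too old: likely a replayed request
    expected = sign_payload(payload, secret, timestamp).encode()
    # Constant-time comparison avoids leaking how many characters matched
    return any(hmac.compare_digest(expected, sig.encode()) for sig in signatures)
```

Because the signature covers the exact bytes Stripe sent, even a whitespace change in the body invalidates it, which is why the route reads request.body() before anything parses the JSON.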
Giving customers usage visibility
Expose a usage endpoint so customers can see their consumption:
@router.get("/v1/usage")
async def get_usage(
    key: APIKey = Depends(get_api_key),
    db: AsyncSession = Depends(get_db),
    period: str = "current_month",
):
    usage = await get_key_usage(db, key.id, period)
    return {
        "tokens_used": usage.total_tokens,
        "requests": usage.request_count,
        "period_start": usage.period_start.isoformat(),
        "period_end": usage.period_end.isoformat(),
        "rate_limit_remaining": {
            # Clamp at zero so a key that is over its limit never reports a negative value
            "per_minute": max(0, key.rate_limit_per_minute - usage.last_minute_tokens),
            "per_day": max(0, key.rate_limit_per_day - usage.today_tokens),
        },
    }
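The endpoint leaves the meaning of the period parameter to get_key_usage. One plausible way to turn it into date bounds is sketched below; resolve_period is a hypothetical helper, and only "current_month" appears in the endpoint above ("today" is added purely as an illustration).

```python
from datetime import date, timedelta
from typing import Tuple


def resolve_period(period: str, today: date) -> Tuple[date, date]:
    """Return (period_start, period_end) with period_end exclusive."""
    if period == "current_month":
        start = today.replace(day=1)
        # Day 28 plus 4 days always lands in the next month; snap back to its first day
        end = (start.replace(day=28) + timedelta(days=4)).replace(day=1)
        return start, end
    if period == "today":
        return today, today + timedelta(days=1)
    raise ValueError(f"Unsupported period: {period!r}")
```

An exclusive end date keeps the query simple (date >= start AND date < end) and avoids special-casing month lengths or leap years.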
FastAPI AI Kit ships this complete metering pipeline: buffered recording, Celery flush task, Stripe integration, webhook handling, and the usage endpoint. Configure STRIPE_SECRET_KEY and STRIPE_METERED_PRICE_ID and the billing layer works automatically.
