Skip to main content
fastapistripebillingmeteringsaasproduction

Usage-Based Billing with Stripe Metering in FastAPI

How to implement token-based usage metering with Stripe's metered billing in a FastAPI backend — from per-request tracking to webhook handling.

FastAPI AI Kit Team··3 min read

Usage-based pricing is the right model for AI APIs. Customers pay for what they consume, friction to start is low, and your revenue scales with their usage. But implementing token-based metering correctly is non-trivial — this post covers the full Stripe metering integration for a FastAPI AI API.

The metering architecture

Every LLM call produces a token count. That count needs to:

  1. Be recorded per API key (for your own cost tracking)
  2. Be associated with a Stripe customer (for billing)
  3. Be batched and reported to Stripe's metered billing API

Doing this synchronously on every request adds latency. The right approach is a three-layer pipeline: record locally, batch in Redis, flush to Stripe periodically.

Stripe setup

Create a metered product in Stripe Dashboard:

  • Product: "FastAPI AI Kit API Usage"
  • Price: recurring, per unit, metered aggregation
  • Billing meter: aggregate by sum of token_count events

Note the price ID: price_.... This goes in your .env as STRIPE_METERED_PRICE_ID.

Recording usage per request

# app/billing/meter.py
from app.cache import redis_client

class UsageMeter:
    FLUSH_INTERVAL = 300  # 5 minutes
    
    async def record(
        self,
        api_key_id: str,
        tokens: int,
        customer_id: str,
    ):
        """Buffer usage — flushed to Stripe in batches."""
        event_key = f"usage:{customer_id}:{api_key_id}"
        await redis_client.incrby(event_key, tokens)
        await redis_client.expire(event_key, self.FLUSH_INTERVAL * 2)
        
        # Also write to Postgres for internal analytics
        await self._record_internal(api_key_id, tokens)
    
    async def _record_internal(self, api_key_id: str, tokens: int):
        async with AsyncSession() as db:
            await db.execute(
                update(APIKeyUsage)
                .where(APIKeyUsage.api_key_id == api_key_id)
                .where(APIKeyUsage.date == date.today())
                .values(tokens=APIKeyUsage.tokens + tokens)
            )
            await db.commit()

meter = UsageMeter()

Flushing to Stripe via Celery beat

A periodic task flushes buffered usage to Stripe:

# app/tasks/billing.py
from celery.schedules import crontab
from stripe import stripe

@celery.task
def flush_usage_to_stripe():
    """Runs every 5 minutes via Celery beat."""
    pattern = "usage:*"
    keys = redis_client.scan_iter(match=pattern)
    
    for key in keys:
        tokens = redis_client.getdel(key)  # Atomic get+delete
        if not tokens:
            continue
        
        _, customer_id, api_key_id = key.split(":")
        
        stripe.billing.MeterEvent.create(
            event_name="api_tokens",
            payload={
                "stripe_customer_id": customer_id,
                "value": str(tokens),
            },
            timestamp=int(time.time()),
        )

# celery beat schedule
app.conf.beat_schedule = {
    "flush-usage": {
        "task": "app.tasks.billing.flush_usage_to_stripe",
        "schedule": crontab(minute="*/5"),
    },
}

Using the meter in route handlers

@router.post("/v1/chat")
@require_api_key(tier=["basic", "pro"])
async def chat(
    body: ChatRequest,
    key: APIKey = Depends(get_api_key),
):
    response = await llm.chat(
        messages=body.messages,
        track_tokens=True,
    )
    
    # Non-blocking — doesn't add latency
    asyncio.create_task(
        meter.record(
            api_key_id=str(key.id),
            tokens=response.tokens.total,
            customer_id=key.stripe_customer_id,
        )
    )
    
    return ChatResponse(
        reply=response.content,
        tokens=response.tokens,
    )

Handling subscription lifecycle via webhooks

@router.post("/v1/webhooks/stripe")
async def stripe_webhook(request: Request):
    payload = await request.body()
    sig_header = request.headers.get("Stripe-Signature")
    
    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, settings.STRIPE_WEBHOOK_SECRET
        )
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")
    
    match event["type"]:
        case "customer.subscription.created":
            await handle_subscription_created(event["data"]["object"])
        case "customer.subscription.deleted":
            await handle_subscription_cancelled(event["data"]["object"])
        case "invoice.payment_failed":
            await handle_payment_failed(event["data"]["object"])

Giving customers usage visibility

Expose a usage endpoint so customers can see their consumption:

@router.get("/v1/usage")
async def get_usage(
    key: APIKey = Depends(get_api_key),
    db: AsyncSession = Depends(get_db),
    period: str = "current_month",
):
    usage = await get_key_usage(db, key.id, period)
    return {
        "tokens_used": usage.total_tokens,
        "requests": usage.request_count,
        "period_start": usage.period_start.isoformat(),
        "period_end": usage.period_end.isoformat(),
        "rate_limit_remaining": {
            "per_minute": key.rate_limit_per_minute - usage.last_minute_tokens,
            "per_day": key.rate_limit_per_day - usage.today_tokens,
        },
    }

FastAPI AI Kit ships this complete metering pipeline: buffered recording, Celery flush task, Stripe integration, webhook handling, and the usage endpoint. Configure STRIPE_SECRET_KEY and STRIPE_METERED_PRICE_ID and the billing layer works automatically.

Build your AI backend with FastAPI AI Kit.

Clone, configure, and ship — everything is already wired up.

Read the docs
No subscriptions · One-time payment · Lifetime updates