Per-Case Token Burn Tracking — Design Spec (v2)

Issue: curaway-ai/curaway-backend#360
Date: 2026-04-24 (v2, updated after spec review and live API verification)
Scope: Sync Langfuse generation data → PostgreSQL → Metabase dashboards + cost anomaly alerts


Spec Review Findings (v1 → v2 changes)

Three critical issues were found in v1. The first two are fixed in v2; the third is a blocking prerequisite (see the next section).

  1. API pagination: Langfuse HIPAA Cloud uses page-based pagination (not cursor). Verified.
  2. Field names: The real response uses usageDetails.input/output and costDetails.total, not the names v1 assumed. Verified via live API call.
  3. Session/user IDs missing: Langfuse v4 + OpenTelemetry doesn't auto-populate sessionId/userId from LangChain config metadata. Traces have sessionId: null. This must be fixed before the sync can link generations to cases.


Prerequisite: Fix Langfuse Trace Association

BLOCKING — must be done before the sync job works.

The Langfuse v4 SDK (OpenTelemetry-based) doesn't propagate langfuse_session_id from LangChain config metadata to the trace's sessionId. Current state: all traces have sessionId: null, userId: null.

Fix options:

  1. Use the Langfuse Python SDK directly (not via the LangChain CallbackHandler) to create traces with explicit sessionId/userId before the LLM call
  2. Set env vars per call: LANGFUSE_SESSION_ID / LANGFUSE_USER_ID before each invoke (fragile, not thread-safe)
  3. Use the @observe() decorator from Langfuse v4 with an explicit session_id parameter
  4. Post-hoc enrichment: after generation, use the Langfuse API to update the trace with sessionId/userId

Recommended: Option 1 or 3. Modify llm_gateway.invoke() to create a Langfuse trace with session_id=case_id and user_id=patient_id before invoking the LLM. The CallbackHandler then attaches generations to this trace.

Effort: 2-3 hours to fix tracing + verify session_id appears in API.


Langfuse API — Verified Response Shape

Endpoint: GET /api/public/observations?type=GENERATION
Host: https://hipaa.cloud.langfuse.com
Auth: Basic auth with public_key:secret_key
Pagination: Page-based (page=1&limit=100, meta.totalPages)
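
As a rough illustration of the endpoint, auth scheme, and pagination parameters above, the following sketch builds the URL and headers for one page using only the standard library. build_observations_request is a hypothetical helper, and the fromStartTime filter name is an assumption that should be checked against the Langfuse API reference before use.

```python
import base64
from datetime import datetime, timezone
from urllib.parse import urlencode

LANGFUSE_HOST = "https://hipaa.cloud.langfuse.com"


def build_observations_request(
    public_key: str,
    secret_key: str,
    from_ts: datetime,
    page: int = 1,
    limit: int = 100,
) -> tuple:
    """Return (url, headers) for one page of GENERATION observations."""
    # Millisecond-precision UTC timestamp, matching the format in the responses.
    stamp = from_ts.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    params = {
        "type": "GENERATION",
        "page": page,
        "limit": limit,
        "fromStartTime": stamp,  # assumed filter name, not verified in this spec
    }
    # HTTP Basic auth: base64("public_key:secret_key")
    token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    url = f"{LANGFUSE_HOST}/api/public/observations?{urlencode(params)}"
    return url, {"Authorization": f"Basic {token}"}
```

Keeping request construction separate from the HTTP call makes the pagination and auth logic testable without network access.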

Real generation response (verified 2026-04-24):

{
  "id": "019db65e-7b96-720e-bf47-be5f90974d69",
  "traceId": "203a2f9cf737190868c71af5b13f4a7c",
  "type": "GENERATION",
  "name": "ChatAnthropic",
  "model": "claude-haiku-4-5-20251001",
  "startTime": "2026-04-22T18:05:38.582Z",
  "endTime": "2026-04-22T18:05:40.381Z",
  "latency": 1.799,
  "usageDetails": {
    "input": 5399,
    "output": 126,
    "total": 5525,
    "input_cache_creation": 0,
    "input_cache_read": 0
  },
  "costDetails": {
    "input": 0.005399,
    "output": 0.00063,
    "input_cache_creation": 0,
    "input_cache_read": 0,
    "total": 0.006029
  },
  "promptTokens": 5399,
  "completionTokens": 126,
  "totalTokens": 5525,
  "calculatedTotalCost": 0.006029,
  "metadata": { ... }
}
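
In the sample above, the same numbers appear twice: once under usageDetails/costDetails and once in the top-level promptTokens, completionTokens, totalTokens, and calculatedTotalCost fields. A single sample does not establish that they always agree, so a small check like the one below could run during the sync to flag drift. check_generation_consistency is a hypothetical helper, not part of any SDK.

```python
import math
from typing import Any


def check_generation_consistency(gen: dict) -> list:
    """Return human-readable mismatches between detail fields and top-level fields."""
    usage: dict = gen.get("usageDetails") or {}
    cost: dict = gen.get("costDetails") or {}
    problems: list = []

    token_pairs: list = [
        ("usageDetails.input", usage.get("input"), "promptTokens", gen.get("promptTokens")),
        ("usageDetails.output", usage.get("output"), "completionTokens", gen.get("completionTokens")),
        ("usageDetails.total", usage.get("total"), "totalTokens", gen.get("totalTokens")),
    ]
    for left_name, left, right_name, right in token_pairs:
        if left != right:
            problems.append(f"{left_name}={left} != {right_name}={right}")

    cost_total: Any = cost.get("total")
    cost_top: Any = gen.get("calculatedTotalCost")
    if cost_total is None or cost_top is None:
        # Flag only when exactly one of the two is missing.
        if cost_total is not cost_top:
            problems.append(f"costDetails.total={cost_total} != calculatedTotalCost={cost_top}")
    elif not math.isclose(cost_total, cost_top, abs_tol=1e-9):
        problems.append(f"costDetails.total={cost_total} != calculatedTotalCost={cost_top}")

    return problems
```

For the verified sample the function returns an empty list.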

Pagination meta:

{
  "meta": {
    "page": 1,
    "limit": 100,
    "totalItems": 5002,
    "totalPages": 51
  }
}
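
The page loop implied by this meta block can be sketched as an async generator. This is a minimal sketch, assuming a caller-supplied fetch_page coroutine that returns the parsed JSON body for a given page number; iter_pages and the max_pages safety cap are hypothetical additions.

```python
from typing import Any, AsyncIterator, Awaitable, Callable

FetchPage = Callable[[int], Awaitable[dict]]


async def iter_pages(fetch_page: FetchPage, max_pages: int = 1000) -> AsyncIterator[list]:
    """Yield the `data` list of each page until meta.totalPages is reached."""
    page = 1
    while page <= max_pages:  # safety cap in case totalPages is wrong or missing
        body: dict = await fetch_page(page)
        items: list = body.get("data") or []
        if not items:
            return
        yield items
        total_pages: Any = (body.get("meta") or {}).get("totalPages", 1)
        if page >= total_pages:
            return
        page += 1
```

With the meta shown above (5002 items at limit=100), the generator would yield 51 pages, the last one holding 2 items.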

Trace response (for session/user linking):

{
  "sessionId": null,        // CURRENTLY NULL — must fix
  "userId": null,           // CURRENTLY NULL — must fix
  "name": "LangGraph",
  "metadata": { ... }
}


Database Schema (v2 — fixed types)

CREATE TABLE llm_usage (
    id VARCHAR(36) PRIMARY KEY DEFAULT gen_random_uuid()::text,

    -- Case linkage (from Langfuse trace sessionId/userId — NULL until tracing fixed)
    case_id VARCHAR(36) REFERENCES cases(id) ON DELETE SET NULL,
    patient_id VARCHAR(36),
    tenant_id VARCHAR(100) NOT NULL DEFAULT 'tenant-apollo-001',

    -- Langfuse identifiers (for dedup)
    langfuse_generation_id VARCHAR(100) UNIQUE NOT NULL,
    langfuse_trace_id VARCHAR(100),

    -- What was called
    agent_name VARCHAR(100),
    model VARCHAR(100),
    tier VARCHAR(20),
    provider VARCHAR(20),

    -- Exact token counts
    input_tokens INTEGER NOT NULL DEFAULT 0,
    output_tokens INTEGER NOT NULL DEFAULT 0,
    total_tokens INTEGER NOT NULL DEFAULT 0,

    -- Cache tokens (Anthropic prompt caching)
    cache_creation_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,

    -- Cost (Langfuse calculated — list price, not cache-discounted)
    cost_input_usd NUMERIC(10, 6) DEFAULT 0,
    cost_output_usd NUMERIC(10, 6) DEFAULT 0,
    cost_total_usd NUMERIC(10, 6) NOT NULL DEFAULT 0,

    -- Performance
    latency_ms INTEGER,

    -- Timestamps
    called_at TIMESTAMPTZ NOT NULL,
    synced_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_llm_usage_case ON llm_usage(case_id);
CREATE INDEX idx_llm_usage_tenant_called ON llm_usage(tenant_id, called_at);
CREATE INDEX idx_llm_usage_agent ON llm_usage(agent_name);

Changes from v1:

  • id, case_id, patient_id are VARCHAR(36), not UUID (matches existing models)
  • Removed GENERATED ALWAYS AS for total_tokens; the value is stored directly
  • Added cache_creation_tokens, cache_read_tokens (Anthropic prompt caching visibility)
  • Added cost_input_usd, cost_output_usd split (enables prompt optimization analysis)
  • Composite index on (tenant_id, called_at) for multi-tenant queries


Sync Job (v2 — page-based pagination, correct field mappings)

from datetime import datetime, timedelta, timezone


async def sync_langfuse_generations(db) -> int:
    """Sync Langfuse generations → llm_usage. Page-based pagination."""
    # Watermark: last called_at minus 5-min overlap for late arrivals.
    # With no watermark (first run), look back 30 days. An aware UTC time
    # replaces the deprecated, naive datetime.utcnow().
    last_called = await _get_watermark(db)
    if last_called:
        from_ts = last_called - timedelta(minutes=5)
    else:
        from_ts = datetime.now(timezone.utc) - timedelta(days=30)

    page = 1
    total_synced = 0
    trace_cache = {}  # traceId -> trace info, so each trace is fetched once
    while True:
        data = await _fetch_observations(from_ts, page, limit=100)
        generations = data.get("data") or []
        if not generations:
            break

        for gen in generations:
            # Map Langfuse fields to our schema.
            # `or {}` / `or 0` also cover keys that are present but null.
            usage = gen.get("usageDetails") or {}
            cost = gen.get("costDetails") or {}
            model = gen.get("model") or ""

            # Get case/patient from the trace (separate API call, cached per traceId)
            trace_id = gen.get("traceId")
            if trace_id and trace_id not in trace_cache:
                trace_cache[trace_id] = await _get_trace_info(trace_id) or {}
            trace_info = trace_cache.get(trace_id, {})

            await _upsert(db, {
                "langfuse_generation_id": gen["id"],
                "langfuse_trace_id": trace_id,
                "case_id": trace_info.get("sessionId"),     # NULL until tracing fixed
                "patient_id": trace_info.get("userId"),
                "agent_name": gen.get("name"),
                "model": gen.get("model"),
                "tier": model_to_tier(model).value,
                "provider": "gpt" if "gpt" in model else "claude",
                "input_tokens": usage.get("input") or 0,
                "output_tokens": usage.get("output") or 0,
                "total_tokens": usage.get("total") or 0,
                "cache_creation_tokens": usage.get("input_cache_creation") or 0,
                "cache_read_tokens": usage.get("input_cache_read") or 0,
                "cost_input_usd": cost.get("input") or 0,
                "cost_output_usd": cost.get("output") or 0,
                "cost_total_usd": cost.get("total") or 0,
                # round() rather than int(): a float product such as 1.799 * 1000
                # can land just below the integer and would be truncated
                "latency_ms": round((gen.get("latency") or 0) * 1000),
                "called_at": gen.get("startTime"),
            })
            # Counts rows processed, including duplicates skipped by ON CONFLICT
            total_synced += 1

        total_pages = (data.get("meta") or {}).get("totalPages", 1)
        if page >= total_pages:
            break
        page += 1

    await db.commit()

    # Check for cost anomalies
    await _check_cost_anomalies(db)

    return total_synced

Implementation Order

  1. Fix Langfuse trace association (BLOCKING) — ensure sessionId = case_id on every trace
  2. Alembic migration — create llm_usage table
  3. Sync service: app/services/langfuse_sync.py
  4. Internal endpoint: POST /api/v1/internal/sync/langfuse-usage
  5. QStash cron — every 15 min
  6. Cost anomaly alert — in sync job, trigger Telegram alert
  7. Metabase dashboards — SQL queries (provided in this spec)

Cost Accuracy Note

Langfuse calculates cost at list price per token. Anthropic prompt caching reduces input token cost by ~90% for cache hits. The cache_read_tokens field captures this, but costDetails.total does NOT reflect the discount.

For now: Langfuse cost is an upper-bound estimate. Actual billed cost is lower when caching is active. We capture cache_read_tokens so future reconciliation against Anthropic billing API is possible.
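
To make the gap between the upper-bound estimate and billed cost concrete, here is a rough sketch of how a cache-adjusted figure could be derived from the stored columns. It assumes that cache-read tokens are counted in input_tokens and were priced at the same list rate as regular input tokens, which this spec has not verified; the 0.90 factor is the approximate discount quoted above, and estimate_billed_cost is a hypothetical helper. Reconciliation against Anthropic billing data remains the authoritative source.

```python
from decimal import Decimal

# "~90%" from the note above; an approximation, not a contractual rate.
CACHE_READ_DISCOUNT = Decimal("0.90")


def estimate_billed_cost(
    cost_total_usd: Decimal,
    cost_input_usd: Decimal,
    input_tokens: int,
    cache_read_tokens: int,
) -> Decimal:
    """Subtract the assumed cache-read discount from the list-price total."""
    if input_tokens <= 0 or cache_read_tokens <= 0:
        return cost_total_usd
    # Per-token list price implied by the stored input cost.
    list_price_per_token = cost_input_usd / input_tokens
    discount = CACHE_READ_DISCOUNT * cache_read_tokens * list_price_per_token
    return max(cost_total_usd - discount, Decimal("0"))
```

For the verified sample (no cache reads) the estimate equals cost_total_usd unchanged.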


Concurrent Sync Protection

Add a Redis-based lock to prevent overlapping syncs:

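SYNC_LOCK_KEY = "langfuse_sync:lock"
SYNC_LOCK_TTL = 600  # seconds (10 minutes); should exceed the longest expected sync run

async def sync_langfuse_generations(db) -> int:
    # SET with nx + ex acquires the lock atomically; a falsy result means another run holds it
    if not await redis.set(SYNC_LOCK_KEY, "1", nx=True, ex=SYNC_LOCK_TTL):
        logger.info("Langfuse sync already in progress, skipping")
        return 0
    try:
        ...  # sync logic (the page loop from the Sync Job section)
    finally:
        await redis.delete(SYNC_LOCK_KEY)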
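
The lock above stores a constant value, so a run that outlives the TTL would delete a lock that a later run has since acquired. A common refinement is to store a unique token and release only if the token still matches. The sketch below assumes an async client exposing set(..., nx=, ex=) and eval(script, numkeys, *keys_and_args), as redis-py's asyncio client does; acquire_sync_lock and release_sync_lock are hypothetical helper names.

```python
import uuid
from typing import Any, Optional

SYNC_LOCK_KEY = "langfuse_sync:lock"
SYNC_LOCK_TTL = 600  # seconds

# Runs atomically on the Redis server: delete the key only if it holds our token.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
end
return 0
"""


async def acquire_sync_lock(redis: Any) -> Optional[str]:
    """Return a token if the lock was acquired, or None if another run holds it."""
    token = uuid.uuid4().hex
    acquired = await redis.set(SYNC_LOCK_KEY, token, nx=True, ex=SYNC_LOCK_TTL)
    return token if acquired else None


async def release_sync_lock(redis: Any, token: str) -> bool:
    """Release the lock only if it is still owned by `token`."""
    released = await redis.eval(RELEASE_SCRIPT, 1, SYNC_LOCK_KEY, token)
    return bool(released)
```

The sync function would call acquire_sync_lock at the start, return 0 when it gets None, and call release_sync_lock in its finally block.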

Edge Cases

  • No sessionId on traces (current state): Store with case_id = NULL. Backfill when tracing is fixed.
  • Duplicate generations: ON CONFLICT (langfuse_generation_id) DO NOTHING.
  • Langfuse API down: Sync fails, retries next cron. No data loss.
  • Backfill: First run syncs last 30 days. ~5K records across ~50 pages. May take 2-3 min — use separate /backfill endpoint with longer timeout.
  • 5K+ daily records: The Langfuse free tier currently allows 50K observations/month. At 5K+ records per day (about 150K/month), a paid plan may be needed.
  • Late-arriving data: Watermark uses MAX(called_at) - 5 minutes overlap. Dedup handles duplicates.
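
The dedup and watermark rules in the list above can be demonstrated together. The sketch below uses an in-memory SQLite database purely as a stand-in for PostgreSQL (both accept the ON CONFLICT ... DO NOTHING clause) and a two-column demo table instead of the full llm_usage schema; create_demo_table, upsert_generation, and watermark are hypothetical helpers.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional

OVERLAP = timedelta(minutes=5)


def create_demo_table(conn: sqlite3.Connection) -> None:
    """Two-column stand-in for llm_usage, enough to show dedup and watermark."""
    conn.execute(
        "CREATE TABLE llm_usage ("
        "langfuse_generation_id TEXT UNIQUE NOT NULL, "
        "called_at TEXT NOT NULL)"
    )


def upsert_generation(conn: sqlite3.Connection, generation_id: str, called_at: datetime) -> bool:
    """Insert a row; return False if the generation was already stored."""
    before = conn.total_changes
    conn.execute(
        "INSERT INTO llm_usage (langfuse_generation_id, called_at) VALUES (?, ?) "
        "ON CONFLICT (langfuse_generation_id) DO NOTHING",
        (generation_id, called_at.isoformat()),
    )
    return conn.total_changes - before == 1


def watermark(conn: sqlite3.Connection) -> Optional[datetime]:
    """MAX(called_at) minus the overlap, or None when the table is empty."""
    row = conn.execute("SELECT MAX(called_at) FROM llm_usage").fetchone()
    if row[0] is None:
        return None
    return datetime.fromisoformat(row[0]) - OVERLAP
```

Because the overlap window re-fetches the last five minutes on every run, the unique constraint on langfuse_generation_id is what keeps repeated rows out.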

Files to Create/Modify

File                                     Action
app/services/langfuse_sync.py            New: sync job
app/models/llm_usage.py                  New: SQLAlchemy model
app/routers/internal.py                  Add sync + backfill endpoints
alembic/versions/xxx_add_llm_usage.py    New: migration
app/services/llm_gateway.py              Fix Langfuse trace association (sessionId)
app/agents/tracing.py                    Update trace creation for v4 session/user