Per-Case Token Burn Tracking: Design Spec (v2)
Issue: curaway-ai/curaway-backend#360
Date: 2026-04-24 (v2, updated after spec review and live API verification)
Scope: Sync Langfuse generation data → PostgreSQL → Metabase dashboards + cost anomaly alerts
Spec Review Findings (v1 → v2 changes)
Three critical issues were found in v1. All three are addressed in v2 (issue 3 via the blocking prerequisite below):
1. API pagination: Langfuse HIPAA Cloud uses page-based pagination, not cursor-based. Verified.
2. Field names: the real response uses usageDetails.input/output and costDetails.total, not the field names assumed in v1. Verified via a live API call.
3. Session/user IDs missing: Langfuse v4 with OpenTelemetry doesn't auto-populate sessionId/userId from LangChain config metadata, so traces currently have sessionId: null. This must be fixed before the sync can link generations to cases.
Prerequisite: Fix Langfuse Trace Association
BLOCKING: must be done before the sync job can link usage rows to cases and patients.
The Langfuse v4 SDK (OpenTelemetry-based) doesn't propagate langfuse_session_id from LangChain config metadata to the trace's sessionId. Current state: all traces have sessionId: null, userId: null.
Fix options:
1. Use Langfuse Python SDK directly (not via LangChain CallbackHandler) to create traces with explicit sessionId/userId before the LLM call
2. Set env vars per-call — LANGFUSE_SESSION_ID / LANGFUSE_USER_ID before each invoke (fragile, not thread-safe)
3. Use the @observe() decorator from Langfuse v4 with explicit session_id parameter
4. Post-hoc enrichment — after generation, use Langfuse API to update the trace with sessionId/userId
Recommended: Option 1 or 3. Modify llm_gateway.invoke() to create a Langfuse trace with session_id=case_id and user_id=patient_id before invoking the LLM. The CallbackHandler then attaches generations to this trace.
Effort: 2-3 hours to fix tracing + verify session_id appears in API.
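For the verification step, one option is to pull a batch of recent traces (for example from GET /api/public/traces, assumed here to return trace objects shaped like the trace response shown in the next section) and measure how many carry a non-null sessionId and userId. The sketch below works on already-parsed trace dicts; trace_linkage_coverage is a hypothetical helper name, and the sample traces are made up.

```python
from typing import Any, Iterable


def trace_linkage_coverage(traces: Iterable[dict[str, Any]]) -> dict[str, float]:
    """Fraction of traces whose sessionId / userId are populated.

    Hypothetical verification helper: feed it trace objects parsed from the
    Langfuse public API. After the tracing fix both fractions should be 1.0.
    """
    total = 0
    with_session = 0
    with_user = 0
    for trace in traces:
        total += 1
        if trace.get("sessionId"):
            with_session += 1
        if trace.get("userId"):
            with_user += 1
    if total == 0:
        return {"session": 0.0, "user": 0.0}
    return {"session": with_session / total, "user": with_user / total}


# Current state (both IDs null on every trace) vs. the target state.
before = [{"sessionId": None, "userId": None, "name": "LangGraph"}] * 3
after = [{"sessionId": "case-123", "userId": "patient-456", "name": "LangGraph"}]
print(trace_linkage_coverage(before))  # {'session': 0.0, 'user': 0.0}
print(trace_linkage_coverage(after))   # {'session': 1.0, 'user': 1.0}
```

Reporting a fraction rather than a yes/no makes partial fixes visible, e.g. when one agent path still creates traces without the case context.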
Langfuse API: Verified Response Shape
Endpoint: GET /api/public/observations?type=GENERATION
Host: https://hipaa.cloud.langfuse.com
Auth: Basic auth with public_key:secret_key
Pagination: Page-based (page=1&limit=100, meta.totalPages)
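A minimal, standard-library sketch of how one page request could be built for _fetch_observations: it combines the endpoint, host, Basic auth, and page/limit parameters listed above. The fromStartTime query parameter name for the time filter is an assumption to confirm against the Langfuse API reference, and build_observations_request is a hypothetical helper.

```python
import base64
from datetime import datetime, timezone
from urllib.parse import urlencode
from urllib.request import Request

LANGFUSE_HOST = "https://hipaa.cloud.langfuse.com"


def build_observations_request(
    public_key: str,
    secret_key: str,
    from_ts: datetime,
    page: int = 1,
    limit: int = 100,
    host: str = LANGFUSE_HOST,
) -> Request:
    """Build one page request for GET /api/public/observations?type=GENERATION.

    Basic auth header is base64("public_key:secret_key").
    ASSUMPTION: the time filter parameter is named fromStartTime.
    """
    query = urlencode({
        "type": "GENERATION",
        "page": page,
        "limit": limit,
        # Normalize to UTC and use the "Z" suffix the API returns in startTime.
        "fromStartTime": from_ts.astimezone(timezone.utc).isoformat().replace("+00:00", "Z"),
    })
    token = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    return Request(
        f"{host}/api/public/observations?{query}",
        headers={"Authorization": f"Basic {token}"},
    )


# Usage (network call, not executed here):
#   with urllib.request.urlopen(req) as resp:
#       data = json.load(resp)  # {"data": [...], "meta": {..., "totalPages": N}}
```

Building the request separately from sending it keeps the auth and query logic testable without network access; an async client can reuse the same URL and header.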
Real generation response (verified 2026-04-24):
{
"id": "019db65e-7b96-720e-bf47-be5f90974d69",
"traceId": "203a2f9cf737190868c71af5b13f4a7c",
"type": "GENERATION",
"name": "ChatAnthropic",
"model": "claude-haiku-4-5-20251001",
"startTime": "2026-04-22T18:05:38.582Z",
"endTime": "2026-04-22T18:05:40.381Z",
"latency": 1.799,
"usageDetails": {
"input": 5399,
"output": 126,
"total": 5525,
"input_cache_creation": 0,
"input_cache_read": 0
},
"costDetails": {
"input": 0.005399,
"output": 0.00063,
"input_cache_creation": 0,
"input_cache_read": 0,
"total": 0.006029
},
"promptTokens": 5399,
"completionTokens": 126,
"totalTokens": 5525,
"calculatedTotalCost": 0.006029,
"metadata": { ... }
}
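Pagination meta: each response also carries a meta object; the sync job reads meta.totalPages to decide when to stop paging.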
Trace response (for session/user linking):
{
"sessionId": null, // CURRENTLY NULL — must fix
"userId": null, // CURRENTLY NULL — must fix
"name": "LangGraph",
"metadata": { ... }
}
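To pin the field mapping to the verified shapes above, here is a sketch of a pure function that turns one generation plus its trace into an llm_usage row. generation_to_row is a hypothetical name; tier is left out because model_to_tier() lives elsewhere in the codebase, and the Decimal conversion is a design choice for the NUMERIC(10, 6) cost columns, not something the API requires.

```python
from datetime import datetime
from decimal import Decimal
from typing import Any


def generation_to_row(gen: dict[str, Any], trace: dict[str, Any]) -> dict[str, Any]:
    """Map one GENERATION observation (+ its parent trace) to an llm_usage row.

    Field paths follow the verified response; tier is omitted (model_to_tier
    is defined elsewhere in the codebase).
    """
    usage = gen.get("usageDetails") or {}
    cost = gen.get("costDetails") or {}
    model = gen.get("model") or ""

    def usd(key: str) -> Decimal:
        # Decimal(str(...)) keeps 0.006029 exact for NUMERIC(10, 6) columns.
        return Decimal(str(cost.get(key) or 0))

    return {
        "langfuse_generation_id": gen["id"],
        "langfuse_trace_id": gen.get("traceId"),
        "case_id": trace.get("sessionId"),  # None until tracing is fixed
        "patient_id": trace.get("userId"),
        "agent_name": gen.get("name"),
        "model": model,
        "provider": "gpt" if "gpt" in model else "claude",
        "input_tokens": usage.get("input") or 0,
        "output_tokens": usage.get("output") or 0,
        "total_tokens": usage.get("total") or 0,
        "cache_creation_tokens": usage.get("input_cache_creation") or 0,
        "cache_read_tokens": usage.get("input_cache_read") or 0,
        "cost_input_usd": usd("input"),
        "cost_output_usd": usd("output"),
        "cost_total_usd": usd("total"),
        # latency is in seconds; round() avoids float truncation (1.799 s -> 1799 ms).
        "latency_ms": round((gen.get("latency") or 0) * 1000),
        # Replace "Z" so fromisoformat also parses it before Python 3.11.
        "called_at": datetime.fromisoformat(gen["startTime"].replace("Z", "+00:00")),
    }
```

Feeding it the verified generation above with the current all-null trace yields input_tokens 5399, cost_total_usd 0.006029, latency_ms 1799, and case_id None.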
Database Schema (v2, fixed types)
CREATE TABLE llm_usage (
id VARCHAR(36) PRIMARY KEY DEFAULT gen_random_uuid()::text,
-- Case linkage (from Langfuse trace sessionId/userId — NULL until tracing fixed)
case_id VARCHAR(36) REFERENCES cases(id) ON DELETE SET NULL,
patient_id VARCHAR(36),
tenant_id VARCHAR(100) NOT NULL DEFAULT 'tenant-apollo-001',
-- Langfuse identifiers (for dedup)
langfuse_generation_id VARCHAR(100) UNIQUE NOT NULL,
langfuse_trace_id VARCHAR(100),
-- What was called
agent_name VARCHAR(100),
model VARCHAR(100),
tier VARCHAR(20),
provider VARCHAR(20),
-- Exact token counts
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
total_tokens INTEGER NOT NULL DEFAULT 0,
-- Cache tokens (Anthropic prompt caching)
cache_creation_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
-- Cost (Langfuse calculated — list price, not cache-discounted)
cost_input_usd NUMERIC(10, 6) DEFAULT 0,
cost_output_usd NUMERIC(10, 6) DEFAULT 0,
cost_total_usd NUMERIC(10, 6) NOT NULL DEFAULT 0,
-- Performance
latency_ms INTEGER,
-- Timestamps
called_at TIMESTAMPTZ NOT NULL,
synced_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_llm_usage_case ON llm_usage(case_id);
CREATE INDEX idx_llm_usage_tenant_called ON llm_usage(tenant_id, called_at);
CREATE INDEX idx_llm_usage_agent ON llm_usage(agent_name);
Changes from v1:
- id, case_id, patient_id are VARCHAR(36) not UUID (matches existing models)
- Removed GENERATED ALWAYS AS for total_tokens — just store the value directly
- Added cache_creation_tokens, cache_read_tokens (Anthropic prompt caching visibility)
- Added cost_input_usd, cost_output_usd split (enables prompt optimization analysis)
- Composite index on (tenant_id, called_at) for multi-tenant queries
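To show the per-case view this schema is meant to support, here is a sketch that loads a few made-up rows into an in-memory SQLite table (a reduced stand-in for the Postgres llm_usage table, column subset only) and rolls up token burn per case for one tenant. Rows not yet linked to a case group under NULL. The query shape is an illustrative assumption, not one of the spec's dashboard queries, and SQLite's NUMERIC is not fixed-point, so this shows the query, not the money precision.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced stand-in for llm_usage: only the columns the rollup needs.
conn.execute("""
    CREATE TABLE llm_usage (
        langfuse_generation_id TEXT PRIMARY KEY,
        case_id TEXT,
        tenant_id TEXT NOT NULL,
        total_tokens INTEGER NOT NULL DEFAULT 0,
        cost_total_usd NUMERIC NOT NULL DEFAULT 0
    )
""")
conn.executemany(
    "INSERT INTO llm_usage VALUES (?, ?, ?, ?, ?)",
    [  # made-up illustrative rows
        ("gen-1", "case-a", "tenant-apollo-001", 5525, 0.006029),
        ("gen-2", "case-a", "tenant-apollo-001", 1200, 0.0015),
        ("gen-3", "case-b", "tenant-apollo-001", 800, 0.001),
        ("gen-4", None, "tenant-apollo-001", 300, 0.0004),  # not yet linked to a case
    ],
)

# Per-case token burn for one tenant, largest spend first.
rollup = conn.execute("""
    SELECT case_id, COUNT(*) AS calls, SUM(total_tokens) AS tokens,
           ROUND(SUM(cost_total_usd), 6) AS cost_usd
    FROM llm_usage
    WHERE tenant_id = ?
    GROUP BY case_id
    ORDER BY cost_usd DESC
""", ("tenant-apollo-001",)).fetchall()

for row in rollup:
    print(row)
```

The WHERE tenant_id filter plus a called_at range is the access pattern the composite (tenant_id, called_at) index is there to serve.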
Sync Job (v2: page-based pagination, correct field mappings)
from datetime import datetime, timedelta, timezone


async def sync_langfuse_generations(db) -> int:
    """Sync Langfuse generations → llm_usage. Page-based pagination."""
    # Watermark: last called_at minus 5-min overlap for late arrivals
    last_called = await _get_watermark(db)
    if last_called:
        from_ts = last_called - timedelta(minutes=5)
    else:
        # First run: 30-day backfill. Timezone-aware to compare with TIMESTAMPTZ.
        from_ts = datetime.now(timezone.utc) - timedelta(days=30)
    page = 1
    total_synced = 0
    trace_cache: dict[str, dict] = {}  # one trace lookup per traceId, not per generation
    while True:
        data = await _fetch_observations(from_ts, page, limit=100)
        generations = data.get("data", [])
        if not generations:
            break
        for gen in generations:
            # Map Langfuse fields to our schema
            usage = gen.get("usageDetails") or {}
            cost = gen.get("costDetails") or {}
            model = gen.get("model") or ""  # "model" may be present but null
            # Case/patient come from the parent trace (separate API call, cached per run)
            trace_id = gen["traceId"]
            if trace_id not in trace_cache:
                trace_cache[trace_id] = await _get_trace_info(trace_id)
            trace_info = trace_cache[trace_id]
            await _upsert(db, {
                "langfuse_generation_id": gen["id"],
                "langfuse_trace_id": trace_id,
                "case_id": trace_info.get("sessionId"),  # NULL until tracing fixed
                "patient_id": trace_info.get("userId"),
                "agent_name": gen.get("name"),
                "model": model,
                "tier": model_to_tier(model).value,
                "provider": "gpt" if "gpt" in model else "claude",
                "input_tokens": usage.get("input", 0),
                "output_tokens": usage.get("output", 0),
                "total_tokens": usage.get("total", 0),
                "cache_creation_tokens": usage.get("input_cache_creation", 0),
                "cache_read_tokens": usage.get("input_cache_read", 0),
                "cost_input_usd": cost.get("input", 0),
                "cost_output_usd": cost.get("output", 0),
                "cost_total_usd": cost.get("total", 0),
                "latency_ms": round((gen.get("latency") or 0) * 1000),  # seconds -> ms
                # ISO string ending in "Z" -> aware datetime for TIMESTAMPTZ
                "called_at": datetime.fromisoformat(gen["startTime"].replace("Z", "+00:00")),
            })
            total_synced += 1
        total_pages = (data.get("meta") or {}).get("totalPages", 1)
        if page >= total_pages:
            break
        page += 1
    # Single commit: if any page fails, nothing from this run is committed, so the
    # watermark (MAX(called_at)) does not advance and the next run retries the window.
    await db.commit()
    # Check for cost anomalies
    await _check_cost_anomalies(db)
    return total_synced
Implementation Order
1. Fix Langfuse trace association (BLOCKING): ensure sessionId = case_id (and userId = patient_id) on every trace.
2. Alembic migration: create the llm_usage table.
3. Sync service: app/services/langfuse_sync.py.
4. Internal endpoint: POST /api/v1/internal/sync/langfuse-usage.
5. QStash cron: every 15 min.
6. Cost anomaly alert: in the sync job, trigger a Telegram alert.
7. Metabase dashboards: SQL queries (provided in this spec).
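The rule behind _check_cost_anomalies is not specified here. One possible rule, assumed purely for illustration, flags cases whose spend exceeds a multiple of the median per-case spend, with an absolute floor so tiny amounts never alert. find_cost_anomalies, the 3x multiplier and the $0.50 floor are all hypothetical placeholders; the per-case totals would come from a rollup over llm_usage.

```python
from decimal import Decimal
from statistics import median


def find_cost_anomalies(
    cost_by_case: dict[str, Decimal],
    multiplier: Decimal = Decimal("3"),
    min_cost_usd: Decimal = Decimal("0.50"),
) -> list[str]:
    """Return case_ids whose spend exceeds multiplier x the median case spend.

    Hypothetical rule with placeholder thresholds; min_cost_usd suppresses
    alerts on small absolute amounts.
    """
    if not cost_by_case:
        return []
    baseline = median(cost_by_case.values())
    threshold = max(baseline * multiplier, min_cost_usd)
    return sorted(case for case, cost in cost_by_case.items() if cost > threshold)


# Made-up per-case totals: median 0.575, threshold 1.725, only case-d exceeds it.
costs = {
    "case-a": Decimal("0.40"),
    "case-b": Decimal("0.55"),
    "case-c": Decimal("0.60"),
    "case-d": Decimal("4.10"),
}
print(find_cost_anomalies(costs))  # ['case-d']
```

A median baseline is less distorted by the very outliers it is trying to catch than a mean would be.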
Cost Accuracy Note
Langfuse calculates cost at list price per token. Anthropic prompt caching reduces input token cost by ~90% for cache hits. The cache_read_tokens field captures this, but costDetails.total does NOT reflect the discount.
For now: Langfuse cost is an upper-bound estimate. Actual billed cost is lower when caching is active. We capture cache_read_tokens so future reconciliation against Anthropic billing API is possible.
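Until billing reconciliation exists, a rough lower estimate can be derived from the stored columns. The sketch assumes, as stated above, that Langfuse priced every cache-read token at the full input price and that cache hits are billed at about 10% of it. The input price is derived from the verified sample (0.005399 USD for 5399 input tokens, i.e. 1e-6 USD per token). estimate_billed_cost is a hypothetical helper and ignores any separate cache-write pricing.

```python
from decimal import Decimal


def estimate_billed_cost(
    langfuse_total_usd: Decimal,
    cache_read_tokens: int,
    input_price_per_token: Decimal,
    cache_read_discount: Decimal = Decimal("0.9"),
) -> Decimal:
    """Rough lower estimate of billed cost from Langfuse's list-price total.

    ASSUMPTIONS: cache-read tokens were priced at the full input price in
    Langfuse's total, and are actually billed at (1 - discount) of it.
    Cache-write pricing is ignored. Not a substitute for billing data.
    """
    savings = Decimal(cache_read_tokens) * input_price_per_token * cache_read_discount
    return langfuse_total_usd - savings


# Input price implied by the verified sample: 0.005399 USD / 5399 tokens.
input_price = Decimal("0.005399") / Decimal(5399)

# Hypothetical variant of the sample call with 4,000 cache-read tokens.
print(estimate_billed_cost(Decimal("0.006029"), 4000, input_price))
```

With zero cache reads (as in the verified sample) the estimate equals the Langfuse total; with 4,000 cache-read tokens it drops by 0.0036 USD to 0.002429 USD.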
Concurrent Sync Protection
Add a Redis-based lock to prevent overlapping syncs:
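import uuid

SYNC_LOCK_KEY = "langfuse_sync:lock"
SYNC_LOCK_TTL = 600  # 10 minutes; should exceed the longest sync run, backfill included
# Release only if we still hold the lock: a run that outlives the TTL must not
# delete a lock that a newer run has since acquired (atomic check-and-delete).
_RELEASE_LOCK_LUA = (
    'if redis.call("get", KEYS[1]) == ARGV[1] then '
    'return redis.call("del", KEYS[1]) end return 0'
)

async def sync_langfuse_generations(db):
    token = uuid.uuid4().hex  # unique per run, identifies the lock owner
    if not await redis.set(SYNC_LOCK_KEY, token, nx=True, ex=SYNC_LOCK_TTL):
        logger.info("Langfuse sync already in progress, skipping")
        return 0
    try:
        # ... sync logic ...
    finally:
        await redis.eval(_RELEASE_LOCK_LUA, 1, SYNC_LOCK_KEY, token)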
Edge Cases
- No sessionId on traces (current state): store with case_id = NULL; backfill when tracing is fixed.
- Duplicate generations: ON CONFLICT (langfuse_generation_id) DO NOTHING.
- Langfuse API down: the sync fails and retries on the next cron run. No data loss, because the watermark is MAX(called_at) over committed rows.
- Backfill: the first run syncs the last 30 days (~5K records across ~50 pages at limit=100). This may take 2-3 min, so use a separate /backfill endpoint with a longer timeout.
- 5K+ daily records: the current Langfuse free tier allows 50K observations/month, while 5K/day is roughly 150K/month, so a paid plan may be needed at scale.
- Late-arriving data: the watermark uses a MAX(called_at) - 5 minutes overlap; dedup absorbs the re-fetched duplicates.
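The dedup and late-arrival rules can be exercised together: re-syncing an overlapping window inserts only rows it has not seen. This sketch uses an in-memory SQLite table (SQLite 3.24+ accepts the same ON CONFLICT (...) DO NOTHING form as Postgres) as a stand-in for llm_usage; the two-column table, sync_batch, next_window_start, and the ids and timestamps are illustrative assumptions.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE llm_usage (langfuse_generation_id TEXT UNIQUE NOT NULL, called_at TEXT NOT NULL)"
)
UPSERT = (
    "INSERT INTO llm_usage (langfuse_generation_id, called_at) VALUES (?, ?) "
    "ON CONFLICT (langfuse_generation_id) DO NOTHING"
)


def sync_batch(rows: list[tuple[str, str]]) -> int:
    """Insert a batch; return how many rows were actually new."""
    before = conn.total_changes
    conn.executemany(UPSERT, rows)
    conn.commit()
    return conn.total_changes - before


def next_window_start() -> datetime:
    """Watermark: MAX(called_at) minus the 5-minute overlap."""
    (latest,) = conn.execute("SELECT MAX(called_at) FROM llm_usage").fetchone()
    return datetime.fromisoformat(latest) - timedelta(minutes=5)


t0 = datetime(2026, 4, 22, 18, 0, tzinfo=timezone.utc)
first = [("gen-1", t0.isoformat()), ("gen-2", (t0 + timedelta(minutes=3)).isoformat())]
print(sync_batch(first))    # 2
print(next_window_start())  # 2026-04-22 17:58:00+00:00

# Next run re-fetches gen-2 (inside the overlap) plus a late arrival, gen-3.
second = [("gen-2", first[1][1]), ("gen-3", (t0 + timedelta(minutes=2)).isoformat())]
print(sync_batch(second))   # 1: gen-2 is skipped by ON CONFLICT
```

gen-3 started before the newest synced row but still falls inside the overlap window, so it is picked up; a generation arriving later than the overlap would still be missed.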
Files to Create/Modify
| File | Action |
|---|---|
| app/services/langfuse_sync.py | New: sync job |
| app/models/llm_usage.py | New: SQLAlchemy model |
| app/routers/internal.py | Add sync + backfill endpoints |
| alembic/versions/xxx_add_llm_usage.py | New: migration |
| app/services/llm_gateway.py | Fix Langfuse trace association (sessionId) |
| app/agents/tracing.py | Update trace creation for v4 session/user |