# Sequence — Async Pipelines + Webhooks
Beyond the synchronous request paths, Curaway runs scheduled crons and one-off async tasks (both via QStash) and receives inbound webhooks (from Clerk). On GCP, the QStash-driven paths consolidate onto Cloud Scheduler + Cloud Tasks, while the Clerk webhook stays on a public endpoint. Together they form a single migration unit that deserves its own diagram set.

Audience: Engineering team. Read alongside Phase A2 of the patient happy path, which covers the document-extraction queue.
## Inventory

### Scheduled crons (registered via `app/register_schedules.py`)

| Schedule | Cron | Endpoint | Purpose |
|---|---|---|---|
| `exchange-rate-refresh` | `0 0 * * *` | `/api/v1/internal/tasks/exchange-rate-refresh` | Pull currency rates from the Frankfurter API |
| `intake-reminders` | `0 * * * *` | `/api/v1/internal/tasks/intake-reminders` | Email patients stuck in intake > 24h |
| `stale-session-cleanup` | `0 3 * * *` | `/api/v1/internal/tasks/stale-session-cleanup` | Clear stuck OCR jobs, orphaned uploads |
| `consent-expiry-check` | `0 9 * * *` | `/api/v1/internal/tasks/consent-expiry-check` | Warn about consents expiring within 30 days (stub) |
| `notification-digest` | `0 8 * * *` | `/api/v1/internal/tasks/notification-digest` | Daily digest emails (stub) |
| `analytics-refresh` | `0 */2 * * *` | `/api/v1/internal/tasks/analytics-refresh` | Refresh Metabase materialized views (stub) |
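The cron column can be checked mechanically. Below is a minimal, stdlib-only model of the six schedules, assuming UTC and supporting only the field forms the table actually uses (`*`, `*/n`, and literal numbers). `Schedule`, `fires_at`, and `firings_per_day` are hypothetical names for illustration, not the contents of `app/register_schedules.py`.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Schedule:
    name: str
    cron: str  # five fields: minute hour day-of-month month day-of-week


# Mirrors the inventory table; times are assumed to be UTC.
SCHEDULES = [
    Schedule("exchange-rate-refresh", "0 0 * * *"),
    Schedule("intake-reminders", "0 * * * *"),
    Schedule("stale-session-cleanup", "0 3 * * *"),
    Schedule("consent-expiry-check", "0 9 * * *"),
    Schedule("notification-digest", "0 8 * * *"),
    Schedule("analytics-refresh", "0 */2 * * *"),
]


def _field_matches(field: str, value: int) -> bool:
    """Match one cron field. Only '*', '*/n' and a literal integer are supported.

    '*/n' is evaluated as value % n == 0, which is correct for 0-based fields
    (minute, hour); the table only uses it for hours.
    """
    if field == "*":
        return True
    if field.startswith("*/"):
        return value % int(field[2:]) == 0
    return value == int(field)


def fires_at(schedule: Schedule, t: datetime) -> bool:
    """True if the schedule triggers at minute t (naive datetime, treated as UTC)."""
    minute, hour, dom, month, dow = schedule.cron.split()
    cron_dow = t.isoweekday() % 7  # cron uses 0 for Sunday; isoweekday() uses 7
    pairs = [(minute, t.minute), (hour, t.hour), (dom, t.day), (month, t.month), (dow, cron_dow)]
    return all(_field_matches(f, v) for f, v in pairs)


def firings_per_day(schedule: Schedule, day: datetime) -> int:
    """Count how many times the schedule triggers during the given calendar day."""
    start = day.replace(hour=0, minute=0, second=0, microsecond=0)
    return sum(fires_at(schedule, start + timedelta(minutes=m)) for m in range(24 * 60))
```

Summing `firings_per_day` over all six schedules gives 40 trigger calls per day (1 + 24 + 1 + 1 + 1 + 12), roughly 1,200 per 30-day month. That is the volume behind the cost comparison in the migration table.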
### One-off async tasks (enqueued from sync handlers)

| Trigger | Endpoint | Purpose |
|---|---|---|
| `POST /documents/confirm` | `/api/v1/internal/extract` | Document text + entity extraction (covered in Phase A2) |
| FHIR sweep (manual + future cron) | `/api/v1/internal/fhir/sweep` | Backfill FHIR resources for cases that lack them |
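Because one-off tasks are enqueued from inside sync handlers, the QStash → Cloud Tasks swap is smallest if every handler enqueues through one narrow interface. The sketch below illustrates that idea under stated assumptions: `TaskQueue`, `InMemoryTaskQueue`, `confirm_document`, and the `{"document_id": ...}` payload shape are all hypothetical, and the real client lives in `app/integrations/qstash_client.py`.

```python
import json
from dataclasses import dataclass, field
from typing import List, Protocol, Tuple


class TaskQueue(Protocol):
    """Hypothetical transport-agnostic interface: QStash today, Cloud Tasks later."""

    def enqueue(self, endpoint: str, payload: dict) -> str:
        ...


@dataclass
class InMemoryTaskQueue:
    """Test double that records tasks instead of sending them anywhere."""

    sent: List[Tuple[str, str]] = field(default_factory=list)

    def enqueue(self, endpoint: str, payload: dict) -> str:
        body = json.dumps(payload, sort_keys=True)
        self.sent.append((endpoint, body))
        return f"task-{len(self.sent)}"


def confirm_document(queue: TaskQueue, document_id: str) -> str:
    """Hypothetical body of POST /documents/confirm: hand extraction off to a worker."""
    return queue.enqueue("/api/v1/internal/extract", {"document_id": document_id})
```

Handlers depend only on `TaskQueue`, so the migration replaces one implementation class rather than every call site.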
### Inbound webhooks

| Source | Endpoint | Purpose |
|---|---|---|
| Clerk | `POST /api/v1/webhooks/clerk` | `organizationMembership.deleted` → deactivate auto-assigned roles |
## Sequence — FHIR sweep (admin-triggered or scheduled)

Sweeps cases that have `status='extraction_complete'` but are missing FHIR resources. The sweep is idempotent: re-running it on the same case is a no-op once the resources exist.
```mermaid
sequenceDiagram
    autonumber
    actor Operator as Operator<br/>(SD or future cron)
    participant API as Curaway API
    participant Sweep as fhir_sweep_service
    participant Repo as DocumentRepository
    participant PG as PostgreSQL
    participant Anthropic as Anthropic
    participant LF as Langfuse
    Operator->>API: POST /api/v1/internal/fhir/sweep<br/>?dry_run=true&case_id=…<br/>X-Internal-Secret: …
    API->>Sweep: run_sweep(db, dry_run, case_id, limit)
    Sweep->>Repo: find_sweep_candidates(limit, case_id)
    Repo->>PG: SELECT documents JOIN cases<br/>WHERE status='extracted' AND no FHIR rows
    PG-->>Repo: candidates[]
    Repo-->>Sweep: candidates[]
    loop per candidate
        alt dry_run = true
            Sweep->>Sweep: log "would extract"
        else dry_run = false
            Sweep->>Anthropic: clinical_context agent<br/>(re-extract with current prompt)
            Note over Sweep,Anthropic: ANTHROPIC_API_KEY<br/>via llm_gateway
            Anthropic-->>Sweep: structured FHIR JSON
            Sweep--xLF: trace
            Sweep->>PG: INSERT fhir_resources rows
            Sweep->>PG: UPDATE documents.fhir_status='complete'
        end
    end
    Sweep-->>API: { processed_count, failed_count, dry_run }
    API-->>Operator: 200 with summary
```
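The sweep's control flow and its idempotency property can be modelled without a database. The sketch below is an in-memory stand-in, not `fhir_sweep_service`: `Document`, the `extract` callable (standing in for the `clinical_context` agent call plus the INSERT/UPDATE), and the default `limit=50` are assumptions. It also assumes a dry run reports `processed_count=0` and only logs candidates.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable, List, Optional

log = logging.getLogger("fhir_sweep_sketch")


@dataclass
class Document:
    id: str
    case_id: str
    status: str
    fhir_resources: list = field(default_factory=list)


def find_sweep_candidates(docs: List[Document], limit: int, case_id: Optional[str] = None) -> List[Document]:
    # Extracted documents that still have no FHIR rows, optionally scoped to one case.
    matches = [
        d for d in docs
        if d.status == "extracted"
        and not d.fhir_resources
        and (case_id is None or d.case_id == case_id)
    ]
    return matches[:limit]


def run_sweep(
    docs: List[Document],
    extract: Callable[[Document], list],
    dry_run: bool = True,
    case_id: Optional[str] = None,
    limit: int = 50,
) -> dict:
    processed = failed = 0
    for doc in find_sweep_candidates(docs, limit, case_id):
        if dry_run:
            log.info("would extract %s", doc.id)
            continue
        try:
            # Stand-in for the LLM re-extraction and the fhir_resources INSERT.
            doc.fhir_resources = extract(doc)
            processed += 1
        except Exception:
            # No FHIR rows were written, so the next sweep picks this document up again.
            log.exception("extraction failed for %s", doc.id)
            failed += 1
    return {"processed_count": processed, "failed_count": failed, "dry_run": dry_run}
```

Idempotency falls out of the candidate query: once a document has FHIR rows it no longer matches, so a second run processes nothing, while a failed document stays eligible for retry.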
Migration note: The sweep is currently manual (an operator runs a curl command). Issue #442 plans a weekly QStash cron; that conversion will happen alongside the rest of the QStash → Cloud Tasks migration.
## Sequence — Scheduled cron (representative: intake-reminders)
All 6 crons follow the same pattern: QStash → POST → handler reads/writes Postgres → optional egress (email, exchange rates, etc.).
```mermaid
sequenceDiagram
    autonumber
    participant QS as Upstash QStash<br/>(cron scheduler)
    participant API as Curaway API
    participant Handler as Internal task handler
    participant PG as PostgreSQL
    participant Email as Email provider<br/>(intake reminders)
    QS->>QS: trigger schedule "intake-reminders"<br/>(cron: 0 * * * *)
    QS->>API: POST /api/v1/internal/tasks/intake-reminders<br/>X-Internal-Secret: …<br/>Upstash-Signature: …
    API->>API: verify X-Internal-Secret
    API->>Handler: task body
    Handler->>PG: SELECT cases<br/>WHERE status='intake' AND<br/>last_message_at < NOW() - INTERVAL '24h'
    loop per stuck case
        Handler->>Email: send reminder
        Handler->>PG: INSERT events<br/>(reminder_sent)
    end
    Handler-->>API: { reminders_sent: N }
    API-->>QS: 200
```
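Because the trigger is hourly while the threshold is 24h, a case that stays stuck would match the diagram's SELECT on every run. The sketch below shows one way to keep the handler idempotent, assuming a `NOT EXISTS` guard against a recent `reminder_sent` event; whether the real handler does exactly this is an assumption. It uses SQLite with epoch-second timestamps so it runs standalone (production is Postgres with `NOW() - INTERVAL '24h'`), and `send` stands in for the email provider.

```python
import sqlite3
from datetime import datetime, timedelta
from typing import Callable

SCHEMA = """
CREATE TABLE cases (id INTEGER PRIMARY KEY, status TEXT NOT NULL, last_message_at INTEGER NOT NULL);
CREATE TABLE events (case_id INTEGER NOT NULL, type TEXT NOT NULL, created_at INTEGER NOT NULL);
"""

# Stuck intake cases that have not already been reminded inside the same 24h window.
STUCK_CASES_SQL = """
SELECT c.id FROM cases c
WHERE c.status = 'intake'
  AND c.last_message_at < :cutoff
  AND NOT EXISTS (
      SELECT 1 FROM events e
      WHERE e.case_id = c.id AND e.type = 'reminder_sent' AND e.created_at >= :cutoff
  )
ORDER BY c.id
"""


def send_intake_reminders(conn: sqlite3.Connection, now: datetime, send: Callable[[int], None]) -> dict:
    cutoff = int((now - timedelta(hours=24)).timestamp())
    case_ids = [row[0] for row in conn.execute(STUCK_CASES_SQL, {"cutoff": cutoff})]
    for case_id in case_ids:
        send(case_id)
        conn.execute(
            "INSERT INTO events (case_id, type, created_at) VALUES (?, 'reminder_sent', ?)",
            (case_id, int(now.timestamp())),
        )
    conn.commit()
    return {"reminders_sent": len(case_ids)}
```

With the guard, a QStash or Cloud Scheduler retry within the same window sends nothing twice; without it, a case stuck for two days would be emailed every hour after the first 24h.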
## Sequence — Clerk webhook (organizationMembership.deleted)
When a user is removed from a Clerk org, we deactivate their auto-assigned role rows for that tenant. Manual rows are preserved (sticky-revoke is a separate operator action).
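The "manual rows untouched" rule comes entirely from the `granted_by` filter. The SQLite sketch below mirrors the two queries in the diagram so the filter can be exercised; the table shapes are simplified assumptions, `deactivate_auto_roles` is a hypothetical name, and returning a zero count for an org with no tenant mapping is an assumed behavior the diagram does not show.

```python
import sqlite3

AUTO_GRANTED_BY = "system:clerk_jwt"

SCHEMA = """
CREATE TABLE tenant_org_mappings (clerk_org_id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL);
CREATE TABLE user_roles (
    user_id TEXT NOT NULL, tenant_id TEXT NOT NULL, role TEXT NOT NULL,
    granted_by TEXT NOT NULL, is_active INTEGER NOT NULL DEFAULT 1, updated_at TEXT
);
"""


def deactivate_auto_roles(conn: sqlite3.Connection, clerk_org_id: str, user_id: str) -> dict:
    mapping = conn.execute(
        "SELECT tenant_id FROM tenant_org_mappings WHERE clerk_org_id = ?", (clerk_org_id,)
    ).fetchone()
    if mapping is None:
        # Hypothetical handling: an unmapped org has no tenant rows to deactivate.
        return {"event": "deactivated", "count": 0}
    # Only auto-assigned rows match; manually granted rows are left alone.
    cur = conn.execute(
        """
        UPDATE user_roles
        SET is_active = 0, updated_at = CURRENT_TIMESTAMP
        WHERE user_id = ? AND tenant_id = ? AND granted_by = ?
        """,
        (user_id, mapping[0], AUTO_GRANTED_BY),
    )
    conn.commit()
    return {"event": "deactivated", "count": cur.rowcount}
```

The UPDATE is naturally idempotent, so a redelivered webhook leaves the same rows inactive.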
```mermaid
sequenceDiagram
    autonumber
    participant Clerk as Clerk
    participant API as Curaway API
    participant Wh as clerk_webhook service
    participant PG as PostgreSQL
    Note over Clerk: admin removes user<br/>from Clerk org
    Clerk->>API: POST /api/v1/webhooks/clerk<br/>svix-id, svix-timestamp, svix-signature
    API->>Wh: handle(headers, body)
    Wh->>Wh: verify HMAC-SHA256 signature<br/>using CLERK_WEBHOOK_SECRET<br/>(custom impl, no svix dep)
    alt signature invalid
        Wh-->>API: 401
    else replay window > 5 min
        Wh-->>API: 401
    else
        Wh->>Wh: parse type=organizationMembership.deleted
        Wh->>PG: SELECT tenant_id<br/>FROM tenant_org_mappings<br/>WHERE clerk_org_id = ?
        Wh->>PG: UPDATE user_roles<br/>SET is_active=false, updated_at=NOW()<br/>WHERE user_id=? AND tenant_id=?<br/>AND granted_by='system:clerk_jwt'
        Note over Wh,PG: manual rows untouched
        Wh-->>API: { event: deactivated, count: N }
    end
    API-->>Clerk: 200 (or 401)
```
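Clerk delivers webhooks through Svix, whose documented signing scheme is: HMAC-SHA256 over `{svix-id}.{svix-timestamp}.{raw body}`, keyed with the base64-decoded part of the `whsec_`-prefixed secret, base64-encoded and sent in `svix-signature` as space-separated `v1,<signature>` entries. The sketch below is a minimal stdlib version of that check plus the 5-minute replay window. It is not the code in `app/services/clerk_webhook.py`; `verify_svix` and `WebhookVerificationError` are hypothetical names, and it assumes header names have already been lower-cased.

```python
import base64
import hashlib
import hmac
import time
from typing import Mapping, Optional

REPLAY_WINDOW_SECONDS = 5 * 60  # the 5-minute window shown in the diagram


class WebhookVerificationError(Exception):
    """Raised for any verification failure; the caller maps it to a 401."""


def verify_svix(secret: str, headers: Mapping[str, str], body: bytes, now: Optional[float] = None) -> None:
    try:
        msg_id = headers["svix-id"]
        raw_ts = headers["svix-timestamp"]
        ts = int(raw_ts)
        signature_header = headers["svix-signature"]
    except (KeyError, ValueError) as exc:
        raise WebhookVerificationError("missing or malformed svix headers") from exc

    # Replay protection: reject timestamps too far from the current time in either direction.
    now = time.time() if now is None else now
    if abs(now - ts) > REPLAY_WINDOW_SECONDS:
        raise WebhookVerificationError("timestamp outside replay window")

    key = base64.b64decode(secret.removeprefix("whsec_"))
    signed_content = f"{msg_id}.{raw_ts}.".encode() + body
    expected = base64.b64encode(hmac.new(key, signed_content, hashlib.sha256).digest())

    # The header may carry several signatures (e.g. during secret rotation).
    for entry in signature_header.split():
        version, _, signature = entry.partition(",")
        if version == "v1" and hmac.compare_digest(signature.encode(), expected):
            return
    raise WebhookVerificationError("no matching v1 signature")
```

Verification must run on the raw request bytes before JSON parsing; re-serialized JSON will generally not reproduce the signed body.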
## Migration callouts (consolidated)

| Concern | Today | GCP target |
|---|---|---|
| Cron scheduling | Upstash QStash schedules (registered via `register_schedules.py`) | Cloud Scheduler + a Cloud Tasks queue per schedule. Cloud Scheduler accepts the same unix-cron syntax. |
| One-off async enqueue | QStash POST with HMAC signing | Cloud Tasks with OIDC auth tokens (drop the custom HMAC code) |
| Internal endpoint auth | `X-Internal-Secret` header | Cloud Run service-to-service auth via OIDC tokens. An IAM service identity is more robust than a shared secret. |
| Webhook auth | Custom HMAC-SHA256 verification of the Svix signature | Unchanged (Clerk → public endpoint); keep the existing inline verifier. |
| Replay protection | 5-min timestamp window in the Clerk webhook handler | Unchanged |
| Retry policy | QStash default (exponential, 3 tries) | Cloud Tasks retry config is set per queue; match the current 3-try limit. Cloud Tasks `max_attempts` counts the first attempt, so confirm whether the QStash figure means retries or total attempts. |
| Logging | Stdout → Railway logs → Axiom | Stdout → Cloud Logging → BigQuery / Cloud Monitoring. Axiom either retained or migrated. |
| Dead-letter | QStash DLQ (pseudo-queue) | Cloud Tasks has no built-in DLQ; publish tasks that exhaust their retries to a Pub/Sub topic for ops alerting. |
| Cost | Upstash QStash $0/mo at current volume (free tier) | Cloud Tasks pricing: ~$0.40 per million ops; effectively free at our volume. |
## Migration risks specific to async paths

- `X-Internal-Secret` header is shared across services. A leak (e.g. an accidental log line) lets any caller invoke the internal task endpoints. Rotation is currently manual. Move to Cloud Run service-to-service OIDC tokens during migration, which eliminates the shared secret entirely.
- No deduplication on cron triggers. If Cloud Scheduler retries after a transient 5xx, a run could be processed twice. The current handlers for `intake-reminders` and `stale-session-cleanup` are idempotent, but not all handlers are. Audit each before migration.
- Clerk webhook signing secret lives in Railway env. It is already in scope of the secret-rotation hygiene task (Session 80 carryover). Move it to Secret Manager on GCP, with an auto-rotation hook.
- FHIR sweep concurrency. The current sweep runs serially in a single worker. If it moves to a Cloud Tasks queue with parallelism > 1, two sweep invocations could race on the same case. Pre-migration: add a Postgres advisory lock on `case_id` in `run_sweep`.
- No backpressure between document extraction and chat. Both share the same Railway container today, so an extraction spike competes with chat for the same resources. After migration to separate Cloud Run services (worker vs API), the worker scales independently and an extraction backlog no longer slows chat. Net positive, but the backlog then stops surfacing as chat latency, so the worker needs its own observability.
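For the cron-deduplication risk, one common pattern is an idempotency key per (schedule, scheduled fire time), claimed with an insert that silently no-ops on conflict. The SQLite sketch below uses `INSERT OR IGNORE`; the Postgres equivalent is `INSERT ... ON CONFLICT DO NOTHING`. The `task_runs` table, `claim_run`, and the assumption that the handler knows its scheduled fire time (rather than its arrival time) are all hypothetical.

```python
import sqlite3
from datetime import datetime

SCHEMA = "CREATE TABLE task_runs (run_key TEXT PRIMARY KEY, claimed_at TEXT NOT NULL)"


def claim_run(conn: sqlite3.Connection, schedule: str, scheduled_for: datetime) -> bool:
    """Return True the first time a (schedule, fire time) pair is seen, False on any retry.

    scheduled_for must be the slot the scheduler intended, not the arrival time,
    otherwise a retry a few minutes later would produce a different key.
    """
    run_key = f"{schedule}:{scheduled_for.strftime('%Y-%m-%dT%H:%M')}"
    cur = conn.execute(
        "INSERT OR IGNORE INTO task_runs (run_key, claimed_at) VALUES (?, datetime('now'))",
        (run_key,),
    )
    conn.commit()
    # rowcount is 1 for a fresh insert and 0 when the key already existed.
    return cur.rowcount == 1
```

A claim committed before the work means a run that crashes midway is never retried. Committing the claim in the same transaction as the handler's own writes avoids that, at the cost of holding the transaction open for the duration of the task.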
## Code references

- Schedules: `app/register_schedules.py`
- QStash client: `app/integrations/qstash_client.py`
- Internal task handlers: `app/routers/internal_ops.py`
- FHIR sweep: `app/services/fhir_sweep_service.py`
- Clerk webhook: `app/routers/clerk_webhooks.py`, `app/services/clerk_webhook.py`