Sequence — Async Pipelines + Webhooks

Beyond the synchronous request paths, Curaway runs scheduled crons (via QStash) and inbound webhooks (from Clerk). On GCP these all consolidate to Cloud Tasks + Cloud Scheduler — a single migration unit deserving its own diagram set.

Audience: Engineering team. Read alongside Phase A2 of the patient happy path which covers the document-extraction queue.


Inventory

Scheduled crons (registered via app/register_schedules.py)

| Schedule | Cron | Endpoint | Purpose |
| --- | --- | --- | --- |
| exchange-rate-refresh | `0 0 * * *` | /api/v1/internal/tasks/exchange-rate-refresh | Pull currency rates from Frankfurter API |
| intake-reminders | `0 * * * *` | /api/v1/internal/tasks/intake-reminders | Email patients stuck in intake > 24h |
| stale-session-cleanup | `0 3 * * *` | /api/v1/internal/tasks/stale-session-cleanup | Clear stuck OCR jobs, orphaned uploads |
| consent-expiry-check | `0 9 * * *` | /api/v1/internal/tasks/consent-expiry-check | Warn 30-day-expiring consents (stub) |
| notification-digest | `0 8 * * *` | /api/v1/internal/tasks/notification-digest | Daily digest emails (stub) |
| analytics-refresh | `0 */2 * * *` | /api/v1/internal/tasks/analytics-refresh | Refresh Metabase materialized views (stub) |
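The contents of app/register_schedules.py are not shown here, so the following is only a minimal sketch of how the inventory above could be held as data and sanity-checked before registration. `Schedule`, `SCHEDULES`, `TASK_PREFIX`, and `validate` are hypothetical names, and the sketch assumes every endpoint is the task prefix plus the schedule name, as in the table.

```python
from dataclasses import dataclass

# Assumption: every cron endpoint is this prefix plus the schedule name.
TASK_PREFIX = "/api/v1/internal/tasks/"


@dataclass(frozen=True)
class Schedule:
    name: str
    cron: str  # five-field cron expression

    @property
    def endpoint(self) -> str:
        return TASK_PREFIX + self.name


# Names and cron expressions copied from the inventory table.
SCHEDULES = [
    Schedule("exchange-rate-refresh", "0 0 * * *"),
    Schedule("intake-reminders", "0 * * * *"),
    Schedule("stale-session-cleanup", "0 3 * * *"),
    Schedule("consent-expiry-check", "0 9 * * *"),
    Schedule("notification-digest", "0 8 * * *"),
    Schedule("analytics-refresh", "0 */2 * * *"),
]


def validate(schedules: list[Schedule]) -> None:
    """Reject duplicate names and cron strings that are not five fields."""
    seen: set[str] = set()
    for s in schedules:
        if len(s.cron.split()) != 5:
            raise ValueError(f"{s.name}: expected 5 cron fields, got {s.cron!r}")
        if s.name in seen:
            raise ValueError(f"duplicate schedule name: {s.name}")
        seen.add(s.name)
```

Keeping the inventory as plain data would let the same list drive both today's QStash registration and a later Cloud Scheduler job definition.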

One-off async tasks (enqueued from sync handlers)

| Trigger | Endpoint | Purpose |
| --- | --- | --- |
| POST /documents/confirm | /api/v1/internal/extract | Document text + entity extraction (covered in Phase A2) |
| FHIR sweep (manual + future cron) | /api/v1/internal/fhir/sweep | Backfill FHIR resources for cases that lack them |

Inbound webhooks

| Source | Endpoint | Purpose |
| --- | --- | --- |
| Clerk | POST /api/v1/webhooks/clerk | organizationMembership.deleted → deactivate auto-assigned roles |

Sequence — FHIR sweep (admin-triggered or scheduled)

Sweeps cases that have status='extraction_complete' but missing FHIR resources. Idempotent: re-running on the same case is a no-op once resources land.

sequenceDiagram
    autonumber
    actor Operator as Operator<br/>(SD or future cron)
    participant API as Curaway API
    participant Sweep as fhir_sweep_service
    participant Repo as DocumentRepository
    participant PG as PostgreSQL
    participant Anthropic as Anthropic
    participant LF as Langfuse

    Operator->>API: POST /api/v1/internal/fhir/sweep<br/>?dry_run=true&case_id=…<br/>X-Internal-Secret: …
    API->>Sweep: run_sweep(db, dry_run, case_id, limit)
    Sweep->>Repo: find_sweep_candidates(limit, case_id)
    Repo->>PG: SELECT documents JOIN cases<br/>WHERE status='extracted' AND no FHIR rows
    PG-->>Repo: candidates[]
    Repo-->>Sweep: candidates[]

    loop per candidate
        alt dry_run = true
            Sweep->>Sweep: log "would extract"
        else dry_run = false
            Sweep->>Anthropic: clinical_context agent<br/>(re-extract with current prompt)
            Note over Sweep,Anthropic: ANTHROPIC_API_KEY<br/>via llm_gateway
            Anthropic-->>Sweep: structured FHIR JSON
            Sweep--xLF: trace
            Sweep->>PG: INSERT fhir_resources rows
            Sweep->>PG: UPDATE documents.fhir_status='complete'
        end
    end

    Sweep-->>API: { processed_count, failed_count, dry_run }
    API-->>Operator: 200 with summary

Migration note: The sweep is currently manual (an operator runs a curl command). Issue #442 plans a weekly QStash cron; that conversion happens with the rest of the QStash → Cloud Tasks migration.
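The per-candidate loop in the diagram can be sketched as follows. This is not the actual fhir_sweep_service code: `run_sweep_sketch`, `SweepSummary`, and the `extract` and `persist` callables are hypothetical stand-ins for the Anthropic call and the Postgres writes. It also assumes that a dry run only logs and counts nothing as processed, which the source does not confirm.

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class SweepSummary:
    processed_count: int = 0
    failed_count: int = 0
    dry_run: bool = True


def run_sweep_sketch(
    candidates: Iterable[str],
    extract: Callable[[str], dict],         # stand-in for the clinical_context agent call
    persist: Callable[[str, dict], None],   # stand-in for the INSERT + UPDATE step
    dry_run: bool = True,
) -> SweepSummary:
    summary = SweepSummary(dry_run=dry_run)
    for doc_id in candidates:
        if dry_run:
            # Dry run: report what would happen, touch nothing.
            print(f"would extract {doc_id}")
            continue
        try:
            resources = extract(doc_id)
            persist(doc_id, resources)
            summary.processed_count += 1
        except Exception:
            # One bad document should not abort the whole sweep.
            summary.failed_count += 1
    return summary
```

Because candidates are selected by "no FHIR rows", a document that was persisted successfully drops out of the next run's candidate set, which is what makes re-running the sweep a no-op.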


Sequence — Scheduled cron (representative: intake-reminders)

All 6 crons follow the same pattern: QStash → POST → handler reads/writes Postgres → optional egress (email, exchange rates, etc.).

sequenceDiagram
    autonumber
    participant QS as Upstash QStash<br/>(cron scheduler)
    participant API as Curaway API
    participant Handler as Internal task handler
    participant PG as PostgreSQL
    participant Email as Email provider<br/>(intake reminders)

    QS->>QS: trigger schedule "intake-reminders"<br/>(cron: 0 * * * *)
    QS->>API: POST /api/v1/internal/tasks/intake-reminders<br/>X-Internal-Secret: …<br/>Upstash-Signature: …
    API->>API: verify X-Internal-Secret
    API->>Handler: task body
    Handler->>PG: SELECT cases<br/>WHERE status='intake' AND<br/>last_message_at < NOW() - INTERVAL '24h'

    loop per stuck case
        Handler->>Email: send reminder
        Handler->>PG: INSERT events<br/>(reminder_sent)
    end

    Handler-->>API: { reminders_sent: N }
    API-->>QS: 200

Sequence — Clerk webhook (organizationMembership.deleted)

When a user is removed from a Clerk org, we deactivate their auto-assigned role rows for that tenant. Manual rows are preserved (sticky-revoke is a separate operator action).

sequenceDiagram
    autonumber
    participant Clerk as Clerk
    participant API as Curaway API
    participant Wh as clerk_webhook service
    participant PG as PostgreSQL

    Note over Clerk: admin removes user<br/>from Clerk org
    Clerk->>API: POST /api/v1/webhooks/clerk<br/>svix-id, svix-timestamp, svix-signature

    API->>Wh: handle(headers, body)
    Wh->>Wh: verify HMAC-SHA256 signature<br/>using CLERK_WEBHOOK_SECRET<br/>(custom impl, no svix dep)
    alt signature invalid
        Wh-->>API: 401
    else replay window > 5 min
        Wh-->>API: 401
    else
        Wh->>Wh: parse type=organizationMembership.deleted
        Wh->>PG: SELECT tenant_id<br/>FROM tenant_org_mappings<br/>WHERE clerk_org_id = ?
        Wh->>PG: UPDATE user_roles<br/>SET is_active=false, updated_at=NOW()<br/>WHERE user_id=? AND tenant_id=?<br/>AND granted_by='system:clerk_jwt'
        Note over Wh,PG: manual rows untouched
        Wh-->>API: { event: deactivated, count: N }
    end

    API-->>Clerk: 200 (or 401)

Migration callouts (consolidated)

| Concern | Today | GCP target |
| --- | --- | --- |
| Cron scheduling | Upstash QStash schedules (registered via register_schedules.py) | Cloud Scheduler + Cloud Tasks queue per schedule. Cloud Scheduler accepts the same cron syntax. |
| One-off async enqueue | QStash POST with HMAC signing | Cloud Tasks with OIDC auth tokens (drop the custom HMAC code) |
| Internal endpoint auth | X-Internal-Secret header | Replace with Cloud Run service-to-service auth via OIDC tokens. Service identity in IAM is more robust than a shared secret. |
| Webhook auth | Custom HMAC-SHA256 verification of Svix signature | Unchanged (Clerk → public endpoint). Keep the existing inline verifier. |
| Replay protection | 5-min timestamp window in Clerk webhook handler | Unchanged |
| Retry policy | QStash default (exponential, 3 tries) | Cloud Tasks: configurable per queue. Match current 3-try limit. |
| Logging | Stdout → Railway logs → Axiom | Stdout → Cloud Logging → BigQuery / Cloud Monitoring. Axiom retained or migrated. |
| Dead-letter | QStash DLQ (pseudo-queue) | Cloud Tasks has no built-in DLQ; forward tasks that exhaust their retries to a Pub/Sub topic for ops alerting |
| Cost | Upstash QStash $0/mo at current volume (free tier) | Cloud Tasks pricing: ~$0.40 per million ops; effectively free at our volume. |
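To make the "One-off async enqueue" and "Retry policy" rows concrete, here is a minimal sketch of what a Cloud Tasks HTTP task and queue retry settings could look like, written as plain dictionaries that follow the Cloud Tasks REST resource field names. The URL, service account, audience, and backoff values are placeholders rather than project settings, and `build_task` is a hypothetical helper.

```python
import base64
import json


def build_task(url: str, payload: dict, service_account_email: str,
               audience: str) -> dict:
    """Build a Cloud Tasks `Task` resource with an OIDC token.

    The OIDC token replaces both X-Internal-Secret and the custom HMAC check:
    Cloud Tasks attaches an identity token for the service account, and the
    receiving Cloud Run service validates it.
    """
    body = base64.b64encode(json.dumps(payload).encode()).decode()
    return {
        "httpRequest": {
            "httpMethod": "POST",
            "url": url,
            "headers": {"Content-Type": "application/json"},
            "body": body,  # the REST API expects base64-encoded bytes
            "oidcToken": {
                "serviceAccountEmail": service_account_email,
                "audience": audience,
            },
        }
    }


# Queue-level retry settings. maxAttempts counts the first attempt, so check
# whether the current "3 tries" means 3 attempts in total or 3 retries.
# Backoff values below are placeholders.
RETRY_CONFIG = {
    "maxAttempts": 3,
    "minBackoff": "10s",
    "maxBackoff": "300s",
    "maxDoublings": 3,
}
```

A task built this way would be wrapped as `{"task": ...}` in the create-task request body, or the same fields could be passed to the official client library.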

Migration risks specific to async paths

  1. X-Internal-Secret header is shared across services. A leak (e.g. an accidental log line) lets any caller trigger internal tasks, and rotation is currently manual. Move to Cloud Run service-to-service OIDC tokens during migration, which eliminates the shared secret entirely.

  2. No deduplication on cron triggers. If Cloud Scheduler retries after a transient 5xx, a handler could process the same trigger twice. Only the intake-reminders and stale-session-cleanup handlers are currently idempotent; audit the others before migration.

  3. Clerk webhook signature secret is in Railway env. Already in scope of the secret-rotation hygiene task (Session 80 carryover). Move to Secret Manager on GCP, with an auto-rotation hook.

  4. FHIR sweep concurrency. The current sweep runs serially in a single worker. If migrated to a Cloud Tasks queue with parallelism > 1, two sweep invocations could race on the same case. Pre-migration: add a Postgres advisory lock on case_id in run_sweep.

  5. No backpressure between document-extraction and chat. Both share the same Railway container today. After migration to separate Cloud Run services (worker vs API), the worker can scale independently, so a backlog spike on extraction won't slow chat. Net positive, but each service needs its own observability.
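The advisory lock suggested in risk 4 could look roughly like the sketch below. Postgres advisory locks take a 64-bit integer key, so the case ID has to be mapped to one. `advisory_lock_key` and `try_lock_case` are hypothetical helpers, the `fhir_sweep` namespace string is an assumption, and `cursor` is assumed to be a DB-API cursor using `%s` placeholders (psycopg style).

```python
import hashlib

# pg_try_advisory_xact_lock returns immediately with true/false and releases
# the lock automatically when the surrounding transaction commits or rolls back.
LOCK_SQL = "SELECT pg_try_advisory_xact_lock(%s)"


def advisory_lock_key(case_id: str, namespace: str = "fhir_sweep") -> int:
    """Map a case ID to a stable signed 64-bit key for Postgres advisory locks."""
    digest = hashlib.sha256(f"{namespace}:{case_id}".encode()).digest()
    return int.from_bytes(digest[:8], "big", signed=True)


def try_lock_case(cursor, case_id: str) -> bool:
    """Return True if this worker now holds the lock for case_id."""
    cursor.execute(LOCK_SQL, (advisory_lock_key(case_id),))
    return bool(cursor.fetchone()[0])
```

A sweep worker would call `try_lock_case` inside the transaction that writes the FHIR rows and skip the case when it returns False, leaving it to whichever invocation already holds the lock.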


Code references