
ADR-0014: Upstash Workflow for Multi-Step Async Pipelines

Status: Accepted (intent), Not Yet Implemented
Date: 2026-04-08
Session: 34

Context

Curaway has several asynchronous pipelines that today are implemented as loose chains of QStash messages or as asyncio.create_task background work. They share three properties that the current approach handles poorly:

  1. They span multiple LLM and HTTP calls that each take seconds.
  2. Any single step can fail or hang (LLM rate limits, OCR worker crashes, downstream provider timeouts) and the user shouldn't lose the work done by earlier steps.
  3. Visibility into where a pipeline got stuck is buried in events table joins and Langfuse traces.

The pipelines that match this profile today:

| Pipeline | Steps | Current implementation |
| --- | --- | --- |
| Document upload → analysis → matching | Upload → R2 confirm → OCR (PyMuPDF inline OR Unstructured async) → Clinical Context Agent → FHIR validation → embedding match → EHR rebuild → SSE notify | Mix of inline async + QStash callback (enable_inline_ocr flag); shared pipeline in app/services/document_processing.py:run_post_ocr_pipeline |
| Forward records to providers | Aggregate EHR → on-site tests notice → consent verify → forward to N selected providers → write outbound webhooks → notify patient | _handle_forwarding in case_orchestrator.py runs steps inline; provider webhooks would be a follow-up QStash chain |
| Provider webhook delivery | Build payload → HTTP POST to provider → handle 4xx/5xx → retry with backoff → mark delivered or dead-letter | provider_webhooks table exists, delivery handler stubbed |
| Daily scheduled jobs | Exchange-rate refresh, intake reminders, stale-session cleanup, consent expiry warnings, notification digest, analytics view refresh | QStash cron schedules → internal callback endpoints with X-Internal-Secret |
| Future: ReAct loops (ADR-0011, deferred) | Tool call → external API → observe → reason → next tool call → … (5–10 hops) | Not built yet |

What we want from the runtime layer of those pipelines:

  1. Durable execution — if step 4 of 7 crashes, the pipeline resumes from step 4 on retry, not step 1. The completed work persists across restarts.
  2. Idempotent retries — each step has a deterministic ID so a duplicate delivery doesn't double-write.
  3. Failure isolation — a single failed step shouldn't take down the web process or block unrelated requests.
  4. Built-in scheduling — run X minutes/hours/days from now (already covered by QStash).
  5. Observability — a single ID per pipeline run that ties together every step's logs, retries, and outcome.
  6. Zero net new infrastructure — Curaway runs on Railway Pro with one FastAPI container; we don't want a separate worker pool, a Redis-Streams consumer group, or a Temporal cluster.
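
Requirement 2 can be illustrated with a toy model. The sketch below assumes a write is keyed by the pair (run ID, step name); IdempotentEventLog and the sample IDs are hypothetical names for illustration, not part of QStash or Upstash Workflow.

```typescript
// Toy model of an idempotent step write. IdempotentEventLog is a hypothetical
// name for illustration; it is not part of QStash or Upstash Workflow.
class IdempotentEventLog {
  private readonly appliedKeys = new Set<string>();
  readonly rows: string[] = [];

  // Returns true if the row was written, false if this (run, step) was already applied.
  write(runId: string, stepName: string, row: string): boolean {
    const key = `${runId}:${stepName}`; // deterministic: identical on every redelivery
    if (this.appliedKeys.has(key)) {
      return false; // duplicate delivery: skip the write
    }
    this.appliedKeys.add(key);
    this.rows.push(row);
    return true;
  }
}

const eventLog = new IdempotentEventLog();
const firstWrite = eventLog.write("run-42", "ehr-rebuild", "EHR rebuilt for case 7");
const duplicateWrite = eventLog.write("run-42", "ehr-rebuild", "EHR rebuilt for case 7");
const otherStepWrite = eventLog.write("run-42", "sse-notify", "matching_complete sent");
```

In this model the second delivery of the same step is dropped while a different step in the same run still writes. In a real deployment the in-memory set would presumably be a unique constraint in PostgreSQL rather than process memory.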

The current QStash-callback pattern gives us #3, #4, and partially #5 (via correlation_id), but not #1 (durable execution) or #2 (idempotent retries with deterministic step IDs). Each callback is a fresh HTTP request with no memory of what came before; we have to reconstruct state from PostgreSQL on every step.

Decision

Adopt Upstash Workflow as the runtime for multi-step async pipelines that need durable execution. Keep plain QStash (single-message publish/schedule) for fire-and-forget jobs that don't have multiple steps to coordinate.

Upstash Workflow is built on top of QStash. It adds:

  • context.run("step-name", async () => ...) — each step is a named, idempotent unit. The framework persists the return value, so a retry resumes from the next step with the previous step's result already in memory.
  • context.sleep("delay-name", "5 minutes") — durable sleep that doesn't hold a server process; the workflow re-enters via HTTP after the sleep.
  • context.call("call-name", { url, ... }) — durable HTTP call with built-in retries.
  • context.waitForEvent(...) / context.notify(...) — wait for an external event (e.g., provider webhook ack) without holding a connection.
  • One workflow run = one ID — Upstash console shows the entire DAG, per-step status, retry history, latency, and error messages.
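
To make the resume behaviour of context.run concrete, here is a minimal in-memory model. It is a sketch of the idea only, not the Upstash SDK: DurableContext, StepStore, and runToCompletion are hypothetical names, and a Map stands in for the state that Upstash persists between HTTP re-entries.

```typescript
// Toy in-memory model of durable steps. This is NOT the Upstash SDK;
// StepStore and DurableContext are hypothetical names used for illustration.
type StepStore = Map<string, unknown>;

class DurableContext {
  private readonly store: StepStore;

  constructor(store: StepStore) {
    this.store = store;
  }

  // Run `fn` once per step name; on later attempts return the persisted result.
  async run<T>(stepName: string, fn: () => Promise<T> | T): Promise<T> {
    if (this.store.has(stepName)) {
      return this.store.get(stepName) as T; // replay: skip the work
    }
    const result = await fn();
    this.store.set(stepName, result); // persist before moving on
    return result;
  }
}

// Counts how often each step body actually executes across attempts.
const executions = { ocr: 0, extract: 0 };
let failExtractOnce = true;

async function pipeline(ctx: DurableContext): Promise<string> {
  const text = await ctx.run("ocr", () => {
    executions.ocr += 1;
    return "page text";
  });
  return ctx.run("extract", () => {
    executions.extract += 1;
    if (failExtractOnce) {
      failExtractOnce = false;
      throw new Error("LLM rate limit"); // simulated transient failure
    }
    return `entities from: ${text}`;
  });
}

// Re-invoke the whole function until it completes, as an HTTP re-entry would.
async function runToCompletion(store: StepStore, maxAttempts = 3): Promise<string> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await pipeline(new DurableContext(store));
    } catch (err) {
      lastError = err;
    }
  }
  throw lastError;
}
```

With a fresh store, runToCompletion re-enters the pipeline after the simulated failure; the "ocr" body runs once and only "extract" is executed again. The whole function is re-invoked on every attempt, which is why step bodies need to be the only place where side effects happen.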

We pay zero new infrastructure cost. Workflow runs on the same QStash account we already use (free tier: 500 messages/day; each step counts as one message, so a 7-step pipeline = 7 messages).

Use Cases

We will introduce Upstash Workflow incrementally, one pipeline per session, in this order:

1. Document upload pipeline (highest ROI)

Today: Mix of inline asyncio.create_task (PyMuPDF fast path) and QStash callback (Unstructured slow path), with shared logic in run_post_ocr_pipeline. When OCR succeeds but the Clinical Context Agent fails partway through, the pipeline is silently abandoned and the patient sees a stuck "Analyzing your documents…" indicator until the timeout.

With Workflow: Each stage becomes a context.run() step:

await context.run("ocr",                async () => extractText(documentId))
await context.run("clinical-extract",   async () => callClinicalContextAgent())
await context.run("fhir-validate",      async () => validateFhirResources())
await context.run("embedding-match",    async () => matchProcedureRequirements())
await context.run("ehr-rebuild",        async () => rebuildEhr(caseId))
await context.run("sse-notify",         async () => emitProgress("matching_complete"))
  • If any step fails, Upstash automatically retries that step (with exponential backoff) without re-running OCR or the LLM extraction.
  • The patient sees deterministic SSE progress events because emit_progress is itself a step (idempotent).
  • We get a single workflow ID that ties together every Langfuse span, Clinical Context Agent log, and final SSE event for that document.

This replaces the current bug-prone ambiguity ("OCR done? has_issues counts as analyzed? did the QStash callback fire? did the inline path race?") that took multiple sessions of debugging to untangle (Sessions 23B–24).

2. Provider forwarding + webhook delivery

Today: _handle_forwarding runs inline, then a follow-up QStash job would deliver outbound webhooks to each selected provider. Webhook retries, backoff, and dead-letter handling are not implemented.

With Workflow:

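const consentRow = await context.run("verify-consent", verifyConsent)
const packet     = await context.run("build-packet", buildForwardingPacket)
for (const providerId of selectedProviders) {
  // context.call is already a durable step, so it is invoked directly.
  // Workflow context methods should not be nested inside context.run.
  const response = await context.call(`webhook-${providerId}`, {
    url: providerWebhookUrl(providerId),
    method: "POST",
    body: packet,
    retries: 3,
    // Backoff shape is illustrative; verify the option name against the
    // context.call reference for the pinned SDK version.
    backoff: { type: "exponential", initial: 30 },
  })
  // markDelivered's signature is illustrative; check response.status so a
  // non-2xx result is recorded as a failure rather than a delivery.
  await context.run(`mark-delivered-${providerId}`, () => markDelivered(providerId, response.status))
}
await context.run("notify-patient", emitConfirmationEvent)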

Each provider notification is its own retryable step. A 502 from one hospital's webhook receiver doesn't block delivery to the others. The events table gets a clean per-provider delivery audit trail.
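
The per-provider isolation described above can be sketched without the SDK. The example assumes an injected post function that resolves to an HTTP status, treats every non-2xx status as retryable (a simplification), and records backoff delays instead of sleeping; DeliveryOutcome, PostFn, deliverOne, and deliverAll are hypothetical names.

```typescript
// Hypothetical types for illustration; not part of any SDK.
type DeliveryOutcome = {
  providerId: string;
  result: "delivered" | "dead-letter";
  attempts: number;
  scheduledDelaysSec: number[]; // backoff waits that would precede each retry
};

// Resolves to an HTTP status code for the given provider and attempt number.
type PostFn = (providerId: string, attempt: number) => Promise<number>;

// Delay before retry k (1-based): initial * factor^(k-1).
function backoffDelaySec(retryNumber: number, initialSec = 30, factor = 2): number {
  return initialSec * Math.pow(factor, retryNumber - 1);
}

async function deliverOne(
  providerId: string,
  post: PostFn,
  maxRetries = 3,
): Promise<DeliveryOutcome> {
  const delays: number[] = [];
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    let httpStatus: number;
    try {
      httpStatus = await post(providerId, attempt);
    } catch {
      httpStatus = 0; // network error: treat like a failed response
    }
    if (httpStatus >= 200 && httpStatus < 300) {
      return { providerId, result: "delivered", attempts: attempt, scheduledDelaysSec: delays };
    }
    if (attempt <= maxRetries) {
      delays.push(backoffDelaySec(attempt)); // a real runner would sleep here
    }
  }
  return { providerId, result: "dead-letter", attempts: maxRetries + 1, scheduledDelaysSec: delays };
}

// Each provider is delivered independently, so a failing receiver
// only affects its own outcome row.
async function deliverAll(providerIds: string[], post: PostFn): Promise<DeliveryOutcome[]> {
  return Promise.all(providerIds.map((id) => deliverOne(id, post)));
}
```

With three retries and a 30-second initial delay, a receiver that never recovers ends up dead-lettered after four attempts with scheduled waits of 30, 60, and 120 seconds, while the other providers' outcomes are unaffected. Running the deliveries concurrently rather than in a sequential loop is a design choice of this sketch, not something the ADR prescribes.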

3. Long-running scheduled jobs that have multiple stages

Today: Daily QStash cron messages → single internal callback endpoint that does everything inline. If the job times out at the Railway request level (30s), partial work is lost.

With Workflow: A single scheduled trigger starts a workflow that runs for as long as it needs to, paginating through work in context.run chunks of safe size. Example: stale-session cleanup walks 10,000 documents in batches of 100, with each batch as its own step.
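
The batching in that example can be sketched as a planning function that gives each batch a deterministic step name. planBatches, Batch, and the "cleanup-batch-N" naming scheme are assumptions made for illustration.

```typescript
// Split a list of IDs into fixed-size batches, each with a deterministic step name.
// Batch and planBatches are hypothetical names for illustration.
type Batch = { stepName: string; ids: number[] };

function planBatches(ids: number[], batchSize: number): Batch[] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: Batch[] = [];
  for (let start = 0; start < ids.length; start += batchSize) {
    batches.push({
      stepName: `cleanup-batch-${batches.length}`, // index-based, stable across retries
      ids: ids.slice(start, start + batchSize),
    });
  }
  return batches;
}

// 10,000 document IDs in batches of 100, as in the example above.
const documentIds = Array.from({ length: 10_000 }, (_, i) => i + 1);
const cleanupPlan = planBatches(documentIds, 100);
```

Here cleanupPlan holds 100 batches, so one cleanup run would consume 100 step messages under the one-message-per-step accounting described earlier. Index-based step names stay stable only if the ID list is identical on every re-entry, so the list itself would need to be produced inside a step (or derived from a stable cursor).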

4. Future: ReAct loops (ADR-0011)

Today: Deferred. ReAct was deferred because of the complexity of durably managing a 5–10 step tool-use loop without losing state on a worker crash, which is exactly the problem Workflow addresses. When ReAct is reconsidered (post-MVP), Workflow becomes the runtime for the think → act → observe → think loop.

What we will NOT migrate to Workflow

  • Single-message fire-and-forget QStash jobs (e.g., "send this email now", "post this single webhook"). Plain qstash_client.publish() stays. Workflow's value is multi-step durability — it's overkill for a one-shot publish.
  • Real-time conversation chat turns. The chat flow is request → response within ~5–8 seconds and lives entirely inside one HTTP request. Workflow would add latency without adding value here.
  • Chat extractor / EHR rebuild on every turn. These are short background tasks whose execution mode (deferred vs sync) has already been decided explicitly per the Layer 3 spec. Workflow is too heavy.

Rationale

  1. Durable resume is the killer feature for healthcare workflows. When a patient uploads a document, every step from OCR to EHR rebuild is a step the patient has paid for in attention. Losing partial work and silently abandoning the pipeline is the worst possible UX. Upstash Workflow makes "every completed step is permanent" the default.

  2. Zero new infrastructure. We already use QStash (free tier). Workflow runs on the same account and the same delivery primitives. No new container, no new dashboard, no new secret. The marginal cost of a workflow run is one QStash message per step.

  3. Step IDs are deterministic by name. context.run("ocr", ...) is the same step on a retry; we don't need to invent our own idempotency keys for each callback (which we currently do, sometimes inconsistently — see Session 23B per-document dedup bug).

  4. The Upstash console gives us a free workflow visualizer. Today, to understand "what happened to this document", we cross-reference Langfuse, the events table, the document_references table, and the Railway logs. With Workflow, that becomes one URL with the full DAG.

  5. It's a stepping stone to ReAct (ADR-0011). The Workflow primitives (context.run, context.call, context.waitForEvent) are exactly what ReAct needs. Building familiarity with Workflow on the document pipeline first makes the eventual ReAct rollout much cheaper.

  6. It's trivially reversible. Workflow definitions are TypeScript / Python functions on the same service. If we decide to roll back, we replace the workflow function with the existing inline / QStash callback code. There is no schema change, no data migration.

Alternatives considered

  • Stay with raw QStash callbacks. The status quo. Works, but leaks state across HTTP boundaries, has no built-in retry, and is invisible outside the events table. Rejected because the operational pain of document pipeline debugging in Sessions 23B–24 was a direct symptom of this approach.
  • Temporal. Industry-standard durable workflow engine. Powerful but requires its own cluster (or paid Temporal Cloud), needs separate worker processes to execute workflows, and is wildly oversized for POC traffic. Reconsider post-Series A.
  • Celery or Dramatiq with a dedicated worker. Would require a second Railway container, a Redis broker, and a worker process tree. We explicitly chose the modular monolith pattern (Ground Rule 7) to avoid exactly this kind of operational fragmentation.
  • AWS Step Functions / GCP Workflows. Cloud-vendor-specific. Curaway runs entirely on managed-third-party services (Railway, Cloudflare R2, Upstash, Neon). Adopting a hyperscaler workflow service would be the first time we've coupled to one. Rejected to keep the "every backend on free or near-free managed services" property.
  • LangGraph durable checkpointing. LangGraph supports SQLite and Postgres checkpointers for durable agent state. Tempting because we already use LangGraph for the agent pipeline. But: (a) checkpointing is for single-process graph state, not distributed multi-step pipelines that span HTTP boundaries, and (b) the document pipeline isn't a LangGraph graph — it's an orchestration of LLM calls plus pure-function processing, with HTTP boundaries between pieces. LangGraph stays for inside-an-agent state; Workflow handles inter-agent and inter-service orchestration.

Consequences

Positive

  • Document upload pipeline failures become recoverable instead of fatal.
  • Single workflow ID per pipeline run gives us a clean correlation key across Langfuse, events, and the Upstash Workflow console.
  • Provider webhook delivery becomes reliable without us building a custom retry system.
  • ReAct (ADR-0011) gets a head start when we revisit it — the runtime is already in production.
  • No new line item on the spend dashboard: Workflow usage is already counted under Upstash in the "free tier" indicator.

Negative

  • New mental model for the team: "step functions" instead of "callbacks". Mitigation: the first migration (document pipeline) is mechanical conversion of existing code, not a redesign.
  • Upstash Workflow is younger than QStash; the API may evolve. Mitigation: pin to a specific SDK version and gate adoption per pipeline.
  • Each workflow run consumes 1 QStash message per step, multiplying our free-tier usage. A 7-step document pipeline at 100 docs/day = 700 messages/day, well above the 500/day free tier. We may need to upgrade QStash to the next paid tier ($10/mo) sooner than otherwise. Acceptable: already in budget.
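
The quota arithmetic in the last bullet can be checked with a few lines. This sketch uses only the figures quoted in this ADR (500 messages/day, one message per step) and does not model retries; the function names are hypothetical.

```typescript
// Free-tier figure as quoted in this ADR; verify against the current Upstash pricing page.
const FREE_TIER_MESSAGES_PER_DAY = 500;

// One QStash message per step, per run (retries are not modeled here).
function dailyMessages(stepsPerRun: number, runsPerDay: number): number {
  return stepsPerRun * runsPerDay;
}

// Largest number of runs per day that stays within the daily limit.
function maxRunsWithinLimit(stepsPerRun: number, limit = FREE_TIER_MESSAGES_PER_DAY): number {
  return Math.floor(limit / stepsPerRun);
}

const dailyUsage = dailyMessages(7, 100); // 7 steps x 100 docs = 700
const dailyOverage = Math.max(0, dailyUsage - FREE_TIER_MESSAGES_PER_DAY); // 200 over the limit
const breakEvenDocs = maxRunsWithinLimit(7); // 71 docs/day (71 x 7 = 497)
```

Under these assumptions a 7-step pipeline fits the free tier up to 71 documents per day, before counting any other QStash traffic such as cron schedules.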

Neutral

  • Workflow steps are HTTP-driven, so each step has a serialization boundary. We can't pass huge in-memory objects between steps; pipelines that need to share large state should write it to Postgres or R2 between steps and pass an ID. This is mostly already how we do it.
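
A minimal sketch of the "write it and pass an ID" pattern follows. A Map stands in for Postgres or R2, the 1,000-character threshold is an arbitrary illustrative value rather than an Upstash limit, and StepResult, toStepResult, and fromStepResult are hypothetical names.

```typescript
// A step returns either a small inline value or a reference to stored state.
type StepResult =
  | { kind: "inline"; value: unknown }
  | { kind: "ref"; key: string };

// Hypothetical threshold for illustration; not an Upstash limit.
const MAX_INLINE_CHARS = 1_000;

// Values must be JSON-serializable. Large values are written to the blob
// store (a Map here, standing in for Postgres or R2) and replaced by a key.
function toStepResult(
  value: unknown,
  blobStore: Map<string, string>,
  key: string,
  maxInlineChars = MAX_INLINE_CHARS,
): StepResult {
  const json = JSON.stringify(value);
  if (json.length <= maxInlineChars) {
    return { kind: "inline", value };
  }
  blobStore.set(key, json);
  return { kind: "ref", key };
}

// The next step resolves the reference back into the original value.
function fromStepResult(result: StepResult, blobStore: Map<string, string>): unknown {
  if (result.kind === "inline") {
    return result.value;
  }
  const json = blobStore.get(result.key);
  if (json === undefined) {
    throw new Error(`missing blob: ${result.key}`);
  }
  return JSON.parse(json);
}
```

A small value such as a case ID passes through inline, while extracted OCR text of several thousand characters is stored once and only its key crosses the step boundary.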

Implementation checklist

When we execute this ADR (separate session):

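  • [ ] Add @upstash/workflow, or the Python SDK (upstash-workflow) given that the service is FastAPI; first check that the Python SDK covers the primitives we rely on (context.run, context.sleep, context.call, context.waitForEvent), and if it does not, we'll wrap the QStash REST API ourselves for the missing pieces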
  • [ ] Verify Upstash Workflow is enabled on the same QStash account we already use (no new secret should be needed)
  • [ ] Migrate run_post_ocr_pipeline (document upload) first as the proof of concept
  • [ ] Add a workflow ID column to the events table for correlation
  • [ ] Update landscape/spend.json Upstash fetcher to count workflow message usage explicitly
  • [ ] Document the migration pattern in docs/runbook/
  • [ ] Migrate provider forwarding once document pipeline has been stable for a session
  • [ ] Defer scheduled job migration until use cases 1 and 2 are proven
  • [ ] Defer ReAct integration until ADR-0011 is reopened

References

  • ADR-0011: ReAct Pattern — Evaluated, Deferred (adr/011-react-pattern-deferred.md)
  • ADR-0010: Synchronous OCR Before Orchestration (adr/0010-synchronous-ocr-before-orchestration.md)
  • Upstash QStash docs: https://upstash.com/docs/qstash
  • Upstash Workflow docs: https://upstash.com/docs/workflow
  • app/integrations/qstash_client.py — current QStash usage
  • app/services/document_processing.py:run_post_ocr_pipeline — first migration target