Steer-02: Event System & SSE

Read first: specs/sdd-mvp/02-event-driven-design.md, specs/sdd-mvp/08-conversation-design.md

Context & Goal

Build the real-time event infrastructure: typed event registry, SSE streaming endpoint, QStash handler wiring, and frontend SSE client hook. This is the plumbing that makes everything feel real-time.


Backend (curaway-ai/curaway-backend)

Step 1: Event Type Registry

Create app/events/registry.py:

  • Enum EventType with all core event types from the spec (patient_registered, consent_granted, document_uploaded, document_parsed, clinical_entities_extracted, fhir_resource_created, intake_progress_updated, match_requested, match_completed, agent_response_generated, etc.)
  • EventPayload Pydantic models for each event type (typed payloads, not free-form JSONB)
  • emit_event(event_type, tenant_id, patient_id, payload, actor_id, correlation_id) utility function that writes to the events table and dispatches to SSE subscribers
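A minimal sketch of how the registry and emit_event() could fit together. To stay dependency-free it uses stdlib dataclasses as a stand-in for the Pydantic models the step calls for, an in-memory list in place of the events table, and plain callbacks in place of SSE subscribers. The payload fields, row column names, PAYLOAD_MODELS, EVENTS_TABLE, and SUBSCRIBERS are all illustrative assumptions, not taken from the spec.

```python
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Optional


class EventType(str, Enum):
    # Subset of the event types named in this step.
    PATIENT_REGISTERED = "patient_registered"
    DOCUMENT_UPLOADED = "document_uploaded"
    DOCUMENT_PARSED = "document_parsed"


# Hypothetical payload shapes; the real fields come from the spec.
@dataclass(frozen=True)
class DocumentUploadedPayload:
    document_id: str
    filename: str


@dataclass(frozen=True)
class DocumentParsedPayload:
    document_id: str
    page_count: int


# Registry: each event type maps to exactly one payload model.
PAYLOAD_MODELS = {
    EventType.DOCUMENT_UPLOADED: DocumentUploadedPayload,
    EventType.DOCUMENT_PARSED: DocumentParsedPayload,
}

EVENTS_TABLE: list = []  # stand-in for the events table
SUBSCRIBERS: dict = {}   # patient_id -> list of callbacks (stand-in for SSE)


def emit_event(
    event_type: EventType,
    tenant_id: str,
    patient_id: str,
    payload: object,
    actor_id: str,
    correlation_id: Optional[str] = None,
) -> dict:
    """Validate the payload type, persist the event, notify subscribers."""
    model = PAYLOAD_MODELS.get(event_type)
    if model is None or not isinstance(payload, model):
        raise TypeError(f"{event_type.value} does not accept {type(payload).__name__}")
    row = {
        "id": str(uuid.uuid4()),
        "event_type": event_type.value,
        "tenant_id": tenant_id,
        "patient_id": patient_id,
        "payload": asdict(payload),
        "actor_id": actor_id,
        "correlation_id": correlation_id or str(uuid.uuid4()),
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    EVENTS_TABLE.append(row)
    callbacks: list[Callable[[dict], None]] = SUBSCRIBERS.get(patient_id, [])
    for callback in callbacks:
        callback(row)
    return row
```

Keying the registry on EventType means a payload of the wrong shape fails at emit time rather than surfacing later as malformed JSONB in the table.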

Step 2: SSE Streaming Endpoint

Create GET /api/v1/patients/{patient_id}/events/stream in app/routers/sse.py:

  • Returns StreamingResponse with media_type="text/event-stream"
  • Clerk auth + X-Tenant-ID required
  • On connect: send a connected event with the current patient state summary
  • Server-side: poll the events table every 3 seconds for events newer than the last check
  • Send typed SSE events: document_update, ehr_update, intake_progress, match_progress, agent_response
  • Close conditions: done event when no processing is active, timeout at 5 min, client disconnect
  • Connection cleanup on disconnect (abort controller pattern)
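The polling loop and close conditions above can be sketched as a framework-free async generator. This is an illustration under assumptions, not the endpoint itself: fetch_since, is_active, and is_disconnected are hypothetical injected callables, and the seq/type/data event shape and the "reason" field on the done event are invented for the example.

```python
import asyncio
import json
import time
from typing import AsyncIterator, Awaitable, Callable, Optional


def format_sse(event: str, data: dict, event_id: Optional[str] = None) -> str:
    """Serialize one event in text/event-stream framing."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")  # json.dumps emits a single line
    return "\n".join(lines) + "\n\n"           # blank line terminates the event


async def event_stream(
    state_summary: dict,
    fetch_since: Callable[[int], Awaitable[list]],  # hypothetical: events with seq > cursor
    is_active: Callable[[], Awaitable[bool]],       # hypothetical: any processing running?
    is_disconnected: Callable[[], Awaitable[bool]],
    poll_interval: float = 3.0,
    timeout: float = 300.0,
) -> AsyncIterator[str]:
    # On connect: send the current patient state summary.
    yield format_sse("connected", state_summary)
    cursor = 0
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await is_disconnected():
            return  # client went away; stop polling
        events = await fetch_since(cursor)
        for ev in events:
            cursor = ev["seq"]
            yield format_sse(ev["type"], ev["data"], str(ev["seq"]))
        if not events and not await is_active():
            yield format_sse("done", {"reason": "idle"})
            return
        await asyncio.sleep(poll_interval)
    yield format_sse("done", {"reason": "timeout"})
```

In a FastAPI router a generator like this would typically be handed to StreamingResponse with media_type="text/event-stream", with Starlette's request.is_disconnected as the disconnect probe.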

Step 3: QStash Handler Endpoints

Create app/routers/webhooks.py (QStash callback endpoints):

  • POST /webhooks/qstash/document-parse: receives document_id, runs the OCR pipeline, emits document_parsed
  • POST /webhooks/qstash/clinical-extract: receives document_id + extracted_text, chains to the Clinical Context Agent
  • POST /webhooks/qstash/notification: handles email/notification dispatch
  • Verify the QStash signature on all webhook handlers
  • Each handler emits the appropriate events via emit_event()

Step 4: Wire Existing Endpoints to Emit Events

Audit and update existing endpoints to use emit_event():

  • POST /api/v1/patients → emit patient_registered
  • POST /api/v1/consent → emit consent_granted or consent_revoked
  • POST /api/v1/uploads/confirm → emit document_uploaded, dispatch QStash OCR job
  • POST /api/v1/patients/{id}/chat → emit chat_message_sent, then agent_response_generated
  • POST /api/v1/patients/{id}/match → emit match_requested, then match_completed

Step 5: Tests

  • Test event emission writes to events table with correct schema
  • Test SSE endpoint returns proper event-stream format
  • Test QStash webhook handlers verify signature
  • Test event type registry validates payload schemas
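For the event-stream format test, a small parser lets assertions target event names and payloads instead of raw strings. The helper below is a hypothetical test utility, not part of the spec. It handles only "\n" line endings and the event, id, and data fields, which is narrower than the full SSE grammar.

```python
def parse_sse(raw: str) -> list:
    """Split a text/event-stream body into dicts with event, id, and data keys."""
    events = []
    for block in raw.split("\n\n"):  # a blank line terminates each event
        fields = {"event": "message", "id": None}  # "message" is the SSE default type
        data_lines = []
        for line in block.split("\n"):
            if not line or line.startswith(":"):
                continue  # empty line or comment (often used as keep-alive)
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is not part of the value
            if name == "data":
                data_lines.append(value)
            elif name in ("event", "id"):
                fields[name] = value
        if not data_lines:
            continue  # events without data are never dispatched
        fields["data"] = "\n".join(data_lines)
        events.append(fields)
    return events
```

A test can then feed it the response body and check, for example, that the first parsed event is named connected and that each data field decodes as JSON.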

Backend Verification

python -m pytest tests/test_events.py tests/test_sse.py -v
# Test SSE manually (uses a clean per-persona account —
# demo-patient-aisha-001 is quarantined, see
# docs/runbook/test-data-hygiene.md):
curl -N -H "Authorization: Bearer $TOKEN" -H "X-Tenant-ID: tenant-curaway-patients" \
  http://localhost:8000/api/v1/patients/demo-patient-maria-001/events/stream

Frontend (curaway-ai/curaway-frontend)

Step 1: SSE Client Hook

Create src/hooks/useEventStream.ts:

  • Takes patientId as parameter
  • Opens an SSE connection to /api/v1/patients/{id}/events/stream
  • Includes Clerk auth token + X-Tenant-ID in headers (the browser's native EventSource cannot set custom request headers, so this implies a fetch-based stream reader)
  • Parses SSE events by type, dispatches to registered handlers
  • AbortController for cleanup on unmount
  • Auto-reconnect with exponential backoff (max 3 retries)
  • Export typed event handlers: onDocumentUpdate, onEHRUpdate, onIntakeProgress, onMatchProgress, onAgentResponse

Step 2: Wire SSE to Existing Components

  • MessageThread → subscribe to agent_response for streamed agent messages
  • DocumentCard → subscribe to document_update for status transitions
  • EHRSummary (from steer-01) → subscribe to ehr_update
  • ProgressStrip → subscribe to intake_progress and match_progress

Step 3: Connection Status Indicator

Add a subtle connection indicator (small dot in header):

  • 🟢 Connected (SSE active)
  • 🟡 Reconnecting
  • 🔴 Disconnected (after max retries)

Frontend Verification

npm run build
npx tsc --noEmit
# Manual: open app, check SSE connection in Network tab (EventStream type)

Checklist

  • [ ] No new migrations (events table exists)
  • [ ] Error codes: SSE_CONNECTION_001, EVENT_EMIT_001, QSTASH_VERIFY_001
  • [ ] Feature flag: sse_streaming (Flagsmith) — fallback to polling if disabled
  • [ ] CORS: ensure SSE endpoint allows streaming from app.curaway.ai
  • [ ] Env vars: QSTASH_SIGNING_KEY (for webhook verification)
  • [ ] Swagger: SSE endpoint documented (note: streaming, not JSON response)
  • [ ] Unit tests: event emission, SSE format, QStash signature verification
  • [ ] PostHog: track sse_connected, sse_reconnected, sse_disconnected
  • [ ] Mobile: SSE works on mobile browsers (tested Safari/Chrome mobile)
  • [ ] CLAUDE.md: update with event system architecture
  • [ ] Rollback: feature flag disables SSE, falls back to manual refresh