Steer-02: Event System & SSE¶
Read first:
specs/sdd-mvp/02-event-driven-design.md,specs/sdd-mvp/08-conversation-design.md
Context & Goal¶
Build the real-time event infrastructure: typed event registry, SSE streaming endpoint, QStash handler wiring, and frontend SSE client hook. This is the plumbing that makes everything feel real-time.
Backend (curaway-ai/curaway-backend)¶
Step 1: Event Type Registry¶
Create app/events/registry.py:
- Enum EventType with all core event types from spec (patient_registered, consent_granted, document_uploaded, document_parsed, clinical_entities_extracted, fhir_resource_created, intake_progress_updated, match_requested, match_completed, agent_response_generated, etc.)
- EventPayload Pydantic models for each event type (typed payloads, not free-form JSONB)
- emit_event(event_type, tenant_id, patient_id, payload, actor_id, correlation_id) utility function that writes to events table + dispatches to SSE subscribers
Step 2: SSE Streaming Endpoint¶
Create GET /api/v1/patients/{patient_id}/events/stream in app/routers/sse.py:
- Returns StreamingResponse with media_type="text/event-stream"
- Clerk auth + X-Tenant-ID required
- On connect: send connected event with current patient state summary
- Server-side poll events table every 3 seconds for new events since last check
- Send typed SSE events: document_update, ehr_update, intake_progress, match_progress, agent_response
- Close conditions: done event when no active processing, timeout at 5 min, client disconnect
- Connection cleanup on disconnect (abort controller pattern)
Step 3: QStash Handler Endpoints¶
Create app/routers/webhooks.py (QStash callback endpoints):
- POST /webhooks/qstash/document-parse — receives document_id, runs OCR pipeline, emits document_parsed
- POST /webhooks/qstash/clinical-extract — receives document_id + extracted_text, chains to Clinical Context Agent
- POST /webhooks/qstash/notification — handles email/notification dispatch
- Verify QStash signature on all webhook handlers
- Each handler emits appropriate events via emit_event()
Step 4: Wire Existing Endpoints to Emit Events¶
Audit and update existing endpoints to use emit_event():
- POST /api/v1/patients → emit patient_registered
- POST /api/v1/consent → emit consent_granted or consent_revoked
- POST /api/v1/uploads/confirm → emit document_uploaded, dispatch QStash OCR job
- POST /api/v1/patients/{id}/chat → emit chat_message_sent, then agent_response_generated
- POST /api/v1/patients/{id}/match → emit match_requested, then match_completed
Step 5: Tests¶
- Test event emission writes to events table with correct schema
- Test SSE endpoint returns proper event-stream format
- Test QStash webhook handlers verify signature
- Test event type registry validates payload schemas
Backend Verification¶
python -m pytest tests/test_events.py tests/test_sse.py -v
# Test SSE manually (uses a clean per-persona account —
# demo-patient-aisha-001 is quarantined, see
# docs/runbook/test-data-hygiene.md):
curl -N -H "Authorization: Bearer $TOKEN" -H "X-Tenant-ID: tenant-curaway-patients" \
http://localhost:8000/api/v1/patients/demo-patient-maria-001/events/stream
Frontend (curaway-ai/curaway-frontend)¶
Step 1: SSE Client Hook¶
Create src/hooks/useEventStream.ts:
- Takes patientId as parameter
- Opens SSE connection to /api/v1/patients/{id}/events/stream
- Includes Clerk auth token + X-Tenant-ID in headers
- Parses SSE events by type, dispatches to registered handlers
- AbortController for cleanup on unmount
- Auto-reconnect with exponential backoff (max 3 retries)
- Export typed event handlers: onDocumentUpdate, onEHRUpdate, onIntakeProgress, onMatchProgress, onAgentResponse
Step 2: Wire SSE to Existing Components¶
MessageThread→ subscribe toagent_responsefor streamed agent messagesDocumentCard→ subscribe todocument_updatefor status transitionsEHRSummary(from steer-01) → subscribe toehr_updateProgressStrip→ subscribe tointake_progressandmatch_progress
Step 3: Connection Status Indicator¶
Add subtle connection indicator (small dot in header): - 🟢 Connected (SSE active) - 🟡 Reconnecting - 🔴 Disconnected (after max retries)
Frontend Verification¶
npm run build
npx tsc --noEmit
# Manual: open app, check SSE connection in Network tab (EventStream type)
Checklist¶
- [ ] No new migrations (events table exists)
- [ ] Error codes: SSE_CONNECTION_001, EVENT_EMIT_001, QSTASH_VERIFY_001
- [ ] Feature flag:
sse_streaming(Flagsmith) — fallback to polling if disabled - [ ] CORS: ensure SSE endpoint allows streaming from app.curaway.ai
- [ ] Env vars: QSTASH_SIGNING_KEY (for webhook verification)
- [ ] Swagger: SSE endpoint documented (note: streaming, not JSON response)
- [ ] Unit tests: event emission, SSE format, QStash signature verification
- [ ] PostHog: track
sse_connected,sse_reconnected,sse_disconnected - [ ] Mobile: SSE works on mobile browsers (tested Safari/Chrome mobile)
- [ ] CLAUDE.md: update with event system architecture
- [ ] Rollback: feature flag disables SSE, falls back to manual refresh