02 — Real-Time Event-Driven Design

The platform is fundamentally event-driven. Clinical data arrives asynchronously from multiple sources (chat, documents, agents), and the system updates the patient record on every event rather than in scheduled batches.

Events Table (Source of Truth)

Append-only PostgreSQL table. Every meaningful action writes a row.

CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type VARCHAR(100) NOT NULL,       -- e.g., 'clinical_entities_extracted'
    tenant_id VARCHAR(100) NOT NULL,
    actor_id VARCHAR(100),                   -- user or agent that triggered
    patient_id UUID,
    payload JSONB NOT NULL DEFAULT '{}',
    correlation_id VARCHAR(100) NOT NULL,
    locale VARCHAR(10),
    source_service VARCHAR(50),              -- 'intake_agent', 'document_service', etc.
    api_version VARCHAR(10) DEFAULT 'v1',
    client_type VARCHAR(20),                 -- 'web', 'ios', 'android', 'mcp'
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_patient ON events(patient_id, created_at DESC);
CREATE INDEX idx_events_type ON events(event_type, created_at DESC);
CREATE INDEX idx_events_tenant ON events(tenant_id, created_at DESC);
CREATE INDEX idx_events_correlation ON events(correlation_id);
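
To illustrate how a service might write to this table, here is a minimal sketch that builds a parameterized INSERT for one event. `NewEvent` and `buildEventInsert` are hypothetical names, not part of the platform's code. The `$n` placeholder style is an assumption (it is the style node-postgres uses), and the length checks simply mirror the VARCHAR limits in the DDL above.

```typescript
// Hypothetical application-side shape of a row to insert into `events`.
// `id` and `created_at` are omitted so the column defaults generate them.
interface NewEvent {
  event_type: string;
  tenant_id: string;
  correlation_id: string;
  actor_id?: string;
  patient_id?: string;
  payload?: Record<string, unknown>;
  locale?: string;
  source_service?: string;
  api_version?: string;
  client_type?: string;
}

// Fixed column order; column names are never taken from caller input.
const COLUMNS = [
  "event_type", "tenant_id", "actor_id", "patient_id", "payload",
  "correlation_id", "locale", "source_service", "api_version", "client_type",
] as const;

// Length limits copied from the VARCHAR(n) definitions in the DDL.
const MAX_LENGTHS: Record<string, number> = {
  event_type: 100, tenant_id: 100, actor_id: 100, correlation_id: 100,
  locale: 10, source_service: 50, api_version: 10, client_type: 20,
};

function buildEventInsert(e: NewEvent): { text: string; values: unknown[] } {
  // Serialize the payload for the JSONB column; an absent payload becomes '{}'.
  const row: Record<string, unknown> = { ...e, payload: JSON.stringify(e.payload ?? {}) };

  const cols: string[] = [];
  const values: unknown[] = [];
  for (const col of COLUMNS) {
    const value = row[col];
    if (value === undefined) continue; // let the column default (or NULL) apply
    const max = MAX_LENGTHS[col];
    if (typeof value === "string" && max !== undefined && value.length > max) {
      throw new Error(`${col} exceeds ${max} characters`);
    }
    cols.push(col);
    values.push(value);
  }

  const placeholders = cols.map((_, i) => `$${i + 1}`).join(", ");
  return {
    text: `INSERT INTO events (${cols.join(", ")}) VALUES (${placeholders}) RETURNING id, created_at`,
    values,
  };
}
```

Because the table is append-only, the sketch only ever produces INSERT statements; corrections would be recorded as new events rather than updates.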

Event Flow Pattern

User Action / System Process
    │
    ▼
Event written to events table (synchronous, within request transaction)
    │
    ├──▶ QStash dispatches async downstream handlers (non-blocking)
    ├──▶ SSE pushes real-time update to connected frontend client
    └──▶ Downstream agents/services react (e.g., Clinical Context Agent on document_parsed)
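
The ordering in this flow (write first, fan out second) can be sketched as follows. This is an in-memory illustration only: `createRecorder`, `FlowEvent`, and `Sink` are hypothetical names, the array stands in for the events table, and the sinks stand in for the QStash, SSE, and agent branches.

```typescript
type FlowEvent = { event_type: string; payload: Record<string, unknown> };
type Sink = (event: FlowEvent) => Promise<void>;

function createRecorder(store: FlowEvent[], sinks: Sink[]) {
  return function record(event: FlowEvent): Promise<string[]> {
    // 1. Synchronous write: the event is stored before anything else runs.
    store.push(event);

    // 2. Fan-out: each sink runs independently. A failing sink is reported
    //    in the result list but never undoes the write or blocks the others.
    const runs = sinks.map((sink) =>
      Promise.resolve()
        .then(() => sink(event))
        .then(() => "ok", (err: unknown) => `failed: ${String(err)}`)
    );
    return Promise.all(runs);
  };
}
```

A request handler would call `record(...)` and return its response without awaiting the resulting promise, which is what keeps the downstream branches non-blocking.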

Core Event Types

| Event Type | Trigger | Downstream Reactions |
|---|---|---|
| patient_registered | New patient signs up | Welcome email, intake agent init |
| consent_granted | Patient grants consent | Unblock data processing pipeline |
| consent_revoked | Patient revokes consent | Block further processing for that purpose |
| document_uploaded | File confirmed in R2 | QStash dispatches OCR job |
| document_parsing | OCR started | SSE: status update to frontend |
| document_parsed | OCR/extraction complete | Auto-chain to Clinical Context Agent |
| document_failed | OCR/extraction failed | SSE: error notification, retry queued |
| clinical_entities_extracted | Agent extracts conditions/meds | Update EHR, update intake progress |
| fhir_resource_created | FHIR resource validated + stored | Update Neo4j graph, Qdrant embedding |
| fhir_resource_updated | Existing FHIR resource modified | Recalculate affected matches |
| intake_progress_updated | Any new data arrives | Check threshold for matching readiness |
| chat_message_sent | Patient sends message | Orchestrator routes to agent |
| agent_response_generated | Agent produces output | SSE push to frontend, Langfuse trace |
| match_requested | Patient or system triggers match | Execute 4-stage pipeline |
| match_completed | Scoring and ranking done | Generate explanations, notify patient |
| match_explanation_generated | Explanation agent completes | SSE: results to frontend |
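
The mapping from event type to downstream reactions can be pictured as a small routing table. The sketch below is hypothetical (`createRouter` and the reaction labels are illustrative names, and the reactions only record that they ran); it shows one event type fanning out to several reactions, as `fhir_resource_created` does in the table above.

```typescript
type Payload = Record<string, unknown>;
type Reaction = (payload: Payload) => void;

function createRouter() {
  const reactions = new Map<string, Reaction[]>();
  return {
    // Register a reaction for one event type; several may share a type.
    on(eventType: string, reaction: Reaction): void {
      const list = reactions.get(eventType) ?? [];
      list.push(reaction);
      reactions.set(eventType, list);
    },
    // Run every reaction registered for the event type; return how many ran.
    dispatch(eventType: string, payload: Payload): number {
      const list = reactions.get(eventType) ?? [];
      for (const reaction of list) reaction(payload);
      return list.length;
    },
  };
}

// Example wiring with stub reactions that only log their own name.
const router = createRouter();
const calls: string[] = [];
router.on("fhir_resource_created", () => calls.push("update_neo4j_graph"));
router.on("fhir_resource_created", () => calls.push("update_qdrant_embedding"));
router.on("document_parsed", () => calls.push("run_clinical_context_agent"));
```

An event type with no registered reactions dispatches to nothing, so adding a new event type to the table does not require touching existing handlers.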

SSE (Server-Sent Events) Streaming

Why SSE, Not Polling

Client-side polling exhausted browser resources in testing. SSE instead provides a single persistent, one-way channel from server to client.

SSE Endpoint

GET /api/v1/patients/{patient_id}/events/stream
  • Requires Clerk auth + X-Tenant-ID header
  • Returns StreamingResponse with media_type="text/event-stream"
  • Server-side polling of events table (every 3–5 seconds)
  • Sends typed events when state changes are detected
  • Maximum stream duration: 5 minutes (the server sends a timeout event, then closes the stream)
  • Client disconnect triggers cleanup
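
The polling behaviour listed above can be sketched as a loop with an injected clock and data source. Everything here is an assumption made for illustration: `streamEvents`, the numeric `cursor`, and the dependency names are hypothetical, and the real endpoint is described above as returning a StreamingResponse on the server side, not TypeScript.

```typescript
interface StreamEvent { event: string; data: Record<string, unknown> }
interface PollResult { items: StreamEvent[]; cursor: number }

interface StreamDeps {
  fetchSince: (cursor: number) => Promise<PollResult>; // query events newer than cursor
  emit: (event: string, data: Record<string, unknown>) => void; // write one SSE event
  isDisconnected: () => boolean; // true once the client has gone away
  now: () => number;             // current time in ms (injected for testing)
  sleep: (ms: number) => Promise<void>;
  pollIntervalMs: number;        // e.g. 3000 to 5000
  maxDurationMs: number;         // e.g. 5 * 60 * 1000
}

async function streamEvents(deps: StreamDeps): Promise<"timeout" | "disconnected"> {
  const startedAt = deps.now();
  let cursor = 0;

  while (!deps.isDisconnected()) {
    // Enforce the maximum stream duration before each poll.
    if (deps.now() - startedAt >= deps.maxDurationMs) {
      deps.emit("timeout", {});
      return "timeout";
    }
    // Emit only what changed since the last poll, then advance the cursor.
    const batch = await deps.fetchSince(cursor);
    for (const item of batch.items) deps.emit(item.event, item.data);
    cursor = batch.cursor;

    await deps.sleep(deps.pollIntervalMs);
  }
  return "disconnected"; // caller performs cleanup
}
```

Injecting `now` and `sleep` keeps the loop testable without waiting for real time to pass.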

SSE Event Format

event: document_update
data: {"document_id": "xxx", "status": "parsed", "extracted_entities_count": 5}

event: ehr_update
data: {"field": "conditions", "action": "added", "value": "M17.11 Primary osteoarthritis"}

event: intake_progress
data: {"progress": 0.62, "missing_fields": ["allergies", "blood_type"]}

event: match_progress
data: {"stage": "neo4j_filtering", "candidates_remaining": 12}

event: agent_response
data: {"agent": "intake", "content": "Based on your diabetes...", "suggested_actions": [...]}
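
Each message above is an `event:` line, a `data:` line carrying JSON, and a blank line that terminates the message. A minimal serializer for that wire format might look like this (`formatSSE` is a hypothetical helper name):

```typescript
function formatSSE(event: string, data: Record<string, unknown>): string {
  // A line break inside the event name would corrupt the framing.
  if (/[\r\n]/.test(event)) {
    throw new Error("event name must not contain line breaks");
  }
  // JSON.stringify escapes newlines inside strings, so the payload
  // always fits on a single data line.
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
```

For example, `formatSSE("match_progress", { stage: "neo4j_filtering", candidates_remaining: 12 })` yields the `match_progress` message shown above, without the spaces after the JSON separators.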

Frontend SSE Client Pattern

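// useEventStream.ts hook
import { useEffect, useRef, useState } from 'react';

// getToken, API_BASE, TENANT_ID, and the SSEEvent type are assumed to be
// defined elsewhere in the app.
const useEventStream = (patientId: string) => {
  const [events, setEvents] = useState<SSEEvent[]>([]);
  const controllerRef = useRef<AbortController | null>(null);

  useEffect(() => {
    // One controller per connection so the stream can be cancelled on cleanup.
    const controller = new AbortController();
    controllerRef.current = controller;

    const connect = async () => {
      const token = await getToken();
      const response = await fetch(
        `${API_BASE}/api/v1/patients/${patientId}/events/stream`,
        {
          headers: { Authorization: `Bearer ${token}`, 'X-Tenant-ID': TENANT_ID },
          signal: controller.signal,
        }
      );
      if (!response.ok || !response.body) {
        throw new Error(`Event stream request failed: ${response.status}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      // ... parse SSE format, dispatch to event handlers
    };

    // Aborting on unmount rejects the fetch with an AbortError; ignore that case.
    connect().catch((err) => {
      if (err.name !== 'AbortError') console.error(err);
    });
    return () => controller.abort();
  }, [patientId]);

  return events;
};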
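
The parsing step left open in the hook could be handled by a small incremental parser along these lines. This is a sketch under stated assumptions: `createSSEParser` and `ParsedEvent` are hypothetical names, the server is assumed to use `\n` line endings and JSON payloads as in the format section above, and less common SSE fields (`id`, `retry`) are ignored.

```typescript
interface ParsedEvent { event: string; data: unknown }

function createSSEParser(onEvent: (e: ParsedEvent) => void) {
  let buffer = ""; // holds any partial message between chunks

  return function feed(chunk: string): void {
    buffer += chunk;
    let boundary = buffer.indexOf("\n\n");
    // A blank line terminates one message; process every complete message.
    while (boundary !== -1) {
      const block = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);

      let event = "message"; // default type when no event: line is present
      const dataLines: string[] = [];
      for (const line of block.split("\n")) {
        if (line.startsWith("event:")) {
          event = line.slice(6).trim();
        } else if (line.startsWith("data:")) {
          dataLines.push(line.slice(5).replace(/^ /, ""));
        }
      }
      if (dataLines.length > 0) {
        onEvent({ event, data: JSON.parse(dataLines.join("\n")) });
      }
      boundary = buffer.indexOf("\n\n");
    }
  };
}
```

Inside the hook, each chunk read from `reader` would be passed through `decoder.decode(value, { stream: true })` and then to `feed`, so that a message split across network chunks is only dispatched once it is complete.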

QStash Async Event Bus

QStash dispatches non-blocking tasks that should not delay the request/response cycle:

| Task | Trigger Event | Handler |
|---|---|---|
| Document OCR | document_uploaded | PyMuPDF → Unstructured.io → Claude vision |
| Clinical extraction | document_parsed | Clinical Context Agent auto-chain |
| Email notification | match_completed | Resend template rendering |
| Intake reminder | Cron (hourly) | Notify patients with stale intakes (>24h) |
| Exchange rate refresh | Cron (daily 00:00 UTC) | Frankfurter API → currency cache |
| Consent expiry warning | Cron (daily 09:00 UTC) | Warn 30-day expiry |
| Stale session cleanup | Cron (daily 03:00 UTC) | Clean abandoned intakes |
| Analytics refresh | Cron (hourly) | Refresh PostgreSQL materialized views |

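QStash provides at-least-once delivery, configurable retries (count and backoff), a dead letter queue for failed messages, and HTTP callback-based delivery to stateless handlers. Because a message can be delivered more than once, handlers should be idempotent.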
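
Because failed deliveries are retried, a handler can be invoked more than once for the same message, for example when it completed the work but its response was lost. One way to make a handler safe under retries is to deduplicate on a message identifier. The sketch below is illustrative only: `createIdempotentHandler` is a hypothetical name, `messageId` is assumed to be a stable per-message identifier supplied with each delivery, and the in-memory `Set` stands in for a durable store.

```typescript
function createIdempotentHandler<T>(
  handle: (body: T) => void,
  seen: Set<string> = new Set<string>()
) {
  return function onDelivery(messageId: string, body: T): "processed" | "duplicate" {
    // A repeat delivery of an already-handled message is acknowledged
    // without running the side effect again.
    if (seen.has(messageId)) return "duplicate";

    handle(body);

    // Mark as seen only after success, so that a failed attempt
    // is processed again when the delivery is retried.
    seen.add(messageId);
    return "processed";
  };
}
```

With this shape, a task such as the match_completed email notification would send at most one email per message even if the delivery is repeated.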