02 — Real-Time Event-Driven Design
The platform is fundamentally event-driven. Clinical data arrives asynchronously from multiple sources (chat, documents, agents), and the system updates the patient record on every event rather than in batches.
Events Table (Source of Truth)
Append-only PostgreSQL table. Every meaningful action writes a row.
```sql
CREATE TABLE events (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_type      VARCHAR(100) NOT NULL,   -- e.g., 'clinical_entities_extracted'
    tenant_id       VARCHAR(100) NOT NULL,
    actor_id        VARCHAR(100),            -- user or agent that triggered
    patient_id      UUID,
    payload         JSONB NOT NULL DEFAULT '{}',
    correlation_id  VARCHAR(100) NOT NULL,
    locale          VARCHAR(10),
    source_service  VARCHAR(50),             -- 'intake_agent', 'document_service', etc.
    api_version     VARCHAR(10) DEFAULT 'v1',
    client_type     VARCHAR(20),             -- 'web', 'ios', 'android', 'mcp'
    created_at      TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_events_patient ON events(patient_id, created_at DESC);
CREATE INDEX idx_events_type ON events(event_type, created_at DESC);
CREATE INDEX idx_events_tenant ON events(tenant_id, created_at DESC);
CREATE INDEX idx_events_correlation ON events(correlation_id);
```
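To make the column defaults concrete, here is a minimal TypeScript sketch of an application-side row shape for `events`. The `EventRow` type and the `buildEvent` helper are hypothetical names, not part of the platform; they only mirror the columns and defaults declared in the table definition.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical application-side mirror of the events table columns.
interface EventRow {
  id: string;
  event_type: string;
  tenant_id: string;
  actor_id: string | null;
  patient_id: string | null;
  payload: Record<string, unknown>;
  correlation_id: string;
  locale: string | null;
  source_service: string | null;
  api_version: string;
  client_type: string | null;
  created_at: Date;
}

// Only the NOT NULL columns without a default are required from the caller.
type RequiredKeys = "event_type" | "tenant_id" | "correlation_id";
type NewEvent = Pick<EventRow, RequiredKeys> & Partial<Omit<EventRow, RequiredKeys>>;

// Fills in the same defaults the table declares: id, payload, api_version, created_at.
function buildEvent(input: NewEvent): EventRow {
  return {
    id: input.id ?? randomUUID(),
    event_type: input.event_type,
    tenant_id: input.tenant_id,
    actor_id: input.actor_id ?? null,
    patient_id: input.patient_id ?? null,
    payload: input.payload ?? {},
    correlation_id: input.correlation_id,
    locale: input.locale ?? null,
    source_service: input.source_service ?? null,
    api_version: input.api_version ?? "v1",
    client_type: input.client_type ?? null,
    created_at: input.created_at ?? new Date(),
  };
}
```

Because the table is append-only, a helper like this would only ever feed `INSERT` statements; corrections would be recorded as new events rather than updates to existing rows.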
Event Flow Pattern
```
User Action / System Process
        │
        ▼
Event written to events table (synchronous, within request transaction)
        │
        ├──▶ QStash dispatches async downstream handlers (non-blocking)
        │
        ├──▶ SSE pushes real-time update to connected frontend client
        │
        └──▶ Downstream agents/services react (e.g., Clinical Context Agent on document_parsed)
```
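The ordering in this flow (write first, then fan out without blocking) can be sketched as follows. This is an illustration under stated assumptions, not the platform's implementation: `eventLog`, `on`, and `recordEvent` are hypothetical names, the array stands in for the PostgreSQL table, and local callbacks stand in for QStash and SSE.

```typescript
type PlatformEvent = {
  event_type: string;
  patient_id: string;
  payload: Record<string, unknown>;
};
type Handler = (event: PlatformEvent) => Promise<void>;

// In-memory stand-ins (assumptions) for the events table and the async bus.
const eventLog: PlatformEvent[] = [];
const handlers = new Map<string, Handler[]>();

function on(eventType: string, handler: Handler): void {
  handlers.set(eventType, [...(handlers.get(eventType) ?? []), handler]);
}

// Step 1 happens synchronously with the request. Step 2 is scheduled but not
// awaited by the request path, so slow or failing handlers cannot delay the write.
function recordEvent(event: PlatformEvent): Promise<PromiseSettledResult<void>[]> {
  eventLog.push(event); // append-only write (source of truth)
  const reactions = (handlers.get(event.event_type) ?? []).map((h) =>
    Promise.resolve().then(() => h(event)),
  );
  // Returned only so callers and tests can observe when reactions have settled.
  return Promise.allSettled(reactions);
}
```

Using `Promise.allSettled` means one failing reaction does not prevent the others from running, which matches the intent of keeping downstream work independent of the original request.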
Core Event Types
| Event Type | Trigger | Downstream Reactions |
|---|---|---|
| `patient_registered` | New patient signs up | Welcome email, intake agent init |
| `consent_granted` | Patient grants consent | Unblock data processing pipeline |
| `consent_revoked` | Patient revokes consent | Block further processing for that purpose |
| `document_uploaded` | File confirmed in R2 | QStash dispatches OCR job |
| `document_parsing` | OCR started | SSE: status update to frontend |
| `document_parsed` | OCR/extraction complete | Auto-chain to Clinical Context Agent |
| `document_failed` | OCR/extraction failed | SSE: error notification, retry queued |
| `clinical_entities_extracted` | Agent extracts conditions/meds | Update EHR, update intake progress |
| `fhir_resource_created` | FHIR resource validated + stored | Update Neo4j graph, Qdrant embedding |
| `fhir_resource_updated` | Existing FHIR resource modified | Recalculate affected matches |
| `intake_progress_updated` | Any new data arrives | Check threshold for matching readiness |
| `chat_message_sent` | Patient sends message | Orchestrator routes to agent |
| `agent_response_generated` | Agent produces output | SSE push to frontend, Langfuse trace |
| `match_requested` | Patient or system triggers match | Execute 4-stage pipeline |
| `match_completed` | Scoring and ranking done | Generate explanations, notify patient |
| `match_explanation_generated` | Explanation agent completes | SSE: results to frontend |
SSE (Server-Sent Events) Streaming
Why SSE, Not Polling
Client-side polling exhausted browser resources in testing. SSE provides a persistent one-way channel from server to client, so the browser holds a single open connection instead of issuing repeated requests.
SSE Endpoint
- Requires Clerk auth + X-Tenant-ID header
- Returns `StreamingResponse` with `media_type="text/event-stream"`
- Server-side polling of events table (every 3–5 seconds)
- Sends typed events when state changes detected
- Maximum stream duration: 5 minutes (sends `timeout`, closes)
- Client disconnect triggers cleanup
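The polling loop behind the endpoint can be sketched as an async generator. This is a language-neutral illustration written in TypeScript to match the frontend example; the backend in this design returns a `StreamingResponse`, so the real loop lives server-side in a different language. `streamEvents`, `fetchSince`, and the option names are hypothetical, and the clock and sleep functions are injectable only to keep the sketch testable.

```typescript
type StreamEvent = { event: string; data: unknown };

interface StreamOptions {
  pollIntervalMs: number;          // the design states 3 to 5 seconds
  maxDurationMs: number;           // the design states 5 minutes
  now?: () => number;              // injectable clock (assumption, for tests)
  sleep?: (ms: number) => Promise<void>;
  signal?: { aborted: boolean };   // set by the caller when the client disconnects
}

// fetchSince is a hypothetical query: "events created after `cursor`".
async function* streamEvents(
  fetchSince: (cursor: number) => Promise<{ created_at: number; event: StreamEvent }[]>,
  opts: StreamOptions,
): AsyncGenerator<StreamEvent> {
  const now = opts.now ?? Date.now;
  const sleep =
    opts.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  const startedAt = now();
  let cursor = startedAt;

  while (!opts.signal?.aborted) {
    if (now() - startedAt >= opts.maxDurationMs) {
      yield { event: "timeout", data: {} }; // final event before the stream closes
      return;
    }
    const rows = await fetchSince(cursor);
    for (const row of rows) {
      cursor = Math.max(cursor, row.created_at);
      yield row.event; // only rows newer than the cursor are sent
    }
    await sleep(opts.pollIntervalMs);
  }
}
```

Tracking a cursor keeps each poll cheap (the `created_at DESC` indexes support this kind of query), and the hard duration cap bounds how long any one connection can hold server resources.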
SSE Event Format
```
event: document_update
data: {"document_id": "xxx", "status": "parsed", "extracted_entities_count": 5}

event: ehr_update
data: {"field": "conditions", "action": "added", "value": "M17.11 Primary osteoarthritis"}

event: intake_progress
data: {"progress": 0.62, "missing_fields": ["allergies", "blood_type"]}

event: match_progress
data: {"stage": "neo4j_filtering", "candidates_remaining": 12}

event: agent_response
data: {"agent": "intake", "content": "Based on your diabetes...", "suggested_actions": [...]}
```
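On the wire, each event is an `event:` line, a `data:` line, and a terminating blank line; the blank line is what tells the client a frame is complete. A minimal serializer might look like this (`formatSSE` is a hypothetical helper name, and JSON payloads are assumed, as in the examples above):

```typescript
// Serializes one event into the text/event-stream wire format.
// JSON.stringify escapes newlines, so the payload always fits on one data line.
function formatSSE(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}
```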
Frontend SSE Client Pattern
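```typescript
// useEventStream.ts hook
import { useEffect, useRef, useState } from 'react';
import { useAuth } from '@clerk/clerk-react'; // assumption: Clerk's React SDK supplies getToken

// Assumed to be provided by app config; declared here so the snippet type-checks.
declare const API_BASE: string;
declare const TENANT_ID: string;

type SSEEvent = { event: string; data: unknown };

export const useEventStream = (patientId: string) => {
  const [events, setEvents] = useState<SSEEvent[]>([]);
  const controllerRef = useRef<AbortController | null>(null);
  const { getToken } = useAuth();

  useEffect(() => {
    const controller = new AbortController();
    controllerRef.current = controller;

    const connect = async () => {
      const token = await getToken();
      const response = await fetch(
        `${API_BASE}/api/v1/patients/${patientId}/events/stream`,
        {
          headers: { Authorization: `Bearer ${token}`, 'X-Tenant-ID': TENANT_ID },
          signal: controller.signal,
        }
      );
      if (!response.ok || !response.body) {
        throw new Error(`Event stream request failed: ${response.status}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      // ... parse SSE format, dispatch to event handlers
    };

    // abort() rejects the pending fetch with an AbortError; ignore that on unmount.
    connect().catch((err) => {
      if (err?.name !== 'AbortError') console.error(err);
    });
    return () => controller.abort();
  }, [patientId]);

  return events;
};
```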
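The parsing step the hook leaves open has one subtlety: network chunks do not align with SSE frames, so a frame can arrive split across two reads. One way to handle that, sketched here under the assumption that every `data:` payload is JSON (as in the event format examples), is a small buffering parser. `createSSEParser` is a hypothetical helper, not part of the platform.

```typescript
type SSEEvent = { event: string; data: unknown };

// Buffers decoded text and emits an event for every complete frame.
// Frames end with a blank line; a partial frame stays in the buffer.
function createSSEParser(onEvent: (e: SSEEvent) => void) {
  let buffer = "";
  return (chunk: string): void => {
    buffer += chunk.replace(/\r\n/g, "\n");
    let end: number;
    while ((end = buffer.indexOf("\n\n")) !== -1) {
      const frame = buffer.slice(0, end);
      buffer = buffer.slice(end + 2);
      let event = "message"; // default event type when no "event:" line is present
      const dataLines: string[] = [];
      for (const line of frame.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim();
        else if (line.startsWith("data:")) dataLines.push(line.slice(5).trimStart());
        // Lines starting with ":" are comments (keep-alives) and are ignored.
      }
      if (dataLines.length > 0) {
        onEvent({ event, data: JSON.parse(dataLines.join("\n")) });
      }
    }
  };
}
```

Inside the hook, each chunk returned by `reader.read()` would be passed through `decoder.decode(value, { stream: true })` and then into the parser, with `onEvent` appending to the `events` state.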
QStash Async Event Bus
Dispatches non-blocking tasks that should not delay request/response:
| Task | Trigger Event | Handler |
|---|---|---|
| Document OCR | `document_uploaded` | PyMuPDF → Unstructured.io → Claude vision |
| Clinical extraction | `document_parsed` | Clinical Context Agent auto-chain |
| Email notification | `match_completed` | Resend template rendering |
| Intake reminder | Cron (hourly) | Notify patients with stale intakes (>24h) |
| Exchange rate refresh | Cron (daily 00:00 UTC) | Frankfurter API → currency cache |
| Consent expiry warning | Cron (daily 09:00 UTC) | Warn 30-day expiry |
| Stale session cleanup | Cron (daily 03:00 UTC) | Clean abandoned intakes |
| Analytics refresh | Cron (hourly) | Refresh PostgreSQL materialized views |
QStash provides: at-least-once delivery (so handlers should be idempotent), configurable retry (count + backoff), a dead letter queue for failed messages, and HTTP callback-based delivery to stateless handlers.
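Because failed deliveries are retried, a handler can be invoked more than once for the same message, for example when it completes its work but the response is lost. A common safeguard is to make handlers idempotent. The sketch below is an assumption-laden illustration: `Task`, `messageId`, and `idempotent` are hypothetical names, and the in-memory `Set` stands in for a shared store that stateless handlers would need in practice.

```typescript
type Task = {
  messageId: string; // hypothetical: a stable identifier supplied with each delivery
  eventType: string;
  payload: Record<string, unknown>;
};

// Wraps a handler so a redelivered message is acknowledged without
// repeating its side effect (e.g. sending the same email twice).
function idempotent(handler: (task: Task) => Promise<void>) {
  const processed = new Set<string>(); // assumption: replace with a durable store
  return async (task: Task): Promise<"processed" | "duplicate"> => {
    if (processed.has(task.messageId)) return "duplicate";
    await handler(task);
    // Recorded only after success, so a failed attempt is still retried.
    processed.add(task.messageId);
    return "processed";
  };
}
```

Recording the identifier after the handler succeeds favors retrying over skipping: a crash between the side effect and the record can still cause a repeat, so the side effect itself should tolerate duplicates where possible.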