Spec: EHR Builder Service Class (Gap #9)
Status: Deferred (tactical bridge in place via PR #262; full refactor still needed)
Effort: 5-7 days (increased from 3-5; scope expanded for progressive EHR)
Tier: Opus
Issue: #173
Last audited: 2026-04-19 (Session 50)
Problem
rebuild_ehr_for_case() in app/services/ehr_rebuild_service.py and run_ehr_builder() in app/agents/ehr_builder_agent.py total ~750 lines of imperative code. They work for document-triggered full rebuilds but lack:
- Progressive rebuild from conversation — 5 triage layers extract rich data (demographics, diagnosis, travel, logistics, financial) that never reaches EHR until documents are uploaded
- Merge rules when multiple sources report the same condition
- Conflict detection (e.g., two documents disagree on laterality)
- Source priority ordering (agent > ocr > conversation > intake > manual > ported)
- Confidence-weighted deduplication (the FHIRResource.confidence column exists but is never populated)
- Formal EHR snapshot schema — currently a flexible dict with no contract
Current State (Post-Session 50)
A tactical bridge in case_orchestrator._handle_intake_triage() (PR #262) writes patient_demographics and diagnosis from layer_state to case.ehr_snapshot on every triage turn. This is a stopgap — it writes directly to the snapshot without merge rules, dedup, or the builder service.
Design
EHRBuilderService class

    class EHRBuilderService:
        """Builds and maintains the EHR snapshot for a case.

        Two rebuild paths:
        1. Full rebuild: from FHIR resources + documents (after document processing)
        2. Incremental rebuild: from layer_state (after each triage turn)

        Both paths produce the same EHR snapshot schema.
        """

        def __init__(self, db: AsyncSession, case_id: str, tenant_id: str):
            self.db = db
            self.case_id = case_id
            self.tenant_id = tenant_id

        async def rebuild(self) -> dict:
            """Full rebuild from all FHIR + document + conversation sources."""

        async def rebuild_from_layer_state(self, layer_state: dict) -> dict:
            """Incremental rebuild from triage layer_state.

            Called after each triage turn. Merges conversation-extracted data
            into existing EHR snapshot without overwriting document-sourced data.
            """

        async def merge_record(self, new_resource: FHIRResource) -> MergeResult:
            """Merge a single new FHIR resource into the existing EHR."""

        def _resolve_conflict(self, existing: dict, incoming: dict) -> dict:
            """Apply source priority + confidence to resolve conflicts."""

        def _deduplicate(self, records: list[dict]) -> list[dict]:
            """Remove duplicates using ICD-10 code + name fuzzy match."""
Source Priority
1. agent: Clinical Context Agent extraction from documents (highest confidence)
2. ocr: Direct OCR text extraction
3. conversation: Triage agent layer_state extraction (NEW)
4. intake: Patient self-report via legacy intake
5. manual: Coordinator entry
6. ported: From prior case
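The ordering above can be encoded as a simple rank lookup. The following is a minimal sketch, not the service's actual implementation: the `source_type` key, the helper names, and the tie-breaking rule (existing record wins) are assumptions made for illustration.

```python
# Highest priority first, mirroring the list above.
SOURCE_PRIORITY = ["agent", "ocr", "conversation", "intake", "manual", "ported"]
_RANK = {source: index for index, source in enumerate(SOURCE_PRIORITY)}


def source_rank(source: str) -> int:
    """Lower rank means higher priority; unknown sources sort last."""
    return _RANK.get(source, len(SOURCE_PRIORITY))


def pick_higher_priority(existing: dict, incoming: dict) -> dict:
    """Return the record from the higher-priority source.

    `source_type` is a hypothetical key holding one of the six source names.
    Ties keep the existing record (an assumption; the spec does not say).
    """
    if source_rank(incoming["source_type"]) < source_rank(existing["source_type"]):
        return incoming
    return existing
```

Keeping the order in a single list means a change to the priority only has to be made in one place.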
Layer State → EHR Field Map
| Layer | Layer State Field | EHR Snapshot Field |
|---|---|---|
| medical_status | patient_demographics.patient_name | patient_demographics.name |
| medical_status | patient_demographics.gender | patient_demographics.gender |
| medical_status | patient_demographics.location.city | patient_demographics.city |
| medical_status | patient_demographics.location.country | patient_demographics.country |
| medical_status | patient_demographics.date_of_birth | patient_demographics.date_of_birth |
| medical_status | age | patient_demographics.age |
| medical_status | diagnosis.description | medical_history.conditions[].name (source: conversation) |
| medical_status | procedure.name | primary_condition.procedure |
| medical_status | symptoms | medical_history.symptoms |
| medical_status | medications | medical_history.medications |
| medical_status | allergies | medical_history.allergies |
| medical_status | comorbidities | medical_history.comorbidities |
| intent_capture | case_type | case_context.urgency |
| intent_capture | trigger_event.description | case_context.reason_for_seeking_care |
| logistics | country_of_residence | patient_demographics.country (if not set by medical) |
| logistics | companion | travel_plan.companion |
| logistics | timeline.preferred_start | travel_plan.preferred_timeline |
| travel_readiness | transport_tier | travel_plan.transport_tier |
| financial_readiness | funding_source | financial.funding_source |
| financial_readiness | budget_range | financial.budget_range |
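One way to apply this map is a table-driven copy using dotted paths. The sketch below covers only a subset of rows and assumes `layer_state` is a dict keyed by layer name with nested dicts underneath; `FIELD_MAP`, `get_path`, `set_path`, and `apply_field_map` are hypothetical names. It deliberately leaves out the `conditions[]` row, the "if not set by medical" rule, and all merge rules, so it would overwrite existing snapshot values as written.

```python
from typing import Any

# Subset of the table above: (layer, layer_state path, snapshot path).
FIELD_MAP = [
    ("medical_status", "patient_demographics.patient_name", "patient_demographics.name"),
    ("medical_status", "patient_demographics.location.city", "patient_demographics.city"),
    ("medical_status", "age", "patient_demographics.age"),
    ("medical_status", "procedure.name", "primary_condition.procedure"),
    ("logistics", "companion", "travel_plan.companion"),
    ("financial_readiness", "funding_source", "financial.funding_source"),
]

_MISSING = object()  # sentinel so that a stored None is distinguishable from "absent"


def get_path(data: dict, dotted: str) -> Any:
    """Walk a dotted path through nested dicts; return _MISSING if any key is absent."""
    current: Any = data
    for key in dotted.split("."):
        if not isinstance(current, dict) or key not in current:
            return _MISSING
        current = current[key]
    return current


def set_path(data: dict, dotted: str, value: Any) -> None:
    """Set a value at a dotted path, creating intermediate dicts as needed."""
    *parents, leaf = dotted.split(".")
    for key in parents:
        data = data.setdefault(key, {})
    data[leaf] = value


def apply_field_map(layer_state: dict, snapshot: dict) -> dict:
    """Copy mapped values into the snapshot, skipping absent or empty values."""
    for layer, src, dest in FIELD_MAP:
        value = get_path(layer_state.get(layer, {}), src)
        if value is _MISSING or value in (None, "", [], {}):
            continue
        set_path(snapshot, dest, value)
    return snapshot
```

Driving the copy from data rather than hand-written assignments keeps the code and the table in step: adding a row to the map is the whole change.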
EHR Snapshot Schema (Contract)

    interface EHRSnapshot {
      // Demographics
      patient_demographics: {
        name?: string;
        age?: number;
        gender?: string;
        date_of_birth?: string;
        city?: string;
        country?: string;
        location?: string; // "City, Country" display string
        language_preference?: string;
        preferred_currency?: string;
      };
      // Clinical
      medical_history: {
        conditions: Array<{
          name: string;
          icd10?: string;
          snomed?: string;
          source: string; // filename or "conversation"
          confidence?: number; // 0-1, null = unverified
          laterality?: string;
          status?: string; // active, resolved, suspected
        }>;
        symptoms?: string[];
        medications?: string[];
        allergies?: string[];
        comorbidities?: string[];
      };
      // Procedure
      primary_condition: {
        procedure?: string;
        procedure_code?: string;
        icd10_codes?: string[];
      };
      // Context (from conversation)
      case_context?: {
        urgency?: string; // elective, selective, urgent
        reason_for_seeking_care?: string;
        speaker_relationship?: string; // self, child, parent, spouse
      };
      // Travel & logistics
      travel_plan?: {
        companion?: { relationship: string; traveling_with: boolean };
        preferred_timeline?: string;
        transport_tier?: string;
      };
      // Financial
      financial?: {
        funding_source?: string;
        budget_range?: { min_usd_cents?: number; max_usd_cents?: number };
      };
      // Documents
      documents: Array<{
        id: string;
        filename: string;
        status: string;
        extracted_entities_count?: number;
      }>;
      // Scoring
      risk_factors: Array<{ name: string; severity: string; source: string }>;
      completeness_score: number; // 0-1
      missing_information: string[];
      // Metadata
      built_at: string; // ISO timestamp
      _rebuild_sources: {
        fhir_count: number;
        document_count: number;
        conversation_layers_used: string[];
        rebuilt_at: string;
      };
    }
Merge Rules
| Scenario | Rule |
|---|---|
| Same ICD-10 code, same source | Keep latest version |
| Same ICD-10 code, different source | Keep higher-priority source |
| Same condition name, different codes | Flag for review, keep both |
| Conflicting laterality | Flag for review, keep both with conflict marker |
| Confidence < 0.5 | Mark as "unverified" in EHR |
| Conversation-sourced vs document-sourced | Document wins (higher confidence), conversation kept as fallback |
| Demographics from conversation vs document | Document wins for clinical fields (age from DOB), conversation wins for contact/location |
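The condition-level rules in this table can be sketched as a pairwise merge. This is an illustrative sketch under several assumptions, not the planned `_resolve_conflict()` or `_deduplicate()`: the `source_type`, `recorded_at`, `conflict`, and `unverified` keys are hypothetical, the 0.85 name-similarity threshold is invented (the spec sets none), and `difflib.SequenceMatcher` stands in for whatever fuzzy matcher is eventually chosen. The "conversation kept as fallback" and demographics rows are not modelled.

```python
from difflib import SequenceMatcher

SOURCE_PRIORITY = ["agent", "ocr", "conversation", "intake", "manual", "ported"]
UNVERIFIED_BELOW = 0.5       # from the table: confidence < 0.5 is "unverified"
NAME_MATCH_THRESHOLD = 0.85  # hypothetical; the spec sets no threshold


def _rank(record: dict) -> int:
    """Position in SOURCE_PRIORITY; unknown sources sort last."""
    source = record.get("source_type")
    return SOURCE_PRIORITY.index(source) if source in SOURCE_PRIORITY else len(SOURCE_PRIORITY)


def _names_match(a: dict, b: dict) -> bool:
    """Case-insensitive fuzzy comparison of two condition names."""
    left, right = a["name"].strip().lower(), b["name"].strip().lower()
    return SequenceMatcher(None, left, right).ratio() >= NAME_MATCH_THRESHOLD


def _flag(record: dict, conflict: str = "") -> dict:
    """Copy the record, adding review markers where the table calls for them."""
    flagged = dict(record)
    if conflict:
        flagged["conflict"] = conflict
    confidence = record.get("confidence")
    if confidence is not None and confidence < UNVERIFIED_BELOW:
        flagged["unverified"] = True
    return flagged


def merge_pair(existing: dict, incoming: dict) -> list[dict]:
    """Return the record(s) to keep after comparing two condition records."""
    same_code = bool(existing.get("icd10")) and existing.get("icd10") == incoming.get("icd10")
    if not same_code and not _names_match(existing, incoming):
        # Different conditions: nothing to merge, keep both.
        return [_flag(existing), _flag(incoming)]
    lat_a, lat_b = existing.get("laterality"), incoming.get("laterality")
    if lat_a and lat_b and lat_a != lat_b:
        # Conflicting laterality: keep both with a marker, never auto-resolve.
        return [_flag(existing, "laterality"), _flag(incoming, "laterality")]
    if not same_code and existing.get("icd10") and incoming.get("icd10"):
        # Same condition name, different codes: keep both, flag for review.
        return [_flag(existing, "code"), _flag(incoming, "code")]
    if existing["source_type"] == incoming["source_type"]:
        # Same source: keep the latest version (ISO timestamps in one format sort as strings).
        winner = incoming if incoming["recorded_at"] >= existing["recorded_at"] else existing
    else:
        # Different sources: the higher-priority source wins.
        winner = existing if _rank(existing) <= _rank(incoming) else incoming
    return [_flag(winner)]
```

Returning a list rather than a single record lets the "keep both" rules share one code path with the "pick a winner" rules.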
Progressive Rebuild Triggers
| Event | Rebuild Type | What Updates |
|---|---|---|
| Each triage turn | rebuild_from_layer_state() | Demographics, diagnosis, symptoms, context |
| Document processed (OCR complete) | rebuild() (full) | Everything: FHIR + documents + conversation |
| FHIR resource created manually | merge_record() | Single condition/observation |
| Case porting | rebuild() (full) | Ported records merged with dedup |
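The trigger table amounts to a small dispatch from event to rebuild path. The sketch below uses a stub in place of `EHRBuilderService` so it runs on its own; the event names (`triage_turn`, `document_processed`, `fhir_resource_created`, `case_ported`) and the `handle_event` function are hypothetical and only illustrate the routing.

```python
import asyncio


class StubEHRBuilderService:
    """Stand-in for EHRBuilderService that reports which path was taken."""

    async def rebuild(self) -> dict:
        return {"path": "full"}

    async def rebuild_from_layer_state(self, layer_state: dict) -> dict:
        return {"path": "incremental", "layers": sorted(layer_state)}

    async def merge_record(self, new_resource: dict) -> dict:
        return {"path": "merge", "resource": new_resource}


async def handle_event(service, event: str, payload=None) -> dict:
    """Route an event to the rebuild path listed in the table."""
    if event == "triage_turn":
        return await service.rebuild_from_layer_state(payload or {})
    if event in ("document_processed", "case_ported"):
        return await service.rebuild()
    if event == "fhir_resource_created":
        return await service.merge_record(payload)
    raise ValueError(f"unknown event: {event}")


# Example: a case-porting event takes the full rebuild path.
result = asyncio.run(handle_event(StubEHRBuilderService(), "case_ported"))
```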
Confidence Pipeline (Blocked)
Current state:
- FHIRResource.confidence column exists (Float, nullable)
- Clinical Context Agent's map_to_medical_codes extracts confidence per entity
- But FHIRResourceCreate schema has no confidence field
- store_resources() never passes confidence
Prerequisites before implementing confidence-weighted merge:
1. Add confidence: float | None = None to FHIRResourceCreate schema
2. Pass entity.get("confidence") in clinical_context.store_resources()
3. Use in _resolve_conflict() and _deduplicate()
Dependencies
- Gap #17 (FHIR confidence field): column exists, needs wiring (see above)
- Stable FHIR resource schema
- Layer state field map (defined above)
When to Build
Build this when:
1. Multiple document uploads per case become common (already happening: 8 docs in test case)
2. Provider-submitted records need merging (Phase 2 coordinator flow is live)
3. Progressive EHR is needed for pre-document intake experience (NOW — patients see empty EHR panel)
4. Wave 1 triage agent is default path (flag flipped to true)
Migration from Tactical Bridge
When implementing this service:
1. Move demographics bridging from case_orchestrator._handle_intake_triage() into rebuild_from_layer_state()
2. Call EHRBuilderService.rebuild_from_layer_state() instead of direct ehr_snapshot writes
3. Keep rebuild() (full) for document processing path
4. Both paths produce identical schema output
Edge Cases
- Same patient, multiple cases — each case has its own EHR snapshot. FHIR resources are patient-level but EHR snapshots are case-level. Document from case A shouldn't appear in case B's EHR unless ported.
- Observation value conflicts — same lab (HbA1c) from two documents with different values. Keep both with source attribution, flag for coordinator review.
- Procedure identified mid-conversation — completeness score jumps when procedure is set. Recalculate requirements.
- Laterality conflicts — "left knee" vs "right knee" from different sources. Keep both with conflict marker, don't auto-resolve.
- Demographics conflict — document says age 15 (from 2022 report), conversation says born 2004 (= age 21 now). Document age is stale; compute from DOB.