
Spec: EHR Builder Service Class (Gap #9)

Status: Deferred. Tactical bridge in place (PR #262); full refactor needed.
Effort: 5-7 days (increased from 3-5 because scope expanded for progressive EHR)
Tier: Opus
Issue: #173
Last audited: 2026-04-19 (Session 50)

Problem

rebuild_ehr_for_case() in app/services/ehr_rebuild_service.py and run_ehr_builder() in app/agents/ehr_builder_agent.py total ~750 lines of imperative code. They work for document-triggered full rebuilds but lack:

  1. Progressive rebuild from conversation: the 5 triage layers extract rich data (demographics, diagnosis, travel, logistics, financial) that never reaches the EHR until documents are uploaded
  2. Merge rules when multiple sources report the same condition
  3. Conflict detection (e.g., two documents disagree on laterality)
  4. Source priority ordering (agent > ocr > conversation > intake > manual > ported)
  5. Confidence-weighted deduplication (the FHIRResource.confidence column exists but is never populated)
  6. Formal EHR snapshot schema: the snapshot is currently a free-form dict with no contract

Current State (Post-Session 50)

A tactical bridge in case_orchestrator._handle_intake_triage() (PR #262) copies patient_demographics and diagnosis from layer_state into case.ehr_snapshot on every triage turn. This is a stopgap: it writes directly to the snapshot, bypassing the builder service, merge rules, and deduplication.

Design

EHRBuilderService class

```python
from __future__ import annotations  # keeps the annotations below unevaluated at import time

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Assumed to be SQLAlchemy's async session type.
    from sqlalchemy.ext.asyncio import AsyncSession
    # FHIRResource (ORM model) and MergeResult are project types; their
    # import paths are not given in this spec.


class EHRBuilderService:
    """Builds and maintains the EHR snapshot for a case.

    Two rebuild paths:
    1. Full rebuild: from FHIR resources + documents (after document processing)
    2. Incremental rebuild: from layer_state (after each triage turn)

    Both paths produce the same EHR snapshot schema.
    """

    def __init__(self, db: AsyncSession, case_id: str, tenant_id: str):
        self.db = db
        self.case_id = case_id
        self.tenant_id = tenant_id

    async def rebuild(self) -> dict:
        """Full rebuild from all FHIR + document + conversation sources."""
        raise NotImplementedError

    async def rebuild_from_layer_state(self, layer_state: dict) -> dict:
        """Incremental rebuild from triage layer_state.

        Called after each triage turn. Merges conversation-extracted data
        into the existing EHR snapshot without overwriting document-sourced data.
        """
        raise NotImplementedError

    async def merge_record(self, new_resource: FHIRResource) -> MergeResult:
        """Merge a single new FHIR resource into the existing EHR."""
        raise NotImplementedError

    def _resolve_conflict(self, existing: dict, incoming: dict) -> dict:
        """Apply source priority + confidence to resolve conflicts."""
        raise NotImplementedError

    def _deduplicate(self, records: list[dict]) -> list[dict]:
        """Remove duplicates using ICD-10 code + name fuzzy match."""
        raise NotImplementedError
```
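The _deduplicate() contract ("ICD-10 code + name fuzzy match") could look roughly like the standalone sketch below. It assumes conditions are dicts shaped like the snapshot's medical_history.conditions entries, uses the standard library's difflib.SequenceMatcher as the fuzzy matcher, and picks an arbitrary 0.85 similarity threshold; the threshold and helper names are hypothetical, not part of the spec.

```python
from difflib import SequenceMatcher

# Hypothetical cut-off: names at or above this similarity ratio are treated as the same condition.
NAME_SIMILARITY_THRESHOLD = 0.85


def _normalize(name: str) -> str:
    """Lowercase and collapse whitespace so trivial spelling variants compare equal."""
    return " ".join(name.lower().split())


def _is_duplicate(a: dict, b: dict) -> bool:
    """Same ICD-10 code means duplicate; differing codes are kept as separate records
    (the merge rules flag those for review). If either side lacks a code, fall back
    to fuzzy name matching."""
    code_a, code_b = a.get("icd10"), b.get("icd10")
    if code_a and code_b:
        return code_a == code_b
    ratio = SequenceMatcher(None, _normalize(a["name"]), _normalize(b["name"])).ratio()
    return ratio >= NAME_SIMILARITY_THRESHOLD


def deduplicate(records: list[dict]) -> list[dict]:
    """Keep the first occurrence of each distinct condition, preserving input order.

    Callers are expected to sort records by source priority first, so the
    preferred record is the one that survives.
    """
    kept: list[dict] = []
    for record in records:
        if not any(_is_duplicate(record, existing) for existing in kept):
            kept.append(record)
    return kept
```

Pairwise comparison is quadratic, which is acceptable for the handful of conditions a single case carries.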

Source Priority

  1. agent — Clinical Context Agent extraction from documents (highest confidence)
  2. ocr — Direct OCR text extraction
  3. conversation — Triage agent layer_state extraction (NEW)
  4. intake — Patient self-report via legacy intake
  5. manual — Coordinator entry
  6. ported — From prior case
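One way to make this ordering machine-usable is a rank table, sketched below under an assumption: each record carries a source type label (a hypothetical source_type key), because the snapshot's own source field holds a filename or "conversation" rather than one of these six categories.

```python
# Lower rank wins. Order copied from the Source Priority list above.
SOURCE_PRIORITY: dict[str, int] = {
    "agent": 0,
    "ocr": 1,
    "conversation": 2,
    "intake": 3,
    "manual": 4,
    "ported": 5,
}


def source_rank(source_type: str) -> int:
    """Rank a source type; unknown labels sort after every known source."""
    return SOURCE_PRIORITY.get(source_type, len(SOURCE_PRIORITY))


def preferred_source(a: str, b: str) -> str:
    """Return whichever of two source types should win a conflict (ties keep `a`)."""
    return a if source_rank(a) <= source_rank(b) else b
```

Sorting records with `key=source_rank` before deduplication also makes "keep the higher-priority source" fall out of a keep-first pass.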

Layer State → EHR Field Map

| Layer | Layer State Field | EHR Snapshot Field |
|---|---|---|
| medical_status | patient_demographics.patient_name | patient_demographics.name |
| medical_status | patient_demographics.gender | patient_demographics.gender |
| medical_status | patient_demographics.location.city | patient_demographics.city |
| medical_status | patient_demographics.location.country | patient_demographics.country |
| medical_status | patient_demographics.date_of_birth | patient_demographics.date_of_birth |
| medical_status | age | patient_demographics.age |
| medical_status | diagnosis.description | medical_history.conditions[].name (source: conversation) |
| medical_status | procedure.name | primary_condition.procedure |
| medical_status | symptoms | medical_history.symptoms |
| medical_status | medications | medical_history.medications |
| medical_status | allergies | medical_history.allergies |
| medical_status | comorbidities | medical_history.comorbidities |
| intent_capture | case_type | case_context.urgency |
| intent_capture | trigger_event.description | case_context.reason_for_seeking_care |
| logistics | country_of_residence | patient_demographics.country (if not set by medical_status) |
| logistics | companion | travel_plan.companion |
| logistics | timeline.preferred_start | travel_plan.preferred_timeline |
| travel_readiness | transport_tier | travel_plan.transport_tier |
| financial_readiness | funding_source | financial.funding_source |
| financial_readiness | budget_range | financial.budget_range |
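A table-driven copy step is one plausible core for rebuild_from_layer_state(). The sketch below assumes layer_state is a dict keyed by layer name, uses only a subset of the rows above, and writes a field only when the snapshot does not already hold a value, which is the simplest reading of "without overwriting document-sourced data". FIELD_MAP, get_path, set_if_absent, and apply_layer_state are hypothetical names.

```python
from typing import Any

# (layer, layer_state path, snapshot path): a subset of the field map above.
# medical_status rows come before logistics so its country wins when both are present.
FIELD_MAP: list[tuple[str, str, str]] = [
    ("medical_status", "patient_demographics.patient_name", "patient_demographics.name"),
    ("medical_status", "patient_demographics.location.country", "patient_demographics.country"),
    ("medical_status", "age", "patient_demographics.age"),
    ("logistics", "country_of_residence", "patient_demographics.country"),
    ("financial_readiness", "funding_source", "financial.funding_source"),
]

_MISSING = object()  # sentinel distinguishing "absent" from an explicit None


def get_path(data: dict, path: str) -> Any:
    """Walk a dotted path through nested dicts; return _MISSING if any step is absent."""
    node: Any = data
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return _MISSING
        node = node[key]
    return node


def set_if_absent(data: dict, path: str, value: Any) -> bool:
    """Write value at a dotted path unless a non-None value is already there."""
    *parents, leaf = path.split(".")
    node = data
    for key in parents:
        node = node.setdefault(key, {})
    if node.get(leaf) is not None:
        return False  # keep the existing (e.g. document-sourced) value
    node[leaf] = value
    return True


def apply_layer_state(snapshot: dict, layer_state: dict) -> list[str]:
    """Copy mapped layer_state fields into the snapshot; return the snapshot paths written."""
    written: list[str] = []
    for layer, src, dst in FIELD_MAP:
        value = get_path(layer_state.get(layer, {}), src)
        if value is _MISSING or value is None:
            continue
        if set_if_absent(snapshot, dst, value):
            written.append(dst)
    return written
```

Fill-if-absent also stops a later triage turn from correcting an earlier conversation value; a fuller version would compare source priority per field instead of checking only for presence.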

EHR Snapshot Schema (Contract)

```typescript
interface EHRSnapshot {
  // Demographics
  patient_demographics: {
    name?: string;
    age?: number;
    gender?: string;
    date_of_birth?: string;
    city?: string;
    country?: string;
    location?: string;           // "City, Country" display string
    language_preference?: string;
    preferred_currency?: string;
  };

  // Clinical
  medical_history: {
    conditions: Array<{
      name: string;
      icd10?: string;
      snomed?: string;
      source: string;            // filename or "conversation"
      confidence?: number | null; // 0-1; null or absent = unverified
      laterality?: string;
      status?: string;           // active, resolved, suspected
    }>;
    symptoms?: string[];
    medications?: string[];
    allergies?: string[];
    comorbidities?: string[];
  };

  // Procedure
  primary_condition: {
    procedure?: string;
    procedure_code?: string;
    icd10_codes?: string[];
  };

  // Context (from conversation)
  case_context?: {
    urgency?: string;            // elective, selective, urgent
    reason_for_seeking_care?: string;
    speaker_relationship?: string;  // self, child, parent, spouse
  };

  // Travel & logistics
  travel_plan?: {
    companion?: { relationship: string; traveling_with: boolean };
    preferred_timeline?: string;
    transport_tier?: string;
  };

  // Financial
  financial?: {
    funding_source?: string;
    budget_range?: { min_usd_cents?: number; max_usd_cents?: number };
  };

  // Documents
  documents: Array<{
    id: string;
    filename: string;
    status: string;
    extracted_entities_count?: number;
  }>;

  // Scoring
  risk_factors: Array<{ name: string; severity: string; source: string }>;
  completeness_score: number;    // 0-1
  missing_information: string[];

  // Metadata
  built_at: string;              // ISO timestamp
  _rebuild_sources: {
    fhir_count: number;
    document_count: number;
    conversation_layers_used: string[];
    rebuilt_at: string;
  };
}
```
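The contract fixes completeness_score as 0-1 and missing_information as a list of strings but does not say how they are computed. The sketch below assumes the simplest scheme, an unweighted fraction of a required-field list; REQUIRED_FIELDS and both functions are hypothetical, and a real implementation would likely derive requirements from the procedure.

```python
# Hypothetical required-field list; the spec does not define how completeness is scored.
REQUIRED_FIELDS = [
    "patient_demographics.name",
    "patient_demographics.age",
    "patient_demographics.country",
    "medical_history.conditions",
    "primary_condition.procedure",
]


def _has_value(snapshot: dict, path: str) -> bool:
    """True if the dotted path resolves to something other than None or an empty value."""
    node = snapshot
    for key in path.split("."):
        if not isinstance(node, dict):
            return False
        node = node.get(key)
    return node not in (None, "", [], {})


def score_completeness(snapshot: dict) -> tuple[float, list[str]]:
    """Return (completeness_score, missing_information) as an unweighted fraction of REQUIRED_FIELDS."""
    missing = [path for path in REQUIRED_FIELDS if not _has_value(snapshot, path)]
    score = 1 - len(missing) / len(REQUIRED_FIELDS)
    return round(score, 2), missing
```

Even this flat scheme shows the jump described under Edge Cases when the procedure is identified mid-conversation.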

Merge Rules

| Scenario | Rule |
|---|---|
| Same ICD-10 code, same source | Keep latest version |
| Same ICD-10 code, different source | Keep higher-priority source |
| Same condition name, different codes | Flag for review, keep both |
| Conflicting laterality | Flag for review, keep both with conflict marker |
| Confidence < 0.5 | Mark as "unverified" in EHR |
| Conversation-sourced vs document-sourced | Document wins (higher confidence), conversation kept as fallback |
| Demographics from conversation vs document | Document wins for clinical fields (age from DOB), conversation wins for contact/location |
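The ICD-10-level rows of this table can be sketched as a single resolver for two records sharing a code. This is a standalone illustration of what _resolve_conflict() might apply, not its defined behavior: it assumes a hypothetical source_type key on each record, that `incoming` is the newer record, and hypothetical conflict and unverified output keys. It treats a missing confidence as unverified, per the schema comment.

```python
SOURCE_PRIORITY = {"agent": 0, "ocr": 1, "conversation": 2, "intake": 3, "manual": 4, "ported": 5}
UNVERIFIED_BELOW = 0.5  # the "Confidence < 0.5" rule


def _rank(record: dict) -> int:
    """Lower is better; unknown source types rank last."""
    return SOURCE_PRIORITY.get(record["source_type"], len(SOURCE_PRIORITY))


def resolve_condition(existing: dict, incoming: dict) -> list[dict]:
    """Resolve two records with the same ICD-10 code; return the record(s) to keep.

    `incoming` is assumed to be the more recent of the two.
    """
    left, right = existing.get("laterality"), incoming.get("laterality")
    if left and right and left != right:
        # Conflicting laterality: keep both, mark the conflict, never auto-resolve.
        return [{**r, "conflict": "laterality"} for r in (existing, incoming)]

    if existing["source_type"] == incoming["source_type"]:
        winner = incoming  # same source: keep the latest version
    else:
        winner = min(existing, incoming, key=_rank)  # higher-priority source wins

    confidence = winner.get("confidence")
    unverified = confidence is None or confidence < UNVERIFIED_BELOW
    return [{**winner, "unverified": unverified}]
```

Because the laterality rule keeps both records, the resolver returns a list; the `-> dict` signature in the class stub would need to change to express that. "Conversation kept as fallback" is not modeled here.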

Progressive Rebuild Triggers

| Event | Rebuild Type | What Updates |
|---|---|---|
| Each triage turn | rebuild_from_layer_state() | Demographics, diagnosis, symptoms, context |
| Document processed (OCR complete) | rebuild() (full) | Everything: FHIR + documents + conversation |
| FHIR resource created manually | merge_record() | Single condition/observation |
| Case porting | rebuild() (full) | Ported records merged with dedup |
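The trigger table maps directly onto a small dispatcher. The sketch below is illustrative only: RebuildTrigger, dispatch_rebuild, and run_sync are hypothetical names, and `service` is duck-typed against the three EHRBuilderService methods from the class design.

```python
import asyncio
from enum import Enum
from typing import Any


class RebuildTrigger(Enum):
    TRIAGE_TURN = "triage_turn"
    DOCUMENT_PROCESSED = "document_processed"
    FHIR_RESOURCE_CREATED = "fhir_resource_created"
    CASE_PORTED = "case_ported"


async def dispatch_rebuild(service: Any, trigger: RebuildTrigger, payload: Any = None) -> Any:
    """Route a trigger to the matching EHRBuilderService method from the table above."""
    if trigger is RebuildTrigger.TRIAGE_TURN:
        return await service.rebuild_from_layer_state(payload)  # payload: layer_state dict
    if trigger is RebuildTrigger.FHIR_RESOURCE_CREATED:
        return await service.merge_record(payload)  # payload: the new FHIR resource
    # DOCUMENT_PROCESSED and CASE_PORTED both take the full rebuild path.
    return await service.rebuild()


def run_sync(service: Any, trigger: RebuildTrigger, payload: Any = None) -> Any:
    """Convenience wrapper for scripts and tests; async callers should await dispatch_rebuild."""
    return asyncio.run(dispatch_rebuild(service, trigger, payload))
```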

Confidence Pipeline (Blocked)

Current state:
  • FHIRResource.confidence column exists (Float, nullable)
  • Clinical Context Agent's map_to_medical_codes extracts confidence per entity
  • But FHIRResourceCreate schema has no confidence field
  • store_resources() never passes confidence

Prerequisites before implementing confidence-weighted merge:
  1. Add confidence: float | None = None to FHIRResourceCreate schema
  2. Pass entity.get("confidence") in clinical_context.store_resources()
  3. Use in _resolve_conflict() and _deduplicate()

Dependencies

  • Gap #17 (FHIR confidence field): the column exists but needs wiring (see Confidence Pipeline above)
  • Stable FHIR resource schema
  • Layer state field map (defined above)

When to Build

Build this when:
  1. Multiple document uploads per case become common: already happening (8 docs in test case)
  2. Provider-submitted records need merging: Phase 2 coordinator flow is live
  3. Progressive EHR is needed for pre-document intake experience (NOW: patients see empty EHR panel)
  4. Wave 1 triage agent is default path (flag flipped to true)

Migration from Tactical Bridge

When implementing this service:
  1. Move demographics bridging from case_orchestrator._handle_intake_triage() into rebuild_from_layer_state()
  2. Call EHRBuilderService.rebuild_from_layer_state() instead of direct ehr_snapshot writes
  3. Keep rebuild() (full) for document processing path
  4. Both paths produce identical schema output

Edge Cases

  1. Same patient, multiple cases — each case has its own EHR snapshot. FHIR resources are patient-level but EHR snapshots are case-level. Document from case A shouldn't appear in case B's EHR unless ported.
  2. Observation value conflicts — same lab (HbA1c) from two documents with different values. Keep both with source attribution, flag for coordinator review.
  3. Procedure identified mid-conversation: the completeness score jumps once the procedure is set, so requirements must be recalculated at that point.
  4. Laterality conflicts — "left knee" vs "right knee" from different sources. Keep both with conflict marker, don't auto-resolve.
  5. Demographics conflict: a document says age 15 (from a 2022 report), while the conversation says born 2004 (age 21 or 22 in 2026, depending on the birth date). The document age is stale; compute age from DOB.