
FHIR Provenance & Document Traceability — Feature Spec

Date: 2026-04-10 (Session 35)
Status: Implemented in Session 35
Companion steer: ai-steer/fhir-provenance-steer.md


Summary

Every FHIR resource traces to its source — the document it was extracted from, the chat message it was mentioned in, or the API call that created it. Source attribution flows through the EHR snapshot to the frontend, so patients and providers see where each finding came from.

Ships behind the fhir_provenance_v1 feature flag (default: true). Re-processing dedup is gated separately by fhir_dedup_on_reprocess (Part F).


Part A — Data Model Changes

A1. FHIRResource model — 2 new columns

# app/models/fhir_resource.py

# Add after 'created_by' field:
document_id: Mapped[str | None] = mapped_column(
    String(36), nullable=True, index=True,
)
# The DocumentReference.id that this FHIR resource was extracted from.
# Null for chat-extracted or manually created resources.

case_id: Mapped[str | None] = mapped_column(
    String(36), nullable=True, index=True,
)
# The case active when this resource was created.
# Used for case-scoped EHR queries (porting spec).

Migration: Alembic add_column — nullable, no default. Existing rows stay null (legacy data, acceptable).

Indexes: ix_fhir_resources_document_id (cascade and dedup queries) and ix_fhir_resources_case_id (case-scoped EHR queries).

A2. FHIRResourceCreate schema — 2 new optional fields

# app/schemas/fhir.py

class FHIRResourceCreate(BaseModel):
    # ... existing fields ...
    document_id: str | None = None
    case_id: str | None = None

A3. FHIR Provenance resource type

A new resource type, Provenance, is stored in the fhir_resources table:

{
  "resourceType": "Provenance",
  "target": [{"reference": "Condition/agent-condition-abc123"}],
  "recorded": "2026-04-10T12:00:00Z",
  "agent": [{
    "type": {"text": "assembler"},
    "who": {"display": "Curaway Clinical Context Agent"}
  }],
  "entity": [{
    "role": "source",
    "what": {
      "display": "blood_work_feb2024.pdf",
      "identifier": {"value": "doc-uuid-here"}
    }
  }]
}

Created alongside each document-derived clinical FHIR resource in store_resources() (B3). Resources without a document_id get no Provenance.

A4. Data forwarding audit model

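# app/models/data_forwarding_audit.py (NEW)

from datetime import datetime

from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, mapped_column

# Base, UUIDPrimaryKeyMixin, TenantScopedMixin, TimestampMixin and FlexibleJSON
# are the project's existing model base, mixins and JSON column type.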

class DataForwardingAudit(Base, UUIDPrimaryKeyMixin, TenantScopedMixin, TimestampMixin):
    __tablename__ = "data_forwarding_audits"

    case_id: Mapped[str] = mapped_column(String(36), nullable=False, index=True)
    patient_id: Mapped[str] = mapped_column(String(36), nullable=False)
    provider_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    consent_record_id: Mapped[str] = mapped_column(String(36), nullable=False)
    fhir_resource_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    document_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    data_categories: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    # ["conditions", "medications", "observations", "allergies", "procedures"]
    ehr_snapshot_hash: Mapped[str] = mapped_column(String(64), nullable=False)
    # SHA-256 of the EHR snapshot JSON at time of forwarding
    forwarded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)

Part B — Pipeline Changes

B1. Pass document_id through document processing pipeline

# app/services/document_processing.py
# Line ~175 — currently:
agent_result = await run_clinical_context_agent(
    db=db, patient_id=patient_id, tenant_id=tenant_id,
    text=extracted_text, report_type="general",
    langfuse_handler=langfuse_handler,
)

# Change to:
agent_result = await run_clinical_context_agent(
    db=db, patient_id=patient_id, tenant_id=tenant_id,
    text=extracted_text, report_type="general",
    document_id=doc_id,           # NEW
    case_id=active_case_id,       # NEW — look up active case
    document_filename=doc.original_filename,  # NEW — for Provenance display
    langfuse_handler=langfuse_handler,
)

Active case lookup: query the cases table for this patient's most recent case that has not yet been forwarded. If there is none, pass case_id=None.

B2. ClinicalContextState schema update

# app/agents/clinical_context.py

class ClinicalContextState(TypedDict):
    patient_id: str
    tenant_id: str
    raw_text: str
    report_type: str
    document_id: str | None       # NEW
    case_id: str | None           # NEW
    document_filename: str | None # NEW — for Provenance.entity.what.display
    # ... existing fields ...

B3. store_resources() — write document_id + Provenance

# app/agents/clinical_context.py:store_resources()

from datetime import datetime, timezone

async def store_resources(state, db):
    document_id = state.get("document_id")
    case_id = state.get("case_id")
    # The key may be present with a None value, and .get(key, default) would
    # then return None; fall back explicitly instead.
    document_filename = state.get("document_filename") or "Unknown document"

    for resource_json in state["fhir_resources"]:
        # resource_type, fhir_id, icd_codes and snomed_codes are derived from
        # resource_json as in the existing implementation (not shown).

        # 1. Create the clinical FHIR resource (existing)
        data = FHIRResourceCreate(
            resource_type=resource_type,
            fhir_id=fhir_id,
            resource_json=resource_json,
            source="agent",
            document_id=document_id,   # NEW
            case_id=case_id,           # NEW
            icd_codes=icd_codes,
            snomed_codes=snomed_codes,
            notes=f"Extracted from {document_filename}",  # NEW — specific
        )
        resource = await fhir_service.create_fhir_resource(...)

        # 2. Create FHIR Provenance resource (NEW)
        if document_id:
            provenance_json = {
                "resourceType": "Provenance",
                "target": [{"reference": f"{resource_type}/{fhir_id}"}],
                # FHIR instants need a timezone; use an aware UTC timestamp.
                "recorded": datetime.now(timezone.utc).isoformat(),
                "agent": [{"type": {"text": "assembler"},
                           "who": {"display": "Curaway Clinical Context Agent"}}],
                "entity": [{"role": "source",
                            "what": {"display": document_filename,
                                     "identifier": {"value": document_id}}}],
            }
            await fhir_service.create_fhir_resource(
                db=db, patient_id=state["patient_id"],
                tenant_id=state["tenant_id"],
                data=FHIRResourceCreate(
                    resource_type="Provenance",
                    fhir_id=f"prov-{fhir_id}",
                    resource_json=provenance_json,
                    source="agent",
                    document_id=document_id,
                    case_id=case_id,
                ),
                actor_id="agent:clinical_context",
            )

B4. Dedup on re-processing same document

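# app/services/fhir_service.py

from sqlalchemy import select

# Hypothetical helper name; call it once per processing run.
async def supersede_document_resources(db, patient_id: str, document_id: str) -> None:
    """Mark active FHIR resources derived from document_id as superseded."""
    existing = await db.execute(
        select(FHIRResource).where(
            FHIRResource.patient_id == patient_id,
            FHIRResource.document_id == document_id,
            FHIRResource.status == "active",
        )
    )
    for old in existing.scalars():
        old.status = "superseded"
    await db.flush()

Called once at the start of store_resources() (when fhir_dedup_on_reprocess is enabled), this supersedes ALL previously active resources from the same document, Provenance included, before any new ones are written. The re-processed document's findings then cleanly replace the old ones. The check must not live inside create_fhir_resource(): every resource from one run shares the same document_id, so a per-create check would supersede the resources (and Provenance) written earlier in the same run.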

B5. Document deletion cascade

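# app/services/document_service.py

from datetime import datetime, timezone

from sqlalchemy import update

# In soft_delete(), after marking the document as deleted, cascade to every
# FHIR resource derived from it (clinical resources and their Provenance).
await db.execute(
    update(FHIRResource)
    .where(FHIRResource.document_id == document_id)
    .values(status="entered_in_error",
            notes=f"Source document deleted at {datetime.now(timezone.utc).isoformat()}")
)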

B6. Chat extraction source attribution

# app/services/chat_extractor.py

from datetime import datetime, timezone

async def extract_from_chat_turn(
    user_message: str,
    assistant_response: str,
    message_id: str | None = None,      # NEW
    conversation_id: str | None = None,  # NEW
) -> dict:
    result = ... # existing extraction
    result["_source"] = {
        "type": "conversation",
        "message_id": message_id,
        "conversation_id": conversation_id,
        # Timezone-aware, matching the other UTC timestamps in this spec.
        "extracted_at": datetime.now(timezone.utc).isoformat(),
    }
    return result

The caller in case_orchestrator.py passes message_id from the current user message, along with conversation_id.


Part C — EHR Source Map

C1. EHR rebuild includes source attribution

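# app/services/ehr_rebuild_service.py

# When building the EHR snapshot, attach source metadata to every entry.
# (entries is an illustrative name for the snapshot's entry list.)
entries = []
for resource in fhir_resources:
    entries.append({
        "resource": resource.resource_json,
        "source": {
            "type": "document" if resource.document_id else "conversation",
            "document_id": resource.document_id,
            # Only document-derived entries get a name; a fallback of "Unknown"
            # here would make the EHR panel render "from Unknown" for chat findings.
            "document_name": (doc_name_map.get(resource.document_id, "Unknown")
                              if resource.document_id else None),
            "extracted_at": resource.created_at.isoformat(),
            "case_id": resource.case_id,
        },
    })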

doc_name_map maps document_id to filename. It is pre-loaded with a single query against the document_references table, filtered to the document IDs found on the FHIR resources.

C2. EHR API response includes source

The GET /cases/{id}/ehr response already returns the snapshot. The snapshot now includes source on each entry. No API schema change needed — frontend reads the new field.


Part D — Frontend Changes

D1. EHRPanel — source tag per finding

// src/components/panels/EHRPanel.tsx

// After each condition/observation row:
{source?.document_name && (
  <span className="text-[10px] text-chat-text-muted ml-1">
    from {source.document_name}
  </span>
)}

D2. FullEHRDrawer — source column in tables

Add a "Source" column to the conditions and observations tables, showing the document name (or "Conversation" for chat-derived findings) together with the extraction timestamp.

D3. Document panel — extracted findings count

// src/components/panels/DocsPanel.tsx

// After each document row:
<span className="text-[10px] text-chat-text-secondary">
  {fhirCount} {fhirCount === 1 ? "finding" : "findings"} extracted
</span>

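Requires a new field in the document checklist API response: fhir_resource_count per document. Count only active clinical resources: Provenance rows carry the same document_id and would double the total, and superseded or entered_in_error rows are stale.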
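A sketch of the counting rule, equivalent to a GROUP BY document_id with the same filters. The row dict shape and the findings_per_document name are assumptions for illustration.

```python
from collections import Counter


def findings_per_document(rows: list[dict]) -> dict[str, int]:
    """Count active, non-Provenance FHIR resources per document_id."""
    counts = Counter(
        row["document_id"]
        for row in rows
        if row["document_id"]
        and row["status"] == "active"
        and row["resource_type"] != "Provenance"  # would double every total
    )
    return dict(counts)


rows = [
    {"document_id": "doc-1", "resource_type": "Condition", "status": "active"},
    {"document_id": "doc-1", "resource_type": "Provenance", "status": "active"},
    {"document_id": "doc-1", "resource_type": "Observation", "status": "superseded"},
    {"document_id": "doc-2", "resource_type": "Observation", "status": "active"},
    {"document_id": None, "resource_type": "Condition", "status": "active"},
]
print(findings_per_document(rows))  # {'doc-1': 1, 'doc-2': 1}
```

Documents with no active findings are absent from the result, so the API would default them to 0.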


Part E — Data Forwarding Audit

E1. Record what was sent

When case_orchestrator._handle_forwarding() runs:

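# Module-level imports:
import hashlib
import json
from datetime import datetime, timezone

# Inside _handle_forwarding(), create the audit record:
data_categories = ["conditions", "medications", "observations",
                   "allergies", "procedures"]
audit = DataForwardingAudit(
    case_id=case_id,
    patient_id=patient_id,
    provider_ids=[p["provider_id"] for p in selected_providers],
    consent_record_id=consent.id,
    fhir_resource_ids=[r.id for r in active_fhir_resources],
    document_ids=[d.id for d in case_documents],
    data_categories=data_categories,  # also reported by the E2 event
    ehr_snapshot_hash=hashlib.sha256(
        json.dumps(ehr_snapshot, sort_keys=True).encode()
    ).hexdigest(),
    # forwarded_at is DateTime(timezone=True): store an aware UTC value.
    forwarded_at=datetime.now(timezone.utc),
)
db.add(audit)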
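ehr_snapshot_hash is only useful if an auditor can recompute it later from a stored snapshot. A sketch of recomputing and comparing it, with hypothetical function names: sort_keys=True makes dict key order irrelevant, but list order still changes the hash, so snapshot entries should be emitted in a stable order. The json.dumps settings (default separators, ensure_ascii) must also stay fixed between writing and verifying.

```python
import hashlib
import json


def snapshot_hash(snapshot: dict) -> str:
    """SHA-256 hex digest of the snapshot serialized with sorted keys."""
    return hashlib.sha256(json.dumps(snapshot, sort_keys=True).encode()).hexdigest()


def verify_forwarded_snapshot(snapshot: dict, recorded_hash: str) -> bool:
    """True if `snapshot` matches the hash stored on the audit row."""
    return snapshot_hash(snapshot) == recorded_hash


a = {"entries": [{"resource": {"resourceType": "Condition", "id": "c1"}}], "patient": "pat-1"}
b = {"patient": "pat-1", "entries": [{"resource": {"id": "c1", "resourceType": "Condition"}}]}
h = snapshot_hash(a)
print(len(h))                           # 64
print(verify_forwarded_snapshot(b, h))  # True: only key order differs
```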

E2. Event logging

await emit_event(
    db, tenant_id, patient_id, case_id,
    event_type="fhir.data.forwarded",
    payload={
        "provider_count": len(selected_providers),
        "resource_count": len(active_fhir_resources),
        "document_count": len(case_documents),
        "categories": audit.data_categories,  # same list as the E1 audit row
    },
)

Part F — Feature Flags

# config/feature_flags.yaml

fhir_provenance_v1:
  default: true
  description: "Store document_id + case_id on FHIR resources. Create
    Provenance FHIR resources for traceability. Show source attribution
    in EHR panel."

fhir_dedup_on_reprocess:
  default: true
  description: "When re-processing a document, supersede previous FHIR
    resources from that document instead of creating duplicates."

Implementation Checklist

Model guidance: Opus for architectural/compliance tasks, Sonnet for mechanical implementation.

Opus (data model, compliance, pipeline architecture)

  • [ ] FHIRResource model — add document_id + case_id columns (A1)
  • [ ] Alembic migration for new columns
  • [ ] FHIR Provenance resource creation in store_resources (B3)
  • [ ] Dedup logic on re-processing same document (B4)
  • [ ] Document deletion cascade to FHIR (B5)
  • [ ] Data forwarding audit model + recording (E1)
  • [ ] EHR source map design in ehr_rebuild_service (C1)

Sonnet (plumbing, frontend, config, tests)

  • [ ] FHIRResourceCreate schema — add 2 optional fields (A2)
  • [ ] ClinicalContextState — add 3 fields (B2)
  • [ ] document_processing.py — pass document_id/case_id to agent (B1)
  • [ ] chat_extractor.py — source attribution (B6)
  • [ ] fhir.py router — optional document_id in create API
  • [ ] Feature flags — 2 new flags (F)
  • [ ] EHRPanel.tsx — source tag per finding (D1)
  • [ ] FullEHRDrawer.tsx — source column in tables (D2)
  • [ ] DocsPanel.tsx — extracted findings count (D3)
  • [ ] Event logging for forwarding (E2)
  • [ ] DataForwardingAudit model (E1 model only)
  • [ ] Unit tests: 10 for provenance creation + dedup + cascade
  • [ ] E2E test: upload doc → verify FHIR has document_id
  • [ ] E2E test: delete doc → verify FHIR marked entered_in_error

Test Plan

Backend

  1. Upload document → Clinical Context Agent → verify FHIR resources have document_id set
  2. Upload same document again → verify old FHIR resources superseded, new ones created (dedup)
  3. Delete document → verify derived FHIR resources marked entered_in_error
  4. Chat message "I take metformin" → verify case.extra_metadata has _source.message_id
  5. EHR rebuild → verify snapshot includes source on each entry
  6. Forward to provider → verify data_forwarding_audits row created with correct resource IDs
  7. Provenance FHIR resource created for each document-derived clinical resource
  8. Provenance entity.what.identifier.value matches the source document_id

Frontend

  1. EHR panel shows "from blood_work.pdf" next to each condition
  2. FullEHRDrawer shows Source column in tables
  3. Document panel shows "3 findings extracted" per document

References

  • Session 35 pipeline audit: 6 gaps identified
  • FHIR R4 Provenance: https://hl7.org/fhir/R4/provenance.html
  • Porting spec (PR #85): meta.extension for case attribution
  • Gap report finding #17: no FHIR confidence field
  • app/agents/clinical_context.py:435 — current store_resources()
  • app/services/document_processing.py:175 — where document_id is lost