FHIR Provenance & Document Traceability: Feature Spec
Date: 2026-04-10 (Session 35)
Status: Implemented in Session 35
Companion steer: ai-steer/fhir-provenance-steer.md
Summary
Every FHIR resource traces to its source: the document it was extracted from, the chat message it was mentioned in, or the API call that created it. Source attribution flows through the EHR snapshot to the frontend, so patients and providers can see where each finding came from.
Ships behind the fhir_provenance_v1 feature flag (default: true).
Part A: Data Model Changes
A1. FHIRResource model: 2 new columns
# app/models/fhir_resource.py
# Add after 'created_by' field:
document_id: Mapped[str | None] = mapped_column(
    String(36), nullable=True, index=True,
)
# The DocumentReference.id that this FHIR resource was extracted from.
# Null for chat-extracted or manually created resources.
case_id: Mapped[str | None] = mapped_column(
    String(36), nullable=True, index=True,
)
# The case active when this resource was created.
# Used for case-scoped EHR queries (porting spec).
Migration: Alembic add_column, nullable, no default. Existing
rows stay null (legacy data, acceptable).
Indexes: ix_fhir_resources_document_id for cascade queries;
ix_fhir_resources_case_id for case-scoped EHR.
A2. FHIRResourceCreate schema: 2 new optional fields
# app/schemas/fhir.py
class FHIRResourceCreate(BaseModel):
    # ... existing fields ...
    document_id: str | None = None
    case_id: str | None = None
A3. FHIR Provenance resource type¶
A new resource type, Provenance, is stored in the fhir_resources table:
{
  "resourceType": "Provenance",
  "target": [{"reference": "Condition/agent-condition-abc123"}],
  "recorded": "2026-04-10T12:00:00Z",
  "agent": [{
    "type": {"text": "assembler"},
    "who": {"display": "Curaway Clinical Context Agent"}
  }],
  "entity": [{
    "role": "source",
    "what": {
      "display": "blood_work_feb2024.pdf",
      "identifier": {"value": "doc-uuid-here"}
    }
  }]
}
A Provenance resource is created alongside each clinical FHIR resource in store_resources() whenever a source document_id is available (see B3).
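As a minimal sketch of the payload shape above, the helper below builds a Provenance document for one target resource. The function name build_provenance and its parameters are hypothetical and not part of the existing codebase; only the JSON structure is taken from this spec.

```python
from __future__ import annotations

from datetime import datetime, timezone


def build_provenance(
    resource_type: str,
    fhir_id: str,
    document_id: str,
    document_filename: str,
    recorded: datetime | None = None,
) -> dict:
    """Build a Provenance payload linking one resource to its source document.

    Hypothetical helper: name and signature are illustrative only.
    """
    recorded = recorded or datetime.now(timezone.utc)
    # Render a timezone-aware UTC instant with a trailing "Z".
    stamp = (
        recorded.astimezone(timezone.utc)
        .isoformat(timespec="seconds")
        .replace("+00:00", "Z")
    )
    return {
        "resourceType": "Provenance",
        "target": [{"reference": f"{resource_type}/{fhir_id}"}],
        "recorded": stamp,
        "agent": [{
            "type": {"text": "assembler"},
            "who": {"display": "Curaway Clinical Context Agent"},
        }],
        "entity": [{
            "role": "source",
            "what": {
                "display": document_filename,
                "identifier": {"value": document_id},
            },
        }],
    }


example = build_provenance(
    "Condition",
    "agent-condition-abc123",
    document_id="doc-uuid-here",
    document_filename="blood_work_feb2024.pdf",
    recorded=datetime(2026, 4, 10, 12, 0, tzinfo=timezone.utc),
)
```

Passing recorded explicitly keeps the helper deterministic in tests; in production code the default (current UTC time) would normally be used.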
A4. Data forwarding audit model¶
# app/models/data_forwarding_audit.py (NEW)
class DataForwardingAudit(Base, UUIDPrimaryKeyMixin, TenantScopedMixin, TimestampMixin):
    __tablename__ = "data_forwarding_audits"
    case_id: Mapped[str] = mapped_column(String(36), nullable=False, index=True)
    patient_id: Mapped[str] = mapped_column(String(36), nullable=False)
    provider_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    consent_record_id: Mapped[str] = mapped_column(String(36), nullable=False)
    fhir_resource_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    document_ids: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    data_categories: Mapped[list] = mapped_column(FlexibleJSON, nullable=False)
    # ["conditions", "medications", "observations", "allergies", "procedures"]
    ehr_snapshot_hash: Mapped[str] = mapped_column(String(64), nullable=False)
    # SHA-256 of the EHR snapshot JSON at time of forwarding
    forwarded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
Part B: Pipeline Changes
B1. Pass document_id through document processing pipeline
# app/services/document_processing.py
# Line ~175 — currently:
agent_result = await run_clinical_context_agent(
db=db, patient_id=patient_id, tenant_id=tenant_id,
text=extracted_text, report_type="general",
langfuse_handler=langfuse_handler,
)
# Change to:
agent_result = await run_clinical_context_agent(
db=db, patient_id=patient_id, tenant_id=tenant_id,
text=extracted_text, report_type="general",
document_id=doc_id, # NEW
case_id=active_case_id, # NEW — look up active case
document_filename=doc.original_filename, # NEW — for Provenance display
langfuse_handler=langfuse_handler,
)
Active case lookup: query the cases table for the most recent
non-forwarded case for this patient. If there is no active case, pass case_id=None.
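The lookup rule above can be sketched in memory as follows. This is an illustration under stated assumptions, not the actual query: CaseRow, its forwarded flag, and the use of created_at to decide "most recent" are all hypothetical stand-ins for whatever the cases table really exposes.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class CaseRow:
    # Hypothetical stand-in for a row of the cases table.
    id: str
    patient_id: str
    created_at: datetime
    forwarded: bool


def find_active_case_id(cases: list[CaseRow], patient_id: str) -> str | None:
    """Return the newest non-forwarded case ID for the patient, or None."""
    candidates = [
        c for c in cases
        if c.patient_id == patient_id and not c.forwarded
    ]
    if not candidates:
        return None
    # Assumption: "most recent" means the latest created_at.
    return max(candidates, key=lambda c: c.created_at).id


cases = [
    CaseRow("case-1", "pat-1", datetime(2026, 1, 5, tzinfo=timezone.utc), forwarded=True),
    CaseRow("case-2", "pat-1", datetime(2026, 3, 1, tzinfo=timezone.utc), forwarded=False),
    CaseRow("case-3", "pat-1", datetime(2026, 2, 1, tzinfo=timezone.utc), forwarded=False),
    CaseRow("case-4", "pat-2", datetime(2026, 4, 1, tzinfo=timezone.utc), forwarded=False),
]
active_case_id = find_active_case_id(cases, "pat-1")
```

In the real service the same filter and ordering would be expressed as a single query with a limit of one row, so the None fallback maps directly onto case_id=None.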
B2. ClinicalContextState schema update
# app/agents/clinical_context.py
class ClinicalContextState(TypedDict):
    patient_id: str
    tenant_id: str
    raw_text: str
    report_type: str
    document_id: str | None  # NEW
    case_id: str | None  # NEW
    document_filename: str | None  # NEW: for Provenance.entity.what.display
    # ... existing fields ...
B3. store_resources(): write document_id + Provenance
# app/agents/clinical_context.py:store_resources()
# Requires: from datetime import datetime, timezone
async def store_resources(state, db):
    document_id = state.get("document_id")
    case_id = state.get("case_id")
    # `or` also covers a key that is present but None, which
    # state.get(key, default) would pass through unchanged.
    document_filename = state.get("document_filename") or "Unknown document"
    for resource_json in state["fhir_resources"]:
        # resource_type, fhir_id, icd_codes and snomed_codes are assumed to be
        # derived from resource_json by the existing code (omitted here).
        # 1. Create the clinical FHIR resource (existing)
        data = FHIRResourceCreate(
            resource_type=resource_type,
            fhir_id=fhir_id,
            resource_json=resource_json,
            source="agent",
            document_id=document_id,  # NEW
            case_id=case_id,  # NEW
            icd_codes=icd_codes,
            snomed_codes=snomed_codes,
            notes=f"Extracted from {document_filename}",  # NEW: specific
        )
        resource = await fhir_service.create_fhir_resource(...)
        # 2. Create FHIR Provenance resource (NEW)
        if document_id:
            # Timezone-aware UTC timestamp, rendered with a trailing "Z"
            recorded = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            provenance_json = {
                "resourceType": "Provenance",
                "target": [{"reference": f"{resource_type}/{fhir_id}"}],
                "recorded": recorded,
                "agent": [{"type": {"text": "assembler"},
                           "who": {"display": "Curaway Clinical Context Agent"}}],
                "entity": [{"role": "source",
                            "what": {"display": document_filename,
                                     "identifier": {"value": document_id}}}],
            }
            await fhir_service.create_fhir_resource(
                db=db, patient_id=state["patient_id"],
                tenant_id=state["tenant_id"],
                data=FHIRResourceCreate(
                    resource_type="Provenance",
                    fhir_id=f"prov-{fhir_id}",
                    resource_json=provenance_json,
                    source="agent",
                    document_id=document_id,
                    case_id=case_id,
                ),
                actor_id="agent:clinical_context",
            )
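B4. Dedup on re-processing same document
# app/services/fhir_service.py:create_fhir_resource()
# Before creating, check if resources from same document already exist
if data.document_id:
    existing = await db.execute(
        select(FHIRResource).where(
            FHIRResource.patient_id == patient_id,
            FHIRResource.document_id == data.document_id,
            FHIRResource.status == "active",
        )
    )
    for old in existing.scalars():
        old.status = "superseded"
    await db.flush()
This supersedes all previous resources from the same document before new ones are created, so the re-processed document's findings replace the old ones cleanly. Because store_resources() (B3) calls create_fhir_resource() once per clinical resource and once more per Provenance, the check as written would also match rows created earlier in the same run (assuming new rows default to status "active"). The supersede step therefore needs to run only once per processing run, before the first new row is written.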
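One way to read the rule above is as a supersede step that runs once per processing run, before any new row exists. The in-memory sketch below illustrates that ordering; StoredResource, supersede_previous and reprocess_document are hypothetical names, and the list stands in for the fhir_resources table.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StoredResource:
    # Hypothetical stand-in for a fhir_resources row.
    fhir_id: str
    patient_id: str
    document_id: str | None
    status: str = "active"


def supersede_previous(store: list[StoredResource], patient_id: str, document_id: str) -> int:
    """Mark every active row from this document as superseded; return the count."""
    count = 0
    for row in store:
        if (row.patient_id == patient_id
                and row.document_id == document_id
                and row.status == "active"):
            row.status = "superseded"
            count += 1
    return count


def reprocess_document(store, patient_id, document_id, new_fhir_ids):
    # Supersede once, up front, so rows written in this run are never matched.
    if document_id:
        supersede_previous(store, patient_id, document_id)
    for fhir_id in new_fhir_ids:
        store.append(StoredResource(fhir_id, patient_id, document_id))


store = [
    StoredResource("cond-old", "pat-1", "doc-1"),
    StoredResource("cond-other", "pat-1", "doc-2"),
]
reprocess_document(store, "pat-1", "doc-1", ["cond-new", "prov-cond-new"])
statuses = {r.fhir_id: r.status for r in store}
```

After the run, only the rows from the earlier pass over doc-1 are superseded; rows from other documents and the rows just written stay active.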
B5. Document deletion cascade
# app/services/document_service.py, in soft_delete():
# Requires: from datetime import datetime, timezone
# After marking document as deleted, cascade to FHIR
deleted_at = datetime.now(timezone.utc).isoformat()
await db.execute(
    update(FHIRResource)
    .where(FHIRResource.document_id == document_id)
    .values(status="entered_in_error",
            notes=f"Source document deleted at {deleted_at}")
)
B6. Chat extraction source attribution
# app/services/chat_extractor.py
# Requires: from datetime import datetime, timezone
async def extract_from_chat_turn(
    user_message: str,
    assistant_response: str,
    message_id: str | None = None,  # NEW
    conversation_id: str | None = None,  # NEW
) -> dict:
    result = ...  # existing extraction
    result["_source"] = {
        "type": "conversation",
        "message_id": message_id,
        "conversation_id": conversation_id,
        # Timezone-aware UTC timestamp (datetime.utcnow() returns a naive value)
        "extracted_at": datetime.now(timezone.utc).isoformat(),
    }
    return result
The caller in case_orchestrator.py passes the message_id of the
current user message.
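The stamping step can be isolated into a small pure function, which makes it easy to unit test without running the extraction itself. This is only a sketch: attach_source is a hypothetical helper, and the now parameter exists purely so the timestamp can be pinned in tests.

```python
from __future__ import annotations

from datetime import datetime, timezone


def attach_source(
    result: dict,
    message_id: str | None,
    conversation_id: str | None,
    now: datetime | None = None,
) -> dict:
    """Return a copy of an extraction result with a _source block attached.

    Hypothetical helper: the spec only defines the shape of _source.
    """
    now = now or datetime.now(timezone.utc)
    stamped = dict(result)  # shallow copy; the caller's dict is left untouched
    stamped["_source"] = {
        "type": "conversation",
        "message_id": message_id,
        "conversation_id": conversation_id,
        "extracted_at": now.isoformat(),
    }
    return stamped


raw = {"medications": ["metformin"]}
stamped = attach_source(
    raw,
    message_id="msg-1",
    conversation_id="conv-1",
    now=datetime(2026, 4, 10, 12, 0, tzinfo=timezone.utc),
)
```

Both IDs stay optional, matching the signature above, so older callers that do not yet pass them still produce a _source block with null identifiers.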
Part C: EHR Source Map
C1. EHR rebuild includes source attribution
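# app/services/ehr_rebuild_service.py
# When building the EHR snapshot, include source metadata:
for resource in fhir_resources:
    entry = {
        "resource": resource.resource_json,
        "source": {
            # Resources without a document_id (chat-extracted or manually
            # created, see A1) are all labelled "conversation" here.
            "type": "document" if resource.document_id else "conversation",
            "document_id": resource.document_id,
            # Only look up a name when there is a document, so the D1 tag
            # does not render "from Unknown" for conversation findings.
            "document_name": (
                doc_name_map.get(resource.document_id, "Unknown")
                if resource.document_id else None
            ),
            "extracted_at": resource.created_at.isoformat(),
            "case_id": resource.case_id,
        },
    }
doc_name_map is pre-loaded in a single query from the document_references
table, using the document IDs found on the FHIR resources.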
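The pre-load and the per-entry source block can be sketched together as below. The names ResourceRow, load_doc_name_map and build_entries are hypothetical, the dict document_table stands in for the document_references table, and leaving document_name empty for resources without a document is an assumption of this sketch.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ResourceRow:
    # Hypothetical stand-in for a FHIRResource row.
    resource_json: dict
    document_id: str | None
    case_id: str | None
    created_at: datetime


def load_doc_name_map(resources, document_table: dict[str, str]) -> dict[str, str]:
    """Collect the distinct document IDs first, then resolve them in one pass."""
    wanted = {r.document_id for r in resources if r.document_id}
    return {d: document_table[d] for d in wanted if d in document_table}


def build_entries(resources, doc_name_map: dict[str, str]) -> list[dict]:
    entries = []
    for r in resources:
        has_doc = r.document_id is not None
        entries.append({
            "resource": r.resource_json,
            "source": {
                "type": "document" if has_doc else "conversation",
                "document_id": r.document_id,
                "document_name": doc_name_map.get(r.document_id, "Unknown") if has_doc else None,
                "extracted_at": r.created_at.isoformat(),
                "case_id": r.case_id,
            },
        })
    return entries


ts = datetime(2026, 4, 10, 12, 0, tzinfo=timezone.utc)
rows = [
    ResourceRow({"resourceType": "Condition"}, "doc-1", "case-1", ts),
    ResourceRow({"resourceType": "MedicationStatement"}, None, "case-1", ts),
    ResourceRow({"resourceType": "Observation"}, "doc-missing", None, ts),
]
name_map = load_doc_name_map(rows, {"doc-1": "blood_work_feb2024.pdf"})
entries = build_entries(rows, name_map)
```

Collecting the IDs into a set before the lookup is what keeps this to one round trip in the real service, for example via a single IN query.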
C2. EHR API response includes source
The GET /cases/{id}/ehr response already returns the snapshot,
and the snapshot now includes source on each entry. No API schema
change is needed; the frontend reads the new field.
Part D: Frontend Changes
D1. EHRPanel: source tag per finding
// src/components/panels/EHRPanel.tsx
// After each condition/observation row:
{source?.document_name && (
  <span className="text-[10px] text-chat-text-muted ml-1">
    from {source.document_name}
  </span>
)}
D2. FullEHRDrawer: source column in tables
Add a "Source" column to the conditions and observations tables showing the document name or "Conversation" with a timestamp.
D3. Document panel: extracted findings count
// src/components/panels/DocsPanel.tsx
// After each document row:
<span className="text-[10px] text-chat-text-secondary">
  {fhirCount} findings extracted
</span>
This requires a new field in the document checklist API response:
fhir_resource_count per document.
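On the backend, the per-document count could be derived along these lines. This is a sketch under assumptions the spec does not state: it counts only active rows and skips Provenance rows, since Provenance resources also carry the document_id (B3) and would otherwise double the number. The function name count_findings is hypothetical.

```python
from collections import Counter


def count_findings(rows: list[dict]) -> Counter:
    """Count active clinical resources per source document.

    Assumptions: Provenance rows and non-active rows are not "findings".
    """
    return Counter(
        row["document_id"]
        for row in rows
        if row["document_id"]
        and row["status"] == "active"
        and row["resource_type"] != "Provenance"
    )


rows = [
    {"document_id": "doc-1", "resource_type": "Condition", "status": "active"},
    {"document_id": "doc-1", "resource_type": "Provenance", "status": "active"},
    {"document_id": "doc-1", "resource_type": "Observation", "status": "active"},
    {"document_id": "doc-1", "resource_type": "Observation", "status": "superseded"},
    {"document_id": "doc-2", "resource_type": "Condition", "status": "active"},
    {"document_id": None, "resource_type": "MedicationStatement", "status": "active"},
]
fhir_resource_count = count_findings(rows)
```

A Counter returns 0 for documents with no extracted findings, which suits a checklist response where every document needs a value.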
Part E: Data Forwarding Audit
E1. Record what was sent
When case_orchestrator._handle_forwarding() runs:
# Requires: import hashlib, json; from datetime import datetime, timezone
# Create audit record
audit = DataForwardingAudit(
    case_id=case_id,
    patient_id=patient_id,
    provider_ids=[p["provider_id"] for p in selected_providers],
    consent_record_id=consent.id,
    fhir_resource_ids=[r.id for r in active_fhir_resources],
    document_ids=[d.id for d in case_documents],
    data_categories=["conditions", "medications", "observations",
                     "allergies", "procedures"],
    ehr_snapshot_hash=hashlib.sha256(
        json.dumps(ehr_snapshot, sort_keys=True).encode()
    ).hexdigest(),
    # forwarded_at is DateTime(timezone=True), so store a timezone-aware UTC value
    forwarded_at=datetime.now(timezone.utc),
)
db.add(audit)
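The snapshot hash relies on sort_keys=True to make the serialization independent of dict key order. The sketch below pulls the expression into a hypothetical snapshot_hash function and compares two snapshots that differ only in key order; the sample snapshot contents are invented for illustration.

```python
import hashlib
import json


def snapshot_hash(snapshot: dict) -> str:
    """SHA-256 hex digest of the snapshot serialized with sorted keys."""
    canonical = json.dumps(snapshot, sort_keys=True).encode()
    return hashlib.sha256(canonical).hexdigest()


snapshot_a = {
    "case_id": "case-1",
    "entries": [{"resource": {"resourceType": "Condition", "id": "c1"}}],
}
# Same content, different key order at both nesting levels.
snapshot_b = {
    "entries": [{"resource": {"id": "c1", "resourceType": "Condition"}}],
    "case_id": "case-1",
}
# One changed value.
snapshot_c = {
    "case_id": "case-1",
    "entries": [{"resource": {"resourceType": "Condition", "id": "c2"}}],
}

hash_a = snapshot_hash(snapshot_a)
hash_b = snapshot_hash(snapshot_b)
hash_c = snapshot_hash(snapshot_c)
```

Key order does not affect the digest, but list order does: two snapshots with the same entries in a different sequence hash differently, so the rebuild needs a stable entry order if hashes are to be compared across rebuilds.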
E2. Event logging
await emit_event(
    db, tenant_id, patient_id, case_id,
    event_type="fhir.data.forwarded",
    payload={
        "provider_count": len(selected_providers),
        "resource_count": len(active_fhir_resources),
        "document_count": len(case_documents),
        # Reuse the categories recorded on the E1 audit row
        "categories": audit.data_categories,
    },
)
Part F: Feature Flags
# config/feature_flags.yaml
fhir_provenance_v1:
  default: true
  description: "Store document_id + case_id on FHIR resources. Create
    Provenance FHIR resources for traceability. Show source attribution
    in EHR panel."
fhir_dedup_on_reprocess:
  default: true
  description: "When re-processing a document, supersede previous FHIR
    resources from that document instead of creating duplicates."
Implementation Checklist
Model guidance: Opus for architectural/compliance tasks, Sonnet for mechanical implementation.
Opus (data model, compliance, pipeline architecture)
- [ ] FHIRResource model: add document_id + case_id columns (A1)
- [ ] Alembic migration for new columns
- [ ] FHIR Provenance resource creation in store_resources (B3)
- [ ] Dedup logic on re-processing same document (B4)
- [ ] Document deletion cascade to FHIR (B5)
- [ ] Data forwarding audit model + recording (A4, E1)
- [ ] EHR source map design in ehr_rebuild_service (C1)
Sonnet (plumbing, frontend, config, tests)
- [ ] FHIRResourceCreate schema — add 2 optional fields (A2)
- [ ] ClinicalContextState — add 3 fields (B2)
- [ ] document_processing.py — pass document_id/case_id to agent (B1)
- [ ] chat_extractor.py — source attribution (B6)
- [ ] fhir.py router — optional document_id in create API
- [ ] Feature flags — 2 new flags (F)
- [ ] EHRPanel.tsx — source tag per finding (D1)
- [ ] FullEHRDrawer.tsx — source column in tables (D2)
- [ ] DocsPanel.tsx — extracted findings count (D3)
- [ ] Event logging for forwarding (E2)
- [ ] DataForwardingAudit model (A4, model only)
- [ ] Unit tests: 10 for provenance creation + dedup + cascade
- [ ] E2E test: upload doc → verify FHIR has document_id
- [ ] E2E test: delete doc → verify FHIR marked entered_in_error
Test Plan
Backend
- Upload document → Clinical Context Agent → verify FHIR resources have document_id set
- Upload same document again → verify old FHIR resources superseded, new ones created (dedup)
- Delete document → verify derived FHIR resources marked entered_in_error
- Chat message "I take metformin" → verify case.extra_metadata has _source.message_id
- EHR rebuild → verify snapshot includes source on each entry
- Forward to provider → verify data_forwarding_audits row created with correct resource IDs
- Provenance FHIR resource created for each clinical resource
- Provenance references correct document_id
Frontend
- EHR panel shows "from blood_work.pdf" next to each condition
- FullEHRDrawer shows Source column in tables
- Document panel shows "3 findings extracted" per document
References
- Session 35 pipeline audit: 6 gaps identified
- FHIR R4 Provenance: https://hl7.org/fhir/R4/provenance.html
- Porting spec (PR #85): meta.extension for case attribution
- Gap report finding #17: no FHIR confidence field
- app/agents/clinical_context.py:435 (current store_resources())
- app/services/document_processing.py:175 (where document_id is lost)