Document Pipeline¶
Overview¶
The Document Pipeline handles the complete lifecycle of medical documents in Curaway -- from patient upload through OCR extraction, clinical entity recognition, FHIR resource generation, and requirement matching. It is designed to be resilient, asynchronous, and privacy-aware.
Upload Flow¶
Architecture¶
```mermaid
sequenceDiagram
    participant Client as Frontend (Next.js)
    participant API as FastAPI
    participant R2 as Cloudflare R2
    participant QS as QStash
    participant OCR as OCR Pipeline
    participant CCA as Clinical Context Agent
    participant DB as PostgreSQL

    Client->>API: POST /uploads/presign
    API->>API: Validate: extension, size, consent
    API-->>Client: {presigned_url, document_id}
    Client->>R2: PUT file (presigned URL)
    Client->>API: POST /uploads/confirm
    API->>DB: Update document status → "uploaded"
    API->>QS: Enqueue OCR callback
    QS->>OCR: Trigger OCR processing
    OCR->>DB: Update document with extracted text
    OCR->>CCA: Trigger Clinical Context Agent
    CCA->>DB: Store FHIR resources
    CCA->>DB: Update document status → "processed"
```
Step 1: Presign Request¶
```python
class PresignRequest(BaseModel):
    case_id: UUID
    filename: str
    content_type: str  # "application/pdf", "image/jpeg", etc.
    file_size_bytes: int

class PresignResponse(BaseModel):
    document_id: UUID
    presigned_url: str  # R2 presigned PUT URL (15-minute expiry)
    expires_at: datetime
```
The API performs these validations before generating a presigned URL:
| Validation | Rule | Error Code |
|---|---|---|
| File extension | .pdf, .jpg, .jpeg, .png, .tiff, .dicom | INVALID_EXTENSION |
| File size (minimum) | >= 5 KB | FILE_TOO_SMALL |
| File size (maximum) | <= 20 MB | FILE_TOO_LARGE |
| Content type | Must match extension | CONTENT_TYPE_MISMATCH |
| Patient consent | Active consent record must exist | CONSENT_REQUIRED |
| Case status | Case must be in active workflow phase | CASE_NOT_ACTIVE |
Step 2: Direct Upload to R2¶
The client uploads directly to Cloudflare R2 using the presigned URL. This avoids routing large files through the API server:
```typescript
// Frontend upload (Next.js)
const upload = async (file: File, presignedUrl: string) => {
  const response = await fetch(presignedUrl, {
    method: "PUT",
    body: file,
    headers: { "Content-Type": file.type },
  });
  if (!response.ok) throw new UploadError("R2 upload failed");
};
```
Step 3: Confirm Upload¶
```python
class ConfirmRequest(BaseModel):
    document_id: UUID
    sha256_hash: str  # Client-computed hash for dedup
```
On confirmation, the API:
- Verifies the file exists in R2
- Checks SHA-256 hash for deduplication
- Updates document status to `uploaded`
- Enqueues OCR processing via QStash
Step 4: QStash OCR Callback¶
QStash delivers an async callback to the OCR processing endpoint:
```python
@router.post("/internal/callbacks/ocr")
async def ocr_callback(
    request: QStashCallbackRequest,
    x_qstash_signature: str = Header(),
):
    """Process OCR for an uploaded document (QStash callback)."""
    verify_qstash_signature(request, x_qstash_signature)
    document = await document_service.get(request.document_id)
    await ocr_pipeline.process(document)
```
Why QStash?
OCR processing can take 5-30 seconds depending on document complexity. QStash provides reliable async delivery with automatic retries (3 attempts, exponential backoff), dead letter queues, and signature verification -- all on the free tier.
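With 3 attempts and exponential backoff, the retry schedule doubles the wait after each failure. The sketch below is purely illustrative; the base delay is an assumption, not a documented QStash value.

```python
def retry_delays(attempts: int = 3, base_seconds: float = 1.0) -> list[float]:
    """Illustrative exponential-backoff schedule: base * 2^n per retry.
    The base delay is an assumption, not QStash's actual configuration."""
    return [base_seconds * (2 ** n) for n in range(attempts)]
```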
OCR Stack¶
The OCR pipeline uses an ordered fallback chain, trying each method in sequence until one succeeds:
```mermaid
graph TD
    A[Document Upload] --> B{PyMuPDF}
    B -->|Success| G[Extracted Text]
    B -->|Failure/Low Quality| C{Unstructured.io}
    C -->|Success| G
    C -->|Failure| D{Claude Vision}
    D -->|Success| G
    D -->|Failure| E[Manual Review Flag]

    style B fill:#008B8B,color:#fff
    style C fill:#4A90D9,color:#fff
    style D fill:#FF7F50,color:#fff
    style E fill:#FF0000,color:#fff
```
Method Comparison¶
| Method | Type | Speed | Cost | Best For |
|---|---|---|---|---|
| PyMuPDF | Synchronous, local | ~1 second | $0 | Text-based PDFs, well-formatted documents |
| Unstructured.io | API call | ~5 seconds | Free tier (1K pages/mo) | Complex layouts, tables, multi-column |
| Claude Vision | LLM API call | ~10 seconds | ~$0.02/page | Scanned PDFs, handwritten notes, poor quality images |
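The fallback chain itself reduces to an ordered loop over extractor callables: try each, move on when one raises. This is a simplified sketch, not the actual pipeline code; `OCRResult` here is a stripped-down stand-in for the model defined elsewhere on this page, and the catch-all exception handling is an assumption.

```python
# Sketch of the ordered OCR fallback chain. Each extractor returns an OCRResult
# or raises (low quality, API error, ...), in which case the next one is tried.
from dataclasses import dataclass, field
from typing import Awaitable, Callable

@dataclass
class OCRResult:
    method: str
    confidence: float
    pages: list = field(default_factory=list)

class AllMethodsFailedError(Exception):
    """All OCR methods failed; the document gets a manual-review flag."""

async def run_ocr_chain(
    path: str,
    extractors: list[Callable[[str], Awaitable[OCRResult]]],
) -> OCRResult:
    errors = []
    for extract in extractors:
        try:
            return await extract(path)
        except Exception as exc:
            errors.append(f"{extract.__name__}: {exc}")
    raise AllMethodsFailedError("; ".join(errors))
```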
PyMuPDF (Primary)¶
```python
import fitz  # PyMuPDF

async def extract_with_pymupdf(document_path: str) -> OCRResult:
    """Primary OCR: fast, free, synchronous text extraction."""
    doc = fitz.open(document_path)
    try:
        pages = []
        for page in doc:
            text = page.get_text("text")
            pages.append(PageResult(
                page_number=page.number + 1,
                text=text,
                confidence=estimate_text_quality(text),
            ))
    finally:
        doc.close()
    if not pages:
        raise LowQualityError("PyMuPDF extracted no pages")
    overall_confidence = sum(p.confidence for p in pages) / len(pages)
    if overall_confidence < 0.6:
        raise LowQualityError(f"PyMuPDF confidence {overall_confidence:.2f} below threshold")
    return OCRResult(
        method="pymupdf",
        pages=pages,
        confidence=overall_confidence,
    )
```
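`estimate_text_quality` is not defined on this page. A plausible heuristic scores the fraction of word-like characters, which drops sharply for binary garbage or empty extractions. This is a hypothetical sketch, not the actual Curaway implementation:

```python
def estimate_text_quality(text: str) -> float:
    """Hypothetical text-quality score in [0, 1]: the fraction of
    alphanumeric/whitespace characters. Empty extractions score 0."""
    if not text.strip():
        return 0.0
    word_like = sum(ch.isalnum() or ch.isspace() for ch in text)
    return word_like / len(text)
```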
Unstructured.io (Fallback 1)¶
```python
from unstructured.partition.auto import partition

async def extract_with_unstructured(document_path: str) -> OCRResult:
    """Fallback 1: Unstructured.io for complex layouts."""
    elements = partition(filename=document_path)
    text = "\n".join(str(el) for el in elements)
    return OCRResult(
        method="unstructured",
        pages=[PageResult(page_number=1, text=text, confidence=0.8)],
        confidence=0.8,
    )
```
Claude Vision (Fallback 2)¶
```python
async def extract_with_claude_vision(document_path: str) -> OCRResult:
    """Fallback 2: Claude Vision for scanned/handwritten documents."""
    images = convert_pdf_to_images(document_path)  # base64-encoded page images
    pages = []
    for i, image in enumerate(images):
        response = await anthropic_client.messages.create(
            model="claude-sonnet-4-6-20250514",
            messages=[{
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/png",  # required by the Messages API
                            "data": image,
                        },
                    },
                    {"type": "text", "text": VISION_OCR_PROMPT},
                ],
            }],
            max_tokens=4000,
        )
        pages.append(PageResult(
            page_number=i + 1,
            text=response.content[0].text,
            confidence=0.85,
        ))
    return OCRResult(method="claude_vision", pages=pages, confidence=0.85)
```
Claude Vision Cost
Claude Vision is the most expensive OCR method (~$0.02/page). It is only invoked when both PyMuPDF and Unstructured.io fail, which typically means the document is a scanned image or contains handwritten notes.
Clinical Context Agent Pipeline¶
After OCR extraction, the Clinical Context Agent processes the text through a 4-node LangGraph workflow (see Agent System for full details):
```mermaid
graph LR
    A[extract_clinical_entities] --> B[map_to_medical_codes]
    B --> C[generate_fhir_resources]
    C --> D[store_resources]

    style A fill:#008B8B,color:#fff
    style B fill:#008B8B,color:#fff
    style C fill:#008B8B,color:#fff
    style D fill:#008B8B,color:#fff
```
Entity Extraction Output¶
```python
class ExtractedEntities(BaseModel):
    """Clinical entities extracted from a medical document."""

    conditions: list[ConditionEntity]    # Diagnoses, findings
    procedures: list[ProcedureEntity]    # Past/recommended procedures
    medications: list[MedicationEntity]  # Current medications
    lab_results: list[LabResult]         # Lab values with ranges
    vitals: list[VitalSign]              # Blood pressure, heart rate, etc.
    allergies: list[AllergyEntity]
    imaging_findings: list[ImagingFinding]
    physician_notes: list[str]           # Free-text clinical notes
    document_date: Optional[date]
    patient_name_in_doc: Optional[str]   # For cross-reference validation
    laterality: Optional[str]            # "left", "right", "bilateral"
```
Embedding-Based Document Matching¶
Purpose¶
When a patient uploads a document, the system must determine which procedure requirements it satisfies. This is done through embedding-based matching against the requirement_embeddings Qdrant collection.
Flow¶
```mermaid
graph TD
    A[Extracted Document Text] --> B[Voyage AI Embedding]
    B --> C[Qdrant Cosine Similarity Search]
    C --> D{Confidence >= 0.85?}
    D -->|Yes| E[Auto-Match to Requirement]
    D -->|No| F{Confidence >= 0.60?}
    F -->|Yes| G[LLM Re-Ranker Verification]
    F -->|No| H[Unmatched - Flag for Review]
    G -->|Confirmed| E
    G -->|Rejected| H

    style D fill:#008B8B,color:#fff
    style F fill:#008B8B,color:#fff
    style G fill:#FF7F50,color:#fff
```
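The search step ranks candidates by cosine similarity, which is just the dot product of the two vectors divided by the product of their magnitudes. For reference, a minimal implementation (Qdrant computes this internally; this sketch is only to pin down the metric):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors.
    Zero-magnitude vectors are not handled in this sketch."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)
```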
Matching Thresholds¶
| Cosine Similarity | Action | Cost |
|---|---|---|
| >= 0.85 | Auto-match (high confidence) | $0 |
| 0.60 - 0.84 | LLM re-ranker verification | ~$0.005 |
| < 0.60 | Unmatched, flag for manual review | $0 |
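The threshold table maps directly onto a small dispatch function. The constants come from the table above; the action names are illustrative, not the actual enum used in the codebase:

```python
AUTO_MATCH_THRESHOLD = 0.85
RERANK_THRESHOLD = 0.60

def match_action(cosine_similarity: float) -> str:
    """Map a cosine-similarity score onto the action from the thresholds table."""
    if cosine_similarity >= AUTO_MATCH_THRESHOLD:
        return "auto_match"
    if cosine_similarity >= RERANK_THRESHOLD:
        return "llm_rerank"
    return "flag_for_review"
```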
Re-Ranker Prompt¶
```python
RERANKER_PROMPT = """
You are a medical document classifier. Given an extracted document summary
and a list of candidate requirements, determine which requirement (if any)
the document satisfies.

Document summary: {document_summary}

Candidate requirements:
{candidates}

For each candidate, respond with:
- requirement_id: The ID of the requirement
- confidence: 0.0 to 1.0
- reasoning: Brief explanation

Only return matches with confidence >= 0.70.
"""
```
Requirement Embeddings (70 Vectors)¶
The 70 requirement embedding vectors cover document types across all 12 supported procedures:
| Category | Count | Examples |
|---|---|---|
| Blood Work | 15 | CBC, metabolic panel, coagulation, HbA1c, thyroid |
| Imaging | 18 | X-ray, MRI, CT, echocardiogram, angiogram, OPG |
| Cardiac Tests | 8 | ECG, stress test, echo, Holter monitor |
| Clearances | 10 | Cardiac clearance, dental clearance, anesthesia eval |
| Specialist Reports | 12 | Orthopedic assessment, oncology report, pulmonary function |
| Other | 7 | Vaccination records, insurance pre-auth, travel fitness certificate |
Document Validation¶
Automated Validators¶
The document pipeline runs a series of automated validators that detect potential issues before the document enters the matching workflow:
```python
class DocumentValidator:
    """Validates extracted document data for anomalies."""

    async def validate(
        self,
        document: Document,
        extracted: ExtractedEntities,
        case: Case,
    ) -> list[ValidationIssue]:
        issues = []
        issues.extend(self._check_laterality_mismatch(extracted, case))
        issues.extend(self._check_document_age(extracted))
        issues.extend(self._check_wrong_body_part(extracted, case))
        issues.extend(self._check_patient_name_mismatch(extracted, case))
        issues.extend(self._check_ocr_quality(document))
        return issues
```
| Validator | Description | Severity |
|---|---|---|
| Laterality Mismatch | Document says "left knee" but case is for "right knee" | error |
| Document Age | Document older than requirement's max_age_days | warning |
| Wrong Body Part | Hip X-ray uploaded for a cardiac case | error |
| Patient Name Mismatch | Name in document doesn't match patient record | warning |
| OCR Quality | OCR confidence below 0.5 on any page | warning |
Laterality Mismatch Example¶
```python
def _check_laterality_mismatch(
    self,
    extracted: ExtractedEntities,
    case: Case,
) -> list[ValidationIssue]:
    """Detect left/right mismatch between document and case."""
    if not extracted.laterality or not case.laterality:
        return []
    if extracted.laterality != case.laterality:
        return [ValidationIssue(
            type="laterality_mismatch",
            severity="error",
            message=(
                f"Document indicates '{extracted.laterality}' side, "
                f"but case is for '{case.laterality}' side. "
                f"Please verify and upload the correct document."
            ),
        )]
    return []
```
Anomaly Detection Wired into Orchestrator
Validation issues are not just logged -- they are surfaced to the patient through the chat orchestrator. If an error-severity issue is detected, the orchestrator pauses the workflow and asks the patient to verify or re-upload the document.
Deduplication and Version Management¶
SHA-256 Deduplication¶
Every uploaded document has its SHA-256 hash computed client-side and verified server-side:
```python
async def check_duplicate(
    tenant_id: str,
    case_id: UUID,
    sha256_hash: str,
) -> Optional[Document]:
    """Check if this exact document has already been uploaded."""
    existing = await db.fetch_one(
        """
        SELECT * FROM documents
        WHERE tenant_id = $1 AND case_id = $2 AND sha256_hash = $3
          AND status != 'deleted'
        """,
        tenant_id, case_id, sha256_hash,
    )
    return Document(**existing) if existing else None
```
If a duplicate is detected, the API returns the existing document instead of creating a new one.
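The server-side half of the hash check recomputes SHA-256 over the stored object and compares it to the client-supplied value. A minimal sketch, streaming in chunks so large files never load fully into memory (the helper name is illustrative):

```python
import hashlib
from typing import Iterable

def sha256_of_chunks(chunks: Iterable[bytes]) -> str:
    """Compute the SHA-256 hex digest over a streamed file body."""
    h = hashlib.sha256()
    for chunk in chunks:
        h.update(chunk)
    return h.hexdigest()
```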
Version Management (Supersede on Re-Upload)¶
When a patient uploads a newer version of a document type (e.g., updated blood work), the system supersedes the old version:
```python
async def supersede_document(
    document_id: UUID,
    new_document_id: UUID,
    tenant_id: str,
):
    """Mark an existing document as superseded by a newer version."""
    await db.execute(
        """
        UPDATE documents
        SET status = 'superseded',
            superseded_by = $1,
            updated_at = NOW()
        WHERE id = $2 AND tenant_id = $3
        """,
        new_document_id, document_id, tenant_id,
    )
```
Document status lifecycle:
```mermaid
stateDiagram-v2
    [*] --> pending: Presign generated
    pending --> uploaded: Client confirms upload
    uploaded --> processing: QStash triggers OCR
    processing --> processed: OCR + extraction complete
    processing --> failed: OCR failure (all methods)
    processed --> matched: Requirement matched
    processed --> unmatched: No requirement match
    processed --> superseded: Newer version uploaded
    matched --> superseded: Newer version uploaded
    failed --> uploaded: Retry triggered
```
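The lifecycle above can be enforced with an explicit transition table, so that an illegal status change fails loudly instead of silently corrupting state. The state names come from the diagram; the guard function is an illustrative sketch, not the actual service code:

```python
# Legal status transitions, taken directly from the lifecycle diagram above.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"uploaded"},
    "uploaded": {"processing"},
    "processing": {"processed", "failed"},
    "processed": {"matched", "unmatched", "superseded"},
    "matched": {"superseded"},
    "failed": {"uploaded"},   # retry
    "unmatched": set(),
    "superseded": set(),      # terminal
}

def can_transition(current: str, target: str) -> bool:
    """True if the lifecycle diagram permits moving from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```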
Frontend Validation¶
The frontend performs pre-upload validation to catch obvious issues before the network request:
```typescript
const UPLOAD_CONFIG = {
  allowedExtensions: [".pdf", ".jpg", ".jpeg", ".png", ".tiff"],
  minSizeBytes: 5 * 1024, // 5 KB
  maxSizeBytes: 20 * 1024 * 1024, // 20 MB
};

function validateFile(file: File): ValidationResult {
  const ext = getExtension(file.name).toLowerCase();
  if (!UPLOAD_CONFIG.allowedExtensions.includes(ext)) {
    return { valid: false, error: `Unsupported file type: ${ext}` };
  }
  if (file.size < UPLOAD_CONFIG.minSizeBytes) {
    return { valid: false, error: "File is too small (minimum 5 KB)" };
  }
  if (file.size > UPLOAD_CONFIG.maxSizeBytes) {
    return { valid: false, error: "File is too large (maximum 20 MB)" };
  }
  return { valid: true };
}
```
Error Toast Pattern¶
Validation errors are shown as inline toast notifications in the upload area:
```typescript
const handleUpload = async (file: File) => {
  const validation = validateFile(file);
  if (!validation.valid) {
    toast.error(validation.error, {
      position: "bottom-center",
      duration: 5000,
    });
    return;
  }
  // Proceed with presign + upload flow
};
```
Pipeline Metrics¶
| Metric | Target | Current (POC) |
|---|---|---|
| OCR success rate (any method) | > 95% | 98% |
| Average OCR time | < 10 seconds | 3 seconds (PyMuPDF) |
| Clinical extraction accuracy | > 85% | 88% (Claude Sonnet) |
| Requirement auto-match rate | > 60% | 65% |
| False positive match rate | < 5% | 3% |
| End-to-end pipeline time | < 30 seconds | 12 seconds average |