Document Pipeline

Overview

The Document Pipeline handles the complete lifecycle of medical documents in Curaway -- from patient upload through OCR extraction, clinical entity recognition, FHIR resource generation, and requirement matching. It is designed to be resilient, asynchronous, and privacy-aware.


Upload Flow

Architecture

sequenceDiagram
    participant Client as Frontend (Next.js)
    participant API as FastAPI
    participant R2 as Cloudflare R2
    participant QS as QStash
    participant OCR as OCR Pipeline
    participant CCA as Clinical Context Agent
    participant DB as PostgreSQL

    Client->>API: POST /uploads/presign
    API->>API: Validate: extension, size, consent
    API-->>Client: {presigned_url, document_id}
    Client->>R2: PUT file (presigned URL)
    Client->>API: POST /uploads/confirm
    API->>DB: Update document status → "uploaded"
    API->>QS: Enqueue OCR callback
    QS->>OCR: Trigger OCR processing
    OCR->>DB: Update document with extracted text
    OCR->>CCA: Trigger Clinical Context Agent
    CCA->>DB: Store FHIR resources
    CCA->>DB: Update document status → "processed"

Step 1: Presign Request

POST /api/v1/uploads/presign
class PresignRequest(BaseModel):
    case_id: UUID
    filename: str
    content_type: str           # "application/pdf", "image/jpeg", etc.
    file_size_bytes: int

class PresignResponse(BaseModel):
    document_id: UUID
    presigned_url: str          # R2 presigned PUT URL (15-minute expiry)
    expires_at: datetime

The API performs these validations before generating a presigned URL:

| Validation | Rule | Error Code |
|---|---|---|
| File extension | .pdf, .jpg, .jpeg, .png, .tiff, .dicom | INVALID_EXTENSION |
| File size (minimum) | >= 5 KB | FILE_TOO_SMALL |
| File size (maximum) | <= 20 MB | FILE_TOO_LARGE |
| Content type | Must match extension | CONTENT_TYPE_MISMATCH |
| Patient consent | Active consent record must exist | CONSENT_REQUIRED |
| Case status | Case must be in active workflow phase | CASE_NOT_ACTIVE |

Step 2: Direct Upload to R2

The client uploads directly to Cloudflare R2 using the presigned URL. This avoids routing large files through the API server:

// Frontend upload (Next.js)
const upload = async (file: File, presignedUrl: string) => {
  const response = await fetch(presignedUrl, {
    method: "PUT",
    body: file,
    headers: { "Content-Type": file.type },
  });
  if (!response.ok) throw new UploadError("R2 upload failed");
};

Step 3: Confirm Upload

POST /api/v1/uploads/confirm
class ConfirmRequest(BaseModel):
    document_id: UUID
    sha256_hash: str            # Client-computed hash for dedup

On confirmation, the API:

  1. Verifies the file exists in R2
  2. Checks SHA-256 hash for deduplication
  3. Updates document status to uploaded
  4. Enqueues OCR processing via QStash

Step 4: QStash OCR Callback

QStash delivers an async callback to the OCR processing endpoint:

@router.post("/internal/callbacks/ocr")
async def ocr_callback(
    request: QStashCallbackRequest,
    x_qstash_signature: str = Header(),
):
    """Process OCR for an uploaded document (QStash callback)."""
    verify_qstash_signature(request, x_qstash_signature)
    document = await document_service.get(request.document_id)
    await ocr_pipeline.process(document)

Why QStash?

OCR processing can take 5-30 seconds depending on document complexity. QStash provides reliable async delivery with automatic retries (3 attempts, exponential backoff), dead letter queues, and signature verification -- all on the free tier.
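
Enqueueing goes through QStash's v2 publish endpoint, with the retry policy carried as a request header. The sketch below only assembles the request; `build_qstash_publish` is an illustrative helper, and the QStash documentation is the authoritative reference for the API surface:

```python
def build_qstash_publish(callback_url: str, document_id: str, token: str, retries: int = 3) -> dict:
    """Assemble the QStash v2 publish request that enqueues the OCR callback.

    Returns kwargs suitable for an HTTP client call such as httpx.post(**request).
    """
    return {
        "url": f"https://qstash.upstash.io/v2/publish/{callback_url}",
        "headers": {
            "Authorization": f"Bearer {token}",
            "Upstash-Retries": str(retries),  # automatic retries with backoff
            "Content-Type": "application/json",
        },
        "json": {"document_id": document_id},
    }
```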


OCR Stack

The OCR pipeline uses an ordered fallback chain, trying each method in sequence until one succeeds:

graph TD
    A[Document Upload] --> B{PyMuPDF}
    B -->|Success| G[Extracted Text]
    B -->|Failure/Low Quality| C{Unstructured.io}
    C -->|Success| G
    C -->|Failure| D{Claude Vision}
    D -->|Success| G
    D -->|Failure| E[Manual Review Flag]

    style B fill:#008B8B,color:#fff
    style C fill:#4A90D9,color:#fff
    style D fill:#FF7F50,color:#fff
    style E fill:#FF0000,color:#fff

Method Comparison

| Method | Type | Speed | Cost | Best For |
|---|---|---|---|---|
| PyMuPDF | Synchronous, local | ~1 second | $0 | Text-based PDFs, well-formatted documents |
| Unstructured.io | API call | ~5 seconds | Free tier (1K pages/mo) | Complex layouts, tables, multi-column |
| Claude Vision | LLM API call | ~10 seconds | ~$0.02/page | Scanned PDFs, handwritten notes, poor quality images |
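
The ordered fallback itself reduces to a small loop over extractor coroutines. This is a sketch under assumed names; `OCRFailure` and `run_fallback_chain` are not from the codebase:

```python
import asyncio

class OCRFailure(Exception):
    """Raised when every OCR method has failed; triggers the manual-review flag."""

async def run_fallback_chain(document_path: str, extractors) -> object:
    """Try each (name, extract) pair in order and return the first success.

    A LowQualityError from PyMuPDF counts as a failure and falls through
    to the next method, matching the diagram above.
    """
    errors = []
    for name, extract in extractors:
        try:
            return await extract(document_path)
        except Exception as exc:
            errors.append(f"{name}: {exc}")
    raise OCRFailure("; ".join(errors))
```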

PyMuPDF (Primary)

import fitz  # PyMuPDF

async def extract_with_pymupdf(document_path: str) -> OCRResult:
    """Primary OCR: fast, free, synchronous text extraction."""
    doc = fitz.open(document_path)
    pages = []
    for page in doc:
        text = page.get_text("text")
        pages.append(PageResult(
            page_number=page.number + 1,
            text=text,
            confidence=estimate_text_quality(text),
        ))
    doc.close()

    if not pages:
        raise LowQualityError("PyMuPDF extracted no pages")

    overall_confidence = sum(p.confidence for p in pages) / len(pages)

    if overall_confidence < 0.6:
        raise LowQualityError(f"PyMuPDF confidence {overall_confidence:.2f} below threshold")

    return OCRResult(
        method="pymupdf",
        pages=pages,
        confidence=overall_confidence,
    )
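
The extractor above leans on `estimate_text_quality`, which is referenced but not shown. One plausible heuristic, offered strictly as an assumption rather than the actual implementation, scores alphanumeric density and the share of real words:

```python
import re

def estimate_text_quality(text: str) -> float:
    """Rough 0.0-1.0 quality score for extracted text (illustrative heuristic).

    Garbled OCR output tends to be symbol-heavy with few real words, so the
    score blends alphanumeric density with word density.
    """
    if not text.strip():
        return 0.0
    alnum = sum(ch.isalnum() or ch.isspace() for ch in text)
    ratio = alnum / len(text)
    words = re.findall(r"[A-Za-z]{2,}", text)
    word_density = min(len(words) / max(len(text.split()), 1), 1.0)
    return round(0.5 * ratio + 0.5 * word_density, 2)
```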

Unstructured.io (Fallback 1)

from unstructured.partition.auto import partition

async def extract_with_unstructured(document_path: str) -> OCRResult:
    """Fallback 1: Unstructured.io for complex layouts."""
    elements = partition(filename=document_path)
    text = "\n".join(str(el) for el in elements)

    return OCRResult(
        method="unstructured",
        # Unstructured does not report per-page confidence; 0.8 is a fixed heuristic
        pages=[PageResult(page_number=1, text=text, confidence=0.8)],
        confidence=0.8,
    )

Claude Vision (Fallback 2)

async def extract_with_claude_vision(document_path: str) -> OCRResult:
    """Fallback 2: Claude Vision for scanned/handwritten documents."""
    images = convert_pdf_to_images(document_path)

    pages = []
    for i, image in enumerate(images):
        response = await anthropic_client.messages.create(
            model="claude-sonnet-4-6-20250514",
            messages=[{
                "role": "user",
                "content": [
                    {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": image}},
                    {"type": "text", "text": VISION_OCR_PROMPT},
                ],
            }],
            max_tokens=4000,
        )
        pages.append(PageResult(
            page_number=i + 1,
            text=response.content[0].text,
            confidence=0.85,
        ))

    return OCRResult(method="claude_vision", pages=pages, confidence=0.85)

Claude Vision Cost

Claude Vision is the most expensive OCR method (~$0.02/page). It is only invoked when both PyMuPDF and Unstructured.io fail, which typically means the document is a scanned image or contains handwritten notes.


Clinical Context Agent Pipeline

After OCR extraction, the Clinical Context Agent processes the text through a 4-node LangGraph workflow (see Agent System for full details):

graph LR
    A[extract_clinical_entities] --> B[map_to_medical_codes]
    B --> C[generate_fhir_resources]
    C --> D[store_resources]

    style A fill:#008B8B,color:#fff
    style B fill:#008B8B,color:#fff
    style C fill:#008B8B,color:#fff
    style D fill:#008B8B,color:#fff

Entity Extraction Output

class ExtractedEntities(BaseModel):
    """Clinical entities extracted from a medical document."""
    conditions: list[ConditionEntity]       # Diagnoses, findings
    procedures: list[ProcedureEntity]       # Past/recommended procedures
    medications: list[MedicationEntity]     # Current medications
    lab_results: list[LabResult]            # Lab values with ranges
    vitals: list[VitalSign]                 # Blood pressure, heart rate, etc.
    allergies: list[AllergyEntity]
    imaging_findings: list[ImagingFinding]
    physician_notes: list[str]              # Free-text clinical notes
    document_date: Optional[date]
    patient_name_in_doc: Optional[str]      # For cross-reference validation
    laterality: Optional[str]               # "left", "right", "bilateral"

Embedding-Based Document Matching

Purpose

When a patient uploads a document, the system must determine which procedure requirements it satisfies. This is done through embedding-based matching against the requirement_embeddings Qdrant collection.

Flow

graph TD
    A[Extracted Document Text] --> B[Voyage AI Embedding]
    B --> C[Qdrant Cosine Similarity Search]
    C --> D{Confidence >= 0.85?}
    D -->|Yes| E[Auto-Match to Requirement]
    D -->|No| F{Confidence >= 0.60?}
    F -->|Yes| G[LLM Re-Ranker Verification]
    F -->|No| H[Unmatched - Flag for Review]
    G -->|Confirmed| E
    G -->|Rejected| H

    style D fill:#008B8B,color:#fff
    style F fill:#008B8B,color:#fff
    style G fill:#FF7F50,color:#fff

Matching Thresholds

| Cosine Similarity | Action | Cost |
|---|---|---|
| >= 0.85 | Auto-match (high confidence) | $0 |
| 0.60 - 0.84 | LLM re-ranker verification | ~$0.005 |
| < 0.60 | Unmatched, flag for manual review | $0 |
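
Routing on the similarity score is a pure threshold check; a sketch with illustrative names:

```python
AUTO_MATCH_THRESHOLD = 0.85
RERANK_FLOOR = 0.60

def route_match(similarity: float) -> str:
    """Map a cosine similarity score to the pipeline action from the table above."""
    if similarity >= AUTO_MATCH_THRESHOLD:
        return "auto_match"
    if similarity >= RERANK_FLOOR:
        return "llm_rerank"
    return "manual_review"
```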

Re-Ranker Prompt

RERANKER_PROMPT = """
You are a medical document classifier. Given an extracted document summary
and a list of candidate requirements, determine which requirement (if any)
the document satisfies.

Document summary: {document_summary}

Candidate requirements:
{candidates}

For each candidate, respond with:
- requirement_id: The ID of the requirement
- confidence: 0.0 to 1.0
- reasoning: Brief explanation

Only return matches with confidence >= 0.70.
"""
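
Because LLMs do not always honor prompt constraints, the 0.70 floor is worth re-applying to the parsed response server-side. A hypothetical helper, assuming the response has been parsed into dicts with the keys named in the prompt:

```python
def filter_reranker_matches(matches: list[dict], floor: float = 0.70) -> list[dict]:
    """Drop re-ranker matches below the confidence floor and sort best-first."""
    kept = [m for m in matches if m.get("confidence", 0.0) >= floor]
    return sorted(kept, key=lambda m: m["confidence"], reverse=True)
```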

Requirement Embeddings (70 Vectors)

The 70 requirement embedding vectors cover document types across all 12 supported procedures:

| Category | Count | Examples |
|---|---|---|
| Blood Work | 15 | CBC, metabolic panel, coagulation, HbA1c, thyroid |
| Imaging | 18 | X-ray, MRI, CT, echocardiogram, angiogram, OPG |
| Cardiac Tests | 8 | ECG, stress test, echo, Holter monitor |
| Clearances | 10 | Cardiac clearance, dental clearance, anesthesia eval |
| Specialist Reports | 12 | Orthopedic assessment, oncology report, pulmonary function |
| Other | 7 | Vaccination records, insurance pre-auth, travel fitness certificate |
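
In production the similarity search runs inside Qdrant, but the underlying math is plain cosine similarity over these 70 vectors. A from-scratch illustration (function names assumed):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

def nearest_requirement(doc_vec: list[float], requirement_vecs: dict):
    """Return (requirement_id, similarity) for the closest requirement embedding."""
    return max(
        ((rid, cosine_similarity(doc_vec, vec)) for rid, vec in requirement_vecs.items()),
        key=lambda pair: pair[1],
    )
```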

Document Validation

Automated Validators

The document pipeline runs a series of automated validators that detect potential issues before the document enters the matching workflow:

class DocumentValidator:
    """Validates extracted document data for anomalies."""

    async def validate(
        self,
        document: Document,
        extracted: ExtractedEntities,
        case: Case,
    ) -> list[ValidationIssue]:
        issues = []
        issues.extend(self._check_laterality_mismatch(extracted, case))
        issues.extend(self._check_document_age(extracted))
        issues.extend(self._check_wrong_body_part(extracted, case))
        issues.extend(self._check_patient_name_mismatch(extracted, case))
        issues.extend(self._check_ocr_quality(document))
        return issues

| Validator | Description | Severity |
|---|---|---|
| Laterality Mismatch | Document says "left knee" but case is for "right knee" | error |
| Document Age | Document older than requirement's max_age_days | warning |
| Wrong Body Part | Hip X-ray uploaded for a cardiac case | error |
| Patient Name Mismatch | Name in document doesn't match patient record | warning |
| OCR Quality | OCR confidence below 0.5 on any page | warning |
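
The document-age check from the table reduces to date arithmetic against the requirement's max_age_days. A sketch; `check_document_age` is an assumed name and the real validator presumably returns a ValidationIssue rather than a string:

```python
from datetime import date
from typing import Optional

def check_document_age(
    document_date: Optional[date],
    max_age_days: Optional[int],
    today: Optional[date] = None,
) -> Optional[str]:
    """Return a warning message if the document is older than allowed, else None."""
    if document_date is None or max_age_days is None:
        return None  # nothing to compare against
    today = today or date.today()
    if (today - document_date).days > max_age_days:
        return (
            f"Document dated {document_date.isoformat()} exceeds "
            f"the maximum age of {max_age_days} days"
        )
    return None
```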

Laterality Mismatch Example

def _check_laterality_mismatch(
    self,
    extracted: ExtractedEntities,
    case: Case,
) -> list[ValidationIssue]:
    """Detect left/right mismatch between document and case."""
    if not extracted.laterality or not case.laterality:
        return []

    if extracted.laterality != case.laterality:
        return [ValidationIssue(
            type="laterality_mismatch",
            severity="error",
            message=(
                f"Document indicates '{extracted.laterality}' side, "
                f"but case is for '{case.laterality}' side. "
                f"Please verify and upload the correct document."
            ),
        )]
    return []

Anomaly Detection Wired into Orchestrator

Validation issues are not just logged -- they are surfaced to the patient through the chat orchestrator. If an error-severity issue is detected, the orchestrator pauses the workflow and asks the patient to verify or re-upload the document.


Deduplication and Version Management

SHA-256 Deduplication

Every uploaded document has its SHA-256 hash computed client-side and verified server-side:

async def check_duplicate(
    tenant_id: str,
    case_id: UUID,
    sha256_hash: str,
) -> Optional[Document]:
    """Check if this exact document has already been uploaded."""
    existing = await db.fetch_one(
        """
        SELECT * FROM documents
        WHERE tenant_id = $1 AND case_id = $2 AND sha256_hash = $3
        AND status != 'deleted'
        """,
        tenant_id, case_id, sha256_hash,
    )
    return Document(**existing) if existing else None

If a duplicate is detected, the API returns the existing document instead of creating a new one.

Version Management (Supersede on Re-Upload)

When a patient uploads a newer version of a document type (e.g., updated blood work), the system supersedes the old version:

async def supersede_document(
    document_id: UUID,
    new_document_id: UUID,
    tenant_id: str,
):
    """Mark an existing document as superseded by a newer version."""
    await db.execute(
        """
        UPDATE documents
        SET status = 'superseded',
            superseded_by = $1,
            updated_at = NOW()
        WHERE id = $2 AND tenant_id = $3
        """,
        new_document_id, document_id, tenant_id,
    )

Document status lifecycle:

stateDiagram-v2
    [*] --> pending: Presign generated
    pending --> uploaded: Client confirms upload
    uploaded --> processing: QStash triggers OCR
    processing --> processed: OCR + extraction complete
    processing --> failed: OCR failure (all methods)
    processed --> matched: Requirement matched
    processed --> unmatched: No requirement match
    processed --> superseded: Newer version uploaded
    matched --> superseded: Newer version uploaded
    failed --> uploaded: Retry triggered
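
The lifecycle above can be enforced with a transition table that the service layer consults before any status update. A sketch; the table and `can_transition` are illustrative names:

```python
# Legal status transitions, mirroring the state diagram above
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "pending": {"uploaded"},
    "uploaded": {"processing"},
    "processing": {"processed", "failed"},
    "processed": {"matched", "unmatched", "superseded"},
    "matched": {"superseded"},
    "failed": {"uploaded"},  # retry path
}

def can_transition(current: str, target: str) -> bool:
    """True if the state diagram permits moving from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```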

Frontend Validation

The frontend performs pre-upload validation to catch obvious issues before the network request:

const UPLOAD_CONFIG = {
  allowedExtensions: [".pdf", ".jpg", ".jpeg", ".png", ".tiff"],
  minSizeBytes: 5 * 1024,        // 5 KB
  maxSizeBytes: 20 * 1024 * 1024, // 20 MB
};

function validateFile(file: File): ValidationResult {
  const ext = getExtension(file.name).toLowerCase();

  if (!UPLOAD_CONFIG.allowedExtensions.includes(ext)) {
    return { valid: false, error: `Unsupported file type: ${ext}` };
  }
  if (file.size < UPLOAD_CONFIG.minSizeBytes) {
    return { valid: false, error: "File is too small (minimum 5 KB)" };
  }
  if (file.size > UPLOAD_CONFIG.maxSizeBytes) {
    return { valid: false, error: "File is too large (maximum 20 MB)" };
  }
  return { valid: true };
}

Error Toast Pattern

Validation errors are shown as inline toast notifications in the upload area:

const handleUpload = async (file: File) => {
  const validation = validateFile(file);
  if (!validation.valid) {
    toast.error(validation.error, {
      position: "bottom-center",
      duration: 5000,
    });
    return;
  }
  // Proceed with presign + upload flow
};

Pipeline Metrics

| Metric | Target | Current (POC) |
|---|---|---|
| OCR success rate (any method) | > 95% | 98% |
| Average OCR time | < 10 seconds | 3 seconds (PyMuPDF) |
| Clinical extraction accuracy | > 85% | 88% (Claude Sonnet) |
| Requirement auto-match rate | > 60% | 65% |
| False positive match rate | < 5% | 3% |
| End-to-end pipeline time | < 30 seconds | 12 seconds average |