
Wave 1 Implementation Spec — Intake Restructuring

Status: Implemented in Session 43
Date: 2026-04-17
Session: 42
Design source: docs/specs/platform-restructuring-design.md (Parts 1-3, 7-11, 17)
Depends on: Wave 0 complete (DAO Layer + RLS + LLM Gateway)
Effort: 3-4 weeks
UI/UX: None (pure backend). Frontend continues unchanged.


Overview

Restructure the intake pipeline from a phase-based orchestrator into a layer-aware Triage Agent with parallel extractors and a three-score system (PFS/HSS/FMS). The existing chat API surface is unchanged; this is an internal architecture change.


Implementation Order

Execute sequentially; each step depends on the previous one:

| Step | What | New files | Modified files | Effort |
|---|---|---|---|---|
| 1 | Layer state model + Alembic migration | 1 migration, 1 module | case model | 1 day |
| 2 | Scoring config + pure-function scorers | 1 YAML, 3 modules | | 1 day |
| 3 | Layer extractors (4 new; Layer 2 reuses clinical_context.py) | 4 modules | | 2-3 days |
| 4 | Prompt layer contexts + extractor prompts | 15 YAML (7 layer contexts, 4 base, 4 examples) | prompt_loader.py | 1 day |
| 5 | Triage Agent as LangGraph graph | 1 module | orchestrator.py | 2-3 days |
| 6 | Audit tables + Alembic migration | 1 migration, 3 modules | | 1 day |
| 7 | LangGraph checkpointing | 1 module | config.py | 1 day |
| 8 | PFS remediation behavior | | triage agent prompts | 1 day |
| 9 | Tests | 9 test files | | 2 days |
| 10 | Integration + verification | | | 1 day |

Step 1: Layer State Model

New: app/services/layer_state.py

Manages the patient's 5-layer intake state. Pure data — no LLM calls.

"""
Curaway — Layer State Management.

Tracks completion of 5 intake dimensions. Layers are independent —
completion is a high-water mark, not a linear pipeline.
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class LayerName(str, Enum):
    INTENT = "intent_capture"
    MEDICAL = "medical_status"
    TRAVEL = "travel_readiness"
    LOGISTICS = "logistics"
    FINANCIAL = "financial_readiness"


class LayerStatus(str, Enum):
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"


@dataclass
class LayerState:
    status: LayerStatus = LayerStatus.NOT_STARTED
    completion: float = 0.0
    data: dict[str, Any] = field(default_factory=dict)
    summary_card_fired: bool = False


@dataclass
class PatientLayerState:
    """Full layer state for a patient's intake journey."""
    intent_capture: LayerState = field(default_factory=LayerState)
    medical_status: LayerState = field(default_factory=LayerState)
    travel_readiness: LayerState = field(default_factory=LayerState)
    logistics: LayerState = field(default_factory=LayerState)
    financial_readiness: LayerState = field(default_factory=LayerState)

    def get_layer(self, name: LayerName) -> LayerState:
        return getattr(self, name.value)

    def set_layer(self, name: LayerName, state: LayerState) -> None:
        setattr(self, name.value, state)

    def to_dict(self) -> dict:
        return {
            name.value: {
                "status": self.get_layer(name).status.value,
                "completion": self.get_layer(name).completion,
                "data": self.get_layer(name).data,
                "summary_card_fired": self.get_layer(name).summary_card_fired,
            }
            for name in LayerName
        }

    @classmethod
    def from_dict(cls, data: dict) -> "PatientLayerState":
        state = cls()
        for name in LayerName:
            if name.value in data:
                layer_data = data[name.value]
                setattr(state, name.value, LayerState(
                    status=LayerStatus(layer_data.get("status", "not_started")),
                    completion=layer_data.get("completion", 0.0),
                    data=layer_data.get("data", {}),
                    summary_card_fired=layer_data.get("summary_card_fired", False),
                ))
        return state

    def all_above_threshold(self, threshold: float, exclude: list[LayerName] | None = None) -> bool:
        """Check if all layers (except excluded) are above a completion threshold."""
        for name in LayerName:
            if exclude and name in exclude:
                continue
            if self.get_layer(name).completion < threshold:
                return False
        return True

Alembic migration: Add layer_state column to cases

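# alembic/versions/xxxx_add_layer_state_to_cases.py
import sqlalchemy as sa
from alembic import op

# FlexibleJSON: the project's JSON/JSONB column type (import path not shown in this spec)


def upgrade():
    op.add_column('cases', sa.Column('layer_state', FlexibleJSON, nullable=True))


def downgrade():
    op.drop_column('cases', 'layer_state')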

The layer_state column stores PatientLayerState.to_dict() as JSONB alongside the existing workflow_state. Both coexist during transition — workflow_state for backward compatibility, layer_state for the new intake.


Step 2: Scoring Config + Scorers

New: config/scoring.yaml

Contains the full YAML from platform-restructuring-design.md Part 9. All weights are configurable, and each key can be overridden individually via Flagsmith.
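A minimal sketch of how per-key Flagsmith overrides could be layered on top of the YAML defaults, assuming overrides arrive as a flat mapping of dotted keys (for example `pfs_weights.medical`) to values. The dotted-key convention, the `apply_overrides` name, and the weight values are hypothetical; the Flagsmith lookup itself is not shown.

```python
import copy
from typing import Any


def apply_overrides(base: dict[str, Any], overrides: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of `base` with dotted-key overrides applied.

    Hypothetical convention: the key "pfs_weights.medical" replaces
    base["pfs_weights"]["medical"]. Unknown paths raise KeyError, so a typo
    in a flag name fails loudly instead of being silently ignored.
    """
    merged = copy.deepcopy(base)  # never mutate the loaded YAML defaults
    for dotted_key, value in overrides.items():
        *parents, leaf = dotted_key.split(".")
        node = merged
        for part in parents:
            node = node[part]  # KeyError if an intermediate section is missing
        if leaf not in node:
            raise KeyError(dotted_key)
        node[leaf] = value
    return merged


# Hypothetical defaults standing in for values loaded from config/scoring.yaml.
defaults = {"pfs_weights": {"medical": 0.5, "travel": 0.3, "financial": 0.2}}
config = apply_overrides(defaults, {"pfs_weights.medical": 0.6})
```

Rejecting unknown keys is a design choice: it trades flexibility for protection against flags that silently do nothing.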

New: app/services/pfs_scorer.py

"""PFS — Patient Fitness Score (0-100). Pure function, deterministic."""

def compute_pfs(
    layer_state: dict,
    risk_items: list[dict],
    config: dict,
) -> dict:
    """Compute PFS from layer data + risk assessor output.

    Returns: {
        "score": 78,
        "band": "conditionally_ready",
        "transport_tier": "T1",
        "blocking_flags": [],
        "component_breakdown": {"medical": 85, "travel": 100, ...},
        "gaps": ["passport_timeline_tight"]
    }
    """
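The docstring above fixes the return shape but not the arithmetic. A minimal sketch of one possible core, assuming the score is a weighted mean of the 0-100 values in `component_breakdown`. The 40/60/80 cut points follow the PFS ranges in Steps 5 and 8 and `conditionally_ready` is this spec's band name; the other band names, the weights, and the helper names `weighted_pfs`/`pfs_band` are hypothetical. Blocking flags and transport tiers are left out.

```python
from bisect import bisect_right

# Band floors (inclusive). Cut points follow the PFS ranges in Steps 5 and 8;
# "conditionally_ready" is the spec's band name, the other names are assumptions.
BAND_FLOORS = [0, 40, 60, 80]
BAND_NAMES = ["not_ready", "remediation", "conditionally_ready", "ready"]


def weighted_pfs(components: dict[str, float], weights: dict[str, float]) -> int:
    """Weighted mean of 0-100 component scores, rounded to an int.

    Assumption: a component with no score yet is skipped and its weight is
    dropped from the denominator, rather than being counted as zero.
    """
    used = {name: w for name, w in weights.items() if name in components}
    total_weight = sum(used.values())
    if total_weight == 0:
        return 0
    return round(sum(components[name] * w for name, w in used.items()) / total_weight)


def pfs_band(score: float) -> str:
    """Map a PFS (clamped to 0-100) to its band name."""
    clamped = min(max(score, 0), 100)
    return BAND_NAMES[bisect_right(BAND_FLOORS, clamped) - 1]
```

For example, `weighted_pfs({"medical": 85, "travel": 100}, {"medical": 3, "travel": 1})` is (255 + 100) / 4 = 88.75, which rounds to 89.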

New: app/services/hss_scorer.py

Reframes existing matching engine output as HSS (0-100). Thin wrapper.

New: app/services/fms_scorer.py

Sigmoid combination of PFS + HSS + preference alignment + historical conversion.
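One reading of "sigmoid combination", sketched under stated assumptions: each input is normalised to 0-1, combined linearly with weights plus a bias, and squashed with the logistic function, which gives an FMS in (0, 1). The weights, the bias, the 0-1 scale of the two non-score inputs, and the `compute_fms` signature are all hypothetical; real values would come from `fms_weights` in config/scoring.yaml.

```python
import math

# Hypothetical weights and bias; real values would come from fms_weights.
EXAMPLE_WEIGHTS = {"pfs": 3.0, "hss": 3.0, "preference": 1.0, "conversion": 1.0}
EXAMPLE_BIAS = -4.0


def compute_fms(
    pfs: float,
    hss: float,
    preference_alignment: float,
    historical_conversion: float,
    weights: dict[str, float],
    bias: float,
) -> float:
    """Logistic (sigmoid) combination of the four FMS inputs, in (0, 1).

    pfs and hss are on 0-100 and are rescaled to 0-1; preference_alignment
    and historical_conversion are assumed to already be on 0-1.
    """
    z = (
        weights["pfs"] * (pfs / 100.0)
        + weights["hss"] * (hss / 100.0)
        + weights["preference"] * preference_alignment
        + weights["conversion"] * historical_conversion
        + bias
    )
    return 1.0 / (1.0 + math.exp(-z))
```

With these example values, midpoint inputs (PFS 50, HSS 50, 0.5, 0.5) give z = 0 and an FMS of exactly 0.5, so the bias controls where that midpoint sits.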


Step 3: Layer Extractors

New: app/services/extractors/intent_extractor.py

Structured output from conversation turn → Layer 1 data (case_type, emotional_state, urgency_score, decision_stage, trigger_event).

New: app/services/extractors/travel_extractor.py

Structured output → Layer 3 data (mobility, oxygen, hospitalization, transport_tier).

New: app/services/extractors/logistics_extractor.py

Structured output → Layer 4 data (country, passport, visa, timeline, companion).

New: app/services/extractors/financial_extractor.py

Structured output → Layer 5 data (funding_source, budget_range, flexibility, insurance).

Existing: app/agents/clinical_context.py

Already handles Layer 2 (medical extraction). Wired as a LangGraph node — no new code, just integration.

Extractor pattern (all 4 new extractors follow this):

"""
Curaway — Intent Capture Extractor (Layer 1).

Stateless function: (conversation_turn, patient_state) → structured_data_delta.
Uses structured output (JSON schema enforcement) via llm_gateway.invoke().
"""

from app.services.llm_gateway import invoke as llm_invoke
from app.services.prompt_loader import load_prompt


EXTRACTION_SCHEMA = {
    "type": "object",
    "properties": {
        "case_type": {"type": "string", "enum": ["elective", "selective", "urgent"]},
        "emotional_state": {
            "type": "object",
            "properties": {
                "readiness": {"type": "string"},
                "primary_fear": {"type": "string"},
                "primary_hope": {"type": "string"},
                "fear_intensity": {"type": "number"},
                "hope_intensity": {"type": "number"},
            }
        },
        # ... full schema from design Part 7
    }
}


async def extract_intent(
    message_text: str,
    current_layer_data: dict,
    patient_context: str,
) -> dict:
    """Extract Layer 1 signals from a conversation turn.

    Returns delta — only newly extracted signals, not the full layer.
    Caller merges delta into existing layer state.
    """
    prompt = load_prompt("intent_extraction", "en", version="v1")
    # ... build messages, call llm_invoke with expects_json=True
    # ... parse response, return delta
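The Edge Cases section requires extractor failures to be non-blocking: completion stays at its previous value and the layer is retried next turn. A minimal asyncio sketch of that policy, assuming each extractor is an async callable that takes the turn text and returns a delta dict. The `run_extractors_safely` name, the timeout value, and the stub extractors are hypothetical. In the real graph, this logic would sit inside the `run_extractors` node.

```python
import asyncio
from collections.abc import Awaitable, Callable

# Hypothetical extractor signature: takes the turn text, returns a layer delta.
Extractor = Callable[[str], Awaitable[dict]]


async def run_extractors_safely(
    message_text: str,
    extractors: dict[str, Extractor],
    timeout_s: float = 10.0,
) -> dict[str, dict]:
    """Run every layer extractor concurrently; drop failures instead of raising.

    Returns {layer_name: delta} for the extractors that succeeded. A layer
    missing from the result keeps its previous completion and is retried on
    the next turn.
    """
    names = list(extractors)
    results = await asyncio.gather(
        *(asyncio.wait_for(extractors[name](message_text), timeout_s) for name in names),
        return_exceptions=True,  # timeouts and LLM errors come back as values
    )
    deltas: dict[str, dict] = {}
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            continue  # non-blocking: skip this layer (logging omitted)
        deltas[name] = result
    return deltas


# Usage with stub extractors (hypothetical stand-ins for the LLM-backed ones).
async def ok_extractor(text: str) -> dict:
    return {"completion": 0.4, "data_delta": {"case_type": "elective"}}


async def slow_extractor(text: str) -> dict:
    await asyncio.sleep(1.0)  # longer than the timeout below
    return {"completion": 1.0, "data_delta": {}}


async def broken_extractor(text: str) -> dict:
    raise RuntimeError("LLM gateway error")


deltas = asyncio.run(run_extractors_safely(
    "I'd like a knee replacement next spring",
    {"intent_capture": ok_extractor, "logistics": slow_extractor,
     "financial_readiness": broken_extractor},
    timeout_s=0.05,
))
# Only "intent_capture" succeeds; the other two layers are skipped this turn.
```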

Step 4: Prompt Layer Contexts

New: config/prompts/layer_contexts/ (7 files)

| File | Purpose |
|---|---|
| intent_capture.yaml | Guide agent to capture why, fears, hopes |
| medical_status.yaml | Guide agent to gather clinical data (≈ existing intake.yaml) |
| travel_readiness.yaml | Guide agent to assess travel fitness |
| logistics.yaml | Guide agent to capture passport, visa, timeline |
| financial_readiness.yaml | Guide agent to understand budget (with market range anchoring) |
| remediation.yaml | Guide agent through PFS gap remediation |
| multi_layer.yaml | Guide agent when patient bounces between layers |

New: config/prompts/base/ (4 extractor prompts)

| File | Purpose |
|---|---|
| intent_extraction_v1.yaml | Base prompt for intent extractor |
| travel_extraction_v1.yaml | Base prompt for travel extractor |
| logistics_extraction_v1.yaml | Base prompt for logistics extractor |
| financial_extraction_v1.yaml | Base prompt for financial extractor |

New: config/prompts/examples/en/ (4 example files)

One example file per extractor, following existing patterns.

Modified: app/services/prompt_loader.py

Add load_layer_context(active_layer, layer_state, pfs_band) (~50 lines). Same pattern as load_phase_context().


Step 5: Triage Agent as LangGraph Graph

IMPORTANT — Concurrency model (from Step 1 code review):

PatientLayerState is NOT thread-safe. The run_extractors parallel node must NOT share a single PatientLayerState instance across concurrent extractors. Use LangGraph's reducer pattern:

from typing import Annotated, TypedDict

from app.services.layer_state import LayerName, PatientLayerState


def merge_layer_deltas(current: dict, delta: dict) -> dict:
    """LangGraph reducer: merges per-extractor deltas into the layer_state dict.

    delta is {layer_name: {"completion": float | None, "data_delta": dict | None}}.
    Returns a new dict; neither argument is mutated.
    """
    state = PatientLayerState.from_dict(current or {})
    for layer_name, layer_delta in delta.items():
        name = LayerName(layer_name)
        layer = state.get_layer(name)
        completion = layer_delta.get("completion")
        if completion is not None:
            # Completion is a high-water mark (see layer_state.py), never lowered.
            layer.completion = max(layer.completion, completion)
        layer.data = {**layer.data, **(layer_delta.get("data_delta") or {})}
        state.set_layer(name, layer)
    return state.to_dict()


class TriageState(TypedDict):
    layer_state: Annotated[dict, merge_layer_deltas]
    # ... other fields

Each extractor node returns a delta, not the full layer_state. LangGraph applies the reducer to combine parallel extractor outputs safely.
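The safety argument depends on the reducer being order-insensitive. LangGraph folds the writes to a reducer-annotated key through the reducer one at a time, so concurrent extractors are merged sequentially instead of racing on shared state. Below is a dependency-free sketch of those merge semantics on plain dicts. It assumes completion is a high-water mark (as the layer_state.py docstring says) and that different extractors write disjoint data keys. `merge_layer_deltas_plain` is a hypothetical illustration, not the project's reducer.

```python
from functools import reduce


def merge_layer_deltas_plain(current: dict, delta: dict) -> dict:
    """Fold one extractor's {layer: {"completion", "data_delta"}} into current.

    Returns a new dict and never mutates `current`. Completion only moves up
    (high-water mark), so with disjoint data_delta keys the result does not
    depend on the order in which deltas are applied.
    """
    merged = {
        name: {"completion": layer["completion"], "data": dict(layer["data"])}
        for name, layer in current.items()
    }
    for name, layer_delta in delta.items():
        layer = merged.setdefault(name, {"completion": 0.0, "data": {}})
        if layer_delta.get("completion") is not None:
            layer["completion"] = max(layer["completion"], layer_delta["completion"])
        layer["data"].update(layer_delta.get("data_delta") or {})
    return merged


start = {"logistics": {"completion": 0.5, "data": {"country": "UK"}}}
intent_delta = {"intent_capture": {"completion": 0.3, "data_delta": {"case_type": "elective"}}}
logistics_delta = {"logistics": {"completion": 0.4, "data_delta": {"passport": "valid"}}}

# Two concurrent extractors may finish in either order; both folds agree.
a = reduce(merge_layer_deltas_plain, [intent_delta, logistics_delta], start)
b = reduce(merge_layer_deltas_plain, [logistics_delta, intent_delta], start)
assert a == b
```

Note that the logistics completion stays at 0.5 even though the newer delta reports 0.4. This is the high-water-mark rule at work.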


New: app/agents/triage_agent.py

The core of Wave 1. Replaces the phase-routing logic in the orchestrator with a LangGraph compiled graph.

"""
Curaway — Triage Agent (A3 Model).

Supervisor pattern: one conversational agent + parallel extractors.
Code nodes handle scoring and routing (deterministic).
LLM nodes handle conversation and extraction (non-deterministic).

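Graph topology:
  choose_layer → generate_response → run_extractors → compute_pfs → route_by_pfs_band
"""

from typing import Annotated, TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END

# merge_layer_deltas: the reducer from the concurrency note above, defined in this module.
# The checkpointer is created in app/agents/checkpointing.py and passed to build_triage_graph().


class TriageState(TypedDict):
    """State that flows through the triage graph."""
    messages: list[BaseMessage]
    layer_state: Annotated[dict, merge_layer_deltas]  # reducer: parallel extractor writes merge safely
    pfs: dict | None
    hss: dict | None
    fms: float | None
    active_layer: str
    response: str | None
    case_id: str
    patient_id: str
    tenant_id: str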


def build_triage_graph(checkpointer=None):
    """Compile the triage graph. One invocation handles one user turn."""
    graph = StateGraph(TriageState)

    # Nodes (node functions are defined elsewhere in this module)
    graph.add_node("choose_layer", choose_next_layer)       # Code: adaptive routing
    graph.add_node("generate_response", generate_response)  # LLM: conversation
    graph.add_node("run_extractors", run_extractors)        # LLM: parallel extraction
    graph.add_node("compute_pfs", compute_pfs_node)         # Code: deterministic
    graph.add_node("check_gates", check_matching_gates)     # Code: threshold check

    # Edges
    graph.add_edge("choose_layer", "generate_response")
    graph.add_edge("generate_response", "run_extractors")
    graph.add_edge("run_extractors", "compute_pfs")
    graph.add_conditional_edges(
        "compute_pfs",
        route_by_pfs_band,
        {
            # Looping back to choose_layer here would generate another reply with no
            # new user input (until the recursion limit), so each branch ends the turn.
            # The next message re-enters at choose_layer with the checkpointed state.
            "continue": END,                  # PFS < threshold, keep collecting next turn
            "matching_ready": "check_gates",  # PFS >= 60, layers >= 0.6
            "remediation": END,               # PFS 40-59, remediation prompts next turn
            "not_ready": END,                 # PFS < 40, honest redirect
        },
    )
    graph.add_edge("check_gates", END)

    graph.set_entry_point("choose_layer")

    # compile() accepts checkpointer=None, so no branch is needed.
    return graph.compile(checkpointer=checkpointer)
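The `route_by_pfs_band` function passed to `add_conditional_edges` is not shown. Here is a minimal sketch, assuming it receives the graph state as a dict and reads `pfs["score"]` and each layer's `completion` from the serialized `layer_state`. It also assumes that "continue" means some layer is still below the 0.6 gate. The thresholds come from the edge comments in `build_triage_graph`; the check order is an assumption. In the real module, the layer check could reuse `PatientLayerState.all_above_threshold()`, which also supports excluded layers.

```python
# Thresholds from the build_triage_graph edge comments.
MATCHING_PFS = 60
REMEDIATION_PFS = 40
LAYER_GATE = 0.6


def route_by_pfs_band(state: dict) -> str:
    """Return the conditional-edge key for the compute_pfs node (sketch).

    Assumption: collection is checked first, so "continue" wins while any
    layer is below LAYER_GATE or no PFS has been computed yet.
    """
    layers = state.get("layer_state") or {}
    score = (state.get("pfs") or {}).get("score")
    if score is None or not layers or any(
        layer.get("completion", 0.0) < LAYER_GATE for layer in layers.values()
    ):
        return "continue"
    if score >= MATCHING_PFS:
        return "matching_ready"
    if score >= REMEDIATION_PFS:
        return "remediation"
    return "not_ready"
```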

Modified: app/agents/orchestrator.py

The orchestrator routes to the Triage Agent for intake (replacing the current phase-based routing). Other flows (matching, explanation, consent) stay unchanged.

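# In orchestrator, replace phase routing. The new path is gated by the
# triage_agent_v3 flag; triage_agent_v3_enabled is a hypothetical name for the
# resolved Flagsmith value.
if case.status == "intake" and triage_agent_v3_enabled:
    # Old: route to identify_procedure / records_first / intake phases
    # New: route to triage agent
    from app.agents.triage_agent import run_triage_turn
    return await run_triage_turn(state)
# Flag off: fall through to the existing phase-based routing (unchanged).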

Feature flag: triage_agent_v3

When triage_agent_v3 = true, intake uses the new Triage Agent. When false, the existing orchestrator phases run (backward compatible).


Step 6: Audit Tables

Alembic migration: 3 new tables

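import sqlalchemy as sa
from alembic import op

# FlexibleJSON: the project's JSON/JSONB column type (import path not shown in this spec)


def upgrade():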
    # graph_node_audit_log
    op.create_table(
        'graph_node_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('turn_number', sa.Integer, nullable=False),
        sa.Column('node_name', sa.String(100), nullable=False),
        sa.Column('node_type', sa.String(20), nullable=False),  # llm/extractor/scorer/router
        sa.Column('input_hash', sa.String(64)),
        sa.Column('output_hash', sa.String(64)),
        sa.Column('state_before', FlexibleJSON),
        sa.Column('state_after', FlexibleJSON),
        sa.Column('decision_made', sa.String(100)),
        sa.Column('decision_reason', sa.Text),
        sa.Column('model_used', sa.String(50)),
        sa.Column('duration_ms', sa.Integer),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

    # scoring_audit_log
    op.create_table(
        'scoring_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('score_type', sa.String(10), nullable=False),  # pfs/hss/fms
        sa.Column('score_value', sa.Float, nullable=False),
        sa.Column('config_version', sa.String(64)),
        sa.Column('inputs', FlexibleJSON),
        sa.Column('component_breakdown', FlexibleJSON),
        sa.Column('threshold_crossed', sa.Boolean, server_default=sa.false()),  # server_default lands in the DDL; default= would not
        sa.Column('threshold_direction', sa.String(10)),
        sa.Column('previous_score', sa.Float),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

    # extraction_audit_log
    op.create_table(
        'extraction_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('turn_number', sa.Integer, nullable=False),
        sa.Column('layer', sa.String(30), nullable=False),
        sa.Column('extracted_signals', FlexibleJSON),
        sa.Column('confidence_scores', FlexibleJSON),
        sa.Column('completion_before', sa.Float),
        sa.Column('completion_after', sa.Float),
        sa.Column('model_used', sa.String(50)),
        sa.Column('input_text', sa.Text),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
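    )


def downgrade():
    op.drop_table('extraction_audit_log')
    op.drop_table('scoring_audit_log')
    op.drop_table('graph_node_audit_log')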

New models: app/models/graph_node_audit.py, app/models/scoring_audit.py, app/models/extraction_audit.py

Append-only. Repositories expose create() and list_by_case() only.
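`input_hash` and `output_hash` are declared as `String(64)`, which fits a hex-encoded SHA-256 digest. Here is a minimal sketch, assuming the hash is taken over a canonical JSON encoding (sorted keys, no insignificant whitespace) so that logically equal states hash identically. The canonicalization choice and the `state_hash` name are assumptions.

```python
import hashlib
import json
from typing import Any


def state_hash(payload: Any) -> str:
    """Hex SHA-256 of a canonical JSON encoding (64 chars, fits String(64))."""
    canonical = json.dumps(
        payload,
        sort_keys=True,          # key order must not change the hash
        separators=(",", ":"),   # drop whitespace so formatting cannot either
        ensure_ascii=False,
        default=str,             # non-JSON types (datetimes, UUIDs) -> str (assumption)
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Canonicalizing first matters because `json.dumps` on the same dict built in a different insertion order would otherwise produce different bytes and therefore different hashes.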


Step 7: LangGraph Checkpointing

New: app/agents/checkpointing.py

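"""Configure LangGraph checkpointing → Railway PostgreSQL."""

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from app.config import settings


@asynccontextmanager
async def get_checkpointer() -> AsyncIterator[AsyncPostgresSaver]:
    """Yield a checkpointer using DATABASE_URL_ADMIN (superuser, bypasses RLS).

    Thread ID = case_id. Checkpoints are case-scoped: invoke the graph with
    config={"configurable": {"thread_id": case_id}}.

    from_conn_string() is an async context manager that owns the connection,
    so the checkpointer is only valid inside the `async with` block. The URL
    must be a plain libpq URL (postgresql://...), not a SQLAlchemy
    "postgresql+asyncpg://" URL.
    """
    conn_string = settings.database_url_admin or settings.database_url
    async with AsyncPostgresSaver.from_conn_string(conn_string) as checkpointer:
        await checkpointer.setup()  # creates/migrates checkpoint tables; safe to repeat
        yield checkpointer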

Modified: app/config.py

Add database_url_admin setting (reads DATABASE_URL_ADMIN env var).


Step 8: PFS Remediation Behavior

No new files — implemented via prompt layer contexts (Step 4) and the Triage Agent's route_by_pfs_band function (Step 5).

The remediation.yaml layer context changes the agent's behavior:

- PFS 60-79: "Flag gaps, offer to proceed or address"
- PFS 40-59: "Focused remediation loop with concrete actions"
- PFS 0-39: "Honest redirect, save state, offer email summary"


Step 9: Tests

New test files:

| File | Coverage | Tests |
|---|---|---|
| tests/test_layer_state.py | LayerState, PatientLayerState, serialization, thresholds | ~10 |
| tests/test_pfs_scorer.py | PFS computation, bands, blocking flags, config loading | ~8 |
| tests/test_hss_scorer.py | HSS reframing from existing matching, cost alignment | ~5 |
| tests/test_fms_scorer.py | Sigmoid computation, thresholds | ~4 |
| tests/test_extractors/test_intent_extractor.py | Schema validation, signal extraction, confidence | ~6 |
| tests/test_extractors/test_travel_extractor.py | Tier assignment, auto-overrides | ~6 |
| tests/test_extractors/test_logistics_extractor.py | Visa derivation, timeline feasibility | ~5 |
| tests/test_extractors/test_financial_extractor.py | PV qualification, budget gates | ~5 |
| tests/test_triage_agent.py | Graph structure, layer routing, PFS gating | ~8 |

Estimated: ~57 new tests.


Step 10: Integration + Verification

  1. Run full test suite — verify no regressions
  2. Test with Flagsmith triage_agent_v3 = false — existing behavior preserved
  3. Test with triage_agent_v3 = true — new triage agent active
  4. Shadow mode: run both paths on same input, compare outputs via Langfuse
  5. Code review via subagent
  6. Documentation sweep (CLAUDE.md, changelog, mkdocs nav)

File Summary

New files (~40)

| Category | Files |
|---|---|
| Core modules | layer_state.py, pfs_scorer.py, hss_scorer.py, fms_scorer.py |
| Extractors | intent_extractor.py, travel_extractor.py, logistics_extractor.py, financial_extractor.py |
| Agent | triage_agent.py, checkpointing.py |
| Models | graph_node_audit.py, scoring_audit.py, extraction_audit.py |
| Config | scoring.yaml |
| Prompts | 4 base + 4 examples + 7 layer contexts = 15 YAML files |
| Migrations | 2 Alembic files |
| Tests | ~9 test files |

Modified files (~5)

| File | Change |
|---|---|
| app/agents/orchestrator.py | Route to triage agent when triage_agent_v3 flag on |
| app/services/prompt_loader.py | Add load_layer_context() |
| app/config.py | Add database_url_admin setting |
| app/models/case.py | Reference new layer_state column |
| config/feature_flags.yaml | Add triage_agent_v3, pfs_weights, hss_weights, fms_weights |

Edge Cases (from design, verified against impl)

| Scenario | Handling | Step |
|---|---|---|
| Patient completes Layer 5 before Layer 1 | Layers tracked independently. Agent bookmarks, circles back. | 5 |
| Layer 2 data contradicts Layer 1 emotional state | PFS weights medical over emotional. | 2 |
| Patient refuses Layer 1 questions | Layer 1 defaults to neutral. Does not block. | 3 |
| PFS drops below threshold mid-intake | Matching blocked. Agent explains. Coordinator notified. | 8 |
| Financial qualification gates out all providers | Agent redirects with market range anchoring. | 3, 8 |
| Multiple layers complete same turn | Multiple summary cards stacked. | 5 |
| Layer extractor fails (LLM timeout) | Non-blocking. Completion stays at previous value. Retry next turn. | 3 |
| Existing workflow_state must keep working | Both workflow_state and layer_state coexist. Feature flag controls which path runs. | 1, 5 |

Rollback Plan

Feature flag triage_agent_v3 controls the routing:

- false (default): existing orchestrator phases, workflow_state used
- true: new Triage Agent, layer_state used

If issues are found after rollout:

  1. Set triage_agent_v3 = false in Flagsmith
  2. Existing behavior is restored immediately (no deploy needed)
  3. The new layer_state column is nullable, so it is ignored when the flag is off


Implementation Checklist (Opus + Sonnet tiers)

Opus tier (architecture, judgment calls)

| # | Task | Est. |
|---|---|---|
| O1 | Design TriageState TypedDict and graph topology | 30 min |
| O2 | Design extractor structured output schemas (4) | 45 min |
| O3 | Design PFS/HSS/FMS scorer interfaces | 30 min |
| O4 | Write layer context prompts (7 YAML) | 1 hour |
| O5 | Design audit table schemas and hash chain | 30 min |
| O6 | Review orchestrator integration approach | 20 min |

Sonnet tier (implementation)

| # | Task | Est. |
|---|---|---|
| S1 | Implement layer_state.py + Alembic migration | 45 min |
| S2 | Implement config/scoring.yaml | 20 min |
| S3 | Implement pfs_scorer.py | 45 min |
| S4 | Implement hss_scorer.py | 30 min |
| S5 | Implement fms_scorer.py | 20 min |
| S6 | Implement intent_extractor.py | 45 min |
| S7 | Implement travel_extractor.py | 45 min |
| S8 | Implement logistics_extractor.py | 30 min |
| S9 | Implement financial_extractor.py | 30 min |
| S10 | Write 4 base + 4 example prompt YAMLs | 1 hour |
| S11 | Update prompt_loader.py with load_layer_context() | 30 min |
| S12 | Implement triage_agent.py (graph + nodes) | 1.5 hours |
| S13 | Implement checkpointing.py | 20 min |
| S14 | Implement audit models + Alembic migration | 45 min |
| S15 | Update orchestrator.py with triage routing | 30 min |
| S16 | Write tests (9 files, ~57 tests) | 3 hours |
| S17 | Integration testing + verification | 1 hour |
| S18 | Documentation sweep | 30 min |