Wave 1 Implementation Spec: Intake Restructuring
Status: Implemented in Session 43
Date: 2026-04-17
Session: 42
Design source: docs/specs/platform-restructuring-design.md (Parts 1-3, 7-11, 17)
Depends on: Wave 0 complete (DAO Layer + RLS + LLM Gateway)
Effort: 3-4 weeks
UI/UX: None — pure backend. Frontend continues unchanged.
Overview
Restructure the intake pipeline from a phase-based orchestrator into a layer-aware Triage Agent with parallel extractors and a three-score system (PFS/HSS/FMS). The existing chat API surface is unchanged; this is an internal architecture evolution.
Implementation Order
Execute sequentially — each step depends on the previous:
| Step | What | New files | Modified files | Effort |
|---|---|---|---|---|
| 1 | Layer state model + Alembic migration | 1 migration, 1 module | case model | 1 day |
| 2 | Scoring config + pure function scorers | 1 YAML, 3 modules | — | 1 day |
| 3 | Layer extractors (4 new, 1 existing) | 4 modules | — | 2-3 days |
| 4 | Prompt layer contexts | 15 YAML (7 layer contexts, 4 base, 4 examples) | prompt_loader.py | 1 day |
| 5 | Triage Agent as LangGraph graph | 1 module | orchestrator.py | 2-3 days |
| 6 | Audit tables + Alembic migration | 1 migration, 3 modules | — | 1 day |
| 7 | LangGraph checkpointing | 1 module | config.py | 1 day |
| 8 | PFS remediation behavior | — | triage agent prompts | 1 day |
| 9 | Tests | 8+ test files | — | 2 days |
| 10 | Integration + verification | — | — | 1 day |
Step 1: Layer State Model
New: app/services/layer_state.py
Manages the patient's 5-layer intake state. Pure data, no LLM calls.
"""
Curaway: Layer State Management.
Tracks completion of 5 intake dimensions. Layers are independent;
completion is a high-water mark, not a linear pipeline.
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class LayerName(str, Enum):
    INTENT = "intent_capture"
    MEDICAL = "medical_status"
    TRAVEL = "travel_readiness"
    LOGISTICS = "logistics"
    FINANCIAL = "financial_readiness"


class LayerStatus(str, Enum):
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"


@dataclass
class LayerState:
    status: LayerStatus = LayerStatus.NOT_STARTED
    completion: float = 0.0
    data: dict[str, Any] = field(default_factory=dict)
    summary_card_fired: bool = False


@dataclass
class PatientLayerState:
    """Full layer state for a patient's intake journey."""
    intent_capture: LayerState = field(default_factory=LayerState)
    medical_status: LayerState = field(default_factory=LayerState)
    travel_readiness: LayerState = field(default_factory=LayerState)
    logistics: LayerState = field(default_factory=LayerState)
    financial_readiness: LayerState = field(default_factory=LayerState)

    def get_layer(self, name: LayerName) -> LayerState:
        return getattr(self, name.value)

    def set_layer(self, name: LayerName, state: LayerState) -> None:
        setattr(self, name.value, state)

    def to_dict(self) -> dict:
        # One JSON-serialisable entry per layer, keyed by the layer's string value.
        return {
            name.value: {
                "status": self.get_layer(name).status.value,
                "completion": self.get_layer(name).completion,
                "data": self.get_layer(name).data,
                "summary_card_fired": self.get_layer(name).summary_card_fired,
            }
            for name in LayerName
        }

    @classmethod
    def from_dict(cls, data: dict) -> "PatientLayerState":
        # Layers missing from `data` keep their defaults.
        state = cls()
        for name in LayerName:
            if name.value in data:
                layer_data = data[name.value]
                setattr(state, name.value, LayerState(
                    status=LayerStatus(layer_data.get("status", "not_started")),
                    completion=layer_data.get("completion", 0.0),
                    data=layer_data.get("data", {}),
                    summary_card_fired=layer_data.get("summary_card_fired", False),
                ))
        return state

    def all_above_threshold(self, threshold: float, exclude: list[LayerName] | None = None) -> bool:
        """Check if all layers (except excluded) are at or above a completion threshold."""
        for name in LayerName:
            if exclude and name in exclude:
                continue
            if self.get_layer(name).completion < threshold:
                return False
        return True
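Alembic migration: Add layer_state column to cases
# alembic/versions/xxxx_add_layer_state_to_cases.py
import sqlalchemy as sa
from alembic import op
# FlexibleJSON is the project's JSON column type; import it from wherever the codebase defines it.


def upgrade():
    op.add_column('cases', sa.Column('layer_state', FlexibleJSON, nullable=True))


def downgrade():
    op.drop_column('cases', 'layer_state')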
The layer_state column stores PatientLayerState.to_dict() as JSONB alongside the existing workflow_state. Both coexist during transition — workflow_state for backward compatibility, layer_state for the new intake.
Step 2: Scoring Config + Scorers
New: config/scoring.yaml
Full YAML from platform-restructuring-design.md Part 9. All weights are configurable, with a Flagsmith override per key.
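As an illustration of how configurable weights could feed a scorer, here is a minimal sketch of a weighted-average step. The helper name `weighted_pfs`, the component names, and the weights are assumptions made for this example; the real keys and values live in config/scoring.yaml (design Part 9), and band, transport tier, and blocking-flag logic are left out.

```python
def weighted_pfs(components: dict[str, float], weights: dict[str, float]) -> int:
    """Combine per-component scores (0-100) into a single 0-100 score.

    Hypothetical helper: weights are normalised, so they need not sum to 1.
    Components that have no weight contribute nothing.
    """
    total_weight = sum(weights.get(name, 0) for name in components)
    if total_weight <= 0:
        return 0
    weighted = sum(score * weights.get(name, 0) for name, score in components.items())
    # Clamp before rounding so out-of-range inputs cannot escape 0-100.
    return round(max(0.0, min(100.0, weighted / total_weight)))


# Illustrative numbers only, not the values from scoring.yaml.
example_weights = {"medical": 5, "travel": 2, "logistics": 2, "financial": 1}
example_components = {"medical": 85, "travel": 100, "logistics": 60, "financial": 55}
example_score = weighted_pfs(example_components, example_weights)
```

Keeping the weights as a plain dict argument means a per-key override (for example from a feature-flag service) can be applied by the caller before the scorer runs, which keeps the scorer itself a pure function.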
New: app/services/pfs_scorer.py
"""PFS: Patient Fitness Score (0-100). Pure function, deterministic."""


def compute_pfs(
    layer_state: dict,
    risk_items: list[dict],
    config: dict,
) -> dict:
    """Compute PFS from layer data + risk assessor output.

    Returns: {
        "score": 78,
        "band": "conditionally_ready",
        "transport_tier": "T1",
        "blocking_flags": [],
        "component_breakdown": {"medical": 85, "travel": 100, ...},
        "gaps": ["passport_timeline_tight"]
    }
    """
New: app/services/hss_scorer.py
Reframes the existing matching engine output as HSS (0-100). Thin wrapper.
New: app/services/fms_scorer.py
Sigmoid combination of PFS, HSS, preference alignment, and historical conversion.
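One way to read "sigmoid combination" is a weighted average of the four inputs passed through a logistic function. The sketch below follows that reading; the function name `fms_sketch`, the weights, the midpoint, and the steepness are all placeholders, and the input scaling (PFS and HSS on 0-100, the other two on 0-1) is an assumption.

```python
import math


def fms_sketch(
    pfs: float,
    hss: float,
    preference: float,
    conversion: float,
    weights: tuple[float, float, float, float] = (0.35, 0.35, 0.2, 0.1),
    midpoint: float = 0.5,
    steepness: float = 8.0,
) -> float:
    """Return a 0-1 score from four inputs.

    pfs and hss are expected on 0-100; preference and conversion on 0-1.
    Weights, midpoint, and steepness are illustrative, not taken from scoring.yaml.
    """
    inputs = (pfs / 100.0, hss / 100.0, preference, conversion)
    # Normalised weighted average, so the linear term stays within 0-1.
    linear = sum(w * x for w, x in zip(weights, inputs)) / sum(weights)
    # Logistic squashing centred on `midpoint`.
    return 1.0 / (1.0 + math.exp(-steepness * (linear - midpoint)))
```

With this shape, inputs that all sit at the midpoint produce a score of about 0.5, and the steepness parameter controls how sharply the score separates cases just above and below it.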
Step 3: Layer Extractors
New: app/services/extractors/intent_extractor.py
Structured output from conversation turn → Layer 1 data (case_type, emotional_state, urgency_score, decision_stage, trigger_event).
New: app/services/extractors/travel_extractor.py
Structured output → Layer 3 data (mobility, oxygen, hospitalization, transport_tier).
New: app/services/extractors/logistics_extractor.py
Structured output → Layer 4 data (country, passport, visa, timeline, companion).
New: app/services/extractors/financial_extractor.py
Structured output → Layer 5 data (funding_source, budget_range, flexibility, insurance).
Existing: app/agents/clinical_context.py
Already handles Layer 2 (medical extraction). Wired as a LangGraph node: no new code, just integration.
Extractor pattern (all 4 new extractors follow this):
"""
Curaway: Intent Capture Extractor (Layer 1).
Pure function: (conversation_turn, patient_state) → structured_data_delta.
Uses structured output (JSON schema enforcement) via llm_gateway.invoke().
"""
from app.services.llm_gateway import invoke as llm_invoke
from app.services.prompt_loader import load_prompt

EXTRACTION_SCHEMA = {
    "type": "object",
    "properties": {
        "case_type": {"type": "string", "enum": ["elective", "selective", "urgent"]},
        "emotional_state": {
            "type": "object",
            "properties": {
                "readiness": {"type": "string"},
                "primary_fear": {"type": "string"},
                "primary_hope": {"type": "string"},
                "fear_intensity": {"type": "number"},
                "hope_intensity": {"type": "number"},
            }
        },
        # ... full schema from design Part 7
    }
}


async def extract_intent(
    message_text: str,
    current_layer_data: dict,
    patient_context: str,
) -> dict:
    """Extract Layer 1 signals from a conversation turn.

    Returns a delta: only newly extracted signals, not the full layer.
    Caller merges delta into existing layer state.
    """
    prompt = load_prompt("intent_extraction", "en", version="v1")
    # ... build messages, call llm_invoke with expects_json=True
    # ... parse response, return delta
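The "return delta" step can be sketched as a pure function that compares the parsed model output against the data already stored for the layer. The helper name `compute_delta` and the rule that `None` means "nothing extracted" are assumptions for this example, not part of the spec.

```python
from typing import Any


def compute_delta(current: dict[str, Any], extracted: dict[str, Any]) -> dict[str, Any]:
    """Return only the keys in `extracted` that add or change information.

    None values are treated as "nothing extracted" and skipped. Nested dicts
    are compared recursively so one new sub-field does not resend its siblings.
    """
    delta: dict[str, Any] = {}
    for key, value in extracted.items():
        if value is None:
            continue
        existing = current.get(key)
        if isinstance(value, dict) and isinstance(existing, dict):
            nested = compute_delta(existing, value)
            if nested:
                delta[key] = nested
        elif value != existing:
            delta[key] = value
    return delta


current_layer = {"case_type": "elective", "emotional_state": {"readiness": "exploring"}}
model_output = {
    "case_type": "elective",
    "emotional_state": {"readiness": "exploring", "primary_fear": "complications"},
    "urgency_score": None,
}
intent_delta = compute_delta(current_layer, model_output)
```

Here only the new `primary_fear` signal survives, which keeps the payload handed to the caller small and makes repeated turns idempotent.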
Step 4: Prompt Layer Contexts
New: config/prompts/layer_contexts/ (7 files)
| File | Purpose |
|---|---|
| intent_capture.yaml | Guide agent to capture why, fears, hopes |
| medical_status.yaml | Guide agent to gather clinical data (≈ existing intake.yaml) |
| travel_readiness.yaml | Guide agent to assess travel fitness |
| logistics.yaml | Guide agent to capture passport, visa, timeline |
| financial_readiness.yaml | Guide agent to understand budget (with market range anchoring) |
| remediation.yaml | Guide agent through PFS gap remediation |
| multi_layer.yaml | Guide agent when patient bounces between layers |
New: config/prompts/base/ (4 extractor prompts)
| File | Purpose |
|---|---|
| intent_extraction_v1.yaml | Base prompt for intent extractor |
| travel_extraction_v1.yaml | Base prompt for travel extractor |
| logistics_extraction_v1.yaml | Base prompt for logistics extractor |
| financial_extraction_v1.yaml | Base prompt for financial extractor |
New: config/prompts/examples/en/ (4 example files)
One example file per extractor, following existing patterns.
Modified: app/services/prompt_loader.py
Add load_layer_context(active_layer, layer_state, pfs_band) (~50 lines). Same pattern as load_phase_context().
Step 5: Triage Agent as LangGraph Graph
IMPORTANT — Concurrency model (from Step 1 code review):
PatientLayerState is NOT thread-safe. The run_extractors parallel node must NOT share a single PatientLayerState instance across concurrent extractors. Use LangGraph's reducer pattern:
from typing import Annotated, TypedDict

from app.services.layer_state import LayerName, PatientLayerState


def merge_layer_deltas(current: dict, delta: dict) -> dict:
    """LangGraph reducer: merges per-extractor deltas into the layer_state dict."""
    state = PatientLayerState.from_dict(current)
    # Delta shape: {layer_name: {"completion": ..., "data_delta": ...}}
    # update_layer() does not appear in the Step 1 listing; it is expected to
    # apply one layer's delta in place.
    for layer_name, layer_delta in delta.items():
        state.update_layer(
            LayerName(layer_name),
            completion=layer_delta.get("completion"),
            data_delta=layer_delta.get("data_delta"),
        )
    return state.to_dict()


class TriageState(TypedDict):
    layer_state: Annotated[dict, merge_layer_deltas]
    # ... other fields
Each extractor node returns a delta, not the full layer_state. LangGraph applies the reducer to combine parallel extractor outputs safely.
New: app/agents/triage_agent.py
The core of Wave 1. Replaces the phase-routing logic in the orchestrator with a LangGraph compiled graph.
"""
Curaway: Triage Agent (A3 Model).
Supervisor pattern: one conversational agent + parallel extractors.
Code nodes handle scoring and routing (deterministic).
LLM nodes handle conversation and extraction (non-deterministic).
Graph topology:
choose_layer → generate_response → run_extractors → compute_pfs → route_by_pfs_band
"""
from typing import TypedDict

from langchain_core.messages import BaseMessage
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver


class TriageState(TypedDict):
    """State that flows through the triage graph."""
    messages: list[BaseMessage]
    layer_state: dict  # annotate with the merge_layer_deltas reducer (see concurrency note)
    pfs: dict | None
    hss: dict | None
    fms: float | None
    active_layer: str
    response: str | None
    case_id: str
    patient_id: str
    tenant_id: str


def build_triage_graph(checkpointer=None):
    graph = StateGraph(TriageState)
    # Nodes
    graph.add_node("choose_layer", choose_next_layer)        # Code: adaptive routing
    graph.add_node("generate_response", generate_response)   # LLM: conversation
    graph.add_node("run_extractors", run_extractors)         # LLM: parallel extraction
    graph.add_node("compute_pfs", compute_pfs_node)          # Code: deterministic
    # check_gates is registered but not wired to any edge in this listing.
    graph.add_node("check_gates", check_matching_gates)      # Code: threshold check
    # Edges
    graph.add_edge("choose_layer", "generate_response")
    graph.add_edge("generate_response", "run_extractors")
    graph.add_edge("run_extractors", "compute_pfs")
    graph.add_conditional_edges(
        "compute_pfs",
        route_by_pfs_band,
        {
            "continue": "choose_layer",       # PFS < threshold, keep collecting
            "matching_ready": END,            # PFS >= 60, layers >= 0.6
            "remediation": "choose_layer",    # PFS 40-59, remediation prompts
            "not_ready": END,                 # PFS < 40, honest redirect
        }
    )
    graph.set_entry_point("choose_layer")
    if checkpointer:
        return graph.compile(checkpointer=checkpointer)
    return graph.compile()
Modified: app/agents/orchestrator.py
The orchestrator routes to the Triage Agent for intake (replacing the current phase-based routing). Other flows (matching, explanation, consent) stay unchanged.
# In orchestrator, replace phase routing:
if case.status == "intake":
    # Old: route to identify_procedure / records_first / intake phases
    # New: route to triage agent
    from app.agents.triage_agent import run_triage_turn
    return await run_triage_turn(state)
Feature flag: triage_agent_v3
When triage_agent_v3 = true, intake uses the new Triage Agent.
When false, the existing orchestrator phases run (backward compatible).
Step 6: Audit Tables
Alembic migration: 3 new tables
def upgrade():
    # graph_node_audit_log
    op.create_table(
        'graph_node_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('turn_number', sa.Integer, nullable=False),
        sa.Column('node_name', sa.String(100), nullable=False),
        sa.Column('node_type', sa.String(20), nullable=False),  # llm/extractor/scorer/router
        sa.Column('input_hash', sa.String(64)),
        sa.Column('output_hash', sa.String(64)),
        sa.Column('state_before', FlexibleJSON),
        sa.Column('state_after', FlexibleJSON),
        sa.Column('decision_made', sa.String(100)),
        sa.Column('decision_reason', sa.Text),
        sa.Column('model_used', sa.String(50)),
        sa.Column('duration_ms', sa.Integer),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # scoring_audit_log
    op.create_table(
        'scoring_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('score_type', sa.String(10), nullable=False),  # pfs/hss/fms
        sa.Column('score_value', sa.Float, nullable=False),
        sa.Column('config_version', sa.String(64)),
        sa.Column('inputs', FlexibleJSON),
        sa.Column('component_breakdown', FlexibleJSON),
        sa.Column('threshold_crossed', sa.Boolean, default=False),
        sa.Column('threshold_direction', sa.String(10)),
        sa.Column('previous_score', sa.Float),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    # extraction_audit_log
    op.create_table(
        'extraction_audit_log',
        sa.Column('id', sa.String(36), primary_key=True),
        sa.Column('case_id', sa.String(36), nullable=False, index=True),
        sa.Column('turn_number', sa.Integer, nullable=False),
        sa.Column('layer', sa.String(30), nullable=False),
        sa.Column('extracted_signals', FlexibleJSON),
        sa.Column('confidence_scores', FlexibleJSON),
        sa.Column('completion_before', sa.Float),
        sa.Column('completion_after', sa.Float),
        sa.Column('model_used', sa.String(50)),
        sa.Column('input_text', sa.Text),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
New models: app/models/graph_node_audit.py, app/models/scoring_audit.py, app/models/extraction_audit.py
Append-only. Repositories expose create() and list_by_case() only.
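The `input_hash` and `output_hash` columns are 64 characters wide, which fits a SHA-256 hex digest. The spec does not name the hash function, so SHA-256 over canonical JSON is an assumption here, and `state_hash` and `InMemoryAuditLog` are hypothetical names used only to show the append-only contract (create and list, no update or delete).

```python
import hashlib
import json
from typing import Any


def state_hash(state: dict[str, Any]) -> str:
    """Return a SHA-256 hex digest (64 chars) of a JSON-serialisable state."""
    # sort_keys makes the digest independent of dict insertion order.
    canonical = json.dumps(state, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemoryAuditLog:
    """Append-only store exposing create() and list_by_case() only."""

    def __init__(self) -> None:
        self._rows: list[dict[str, Any]] = []

    def create(
        self,
        case_id: str,
        node_name: str,
        state_before: dict[str, Any],
        state_after: dict[str, Any],
    ) -> dict[str, Any]:
        row = {
            "case_id": case_id,
            "node_name": node_name,
            "input_hash": state_hash(state_before),
            "output_hash": state_hash(state_after),
        }
        self._rows.append(row)
        return dict(row)

    def list_by_case(self, case_id: str) -> list[dict[str, Any]]:
        # Return copies so callers cannot mutate stored rows.
        return [dict(row) for row in self._rows if row["case_id"] == case_id]


audit_log = InMemoryAuditLog()
audit_log.create("case-1", "compute_pfs", {"pfs": None}, {"pfs": {"score": 78}})
audit_log.create("case-2", "compute_pfs", {"pfs": None}, {"pfs": {"score": 41}})
case_1_rows = audit_log.list_by_case("case-1")
```

Hashing the canonical form rather than the raw dict means two logically identical states always produce the same digest, which is what makes the hash columns useful for spotting unexpected state changes between nodes.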
Step 7: LangGraph Checkpointing
New: app/agents/checkpointing.py
"""Configure LangGraph checkpointing → Railway PostgreSQL."""
from contextlib import asynccontextmanager

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

from app.config import settings


@asynccontextmanager
async def get_checkpointer():
    """Yield a checkpointer using DATABASE_URL_ADMIN (superuser, bypasses RLS).

    Thread ID = case_id. Checkpoints are case-scoped.
    """
    # from_conn_string() is an async context manager, so the saver is only
    # valid inside the `async with` block.
    conn_string = settings.database_url_admin or settings.database_url
    async with AsyncPostgresSaver.from_conn_string(conn_string) as checkpointer:
        yield checkpointer
Modified: app/config.py
Add database_url_admin setting (reads DATABASE_URL_ADMIN env var).
Step 8: PFS Remediation Behavior
No new files — implemented via prompt layer contexts (Step 4) and the Triage Agent's route_by_pfs_band function (Step 5).
The remediation.yaml layer context changes the agent's behavior:
- PFS 60-79: "Flag gaps, offer to proceed or address"
- PFS 40-59: "Focused remediation loop with concrete actions"
- PFS 0-39: "Honest redirect, save state, offer email summary"
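The bands above, together with the route keys used in the Step 5 graph, suggest a routing function along the following lines. This is a sketch under assumptions: the text does not say how "continue" takes precedence relative to the score-based routes, so the ordering below is a guess, and the function name and default thresholds are illustrative.

```python
def route_by_pfs_band_sketch(
    pfs_score: float,
    min_layer_completion: float,
    ready_score: float = 60,
    remediation_floor: float = 40,
    layer_threshold: float = 0.6,
) -> str:
    """Map a PFS score and the lowest layer completion to a route key."""
    if pfs_score < remediation_floor:
        return "not_ready"        # PFS 0-39: honest redirect
    if pfs_score < ready_score:
        return "remediation"      # PFS 40-59: focused remediation loop
    if min_layer_completion < layer_threshold:
        return "continue"         # score is fine, but some layer still needs data
    return "matching_ready"       # PFS >= 60 and every layer >= 0.6
```

For example, `route_by_pfs_band_sketch(78, 0.8)` yields `"matching_ready"`, while the same score with a layer at 0.3 yields `"continue"`. Keeping the thresholds as parameters lets them come from the scoring config rather than being hard-coded.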
Step 9: Tests
New test files:
| File | Coverage | Tests |
|---|---|---|
| tests/test_layer_state.py | LayerState, PatientLayerState, serialization, thresholds | ~10 |
| tests/test_pfs_scorer.py | PFS computation, bands, blocking flags, config loading | ~8 |
| tests/test_hss_scorer.py | HSS reframing from existing matching, cost alignment | ~5 |
| tests/test_fms_scorer.py | Sigmoid computation, thresholds | ~4 |
| tests/test_extractors/test_intent_extractor.py | Schema validation, signal extraction, confidence | ~6 |
| tests/test_extractors/test_travel_extractor.py | Tier assignment, auto-overrides | ~6 |
| tests/test_extractors/test_logistics_extractor.py | Visa derivation, timeline feasibility | ~5 |
| tests/test_extractors/test_financial_extractor.py | PV qualification, budget gates | ~5 |
| tests/test_triage_agent.py | Graph structure, layer routing, PFS gating | ~8 |
Estimated: ~57 new tests.
Step 10: Integration + Verification
- Run full test suite; verify no regressions
- Test with Flagsmith triage_agent_v3 = false: existing behavior preserved
- Test with triage_agent_v3 = true: new triage agent active
- Shadow mode: run both paths on same input, compare outputs via Langfuse
- Code review via subagent
- Documentation sweep (CLAUDE.md, changelog, mkdocs nav)
File Summary
New files (~40)
| Category | Files |
|---|---|
| Core modules | layer_state.py, pfs_scorer.py, hss_scorer.py, fms_scorer.py |
| Extractors | intent_extractor.py, travel_extractor.py, logistics_extractor.py, financial_extractor.py |
| Agent | triage_agent.py, checkpointing.py |
| Models | graph_node_audit.py, scoring_audit.py, extraction_audit.py |
| Config | scoring.yaml |
| Prompts | 4 base + 4 examples + 7 layer contexts = 15 YAML files |
| Migrations | 2 Alembic files |
| Tests | ~9 test files |
Modified files (~5)
| File | Change |
|---|---|
| app/agents/orchestrator.py | Route to triage agent when triage_agent_v3 flag on |
| app/services/prompt_loader.py | Add load_layer_context() |
| app/config.py | Add database_url_admin setting |
| app/models/case.py | Reference new layer_state column |
| config/feature_flags.yaml | Add triage_agent_v3, pfs_weights, hss_weights, fms_weights |
Edge Cases (from design, verified against impl)
| Scenario | Handling | Step |
|---|---|---|
| Patient completes Layer 5 before Layer 1 | Layers tracked independently. Agent bookmarks, circles back. | 5 |
| Layer 2 data contradicts Layer 1 emotional state | PFS weights medical over emotional. | 2 |
| Patient refuses Layer 1 questions | Layer 1 defaults to neutral. Does not block. | 3 |
| PFS drops below threshold mid-intake | Matching blocked. Agent explains. Coordinator notified. | 8 |
| Financial qualification gates out all providers | Agent redirects with market range anchoring. | 3, 8 |
| Multiple layers complete same turn | Multiple summary cards stacked. | 5 |
| Layer extractor fails (LLM timeout) | Non-blocking. Completion stays at previous value. Retry next turn. | 3 |
| Existing workflow_state must keep working | Both workflow_state and layer_state coexist. Feature flag controls which path runs. | 1, 5 |
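The "extractor fails (LLM timeout)" row can be illustrated with plain asyncio. In this sketch each extractor runs under its own timeout and a failure simply produces no delta, so the affected layer keeps its previous completion. The function name `run_extractors_sketch`, the timeout value, and the three toy extractors are assumptions for the example; the real node would call the LLM-backed extractors from Step 3.

```python
import asyncio
from typing import Awaitable, Callable

Extractor = Callable[[str], Awaitable[dict]]


async def run_extractors_sketch(
    message: str,
    extractors: dict[str, Extractor],
    timeout_s: float = 5.0,
) -> dict[str, dict]:
    """Run all extractors concurrently; drop any that fail or time out."""

    async def guarded(fn: Extractor) -> dict:
        return await asyncio.wait_for(fn(message), timeout=timeout_s)

    names = list(extractors)
    results = await asyncio.gather(
        *(guarded(extractors[name]) for name in names),
        return_exceptions=True,
    )
    # A failed extractor contributes no delta, so its layer is left untouched.
    return {
        name: result
        for name, result in zip(names, results)
        if not isinstance(result, BaseException)
    }


async def ok_extractor(message: str) -> dict:
    return {"completion": 0.4, "data_delta": {"country": "KE"}}


async def slow_extractor(message: str) -> dict:
    await asyncio.sleep(1.0)  # exceeds the timeout used below
    return {"completion": 1.0, "data_delta": {}}


async def broken_extractor(message: str) -> dict:
    raise RuntimeError("simulated gateway error")


deltas = asyncio.run(
    run_extractors_sketch(
        "I have a Kenyan passport",
        {
            "logistics": ok_extractor,
            "travel_readiness": slow_extractor,
            "intent_capture": broken_extractor,
        },
        timeout_s=0.05,
    )
)
```

Only the `logistics` delta comes back; the timed-out and failing extractors are skipped without raising, which matches the non-blocking, retry-next-turn handling in the table.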
Rollback Plan
Feature flag triage_agent_v3 controls the routing:
- false (default): existing orchestrator phases, workflow_state used
- true: new Triage Agent, layer_state used
If issues found after rollout:
1. Set triage_agent_v3 = false in Flagsmith
2. Existing behavior restored immediately (no deploy needed)
3. New layer_state column is nullable — ignored when flag is off
Implementation Checklist (Opus + Sonnet tiers)
Opus tier (architecture, judgment calls)
| # | Task | Est. |
|---|---|---|
| O1 | Design TriageState TypedDict and graph topology | 30 min |
| O2 | Design extractor structured output schemas (4) | 45 min |
| O3 | Design PFS/HSS/FMS scorer interfaces | 30 min |
| O4 | Write layer context prompts (7 YAML) | 1 hour |
| O5 | Design audit table schemas and hash chain | 30 min |
| O6 | Review orchestrator integration approach | 20 min |
Sonnet tier (implementation)
| # | Task | Est. |
|---|---|---|
| S1 | Implement layer_state.py + Alembic migration | 45 min |
| S2 | Implement config/scoring.yaml | 20 min |
| S3 | Implement pfs_scorer.py | 45 min |
| S4 | Implement hss_scorer.py | 30 min |
| S5 | Implement fms_scorer.py | 20 min |
| S6 | Implement intent_extractor.py | 45 min |
| S7 | Implement travel_extractor.py | 45 min |
| S8 | Implement logistics_extractor.py | 30 min |
| S9 | Implement financial_extractor.py | 30 min |
| S10 | Write 4 base + 4 example prompt YAMLs | 1 hour |
| S11 | Update prompt_loader.py with load_layer_context() | 30 min |
| S12 | Implement triage_agent.py (graph + nodes) | 1.5 hours |
| S13 | Implement checkpointing.py | 20 min |
| S14 | Implement audit models + Alembic migration | 45 min |
| S15 | Update orchestrator.py with triage routing | 30 min |
| S16 | Write tests (9 files, ~57 tests) | 3 hours |
| S17 | Integration testing + verification | 1 hour |
| S18 | Documentation sweep | 30 min |