Session N: Orchestrator Planner (Feature Spec)
Context
You are working on the Curaway backend (curaway_src repo). This is the
implementation spec for Layer 2 of the conversation flow remediation
plan. Read the steer first:
docs/specs/ai-steer/orchestrator-planner-steer.md
Also read CLAUDE.md and Layer 1's docs:
- docs/specs/ai-steer/conversation-flow-gates-steer.md
- docs/specs/conversation-flow-gates-feature.md
Pre-condition: Layer 1 (gates_v2, PR #70) must be deployed and
have run for at least one full week of real traffic. Do not start Layer 2
until you can confirm Layer 1's gates_v2.intake_complete and
gates_v2.matching_advanced events are firing in production.
Branch
PART A: Planner module
A1. Create app/agents/planner.py
```python
"""
Curaway: orchestrator planner.

Single LLM call per turn that picks the next action for the orchestrator
to take. Replaces the hand-coded if/elif tree in case_orchestrator. See
docs/specs/ai-steer/orchestrator-planner-steer.md for design rationale.
"""
from enum import Enum
from typing import TypedDict


class PlannerAction(str, Enum):
    IDENTIFY_PROCEDURE = "identify_procedure"
    REQUEST_RECORDS = "request_records"
    PROCESS_UPLOADED_DOCUMENTS = "process_uploaded_documents"
    COLLECT_INTAKE_INFO = "collect_intake_info"
    ADVANCE_TO_MATCHING = "advance_to_matching"
    SHOW_MATCHES = "show_matches"
    REQUEST_PROVIDER_SELECTION = "request_provider_selection"
    REQUEST_CONSENT = "request_consent"
    FORWARD_RECORDS = "forward_records"
    ANSWER_QUESTION = "answer_question"
    HANDLE_BLOCKING_ISSUE = "handle_blocking_issue"
    CELEBRATE_JOURNEY_COMPLETE = "celebrate_journey_complete"


class PlannerInput(TypedDict):
    case_id: str
    procedure_identified: bool
    procedure_name: str | None
    intake_complete: bool
    ehr_completeness: float
    has_documents: bool
    documents_pending_processing: bool
    blocking_issues: list[str]
    missing_critical_info: list[str]
    matching_complete: bool
    providers_selected: bool
    consent_given: bool
    forwarded: bool
    last_user_message_summary: str
    last_user_intent: str | None
    turn_number: int


class PlannerOutput(TypedDict):
    next_action: str  # a PlannerAction value
    reason: str
    missing_for_advance: list[str]
    confidence: float


PLANNER_SYSTEM_PROMPT = """You are the routing planner for the Curaway
patient intake conversation. Your only job is to pick the next action
the orchestrator should take, based on the current case state.

You do NOT generate patient-facing text. You do NOT diagnose. You do NOT
make clinical decisions. You ONLY pick which action handler runs next.

Available actions:
{actions_with_descriptions}

Rules:
- If the procedure is not identified yet, return identify_procedure.
- If documents are still being processed (OCR/extraction in flight),
  return process_uploaded_documents to acknowledge and wait.
- If the case has blocking issues (uncontrolled diabetes, anticoagulation
  without a bridging plan, etc.), return handle_blocking_issue first.
- If intake_complete is false, return collect_intake_info.
- If intake_complete is true and matching is not done, return advance_to_matching.
- If matching is done but providers are not selected, return request_provider_selection.
- If providers are selected but consent is not given, return request_consent.
- If consent is given but records are not forwarded, return forward_records.
- If everything is done, return celebrate_journey_complete.
- If the patient asked a direct question (intent=question), return answer_question
  even if other gates are not satisfied; answer the question first.

Output strict JSON. No markdown, no prose, no commentary. Schema:
{output_schema}
"""


async def plan_next_action(
    state: PlannerInput,
    *,
    langfuse_handler=None,
) -> PlannerOutput:
    """Run one Haiku call and return the planner's chosen action.

    Falls back to a deterministic rule-based default if the LLM call
    fails or returns garbage. The caller is responsible for the actual
    dispatch.
    """
    # Implementation: build the system prompt with the action enum
    # rendered, format `state` as the user message, call _get_llm()
    # with structured output, parse JSON, validate against the
    # PlannerAction enum, return.
    #
    # On any error: return _fallback_decision(state).
    raise NotImplementedError("see the implementation steps in the comment above")


def _decision(
    action: PlannerAction, reason: str, missing: list[str] | None = None
) -> PlannerOutput:
    """Build a fallback PlannerOutput; fallback decisions use a fixed 0.5 confidence."""
    return {
        "next_action": action.value,
        "reason": f"fallback: {reason}",
        "missing_for_advance": missing or [],
        "confidence": 0.5,
    }


def _fallback_decision(state: PlannerInput) -> PlannerOutput:
    """Deterministic fallback when the LLM call fails. Mirrors the
    legacy if/elif order so we never crash."""
    # Only the first branch's fields were spelled out originally; the reason
    # strings and missing lists for the later branches are illustrative.
    if not state["procedure_identified"]:
        return _decision(PlannerAction.IDENTIFY_PROCEDURE, "procedure not identified", ["procedure"])
    if state["documents_pending_processing"]:
        return _decision(PlannerAction.PROCESS_UPLOADED_DOCUMENTS, "documents still processing")
    if state["blocking_issues"]:
        return _decision(PlannerAction.HANDLE_BLOCKING_ISSUE, "blocking issues present", list(state["blocking_issues"]))
    if not state["intake_complete"]:
        return _decision(PlannerAction.COLLECT_INTAKE_INFO, "intake incomplete", list(state["missing_critical_info"]))
    if not state["matching_complete"]:
        return _decision(PlannerAction.ADVANCE_TO_MATCHING, "matching not complete")
    if not state["providers_selected"]:
        return _decision(PlannerAction.REQUEST_PROVIDER_SELECTION, "no provider selected")
    if not state["consent_given"]:
        return _decision(PlannerAction.REQUEST_CONSENT, "consent not given")
    if not state["forwarded"]:
        return _decision(PlannerAction.FORWARD_RECORDS, "records not forwarded")
    return _decision(PlannerAction.CELEBRATE_JOURNEY_COMPLETE, "journey complete")
```
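The "parse JSON, validate against PlannerAction enum" step is where most of the D3 contract cases live. A minimal sketch of that step, assuming a hypothetical helper named parse_planner_output that returns None whenever the caller should use _fallback_decision(state). The enum below is an abbreviated stand-in for the full one above, and the [0, 1] range check on confidence is an added assumption, not something this spec requires.

```python
import json
from enum import Enum
from typing import Any, Optional


class PlannerAction(str, Enum):
    # Abbreviated stand-in for the full enum in app/agents/planner.py.
    IDENTIFY_PROCEDURE = "identify_procedure"
    COLLECT_INTAKE_INFO = "collect_intake_info"
    ANSWER_QUESTION = "answer_question"


REQUIRED_FIELDS = ("next_action", "reason", "missing_for_advance", "confidence")
VALID_ACTIONS = {a.value for a in PlannerAction}


def parse_planner_output(raw: Any) -> Optional[dict]:
    """Return a validated PlannerOutput-shaped dict, or None to signal fallback."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None  # not JSON at all (or not even a string)
    if not isinstance(data, dict) or any(f not in data for f in REQUIRED_FIELDS):
        return None  # wrong top-level shape or a required field is missing
    if data["next_action"] not in VALID_ACTIONS:
        return None  # the model invented an action outside the enum
    if not isinstance(data["reason"], str) or not isinstance(data["missing_for_advance"], list):
        return None
    try:
        confidence = float(data["confidence"])
    except (TypeError, ValueError):
        return None
    if not 0.0 <= confidence <= 1.0:
        return None  # assumption: confidence must be a probability
    return {
        "next_action": data["next_action"],
        "reason": data["reason"],
        "missing_for_advance": [str(x) for x in data["missing_for_advance"]],
        "confidence": confidence,
    }
```

Returning None instead of raising keeps the "on any error: fallback" rule in a single place in plan_next_action.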
A2. Wire into _get_llm() and Langfuse
The planner uses the same Haiku client as _llm_generate. Pass the
Langfuse handler so every planner call shows up in traces with
source=planner. Cost and latency are auto-captured.
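The call path can be sketched independently of the real client. This assumes a hypothetical injected coroutine call_llm standing in for the _get_llm() Haiku client; its (system_prompt, user_message, *, callbacks, metadata) signature, the helper name plan_with_fallback, and the 5-second timeout are all assumptions. The point it illustrates is that the Langfuse handler is passed through and tagged with source=planner, and that every failure mode, including a slow call, degrades to the deterministic fallback.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Optional

# Hypothetical client signature: (system_prompt, user_message, *, callbacks, metadata) -> raw text.
LLMCall = Callable[..., Awaitable[str]]


async def plan_with_fallback(
    state: dict[str, Any],
    call_llm: LLMCall,
    fallback: Callable[[dict[str, Any]], dict[str, Any]],
    validate: Callable[[str], Optional[dict[str, Any]]],
    *,
    system_prompt: str,
    langfuse_handler: Any = None,
    timeout_s: float = 5.0,  # assumption: a hard ceiling so a slow call cannot stall the turn
) -> dict[str, Any]:
    """One planner LLM call; any failure degrades to the deterministic fallback."""
    callbacks = [langfuse_handler] if langfuse_handler is not None else []
    try:
        raw = await asyncio.wait_for(
            call_llm(
                system_prompt,
                json.dumps(state, sort_keys=True),  # case state as the user message
                callbacks=callbacks,
                metadata={"source": "planner"},  # lets traces be filtered by source
            ),
            timeout=timeout_s,
        )
        decision = validate(raw)
    except Exception:
        # Timeouts, transport errors and validator crashes all end up here.
        # asyncio.CancelledError is a BaseException, so cancellation still propagates.
        decision = None
    return decision if decision is not None else fallback(state)
```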
PART B: State adapter
B1. app/agents/planner_state.py
A small pure-function helper that takes a case (SQLAlchemy model
already loaded by the orchestrator) and returns a PlannerInput dict.
The data is already computed by patient_state.py (Session 28) — this
is just a shape adapter.
```python
from app.agents.planner import PlannerInput


def build_planner_input(case, *, last_message: str, turn_number: int) -> PlannerInput:
    """Snapshot the case state into the planner's input shape."""
    ehr = case.ehr_snapshot or {}
    ws = case.workflow_state or {}
    meta = case.extra_metadata or {}
    risks = ehr.get("risk_factors") or []
    blocking = [r["factor"] for r in risks if r.get("is_blocking")]
    return {
        "case_id": str(case.id),
        "procedure_identified": bool(ws.get("procedure_identified")),
        "procedure_name": case.procedure_name,
        "intake_complete": bool(ws.get("intake_complete")),
        # `or 0` also covers keys that are present but null in the JSON columns.
        "ehr_completeness": float(ehr.get("completeness_score") or 0),
        "has_documents": (ws.get("analyzed_doc_count") or 0) > 0,
        "documents_pending_processing": (ws.get("documents_pending") or 0) > 0,
        "blocking_issues": blocking,
        "missing_critical_info": list(ehr.get("missing_information") or []),
        "matching_complete": bool(ws.get("matching_complete")),
        "providers_selected": bool(ws.get("providers_selected")),
        "consent_given": bool(ws.get("consent_given")),
        "forwarded": bool(ws.get("forwarded")),
        "last_user_message_summary": (last_message or "")[:200],
        "last_user_intent": meta.get("last_classified_intent"),
        "turn_number": turn_number,
    }
```
PART C: Shadow mode wiring
C1. Feature flag planner_v1_enabled
Add to config/feature_flags.yaml:
```yaml
planner_v1_enabled:
  default: false
  description: "Layer 2 orchestrator planner. When false, runs in shadow mode (decision logged, not used). When true, the planner becomes authoritative and the legacy if/elif logic is the fallback."
```
C2. Shadow logging
In case_orchestrator.handle_message, after the legacy logic decides
the action but before the response is returned, insert:
```python
try:
    from app.agents.planner import plan_next_action
    from app.agents.planner_state import build_planner_input

    state = build_planner_input(case, last_message=message, turn_number=ws.get("turn_number", 0))
    shadow_decision = await plan_next_action(state, langfuse_handler=langfuse_handler)
    # control_action: the action the legacy if/elif tree chose for this turn.
    await _record_planner_shadow_event(db, case, state, shadow_decision, control_action=...)
except Exception as exc:
    logger.debug("planner shadow skipped: %s", exc)
```
_record_planner_shadow_event writes to the events table with
event_type="planner.shadow_decision". Payload includes the input
snapshot, the planner's chosen action, the legacy logic's chosen action,
agreement (bool), and confidence.
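A minimal sketch of the payload _record_planner_shadow_event could write. The helper name build_shadow_payload and every key name are hypothetical; only the set of fields comes from the paragraph above. The extra used_fallback flag is an assumption: because _fallback_decision mirrors the legacy order, a planner call that fell back will almost always "agree", so tagging those rows keeps them from inflating the agreement rate. It relies on fallback reasons keeping the "fallback:" prefix.

```python
from typing import Any


def build_shadow_payload(
    state: dict[str, Any],
    planner_decision: dict[str, Any],
    control_action: str,
) -> dict[str, Any]:
    """Assemble the payload for one planner.shadow_decision event."""
    planner_action = planner_decision["next_action"]
    reason = str(planner_decision.get("reason", ""))
    return {
        "input_snapshot": state,
        "planner_action": planner_action,
        "control_action": control_action,  # what the legacy if/elif tree chose
        "agreement": planner_action == control_action,
        "confidence": planner_decision.get("confidence"),
        # Assumes fallback reasons start with "fallback:", so decisions the
        # LLM never actually made can be excluded when measuring agreement.
        "used_fallback": reason.startswith("fallback:"),
    }
```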
C3. Cutover (deferred to a separate PR)
Once shadow mode shows >=95% agreement on a week of traffic, a follow-up PR replaces the legacy branching with:
```python
if is_feature_enabled("planner_v1_enabled"):
    state = build_planner_input(...)
    decision = await plan_next_action(state, ...)
    return await _dispatch(decision["next_action"], db, case, message, ...)
else:
    # legacy if/elif tree
    ...
```
_dispatch is a small dict mapping PlannerAction values to the
existing handler functions. No new handlers — see Steer §3.3 for the
full table.
PART D: Tests
D1. tests/test_planner_state.py
Pure-function tests for build_planner_input:
- empty case → all bools false, completeness 0.0
- case with procedure + 2 docs + intake_complete + 0 blocking risks
- case with 1 blocking risk → blocking_issues populated
- case at journey-complete state → all flags true
D2. tests/test_planner_fallback.py
Tests for _fallback_decision covering every legacy branch:
- no procedure → IDENTIFY_PROCEDURE
- procedure + pending docs → PROCESS_UPLOADED_DOCUMENTS
- procedure + intake done + no matching → ADVANCE_TO_MATCHING
- ... and so on: one test per branch, in the fallback's order
D3. tests/test_planner_llm.py
Contract tests for plan_next_action. Mock the LLM to return:
- valid JSON with a real action → returned as-is
- valid JSON with an action not in the enum → fallback used
- invalid JSON → fallback used
- LLM exception → fallback used
- missing required fields in JSON → fallback used
D4. tests/test_planner_shadow_event.py
Smoke test that _record_planner_shadow_event writes to the events
table with the right shape. Use the existing in-memory SQLite fixture.
PART E: Verification
```shell
pytest tests/test_planner_state.py tests/test_planner_fallback.py \
  tests/test_planner_llm.py tests/test_planner_shadow_event.py -v
pytest -q
```
After deploy: in Langfuse, filter for event_type=planner.shadow_decision
and confirm events are flowing for every chat turn. Check the
agreement field — at this stage we expect ~50-70% agreement (the
planner needs prompt tuning).
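Reviewing the agreement field, and later checking the >=95% cutover bar from C3, amounts to a small aggregation over shadow payloads. A sketch assuming each payload carries hypothetical agreement, control_action and planner_action keys; summarize_agreement is an illustrative name. Grouping disagreements by (legacy, planner) pair points prompt tuning at the most frequent confusions instead of individual turns.

```python
from collections import Counter
from typing import Any, Iterable


def summarize_agreement(payloads: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Agreement rate over planner.shadow_decision payloads, plus the most
    common (legacy action, planner action) disagreement pairs."""
    total = agreed = 0
    disagreements: Counter = Counter()
    for p in payloads:
        total += 1
        if p["agreement"]:
            agreed += 1
        else:
            disagreements[(p["control_action"], p["planner_action"])] += 1
    rate = agreed / total if total else 0.0
    return {
        "total": total,
        "agreement_rate": rate,
        "meets_cutover_bar": total > 0 and rate >= 0.95,  # C3 threshold
        "top_disagreements": disagreements.most_common(5),
    }
```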
PART F: Rollout
- Week 1: Ship Parts A, B, and C with planner_v1_enabled = false. Shadow mode only.
- Week 2: Review shadow decisions in Langfuse. Tune the planner system prompt based on disagreements with the legacy logic that look correct (i.e. the planner was right, the legacy was wrong).
- Week 3: Cutover PR — planner authoritative, legacy as fallback. Flag at 5% rollout.
- Week 4: 25% → 100% if no regressions.
- Week 5+: Delete the legacy if/elif tree if cutover holds.
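The spec does not say how is_feature_enabled handles a percentage rollout, and the existing flag system may already provide it. If it does not, one option is deterministic bucketing on case_id, sketched below with a hypothetical in_rollout helper. Hashing the case rather than the request keeps every turn of a conversation on the same path, and because buckets are fixed, widening 5% to 25% to 100% only adds cases and never moves one back to the legacy logic.

```python
import hashlib


def in_rollout(case_id: str, percent: float, salt: str = "planner_v1_enabled") -> bool:
    """Deterministically place a case in the first `percent`% of 10,000 buckets."""
    if percent <= 0:
        return False
    if percent >= 100:
        return True
    # Salting with the flag name keeps cohorts independent across different flags.
    digest = hashlib.sha256(f"{salt}:{case_id}".encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000
    return bucket < percent * 100
```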
FINAL CHECKLIST
- [ ] app/agents/planner.py created with plan_next_action + _fallback_decision
- [ ] app/agents/planner_state.py created with build_planner_input
- [ ] planner_v1_enabled flag added to feature_flags.yaml (default false)
- [ ] case_orchestrator.handle_message writes a shadow decision event after the legacy decision
- [ ] All 4 test files pass + full backend suite green
- [ ] Langfuse shows planner traces in production within 1h of deploy
- [ ] Shadow agreement reviewed before cutover PR