Skip to content

Session N: Orchestrator Planner — Feature Spec

Context

You are working on the Curaway backend (curaway_src repo). This is the implementation spec for Layer 2 of the conversation flow remediation plan. Read the steer first: docs/specs/ai-steer/orchestrator-planner-steer.md

Also read CLAUDE.md and Layer 1's docs: - docs/specs/ai-steer/conversation-flow-gates-steer.md - docs/specs/conversation-flow-gates-feature.md

Pre-condition: Layer 1 (gates_v2, PR #70) must be deployed and have run for at least one full week of real traffic. Do not start Layer 2 until you can confirm Layer 1's gates_v2.intake_complete and gates_v2.matching_advanced events are firing in production.

Branch

git checkout main && git pull origin main
git checkout -b feat/orchestrator-planner

PART A — Planner module

A1. Create app/agents/planner.py

"""
Curaway — Orchestrator planner.

Single LLM call per turn that picks the next action for the orchestrator
to take. Replaces the hand-coded if/elif tree in case_orchestrator. See
docs/specs/ai-steer/orchestrator-planner-steer.md for design rationale.
"""
from enum import Enum
from typing import TypedDict, Literal


class PlannerAction(str, Enum):
    IDENTIFY_PROCEDURE = "identify_procedure"
    REQUEST_RECORDS = "request_records"
    PROCESS_UPLOADED_DOCUMENTS = "process_uploaded_documents"
    COLLECT_INTAKE_INFO = "collect_intake_info"
    ADVANCE_TO_MATCHING = "advance_to_matching"
    SHOW_MATCHES = "show_matches"
    REQUEST_PROVIDER_SELECTION = "request_provider_selection"
    REQUEST_CONSENT = "request_consent"
    FORWARD_RECORDS = "forward_records"
    ANSWER_QUESTION = "answer_question"
    HANDLE_BLOCKING_ISSUE = "handle_blocking_issue"
    CELEBRATE_JOURNEY_COMPLETE = "celebrate_journey_complete"


class PlannerInput(TypedDict):
    case_id: str
    procedure_identified: bool
    procedure_name: str | None
    intake_complete: bool
    ehr_completeness: float
    has_documents: bool
    documents_pending_processing: bool
    blocking_issues: list[str]
    missing_critical_info: list[str]
    matching_complete: bool
    providers_selected: bool
    consent_given: bool
    forwarded: bool
    last_user_message_summary: str
    last_user_intent: str | None
    turn_number: int


class PlannerOutput(TypedDict):
    next_action: str  # PlannerAction value
    reason: str
    missing_for_advance: list[str]
    confidence: float


PLANNER_SYSTEM_PROMPT = """You are the routing planner for the Curaway
patient intake conversation. Your only job is to pick the next action
the orchestrator should take, based on the current case state.

You do NOT generate patient-facing text. You do NOT diagnose. You do NOT
make clinical decisions. You ONLY pick which action handler runs next.

Available actions:
{actions_with_descriptions}

Rules:
- If the procedure is not identified yet, return identify_procedure.
- If documents are still being processed (OCR/extraction in flight),
  return process_uploaded_documents to acknowledge and wait.
- If the case has blocking issues (uncontrolled diabetes, anticoagulation
  without bridging plan, etc.), return handle_blocking_issue first.
- If intake_complete is true and matching is not done, return advance_to_matching.
- If matching is done but providers not selected, return request_provider_selection.
- If providers selected but consent not given, return request_consent.
- If consent given but not forwarded, return forward_records.
- If everything is done, return celebrate_journey_complete.
- If the patient asked a direct question (intent=question), return answer_question
  even if other gates are not satisfied — answer the question first.

Output strict JSON. No markdown, no prose, no commentary. Schema:
{output_schema}
"""


async def plan_next_action(
    state: PlannerInput,
    *,
    langfuse_handler=None,
) -> PlannerOutput:
    """Run one Haiku call and return the planner's chosen action.

    Falls back to a deterministic rule-based default if the LLM call
    fails or returns garbage. Caller is responsible for the actual
    dispatch.
    """
    # Implementation: build the system prompt with the action enum
    # rendered, format `state` as the user message, call _get_llm()
    # with structured output, parse JSON, validate against
    # PlannerAction enum, return.
    #
    # On any error: return _fallback_decision(state).
    raise NotImplementedError("see implementation steps below")


def _fallback_decision(state: PlannerInput) -> PlannerOutput:
    """Deterministic fallback when the LLM call fails. Mirrors the
    legacy if/elif order so we never crash."""
    if not state["procedure_identified"]:
        return {
            "next_action": PlannerAction.IDENTIFY_PROCEDURE.value,
            "reason": "fallback: procedure not identified",
            "missing_for_advance": ["procedure"],
            "confidence": 0.5,
        }
    if state["documents_pending_processing"]:
        return {"next_action": PlannerAction.PROCESS_UPLOADED_DOCUMENTS.value, ...}
    if state["blocking_issues"]:
        return {"next_action": PlannerAction.HANDLE_BLOCKING_ISSUE.value, ...}
    if not state["intake_complete"]:
        return {"next_action": PlannerAction.COLLECT_INTAKE_INFO.value, ...}
    if not state["matching_complete"]:
        return {"next_action": PlannerAction.ADVANCE_TO_MATCHING.value, ...}
    if not state["providers_selected"]:
        return {"next_action": PlannerAction.REQUEST_PROVIDER_SELECTION.value, ...}
    if not state["consent_given"]:
        return {"next_action": PlannerAction.REQUEST_CONSENT.value, ...}
    if not state["forwarded"]:
        return {"next_action": PlannerAction.FORWARD_RECORDS.value, ...}
    return {"next_action": PlannerAction.CELEBRATE_JOURNEY_COMPLETE.value, ...}

A2. Wire into _get_llm() and Langfuse

The planner uses the same Haiku client as _llm_generate. Pass the Langfuse handler so every planner call shows up in traces with source=planner. Cost and latency are auto-captured.


PART B — State adapter

B1. app/agents/planner_state.py

A small pure-function helper that takes a case (SQLAlchemy model already loaded by the orchestrator) and returns a PlannerInput dict. The data is already computed by patient_state.py (Session 28) — this is just a shape adapter.

def build_planner_input(case, *, last_message: str, turn_number: int) -> PlannerInput:
    """Snapshot the case state into the planner's input shape."""
    ehr = case.ehr_snapshot or {}
    ws = case.workflow_state or {}
    meta = case.extra_metadata or {}
    risks = ehr.get("risk_factors") or []
    blocking = [r["factor"] for r in risks if r.get("is_blocking")]
    return {
        "case_id": str(case.id),
        "procedure_identified": bool(ws.get("procedure_identified")),
        "procedure_name": case.procedure_name,
        "intake_complete": bool(ws.get("intake_complete")),
        "ehr_completeness": float(ehr.get("completeness_score", 0)),
        "has_documents": bool(ws.get("analyzed_doc_count", 0) > 0),
        "documents_pending_processing": bool(ws.get("documents_pending", 0) > 0),
        "blocking_issues": blocking,
        "missing_critical_info": list(ehr.get("missing_information") or []),
        "matching_complete": bool(ws.get("matching_complete")),
        "providers_selected": bool(ws.get("providers_selected")),
        "consent_given": bool(ws.get("consent_given")),
        "forwarded": bool(ws.get("forwarded")),
        "last_user_message_summary": (last_message or "")[:200],
        "last_user_intent": meta.get("last_classified_intent"),
        "turn_number": turn_number,
    }

PART C — Shadow mode wiring

C1. Feature flag planner_v1_enabled

Add to config/feature_flags.yaml:

planner_v1_enabled:
  default: false
  description: "Layer 2  orchestrator planner. When false, runs in shadow mode (decision logged, not used). When true, planner becomes authoritative and the legacy if/elif logic is the fallback."

C2. Shadow logging

In case_orchestrator.handle_message, after the legacy logic decides the action but before the response is returned, insert:

try:
    from app.agents.planner import plan_next_action
    from app.agents.planner_state import build_planner_input
    state = build_planner_input(case, last_message=message, turn_number=ws.get("turn_number", 0))
    shadow_decision = await plan_next_action(state, langfuse_handler=langfuse_handler)
    await _record_planner_shadow_event(db, case, state, shadow_decision, control_action=...)
except Exception as exc:
    logger.debug("planner shadow skipped: %s", exc)

_record_planner_shadow_event writes to the events table with event_type="planner.shadow_decision". Payload includes the input snapshot, the planner's chosen action, the legacy logic's chosen action, agreement (bool), and confidence.

C3. Cutover (deferred to a separate PR)

Once shadow mode shows >=95% agreement on a week of traffic, a follow-up PR replaces the legacy branching with:

if is_feature_enabled("planner_v1_enabled"):
    state = build_planner_input(...)
    decision = await plan_next_action(state, ...)
    return await _dispatch(decision["next_action"], db, case, message, ...)
else:
    # legacy if/elif tree
    ...

_dispatch is a small dict mapping PlannerAction values to the existing handler functions. No new handlers — see Steer §3.3 for the full table.


PART D — Tests

D1. tests/test_planner_state.py

Pure-function tests for build_planner_input:

  • empty case → all bools false, completeness 0.0
  • case with procedure + 2 docs + intake_complete + 0 blocking risks
  • case with 1 blocking risk → blocking_issues populated
  • case at journey-complete state → all flags true

D2. tests/test_planner_fallback.py

Tests for _fallback_decision covering every legacy branch:

  • no procedure → IDENTIFY_PROCEDURE
  • procedure + pending docs → PROCESS_UPLOADED_DOCUMENTS
  • procedure + intake done + no matching → ADVANCE_TO_MATCHING
  • ... etc, one test per ordered branch

D3. tests/test_planner_llm.py

Contract tests for plan_next_action. Mock the LLM to return:

  • valid JSON with a real action → returned as-is
  • valid JSON with an action not in the enum → fallback used
  • invalid JSON → fallback used
  • LLM exception → fallback used
  • missing required fields in JSON → fallback used

D4. tests/test_planner_shadow_event.py

Smoke test that _record_planner_shadow_event writes to the events table with the right shape. Use the existing in-memory SQLite fixture.


PART E — Verification

pytest tests/test_planner_state.py tests/test_planner_fallback.py \
       tests/test_planner_llm.py tests/test_planner_shadow_event.py -v
pytest -q

After deploy: in Langfuse, filter for event_type=planner.shadow_decision and confirm events are flowing for every chat turn. Check the agreement field — at this stage we expect ~50-70% agreement (the planner needs prompt tuning).


PART F — Rollout

  1. Week 1: Ship Parts A, B, C. Flag planner_v1_enabled = false. Shadow mode only.
  2. Week 2: Review shadow decisions in Langfuse. Tune the planner system prompt based on disagreements with the legacy logic that look correct (i.e. the planner was right, the legacy was wrong).
  3. Week 3: Cutover PR — planner authoritative, legacy as fallback. Flag at 5% rollout.
  4. Week 4: 25% → 100% if no regressions.
  5. Week 5+: Delete the legacy if/elif tree if cutover holds.

FINAL CHECKLIST

  • [ ] app/agents/planner.py created with plan_next_action + _fallback_decision
  • [ ] app/agents/planner_state.py created with build_planner_input
  • [ ] planner_v1_enabled flag added to feature_flags.yaml (default false)
  • [ ] case_orchestrator.handle_message writes a shadow decision event after the legacy decision
  • [ ] All 4 test files pass + full backend suite green
  • [ ] Langfuse shows planner traces in production within 1h of deploy
  • [ ] Shadow agreement reviewed before cutover PR