Session N: Synchronous Chat Extractor — Feature Spec

Context

You are working on the Curaway backend (curaway_src repo). This is the implementation spec for Layer 3 of the conversation flow remediation plan. Read the steer first: docs/specs/ai-steer/sync-chat-extractor-steer.md

Also read CLAUDE.md and Layer 1's docs.

Pre-condition: Layer 1 (gates_v2, PR #70) deployed. Layer 3 is independent of Layer 2 — can ship before, after, or in parallel with the planner refactor.

Branch

git checkout main && git pull origin main
git checkout -b fix/sync-chat-extractor

PART A — Feature flag

A1. Add chat_extractor_sync to config/feature_flags.yaml

chat_extractor_sync:
  default: true
  description: "Layer 3: run chat extractor synchronously on the routing path so the orchestrator sees fresh meds/allergies/demographics state. Disable to revert to Session 31's deferred extraction."

PART B — Chat extractor invocation

B1. Find the current invocation site

The chat extractor is currently invoked from one of these places (verify with grep -rn "chat_extractor\|extract_from_chat" app/):

  • app/routers/cases.py (chat endpoint)
  • app/agents/case_orchestrator.py (somewhere near the top of handle_message)
  • A BackgroundTasks.add_task or asyncio.create_task call

The extractor's output writes to case.extra_metadata (medications, allergies, demographics, preferences) and also flips workflow_state.medications_asked when applicable.

B2. Move the call onto the critical path

Before (Session 31, deferred):

# At the end of handle_message, after the response is built:
asyncio.create_task(run_chat_extractor(db, case, message))

After (Layer 3, sync):

# At the very top of handle_message, after auth + state load,
# before the routing decision:
from app.services.feature_flags import is_feature_enabled
if is_feature_enabled("chat_extractor_sync"):
    try:
        await run_chat_extractor(db, case, message, langfuse_handler=langfuse_handler)
        # Re-fetch case so subsequent code sees the extractor's writes.
        # If the extractor mutates the in-memory `case` object directly,
        # this is a no-op. If it goes through case_service.update_case_metadata,
        # we need a refresh:
        case = await case_service.get_case(db, case.id, case.tenant_id)
    except Exception as exc:
        logger.warning("chat_extractor sync run failed: %s", exc)
        # Fall through — better to route on stale state than crash

B3. Run in parallel with the input classifier

If the input classifier and the chat extractor are both LLM calls and have no dependency, wrap them with asyncio.gather:

classify_task = asyncio.create_task(classify_input(message))
extract_task = asyncio.create_task(run_chat_extractor(db, case, message, ...))
classification, _ = await asyncio.gather(classify_task, extract_task)

This keeps the marginal latency near zero.
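
One caveat with a plain asyncio.gather: if the extractor raises, the exception propagates and the chat turn fails, which conflicts with the B2 rule that an extractor failure must not block routing. The following is a minimal sketch of one way to keep both properties, using return_exceptions=True. The coroutines fake_classify and fake_extract are hypothetical stand-ins, not the real classify_input or run_chat_extractor, and their sleep durations are arbitrary.

```python
import asyncio
import logging

logger = logging.getLogger("sync_extractor_sketch")


async def fake_classify(message: str) -> str:
    """Hypothetical stand-in for the input classifier LLM call."""
    await asyncio.sleep(0.2)
    return "medical_info"


async def fake_extract(message: str, fail: bool = False) -> None:
    """Hypothetical stand-in for the chat extractor LLM call."""
    await asyncio.sleep(0.2)
    if fail:
        raise RuntimeError("extractor LLM call failed")


async def classify_and_extract(message: str, fail_extractor: bool = False) -> str:
    # return_exceptions=True makes gather hand back exceptions as results
    # instead of raising, so each outcome can be handled separately.
    classification, extract_result = await asyncio.gather(
        fake_classify(message),
        fake_extract(message, fail=fail_extractor),
        return_exceptions=True,
    )
    if isinstance(extract_result, BaseException):
        # Same policy as B2: log and route on stale state.
        logger.warning("chat_extractor sync run failed: %s", extract_result)
    if isinstance(classification, BaseException):
        # Routing cannot proceed without a classification, so re-raise.
        raise classification
    return classification
```

With this shape the two calls still overlap, so the added latency stays close to the slower of the two, and only a classifier failure aborts the turn.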

B4. Keep the deferred path as a fallback

When chat_extractor_sync is false, the existing deferred path runs unchanged. Both code paths must coexist behind the flag for the rollout window.
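
A minimal sketch of how the two paths might coexist in one handler. Everything here is illustrative: handle_message_sketch, its parameters, and the background_tasks set are hypothetical names, and the real handle_message takes different arguments. The sketch keeps a reference to the deferred task because the asyncio documentation recommends holding one so a fire-and-forget task is not garbage-collected mid-execution.

```python
import asyncio
from typing import Awaitable, Callable, Set

Extractor = Callable[[str], Awaitable[None]]


async def handle_message_sketch(
    message: str,
    *,
    sync_enabled: bool,
    extractor: Extractor,
    background_tasks: Set[asyncio.Task],
) -> str:
    if sync_enabled:
        # Layer 3 path: extract before routing so routing sees fresh state.
        try:
            await extractor(message)
        except Exception:
            pass  # route on stale state rather than fail the turn

    reply = f"routed: {message}"  # placeholder for the real routing logic

    if not sync_enabled:
        # Session 31 path: schedule extraction after the reply is built.
        task = asyncio.create_task(extractor(message))
        background_tasks.add(task)  # hold a reference until the task finishes
        task.add_done_callback(background_tasks.discard)

    return reply
```

The flag is read once per turn, so a given message goes through exactly one of the two paths and the extractor never runs twice for it.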


PART C — Verify the extractor's writes are picked up

The extractor populates case.extra_metadata:

  • medications: list[str]
  • medications_confirmed_none: bool
  • allergies: list[str]
  • allergies_confirmed_none: bool
  • patient_age, patient_country, etc.

The Layer 1 _intake_complete_v2 helper (case_orchestrator.py) reads exactly these fields. Verify by:

  1. Local test: send a message containing "I take metformin and lisinopril, no allergies" against a case in the records-first phase.
  2. Inspect case.extra_metadata after the chat endpoint returns.
  3. Confirm medications=["metformin", "lisinopril"] and allergies_confirmed_none=true.
  4. Send a follow-up message ("anything else?") and verify the agent does NOT re-ask for medications or allergies — the routing should advance the workflow.
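
For step 3, a small helper can make the check explicit. This is a sketch only: field_answered and meds_and_allergies_answered are hypothetical names, not the real _intake_complete_v2, and the rule "non-empty list or confirmed none" is an assumption about how those fields are meant to be read.

```python
from typing import Any, Mapping


def field_answered(meta: Mapping[str, Any], field: str) -> bool:
    """True if the patient listed items for `field` or confirmed there are none."""
    has_items = bool(meta.get(field))
    confirmed_none = meta.get(f"{field}_confirmed_none") is True
    return has_items or confirmed_none


def meds_and_allergies_answered(meta: Mapping[str, Any]) -> bool:
    """True once both medications and allergies have an answer on record."""
    return field_answered(meta, "medications") and field_answered(meta, "allergies")


# Expected metadata after "I take metformin and lisinopril, no allergies".
meta_after_turn = {
    "medications": ["metformin", "lisinopril"],
    "allergies": [],
    "allergies_confirmed_none": True,
}
assert meds_and_allergies_answered(meta_after_turn)
```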

PART D — Tests

D1. tests/test_chat_extractor_sync.py

Mock the chat extractor and assert it runs before the orchestrator routes. Use the in-memory SQLite fixture.

Cases:

  • test_extractor_runs_before_routing: patch case_orchestrator.handle_message to record the order of calls; assert the extractor is called before the first state read.
  • test_extractor_writes_visible_to_routing: the extractor writes medications=["metformin"]; routing reads case.extra_metadata and sees the new value.
  • test_flag_off_uses_deferred: with chat_extractor_sync=false, assert the extractor is queued via asyncio.create_task and not awaited synchronously.
  • test_extractor_failure_does_not_block_routing: patch the extractor to raise; assert the chat turn still completes and the warning is logged.
  • test_parallel_with_classifier: both tasks scheduled via asyncio.gather; assert total time ≈ max(classifier_time, extractor_time), not the sum.
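
As a rough illustration of the ordering and failure cases, the sketch below uses unittest.mock.AsyncMock against a hypothetical stand-in handler. handle_message_under_test is not the real case_orchestrator.handle_message; the real tests would patch the actual module paths and use the SQLite fixture.

```python
import asyncio
from unittest.mock import AsyncMock


async def handle_message_under_test(message, extractor, router):
    """Hypothetical stand-in: extract first, then route, tolerating extractor errors."""
    try:
        await extractor(message)
    except Exception:
        pass  # the real handler would log a warning here
    return await router(message)


def test_extractor_runs_before_routing():
    order = []
    extractor = AsyncMock(side_effect=lambda m: order.append("extract"))
    # list.append returns None, so `or "reply"` makes the mock return "reply".
    router = AsyncMock(side_effect=lambda m: order.append("route") or "reply")

    reply = asyncio.run(handle_message_under_test("hi", extractor, router))

    assert order == ["extract", "route"]
    assert reply == "reply"


def test_extractor_failure_does_not_block_routing():
    extractor = AsyncMock(side_effect=RuntimeError("boom"))
    router = AsyncMock(return_value="reply")

    reply = asyncio.run(handle_message_under_test("hi", extractor, router))

    assert reply == "reply"
    router.assert_awaited_once_with("hi")
```

The tests call asyncio.run directly so the sketch does not assume any particular async pytest plugin is installed.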

D2. End-to-end smoke

Add a single end-to-end test that hits /cases/{id}/chat twice:

  1. First message: "I need TKR. I take metformin 500mg, no allergies."
  2. Second message: "anything else you need?"

Assertions:

  • After turn 1, case.extra_metadata["medications"] == ["metformin"] AND allergies_confirmed_none is True.
  • After turn 2, the agent's reply does NOT contain "medications" or "allergies" (it has them already).
  • After turn 2, the gates_v2.intake_complete event fires (assuming procedure + 1 doc OR 1 answer + age + country are also set).


PART E — Measuring the re-ask drop

To validate the success criterion ("re-ask rate drops by >=50%"), add a small SQL query you can run against the events table after deploy:

-- Count "agent re-asked for X" events in a 7-day window
SELECT
  date_trunc('day', created_at) as day,
  count(*) filter (where payload->>'reasked_field' = 'medications') as reask_meds,
  count(*) filter (where payload->>'reasked_field' = 'allergies') as reask_allergies
FROM events
WHERE event_type = 'agent.reask'
  AND created_at > now() - interval '7 days'
GROUP BY 1 ORDER BY 1;

If we don't already log agent.reask events, this layer should add a lightweight detector: when the agent's reply contains substrings like "what medications" / "any allergies" AND the case already has those fields populated, write an agent.reask event with the field name. This is the only way to measure the success criterion quantitatively.
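
A minimal sketch of such a detector follows. The names REASK_PATTERNS and detect_reasked_fields are hypothetical, the phrase lists are examples rather than a vetted set, and "populated" is assumed to mean a non-empty list or a confirmed-none flag. The returned field names are meant to be written as payload["reasked_field"], which is the key the SQL query above filters on.

```python
from typing import Any, List, Mapping

# Example trigger phrases per field (lowercase); extend as real replies are observed.
REASK_PATTERNS = {
    "medications": ("what medications", "any medications"),
    "allergies": ("any allergies", "what allergies"),
}


def detect_reasked_fields(reply: str, meta: Mapping[str, Any]) -> List[str]:
    """Return the fields the reply asks about even though the case already has them."""
    text = reply.lower()
    reasked = []
    for field, phrases in REASK_PATTERNS.items():
        populated = bool(meta.get(field)) or meta.get(f"{field}_confirmed_none") is True
        if populated and any(phrase in text for phrase in phrases):
            reasked.append(field)
    return reasked


meta = {"medications": ["metformin"], "allergies": [], "allergies_confirmed_none": True}
for field in detect_reasked_fields("Thanks! What medications do you take?", meta):
    # In the real code this would write an agent.reask event instead of printing.
    print({"event_type": "agent.reask", "payload": {"reasked_field": field}})
```

Substring matching will miss paraphrases and can flag legitimate clarifying questions, so the resulting counts are best read as a trend indicator rather than an exact rate.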


PART F — Verification

pytest tests/test_chat_extractor_sync.py -v
pytest -q

Then deploy behind the flag and watch:

  • p99 latency on the chat endpoint (target <=9.6s)
  • agent.reask event count (target -50% in week 1)
  • gates_v2.intake_complete event count (target +100% in week 1)


FINAL CHECKLIST

  • [ ] chat_extractor_sync flag added to feature_flags.yaml (default true)
  • [ ] Chat extractor invocation moved to top of handle_message, gated by the flag
  • [ ] Extractor + classifier run in parallel via asyncio.gather
  • [ ] Case is re-fetched after the extractor if the writes don't mutate the in-memory object directly
  • [ ] Deferred fallback path preserved for chat_extractor_sync=false
  • [ ] 5 unit tests + 1 e2e test pass + full backend suite green
  • [ ] agent.reask event logging added (if not already present)
  • [ ] SQL query from Part E saved for the success criterion measurement