Session N: Synchronous Chat Extractor — Feature Spec¶
Context¶
You are working on the Curaway backend (curaway_src repo). This is the
implementation spec for Layer 3 of the conversation flow remediation
plan. Read the steer first:
docs/specs/ai-steer/sync-chat-extractor-steer.md
Also read CLAUDE.md and Layer 1's docs.
Pre-condition: Layer 1 (gates_v2, PR #70) deployed. Layer 3 is
independent of Layer 2 — can ship before, after, or in parallel with
the planner refactor.
Branch¶
PART A — Feature flag¶
A1. Add chat_extractor_sync to config/feature_flags.yaml¶
chat_extractor_sync:
default: true
description: "Layer 3 — run chat extractor synchronously on the routing path so the orchestrator sees fresh meds/allergies/demographics state. Disable to revert to Session 31's deferred extraction."
PART B — Chat extractor invocation¶
B1. Find the current invocation site¶
The chat extractor is currently invoked from one of these places (verify
with grep -n "chat_extractor\|extract_from_chat" app/):
app/routers/cases.py(chat endpoint)app/agents/case_orchestrator.py(somewhere near the top ofhandle_message)- A
BackgroundTasks.add_taskorasyncio.create_taskcall
The extractor's output writes to case.extra_metadata (medications,
allergies, demographics, preferences) and also flips
workflow_state.medications_asked when applicable.
B2. Move the call onto the critical path¶
Before (Session 31, deferred):
# At the end of handle_message, after the response is built:
asyncio.create_task(run_chat_extractor(db, case, message))
After (Layer 3, sync):
# At the very top of handle_message, after auth + state load,
# before the routing decision:
from app.services.feature_flags import is_feature_enabled
if is_feature_enabled("chat_extractor_sync"):
try:
await run_chat_extractor(db, case, message, langfuse_handler=langfuse_handler)
# Re-fetch case so subsequent code sees the extractor's writes.
# If the extractor mutates the in-memory `case` object directly,
# this is a no-op. If it goes through case_service.update_case_metadata,
# we need a refresh:
case = await case_service.get_case(db, case.id, case.tenant_id)
except Exception as exc:
logger.warning("chat_extractor sync run failed: %s", exc)
# Fall through — better to route on stale state than crash
B3. Run in parallel with the input classifier¶
If the input classifier and the chat extractor are both LLM calls and
have no dependency, wrap them with asyncio.gather:
classify_task = asyncio.create_task(classify_input(message))
extract_task = asyncio.create_task(run_chat_extractor(db, case, message, ...))
classification, _ = await asyncio.gather(classify_task, extract_task)
This keeps the marginal latency near zero.
B4. Keep the deferred path as a fallback¶
When chat_extractor_sync is false, the existing deferred path runs
unchanged. Both code paths must coexist behind the flag for the rollout
window.
PART C — Verify the extractor's writes are picked up¶
The extractor populates case.extra_metadata:
medications: list[str]medications_confirmed_none: boolallergies: list[str]allergies_confirmed_none: boolpatient_age,patient_country, etc.
The Layer 1 _intake_complete_v2 helper (case_orchestrator.py) reads
exactly these fields. Verify by:
- Local test: send a message containing "I take metformin and lisinopril, no allergies" against a case in the records-first phase.
- Inspect
case.extra_metadataafter the chat endpoint returns. - Confirm
medications=["metformin", "lisinopril"]andallergies_confirmed_none=true. - Send a follow-up message ("anything else?") and verify the agent does NOT re-ask for medications or allergies — the routing should advance the workflow.
PART D — Tests¶
D1. tests/test_chat_extractor_sync.py¶
Mock the chat extractor and assert it runs before the orchestrator routes. Use the in-memory SQLite fixture.
Cases:
- test_extractor_runs_before_routing — patch
case_orchestrator.handle_message to record the order of calls;
assert extractor is called before the first state read.
- test_extractor_writes_visible_to_routing — extractor writes
medications=["metformin"]; routing reads case.extra_metadata and
sees the new value.
- test_flag_off_uses_deferred — chat_extractor_sync=false,
assert the extractor is queued via asyncio.create_task and not
awaited synchronously.
- test_extractor_failure_does_not_block_routing — patch the
extractor to raise; assert the chat turn still completes and the
warning is logged.
- test_parallel_with_classifier — both tasks scheduled via
asyncio.gather; assert total time ≈ max(classifier_time,
extractor_time), not the sum.
D2. End-to-end smoke¶
Add a single end-to-end test that hits /cases/{id}/chat twice:
- First message: "I need TKR. I take metformin 500mg, no allergies."
- Second message: "anything else you need?"
Assertions:
- After turn 1, case.extra_metadata["medications"] == ["metformin"]
AND allergies_confirmed_none is True
- After turn 2, the agent's reply does NOT contain "medications" or
"allergies" (it has them already)
- After turn 2, gates_v2.intake_complete event fires (assuming
procedure + 1 doc OR 1 answer + age + country are also set)
PART E — Measuring the re-ask drop¶
To validate the success criterion ("re-ask rate drops by >=50%"), add a small SQL query you can run against the events table after deploy:
-- Count "agent re-asked for X" events in a 7-day window
SELECT
date_trunc('day', created_at) as day,
count(*) filter (where payload->>'reasked_field' = 'medications') as reask_meds,
count(*) filter (where payload->>'reasked_field' = 'allergies') as reask_allergies
FROM events
WHERE event_type = 'agent.reask'
AND created_at > now() - interval '7 days'
GROUP BY 1 ORDER BY 1;
If we don't already log agent.reask events, this layer should add a
lightweight detector: when the agent's reply contains substrings like
"what medications" / "any allergies" AND the case already has those
fields populated, write a agent.reask event with the field name. This
is the only way to measure the success criterion quantitatively.
PART F — Verification¶
Then deploy behind the flag and watch:
- p99 latency on the chat endpoint (target <=9.6s)
- agent.reask event count (target -50% in week 1)
- gates_v2.intake_complete event count (target +100% in week 1)
FINAL CHECKLIST¶
- [ ]
chat_extractor_syncflag added tofeature_flags.yaml(default true) - [ ] Chat extractor invocation moved to top of
handle_message, gated by the flag - [ ] Extractor + classifier run in parallel via
asyncio.gather - [ ] Case is re-fetched after the extractor if the writes don't mutate the in-memory object directly
- [ ] Deferred fallback path preserved for
chat_extractor_sync=false - [ ] 5 unit tests + 1 e2e test pass + full backend suite green
- [ ]
agent.reaskevent logging added (if not already present) - [ ] Langfuse query saved for the success criterion measurement