Multi-Tenancy Phase 0: Implementation Spec
- Status: Implemented in Session 34
- ADR: 0018-multi-tenancy-platform-architecture
- Execution Roadmap: Wave 3 (items 3.1–3.4)
- Prerequisites: DAO Layer (Wave 0.1 ✓), PostgreSQL RLS (Wave 0.2 ✓)
- Estimated effort: 2–3 weeks
Scope
Phase 0 delivers the multi-tenancy foundation — the tables, RBAC engine, redaction engine, and state machine that all subsequent phases build on. No new UI. No new API routes for external consumers. Just the backend plumbing.
What's In
| # | Deliverable | Depends on |
|---|---|---|
| 3.1 | RBAC model — roles, permissions, user-role assignment | Tenant model (exists) |
| 3.2 | Case shares — cross-tenant case references with redaction policy | 3.1 |
| 3.3 | Redaction engine — policy executor that masks fields per actor type | 3.2 |
| 3.4 | Case lifecycle state machine — replace if/else with python-statemachine | Tenant model |
What's Out (later phases)
- Provider portal UI (Phase 1)
- Coordinator dashboard (Phase 1)
- Facilitator/MSO flows (Phase 2–3)
- Clerk multi-org wiring (Phase 1 — we define the mapping here but don't wire webhooks)
- Upstash Workflow agentic pipelines (Phase 1+)
3.1 RBAC Model
New Files
| File | Purpose |
|---|---|
| `app/models/role.py` | Role + UserRole + Permission models |
| `app/repositories/role_repository.py` | RBAC data access |
| `app/services/rbac_service.py` | Permission checking, role assignment |
| `app/middleware/rbac_middleware.py` | Request-level permission enforcement |
| `alembic/versions/xxxx_add_rbac_tables.py` | Migration |
| `tests/test_rbac.py` | Unit + integration tests |
| `config/rbac_defaults.yaml` | Default role definitions + permissions |
Models
# app/models/role.py
from enum import Enum

from sqlalchemy import Boolean, ForeignKey, String, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column

# Base, UUIDPrimaryKeyMixin, TimestampMixin, and FlexibleJSON are project-local helpers.


class RoleCode(str, Enum):
    """7 platform actor types from ADR-0018."""

    PATIENT = "patient"
    FACILITATOR = "facilitator"
    COORDINATOR = "coordinator"
    MSO_DOCTOR = "mso_doctor"
    PROVIDER_ADMIN = "provider_admin"
    PLATFORM_ADMIN = "platform_admin"
    SUPER_ADMIN = "super_admin"


class Permission(str, Enum):
    """Granular permissions. Grouped by domain."""

    # Patient domain
    CASE_READ_OWN = "case:read:own"
    CASE_WRITE_OWN = "case:write:own"
    CONSENT_MANAGE = "consent:manage"
    # Coordinator domain
    CASE_READ_ASSIGNED = "case:read:assigned"
    CASE_WRITE_ASSIGNED = "case:write:assigned"
    CASE_ASSIGN = "case:assign"
    CASE_QUEUE_VIEW = "case:queue:view"
    # Facilitator domain
    CASE_READ_DELEGATED = "case:read:delegated"
    CASE_WRITE_DELEGATED = "case:write:delegated"
    # MSO domain
    CONSULTATION_READ = "consultation:read"
    CONSULTATION_WRITE = "consultation:write"
    CLINICAL_DATA_READ = "clinical:read"
    # Provider domain
    CASE_READ_FORWARDED = "case:read:forwarded"
    QUOTE_WRITE = "quote:write"
    PROVIDER_MANAGE = "provider:manage"
    STAFF_MANAGE = "staff:manage"
    # Platform admin domain
    TENANT_MANAGE = "tenant:manage"
    RBAC_MANAGE = "rbac:manage"
    FEATURE_FLAG_MANAGE = "feature_flag:manage"
    USER_MANAGE = "user:manage"
    # Super admin domain
    TENANT_OVERRIDE = "tenant:override"
    AUDIT_READ = "audit:read"
    SYSTEM_MANAGE = "system:manage"


class Role(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Platform role definition.

    DESIGN DECISION: Roles are a global catalog (like providers, treatment_categories).
    They are NOT tenant-scoped. System roles are seeded at migration time.
    Custom per-tenant roles are deferred to a future phase.
    Repository: BaseUnscopedRepository (same pattern as ProviderRepository).
    """

    __tablename__ = "roles"

    code: Mapped[str] = mapped_column(String(30), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    description: Mapped[str | None] = mapped_column(Text)
    # Which tenant type this role belongs to (None = shared tenant)
    tenant_type: Mapped[str | None] = mapped_column(String(30))
    # "provider" | None (shared)
    permissions: Mapped[list] = mapped_column(FlexibleJSON, nullable=False, default=list)
    # ["case:read:own", "consent:manage", ...]
    is_system: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    # System roles can't be deleted. Custom roles (future) can.


class UserRole(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Assigns a user to a role within a tenant scope."""

    __tablename__ = "user_roles"

    user_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    # Clerk user ID
    role_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("roles.id"), nullable=False, index=True
    )
    tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )
    # For shared-tenant roles: the platform tenant ID
    # For provider roles: the provider's tenant ID
    granted_by: Mapped[str] = mapped_column(String(255), nullable=False)
    # Clerk user ID of who granted the role
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)

    __table_args__ = (
        UniqueConstraint("user_id", "role_id", "tenant_id", name="uq_user_role_tenant"),
    )
Default Role→Permission Mapping (config/rbac_defaults.yaml)
roles:
  patient:
    name: "Patient"
    permissions:
      - case:read:own
      - case:write:own
      - consent:manage
    tenant_type: null  # shared tenant
  facilitator:
    name: "Facilitator"
    permissions:
      - case:read:delegated
      - case:write:delegated
    tenant_type: null
  coordinator:
    name: "Coordinator"
    permissions:
      - case:read:assigned
      - case:write:assigned
      - case:assign
      - case:queue:view
    tenant_type: null
  mso_doctor:
    name: "MSO Doctor"
    permissions:
      - consultation:read
      - consultation:write
      - clinical:read
    tenant_type: null
  provider_admin:
    name: "Provider Admin"
    permissions:
      - case:read:forwarded
      - quote:write
      - provider:manage
      - staff:manage
    tenant_type: provider
  platform_admin:
    name: "Platform Admin"
    permissions:
      - tenant:manage
      - rbac:manage
      - feature_flag:manage
      - user:manage
      - case:queue:view
      - audit:read
    tenant_type: null
  super_admin:
    name: "Super Admin"
    permissions:
      - tenant:override
      - audit:read
      - system:manage
      # Inherits all other permissions implicitly
    tenant_type: null
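The "inherits all other permissions implicitly" note on super_admin means permission resolution cannot be a plain lookup of the stored list. The following is a minimal sketch of one way to resolve effective permissions, assuming the YAML has already been parsed into a dict. `ROLE_PERMISSIONS` (a three-role excerpt) and `effective_permissions` are hypothetical names, and modelling the inheritance as a union over all roles is an assumption, not something the spec prescribes.

```python
# Hypothetical excerpt of config/rbac_defaults.yaml, already parsed into a dict.
ROLE_PERMISSIONS = {
    "patient": ["case:read:own", "case:write:own", "consent:manage"],
    "coordinator": [
        "case:read:assigned",
        "case:write:assigned",
        "case:assign",
        "case:queue:view",
    ],
    "super_admin": ["tenant:override", "audit:read", "system:manage"],
}

# Assumption: implicit inheritance is modelled as the union of every role's permissions.
SUPER_ADMIN = "super_admin"


def effective_permissions(role_codes):
    """Return the set of permissions granted by the given role codes."""
    if SUPER_ADMIN in role_codes:
        return {perm for perms in ROLE_PERMISSIONS.values() for perm in perms}
    granted = set()
    for code in role_codes:
        # Unknown role codes grant nothing.
        granted.update(ROLE_PERMISSIONS.get(code, []))
    return granted
```

With this shape, `has_permission` reduces to a membership test on the returned set, and the seeded super_admin row does not need to be rewritten every time a new permission is added.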
RBAC Service
# app/services/rbac_service.py
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.role import Role, UserRole


class RBACService:
    """Check permissions, assign/revoke roles."""

    async def has_permission(
        self, db: AsyncSession, user_id: str, tenant_id: str, permission: str
    ) -> bool:
        """Check if user has a specific permission in the given tenant."""

    async def get_user_roles(
        self, db: AsyncSession, user_id: str, tenant_id: str
    ) -> list[Role]:
        """Get all active roles for a user in a tenant."""

    async def assign_role(
        self, db: AsyncSession, user_id: str, role_code: str,
        tenant_id: str, granted_by: str
    ) -> UserRole:
        """Assign a role. Idempotent: returns the existing assignment if duplicate."""

    async def revoke_role(
        self, db: AsyncSession, user_id: str, role_code: str, tenant_id: str
    ) -> None:
        """Soft-revoke (is_active=False). Audit logged."""

    async def check_or_raise(
        self, db: AsyncSession, user_id: str, tenant_id: str, permission: str
    ) -> None:
        """Raise PermissionDenied if user lacks permission."""
RBAC Middleware
# app/middleware/rbac_middleware.py
from starlette.middleware.base import BaseHTTPMiddleware


class RBACMiddleware(BaseHTTPMiddleware):
    """Extracts user identity from Clerk JWT, resolves roles.

    Sets request.state.user_id, request.state.roles, request.state.permissions.
    Does NOT enforce; individual routes use the @require_permission() decorator.
    """
Route Decorator
# Usage in routers:
@router.get("/cases/{case_id}")
@require_permission("case:read:own")
async def get_case(case_id: str, request: Request, db: AsyncSession = Depends(get_db)):
    ...
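The spec shows only how `@require_permission()` is used, not how it is built. The sketch below is one framework-free way such a decorator could work, assuming the middleware has already populated `request.state.permissions` and that the request reaches the handler as a keyword argument named `request`. `fake_request` is a hypothetical test stand-in, and `PermissionDenied` is defined locally here only to keep the example self-contained.

```python
import asyncio
import functools
from types import SimpleNamespace


class PermissionDenied(Exception):
    """Raised when the caller lacks the required permission."""


def require_permission(permission):
    """Reject the call unless request.state.permissions contains `permission`."""

    def decorator(handler):
        @functools.wraps(handler)  # keep the handler's name and signature metadata
        async def wrapper(*args, **kwargs):
            request = kwargs.get("request")
            state = getattr(request, "state", None)
            granted = getattr(state, "permissions", ())
            if permission not in granted:
                raise PermissionDenied(permission)
            return await handler(*args, **kwargs)

        return wrapper

    return decorator


@require_permission("case:read:own")
async def get_case(case_id, request):
    return {"case_id": case_id}


def fake_request(*permissions):
    """Hypothetical stand-in for a request already processed by RBACMiddleware."""
    return SimpleNamespace(state=SimpleNamespace(permissions=set(permissions)))


# Usage: the first call succeeds, a request without the permission would raise.
result = asyncio.run(get_case("c1", request=fake_request("case:read:own")))
```

Because the decorator fails closed when `request.state.permissions` is missing, a route that is accidentally served without the middleware is denied rather than silently allowed.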
Migration
- Create `roles` table with UUID PK, code (unique), name, permissions JSONB, tenant_type, is_system
- Create `user_roles` table with UUID PK, user_id, role_id FK, tenant_id FK, granted_by, is_active
- Seed 7 system roles from `config/rbac_defaults.yaml`
- Add RLS policy on `user_roles` (tenant-scoped)
Tests
- Unit: permission check logic, role assignment/revocation, duplicate handling
- Integration: RBAC middleware sets request.state correctly
- Isolation: user in tenant A cannot access tenant B resources
- Super admin: can override tenant scope
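The "role assignment/revocation, duplicate handling" cases can be illustrated without a database. This is a minimal in-memory sketch, assuming rows are keyed the same way as the `uq_user_role_tenant` constraint; `InMemoryRoleStore` and `UserRoleRow` are hypothetical names, the key uses `role_code` in place of `role_id` for brevity, and reactivating a previously revoked row on re-assignment is an assumption the spec does not state.

```python
from dataclasses import dataclass


@dataclass
class UserRoleRow:
    user_id: str
    role_code: str
    tenant_id: str
    granted_by: str
    is_active: bool = True


class InMemoryRoleStore:
    """Stand-in for the user_roles table, keyed like uq_user_role_tenant."""

    def __init__(self):
        self._rows = {}

    def assign_role(self, user_id, role_code, tenant_id, granted_by):
        key = (user_id, role_code, tenant_id)
        row = self._rows.get(key)
        if row is None:
            row = UserRoleRow(user_id, role_code, tenant_id, granted_by)
            self._rows[key] = row
        else:
            # Assumption: re-assigning a revoked role reactivates the existing row.
            row.is_active = True
        return row

    def revoke_role(self, user_id, role_code, tenant_id):
        # Soft revoke: the row stays for audit purposes, only the flag flips.
        row = self._rows.get((user_id, role_code, tenant_id))
        if row is not None:
            row.is_active = False

    def active_roles(self, user_id, tenant_id):
        return [
            row.role_code
            for row in self._rows.values()
            if row.user_id == user_id and row.tenant_id == tenant_id and row.is_active
        ]
```

The same store also makes the isolation test concrete: a role granted in tenant A never appears in `active_roles` for tenant B, because the tenant is part of the key.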
3.2 Case Shares
New Files
| File | Purpose |
|---|---|
| `app/models/case_share.py` | CaseShare model |
| `app/repositories/case_share_repository.py` | Data access |
| `app/services/case_share_service.py` | Create/revoke shares, consent tracking |
| `alembic/versions/xxxx_add_case_shares.py` | Migration |
| `tests/test_case_shares.py` | Tests |
Model
# app/models/case_share.py
from datetime import datetime
from enum import Enum

from sqlalchemy import Boolean, DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column

# Base, UUIDPrimaryKeyMixin, and TimestampMixin are project-local helpers.


class SharingMode(str, Enum):
    COPY = "copy"  # One-way snapshot (providers)
    REFERENCE = "reference"  # Live read access (coordinators, MSO, facilitators)


class RedactionPolicy(str, Enum):
    PROVIDER_SNAPSHOT = "provider_snapshot"
    MSO_CLINICAL = "mso_clinical"
    COORDINATOR_FULL = "coordinator_full"
    FACILITATOR_CONSENT = "facilitator_consent"


class CaseShare(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Cross-tenant case access with redaction policy.

    DESIGN DECISION: No TenantScopedMixin. This is the first cross-tenant table.
    Instead of a single tenant_id, it has source_tenant_id and target_tenant_id.
    RLS uses dual-scope: WHERE source_tenant_id = app.tenant_id OR target_tenant_id = app.tenant_id.
    Repository: CaseShareRepository (custom, not BaseRepository, because it handles dual scope).
    """

    __tablename__ = "case_shares"

    case_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("cases.id"), nullable=False, index=True
    )
    source_tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )
    target_tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )
    # Actor receiving access
    actor_type: Mapped[str] = mapped_column(String(30), nullable=False)
    # RoleCode value: provider_admin | mso_doctor | coordinator | facilitator
    actor_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    # Clerk user ID or provider tenant ID
    sharing_mode: Mapped[str] = mapped_column(String(20), nullable=False)
    # "copy" or "reference"
    redaction_policy: Mapped[str] = mapped_column(String(50), nullable=False)
    # Which fields to mask
    # Consent tracking (facilitators): links to the existing ConsentRecord table
    # Uses purpose_code="facilitator_data_sharing" in consent_records
    consent_record_id: Mapped[str | None] = mapped_column(
        String(36), ForeignKey("consent_records.id"), nullable=True
    )
    # Denormalized for query performance; mirrors consent_record.status
    consent_granted: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
    # Scope limiting (MSO consultations)
    context_type: Mapped[str | None] = mapped_column(String(30))
    # "consultation" | "forwarding" | None
    context_id: Mapped[str | None] = mapped_column(String(36))
    # Provider-side status tracking (only for actor_type="provider_admin")
    provider_status: Mapped[str | None] = mapped_column(String(30))
    # ProviderCaseStatus: received | reviewing | info_requested | quoted | rejected | selected | not_selected
    # Lifecycle
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    # Provider snapshots expire after 30 days
    created_by: Mapped[str] = mapped_column(String(255), nullable=False)
    # Who initiated the share
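The lifecycle columns (`is_active`, `expires_at`, `consent_granted`) combine into a single question at read time: may this actor's tenant still see the case? The sketch below shows one way to answer it, under the assumption that only `facilitator_consent` shares are consent-gated and that a share stops working exactly at `expires_at`. `ShareRecord`, `compute_expiry`, and `share_grants_access` are hypothetical names covering a subset of the model.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class ShareRecord:
    """Hypothetical in-memory subset of the CaseShare columns."""

    source_tenant_id: str
    target_tenant_id: str
    redaction_policy: str
    is_active: bool = True
    consent_granted: bool = False
    expires_at: Optional[datetime] = None


def compute_expiry(created_at, expires_days):
    """Translate an expires_days argument into an expires_at timestamp."""
    if expires_days is None:
        return None  # the share never expires on its own
    return created_at + timedelta(days=expires_days)


def share_grants_access(share, tenant_id, now):
    """True if the target tenant may currently read through this share."""
    if share.target_tenant_id != tenant_id or not share.is_active:
        return False
    if share.expires_at is not None and now >= share.expires_at:
        return False
    # Assumption: only facilitator shares depend on patient consent.
    if share.redaction_policy == "facilitator_consent" and not share.consent_granted:
        return False
    return True


created = datetime(2026, 1, 1, tzinfo=timezone.utc)
snapshot = ShareRecord(
    "platform", "provider-1", "provider_snapshot",
    expires_at=compute_expiry(created, 30),
)
```

Keeping the check in one function means revocation, expiry, and consent withdrawal all fail closed through the same path.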
Service
# app/services/case_share_service.py
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.case_share import CaseShare, RedactionPolicy, SharingMode


class CaseShareService:
    async def create_share(
        self, db: AsyncSession, *,
        case_id: str, source_tenant_id: str, target_tenant_id: str,
        actor_type: str, actor_id: str,
        sharing_mode: SharingMode, redaction_policy: RedactionPolicy,
        created_by: str, context_type: str | None = None,
        context_id: str | None = None, expires_days: int | None = 30,
    ) -> CaseShare:
        """Create a case share. Audit logged."""

    async def revoke_share(
        self, db: AsyncSession, share_id: str, revoked_by: str
    ) -> None:
        """Soft-revoke a share. Audit logged."""

    async def grant_facilitator_consent(
        self, db: AsyncSession, share_id: str, patient_id: str
    ) -> CaseShare:
        """Patient grants facilitator access. Audit logged."""

    async def revoke_facilitator_consent(
        self, db: AsyncSession, share_id: str, revoked_by: str
    ) -> None:
        """Patient or coordinator revokes facilitator access."""

    async def get_shares_for_case(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[CaseShare]:
        """All active shares for a case (source tenant only)."""

    async def get_shares_for_actor(
        self, db: AsyncSession, actor_id: str, target_tenant_id: str
    ) -> list[CaseShare]:
        """All cases shared with an actor."""
Migration
- Create `case_shares` table
- Add dual-scope RLS policy: `USING (source_tenant_id = current_setting('app.tenant_id', true) OR target_tenant_id = current_setting('app.tenant_id', true))`
- Add indexes on `case_id`, `source_tenant_id`, `target_tenant_id`, `actor_id`
- Add composite unique constraint: `(case_id, actor_id, context_id)` to prevent duplicate shares
- Add partial unique index for null context: `CREATE UNIQUE INDEX uq_case_share_no_context ON case_shares (case_id, actor_id) WHERE context_id IS NULL`
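The partial index exists because a plain unique constraint treats NULLs as distinct, so two shares with a null `context_id` would never collide. The demonstration below uses SQLite from the Python standard library as a stand-in for PostgreSQL (both treat NULLs as distinct by default and both support partial indexes). The three-column table and the `try_insert` helper are simplifications for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE case_shares (
        case_id    TEXT NOT NULL,
        actor_id   TEXT NOT NULL,
        context_id TEXT,
        UNIQUE (case_id, actor_id, context_id)
    );
    """
)


def try_insert(case_id, actor_id, context_id):
    """Return True if the row was accepted, False on a uniqueness violation."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO case_shares VALUES (?, ?, ?)",
                (case_id, actor_id, context_id),
            )
        return True
    except sqlite3.IntegrityError:
        return False


# With only the composite constraint, NULL context_ids never collide.
first = try_insert("case-1", "actor-1", None)
second = try_insert("case-1", "actor-1", None)  # duplicate slips through

with conn:
    conn.execute("DELETE FROM case_shares")
    conn.execute(
        "CREATE UNIQUE INDEX uq_case_share_no_context "
        "ON case_shares (case_id, actor_id) WHERE context_id IS NULL"
    )

third = try_insert("case-1", "actor-1", None)
fourth = try_insert("case-1", "actor-1", None)  # now rejected by the partial index
fifth = try_insert("case-1", "actor-1", "consult-9")  # a distinct context is still allowed
```

The two constraints are complementary: the composite constraint covers rows with a context, and the partial index covers rows without one.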
3.3 Redaction Engine
New Files
| File | Purpose |
|---|---|
| `app/services/redaction_engine.py` | Core redaction logic |
| `config/redaction_policies.yaml` | Field-level redaction rules per policy |
| `tests/test_redaction_engine.py` | Tests |
Redaction Policy Config
# config/redaction_policies.yaml
policies:
  provider_snapshot:
    description: "Pseudonymized copy for healthcare providers"
    mode: copy
    patient_identity:
      name: pseudonymize  # CRW-2026-XXXXX
      email: remove
      phone: remove
      address: remove
      passport: remove
      date_of_birth: age_only  # Calculate age, drop DOB
    financial:
      budget: range_only  # "$5,000-$10,000" instead of exact
      other_quotes: remove
    clinical:
      procedure: keep
      comorbidities: keep
      medical_history: keep
      test_results: keep
      fhir_resources: keep
    conversation: remove
    match_results: remove  # Don't show other providers
  mso_clinical:
    description: "Clinical-only view for MSO second opinion"
    mode: reference
    patient_identity:
      name: remove
      email: remove
      phone: remove
      address: remove
      passport: remove
      date_of_birth: age_only
    financial: remove_all
    clinical:
      procedure: keep
      comorbidities: keep
      medical_history: keep
      test_results: keep
      fhir_resources: keep
    conversation: remove
    match_results: remove
    scope: consultation  # Only data relevant to consultation
  coordinator_full:
    description: "Full access for Curaway coordinators"
    mode: reference
    # No redaction: platform employment agreement covers data handling
    patient_identity: keep_all
    financial: keep_all
    clinical: keep_all
    conversation: keep_all
    match_results: keep_all
  facilitator_consent:
    description: "Consent-gated access for external facilitators"
    mode: reference
    # Access controlled by patient consent grant/revoke
    # When consent is active: same as coordinator (full)
    # When consent is revoked: no access (share deactivated)
    patient_identity: keep_all
    financial: keep_all
    clinical: keep_all
    conversation: keep_all
    match_results: keep_all
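Each policy section is either a shorthand (`keep_all`, `remove_all`) or a per-field map of actions. The sketch below shows one way a rule executor could interpret a section, assuming unlisted fields are dropped (fail closed) and that transforming actions such as `pseudonymize` are supplied as callables. `apply_field_rules`, `IDENTITY_RULES`, and the `transforms` parameter are hypothetical names, not part of the spec.

```python
# Hypothetical excerpt of the provider_snapshot policy, already parsed from YAML.
IDENTITY_RULES = {
    "name": "pseudonymize",
    "email": "remove",
    "phone": "remove",
    "address": "remove",
    "passport": "remove",
}


def apply_field_rules(section, rules, transforms):
    """Return a redacted copy of `section`; the input dict is never mutated."""
    if rules == "keep_all":
        return dict(section)
    if rules == "remove_all":
        return {}
    redacted = {}
    for field_name, value in section.items():
        # Assumption: a field the policy does not mention is removed.
        action = rules.get(field_name, "remove")
        if action == "keep":
            redacted[field_name] = value
        elif action in transforms:
            redacted[field_name] = transforms[action](value)
        # "remove" and unrecognised actions both drop the field.
    return redacted


patient = {
    "name": "Jane Doe",
    "email": "jane@example.com",
    "phone": "+1-555-0100",
    "nationality": "CA",
}
transforms = {"pseudonymize": lambda _value: "Patient CRW-2026-00034"}
snapshot_identity = apply_field_rules(patient, IDENTITY_RULES, transforms)
```

Dropping unknown fields by default means a new column added to the patient record stays hidden from providers until someone deliberately lists it in the policy.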
Engine
# app/services/redaction_engine.py
from dataclasses import dataclass

# load_yaml is a project-local helper; RedactionPolicy is defined in app/models/case_share.py.


@dataclass
class RedactedCaseData:
    """Typed return from redaction engine. Guarantees no PII leaks in typed fields."""

    case_number: str
    patient_display_name: str  # Pseudonymized or real, per policy
    age: int | None  # Age only (no DOB), or None if removed
    procedure: dict | None
    comorbidities: list[dict]
    medical_history: dict | None
    test_results: list[dict]
    fhir_resources: list[dict]
    budget_display: str | None  # Range string or None if removed
    risk_factors: list[dict]
    clinical_summary: str | None
    # Fields explicitly excluded by redaction are not present (not set to None)


class RedactionEngine:
    """Applies redaction policies to case data before cross-tenant sharing.

    Singleton: instantiate once via get_redaction_engine(). Policies are loaded
    from YAML at startup, not per-request.
    """

    def __init__(self):
        self._policies = load_yaml("config/redaction_policies.yaml")

    def redact_case(
        self, case_data: dict, policy: RedactionPolicy,
        case_number: str | None = None,
    ) -> RedactedCaseData:
        """Apply a redaction policy to case data.

        Args:
            case_data: Full case dict (from case + patient + FHIR)
            policy: Which redaction policy to apply
            case_number: For pseudonymization (provider_snapshot)
        Returns:
            Typed RedactedCaseData. Original is never mutated.
        """

    def redact_patient(self, patient_data: dict, policy: RedactionPolicy) -> dict:
        """Redact patient PII fields."""

    def redact_fhir(self, fhir_bundle: dict, policy: RedactionPolicy) -> dict:
        """Redact FHIR resources (remove patient identifiers from Bundle)."""

    def pseudonymize_name(self, case_number: str) -> str:
        """Generate pseudonym from case number: 'Patient CRW-2026-00034'."""
        return f"Patient {case_number}"

    def age_from_dob(self, date_of_birth: str) -> int:
        """Calculate age from DOB. Used for age_only redaction."""

    def budget_to_range(self, budget_cents: int) -> str:
        """Convert exact budget to range: 500000 → '$4,000–$6,000'."""


# Singleton factory (module level, outside the class)
_engine: RedactionEngine | None = None


def get_redaction_engine() -> RedactionEngine:
    global _engine
    if _engine is None:
        _engine = RedactionEngine()
    return _engine
Tests
- Unit: each redaction policy correctly masks/keeps fields
- Provider snapshot: patient name → pseudonym, no email/phone, age only, budget range
- MSO clinical: no PII, no financial, clinical data intact
- Coordinator: nothing redacted
- Facilitator: full access when consent active, no access when revoked
- FHIR bundle: patient identifier resources stripped in provider/MSO policies
- Edge cases: missing fields, None values, empty dicts
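The "age only" and "budget range" expectations map to the engine's `age_from_dob` and `budget_to_range` helpers, whose bodies the spec leaves open. The following standalone sketch is one possible implementation. It assumes ISO-8601 date strings, takes `today` as an explicit parameter so the result is testable, and assumes the range is the budget plus or minus 20% rounded to the nearest $1,000, a rule chosen only because it reproduces the spec's `500000 → '$4,000–$6,000'` example.

```python
from datetime import date


def age_from_dob(date_of_birth, today):
    """Whole years between an ISO-8601 date of birth and `today`."""
    dob = date.fromisoformat(date_of_birth)
    had_birthday = (today.month, today.day) >= (dob.month, dob.day)
    return today.year - dob.year - (0 if had_birthday else 1)


def budget_to_range(budget_cents):
    """Assumption: budget +/- 20%, each bound rounded to the nearest $1,000."""
    dollars = budget_cents // 100
    # Integer arithmetic avoids floating-point surprises on money values.
    low = (dollars * 4 // 5 + 500) // 1000 * 1000
    high = (dollars * 6 // 5 + 500) // 1000 * 1000
    return f"${low:,}–${high:,}"


age_day_before_birthday = age_from_dob("1990-06-15", date(2026, 6, 14))
age_on_birthday = age_from_dob("1990-06-15", date(2026, 6, 15))
example_range = budget_to_range(500000)
```

Whatever bucketing rule is finally chosen, the unit tests should pin the boundary cases (birthday today, budgets near a rounding edge) because those are where an exact value could leak through.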
3.4 Case Lifecycle State Machine
New Files
| File | Purpose |
|---|---|
| `app/state_machine/case_states.py` | State + transition definitions |
| `app/state_machine/case_machine.py` | python-statemachine CaseStateMachine |
| `app/services/case_lifecycle.py` | Service wrapping state machine + persistence |
| `tests/test_case_state_machine.py` | State machine tests |
Dependencies
Add python-statemachine to requirements.txt.
State Definitions
# app/state_machine/case_states.py
from enum import Enum


class CaseStatus(str, Enum):
    """All case lifecycle states from ADR-0018."""

    # Patient intake flow
    INTAKE = "intake"
    PROCEDURE_IDENTIFIED = "procedure_identified"
    RECORDS_COLLECTED = "records_collected"
    INTAKE_COMPLETE = "intake_complete"
    # Matching flow
    MATCHING = "matching"
    PROVIDERS_SELECTED = "providers_selected"
    CONSENT_GIVEN = "consent_given"
    # Platform review flow
    RISK_REVIEW_PENDING = "risk_review_pending"
    RISK_CLEARED = "risk_cleared"
    PROVIDERS_NOTIFIED = "providers_notified"
    QUOTING = "quoting"
    # Provider flow is tracked per provider in case_shares, not in the case status.
    # Provider-side states are listed in ProviderCaseStatus below.
    # Patient decision flow
    QUOTES_POOLED = "quotes_pooled"
    PATIENT_REVIEWING = "patient_reviewing"
    PROVIDER_SELECTED = "provider_selected"
    MSO_OFFERED = "mso_offered"
    MSO_COMPLETE = "mso_complete"
    PAYMENT_LOCKED = "payment_locked"
    # Coordination flow
    COORDINATOR_ASSIGNED = "coordinator_assigned"
    PRE_OP = "pre_op"
    TRAVEL_BOOKED = "travel_booked"
    ADMITTED = "admitted"
    PROCEDURE_COMPLETE = "procedure_complete"
    POST_OP = "post_op"
    FOLLOW_UP = "follow_up"
    CASE_COMPLETE = "case_complete"
    # Terminal
    CLOSED = "closed"
    CANCELLED = "cancelled"


class ProviderCaseStatus(str, Enum):
    """Provider-side case tracking (per case_share)."""

    RECEIVED = "received"
    REVIEWING = "reviewing"
    INFO_REQUESTED = "info_requested"
    QUOTED = "quoted"
    REJECTED = "rejected"
    SELECTED = "selected"  # Patient chose this provider
    NOT_SELECTED = "not_selected"
State Machine
# app/state_machine/case_machine.py
from statemachine import StateMachine, State


class CaseStateMachine(StateMachine):
    """Case lifecycle state machine.

    Replaces the forward-only status list in case.py with explicit,
    validated transitions. Guards enforce business rules.
    """

    # ── Patient Intake ──
    intake = State(initial=True)
    procedure_identified = State()
    records_collected = State()
    intake_complete = State()
    # ── Matching ──
    matching = State()
    providers_selected = State()
    consent_given = State()
    # ── Platform Review ──
    risk_review_pending = State()
    risk_cleared = State()
    providers_notified = State()
    quoting = State()
    # ── Patient Decision ──
    quotes_pooled = State()
    patient_reviewing = State()
    provider_selected = State()
    mso_offered = State()
    mso_complete = State()
    payment_locked = State()
    # ── Coordination ──
    coordinator_assigned = State()
    pre_op = State()
    travel_booked = State()
    admitted = State()
    procedure_complete = State()
    post_op = State()
    follow_up = State()
    case_complete = State(final=True)
    # ── Terminal ──
    closed = State(final=True)
    cancelled = State(final=True)

    # ── Transitions ──
    identify_procedure = intake.to(procedure_identified)
    collect_records = procedure_identified.to(records_collected)
    complete_intake = records_collected.to(intake_complete)
    start_matching = intake_complete.to(matching)
    select_providers = matching.to(providers_selected)
    give_consent = providers_selected.to(consent_given)
    request_risk_review = consent_given.to(risk_review_pending)
    clear_risk = risk_review_pending.to(risk_cleared)
    notify_providers = risk_cleared.to(providers_notified)
    start_quoting = providers_notified.to(quoting)
    pool_quotes = quoting.to(quotes_pooled)
    begin_patient_review = quotes_pooled.to(patient_reviewing)
    select_provider = patient_reviewing.to(provider_selected)
    offer_mso = provider_selected.to(mso_offered)
    complete_mso = mso_offered.to(mso_complete)
    skip_mso = provider_selected.to(payment_locked)  # Skip MSO path
    lock_payment = mso_complete.to(payment_locked)
    assign_coordinator = payment_locked.to(coordinator_assigned)
    begin_pre_op = coordinator_assigned.to(pre_op)
    book_travel = pre_op.to(travel_booked)
    admit_patient = travel_booked.to(admitted)
    complete_procedure = admitted.to(procedure_complete)
    begin_post_op = procedure_complete.to(post_op)
    begin_follow_up = post_op.to(follow_up)
    complete_case = follow_up.to(case_complete)

    # Any non-final state → closed; cancellation is only allowed before admission
    close_case = (
        intake.to(closed) | procedure_identified.to(closed) |
        records_collected.to(closed) | intake_complete.to(closed) |
        matching.to(closed) | providers_selected.to(closed) |
        consent_given.to(closed) | risk_review_pending.to(closed) |
        risk_cleared.to(closed) | providers_notified.to(closed) |
        quoting.to(closed) | quotes_pooled.to(closed) |
        patient_reviewing.to(closed) | provider_selected.to(closed) |
        mso_offered.to(closed) | mso_complete.to(closed) |
        payment_locked.to(closed) | coordinator_assigned.to(closed) |
        pre_op.to(closed) | travel_booked.to(closed) |
        admitted.to(closed) | procedure_complete.to(closed) |
        post_op.to(closed) | follow_up.to(closed)
    )
    cancel_case = (
        intake.to(cancelled) | procedure_identified.to(cancelled) |
        records_collected.to(cancelled) | intake_complete.to(cancelled) |
        matching.to(cancelled) | providers_selected.to(cancelled) |
        consent_given.to(cancelled) | risk_review_pending.to(cancelled) |
        risk_cleared.to(cancelled) | providers_notified.to(cancelled) |
        quoting.to(cancelled) | quotes_pooled.to(cancelled) |
        patient_reviewing.to(cancelled) | provider_selected.to(cancelled) |
        mso_offered.to(cancelled) | mso_complete.to(cancelled) |
        payment_locked.to(cancelled) | coordinator_assigned.to(cancelled) |
        pre_op.to(cancelled) | travel_booked.to(cancelled)
        # After admitted: medical decision, not patient cancellation
    )

    # ── Callbacks for audit logging ──
    def on_enter_state(self, state, event):
        """Log every state transition for audit trail."""
        # Will be wired to AuditLog in case_lifecycle service
        pass
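The long `close_case` and `cancel_case` chains encode two rules: every non-final state can be closed, and cancellation stops being available once the patient is admitted. The dependency-free sketch below restates those rules as sets so they can be checked at a glance. It is not a replacement for the python-statemachine definition, and `ORDERED_STATES`, `CANCELLABLE`, `CLOSABLE`, and `terminal_events` are hypothetical names.

```python
# Non-final lifecycle states, in the order they are declared in CaseStateMachine.
ORDERED_STATES = [
    "intake", "procedure_identified", "records_collected", "intake_complete",
    "matching", "providers_selected", "consent_given",
    "risk_review_pending", "risk_cleared", "providers_notified", "quoting",
    "quotes_pooled", "patient_reviewing", "provider_selected",
    "mso_offered", "mso_complete", "payment_locked",
    "coordinator_assigned", "pre_op", "travel_booked",
    "admitted", "procedure_complete", "post_op", "follow_up",
]
FINAL_STATES = {"case_complete", "closed", "cancelled"}

# cancel_case sources stop at travel_booked; close_case covers every non-final state.
CANCELLABLE = set(ORDERED_STATES[: ORDERED_STATES.index("admitted")])
CLOSABLE = set(ORDERED_STATES)


def terminal_events(state):
    """Which of close_case / cancel_case may fire from `state`."""
    events = []
    if state in CLOSABLE:
        events.append("close_case")
    if state in CANCELLABLE:
        events.append("cancel_case")
    return events
```

A test that compares these sets against the sources of the real `close_case` and `cancel_case` events would catch a state that is added to the machine but forgotten in one of the chains.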
Lifecycle Service
# app/services/case_lifecycle.py
from sqlalchemy.ext.asyncio import AsyncSession


class CaseLifecycleService:
    """Wraps the state machine with persistence and audit logging.

    This is the ONLY way to change case status. No direct status writes.
    """

    async def transition(
        self, db: AsyncSession, case_id: str, tenant_id: str,
        event: str, actor_id: str, actor_type: str,
        metadata: dict | None = None,
    ) -> Case:
        """Execute a state transition.

        1. Load case from DB
        2. Initialize state machine at current state
        3. Fire event (raises InvalidTransition if not allowed)
        4. Persist new status + workflow_state
        5. Create AuditLog entry
        6. Return updated case
        """

    async def get_available_transitions(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[str]:
        """What events can fire from current state?"""

    async def get_case_timeline(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[dict]:
        """Ordered list of state transitions from audit log."""
Status Migration Mapping
When enabling the state machine on a database with existing cases, the old statuses must map to the new state machine states:
| Old status (VALID_CASE_STATUSES) | New state (CaseStatus) | Notes |
|---|---|---|
| intake | intake | Direct match |
| documents_pending | records_collected | Renamed to be document-type-agnostic |
| ehr_complete | intake_complete | EHR construction is part of intake completion |
| matching | matching | Direct match |
| matched | providers_selected | Renamed because "matched" was ambiguous |
| provider_selected | provider_selected | Direct match |
| consent_given | consent_given | Direct match |
| forwarded | providers_notified | Renamed because forwarding is one step of notification |
| closed | closed | Direct match |
The migration does NOT alter existing data. When case_state_machine_v2=true, the CaseLifecycleService maps old statuses to new states at load time:
_OLD_TO_NEW = {
    "documents_pending": "records_collected",
    "ehr_complete": "intake_complete",
    "matched": "providers_selected",
    "forwarded": "providers_notified",
}
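Applying the mapping at load time could look like the sketch below. The `normalize_status` helper and its `state_machine_enabled` parameter are hypothetical; the dict is copied from the mapping above. Statuses that are direct matches fall through unchanged, and with the flag off the stored value is returned untouched.

```python
_OLD_TO_NEW = {
    "documents_pending": "records_collected",
    "ehr_complete": "intake_complete",
    "matched": "providers_selected",
    "forwarded": "providers_notified",
}


def normalize_status(stored_status, state_machine_enabled):
    """Map a stored legacy status to its state machine name at load time."""
    if not state_machine_enabled:
        # Flag off: legacy code paths keep seeing the value as stored.
        return stored_status
    # Direct matches (intake, matching, closed, ...) are not in the dict.
    return _OLD_TO_NEW.get(stored_status, stored_status)
```

Because the translation is read-only, turning the flag off again leaves the database exactly as it was, which is what makes the rollback story cheap.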
Backward Compatibility
The existing VALID_CASE_STATUSES list and is_valid_transition() in app/models/case.py stay during Phase 0. The state machine runs in parallel behind a feature flag:
When case_state_machine_v2=true:
- CaseLifecycleService.transition() validates via state machine
- Old case_orchestrator.py calls CaseLifecycleService instead of direct status writes
When case_state_machine_v2=false:
- Current behavior unchanged
- State machine code exists but is not invoked
This allows gradual rollout and rollback.
Migration Strategy
All migrations run in a single Alembic revision chain:
xxxx_01_add_rbac_tables.py → roles, user_roles + RLS + seed
xxxx_02_add_case_shares.py → case_shares + RLS + indexes
xxxx_03_update_case_statuses.py → expand VALID_CASE_STATUSES (no data migration needed — new states are only used when state machine is enabled)
Rollback Plan
All tables are additive — rollback = drop tables. No existing columns are modified. Feature flag controls activation.
Implementation Order
Week 1:
  Day 1-2: RBAC models + migration + repository + seed
  Day 3-4: RBAC service + middleware + decorator + tests
Week 2:
  Day 1-2: Case shares model + migration + repository + service
  Day 3: Redaction engine + policy config + tests
Week 3:
  Day 1-2: State machine + lifecycle service + feature flag
  Day 3: Integration tests + case_orchestrator wiring
Test Plan
| Area | Test Type | Count (est.) |
|---|---|---|
| RBAC permission logic | Unit | 15–20 |
| RBAC middleware | Integration | 5–8 |
| Role assignment/revocation | Integration | 8–10 |
| Case share CRUD | Integration | 10–12 |
| Consent grant/revoke | Integration | 5–8 |
| Redaction engine | Unit | 20–25 |
| State machine transitions | Unit | 25–30 |
| Invalid transitions | Unit | 10–15 |
| Lifecycle service | Integration | 10–12 |
| Cross-tenant isolation | Integration | 8–10 |
| Total | | ~120–150 |
Feature Flags
| Flag | Default | Purpose |
|---|---|---|
| `rbac_enforcement` | false | Enable RBAC permission checks on routes |
| `case_state_machine_v2` | false | Use python-statemachine instead of status list |
| `redaction_engine` | false | Enable redaction on case share creation |
All three are independent — can be enabled/disabled separately.
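As a small illustration of that independence, the sketch below resolves each flag on its own, starting from the defaults in the table. `FLAG_DEFAULTS`, `is_enabled`, and the `overrides` parameter are hypothetical names; how flags are actually stored and toggled is not specified here.

```python
from typing import Dict, Optional

# Defaults from the Feature Flags table: every flag ships disabled.
FLAG_DEFAULTS = {
    "rbac_enforcement": False,
    "case_state_machine_v2": False,
    "redaction_engine": False,
}


def is_enabled(flag: str, overrides: Optional[Dict[str, bool]] = None) -> bool:
    """Resolve one flag; overrides for other flags have no effect on it."""
    if flag not in FLAG_DEFAULTS:
        # Fail loudly on typos instead of silently treating them as disabled.
        raise KeyError(f"unknown feature flag: {flag}")
    if overrides and flag in overrides:
        return overrides[flag]
    return FLAG_DEFAULTS[flag]


# Example: a staging environment that turns on only the state machine.
staging = {"case_state_machine_v2": True}
```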
Open Questions
- Clerk multi-org wiring: Do we create provider orgs in Clerk during Phase 0 or defer to Phase 1? Recommendation: Defer. Phase 0 creates the RBAC tables; Phase 1 wires Clerk webhooks.
- Provider-side state tracking: Should `ProviderCaseStatus` live in `case_shares.provider_status` or in a separate `provider_case_responses` table? Recommendation: Column on `case_shares` (simpler, fewer joins).
- Redaction engine sync vs async: Should redaction happen at share-creation time (sync) or via Upstash Workflow (async)? Recommendation: Sync for Phase 0 (simpler). Async agent in Phase 1 for batch forwarding.