Skip to content

Multi-Tenancy Phase 0 — Implementation Spec

Status: Implemented in Session 34 ADR: 0018-multi-tenancy-platform-architecture Execution Roadmap: Wave 3 (items 3.1–3.4) Prerequisites: DAO Layer (Wave 0.1 ✓), PostgreSQL RLS (Wave 0.2 ✓) Estimated effort: 2–3 weeks


Scope

Phase 0 delivers the multi-tenancy foundation — the tables, RBAC engine, redaction engine, and state machine that all subsequent phases build on. No new UI. No new API routes for external consumers. Just the backend plumbing.

What's In

# Deliverable Depends on
3.1 RBAC model — roles, permissions, user-role assignment Tenant model (exists)
3.2 Case shares — cross-tenant case references with redaction policy 3.1
3.3 Redaction engine — policy executor that masks fields per actor type 3.2
3.4 Case lifecycle state machine — replace if/else with python-statemachine Tenant model

What's Out (later phases)

  • Provider portal UI (Phase 1)
  • Coordinator dashboard (Phase 1)
  • Facilitator/MSO flows (Phase 2–3)
  • Clerk multi-org wiring (Phase 1 — we define the mapping here but don't wire webhooks)
  • Upstash Workflow agentic pipelines (Phase 1+)

3.1 — RBAC Model

New Files

File Purpose
app/models/role.py Role + UserRole + Permission models
app/repositories/role_repository.py RBAC data access
app/services/rbac_service.py Permission checking, role assignment
app/middleware/rbac_middleware.py Request-level permission enforcement
alembic/versions/xxxx_add_rbac_tables.py Migration
tests/test_rbac.py Unit + integration tests
config/rbac_defaults.yaml Default role definitions + permissions

Models

# app/models/role.py

class RoleCode(str, Enum):
    """7 platform actor types from ADR-0018."""
    PATIENT = "patient"
    FACILITATOR = "facilitator"
    COORDINATOR = "coordinator"
    MSO_DOCTOR = "mso_doctor"
    PROVIDER_ADMIN = "provider_admin"
    PLATFORM_ADMIN = "platform_admin"
    SUPER_ADMIN = "super_admin"


class Permission(str, Enum):
    """Granular permissions. Grouped by domain."""
    # Patient domain
    CASE_READ_OWN = "case:read:own"
    CASE_WRITE_OWN = "case:write:own"
    CONSENT_MANAGE = "consent:manage"

    # Coordinator domain
    CASE_READ_ASSIGNED = "case:read:assigned"
    CASE_WRITE_ASSIGNED = "case:write:assigned"
    CASE_ASSIGN = "case:assign"
    CASE_QUEUE_VIEW = "case:queue:view"

    # Facilitator domain
    CASE_READ_DELEGATED = "case:read:delegated"
    CASE_WRITE_DELEGATED = "case:write:delegated"

    # MSO domain
    CONSULTATION_READ = "consultation:read"
    CONSULTATION_WRITE = "consultation:write"
    CLINICAL_DATA_READ = "clinical:read"

    # Provider domain
    CASE_READ_FORWARDED = "case:read:forwarded"
    QUOTE_WRITE = "quote:write"
    PROVIDER_MANAGE = "provider:manage"
    STAFF_MANAGE = "staff:manage"

    # Platform admin domain
    TENANT_MANAGE = "tenant:manage"
    RBAC_MANAGE = "rbac:manage"
    FEATURE_FLAG_MANAGE = "feature_flag:manage"
    USER_MANAGE = "user:manage"

    # Super admin domain
    TENANT_OVERRIDE = "tenant:override"
    AUDIT_READ = "audit:read"
    SYSTEM_MANAGE = "system:manage"


class Role(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Platform role definition.

    DESIGN DECISION: Roles are a global catalog (like providers, treatment_categories).
    They are NOT tenant-scoped. System roles are seeded at migration time.
    Custom per-tenant roles are deferred to a future phase.
    Repository: BaseUnscopedRepository (same pattern as ProviderRepository).
    """

    __tablename__ = "roles"

    code: Mapped[str] = mapped_column(String(30), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(100), nullable=False)
    description: Mapped[str | None] = mapped_column(Text)
    # Which tenant type this role belongs to (None = shared tenant)
    tenant_type: Mapped[str | None] = mapped_column(String(30))
    # "provider" | None (shared)
    permissions: Mapped[list] = mapped_column(FlexibleJSON, nullable=False, default=list)
    # ["case:read:own", "consent:manage", ...]
    is_system: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    # System roles can't be deleted. Custom roles (future) can.


class UserRole(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Assigns a user to a role within a tenant scope."""

    __tablename__ = "user_roles"

    user_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    # Clerk user ID
    role_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("roles.id"), nullable=False, index=True
    )
    tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )
    # For shared-tenant roles: the platform tenant ID
    # For provider roles: the provider's tenant ID
    granted_by: Mapped[str] = mapped_column(String(255), nullable=False)
    # Clerk user ID of who granted the role
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)

    __table_args__ = (
        UniqueConstraint("user_id", "role_id", "tenant_id", name="uq_user_role_tenant"),
    )

Default Role→Permission Mapping (config/rbac_defaults.yaml)

roles:
  patient:
    name: "Patient"
    permissions:
      - case:read:own
      - case:write:own
      - consent:manage
    tenant_type: null  # shared tenant

  facilitator:
    name: "Facilitator"
    permissions:
      - case:read:delegated
      - case:write:delegated
    tenant_type: null

  coordinator:
    name: "Coordinator"
    permissions:
      - case:read:assigned
      - case:write:assigned
      - case:assign
      - case:queue:view
    tenant_type: null

  mso_doctor:
    name: "MSO Doctor"
    permissions:
      - consultation:read
      - consultation:write
      - clinical:read
    tenant_type: null

  provider_admin:
    name: "Provider Admin"
    permissions:
      - case:read:forwarded
      - quote:write
      - provider:manage
      - staff:manage
    tenant_type: provider

  platform_admin:
    name: "Platform Admin"
    permissions:
      - tenant:manage
      - rbac:manage
      - feature_flag:manage
      - user:manage
      - case:queue:view
      - audit:read
    tenant_type: null

  super_admin:
    name: "Super Admin"
    permissions:
      - tenant:override
      - audit:read
      - system:manage
      # Inherits all other permissions implicitly
    tenant_type: null

RBAC Service

# app/services/rbac_service.py

class RBACService:
    """Check permissions, assign/revoke roles."""

    async def has_permission(
        self, db: AsyncSession, user_id: str, tenant_id: str, permission: str
    ) -> bool:
        """Check if user has a specific permission in the given tenant."""

    async def get_user_roles(
        self, db: AsyncSession, user_id: str, tenant_id: str
    ) -> list[Role]:
        """Get all active roles for a user in a tenant."""

    async def assign_role(
        self, db: AsyncSession, user_id: str, role_code: str,
        tenant_id: str, granted_by: str
    ) -> UserRole:
        """Assign a role. Idempotent — returns existing if duplicate."""

    async def revoke_role(
        self, db: AsyncSession, user_id: str, role_code: str, tenant_id: str
    ) -> None:
        """Soft-revoke (is_active=False). Audit logged."""

    async def check_or_raise(
        self, db: AsyncSession, user_id: str, tenant_id: str, permission: str
    ) -> None:
        """Raise PermissionDenied if user lacks permission."""

RBAC Middleware

# app/middleware/rbac_middleware.py

class RBACMiddleware(BaseHTTPMiddleware):
    """Extracts user identity from Clerk JWT, resolves roles.

    Sets request.state.user_id, request.state.roles, request.state.permissions.
    Does NOT enforce — individual routes use @require_permission() decorator.
    """

Route Decorator

# Usage in routers:
@router.get("/cases/{case_id}")
@require_permission("case:read:own")
async def get_case(case_id: str, request: Request, db: AsyncSession = Depends(get_db)):
    ...

Migration

  • Create roles table with UUID PK, code (unique), name, permissions JSONB, tenant_type, is_system
  • Create user_roles table with UUID PK, user_id, role_id FK, tenant_id FK, granted_by, is_active
  • Seed 7 system roles from config/rbac_defaults.yaml
  • Add RLS policy on user_roles (tenant-scoped)

Tests

  • Unit: permission check logic, role assignment/revocation, duplicate handling
  • Integration: RBAC middleware sets request.state correctly
  • Isolation: user in tenant A cannot access tenant B resources
  • Super admin: can override tenant scope

3.2 — Case Shares

New Files

File Purpose
app/models/case_share.py CaseShare model
app/repositories/case_share_repository.py Data access
app/services/case_share_service.py Create/revoke shares, consent tracking
alembic/versions/xxxx_add_case_shares.py Migration
tests/test_case_shares.py Tests

Model

# app/models/case_share.py

class SharingMode(str, Enum):
    COPY = "copy"          # One-way snapshot (providers)
    REFERENCE = "reference" # Live read access (coordinators, MSO, facilitators)


class RedactionPolicy(str, Enum):
    PROVIDER_SNAPSHOT = "provider_snapshot"
    MSO_CLINICAL = "mso_clinical"
    COORDINATOR_FULL = "coordinator_full"
    FACILITATOR_CONSENT = "facilitator_consent"


class CaseShare(Base, UUIDPrimaryKeyMixin, TimestampMixin):
    """Cross-tenant case access with redaction policy.

    DESIGN DECISION: No TenantScopedMixin. This is the first cross-tenant table.
    Instead of a single tenant_id, it has source_tenant_id and target_tenant_id.
    RLS uses dual-scope: WHERE source_tenant_id = app.tenant_id OR target_tenant_id = app.tenant_id.
    Repository: CaseShareRepository (custom, not BaseRepository — handles dual scope).
    """

    __tablename__ = "case_shares"

    case_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("cases.id"), nullable=False, index=True
    )
    source_tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )
    target_tenant_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("tenants.id"), nullable=False, index=True
    )

    # Actor receiving access
    actor_type: Mapped[str] = mapped_column(String(30), nullable=False)
    # RoleCode value: provider_admin | mso_doctor | coordinator | facilitator
    actor_id: Mapped[str] = mapped_column(String(255), nullable=False, index=True)
    # Clerk user ID or provider tenant ID

    sharing_mode: Mapped[str] = mapped_column(String(20), nullable=False)
    # "copy" or "reference"
    redaction_policy: Mapped[str] = mapped_column(String(50), nullable=False)
    # Which fields to mask

    # Consent tracking (facilitators) — links to existing ConsentRecord table
    # Uses purpose_code="facilitator_data_sharing" in consent_records
    consent_record_id: Mapped[str | None] = mapped_column(
        String(36), ForeignKey("consent_records.id"), nullable=True
    )
    # Denormalized for query performance — mirrors consent_record.status
    consent_granted: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Scope limiting (MSO consultations)
    context_type: Mapped[str | None] = mapped_column(String(30))
    # "consultation" | "forwarding" | None
    context_id: Mapped[str | None] = mapped_column(String(36))

    # Provider-side status tracking (only for actor_type="provider_admin")
    provider_status: Mapped[str | None] = mapped_column(String(30))
    # ProviderCaseStatus: received | reviewing | info_requested | quoted | rejected | selected | not_selected

    # Lifecycle
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    # Provider snapshots expire after 30 days

    created_by: Mapped[str] = mapped_column(String(255), nullable=False)
    # Who initiated the share

Service

# app/services/case_share_service.py

class CaseShareService:

    async def create_share(
        self, db: AsyncSession, *,
        case_id: str, source_tenant_id: str, target_tenant_id: str,
        actor_type: str, actor_id: str,
        sharing_mode: SharingMode, redaction_policy: RedactionPolicy,
        created_by: str, context_type: str | None = None,
        context_id: str | None = None, expires_days: int | None = 30,
    ) -> CaseShare:
        """Create a case share. Audit logged."""

    async def revoke_share(
        self, db: AsyncSession, share_id: str, revoked_by: str
    ) -> None:
        """Soft-revoke a share. Audit logged."""

    async def grant_facilitator_consent(
        self, db: AsyncSession, share_id: str, patient_id: str
    ) -> CaseShare:
        """Patient grants facilitator access. Audit logged."""

    async def revoke_facilitator_consent(
        self, db: AsyncSession, share_id: str, revoked_by: str
    ) -> None:
        """Patient or coordinator revokes facilitator access."""

    async def get_shares_for_case(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[CaseShare]:
        """All active shares for a case (source tenant only)."""

    async def get_shares_for_actor(
        self, db: AsyncSession, actor_id: str, target_tenant_id: str
    ) -> list[CaseShare]:
        """All cases shared with an actor."""

Migration

  • Create case_shares table
  • Add dual-scope RLS policy: USING (source_tenant_id = current_setting('app.tenant_id', true) OR target_tenant_id = current_setting('app.tenant_id', true))
  • Add indexes on case_id, source_tenant_id, target_tenant_id, actor_id
  • Add composite unique constraint: (case_id, actor_id, context_id) to prevent duplicate shares
  • Add partial unique index for null context: CREATE UNIQUE INDEX uq_case_share_no_context ON case_shares (case_id, actor_id) WHERE context_id IS NULL

3.3 — Redaction Engine

New Files

File Purpose
app/services/redaction_engine.py Core redaction logic
config/redaction_policies.yaml Field-level redaction rules per policy
tests/test_redaction_engine.py Tests

Redaction Policy Config

# config/redaction_policies.yaml

policies:
  provider_snapshot:
    description: "Pseudonymized copy for healthcare providers"
    mode: copy
    patient_identity:
      name: pseudonymize  # CRW-2026-XXXXX
      email: remove
      phone: remove
      address: remove
      passport: remove
      date_of_birth: age_only  # Calculate age, drop DOB
    financial:
      budget: range_only  # "$5,000-$10,000" instead of exact
      other_quotes: remove
    clinical:
      procedure: keep
      comorbidities: keep
      medical_history: keep
      test_results: keep
      fhir_resources: keep
    conversation: remove
    match_results: remove  # Don't show other providers

  mso_clinical:
    description: "Clinical-only view for MSO second opinion"
    mode: reference
    patient_identity:
      name: remove
      email: remove
      phone: remove
      address: remove
      passport: remove
      date_of_birth: age_only
    financial: remove_all
    clinical:
      procedure: keep
      comorbidities: keep
      medical_history: keep
      test_results: keep
      fhir_resources: keep
    conversation: remove
    match_results: remove
    scope: consultation  # Only data relevant to consultation

  coordinator_full:
    description: "Full access for Curaway coordinators"
    mode: reference
    # No redaction — platform employment agreement covers data handling
    patient_identity: keep_all
    financial: keep_all
    clinical: keep_all
    conversation: keep_all
    match_results: keep_all

  facilitator_consent:
    description: "Consent-gated access for external facilitators"
    mode: reference
    # Access controlled by patient consent grant/revoke
    # When consent is active: same as coordinator (full)
    # When consent is revoked: no access (share deactivated)
    patient_identity: keep_all
    financial: keep_all
    clinical: keep_all
    conversation: keep_all
    match_results: keep_all

Engine

# app/services/redaction_engine.py

@dataclass
class RedactedCaseData:
    """Typed return from redaction engine. Guarantees no PII leaks in typed fields."""
    case_number: str
    patient_display_name: str   # Pseudonymized or real, per policy
    age: int | None             # Age only (no DOB), or None if removed
    procedure: dict | None
    comorbidities: list[dict]
    medical_history: dict | None
    test_results: list[dict]
    fhir_resources: list[dict]
    budget_display: str | None  # Range string or None if removed
    risk_factors: list[dict]
    clinical_summary: str | None
    # Fields explicitly excluded by redaction are not present (not set to None)


class RedactionEngine:
    """Applies redaction policies to case data before cross-tenant sharing.

    Singleton — instantiate once via get_redaction_engine(). Policies loaded
    from YAML at startup, not per-request.
    """

    def __init__(self):
        self._policies = load_yaml("config/redaction_policies.yaml")

    def redact_case(
        self, case_data: dict, policy: RedactionPolicy,
        case_number: str | None = None,
    ) -> RedactedCaseData:
        """Apply a redaction policy to case data.

        Args:
            case_data: Full case dict (from case + patient + FHIR)
            policy: Which redaction policy to apply
            case_number: For pseudonymization (provider_snapshot)

        Returns:
            Typed RedactedCaseData. Original is never mutated.
        """

    def redact_patient(self, patient_data: dict, policy: RedactionPolicy) -> dict:
        """Redact patient PII fields."""

    def redact_fhir(self, fhir_bundle: dict, policy: RedactionPolicy) -> dict:
        """Redact FHIR resources (remove patient identifiers from Bundle)."""


# Singleton factory
_engine: RedactionEngine | None = None

def get_redaction_engine() -> RedactionEngine:
    global _engine
    if _engine is None:
        _engine = RedactionEngine()
    return _engine

    def pseudonymize_name(self, case_number: str) -> str:
        """Generate pseudonym from case number: 'Patient CRW-2026-00034'."""
        return f"Patient {case_number}"

    def age_from_dob(self, date_of_birth: str) -> int:
        """Calculate age from DOB. Used for age_only redaction."""

    def budget_to_range(self, budget_cents: int) -> str:
        """Convert exact budget to range: 500000 → '$4,000–$6,000'."""

Tests

  • Unit: each redaction policy correctly masks/keeps fields
  • Provider snapshot: patient name → pseudonym, no email/phone, age only, budget range
  • MSO clinical: no PII, no financial, clinical data intact
  • Coordinator: nothing redacted
  • Facilitator: full access when consent active, no access when revoked
  • FHIR bundle: patient identifier resources stripped in provider/MSO policies
  • Edge cases: missing fields, None values, empty dicts

3.4 — Case Lifecycle State Machine

New Files

File Purpose
app/state_machine/case_states.py State + transition definitions
app/state_machine/case_machine.py python-statemachine CaseStateMachine
app/services/case_lifecycle.py Service wrapping state machine + persistence
tests/test_case_state_machine.py State machine tests

Dependencies

pip install python-statemachine>=2.3

Add to requirements.txt.

State Definitions

# app/state_machine/case_states.py

class CaseStatus(str, Enum):
    """All case lifecycle states from ADR-0018."""

    # Patient intake flow
    INTAKE = "intake"
    PROCEDURE_IDENTIFIED = "procedure_identified"
    RECORDS_COLLECTED = "records_collected"
    INTAKE_COMPLETE = "intake_complete"

    # Matching flow
    MATCHING = "matching"
    PROVIDERS_SELECTED = "providers_selected"
    CONSENT_GIVEN = "consent_given"

    # Platform review flow
    RISK_REVIEW_PENDING = "risk_review_pending"
    RISK_CLEARED = "risk_cleared"
    PROVIDERS_NOTIFIED = "providers_notified"
    QUOTING = "quoting"

    # Provider flow (per provider — tracked in case_shares, not case status)
    # Provider-side states: received, reviewing, info_requested, quoted, rejected

    # Patient decision flow
    QUOTES_POOLED = "quotes_pooled"
    PATIENT_REVIEWING = "patient_reviewing"
    PROVIDER_SELECTED = "provider_selected"
    MSO_OFFERED = "mso_offered"
    MSO_COMPLETE = "mso_complete"
    PAYMENT_LOCKED = "payment_locked"

    # Coordination flow
    COORDINATOR_ASSIGNED = "coordinator_assigned"
    PRE_OP = "pre_op"
    TRAVEL_BOOKED = "travel_booked"
    ADMITTED = "admitted"
    PROCEDURE_COMPLETE = "procedure_complete"
    POST_OP = "post_op"
    FOLLOW_UP = "follow_up"
    CASE_COMPLETE = "case_complete"

    # Terminal
    CLOSED = "closed"
    CANCELLED = "cancelled"


class ProviderCaseStatus(str, Enum):
    """Provider-side case tracking (per case_share)."""
    RECEIVED = "received"
    REVIEWING = "reviewing"
    INFO_REQUESTED = "info_requested"
    QUOTED = "quoted"
    REJECTED = "rejected"
    SELECTED = "selected"  # Patient chose this provider
    NOT_SELECTED = "not_selected"

State Machine

# app/state_machine/case_machine.py

from statemachine import StateMachine, State

class CaseStateMachine(StateMachine):
    """Case lifecycle state machine.

    Replaces the forward-only status list in case.py with explicit,
    validated transitions. Guards enforce business rules.
    """

    # ── Patient Intake ──
    intake = State(initial=True)
    procedure_identified = State()
    records_collected = State()
    intake_complete = State()

    # ── Matching ──
    matching = State()
    providers_selected = State()
    consent_given = State()

    # ── Platform Review ──
    risk_review_pending = State()
    risk_cleared = State()
    providers_notified = State()
    quoting = State()

    # ── Patient Decision ──
    quotes_pooled = State()
    patient_reviewing = State()
    provider_selected = State()
    mso_offered = State()
    mso_complete = State()
    payment_locked = State()

    # ── Coordination ──
    coordinator_assigned = State()
    pre_op = State()
    travel_booked = State()
    admitted = State()
    procedure_complete = State()
    post_op = State()
    follow_up = State()
    case_complete = State(final=True)

    # ── Terminal ──
    closed = State(final=True)
    cancelled = State(final=True)

    # ── Transitions ──
    identify_procedure = intake.to(procedure_identified)
    collect_records = procedure_identified.to(records_collected)
    complete_intake = records_collected.to(intake_complete)

    start_matching = intake_complete.to(matching)
    select_providers = matching.to(providers_selected)
    give_consent = providers_selected.to(consent_given)

    request_risk_review = consent_given.to(risk_review_pending)
    clear_risk = risk_review_pending.to(risk_cleared)
    notify_providers = risk_cleared.to(providers_notified)
    start_quoting = providers_notified.to(quoting)

    pool_quotes = quoting.to(quotes_pooled)
    begin_patient_review = quotes_pooled.to(patient_reviewing)
    select_provider = patient_reviewing.to(provider_selected)
    offer_mso = provider_selected.to(mso_offered)
    complete_mso = mso_offered.to(mso_complete)
    skip_mso = provider_selected.to(payment_locked)  # Skip MSO path
    lock_payment = mso_complete.to(payment_locked)

    assign_coordinator = payment_locked.to(coordinator_assigned)
    begin_pre_op = coordinator_assigned.to(pre_op)
    book_travel = pre_op.to(travel_booked)
    admit_patient = travel_booked.to(admitted)
    complete_procedure = admitted.to(procedure_complete)
    begin_post_op = procedure_complete.to(post_op)
    begin_follow_up = post_op.to(follow_up)
    complete_case = follow_up.to(case_complete)

    # Any state → closed/cancelled
    close_case = (
        intake.to(closed) | procedure_identified.to(closed) |
        records_collected.to(closed) | intake_complete.to(closed) |
        matching.to(closed) | providers_selected.to(closed) |
        consent_given.to(closed) | risk_review_pending.to(closed) |
        risk_cleared.to(closed) | providers_notified.to(closed) |
        quoting.to(closed) | quotes_pooled.to(closed) |
        patient_reviewing.to(closed) | provider_selected.to(closed) |
        mso_offered.to(closed) | mso_complete.to(closed) |
        payment_locked.to(closed) | coordinator_assigned.to(closed) |
        pre_op.to(closed) | travel_booked.to(closed) |
        admitted.to(closed) | procedure_complete.to(closed) |
        post_op.to(closed) | follow_up.to(closed)
    )

    cancel_case = (
        intake.to(cancelled) | procedure_identified.to(cancelled) |
        records_collected.to(cancelled) | intake_complete.to(cancelled) |
        matching.to(cancelled) | providers_selected.to(cancelled) |
        consent_given.to(cancelled) | risk_review_pending.to(cancelled) |
        risk_cleared.to(cancelled) | providers_notified.to(cancelled) |
        quoting.to(cancelled) | quotes_pooled.to(cancelled) |
        patient_reviewing.to(cancelled) | provider_selected.to(cancelled) |
        mso_offered.to(cancelled) | mso_complete.to(cancelled) |
        payment_locked.to(cancelled) | coordinator_assigned.to(cancelled) |
        pre_op.to(cancelled) | travel_booked.to(cancelled)
        # After admitted: medical decision, not patient cancellation
    )

    # ── Callbacks for audit logging ──
    def on_enter_state(self, state, event):
        """Log every state transition for audit trail."""
        # Will be wired to AuditLog in case_lifecycle service
        pass

Lifecycle Service

# app/services/case_lifecycle.py

class CaseLifecycleService:
    """Wraps the state machine with persistence and audit logging.

    This is the ONLY way to change case status. No direct status writes.
    """

    async def transition(
        self, db: AsyncSession, case_id: str, tenant_id: str,
        event: str, actor_id: str, actor_type: str,
        metadata: dict | None = None,
    ) -> Case:
        """Execute a state transition.

        1. Load case from DB
        2. Initialize state machine at current state
        3. Fire event (raises InvalidTransition if not allowed)
        4. Persist new status + workflow_state
        5. Create AuditLog entry
        6. Return updated case
        """

    async def get_available_transitions(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[str]:
        """What events can fire from current state?"""

    async def get_case_timeline(
        self, db: AsyncSession, case_id: str, tenant_id: str
    ) -> list[dict]:
        """Ordered list of state transitions from audit log."""

Status Migration Mapping

When enabling the state machine on a database with existing cases, the old statuses must map to the new state machine states:

Old status (VALID_CASE_STATUSES) New state (CaseStatus) Notes
intake intake Direct match
documents_pending records_collected Renamed to be document-type-agnostic
ehr_complete intake_complete EHR construction is part of intake completion
matching matching Direct match
matched providers_selected Renamed — "matched" was ambiguous
provider_selected provider_selected Direct match
consent_given consent_given Direct match
forwarded providers_notified Renamed — forwarding is one step of notification
closed closed Direct match

The migration does NOT alter existing data. When case_state_machine_v2=true, the CaseLifecycleService maps old statuses to new states at load time:

_OLD_TO_NEW = {
    "documents_pending": "records_collected",
    "ehr_complete": "intake_complete",
    "matched": "providers_selected",
    "forwarded": "providers_notified",
}

Backward Compatibility

The existing VALID_CASE_STATUSES list and is_valid_transition() in app/models/case.py stay during Phase 0. The state machine runs in parallel behind a feature flag:

# Feature flag
case_state_machine_v2: false

When case_state_machine_v2=true: - CaseLifecycleService.transition() validates via state machine - Old case_orchestrator.py calls CaseLifecycleService instead of direct status writes

When case_state_machine_v2=false: - Current behavior unchanged - State machine code exists but is not invoked

This allows gradual rollout and rollback.


Migration Strategy

All migrations run in a single Alembic revision chain:

xxxx_01_add_rbac_tables.py       → roles, user_roles + RLS + seed
xxxx_02_add_case_shares.py       → case_shares + RLS + indexes
xxxx_03_update_case_statuses.py  → expand VALID_CASE_STATUSES (no data migration needed — new states are only used when state machine is enabled)

Rollback Plan

All tables are additive — rollback = drop tables. No existing columns are modified. Feature flag controls activation.


Implementation Order

Week 1:
  Day 1-2: RBAC models + migration + repository + seed
  Day 3-4: RBAC service + middleware + decorator + tests

Week 2:
  Day 1-2: Case shares model + migration + repository + service
  Day 3:   Redaction engine + policy config + tests

Week 3:
  Day 1-2: State machine + lifecycle service + feature flag
  Day 3:   Integration tests + case_orchestrator wiring

Test Plan

Area Test Type Count (est.)
RBAC permission logic Unit 15–20
RBAC middleware Integration 5–8
Role assignment/revocation Integration 8–10
Case share CRUD Integration 10–12
Consent grant/revoke Integration 5–8
Redaction engine Unit 20–25
State machine transitions Unit 25–30
Invalid transitions Unit 10–15
Lifecycle service Integration 10–12
Cross-tenant isolation Integration 8–10
Total ~120–150

Feature Flags

Flag Default Purpose
rbac_enforcement false Enable RBAC permission checks on routes
case_state_machine_v2 false Use python-statemachine instead of status list
redaction_engine false Enable redaction on case share creation

All three are independent — can be enabled/disabled separately.


Open Questions

  1. Clerk multi-org wiring: Do we create provider orgs in Clerk during Phase 0 or defer to Phase 1? Recommendation: Defer. Phase 0 creates the RBAC tables; Phase 1 wires Clerk webhooks.

  2. Provider-side state tracking: Should ProviderCaseStatus live in case_shares.provider_status or in a separate provider_case_responses table? Recommendation: Column on case_shares — simpler, fewer joins.

  3. Redaction engine sync vs async: Should redaction happen at share-creation time (sync) or via Upstash Workflow (async)? Recommendation: Sync for Phase 0 (simpler). Async agent in Phase 1 for batch forwarding.