Skip to content

Spec: PostgreSQL Row-Level Security (Gap #5)

Status: Deferred to Wave 0.2 (after DAO Layer) Issue: #165 Effort: 2-3 days implementation + 1 day testing Tier: Opus Depends on: DAO Layer (Wave 0.1) — DAO ships first, RLS adds defense-in-depth Updated: 2026-04-17 (Session 41 audit — expanded from original 75-line spec)


Problem

Tenant isolation is app-layer only (WHERE tenant_id = :tid on every query via DAO repositories). A bug in any service that bypasses the repository, a raw SQL query, or direct DB access could leak cross-tenant data. RLS provides database-level enforcement as defense-in-depth.


Table Inventory — Complete RLS Scope

Tables REQUIRING RLS (13 tables — patient data with tenant_id)

# Table Model Has tenant_id Reason
1 patients Patient Yes Core patient PII
2 cases Case Yes Case workflow, comorbidities, EHR snapshot
3 fhir_resources FHIRResource Yes Clinical data (PHI)
4 document_references DocumentReference Yes Uploaded medical documents
5 consent_records ConsentRecord Yes Patient consent history
6 conversations Conversation Yes Full conversation history
7 messages Message No (via conversation → case) Chat messages — see note below
8 match_results MatchResult Yes Provider matching outcomes
9 notifications Notification Yes May contain PII in content
10 consultations Consultation Yes MSO/teleconsultation records
11 feedback_records FeedbackRecord Yes Patient satisfaction data
12 device_registrations DeviceRegistration Yes Push tokens, device IDs
13 data_forwarding_audits DataForwardingAudit Yes Immutable forwarding records

Note on messages: Messages don't have a direct tenant_id column. RLS policy joins through conversations table:

CREATE POLICY tenant_isolation ON messages
  USING (conversation_id IN (
    SELECT id FROM conversations 
    WHERE tenant_id = current_setting('app.tenant_id', true)
  ));

Tables NOT requiring RLS (system-wide, no tenant isolation needed)

Table Reason
audit_logs System-wide, intentionally cross-tenant for compliance
events System analytics, cross-tenant by design
providers, provider_procedures, provider_facilities Global catalog — all patients see all providers
doctors, doctor_procedures Global catalog
treatment_categories, procedure_requirements System reference data
consent_purposes Enum table
notification_templates System templates
notification_preferences Scoped by user_id, not patient/tenant
idempotency_keys Scoped by key, not tenant
legal_agreements, user_agreement_acceptances System-wide
tenants, tenant_settings System/admin tables

Session 41 Future Tables (RLS decisions for when implemented)

Table RLS needed? Policy
coordinator_vendors Yes Direct tenant_id column
coordinator_service_bookings Yes Via case_id → case.tenant_id join
coordinator_audit_log Yes Direct tenant_id column
graph_node_audit_log Yes Via case_id → case.tenant_id join
scoring_audit_log Yes Via case_id → case.tenant_id join
extraction_audit_log Yes Via case_id → case.tenant_id join

Alembic Migration

Step-by-step SQL

"""Enable Row-Level Security on patient-data tables.

Defense-in-depth: DAO layer (Wave 0.1) provides application-level
tenant isolation. RLS adds database-level enforcement.
"""

from alembic import op


def upgrade():
    # ── Step 1: Create application role (non-superuser) ──
    # Railway PostgreSQL default user is superuser.
    # Create a restricted role for the application.
    op.execute("""
        DO $$
        BEGIN
            IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'curaway_app') THEN
                CREATE ROLE curaway_app WITH NOSUPERUSER NOINHERIT LOGIN PASSWORD 'SET_VIA_ENV';
            END IF;
        END $$;
    """)

    # ── Step 2: Create indexes on tenant_id for performance ─��
    # RLS evaluates tenant_id per row. Index prevents full table scans.
    tables_with_tenant_id = [
        'patients', 'cases', 'fhir_resources', 'document_references',
        'consent_records', 'conversations', 'match_results', 'notifications',
        'consultations', 'feedback_records', 'device_registrations',
        'data_forwarding_audits',
    ]
    for table in tables_with_tenant_id:
        op.execute(f"""
            CREATE INDEX IF NOT EXISTS idx_{table}_tenant_id 
            ON {table} (tenant_id);
        """)

    # ── Step 3: Enable RLS on all patient-data tables ──
    all_rls_tables = tables_with_tenant_id + ['messages']
    for table in all_rls_tables:
        op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY;")

    # ── Step 4: Create tenant isolation policies ──
    # Standard policy for tables with direct tenant_id
    for table in tables_with_tenant_id:
        op.execute(f"""
            CREATE POLICY tenant_isolation ON {table}
                USING (tenant_id = current_setting('app.tenant_id', true))
                WITH CHECK (tenant_id = current_setting('app.tenant_id', true));
        """)

    # Special policy for messages (no direct tenant_id — join through conversations)
    op.execute("""
        CREATE POLICY tenant_isolation ON messages
            USING (conversation_id IN (
                SELECT id FROM conversations 
                WHERE tenant_id = current_setting('app.tenant_id', true)
            ))
            WITH CHECK (conversation_id IN (
                SELECT id FROM conversations 
                WHERE tenant_id = current_setting('app.tenant_id', true)
            ));
    """)

    # ── Step 5: Grant permissions to application role ──
    for table in all_rls_tables:
        op.execute(f"GRANT SELECT, INSERT, UPDATE, DELETE ON {table} TO curaway_app;")

    # Grant on system tables too (no RLS, but role needs access)
    system_tables = [
        'audit_logs', 'events', 'providers', 'provider_procedures',
        'provider_facilities', 'doctors', 'doctor_procedures',
        'treatment_categories', 'procedure_requirements', 'consent_purposes',
        'notification_templates', 'notification_preferences',
        'idempotency_keys', 'legal_agreements', 'user_agreement_acceptances',
        'tenants', 'tenant_settings',
    ]
    for table in system_tables:
        op.execute(f"GRANT SELECT, INSERT, UPDATE, DELETE ON {table} TO curaway_app;")

    # Grant sequence access for auto-increment
    op.execute("GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO curaway_app;")


def downgrade():
    """Reverse all RLS changes. Safe — drops policies and disables RLS."""
    all_rls_tables = [
        'patients', 'cases', 'fhir_resources', 'document_references',
        'consent_records', 'conversations', 'messages', 'match_results',
        'notifications', 'consultations', 'feedback_records',
        'device_registrations', 'data_forwarding_audits',
    ]
    for table in all_rls_tables:
        op.execute(f"DROP POLICY IF EXISTS tenant_isolation ON {table};")
        op.execute(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY;")

    # Drop indexes (optional — they help performance regardless of RLS)
    # Keep them: they benefit regular queries too.

    # Drop role
    op.execute("DROP ROLE IF EXISTS curaway_app;")

Connection Setup — SET tenant_id Per Request

In app/database.py, the get_db() dependency sets tenant_id on connection checkout:

async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    tenant_id = request.headers.get("X-Tenant-ID")
    async with async_session() as session:
        if tenant_id:
            await session.execute(
                text("SET app.tenant_id = :tid"),
                {"tid": tenant_id},
            )
        yield session
        await session.commit()

If tenant_id is not set: RLS policy evaluates current_setting('app.tenant_id', true) — the true parameter means "return NULL if not set" (instead of erroring). NULL doesn't match any row → zero results returned. This is a safe failure (no data leak), but produces a 500 or empty response.

Detection: Log a warning when tenant_id is missing on a request that hits patient-data tables. The DAO layer's TenantIsolationViolation catches this at the app layer before RLS even comes into play.


Internal Endpoints — Tenant Context for Background Jobs

Internal endpoints (QStash callbacks, cron tasks) don't have a user session but still need tenant_id for RLS.

Pattern: Extract tenant_id from the payload

Internal caller How to get tenant_id
QStash OCR callback document_id → lookup document_references.tenant_id using superuser connection (before SET)
Cron: intake reminders Iterate over active cases — each case has tenant_id. SET per-case.
Cron: exchange rate refresh System table — no RLS, no tenant_id needed
GDPR erasure cascade Superuser role — bypasses RLS entirely (see below)
Case porting Source case has tenant_id → SET for reads. Target case has tenant_id → SET for writes. Two SETs in one flow.

Implementation for QStash callbacks:

async def qstash_callback(document_id: str, db: AsyncSession):
    # Step 1: Lookup tenant_id using superuser (no RLS context yet)
    result = await db.execute(
        text("SELECT tenant_id FROM document_references WHERE id = :did"),
        {"did": document_id},
    )
    tenant_id = result.scalar()

    # Step 2: SET tenant context for subsequent queries
    await db.execute(text("SET app.tenant_id = :tid"), {"tid": tenant_id})

    # Step 3: Proceed with tenant-scoped operations
    ...

Alternative: Include tenant_id in every QStash event payload (redundant but simpler). Both approaches work — choose per endpoint.


Superuser / Admin Bypass

Railway PostgreSQL roles

Role Purpose RLS behavior
Default Railway user (superuser) Migrations, admin ops, GDPR erasure Bypasses RLS (superuser privilege)
curaway_app Application runtime Subject to RLS policies

Switching roles at runtime

For admin operations that need cross-tenant access:

# In GDPR erasure handler — use superuser connection (default)
# Do NOT set app.tenant_id — superuser bypasses RLS
async def gdpr_erase_patient(patient_id: str, db: AsyncSession):
    # This runs as superuser — no RLS filtering
    # Delete across all tenants
    await db.execute(
        text("DELETE FROM fhir_resources WHERE patient_id = :pid"),
        {"pid": patient_id},
    )
    # ... cascade through all tables

Railway-specific consideration

Railway's PostgreSQL doesn't allow creating roles via standard Alembic. The curaway_app role must be created manually or via Railway's dashboard:

# Via Railway CLI or psql
CREATE ROLE curaway_app WITH NOSUPERUSER NOINHERIT LOGIN PASSWORD 'xxx';

The Alembic migration uses IF NOT EXISTS to handle this gracefully — if the role was created manually, the migration skips role creation.

Application connection: Update Railway environment to use curaway_app role in DATABASE_URL after RLS is enabled. Migration connection still uses superuser.


Rollback Plan

If RLS causes unexpected 500 errors or empty responses in production:

Step Action Time
1 Detect Monitor: >5% increase in 500 errors or 0-row responses on patient endpoints within 10 min of deploy
2 Disable RLS Run downgrade migration: alembic downgrade -1 (disables RLS on all 13 tables, drops policies)
3 Revert connection Switch DATABASE_URL back to superuser role (if changed)
4 Verify Hit /ready endpoint + spot-check patient data returns normally
5 Investigate Check which table/query failed. Likely: missing SET app.tenant_id on a specific endpoint.
Total rollback time ~5 minutes

Safe failure mode: RLS blocks all rows (returns empty) rather than leaking data. This means the worst case is a service outage (patients see "no data"), never a privacy breach.


Test Plan

Unit Tests (tests/test_rls.py)

def test_rls_enabled_on_all_required_tables():
    """Verify RLS is enabled on all 13 patient-data tables."""
    for table in REQUIRED_RLS_TABLES:
        result = db.execute(text(f"""
            SELECT rowsecurity FROM pg_tables 
            WHERE tablename = '{table}'
        """))
        assert result.scalar() == True, f"RLS not enabled on {table}"

def test_rls_policy_exists_on_all_tables():
    """Verify tenant_isolation policy exists on each table."""
    for table in REQUIRED_RLS_TABLES:
        result = db.execute(text(f"""
            SELECT COUNT(*) FROM pg_policies 
            WHERE tablename = '{table}' AND policyname = 'tenant_isolation'
        """))
        assert result.scalar() == 1, f"Policy missing on {table}"

Integration Tests (tests/test_rls_isolation.py)

async def test_cross_tenant_select_blocked():
    """Tenant A cannot SELECT tenant B's patients."""
    # Create patients in two tenants
    patient_a = await create_patient(tenant="tenant-a")
    patient_b = await create_patient(tenant="tenant-b")

    # Set context to tenant A
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    result = await db.execute(select(Patient))
    patients = result.scalars().all()

    assert patient_a in patients
    assert patient_b not in patients

async def test_cross_tenant_update_blocked():
    """Tenant A cannot UPDATE tenant B's patients."""
    patient_b = await create_patient(tenant="tenant-b")

    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    rows = await db.execute(
        text("UPDATE patients SET intake_status = 'hacked' WHERE id = :id"),
        {"id": patient_b.id},
    )
    assert rows.rowcount == 0  # No rows updated

async def test_cross_tenant_delete_blocked():
    """Tenant A cannot DELETE tenant B's patients."""
    patient_b = await create_patient(tenant="tenant-b")

    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    rows = await db.execute(
        text("DELETE FROM patients WHERE id = :id"),
        {"id": patient_b.id},
    )
    assert rows.rowcount == 0  # No rows deleted

async def test_null_tenant_id_returns_nothing():
    """If SET is not called, no rows are visible."""
    patient = await create_patient(tenant="tenant-a")
    # Do NOT set app.tenant_id
    result = await db.execute(select(Patient))
    assert len(result.scalars().all()) == 0

async def test_messages_rls_via_conversation_join():
    """Messages are filtered through conversation's tenant_id."""
    conv_a = await create_conversation(tenant="tenant-a")
    msg_a = await create_message(conversation_id=conv_a.id)

    await db.execute(text("SET app.tenant_id = 'tenant-b'"))
    result = await db.execute(select(Message))
    assert msg_a not in result.scalars().all()

Performance Baseline Test

async def test_rls_performance_overhead():
    """RLS adds <5% overhead on 1000-row SELECT."""
    # Insert 1000 patients in tenant-a
    await bulk_insert_patients(1000, tenant="tenant-a")

    # Measure without RLS context (superuser, bypasses RLS)
    start = time.monotonic()
    await db.execute(text("SELECT COUNT(*) FROM patients"))
    baseline_ms = (time.monotonic() - start) * 1000

    # Measure with RLS context
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    start = time.monotonic()
    await db.execute(text("SELECT COUNT(*) FROM patients"))
    rls_ms = (time.monotonic() - start) * 1000

    overhead = (rls_ms - baseline_ms) / baseline_ms
    assert overhead < 0.05, f"RLS overhead {overhead:.1%} exceeds 5%"

Edge Cases

Scenario Handling Severity
NULL tenant_id in existing rows NOT NULL constraint on tenant_id columns prevents this. Migration backfills any NULLs before enabling RLS. Critical
Data porting across tenants (Session 38) Case porting service uses superuser connection to read source tenant, then SET to target tenant for writes. Two SET calls in one flow. High
Background job without user context Extract tenant_id from payload/lookup (see Internal Endpoints section). High
Super-admin needs cross-tenant view Uses superuser role (bypasses RLS). All admin actions audit-logged with actor_type='system_admin'. High
GDPR erasure across tenants Superuser role — bypasses RLS. Deletes across all tenants for a given patient_id. High
RLS policy evaluates subquery on every row (messages table) The conversation subquery is indexed (idx_conversations_tenant_id). For large message tables, consider adding tenant_id directly to messages in a future migration. Medium
Railway doesn't support role creation via Alembic Create curaway_app role manually via Railway dashboard/psql. Migration uses IF NOT EXISTS. Medium
Connection pool returns a connection with stale tenant_id from previous request SQLAlchemy's async session creates a new transaction per request. SET is per-transaction, not per-connection. No stale state. Low
Forgot SET on a new endpoint DAO layer catches this first (TenantIsolationViolation). RLS catches it second (0 rows returned). Double safety net. Low

Performance Considerations

Factor Impact Mitigation
SET app.tenant_id per request ~1-5ms overhead per connection Amortized by connection pooling. Negligible at current QPS.
RLS policy evaluation per row ~0.1ms per 1000 rows Index on tenant_id ensures index scan, not full table scan.
Redundant filtering (DAO + RLS) 0ms — query optimizer deduplicates PostgreSQL recognizes identical predicates. No double evaluation.
Messages subquery policy Higher cost than direct column check Index on conversations.tenant_id. Consider adding tenant_id to messages later.

Implementation Checklist

# Task Tier Effort
1 Create curaway_app role on Railway PostgreSQL (manual) 15 min
2 Write Alembic migration (indexes + RLS + policies + grants) Opus 1 hour
3 Update get_db() to SET tenant_id per request Opus 30 min
4 Update QStash callback endpoints with tenant_id lookup Sonnet 1 hour
5 Write RLS unit tests (schema validation) Sonnet 30 min
6 Write RLS integration tests (cross-tenant isolation) Sonnet 1 hour
7 Write RLS performance baseline test Sonnet 30 min
8 Test rollback procedure (downgrade + verify) Sonnet 30 min
9 Update GDPR erasure handler to use superuser explicitly Sonnet 30 min
10 Update DATABASE_URL env to use curaway_app role 15 min
11 Verify all existing tests pass with RLS enabled Sonnet 30 min
Total ~6-7 hours