Spec: PostgreSQL Row-Level Security (Gap #5)
Status: Deferred to Wave 0.2 (after DAO Layer)
Issue: #165
Effort: 2-3 days implementation + 1 day testing
Tier: Opus
Depends on: DAO Layer (Wave 0.1). DAO ships first; RLS adds defense-in-depth.
Updated: 2026-04-17 (Session 41 audit, expanded from original 75-line spec)
Problem
Tenant isolation is app-layer only (WHERE tenant_id = :tid on every query via DAO repositories). A bug in any service that bypasses the repository, a raw SQL query, or direct DB access could leak cross-tenant data. RLS provides database-level enforcement as defense-in-depth.
Table Inventory: Complete RLS Scope
Tables REQUIRING RLS (13 tables: patient data with tenant_id)
| # | Table | Model | Has tenant_id | Reason |
|---|---|---|---|---|
| 1 | patients | Patient | Yes | Core patient PII |
| 2 | cases | Case | Yes | Case workflow, comorbidities, EHR snapshot |
| 3 | fhir_resources | FHIRResource | Yes | Clinical data (PHI) |
| 4 | document_references | DocumentReference | Yes | Uploaded medical documents |
| 5 | consent_records | ConsentRecord | Yes | Patient consent history |
| 6 | conversations | Conversation | Yes | Full conversation history |
| 7 | messages | Message | No (via conversation → case) | Chat messages (see note below) |
| 8 | match_results | MatchResult | Yes | Provider matching outcomes |
| 9 | notifications | Notification | Yes | May contain PII in content |
| 10 | consultations | Consultation | Yes | MSO/teleconsultation records |
| 11 | feedback_records | FeedbackRecord | Yes | Patient satisfaction data |
| 12 | device_registrations | DeviceRegistration | Yes | Push tokens, device IDs |
| 13 | data_forwarding_audits | DataForwardingAudit | Yes | Immutable forwarding records |
Note on messages: The messages table has no direct tenant_id column, so its RLS policy filters through the conversations table:
```sql
CREATE POLICY tenant_isolation ON messages
    USING (conversation_id IN (
        SELECT id FROM conversations
        WHERE tenant_id = current_setting('app.tenant_id', true)
    ));
```
Tables NOT requiring RLS (system-wide, no tenant isolation needed)
| Table | Reason |
|---|---|
| audit_logs | System-wide, intentionally cross-tenant for compliance |
| events | System analytics, cross-tenant by design |
| providers, provider_procedures, provider_facilities | Global catalog: all patients see all providers |
| doctors, doctor_procedures | Global catalog |
| treatment_categories, procedure_requirements | System reference data |
| consent_purposes | Enum table |
| notification_templates | System templates |
| notification_preferences | Scoped by user_id, not patient/tenant |
| idempotency_keys | Scoped by key, not tenant |
| legal_agreements, user_agreement_acceptances | System-wide |
| tenants, tenant_settings | System/admin tables |
Session 41 Future Tables (RLS decisions for when implemented)
| Table | RLS needed? | Policy |
|---|---|---|
| coordinator_vendors | Yes | Direct tenant_id column |
| coordinator_service_bookings | Yes | Via case_id → case.tenant_id join |
| coordinator_audit_log | Yes | Direct tenant_id column |
| graph_node_audit_log | Yes | Via case_id → case.tenant_id join |
| scoring_audit_log | Yes | Via case_id → case.tenant_id join |
| extraction_audit_log | Yes | Via case_id → case.tenant_id join |
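The join-based policies listed above follow the same shape as the messages policy. As a minimal sketch, assuming each parent table exposes an `id` primary key and a `tenant_id` column, the policy SQL for both kinds of table could be generated as follows. The helper names `direct_policy` and `parent_policy` are hypothetical and not part of the codebase.

```python
import re

# Only plain lowercase identifiers are accepted, because the names are
# interpolated into DDL and cannot be passed as bind parameters.
_IDENT = re.compile(r"^[a-z_][a-z0-9_]*$")
TENANT_EXPR = "current_setting('app.tenant_id', true)"


def _ident(name: str) -> str:
    """Return name unchanged if it is a safe identifier, else raise."""
    if not _IDENT.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def direct_policy(table: str) -> str:
    """Policy for a table that carries its own tenant_id column."""
    cond = f"tenant_id = {TENANT_EXPR}"
    return (
        f"CREATE POLICY tenant_isolation ON {_ident(table)} "
        f"USING ({cond}) WITH CHECK ({cond});"
    )


def parent_policy(table: str, fk: str, parent: str) -> str:
    """Policy for a table that reaches tenant_id through a parent row."""
    cond = (
        f"{_ident(fk)} IN (SELECT id FROM {_ident(parent)} "
        f"WHERE tenant_id = {TENANT_EXPR})"
    )
    return (
        f"CREATE POLICY tenant_isolation ON {_ident(table)} "
        f"USING ({cond}) WITH CHECK ({cond});"
    )
```

For example, `parent_policy("scoring_audit_log", "case_id", "cases")` would produce a policy that filters scoring_audit_log rows through the cases table, in the same way the messages policy filters through conversations.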
Alembic Migration
Step-by-step SQL
```python
"""Enable Row-Level Security on patient-data tables.

Defense-in-depth: DAO layer (Wave 0.1) provides application-level
tenant isolation. RLS adds database-level enforcement.
"""
from alembic import op


def upgrade():
    # ── Step 1: Create application role (non-superuser) ──
    # Railway PostgreSQL default user is superuser.
    # Create a restricted role for the application.
    # 'SET_VIA_ENV' is a placeholder: supply the real password out of band.
    op.execute("""
        DO $$
        BEGIN
            IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'curaway_app') THEN
                CREATE ROLE curaway_app WITH NOSUPERUSER NOINHERIT LOGIN PASSWORD 'SET_VIA_ENV';
            END IF;
        END $$;
    """)

    # ── Step 2: Create indexes on tenant_id for performance ──
    # RLS evaluates tenant_id per row. Index prevents full table scans.
    tables_with_tenant_id = [
        'patients', 'cases', 'fhir_resources', 'document_references',
        'consent_records', 'conversations', 'match_results', 'notifications',
        'consultations', 'feedback_records', 'device_registrations',
        'data_forwarding_audits',
    ]
    for table in tables_with_tenant_id:
        op.execute(f"""
            CREATE INDEX IF NOT EXISTS idx_{table}_tenant_id
            ON {table} (tenant_id);
        """)

    # ── Step 3: Enable RLS on all patient-data tables ──
    all_rls_tables = tables_with_tenant_id + ['messages']
    for table in all_rls_tables:
        op.execute(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY;")

    # ── Step 4: Create tenant isolation policies ──
    # Standard policy for tables with direct tenant_id
    for table in tables_with_tenant_id:
        op.execute(f"""
            CREATE POLICY tenant_isolation ON {table}
            USING (tenant_id = current_setting('app.tenant_id', true))
            WITH CHECK (tenant_id = current_setting('app.tenant_id', true));
        """)

    # Special policy for messages (no direct tenant_id; filter through conversations)
    op.execute("""
        CREATE POLICY tenant_isolation ON messages
        USING (conversation_id IN (
            SELECT id FROM conversations
            WHERE tenant_id = current_setting('app.tenant_id', true)
        ))
        WITH CHECK (conversation_id IN (
            SELECT id FROM conversations
            WHERE tenant_id = current_setting('app.tenant_id', true)
        ));
    """)

    # ── Step 5: Grant permissions to application role ──
    for table in all_rls_tables:
        op.execute(f"GRANT SELECT, INSERT, UPDATE, DELETE ON {table} TO curaway_app;")

    # Grant on system tables too (no RLS, but role needs access)
    system_tables = [
        'audit_logs', 'events', 'providers', 'provider_procedures',
        'provider_facilities', 'doctors', 'doctor_procedures',
        'treatment_categories', 'procedure_requirements', 'consent_purposes',
        'notification_templates', 'notification_preferences',
        'idempotency_keys', 'legal_agreements', 'user_agreement_acceptances',
        'tenants', 'tenant_settings',
    ]
    for table in system_tables:
        op.execute(f"GRANT SELECT, INSERT, UPDATE, DELETE ON {table} TO curaway_app;")

    # Grant sequence access for auto-increment
    op.execute("GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO curaway_app;")


def downgrade():
    """Reverse all RLS changes. Safe: drops policies and disables RLS."""
    all_rls_tables = [
        'patients', 'cases', 'fhir_resources', 'document_references',
        'consent_records', 'conversations', 'messages', 'match_results',
        'notifications', 'consultations', 'feedback_records',
        'device_registrations', 'data_forwarding_audits',
    ]
    for table in all_rls_tables:
        op.execute(f"DROP POLICY IF EXISTS tenant_isolation ON {table};")
        op.execute(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY;")

    # Drop indexes (optional; they help performance regardless of RLS)
    # Keep them: they benefit regular queries too.

    # Revoke the grants from upgrade() first: PostgreSQL refuses to drop a
    # role that still holds privileges on any object.
    op.execute("""
        DO $$
        BEGIN
            IF EXISTS (SELECT FROM pg_roles WHERE rolname = 'curaway_app') THEN
                REVOKE ALL ON ALL TABLES IN SCHEMA public FROM curaway_app;
                REVOKE ALL ON ALL SEQUENCES IN SCHEMA public FROM curaway_app;
            END IF;
        END $$;
    """)

    # Drop role
    op.execute("DROP ROLE IF EXISTS curaway_app;")
```
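The migration repeats the table list in upgrade() and downgrade(), and the tests refer to a REQUIRED_RLS_TABLES constant that this spec does not define. One way to keep the three in step, sketched here under the assumption that a shared module is acceptable (the module layout and the `check_rls_scope` helper are hypothetical), is a single source of truth with a sanity check:

```python
from typing import Tuple

# Tables with a direct tenant_id column (12), as listed in the migration.
TABLES_WITH_TENANT_ID: Tuple[str, ...] = (
    "patients", "cases", "fhir_resources", "document_references",
    "consent_records", "conversations", "match_results", "notifications",
    "consultations", "feedback_records", "device_registrations",
    "data_forwarding_audits",
)

# Tables that reach tenant_id through a parent row.
TABLES_VIA_PARENT: Tuple[str, ...] = ("messages",)

REQUIRED_RLS_TABLES: Tuple[str, ...] = TABLES_WITH_TENANT_ID + TABLES_VIA_PARENT


def check_rls_scope(tables: Tuple[str, ...] = REQUIRED_RLS_TABLES,
                    expected: int = 13) -> None:
    """Raise ValueError if the list has duplicates or the wrong size."""
    dupes = sorted({t for t in tables if tables.count(t) > 1})
    if dupes:
        raise ValueError(f"duplicate tables: {dupes}")
    if len(tables) != expected:
        raise ValueError(f"expected {expected} tables, got {len(tables)}")
```

Calling `check_rls_scope()` at import time in the test module would flag a table added to one list but not the other before any database work starts.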
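Connection Setup: SET tenant_id Per Request
In app/database.py, the get_db() dependency sets tenant_id at the start of each request's transaction:
```python
from sqlalchemy import text


async def get_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
    tenant_id = request.headers.get("X-Tenant-ID")
    async with async_session() as session:
        if tenant_id:
            # set_config() accepts a bind parameter (SET does not). The final
            # `true` makes the value transaction-local, so it is cleared at
            # commit/rollback and cannot linger on a pooled connection.
            await session.execute(
                text("SELECT set_config('app.tenant_id', :tid, true)"),
                {"tid": tenant_id},
            )
        yield session
        await session.commit()
```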
If tenant_id is not set: RLS policy evaluates current_setting('app.tenant_id', true) — the true parameter means "return NULL if not set" (instead of erroring). NULL doesn't match any row → zero results returned. This is a safe failure (no data leak), but produces a 500 or empty response.
Detection: Log a warning when tenant_id is missing on a request that hits patient-data tables. The DAO layer's TenantIsolationViolation catches this at the app layer before RLS even comes into play.
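The detection step can be sketched as a small helper that reads the header, logs the warning, and builds the statement that sets the tenant context. This is an illustration under assumptions: the names `resolve_tenant_id` and `tenant_context_statement` and the logger name `rls` are hypothetical, and the statement uses PostgreSQL's `set_config()` with its third argument set to `true`, which scopes the value to the current transaction.

```python
import logging
from typing import Dict, Mapping, Optional, Tuple

logger = logging.getLogger("rls")  # hypothetical logger name

# Intended for session.execute(text(SET_TENANT_SQL), params).
SET_TENANT_SQL = "SELECT set_config('app.tenant_id', :tid, true)"


def resolve_tenant_id(headers: Mapping[str, str], path: str) -> Optional[str]:
    """Return the tenant id from X-Tenant-ID, or None (with a warning)."""
    # A plain dict lookup is case-sensitive; framework header objects
    # are usually case-insensitive.
    tenant_id = (headers.get("X-Tenant-ID") or "").strip()
    if not tenant_id:
        logger.warning(
            "missing X-Tenant-ID on %s; RLS will return zero rows", path
        )
        return None
    return tenant_id


def tenant_context_statement(tenant_id: str) -> Tuple[str, Dict[str, str]]:
    """Return the SQL and bind parameters that set the tenant context."""
    return SET_TENANT_SQL, {"tid": tenant_id}
```

Keeping header parsing separate from the database call makes the missing-tenant path easy to unit test without a PostgreSQL instance.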
Internal Endpoints: Tenant Context for Background Jobs
Internal endpoints (QStash callbacks, cron tasks) don't have a user session but still need tenant_id for RLS.
Pattern: Extract tenant_id from the payload
| Internal caller | How to get tenant_id |
|---|---|
| QStash OCR callback | document_id → lookup document_references.tenant_id using superuser connection (before SET) |
| Cron: intake reminders | Iterate over active cases — each case has tenant_id. SET per-case. |
| Cron: exchange rate refresh | System table — no RLS, no tenant_id needed |
| GDPR erasure cascade | Superuser role — bypasses RLS entirely (see below) |
| Case porting | Source case has tenant_id → SET for reads. Target case has tenant_id → SET for writes. Two SETs in one flow. |
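For the intake-reminder cron, the table calls for setting the tenant context per case. A possible variation, shown only as a sketch, is to group the active cases by tenant first so the context is set once per tenant rather than once per case. The dict shape of a case and the name `group_cases_by_tenant` are assumptions, and the initial cross-tenant read of active cases would presumably need a connection that bypasses RLS.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Mapping


def group_cases_by_tenant(
    cases: Iterable[Mapping[str, str]],
) -> Dict[str, List[str]]:
    """Map tenant_id -> case ids, keeping the order cases were seen in."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for case in cases:
        grouped[case["tenant_id"]].append(case["id"])
    return dict(grouped)
```

The cron loop would then iterate over the result, set `app.tenant_id` for each tenant, and process that tenant's case ids before moving to the next.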
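Implementation for QStash callbacks:
```python
async def qstash_callback(document_id: str, db: AsyncSession):
    # Step 1: Lookup tenant_id using superuser (no RLS context yet).
    # Under curaway_app with no tenant context, RLS would hide this row.
    result = await db.execute(
        text("SELECT tenant_id FROM document_references WHERE id = :did"),
        {"did": document_id},
    )
    tenant_id = result.scalar()
    if tenant_id is None:
        raise LookupError(f"document {document_id} not found")
    # Step 2: Set tenant context for subsequent queries. set_config() is used
    # because SET does not accept bind parameters.
    await db.execute(
        text("SELECT set_config('app.tenant_id', :tid, true)"),
        {"tid": str(tenant_id)},
    )
    # Step 3: Proceed with tenant-scoped operations
    ...
```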
Alternative: Include tenant_id in every QStash event payload (redundant but simpler). Both approaches work — choose per endpoint.
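The two approaches can also be combined: prefer a tenant_id carried in the payload and fall back to the document lookup. The sketch below assumes a payload with optional `tenant_id` and `document_id` keys; `tenant_for_callback`, `MissingTenantContext`, and the `lookup` callable are hypothetical names used for illustration.

```python
from typing import Any, Callable, Mapping, Optional


class MissingTenantContext(Exception):
    """Raised when no tenant can be determined for an internal call."""


def tenant_for_callback(
    payload: Mapping[str, Any],
    lookup: Callable[[str], Optional[str]],
) -> str:
    """Resolve tenant_id from the payload, else via a document lookup."""
    tenant_id = payload.get("tenant_id")
    if tenant_id:
        return str(tenant_id)

    document_id = payload.get("document_id")
    # `lookup` stands in for the privileged query against
    # document_references described above.
    found = lookup(str(document_id)) if document_id else None
    if not found:
        raise MissingTenantContext(f"no tenant for payload keys {sorted(payload)}")
    return found
```

Raising instead of continuing without a tenant keeps the failure explicit, rather than letting RLS silently return zero rows later in the handler.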
Superuser / Admin Bypass
Railway PostgreSQL roles
| Role | Purpose | RLS behavior |
|---|---|---|
| Default Railway user (superuser) | Migrations, admin ops, GDPR erasure | Bypasses RLS (superuser privilege) |
| curaway_app | Application runtime | Subject to RLS policies |
Switching roles at runtime
For admin operations that need cross-tenant access:
```python
# In GDPR erasure handler: use superuser connection (default)
# Do NOT set app.tenant_id; superuser bypasses RLS
async def gdpr_erase_patient(patient_id: str, db: AsyncSession):
    # This runs as superuser: no RLS filtering
    # Delete across all tenants
    await db.execute(
        text("DELETE FROM fhir_resources WHERE patient_id = :pid"),
        {"pid": patient_id},
    )
    # ... cascade through all tables
```
Railway-specific consideration
Railway's PostgreSQL doesn't allow creating roles via standard Alembic. The curaway_app role must be created manually (for example via psql) or via Railway's dashboard.
The Alembic migration guards role creation with IF NOT EXISTS, so if the role was created manually, the migration skips that step.
Application connection: Update the Railway environment so that DATABASE_URL uses the curaway_app role after RLS is enabled. The migration connection still uses the superuser.
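Switching the application to the restricted role amounts to changing the credentials in the connection URL while leaving host, port, and database untouched. A minimal sketch of that rewrite using only the standard library follows; `with_role` is a hypothetical helper, the example URL is invented, and IPv6 hosts are not handled.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def with_role(database_url: str, user: str, password: str) -> str:
    """Return database_url with its credentials replaced."""
    parts = urlsplit(database_url)
    host = parts.hostname or ""
    if parts.port:
        host = f"{host}:{parts.port}"
    # Percent-encode credentials so characters like '@' or ':' cannot
    # be mistaken for URL delimiters.
    netloc = f"{quote(user, safe='')}:{quote(password, safe='')}@{host}"
    return urlunsplit(
        (parts.scheme, netloc, parts.path, parts.query, parts.fragment)
    )
```

Deriving the application URL from the superuser URL this way keeps the two in sync if the host or database name changes, while migrations continue to use the original URL.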
Rollback Plan
If RLS causes unexpected 500 errors or empty responses in production:
| Step | Action | Details |
|---|---|---|
| 1 | Detect | Monitor: >5% increase in 500 errors or 0-row responses on patient endpoints within 10 min of deploy |
| 2 | Disable RLS | Run downgrade migration: alembic downgrade -1 (disables RLS on all 13 tables, drops policies and the curaway_app role) |
| 3 | Revert connection | Switch DATABASE_URL back to superuser role (required if it was changed, since the downgrade drops curaway_app) |
| 4 | Verify | Hit /ready endpoint + spot-check patient data returns normally |
| 5 | Investigate | Check which table/query failed. Likely: missing SET app.tenant_id on a specific endpoint. |
| | Total rollback time | ~5 minutes |
Safe failure mode: RLS blocks all rows (returns empty) rather than leaking data. This means the worst case is a service outage (patients see "no data"), never a privacy breach.
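The detection rule in step 1 can be expressed as a small check over request counters from before and after the deploy. This sketch reads ">5% increase" as a rise of more than five percentage points in either rate, which is an interpretation rather than something the plan states; `WindowStats` and `should_roll_back` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowStats:
    """Counters for patient endpoints over one observation window."""
    requests: int
    server_errors: int    # HTTP 500 responses
    empty_responses: int  # responses that returned zero rows

    def rate(self, count: int) -> float:
        return count / self.requests if self.requests else 0.0


def should_roll_back(before: WindowStats, after: WindowStats,
                     threshold: float = 0.05) -> bool:
    """True if either rate rose by more than `threshold` after deploy."""
    error_delta = (after.rate(after.server_errors)
                   - before.rate(before.server_errors))
    empty_delta = (after.rate(after.empty_responses)
                   - before.rate(before.empty_responses))
    return error_delta > threshold or empty_delta > threshold
```

If the intended meaning is a relative increase instead, the deltas would be divided by the pre-deploy rates before comparing against the threshold.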
Test Plan
Unit Tests (tests/test_rls.py)
```python
from sqlalchemy import text

# `db` (a sync session) and REQUIRED_RLS_TABLES come from the test setup.


def test_rls_enabled_on_all_required_tables():
    """Verify RLS is enabled on all 13 patient-data tables."""
    for table in REQUIRED_RLS_TABLES:
        result = db.execute(
            text("SELECT rowsecurity FROM pg_tables WHERE tablename = :t"),
            {"t": table},
        )
        assert result.scalar() is True, f"RLS not enabled on {table}"


def test_rls_policy_exists_on_all_tables():
    """Verify tenant_isolation policy exists on each table."""
    for table in REQUIRED_RLS_TABLES:
        result = db.execute(
            text("""
                SELECT COUNT(*) FROM pg_policies
                WHERE tablename = :t AND policyname = 'tenant_isolation'
            """),
            {"t": table},
        )
        assert result.scalar() == 1, f"Policy missing on {table}"
```
Integration Tests (tests/test_rls_isolation.py)
```python
from sqlalchemy import select, text

# `db`, the Patient/Message models, and the create_* helpers come from the
# test setup.


async def test_cross_tenant_select_blocked():
    """Tenant A cannot SELECT tenant B's patients."""
    # Create patients in two tenants
    patient_a = await create_patient(tenant="tenant-a")
    patient_b = await create_patient(tenant="tenant-b")

    # Set context to tenant A
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    result = await db.execute(select(Patient))
    patients = result.scalars().all()
    assert patient_a in patients
    assert patient_b not in patients


async def test_cross_tenant_update_blocked():
    """Tenant A cannot UPDATE tenant B's patients."""
    patient_b = await create_patient(tenant="tenant-b")
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    rows = await db.execute(
        text("UPDATE patients SET intake_status = 'hacked' WHERE id = :id"),
        {"id": patient_b.id},
    )
    assert rows.rowcount == 0  # No rows updated


async def test_cross_tenant_delete_blocked():
    """Tenant A cannot DELETE tenant B's patients."""
    patient_b = await create_patient(tenant="tenant-b")
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    rows = await db.execute(
        text("DELETE FROM patients WHERE id = :id"),
        {"id": patient_b.id},
    )
    assert rows.rowcount == 0  # No rows deleted


async def test_null_tenant_id_returns_nothing():
    """If SET is not called, no rows are visible."""
    patient = await create_patient(tenant="tenant-a")
    # Do NOT set app.tenant_id
    result = await db.execute(select(Patient))
    assert len(result.scalars().all()) == 0


async def test_messages_rls_via_conversation_join():
    """Messages are filtered through conversation's tenant_id."""
    conv_a = await create_conversation(tenant="tenant-a")
    msg_a = await create_message(conversation_id=conv_a.id)
    await db.execute(text("SET app.tenant_id = 'tenant-b'"))
    result = await db.execute(select(Message))
    assert msg_a not in result.scalars().all()
```
Performance Baseline Test
```python
import time

from sqlalchemy import text


async def test_rls_performance_overhead():
    """RLS adds <5% overhead on 1000-row SELECT."""
    # Insert 1000 patients in tenant-a
    await bulk_insert_patients(1000, tenant="tenant-a")

    # Measure without RLS. `admin_db` is assumed to be a superuser session
    # (bypasses RLS); `db` is assumed to connect as curaway_app. A single
    # session cannot serve as both baseline and RLS measurement.
    start = time.monotonic()
    await admin_db.execute(text("SELECT COUNT(*) FROM patients"))
    baseline_ms = (time.monotonic() - start) * 1000

    # Measure with RLS context
    await db.execute(text("SET app.tenant_id = 'tenant-a'"))
    start = time.monotonic()
    await db.execute(text("SELECT COUNT(*) FROM patients"))
    rls_ms = (time.monotonic() - start) * 1000

    overhead = (rls_ms - baseline_ms) / baseline_ms
    assert overhead < 0.05, f"RLS overhead {overhead:.1%} exceeds 5%"
```
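A single timed query is sensitive to cache warm-up and scheduling noise, which can make a 5% threshold flaky. One possible refinement, sketched here with hypothetical helper names `median_ms` and `relative_overhead`, is to compare medians over repeated runs:

```python
import statistics
import time
from typing import Awaitable, Callable


async def median_ms(run: Callable[[], Awaitable[object]],
                    repeats: int = 15, warmup: int = 3) -> float:
    """Median wall-clock time of `run` in milliseconds, after warm-up."""
    for _ in range(warmup):
        await run()  # warm caches; result discarded
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        await run()
        samples.append((time.perf_counter() - start) * 1000)
    return statistics.median(samples)


def relative_overhead(baseline_ms: float, rls_ms: float) -> float:
    """Fractional slowdown of rls_ms relative to baseline_ms."""
    if baseline_ms <= 0:
        raise ValueError("baseline_ms must be positive")
    return (rls_ms - baseline_ms) / baseline_ms
```

In the test, `run` would be a small coroutine function that executes the COUNT(*) query on the relevant session, and the assertion would compare `relative_overhead(...)` of the two medians against 0.05.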
Edge Cases
| Scenario | Handling | Severity |
|---|---|---|
| NULL tenant_id in existing rows | NOT NULL constraint on tenant_id columns prevents this. Migration backfills any NULLs before enabling RLS. | Critical |
| Data porting across tenants (Session 38) | Case porting service uses superuser connection to read source tenant, then SET to target tenant for writes. Two SET calls in one flow. | High |
| Background job without user context | Extract tenant_id from payload/lookup (see Internal Endpoints section). | High |
| Super-admin needs cross-tenant view | Uses superuser role (bypasses RLS). All admin actions audit-logged with actor_type='system_admin'. | High |
| GDPR erasure across tenants | Superuser role bypasses RLS. Deletes across all tenants for a given patient_id. | High |
| RLS policy evaluates subquery on every row (messages table) | The conversation subquery is indexed (idx_conversations_tenant_id). For large message tables, consider adding tenant_id directly to messages in a future migration. | Medium |
| Railway doesn't support role creation via Alembic | Create curaway_app role manually via Railway dashboard/psql. Migration uses IF NOT EXISTS. | Medium |
| Connection pool returns a connection with stale tenant_id from previous request | A plain SET is session-scoped and would persist on the pooled connection after the request ends. Set the value transaction-locally (SET LOCAL, or set_config('app.tenant_id', ..., true)) so it is discarded at commit or rollback. | Low |
| Forgot SET on a new endpoint | DAO layer catches this first (TenantIsolationViolation). RLS catches it second (0 rows returned). Double safety net. | Low |
Performance Considerations
| Factor | Impact | Mitigation |
|---|---|---|
| SET app.tenant_id per request | ~1-5ms overhead per request | One extra statement at the start of each request's transaction. Negligible at current QPS. |
| RLS policy evaluation per row | ~0.1ms per 1000 rows | Index on tenant_id lets the planner use an index scan instead of a full table scan. |
| Redundant filtering (DAO + RLS) | Expected to be negligible | Both predicates are equality checks on the same indexed tenant_id column, so the RLS check is one extra comparison on rows the index has already narrowed. Confirm with EXPLAIN ANALYZE rather than assuming the planner merges them. |
| Messages subquery policy | Higher cost than direct column check | Index on conversations.tenant_id. Consider adding tenant_id to messages later. |
Implementation Checklist
| # | Task | Tier | Effort |
|---|---|---|---|
| 1 | Create curaway_app role on Railway PostgreSQL (manual) | n/a | 15 min |
| 2 | Write Alembic migration (indexes + RLS + policies + grants) | Opus | 1 hour |
| 3 | Update get_db() to SET tenant_id per request | Opus | 30 min |
| 4 | Update QStash callback endpoints with tenant_id lookup | Sonnet | 1 hour |
| 5 | Write RLS unit tests (schema validation) | Sonnet | 30 min |
| 6 | Write RLS integration tests (cross-tenant isolation) | Sonnet | 1 hour |
| 7 | Write RLS performance baseline test | Sonnet | 30 min |
| 8 | Test rollback procedure (downgrade + verify) | Sonnet | 30 min |
| 9 | Update GDPR erasure handler to use superuser explicitly | Sonnet | 30 min |
| 10 | Update DATABASE_URL env to use curaway_app role | n/a | 15 min |
| 11 | Verify all existing tests pass with RLS enabled | Sonnet | 30 min |
| | Total | | ~6-7 hours |