Authentication & Privacy¶
Curaway handles Protected Health Information (PHI) across international borders, making authentication, authorization, and data privacy foundational concerns. This document covers the full security model from JWT verification through GDPR compliance.
Authentication: Clerk¶
Clerk provides managed authentication with JWT-based session tokens, social login, multi-factor authentication, and an Organizations feature used for multi-tenancy.
JWT Flow¶
sequenceDiagram
participant U as User
participant FE as Frontend (Vercel)
participant CK as Clerk
participant BE as Backend (Railway)
U->>FE: Login
FE->>CK: Authenticate
CK-->>FE: JWT (short-lived)
FE->>BE: API Request + Authorization: Bearer {jwt}
BE->>CK: Verify JWT (JWKS endpoint)
CK-->>BE: Token claims {user_id, org_id, role}
BE->>BE: Extract tenant_id from org_id
BE-->>FE: Response (tenant-scoped data)
JWT Claims Structure¶
{
  "sub": "user_2abc123",
  "org_id": "org_tenant_apollo_001",
  "org_role": "patient",
  "org_permissions": ["read:own_data", "write:own_data"],
  "iat": 1711900800,
  "exp": 1711904400,
  "iss": "https://clerk.curaway.com"
}
Backend JWT Verification¶
from clerk_backend_api import Clerk

clerk = Clerk(bearer_auth=CLERK_SECRET_KEY)

async def verify_token(authorization: str) -> TokenClaims:
    """Verify Clerk JWT and extract claims."""
    # Strip only a leading "Bearer " prefix; str.replace would also alter
    # matches elsewhere in the header value.
    token = authorization.removeprefix("Bearer ").strip()
    claims = clerk.verify_token(token)
    return TokenClaims(
        user_id=claims["sub"],
        tenant_id=claims["org_id"],
        role=claims["org_role"],
    )
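Even after the signature is verified, the claims deserve a few sanity checks before they are mapped to a tenant. The following is a minimal, stdlib-only sketch that assumes the signature check has already happened; `parse_claims`, `EXPECTED_ISSUER`, and the field layout of `TokenClaims` are hypothetical names for illustration, not part of Clerk's SDK.

```python
import time
from dataclasses import dataclass
from typing import Optional

# Issuer value taken from the sample claims; treat it as configuration.
EXPECTED_ISSUER = "https://clerk.curaway.com"


@dataclass(frozen=True)
class TokenClaims:
    user_id: str
    tenant_id: str
    role: str


def parse_claims(payload: dict, now: Optional[float] = None) -> TokenClaims:
    """Map an already-verified JWT payload to TokenClaims.

    Raises ValueError if the issuer is unexpected, the token has expired,
    or a required claim is missing.
    """
    now = time.time() if now is None else now
    if payload.get("iss") != EXPECTED_ISSUER:
        raise ValueError("unexpected issuer")
    if payload.get("exp", 0) <= now:
        raise ValueError("token expired")
    for key in ("sub", "org_id", "org_role"):
        if not payload.get(key):
            raise ValueError(f"missing claim: {key}")
    return TokenClaims(
        user_id=payload["sub"],
        tenant_id=payload["org_id"],
        role=payload["org_role"],
    )
```

Passing `now` explicitly keeps the expiry check deterministic in unit tests.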
Role-Based Access Control (RBAC)¶
Four roles with hierarchical permissions manage access across the platform.
| Role | Scope | Capabilities |
|---|---|---|
| patient | Own data only | View own records, upload documents, chat with assistant, manage consents |
| provider_admin | Own organization | View matched patients, manage provider profile, view anonymized analytics |
| curaway_admin | All tenants (read) | View all data, manage providers, review flagged content, run reports |
| super_admin | All tenants (write) | Everything above + manage admins, access audit logs, trigger data deletions |
Permission Matrix¶
| Resource | patient | provider_admin | curaway_admin | super_admin |
|---|---|---|---|---|
| Own patient record | RW | - | R | RW |
| Own documents | RW | - | R | RW |
| Matched patient data | - | R | R | RW |
| Provider profiles | R | RW (own) | RW | RW |
| Audit logs | - | - | R | R |
| Feature flags | - | - | R | RW |
| User management | - | - | R | RW |
| Data deletion | - | - | - | Execute |
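One way to keep the matrix testable is to encode it as data and check it with a single lookup. This is only a sketch: the resource keys, the one-letter action codes, and `is_allowed` are hypothetical, and the "(own)" scoping on provider profiles is not modeled here.

```python
from typing import Dict

# "R" = read, "W" = write, "X" = execute. A missing role means no access.
PERMISSIONS: Dict[str, Dict[str, str]] = {
    "own_patient_record": {"patient": "RW", "curaway_admin": "R", "super_admin": "RW"},
    "own_documents": {"patient": "RW", "curaway_admin": "R", "super_admin": "RW"},
    "matched_patient_data": {"provider_admin": "R", "curaway_admin": "R", "super_admin": "RW"},
    "provider_profiles": {
        "patient": "R",
        "provider_admin": "RW",  # limited to the provider's own profile
        "curaway_admin": "RW",
        "super_admin": "RW",
    },
    "audit_logs": {"curaway_admin": "R", "super_admin": "R"},
    "feature_flags": {"curaway_admin": "R", "super_admin": "RW"},
    "user_management": {"curaway_admin": "R", "super_admin": "RW"},
    "data_deletion": {"super_admin": "X"},
}


def is_allowed(role: str, resource: str, action: str) -> bool:
    """Return True if the matrix grants `action` on `resource` to `role`."""
    if action not in ("R", "W", "X"):
        raise ValueError(f"unknown action: {action}")
    return action in PERMISSIONS.get(resource, {}).get(role, "")
```

Unknown roles and resources fall through to "no access", so the default is deny.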
Role Enforcement Middleware¶
from functools import wraps

from fastapi import Depends, HTTPException

def require_role(*allowed_roles: str):
    """Reject the request with 403 unless the caller's role is allowed."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, claims: TokenClaims, **kwargs):
            if claims.role not in allowed_roles:
                raise HTTPException(status_code=403, detail="Insufficient permissions")
            return await func(*args, claims=claims, **kwargs)
        return wrapper
    return decorator

# Usage
@router.get("/admin/audit-logs")
@require_role("curaway_admin", "super_admin")
async def get_audit_logs(claims: TokenClaims = Depends(get_claims)):
    ...
Multi-Tenancy¶
Every tenant-scoped table in PostgreSQL includes a tenant_id column. Tenant isolation is enforced at
multiple levels to prevent data leakage between organizations.
Enforcement Layers¶
flowchart LR
A[Request] --> B[JWT: org_id claim]
B --> C[Header: X-Tenant-ID]
C --> D[Middleware: validate match]
D --> E[Query: WHERE tenant_id = ?]
E --> F[RLS: PostgreSQL policy]
| Layer | Mechanism | Purpose |
|---|---|---|
| JWT claim | org_id in token | Cryptographically bound tenant identity |
| HTTP header | X-Tenant-ID | Explicit tenant context for debugging |
| Middleware | Claim-header match | Prevent header spoofing |
| Query filter | WHERE tenant_id = :tid | Application-level isolation |
| RLS policy | PostgreSQL Row-Level Security | Database-level isolation (defense in depth) |
Tenant Context Middleware¶
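from fastapi import Request
from fastapi.responses import JSONResponse

async def tenant_middleware(request: Request, call_next):
    claims = request.state.claims
    header_tenant = request.headers.get("X-Tenant-ID")
    if header_tenant and header_tenant != claims.tenant_id:
        # Return the response directly: an HTTPException raised inside HTTP
        # middleware is not handled by FastAPI's exception handlers.
        return JSONResponse(status_code=403, content={"detail": "Tenant ID mismatch"})
    # The JWT claim, not the header, is the source of truth for the tenant.
    request.state.tenant_id = claims.tenant_id
    return await call_next(request)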
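The claim-versus-header rule can also be isolated into a pure function so it is unit-testable without a running web framework. This is a sketch; `resolve_tenant` and `TenantMismatchError` are hypothetical names.

```python
from typing import Optional


class TenantMismatchError(Exception):
    """Raised when the X-Tenant-ID header contradicts the JWT claim."""


def resolve_tenant(claim_tenant_id: str, header_tenant_id: Optional[str]) -> str:
    """Return the effective tenant ID for a request.

    The JWT claim always wins. The header is optional and may only
    confirm the claim, never override it.
    """
    if header_tenant_id and header_tenant_id != claim_tenant_id:
        raise TenantMismatchError(
            f"header tenant {header_tenant_id!r} does not match token tenant"
        )
    return claim_tenant_id
```

A middleware can call this function and translate `TenantMismatchError` into a 403 response.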
PostgreSQL RLS¶
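ALTER TABLE patients ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON patients
    USING (tenant_id = current_setting('app.current_tenant_id'));

-- Set tenant context per request. SET LOCAL lasts only until the end of the
-- transaction, so the value cannot leak through a pooled connection.
BEGIN;
SET LOCAL app.current_tenant_id = 'tenant-apollo-001';
COMMIT;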
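With pooled connections, a session-level setting can outlive the request that set it. One option is to scope the setting to a transaction with PostgreSQL's `set_config(name, value, is_local)` function, which accepts the tenant ID as a bound parameter. The sketch below assumes a driver whose `execute(query, *args)` uses `$1`-style placeholders, as asyncpg does; `set_tenant_context` and `FakeConnection` are hypothetical names, and the fake only records statements so the example runs without a database.

```python
import asyncio
from typing import Any, List, Tuple

# The third argument (true) makes the setting local to the current transaction.
SET_TENANT_SQL = "SELECT set_config('app.current_tenant_id', $1, true)"


async def set_tenant_context(conn: Any, tenant_id: str) -> None:
    """Bind the tenant ID for RLS. Call this inside an open transaction."""
    await conn.execute(SET_TENANT_SQL, tenant_id)


class FakeConnection:
    """Stand-in for a driver connection; records calls instead of running them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    async def execute(self, query: str, *args: Any) -> None:
        self.calls.append((query, args))


fake = FakeConnection()
asyncio.run(set_tenant_context(fake, "tenant-apollo-001"))
```

Passing the tenant ID as a parameter rather than formatting it into the SQL string avoids injection through a crafted tenant value.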
GDPR Consent Management¶
Curaway tracks explicit consent for 6 purposes, each with version tracking and immutable audit records. Consents are never silently updated — patients must actively re-consent when terms change.
Consent Purposes¶
| Purpose | Required | Description |
|---|---|---|
| data_processing | Yes | Core data processing for service delivery |
| medical_data_sharing | Yes | Sharing medical records with matched providers |
| cross_border_transfer | Yes | Transferring data across international borders |
| communication | Yes | Essential service communications (booking confirmations) |
| marketing | No | Marketing emails and promotional content |
| analytics | No | Anonymous usage analytics for service improvement |
Consent Record Schema¶
CREATE TABLE consent_records (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    patient_id UUID NOT NULL REFERENCES patients(id),
    tenant_id VARCHAR(100) NOT NULL,
    purpose VARCHAR(50) NOT NULL,
    granted BOOLEAN NOT NULL,
    version INTEGER NOT NULL,
    granted_at TIMESTAMPTZ,
    revoked_at TIMESTAMPTZ,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Consent records are append-only: no UPDATE or DELETE
REVOKE UPDATE, DELETE ON consent_records FROM app_user;
Consent Verification¶
async def verify_consent(patient_id: str, purpose: str, tenant_id: str) -> bool:
    """Check if patient has active consent for a specific purpose."""
    # Records are append-only, so the most recent row is the current state.
    record = await db.fetch_one(
        """
        SELECT granted FROM consent_records
        WHERE patient_id = :pid AND purpose = :purpose AND tenant_id = :tid
        ORDER BY created_at DESC
        LIMIT 1
        """,
        {"pid": patient_id, "purpose": purpose, "tid": tenant_id},
    )
    # Coerce to bool: `record and ...` alone returns None when no row exists.
    return bool(record and record["granted"])
Version Tracking¶
When consent terms change (e.g., a new data processing agreement), the version is incremented. Patients with consent for the old version are prompted to review and re-consent to the new version. The QStash consent expiry cron job (daily at 09:00 UTC) identifies patients with outdated consent versions.
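The version comparison at the heart of that job can be sketched as a pure function. The data shapes here are assumptions: `current_versions` maps each purpose to its latest terms version, and `latest_consents` maps each purpose to the patient's most recent `(granted, version)` pair. `purposes_needing_reconsent` is a hypothetical name.

```python
from typing import Dict, List, Tuple


def purposes_needing_reconsent(
    current_versions: Dict[str, int],
    latest_consents: Dict[str, Tuple[bool, int]],
) -> List[str]:
    """Return purposes where consent was granted for an older terms version."""
    stale = []
    for purpose, current in current_versions.items():
        record = latest_consents.get(purpose)
        if record is None:
            continue  # never answered: belongs to the initial consent flow
        granted, version = record
        if granted and version < current:
            stale.append(purpose)
    return sorted(stale)
```

Revoked consents are skipped on purpose: a patient who said no to version 1 has nothing to re-confirm when version 2 is published.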
Data Classification¶
All data fields are classified into three tiers with corresponding protection levels.
| Classification | Examples | Encryption | Access Control |
|---|---|---|---|
| PII | Name, email, phone, address | Field-level AES-256 | Role-based, consent-required |
| PHI | Medical records, diagnoses, prescriptions | Field-level AES-256 + consent-gated | Role-based + explicit medical consent |
| Operational | Preferences, language, timezone | Standard encryption (at rest) | Role-based |
Field-Level Encryption¶
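PII and PHI fields are encrypted at the application layer using Python's
cryptography.fernet module. Note that Fernet uses AES-128 in CBC mode with
HMAC-SHA256 authentication rather than AES-256, so the AES-256 level named in
the classification table would require a different primitive, such as
AES-256-GCM. The encryption key is stored as an environment variable,
never in source code.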
import os

from cryptography.fernet import Fernet

# Assumes the key is exposed as an environment variable named ENCRYPTION_KEY.
ENCRYPTION_KEY = os.environ["ENCRYPTION_KEY"]
fernet = Fernet(ENCRYPTION_KEY.encode())

def encrypt_field(value: str) -> str:
    """Encrypt a sensitive field for storage."""
    return fernet.encrypt(value.encode()).decode()

def decrypt_field(encrypted: str) -> str:
    """Decrypt a sensitive field for display."""
    return fernet.decrypt(encrypted.encode()).decode()
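A Fernet key is the base64url encoding of 32 bytes: per the Fernet specification, the first 16 bytes are the HMAC-SHA256 signing key and the last 16 bytes are the AES-128 encryption key. The stdlib-only sketch below makes that layout visible and could serve as a startup check that a configured key is well-formed. `split_fernet_key` is a hypothetical helper, not part of the cryptography package.

```python
import base64
import os
from typing import Tuple


def split_fernet_key(key: bytes) -> Tuple[bytes, bytes]:
    """Decode a Fernet key and return (signing_key, encryption_key)."""
    raw = base64.urlsafe_b64decode(key)
    if len(raw) != 32:
        raise ValueError("Fernet key must decode to exactly 32 bytes")
    return raw[:16], raw[16:]


# Generate a throwaway key the same way Fernet keys are built:
# 32 random bytes, base64url-encoded.
example_key = base64.urlsafe_b64encode(os.urandom(32))
signing_key, encryption_key = split_fernet_key(example_key)
```

Because only 16 bytes are used for encryption, the cipher strength is AES-128, regardless of the 32-byte key length.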
Encrypted Fields¶
| Table | Field | Classification |
|---|---|---|
| patients | full_name | PII |
| patients | email | PII |
| patients | phone | PII |
| patients | date_of_birth | PII |
| patient_medical_info | medical_history | PHI |
| patient_medical_info | current_medications | PHI |
| patient_medical_info | allergies | PHI |
| documents | original_filename | PII |
Decryption at Display Time¶
@router.get("/patients/{patient_id}")
@require_role("patient", "curaway_admin", "super_admin")
async def get_patient(patient_id: str, claims: TokenClaims = Depends(get_claims)):
    # NOTE: callers with the "patient" role must additionally be limited to
    # their own record; that ownership check is not shown here.
    patient = await db.get_patient(patient_id, claims.tenant_id)
    return {
        "id": patient.id,
        # Sensitive fields are decrypted only at display time
        "full_name": decrypt_field(patient.full_name),
        "email": decrypt_field(patient.email),
        # Operational fields returned as-is
        "preferred_language": patient.preferred_language,
    }
Data Subject Request Handler (GDPR Article 17)¶
The Right to Erasure requires deletion of all personal data across every data store. Curaway implements a cascade delete handler that systematically purges data from all five storage systems.
Deletion Cascade¶
flowchart TD
A[DSR Request Received] --> B[Verify Identity]
B --> C[Generate Deletion Job ID]
C --> D1[PostgreSQL: Delete patient records]
C --> D2[Neo4j: Remove patient nodes + edges]
C --> D3[Qdrant: Delete patient vectors]
C --> D4[R2: Delete patient documents]
C --> D5[Redis: Flush patient cache keys]
D1 --> E[Verify All Deletions]
D2 --> E
D3 --> E
D4 --> E
D5 --> E
E --> F[Generate Deletion Certificate]
F --> G[Send Certificate to Patient]
Deletion Implementation¶
from datetime import datetime, timezone
from uuid import uuid4

async def execute_data_subject_deletion(patient_id: str, tenant_id: str) -> DeletionCertificate:
    """GDPR Article 17: Right to Erasure across all data stores."""
    job_id = str(uuid4())
    results = {}
    # 1. PostgreSQL: cascade delete via foreign keys
    results["postgresql"] = await delete_patient_postgres(patient_id, tenant_id)
    # 2. Neo4j: remove patient node and all relationships
    results["neo4j"] = await delete_patient_neo4j(patient_id)
    # 3. Qdrant: delete any patient-associated vectors
    results["qdrant"] = await delete_patient_vectors(patient_id)
    # 4. R2: delete all uploaded documents
    results["r2"] = await delete_patient_documents(patient_id, tenant_id)
    # 5. Redis: flush cached data
    results["redis"] = await flush_patient_cache(patient_id)
    # Generate immutable deletion certificate
    certificate = DeletionCertificate(
        job_id=job_id,
        patient_id=patient_id,  # Retained only in certificate
        tenant_id=tenant_id,
        # Timezone-aware timestamp; datetime.utcnow() is naive and deprecated
        deleted_at=datetime.now(timezone.utc),
        stores=results,
        all_successful=all(r["success"] for r in results.values()),
    )
    # Store certificate in audit log (retained for compliance)
    await store_deletion_certificate(certificate)
    return certificate
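The cascade diagram fans the five deletions out in parallel. A hedged sketch of that fan-out with `asyncio.gather` follows. `run_deletions` is a hypothetical helper and the two stub coroutines stand in for the real per-store functions. Each failure is converted into a `{"success": False}` entry so that one unavailable store does not hide the outcome of the others.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_deletions(
    tasks: Dict[str, Callable[[], Awaitable[dict]]],
) -> Dict[str, dict]:
    """Run all store deletions concurrently and collect per-store results."""
    names = list(tasks)
    outcomes = await asyncio.gather(
        *(tasks[name]() for name in names), return_exceptions=True
    )
    results: Dict[str, dict] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            # Record the failure instead of aborting the whole job.
            results[name] = {"success": False, "error": str(outcome)}
        else:
            results[name] = outcome
    return results


# Stubs standing in for the real per-store deletion functions.
async def _delete_ok() -> dict:
    return {"success": True}


async def _delete_fail() -> dict:
    raise RuntimeError("store unavailable")


demo = asyncio.run(run_deletions({"postgresql": _delete_ok, "qdrant": _delete_fail}))
```

The resulting dictionary has the same shape as `results` in the handler, so the `all_successful` computation works unchanged.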
Deletion Certificate¶
The certificate is a signed JSON document recording exactly what was deleted, when, and whether all deletions succeeded. It is stored in the append-only audit log and a copy is sent to the patient's email (captured before deletion).
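The source does not say how the certificate is signed. As one possible approach, the sketch below signs a canonical JSON serialization with HMAC-SHA256 from the standard library. The function names, the envelope layout, and the choice of HMAC are all assumptions. An asymmetric signature would be needed if the patient should be able to verify the certificate without a shared secret.

```python
import hashlib
import hmac
import json


def _canonical(certificate: dict) -> bytes:
    """Serialize deterministically so the same data always signs the same."""
    return json.dumps(certificate, sort_keys=True, separators=(",", ":")).encode()


def sign_certificate(certificate: dict, secret: bytes) -> dict:
    """Wrap the certificate with an HMAC-SHA256 signature."""
    signature = hmac.new(secret, _canonical(certificate), hashlib.sha256).hexdigest()
    return {"certificate": certificate, "signature": signature}


def verify_certificate(signed: dict, secret: bytes) -> bool:
    """Recompute the signature and compare in constant time."""
    expected = hmac.new(
        secret, _canonical(signed["certificate"]), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, signed["signature"])
```

Sorting keys before signing matters: without it, two serializations of the same certificate could produce different signatures.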
Audit Logging¶
Every write operation and sensitive read generates an immutable audit log entry. The audit table is append-only — even super admins cannot delete audit records.
Audit Record Schema¶
CREATE TABLE audit_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(100) NOT NULL,
    actor_id VARCHAR(100) NOT NULL,       -- who
    action VARCHAR(100) NOT NULL,         -- what
    resource_type VARCHAR(50) NOT NULL,   -- what type
    resource_id VARCHAR(100),             -- which resource
    details JSONB,                        -- additional context
    ip_address INET,                      -- where
    created_at TIMESTAMPTZ DEFAULT NOW()  -- when
);

-- Append-only: no UPDATE or DELETE allowed, including for super admins
REVOKE UPDATE, DELETE ON audit_logs FROM app_user;
REVOKE UPDATE, DELETE ON audit_logs FROM super_admin_user;

-- Index for efficient querying
CREATE INDEX idx_audit_tenant_created ON audit_logs(tenant_id, created_at DESC);
CREATE INDEX idx_audit_actor ON audit_logs(actor_id, created_at DESC);
Audit Event Examples¶
| Action | Resource Type | Trigger |
|---|---|---|
| patient.created | patient | New patient registration |
| consent.granted | consent_record | Patient grants consent |
| consent.revoked | consent_record | Patient revokes consent |
| document.uploaded | document | File upload completed |
| document.accessed | document | Document viewed or downloaded |
| dsr.executed | deletion_job | Data subject deletion completed |
| provider.matched | match | AI matching result delivered |
| admin.login | session | Admin user authenticated |
Logging Middleware¶
import json
from typing import Optional

async def audit_log(
    tenant_id: str,
    actor_id: str,
    action: str,
    resource_type: str,
    resource_id: Optional[str] = None,
    details: Optional[dict] = None,
    ip_address: Optional[str] = None,
):
    """Append one immutable entry to the audit_logs table."""
    await db.execute(
        """
        INSERT INTO audit_logs (tenant_id, actor_id, action, resource_type, resource_id, details, ip_address)
        VALUES (:tenant_id, :actor_id, :action, :resource_type, :resource_id, :details, :ip_address)
        """,
        {
            "tenant_id": tenant_id,
            "actor_id": actor_id,
            "action": action,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "details": json.dumps(details) if details else None,
            "ip_address": ip_address,
        },
    )
Legal Agreements¶
Terms of Service and Privacy Policy are versioned documents. User acceptance is tracked with timestamps and version numbers.
Agreement Schema¶
CREATE TABLE legal_agreements (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(100) NOT NULL,
    type VARCHAR(50) NOT NULL,  -- 'terms_of_service' or 'privacy_policy'
    version INTEGER NOT NULL,
    content TEXT NOT NULL,
    effective_date TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(tenant_id, type, version)
);

CREATE TABLE agreement_acceptances (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    patient_id UUID NOT NULL REFERENCES patients(id),
    agreement_id UUID NOT NULL REFERENCES legal_agreements(id),
    accepted_at TIMESTAMPTZ DEFAULT NOW(),
    ip_address INET,
    user_agent TEXT
);
When a new version of the ToS or Privacy Policy is published, patients are prompted to review and accept the updated terms on their next login.
Idempotency Keys¶
All write operations accept an X-Idempotency-Key header to prevent duplicate processing
from network retries or client-side double-submits.
Implementation¶
@router.post("/patients")
async def create_patient(
    request: CreatePatientRequest,
    idempotency_key: str = Header(alias="X-Idempotency-Key"),
    claims: TokenClaims = Depends(get_claims),
):
    # Scope the cache key to the tenant so a key reused by another tenant
    # can never return this tenant's cached response.
    cache_key = f"idempotency:{claims.tenant_id}:{idempotency_key}"
    # Check for existing result with this key
    existing = await redis.get(cache_key)
    if existing:
        return json.loads(existing)  # Return cached result
    # Process the request
    patient = await db.create_patient(request, claims.tenant_id)
    result = patient.dict()
    # Cache result for 24 hours
    await redis.set(cache_key, json.dumps(result), ex=86400)
    return result
The idempotency key is a client-generated UUID. Results are cached in Redis for 24 hours. Subsequent requests with the same key return the cached result without re-processing.
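The replay semantics can be illustrated without Redis. The sketch below is an in-memory stand-in, not the production mechanism: `IdempotencyStore` and `run_once` are hypothetical names, and scoping the cache key by tenant is an assumption made here so that identical keys from different tenants never collide.

```python
import time
from typing import Any, Callable, Dict, Tuple


class IdempotencyStore:
    """In-memory illustration of key-based replay with a TTL."""

    def __init__(
        self, ttl_seconds: float = 86400, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        # cache key -> (expiry time, cached result)
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def run_once(self, tenant_id: str, key: str, handler: Callable[[], Any]) -> Any:
        """Run `handler` at most once per (tenant, key) within the TTL."""
        cache_key = f"idempotency:{tenant_id}:{key}"
        now = self._clock()
        entry = self._entries.get(cache_key)
        if entry is not None and entry[0] > now:
            return entry[1]  # replay the cached result
        result = handler()
        self._entries[cache_key] = (now + self._ttl, result)
        return result
```

This single-process version has no race between the lookup and the write. With Redis, two concurrent requests carrying the same key can both miss the cache, so a production version would need an atomic reservation step.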
Correlation IDs¶
Every request is assigned an X-Correlation-ID that propagates across all service calls,
database queries, and log entries. This enables end-to-end request tracing.
Middleware¶
import uuid

async def correlation_id_middleware(request: Request, call_next):
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
    request.state.correlation_id = correlation_id
    response = await call_next(request)
    response.headers["X-Correlation-ID"] = correlation_id
    return response
Log Integration¶
logger.info(
    "Patient created",
    extra={
        "correlation_id": request.state.correlation_id,
        "tenant_id": request.state.tenant_id,
        "patient_id": patient.id,
    },
)
Correlation IDs appear in Langfuse traces, application logs, and audit records, enabling operators to trace a single patient action across every system it touches.
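Passing the ID through `extra=` on every call is easy to forget. One alternative, sketched here with only the standard library, stores the ID in a context variable and lets a logging filter stamp it on every record. `correlation_id_var`, `CorrelationIdFilter`, and `build_logger` are hypothetical names. In a real service the middleware would call `correlation_id_var.set(...)` once per request.

```python
import contextvars
import io
import logging

# Holds the current request's correlation ID; "-" when none is set.
correlation_id_var = contextvars.ContextVar("correlation_id", default="-")


class CorrelationIdFilter(logging.Filter):
    """Attach the current correlation ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id_var.get()
        return True  # never drop the record


def build_logger(name: str, handler: logging.Handler) -> logging.Logger:
    """Create a logger whose output is prefixed with the correlation ID."""
    handler.addFilter(CorrelationIdFilter())
    handler.setFormatter(logging.Formatter("%(correlation_id)s %(message)s"))
    logger = logging.getLogger(name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


# Demo: log into an in-memory stream instead of stderr.
stream = io.StringIO()
demo_logger = build_logger("curaway.demo", logging.StreamHandler(stream))
correlation_id_var.set("req-123")
demo_logger.info("Patient created")
```

Context variables are isolated per asyncio task, so concurrent requests do not see each other's IDs.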
Security Headers¶
async def security_headers_middleware(request: Request, call_next):
    response = await call_next(request)
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    # Legacy header: current Chrome, Edge, and Firefox ignore it.
    response.headers["X-XSS-Protection"] = "1; mode=block"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    return response