Authentication & Privacy

Curaway handles Protected Health Information (PHI) across international borders, making authentication, authorization, and data privacy foundational concerns. This document covers the full security model from JWT verification through GDPR compliance.


Authentication: Clerk

Clerk provides managed authentication with JWT-based session tokens, social login, multi-factor authentication, and an Organizations feature used for multi-tenancy.

JWT Flow

sequenceDiagram
    participant U as User
    participant FE as Frontend (Vercel)
    participant CK as Clerk
    participant BE as Backend (Railway)

    U->>FE: Login
    FE->>CK: Authenticate
    CK-->>FE: JWT (short-lived)
    FE->>BE: API Request + Authorization: Bearer {jwt}
    BE->>CK: Verify JWT (JWKS endpoint)
    CK-->>BE: Token claims {user_id, org_id, role}
    BE->>BE: Extract tenant_id from org_id
    BE-->>FE: Response (tenant-scoped data)

JWT Claims Structure

{
  "sub": "user_2abc123",
  "org_id": "org_tenant_apollo_001",
  "org_role": "patient",
  "org_permissions": ["read:own_data", "write:own_data"],
  "iat": 1711900800,
  "exp": 1711904400,
  "iss": "https://clerk.curaway.com"
}

Backend JWT Verification

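from clerk_backend_api import Clerk
from fastapi import HTTPException

clerk = Clerk(bearer_auth=CLERK_SECRET_KEY)

async def verify_token(authorization: str) -> TokenClaims:
    """Verify Clerk JWT and extract claims."""
    # Accept only a well-formed "Bearer <token>" header. str.replace would
    # also strip "Bearer " from the middle of a malformed value.
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        raise HTTPException(401, "Missing or malformed Authorization header")
    claims = clerk.verify_token(token.strip())
    return TokenClaims(
        user_id=claims["sub"],
        tenant_id=claims["org_id"],
        role=claims["org_role"],
    )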
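
The source never defines `TokenClaims`. The following is a minimal sketch of one possible shape, assuming a frozen dataclass and a hypothetical helper `claims_from_payload` that maps an already-verified payload onto it and rejects tokens missing any of the three claims the backend relies on.

```python
from dataclasses import dataclass

# Claims the backend reads from the verified JWT payload.
REQUIRED_CLAIMS = ("sub", "org_id", "org_role")


@dataclass(frozen=True)
class TokenClaims:
    """Assumed shape of the claims object passed between layers."""
    user_id: str
    tenant_id: str
    role: str


def claims_from_payload(payload: dict) -> TokenClaims:
    """Map an already-verified JWT payload onto TokenClaims (hypothetical helper)."""
    missing = [name for name in REQUIRED_CLAIMS if not payload.get(name)]
    if missing:
        raise ValueError(f"token is missing required claims: {', '.join(missing)}")
    return TokenClaims(
        user_id=payload["sub"],
        tenant_id=payload["org_id"],
        role=payload["org_role"],
    )
```

Failing closed on a missing `org_id` matters here because the tenant identity for every later layer is derived from that claim.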

Role-Based Access Control (RBAC)

Four roles with hierarchical permissions manage access across the platform.

| Role | Scope | Capabilities |
| --- | --- | --- |
| patient | Own data only | View own records, upload documents, chat with assistant, manage consents |
| provider_admin | Own organization | View matched patients, manage provider profile, view anonymized analytics |
| curaway_admin | All tenants (read) | View all data, manage providers, review flagged content, run reports |
| super_admin | All tenants (write) | Everything above + manage admins, access audit logs, trigger data deletions |

Permission Matrix

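| Resource | patient | provider_admin | curaway_admin | super_admin |
| --- | --- | --- | --- | --- |
| Own patient record | RW | - | R | RW |
| Own documents | RW | - | R | RW |
| Matched patient data | - | R | R | RW |
| Provider profiles | R | RW (own) | RW | RW |
| Audit logs | - | - | R | R |
| Feature flags | - | - | R | RW |
| User management | - | - | R | RW |
| Data deletion | - | - | - | Execute |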
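
The matrix can also be expressed as data, so that a single lookup answers whether a role may act on a resource. This is only a sketch: the resource keys, the `is_allowed` helper, and the single-letter action codes are illustrative names, and the "(own)" scoping of provider profiles is not modelled.

```python
ROLES = ("patient", "provider_admin", "curaway_admin", "super_admin")


def _row(*cells: str) -> dict:
    """Pair one table row's cells with the role columns, in table order."""
    return dict(zip(ROLES, cells))


# "R" = read, "W" = write, "X" = execute; an empty string means no access.
PERMISSIONS = {
    "own_patient_record": _row("RW", "", "R", "RW"),
    "own_documents": _row("RW", "", "R", "RW"),
    "matched_patient_data": _row("", "R", "R", "RW"),
    "provider_profiles": _row("R", "RW", "RW", "RW"),
    "audit_logs": _row("", "", "R", "R"),
    "feature_flags": _row("", "", "R", "RW"),
    "user_management": _row("", "", "R", "RW"),
    "data_deletion": _row("", "", "", "X"),
}


def is_allowed(role: str, resource: str, action: str) -> bool:
    """Return True if the role holds the action on the resource; unknown names deny."""
    if action not in ("R", "W", "X"):
        raise ValueError(f"unknown action: {action!r}")
    return action in PERMISSIONS.get(resource, {}).get(role, "")
```

Unknown roles and resources fall through to an empty permission string, so the lookup denies by default.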

Role Enforcement Middleware

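from functools import wraps

from fastapi import Depends, HTTPException

def require_role(*allowed_roles: str):
    """Reject the request with 403 unless the caller's role is allowed."""
    def decorator(func):
        @wraps(func)  # preserves the signature so FastAPI still resolves Depends()
        async def wrapper(*args, claims: TokenClaims, **kwargs):
            if claims.role not in allowed_roles:
                raise HTTPException(403, "Insufficient permissions")
            return await func(*args, claims=claims, **kwargs)
        return wrapper
    return decorator

# Usage: @require_role sits below @router.get so the router registers the wrapper
@router.get("/admin/audit-logs")
@require_role("curaway_admin", "super_admin")
async def get_audit_logs(claims: TokenClaims = Depends(get_claims)):
    ...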

Multi-Tenancy

Every table in PostgreSQL includes a tenant_id column. Tenant isolation is enforced at multiple levels to prevent data leakage between organizations.

Enforcement Layers

flowchart LR
    A[Request] --> B[JWT: org_id claim]
    B --> C[Header: X-Tenant-ID]
    C --> D[Middleware: validate match]
    D --> E[Query: WHERE tenant_id = ?]
    E --> F[RLS: PostgreSQL policy]

| Layer | Mechanism | Purpose |
| --- | --- | --- |
| JWT claim | org_id in token | Cryptographically bound tenant identity |
| HTTP header | X-Tenant-ID | Explicit tenant context for debugging |
| Middleware | Claim-header match | Prevent header spoofing |
| Query filter | WHERE tenant_id = :tid | Application-level isolation |
| RLS policy | PostgreSQL Row-Level Security | Database-level isolation (defense in depth) |

Tenant Context Middleware

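from fastapi import Request
from fastapi.responses import JSONResponse

async def tenant_middleware(request: Request, call_next):
    claims = request.state.claims
    header_tenant = request.headers.get("X-Tenant-ID")

    # Return the 403 directly: an HTTPException raised inside HTTP middleware
    # bypasses FastAPI's exception handlers and surfaces as a 500.
    if header_tenant and header_tenant != claims.tenant_id:
        return JSONResponse(status_code=403, content={"detail": "Tenant ID mismatch"})

    # The JWT claim, not the header, is the source of truth for the tenant
    request.state.tenant_id = claims.tenant_id
    return await call_next(request)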

PostgreSQL RLS

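ALTER TABLE patients ENABLE ROW LEVEL SECURITY;

CREATE POLICY tenant_isolation ON patients
    USING (tenant_id = current_setting('app.current_tenant_id'));

-- Set tenant context per request. SET LOCAL scopes the value to the current
-- transaction, so it cannot leak to the next request on a pooled connection.
BEGIN;
SET LOCAL app.current_tenant_id = 'tenant-apollo-001';
-- ... tenant-scoped queries ...
COMMIT;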
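
The tenant setting has to be applied on every request before any tenant-scoped query runs. A minimal sketch follows, assuming a `db.execute(sql, params)` helper with `:name` parameters like the one used elsewhere in this document. It uses PostgreSQL's `set_config` function with `is_local = true` rather than `SET`, because `SET` does not accept bind parameters and the tenant ID should never be interpolated into SQL text. The helper name `tenant_context_statement` is hypothetical.

```python
# set_config(name, value, is_local): with is_local = true the value lasts
# only until the end of the current transaction.
SET_TENANT_SQL = "SELECT set_config('app.current_tenant_id', :tenant_id, true)"


def tenant_context_statement(tenant_id: str) -> tuple:
    """Build the parameterized statement that binds a transaction to one tenant."""
    if not tenant_id:
        raise ValueError("tenant_id must be non-empty")
    return SET_TENANT_SQL, {"tenant_id": tenant_id}
```

Inside a request handler this would run as the first statement of the transaction, for example `await db.execute(*tenant_context_statement(request.state.tenant_id))`.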

Consent Management

Curaway tracks explicit consent for six purposes, each with version tracking and immutable audit records. Consents are never silently updated: patients must actively re-consent when terms change.

| Purpose | Required | Description |
| --- | --- | --- |
| data_processing | Yes | Core data processing for service delivery |
| medical_data_sharing | Yes | Sharing medical records with matched providers |
| cross_border_transfer | Yes | Transferring data across international borders |
| communication | Yes | Essential service communications (booking confirmations) |
| marketing | No | Marketing emails and promotional content |
| analytics | No | Anonymous usage analytics for service improvement |

CREATE TABLE consent_records (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    patient_id UUID NOT NULL REFERENCES patients(id),
    tenant_id VARCHAR(100) NOT NULL,
    purpose VARCHAR(50) NOT NULL,
    granted BOOLEAN NOT NULL,
    version INTEGER NOT NULL,
    granted_at TIMESTAMPTZ,
    revoked_at TIMESTAMPTZ,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Consent records are append-only: no UPDATE or DELETE
REVOKE UPDATE, DELETE ON consent_records FROM app_user;

async def verify_consent(patient_id: str, purpose: str, tenant_id: str) -> bool:
    """Check if patient has active consent for a specific purpose."""
    # Records are append-only, so the newest row per purpose is the current state
    record = await db.fetch_one(
        """
        SELECT granted FROM consent_records
        WHERE patient_id = :pid AND purpose = :purpose AND tenant_id = :tid
        ORDER BY created_at DESC
        LIMIT 1
        """,
        {"pid": patient_id, "purpose": purpose, "tid": tenant_id},
    )
    # Always return a bool: `record and record["granted"]` yields None when no row exists
    return record is not None and bool(record["granted"])

Version Tracking

When consent terms change (e.g., a new data processing agreement), the version is incremented. Patients with consent for the old version are prompted to review and re-consent to the new version. The QStash consent expiry cron job (daily at 09:00 UTC) identifies patients with outdated consent versions.
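
The version comparison the cron job performs can be sketched as a pure function. The `ConsentState` type, the function name, and the `current_versions` mapping are assumptions for illustration. The source does not show how the job loads its data, so the sketch takes the latest record per purpose as input.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsentState:
    """Latest consent record for one purpose (assumed shape)."""
    purpose: str
    granted: bool
    version: int


def purposes_needing_reconsent(latest: list, current_versions: dict) -> list:
    """Return purposes whose granted consent refers to an older version of the terms."""
    outdated = []
    for state in latest:
        current = current_versions.get(state.purpose)
        # Only granted consents can be outdated; a refusal stays a refusal.
        if current is not None and state.granted and state.version < current:
            outdated.append(state.purpose)
    return sorted(outdated)
```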


Data Classification

All data fields are classified into three tiers with corresponding protection levels.

| Classification | Examples | Encryption | Access Control |
| --- | --- | --- | --- |
| PII | Name, email, phone, address | Field-level AES-256 | Role-based, consent-required |
| PHI | Medical records, diagnoses, prescriptions | Field-level AES-256 + consent-gated | Role-based + explicit medical consent |
| Operational | Preferences, language, timezone | Standard encryption (at rest) | Role-based |

Field-Level Encryption

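PII and PHI fields are encrypted at the application layer with Python's cryptography.fernet module. Fernet is built on AES-128 in CBC mode with HMAC-SHA256 authentication, so meeting the AES-256 level listed in the classification table would require a different primitive, such as AES-256-GCM. The encryption key is stored as an environment variable, never in source code.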

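import os

from cryptography.fernet import Fernet

# Fernet expects a URL-safe base64-encoded 32-byte key (see Fernet.generate_key()).
# The environment variable name is an assumption.
ENCRYPTION_KEY = os.environ["ENCRYPTION_KEY"]
fernet = Fernet(ENCRYPTION_KEY.encode())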

def encrypt_field(value: str) -> str:
    """Encrypt a sensitive field for storage."""
    return fernet.encrypt(value.encode()).decode()

def decrypt_field(encrypted: str) -> str:
    """Decrypt a sensitive field for display."""
    return fernet.decrypt(encrypted.encode()).decode()

Encrypted Fields

| Table | Field | Classification |
| --- | --- | --- |
| patients | full_name | PII |
| patients | email | PII |
| patients | phone | PII |
| patients | date_of_birth | PII |
| patient_medical_info | medical_history | PHI |
| patient_medical_info | current_medications | PHI |
| patient_medical_info | allergies | PHI |
| documents | original_filename | PII |
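
Keeping the list above in one place in code helps ensure every read and write path treats the same fields as sensitive. The sketch below assumes a hypothetical `transform_row` helper that applies a caller-supplied function, such as `encrypt_field` before a write or `decrypt_field` after a read, to the classified fields only.

```python
from typing import Callable

# Mirrors the encrypted-fields table: table name -> protected column names.
ENCRYPTED_FIELDS = {
    "patients": {"full_name", "email", "phone", "date_of_birth"},
    "patient_medical_info": {"medical_history", "current_medications", "allergies"},
    "documents": {"original_filename"},
}


def transform_row(table: str, row: dict, transform: Callable[[str], str]) -> dict:
    """Apply `transform` to the protected fields of a row; leave the rest untouched."""
    protected = ENCRYPTED_FIELDS.get(table, set())
    return {
        key: transform(value) if key in protected and value is not None else value
        for key, value in row.items()
    }
```

For example, `transform_row("patients", row, encrypt_field)` would encrypt the four PII columns and pass operational fields such as `preferred_language` through unchanged.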

Decryption at Display Time

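@router.get("/patients/{patient_id}")
@require_role("patient", "curaway_admin", "super_admin")
async def get_patient(patient_id: str, claims: TokenClaims = Depends(get_claims)):
    patient = await db.get_patient(patient_id, claims.tenant_id)
    if patient is None:
        raise HTTPException(404, "Patient not found")
    # Patients may read only their own record. `patient.user_id` is an assumed
    # column linking the row to the Clerk user; adjust it to the real schema.
    if claims.role == "patient" and patient.user_id != claims.user_id:
        raise HTTPException(403, "Insufficient permissions")
    return {
        "id": patient.id,
        "full_name": decrypt_field(patient.full_name),
        "email": decrypt_field(patient.email),
        # Operational fields returned as-is
        "preferred_language": patient.preferred_language,
    }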

Data Subject Request Handler (GDPR Article 17)

The Right to Erasure requires deletion of all personal data across every data store. Curaway implements a cascade delete handler that systematically purges data from all five storage systems.

Deletion Cascade

flowchart TD
    A[DSR Request Received] --> B[Verify Identity]
    B --> C[Generate Deletion Job ID]
    C --> D1[PostgreSQL: Delete patient records]
    C --> D2[Neo4j: Remove patient nodes + edges]
    C --> D3[Qdrant: Delete patient vectors]
    C --> D4[R2: Delete patient documents]
    C --> D5[Redis: Flush patient cache keys]
    D1 --> E[Verify All Deletions]
    D2 --> E
    D3 --> E
    D4 --> E
    D5 --> E
    E --> F[Generate Deletion Certificate]
    F --> G[Send Certificate to Patient]

Deletion Implementation

from datetime import datetime, timezone
from uuid import uuid4

async def execute_data_subject_deletion(patient_id: str, tenant_id: str) -> DeletionCertificate:
    """GDPR Article 17: Right to Erasure across all data stores."""
    job_id = str(uuid4())
    results = {}

    # 1. PostgreSQL: cascade delete via foreign keys
    results["postgresql"] = await delete_patient_postgres(patient_id, tenant_id)

    # 2. Neo4j: remove patient node and all relationships
    results["neo4j"] = await delete_patient_neo4j(patient_id)

    # 3. Qdrant: delete any patient-associated vectors
    results["qdrant"] = await delete_patient_vectors(patient_id)

    # 4. R2: delete all uploaded documents
    results["r2"] = await delete_patient_documents(patient_id, tenant_id)

    # 5. Redis: flush cached data
    results["redis"] = await flush_patient_cache(patient_id)

    # Generate immutable deletion certificate
    certificate = DeletionCertificate(
        job_id=job_id,
        patient_id=patient_id,  # Retained only in certificate
        tenant_id=tenant_id,
        # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
        deleted_at=datetime.now(timezone.utc),
        stores=results,
        all_successful=all(r["success"] for r in results.values()),
    )

    # Store certificate in audit log (retained for compliance)
    await store_deletion_certificate(certificate)

    return certificate
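
The flowchart shows the five deletions fanning out from a single job, while the implementation above awaits them one after another, so an exception in one store would stop the remaining stores from being attempted. A possible fan-out is sketched below under stated assumptions: `run_deletions` is a hypothetical helper, and each deleter is a zero-argument coroutine function returning a dict with a `success` key, matching the shape the certificate code reads.

```python
import asyncio


async def run_deletions(deleters: dict) -> dict:
    """Run every store's deleter concurrently and record one outcome per store."""
    names = list(deleters)
    # return_exceptions=True keeps one failing store from cancelling the others.
    outcomes = await asyncio.gather(
        *(deleters[name]() for name in names), return_exceptions=True
    )
    results = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, Exception):
            results[name] = {"success": False, "error": str(outcome)}
        else:
            results[name] = outcome
    return results
```

With this shape the certificate can still be issued after a partial failure, with `all_successful` set to false and the failing store named in `stores`.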

Deletion Certificate

The certificate is a signed JSON document recording exactly what was deleted, when, and whether all deletions succeeded. It is stored in the append-only audit log and a copy is sent to the patient's email (captured before deletion).
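
The source does not say which signature scheme the certificate uses. As an illustration only, the sketch below signs a canonical JSON encoding with HMAC-SHA256 from the standard library. The function names and the envelope layout are assumptions, and a deployment that needs third-party verifiability would more likely use an asymmetric signature.

```python
import hashlib
import hmac
import json


def _canonical(certificate: dict) -> bytes:
    """Serialize with sorted keys and fixed separators so the bytes are reproducible."""
    return json.dumps(certificate, sort_keys=True, separators=(",", ":")).encode()


def sign_certificate(certificate: dict, key: bytes) -> dict:
    """Wrap the certificate in an envelope carrying its HMAC-SHA256 signature."""
    signature = hmac.new(key, _canonical(certificate), hashlib.sha256).hexdigest()
    return {"certificate": certificate, "algorithm": "HMAC-SHA256", "signature": signature}


def verify_certificate(signed: dict, key: bytes) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = hmac.new(key, _canonical(signed["certificate"]), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signed["signature"])
```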


Audit Logging

Every write operation and sensitive read generates an immutable audit log entry. The audit table is append-only — even super admins cannot delete audit records.

Audit Record Schema

CREATE TABLE audit_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(100) NOT NULL,
    actor_id VARCHAR(100) NOT NULL,       -- who
    action VARCHAR(100) NOT NULL,          -- what
    resource_type VARCHAR(50) NOT NULL,    -- what type
    resource_id VARCHAR(100),              -- which resource
    details JSONB,                         -- additional context
    ip_address INET,                       -- where
    created_at TIMESTAMPTZ DEFAULT NOW()   -- when
);

-- Append-only: no UPDATE or DELETE allowed
REVOKE UPDATE, DELETE ON audit_logs FROM app_user;
REVOKE UPDATE, DELETE ON audit_logs FROM super_admin_user;

-- Index for efficient querying
CREATE INDEX idx_audit_tenant_created ON audit_logs(tenant_id, created_at DESC);
CREATE INDEX idx_audit_actor ON audit_logs(actor_id, created_at DESC);

Audit Event Examples

| Action | Resource Type | Trigger |
| --- | --- | --- |
| patient.created | patient | New patient registration |
| consent.granted | consent_record | Patient grants consent |
| consent.revoked | consent_record | Patient revokes consent |
| document.uploaded | document | File upload completed |
| document.accessed | document | Document viewed or downloaded |
| dsr.executed | deletion_job | Data subject deletion completed |
| provider.matched | match | AI matching result delivered |
| admin.login | session | Admin user authenticated |

Logging Middleware

import json
from typing import Optional

async def audit_log(
    tenant_id: str,
    actor_id: str,
    action: str,
    resource_type: str,
    resource_id: Optional[str] = None,
    details: Optional[dict] = None,
    ip_address: Optional[str] = None,
):
    await db.execute(
        """
        INSERT INTO audit_logs (tenant_id, actor_id, action, resource_type, resource_id, details, ip_address)
        VALUES (:tenant_id, :actor_id, :action, :resource_type, :resource_id, :details, :ip_address)
        """,
        {
            "tenant_id": tenant_id,
            "actor_id": actor_id,
            "action": action,
            "resource_type": resource_type,
            "resource_id": resource_id,
            "details": json.dumps(details) if details else None,
            "ip_address": ip_address,
        },
    )

Legal Agreements

Terms of Service and Privacy Policy are versioned documents. User acceptance is tracked with timestamps and version numbers.

Agreement Schema

CREATE TABLE legal_agreements (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(100) NOT NULL,
    type VARCHAR(50) NOT NULL,        -- 'terms_of_service' or 'privacy_policy'
    version INTEGER NOT NULL,
    content TEXT NOT NULL,
    effective_date TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(tenant_id, type, version)
);

CREATE TABLE agreement_acceptances (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id VARCHAR(100) NOT NULL,  -- every table carries tenant_id for isolation
    patient_id UUID NOT NULL REFERENCES patients(id),
    agreement_id UUID NOT NULL REFERENCES legal_agreements(id),
    accepted_at TIMESTAMPTZ DEFAULT NOW(),
    ip_address INET,
    user_agent TEXT
);

When a new version of the ToS or Privacy Policy is published, patients are prompted to review and accept the updated terms on their next login.


Idempotency Keys

All write operations accept an X-Idempotency-Key header to prevent duplicate processing from network retries or client-side double-submits.

Implementation

@router.post("/patients")
async def create_patient(
    request: CreatePatientRequest,
    idempotency_key: str = Header(alias="X-Idempotency-Key"),
    claims: TokenClaims = Depends(get_claims),
):
    # Scope the key to the tenant so a key reused by another tenant
    # can never return this tenant's cached response
    cache_key = f"idempotency:{claims.tenant_id}:{idempotency_key}"

    # Check for existing result with this key
    existing = await redis.get(cache_key)
    if existing:
        return json.loads(existing)  # Return cached result

    # Process the request
    patient = await db.create_patient(request, claims.tenant_id)
    result = patient.dict()

    # Cache result for 24 hours; default=str serializes UUID and datetime values
    await redis.set(cache_key, json.dumps(result, default=str), ex=86400)
    return result

The idempotency key is a client-generated UUID. Results are cached in Redis for 24 hours. Subsequent requests with the same key return the cached result without re-processing.
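
A read-then-write check leaves a window in which two concurrent requests with the same key can both miss the cache and both run the operation. The sketch below illustrates the check-and-run pattern with an in-memory stand-in for Redis. `InMemoryIdempotencyStore` and `run_once` are hypothetical names, and a real deployment would need an atomic Redis primitive such as `SET` with `NX` in place of the in-process lock.

```python
import asyncio
import time


class InMemoryIdempotencyStore:
    """Illustrative single-process stand-in for the Redis-backed cache."""

    def __init__(self, ttl_seconds: float = 86400):
        self._ttl = ttl_seconds
        self._results = {}  # key -> (expires_at, result)
        self._locks = {}    # key -> asyncio.Lock (never evicted in this sketch)

    async def run_once(self, key: str, operation):
        """Run `operation` at most once per key within the TTL; replay its result otherwise."""
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:  # concurrent callers with the same key wait here
            cached = self._results.get(key)
            if cached is not None and cached[0] > time.monotonic():
                return cached[1]
            result = await operation()
            self._results[key] = (time.monotonic() + self._ttl, result)
            return result
```

Two simultaneous calls to `run_once` with the same key execute the operation once, and the second caller receives the stored result.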


Correlation IDs

Every request is assigned an X-Correlation-ID that propagates across all service calls, database queries, and log entries. This enables end-to-end request tracing.

Middleware

import uuid

async def correlation_id_middleware(request: Request, call_next):
    correlation_id = request.headers.get("X-Correlation-ID", str(uuid.uuid4()))
    request.state.correlation_id = correlation_id

    response = await call_next(request)
    response.headers["X-Correlation-ID"] = correlation_id
    return response

Log Integration

logger.info(
    "Patient created",
    extra={
        "correlation_id": request.state.correlation_id,
        "tenant_id": request.state.tenant_id,
        "patient_id": patient.id,
    },
)

Correlation IDs appear in Langfuse traces, application logs, and audit records, enabling operators to trace a single patient action across every system it touches.
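
Passing `extra=` on every log call is easy to forget. One way to attach the ID automatically is a context variable read by a logging filter, sketched here with the standard library only. The names `correlation_id_var` and `CorrelationIdFilter` are assumptions, and the middleware would need to call `correlation_id_var.set(correlation_id)` once per request.

```python
import contextvars
import logging

# Holds the current request's correlation ID; "-" marks logs emitted outside a request.
correlation_id_var = contextvars.ContextVar("correlation_id", default="-")


class CorrelationIdFilter(logging.Filter):
    """Copy the current correlation ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id_var.get()
        return True  # never drop the record, only annotate it
```

With the filter attached to a handler, a format string such as `"%(correlation_id)s %(message)s"` includes the ID on every line without changes at the call sites.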


Security Headers

async def security_headers_middleware(request: Request, call_next):
    response = await call_next(request)
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
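    # The legacy XSS auditor is deprecated and can itself introduce vulnerabilities;
    # current OWASP guidance is to disable it explicitly.
    response.headers["X-XSS-Protection"] = "0"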
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    return response