DAO Layer — Feature Spec

Status: Implemented in Session 41
Companion steer: ai-steer/dao-layer-steer.md

This is the implementation plan for Phase 1 (PostgreSQL repositories). Read the steer first — it covers the problem statement, design principles, error taxonomy, circuit breaker design, and migration strategy.


Overview

Phase 1 introduces a repository layer for PostgreSQL access. Five typed repositories share a BaseRepository that enforces tenant isolation, wraps errors in a unified taxonomy, and provides automatic audit logging for write operations. Services are migrated one at a time — each migration is a separate PR that replaces inline SQLAlchemy queries with repository method calls.

No changes to Neo4j, Redis, R2, or QStash in Phase 1.


File-by-file change list

New files

| File | Purpose | Lines (est.) |
|---|---|---|
| app/errors.py | DataStoreError hierarchy: base class + 5 subclasses | ~80 |
| app/repositories/__init__.py | Package init, re-exports all repositories | ~15 |
| app/repositories/base.py | BaseRepository with tenant scoping, audit logging, error wrapping | ~150 |
| app/repositories/patient_repository.py | PatientRepository: typed methods for Patient CRUD | ~180 |
| app/repositories/case_repository.py | CaseRepository: typed methods for Case + Conversation | ~160 |
| app/repositories/fhir_repository.py | FhirRepository: typed methods for FHIR resource CRUD | ~120 |
| app/repositories/document_repository.py | DocumentRepository: typed methods for DocumentReference | ~130 |
| app/repositories/provider_repository.py | ProviderRepository: typed methods for Provider reads | ~100 |
| tests/test_repositories/test_base_repository.py | Unit tests for BaseRepository tenant isolation + audit | ~120 |
| tests/test_repositories/test_patient_repository.py | Unit tests for PatientRepository | ~150 |
| tests/test_repositories/test_case_repository.py | Unit tests for CaseRepository | ~120 |
| tests/test_repositories/test_tenant_isolation.py | Integration test verifying cross-tenant access is blocked | ~80 |
| tests/test_errors.py | Unit tests for error taxonomy (serialization, retry_safe flags) | ~60 |

Modified files (Phase 1 service migrations)

| File | Change |
|---|---|
| app/services/patient_service.py | Replace inline select(Patient).where(...) with PatientRepository calls. Remove manual AuditLog / Event creation for CRUD ops (BaseRepository handles it). Keep business logic (encryption, status transitions) in the service. |
| app/services/case_service.py | Replace inline select(Case).where(...) with CaseRepository calls. Fix _generate_case_number() to scope by tenant_id. Keep workflow state machine logic in the service. |
| app/services/fhir_service.py | Replace inline select(FhirResource).where(...) with FhirRepository calls. Keep FHIR validation logic in the service. |
| app/services/document_service.py | Replace inline select(DocumentReference).where(...) with DocumentRepository calls. R2 and QStash calls stay in the service (Phase 3). |
| app/services/provider_service.py | Replace inline select(Provider).where(...) with ProviderRepository calls. Neo4j auto-sync stays in the service (Phase 2). |
| app/services/match_service.py | Replace direct Patient/Provider/FHIR reads with repository calls. The match orchestration logic stays in the service. |

Unchanged files

| File | Why unchanged |
|---|---|
| app/database.py | Session factory and get_db() dependency stay as-is. Repositories receive sessions; they don't create them. |
| app/services/graph_service.py | Phase 2 (GraphDAO). |
| app/integrations/redis_pubsub.py | Phase 2 (CacheDAO). |
| app/integrations/r2_client.py | Phase 3 (StorageDAO). |
| app/integrations/qstash_client.py | Phase 3 (QueueDAO). |
| app/integrations/qdrant_service.py | Already well-abstracted. Not wrapped. |
| app/services/semantic_search_service.py | Already well-abstracted. Not wrapped. |

Detailed designs

app/errors.py — DataStoreError Taxonomy

"""
Curaway — Unified Data Store Error Taxonomy.

All data access errors are wrapped in DataStoreError subclasses.
Services catch these instead of store-specific exceptions (SQLAlchemy,
neo4j, botocore, httpx).
"""


class DataStoreError(Exception):
    """Base class for all data store errors."""

    def __init__(
        self,
        message: str,
        *,
        store: str,               # "postgres", "neo4j", "redis", "r2", "qstash"
        operation: str,           # "read", "write", "delete", "connect"
        retry_safe: bool = False,
        original_error: Exception | None = None,
    ):
        super().__init__(message)
        self.store = store
        self.operation = operation
        self.retry_safe = retry_safe
        self.original_error = original_error

    def to_dict(self) -> dict:
        return {
            "error_type": type(self).__name__,
            "store": self.store,
            "operation": self.operation,
            "retry_safe": self.retry_safe,
            "message": str(self),
        }


class StoreUnavailableError(DataStoreError):
    """Store is down, connection refused, or circuit breaker is open."""

    def __init__(self, store: str, operation: str = "connect", **kwargs):
        super().__init__(
            f"{store} is unavailable",
            store=store,
            operation=operation,
            retry_safe=True,
            **kwargs,
        )


class TenantIsolationViolation(DataStoreError):
    """A query was attempted without a tenant_id filter."""

    def __init__(self, store: str = "postgres", operation: str = "read"):
        super().__init__(
            "Query attempted without tenant_id — this is a bug, not a user error",
            store=store,
            operation=operation,
            retry_safe=False,
        )


class RecordNotFoundError(DataStoreError):
    """Expected record does not exist (after tenant-scoped lookup)."""

    def __init__(self, store: str, resource_type: str, resource_id: str):
        super().__init__(
            f"{resource_type} {resource_id} not found",
            store=store,
            operation="read",
            retry_safe=False,
        )


class DuplicateRecordError(DataStoreError):
    """Unique constraint violation."""

    def __init__(self, store: str, detail: str = ""):
        super().__init__(
            f"Duplicate record: {detail}" if detail else "Duplicate record",
            store=store,
            operation="write",
            retry_safe=False,
        )


class QueryError(DataStoreError):
    """Malformed query or constraint violation (not connectivity)."""

    def __init__(self, store: str, detail: str, operation: str = "read", **kwargs):
        super().__init__(
            f"Query error: {detail}",
            store=store,
            operation=operation,  # write-path callers (e.g. _flush) can pass "write"
            retry_safe=False,
            **kwargs,
        )
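Because every error carries retry_safe, a caller can decide whether to retry without knowing which store failed. The sketch below is illustrative only: it re-declares trimmed-down DataStoreError and StoreUnavailableError classes so it runs standalone, and call_with_retry, its attempts/backoff_s parameters, and flaky_read are hypothetical names, not part of the spec.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class DataStoreError(Exception):
    """Trimmed-down stand-in for app.errors.DataStoreError (illustration only)."""

    def __init__(self, message: str, *, store: str, operation: str, retry_safe: bool = False):
        super().__init__(message)
        self.store = store
        self.operation = operation
        self.retry_safe = retry_safe


class StoreUnavailableError(DataStoreError):
    """Stand-in mirroring the spec: always retry_safe=True."""

    def __init__(self, store: str, operation: str = "connect"):
        super().__init__(
            f"{store} is unavailable", store=store, operation=operation, retry_safe=True
        )


def call_with_retry(fn: Callable[[], T], attempts: int = 3, backoff_s: float = 0.0) -> T:
    """Call fn, retrying only DataStoreErrors flagged retry_safe (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except DataStoreError as err:
            # Duplicates, tenant violations and bad queries are not retry_safe: re-raise at once.
            if not err.retry_safe or attempt == attempts:
                raise
            time.sleep(backoff_s * attempt)  # linear backoff; 0.0 disables sleeping
    raise AssertionError("unreachable")  # the loop always returns or raises


calls = {"n": 0}


def flaky_read() -> str:
    calls["n"] += 1
    if calls["n"] < 3:  # the first two calls simulate an outage
        raise StoreUnavailableError("postgres", operation="read")
    return "patient-row"


print(call_with_retry(flaky_read))  # patient-row
print(calls["n"])                    # 3
```

With SQLAlchemy, a failed statement leaves the session needing a rollback, so in practice the retried callable should be a whole unit of work with a fresh session rather than a single repository call.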

app/repositories/base.py — BaseRepository

"""
Curaway — Base Repository.

All PostgreSQL repositories inherit from this. Provides:
1. Tenant-scoped query builder (enforced — no unscoped reads)
2. Automatic AuditLog creation on writes
3. Error wrapping (SQLAlchemy exceptions -> DataStoreError)
"""

import logging
import uuid

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError, OperationalError

from app.errors import (
    DataStoreError,
    StoreUnavailableError,
    DuplicateRecordError,
    QueryError,
)
from app.models.audit import AuditLog

logger = logging.getLogger(__name__)


class BaseRepository:
    """Base class for all PostgreSQL repositories.

    Subclasses set `model` to the SQLAlchemy model class.
    All read methods must use `_scoped_query()` to enforce tenant isolation.
    """

    model = None  # Subclasses override: e.g., Patient, Case

    def __init__(self, db: AsyncSession):
        self.db = db

    # ── Tenant-scoped query builder ──

    def _scoped_query(self, tenant_id: str):
        """Return a SELECT pre-filtered by tenant_id.

        This is the ONLY way to start a query in a repository.
        Direct `select(self.model)` without tenant scoping is forbidden.
        """
        return select(self.model).where(self.model.tenant_id == tenant_id)

    def _scoped_count(self, tenant_id: str):
        """Return a COUNT query pre-filtered by tenant_id."""
        return select(func.count(self.model.id)).where(
            self.model.tenant_id == tenant_id
        )

    # ── Soft-delete filtering ──

    def _active_only(self, query):
        """Add is_deleted=False filter if the model supports soft-delete."""
        if hasattr(self.model, "is_deleted"):
            return query.where(self.model.is_deleted == False)  # noqa: E712
        return query

    # ── Audit logging ──

    def _audit(
        self,
        action: str,
        resource_id: str,
        tenant_id: str,
        actor_id: str = "system",
        actor_type: str = "user",
        changes: dict | None = None,
    ):
        """Create an AuditLog entry. Called automatically by write methods."""
        self.db.add(
            AuditLog(
                id=str(uuid.uuid4()),
                actor_id=actor_id,
                actor_type=actor_type,
                action=action,
                resource_type=self.model.__name__ if self.model else "unknown",
                resource_id=resource_id,
                tenant_id=tenant_id,
                changes=changes,
            )
        )

    # ── Error wrapping ──

    async def _execute(self, query):
        """Execute a query with error wrapping."""
        try:
            return await self.db.execute(query)
        except OperationalError as e:
            raise StoreUnavailableError(
                "postgres", operation="read", original_error=e
            ) from e
        except IntegrityError as e:
            error_msg = str(e.orig) if e.orig else str(e)
            if "unique" in error_msg.lower() or "duplicate" in error_msg.lower():
                raise DuplicateRecordError("postgres", detail=error_msg) from e
            raise QueryError("postgres", detail=error_msg, original_error=e) from e
        except Exception as e:
            raise DataStoreError(
                f"Unexpected database error: {e}",
                store="postgres",
                operation="read",
                original_error=e,
            ) from e

    async def _flush(self):
        """Flush pending changes with error wrapping."""
        try:
            await self.db.flush()
        except OperationalError as e:
            raise StoreUnavailableError(
                "postgres", operation="write", original_error=e
            ) from e
        except IntegrityError as e:
            error_msg = str(e.orig) if e.orig else str(e)
            if "unique" in error_msg.lower() or "duplicate" in error_msg.lower():
                raise DuplicateRecordError("postgres", detail=error_msg) from e
            raise QueryError("postgres", detail=error_msg, original_error=e) from e

    # ── Common read patterns ──

    async def get_by_id(self, record_id: str, tenant_id: str):
        """Fetch a single record by ID, scoped to tenant. Returns None if not found."""
        query = self._scoped_query(tenant_id).where(self.model.id == record_id)
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def list_paginated(
        self,
        tenant_id: str,
        page: int = 1,
        page_size: int = 20,
        order_by=None,
    ) -> tuple[list, int]:
        """List records with pagination, scoped to tenant.

        Returns (records, total_count).
        """
        base = self._scoped_query(tenant_id)
        base = self._active_only(base)

        count_q = self._scoped_count(tenant_id)
        if hasattr(self.model, "is_deleted"):
            count_q = count_q.where(self.model.is_deleted == False)  # noqa: E712

        total = (await self._execute(count_q)).scalar() or 0

        if order_by is not None:
            base = base.order_by(order_by)
        elif hasattr(self.model, "created_at"):
            base = base.order_by(self.model.created_at.desc())

        base = base.offset((page - 1) * page_size).limit(page_size)
        result = await self._execute(base)

        return list(result.scalars().all()), total
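_execute and _flush choose between DuplicateRecordError and QueryError by searching the driver message for "unique" or "duplicate", which can misfire when a constraint name happens to contain one of those words. A sturdier variant keys on the PostgreSQL SQLSTATE code (23505 is unique_violation) and falls back to the message check. This is a sketch under assumptions: classify_integrity_error is a hypothetical helper, and it takes the code as a plain argument because where it lives on e.orig depends on the driver and SQLAlchemy version (psycopg2 exposes pgcode, for example), so verify against the driver in use.

```python
from typing import Optional

UNIQUE_VIOLATION = "23505"  # PostgreSQL SQLSTATE for unique_violation


def classify_integrity_error(message: str, sqlstate: Optional[str] = None) -> str:
    """Return "duplicate" or "query" for an IntegrityError (hypothetical helper).

    Uses the SQLSTATE code when the driver supplies one and falls back to the
    substring check that BaseRepository._execute() and _flush() use today.
    """
    if sqlstate is not None:
        return "duplicate" if sqlstate == UNIQUE_VIOLATION else "query"
    lowered = message.lower()
    return "duplicate" if ("unique" in lowered or "duplicate" in lowered) else "query"


fk_message = 'insert or update on table "cases" violates foreign key constraint "cases_unique_patient_fk"'
print(classify_integrity_error(fk_message, sqlstate="23503"))  # query (23503 = foreign_key_violation)
print(classify_integrity_error(fk_message))                    # duplicate: the substring check misfires
```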

app/repositories/patient_repository.py — PatientRepository

Public API (typed methods, not generic CRUD):

"""
Curaway — Patient Repository.

Typed data access for the Patient model with tenant isolation,
audit logging, and error wrapping.
"""

import uuid
from datetime import datetime, timezone

from app.models.patient import Patient
from app.models.event import Event
from app.repositories.base import BaseRepository


class PatientRepository(BaseRepository):
    model = Patient

    async def get_by_id(self, patient_id: str, tenant_id: str) -> Patient | None:
        """Fetch a patient by ID, scoped to tenant. Excludes soft-deleted."""
        query = (
            self._scoped_query(tenant_id)
            .where(Patient.id == patient_id)
        )
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def get_by_auth_id(
        self, external_auth_id: str, tenant_id: str
    ) -> Patient | None:
        """Fetch a patient by Clerk auth ID, scoped to tenant."""
        query = (
            self._scoped_query(tenant_id)
            .where(Patient.external_auth_id == external_auth_id)
        )
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def list_active(
        self,
        tenant_id: str,
        intake_status: str | None = None,
        is_active: bool | None = True,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[list[Patient], int]:
        """List patients with filtering and pagination, scoped to tenant."""
        query = self._scoped_query(tenant_id)
        query = self._active_only(query)
        count_query = self._scoped_count(tenant_id)
        if hasattr(Patient, "is_deleted"):
            count_query = count_query.where(Patient.is_deleted == False)  # noqa: E712

        if intake_status:
            query = query.where(Patient.intake_status == intake_status)
            count_query = count_query.where(Patient.intake_status == intake_status)
        if is_active is not None:
            query = query.where(Patient.is_active == is_active)
            count_query = count_query.where(Patient.is_active == is_active)

        total = (await self._execute(count_query)).scalar() or 0
        query = query.order_by(Patient.created_at.desc())
        query = query.offset((page - 1) * page_size).limit(page_size)
        result = await self._execute(query)

        return list(result.scalars().all()), total

    async def create(
        self,
        patient: Patient,
        tenant_id: str,
        actor_id: str = "system",
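    ) -> Patient:
        """Persist a new Patient. Auto-creates audit log and intake event."""
        # The audit row and event below read patient.id before flush. If the model
        # relies on a column default for id, that default only runs at INSERT time,
        # so assign the key here.
        if patient.id is None:
            patient.id = str(uuid.uuid4())
        self.db.add(patient)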

        self._audit(
            action="patient.create",
            resource_id=patient.id,
            tenant_id=tenant_id,
            actor_id=actor_id,
        )

        self.db.add(
            Event(
                id=str(uuid.uuid4()),
                event_type="intake.started",
                tenant_id=tenant_id,
                actor_id=actor_id,
                patient_id=patient.id,
                payload={
                    "locale": patient.preferred_locale,
                    "source": patient.referral_source,
                },
                source_service="intake_api",
            )
        )

        await self._flush()
        return patient

    async def update(
        self,
        patient: Patient,
        changes: dict,
        actor_id: str = "system",
    ) -> Patient:
        """Update a patient and log changes. Caller sets fields before calling."""
        patient.updated_at = datetime.now(timezone.utc)

        if changes:
            self._audit(
                action="patient.update",
                resource_id=patient.id,
                tenant_id=patient.tenant_id,
                actor_id=actor_id,
                changes=changes,
            )

        await self._flush()
        return patient

    async def update_intake_status(
        self,
        patient: Patient,
        new_status: str,
        actor_id: str = "system",
    ) -> Patient:
        """Advance patient intake status with audit + event."""
        old_status = patient.intake_status
        patient.intake_status = new_status
        patient.updated_at = datetime.now(timezone.utc)

        self._audit(
            action="patient.intake_status_change",
            resource_id=patient.id,
            tenant_id=patient.tenant_id,
            actor_id=actor_id,
            changes={"intake_status": {"before": old_status, "after": new_status}},
        )

        self.db.add(
            Event(
                id=str(uuid.uuid4()),
                event_type=f"intake.{new_status}",
                tenant_id=patient.tenant_id,
                actor_id=actor_id,
                patient_id=patient.id,
                payload={"old_status": old_status, "new_status": new_status},
                source_service="intake_api",
            )
        )

        await self._flush()
        return patient
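PatientRepository.update() expects the caller to mutate the patient and pass a changes dict. One way for a service to build that dict in the same {"before": ..., "after": ...} shape that update_intake_status() uses is a small helper like the one below. apply_changes and PatientStub are hypothetical, written for illustration; the spec does not define such a helper.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class PatientStub:
    """Hypothetical stand-in for the Patient ORM model (illustration only)."""
    preferred_locale: str
    referral_source: str


def apply_changes(obj: Any, updates: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Set each attribute on obj and return only the fields whose value actually changed."""
    changes: Dict[str, Dict[str, Any]] = {}
    for field, new_value in updates.items():
        if not hasattr(obj, field):
            raise AttributeError(f"{type(obj).__name__} has no field {field!r}")
        old_value = getattr(obj, field)
        if old_value != new_value:
            setattr(obj, field, new_value)
            changes[field] = {"before": old_value, "after": new_value}
    return changes


patient = PatientStub(preferred_locale="en", referral_source="web")
changes = apply_changes(patient, {"preferred_locale": "de", "referral_source": "web"})
print(changes)  # {'preferred_locale': {'before': 'en', 'after': 'de'}}
# A service would then call: await repo.update(patient, changes, actor_id=...)
```

Since update() only writes an audit row when changes is non-empty, unchanged fields produce no audit noise. For encrypted or PHI fields, a service may prefer to record only which fields changed rather than their values.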

app/repositories/case_repository.py — CaseRepository

Public API:

| Method | Signature | Notes |
|---|---|---|
| get_by_id | (case_id: str, tenant_id: str) -> Case \| None | Tenant-scoped by base class |
| get_by_patient | (patient_id: str, tenant_id: str) -> list[Case] | Ordered by updated_at desc |
| create | (patient_id: str, tenant_id: str) -> dict | Creates Case + Conversation. Generates a case number scoped to tenant (fixes the current global sequence bug). Returns {case_id, case_number, conversation_id, status, workflow_state}. |
| update_status | (case_id: str, tenant_id: str, new_status: str) -> Case \| None | Validates the transition via is_valid_transition(). Sets completed_at on close/forward. |
| update_workflow_state | (case_id: str, tenant_id: str, phase: str, value: bool) -> Case \| None | Updates a single workflow phase. |
| set_procedure | (case_id: str, tenant_id: str, procedure_code: str, procedure_name: str) -> Case \| None | Sets procedure and marks procedure_identified=True. |
| update_ehr_snapshot | (case_id: str, tenant_id: str, ehr_data: dict) -> Case \| None | Sets EHR data and marks ehr_constructed=True. |
| set_selected_providers | (case_id: str, tenant_id: str, providers: list[dict]) -> Case \| None | Sets provider selections and marks providers_selected=True. |
| add_comorbidity | (case_id: str, tenant_id: str, comorbidity: dict) -> Case \| None | Appends to the comorbidity list. |
| generate_case_number | (tenant_id: str) -> str | Tenant-scoped sequence: CRW-{YEAR}-{5-digit}. Fixes the current bug where _generate_case_number() queries all tenants. |

Key fix in generate_case_number:

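async def generate_case_number(self, tenant_id: str) -> str:
    """Generate CRW-{YEAR}-{5-digit} case number, scoped to tenant."""
    year = datetime.now(timezone.utc).year
    prefix = f"CRW-{year}-"
    # Start from _scoped_query so the tenant filter is always applied (and the
    # scoping lint passes), then select MAX(case_number) instead of full rows.
    # _active_only is not applied, so soft-deleted cases still count and their
    # numbers are not reused.
    query = (
        self._scoped_query(tenant_id)
        .with_only_columns(func.max(Case.case_number))
        .where(Case.case_number.like(f"{prefix}%"))
    )
    max_number = (await self._execute(query)).scalar()
    # String MAX matches numeric order only while every suffix is exactly 5 digits.
    seq = int(max_number.rsplit("-", 1)[-1]) + 1 if max_number else 1
    # Reading MAX and inserting are not atomic, so concurrent requests can pick the
    # same number. A unique constraint on (tenant_id, case_number) turns that into a
    # DuplicateRecordError the caller can retry; a unique index on case_number alone
    # would reject per-tenant sequences outright.
    return f"{prefix}{seq:05d}"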
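The sequence logic can be checked without a database. The sketch below replays it over an in-memory list of (tenant_id, case_number) pairs to show that two tenants get independent sequences, which is what test_case_repository.py is meant to assert. next_case_number is a hypothetical pure-Python mirror of the query, not repository code.

```python
from typing import Iterable, Tuple


def next_case_number(rows: Iterable[Tuple[str, str]], tenant_id: str, year: int) -> str:
    """Mirror of generate_case_number: MAX(case_number) over one tenant's rows for the year."""
    prefix = f"CRW-{year}-"
    tenant_numbers = [
        number
        for tid, number in rows
        if tid == tenant_id and number.startswith(prefix)  # tenant filter + LIKE 'CRW-{year}-%'
    ]
    max_number = max(tenant_numbers, default=None)  # string max, like SQL MAX on a text column
    seq = int(max_number.rsplit("-", 1)[-1]) + 1 if max_number else 1
    return f"{prefix}{seq:05d}"


rows = [
    ("tenant-1", "CRW-2025-00001"),
    ("tenant-1", "CRW-2025-00002"),
    ("tenant-2", "CRW-2025-00001"),
    ("tenant-1", "CRW-2024-00042"),  # previous year: excluded by the prefix filter
]
print(next_case_number(rows, "tenant-1", 2025))  # CRW-2025-00003
print(next_case_number(rows, "tenant-2", 2025))  # CRW-2025-00002
print(next_case_number(rows, "tenant-3", 2025))  # CRW-2025-00001
```

String ordering only agrees with numeric ordering while every suffix has five digits: once a tenant passes 99999 cases in a year, "CRW-2025-99999" still sorts above "CRW-2025-100000", so the format would need revisiting at that point.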

app/repositories/fhir_repository.py — FhirRepository

Public API:

| Method | Signature | Notes |
|---|---|---|
| get_by_id | (resource_id: str, tenant_id: str) -> FhirResource \| None | Tenant-scoped |
| list_by_patient | (patient_id: str, tenant_id: str, resource_type: str \| None, active_only: bool) -> list[FhirResource] | Filters by resource_type and active status |
| create | (resource: FhirResource, tenant_id: str, actor_id: str) -> FhirResource | Auto-audit. FHIR validation stays in fhir_service.py. |
| update | (resource: FhirResource, changes: dict, actor_id: str) -> FhirResource | Auto-audit |
| delete | (resource_id: str, tenant_id: str, actor_id: str) -> bool | Soft-delete with audit |
| count_by_patient | (patient_id: str, tenant_id: str) -> int | For completeness scoring |

app/repositories/document_repository.py — DocumentRepository

Public API:

| Method | Signature | Notes |
|---|---|---|
| get_by_id | (document_id: str, patient_id: str, tenant_id: str) -> DocumentReference \| None | Triple-scoped: document + patient + tenant |
| list_by_patient | (patient_id: str, tenant_id: str, category: str \| None) -> list[DocumentReference] | Ordered by uploaded_at desc |
| create | (doc: DocumentReference, tenant_id: str, actor_id: str) -> DocumentReference | Auto-audit + event |
| update_ocr_status | (document_id: str, tenant_id: str, status: str, extracted_data: dict \| None) -> DocumentReference \| None | For OCR pipeline updates |
| supersede_previous | (patient_id: str, tenant_id: str, filename: str) -> list[str] | Marks previous versions as superseded. Replaces the raw SQL in document_service.py:confirm_upload(). |

app/repositories/provider_repository.py — ProviderRepository

Public API:

| Method | Signature | Notes |
|---|---|---|
| get_by_id | (provider_id: str, tenant_id: str) -> Provider \| None | Tenant-scoped |
| get_by_slug | (slug: str, tenant_id: str) -> Provider \| None | For public API |
| list_active | (tenant_id: str, page: int, page_size: int) -> tuple[list[Provider], int] | Paginated, active only |
| get_for_matching | (tenant_id: str, specialties: list[str] \| None) -> list[Provider] | For the matching engine. Returns all active providers (optionally filtered by specialty). |
| search | (tenant_id: str, query: str, country: str \| None) -> list[Provider] | Text search on name, city, specialties |

Service migration examples

patient_service.py — Before and After

Before (current get_patient):

async def get_patient(db: AsyncSession, patient_id: str, tenant_id: str) -> Patient | None:
    result = await db.execute(
        select(Patient).where(
            Patient.id == patient_id,
            Patient.tenant_id == tenant_id,
            Patient.is_deleted == False,
        )
    )
    return result.scalar_one_or_none()

After (using repository):

async def get_patient(db: AsyncSession, patient_id: str, tenant_id: str) -> Patient | None:
    repo = PatientRepository(db)
    return await repo.get_by_id(patient_id, tenant_id)

The service function signature is unchanged. Callers (routers) see no difference. The migration is invisible to the API surface.

case_service.py — Before and After

Before (current _generate_case_number — bug: no tenant filter):

async def _generate_case_number(db: AsyncSession) -> str:
    year = datetime.now(timezone.utc).year
    prefix = f"CRW-{year}-"
    result = await db.execute(
        text("SELECT MAX(case_number) FROM cases WHERE case_number LIKE :prefix"),
        {"prefix": f"{prefix}%"},
    )
    max_number = result.scalar()
    ...

After (using repository — fixed: tenant-scoped):

async def create_case(db: AsyncSession, patient_id: str, tenant_id: str) -> dict:
    repo = CaseRepository(db)
    return await repo.create(patient_id, tenant_id)

The generate_case_number function moves into CaseRepository where it uses _scoped_query to ensure tenant isolation.


Circuit breaker integration (Phase 1 prep)

Phase 1 does not add circuit breakers to PostgreSQL (SQLAlchemy's connection pool handles reconnection). However, the app/errors.py module and the StoreUnavailableError class are designed to be used by all stores in Phases 2 and 3. The error taxonomy is store-agnostic from day 1.

The BaseRepository._execute() method wraps OperationalError as StoreUnavailableError(store="postgres"). If a service catches StoreUnavailableError, it works the same whether the error came from Postgres, Neo4j, or Redis.
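Because handling is driven by the error type rather than the driver exception, a service needs only one except clause for any store. A minimal sketch, assuming a hypothetical get_provider_summary service function with a cached fallback (neither exists in the spec); a trimmed-down StoreUnavailableError is re-declared so the example runs standalone.

```python
from typing import Callable, Dict


class StoreUnavailableError(Exception):
    """Trimmed-down stand-in for app.errors.StoreUnavailableError (illustration only)."""

    def __init__(self, store: str, operation: str = "connect"):
        super().__init__(f"{store} is unavailable")
        self.store = store
        self.operation = operation
        self.retry_safe = True


def get_provider_summary(load: Callable[[], Dict[str, str]], cached: Dict[str, str]) -> Dict[str, str]:
    """Hypothetical service function: serve cached data whenever the backing store is down."""
    try:
        return load()
    except StoreUnavailableError as err:
        # Same branch whether err.store is "postgres", "neo4j" or "redis".
        return {**cached, "degraded": err.store}


def postgres_down() -> Dict[str, str]:
    raise StoreUnavailableError("postgres", operation="read")


def neo4j_down() -> Dict[str, str]:
    raise StoreUnavailableError("neo4j", operation="read")


cache = {"provider_id": "prov-1", "name": "Example Clinic"}
print(get_provider_summary(postgres_down, cache)["degraded"])  # postgres
print(get_provider_summary(neo4j_down, cache)["degraded"])     # neo4j
```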


Test plan

Unit tests for each repository

| Test file | Coverage |
|---|---|
| test_base_repository.py | _scoped_query always includes tenant_id. _active_only adds the soft-delete filter. _audit creates an AuditLog. _execute wraps OperationalError as StoreUnavailableError. _execute wraps a unique-violation IntegrityError as DuplicateRecordError and any other IntegrityError as QueryError. |
| test_patient_repository.py | get_by_id returns None for the wrong tenant. get_by_auth_id scopes by tenant. list_active paginates correctly. create writes an audit log and an event. update_intake_status records the old and new status. |
| test_case_repository.py | generate_case_number scopes by tenant (two tenants get independent sequences). create generates a case + conversation. update_status validates transitions. |
| test_errors.py | DataStoreError.to_dict() serialization. StoreUnavailableError.retry_safe is True. TenantIsolationViolation.retry_safe is False. |

Integration test for tenant isolation

test_tenant_isolation.py — a single test that:

  1. Creates Patient A in tenant-1
  2. Creates Patient B in tenant-2
  3. Asserts repo.get_by_id(patient_a_id, "tenant-2") returns None
  4. Asserts repo.list_active("tenant-1") does not include Patient B
  5. Asserts repo.list_active("tenant-2") does not include Patient A

This test uses the real SQLAlchemy session (async SQLite in-memory) to verify actual query behavior, not mocks.

CI enforcement

A lint test (test_repository_scoping.py) scans all app/repositories/*.py files and asserts that:

  • No file other than base.py (which defines the scoped builders) calls select( directly; every query starts from _scoped_query or _scoped_count
  • No file uses raw text() SQL (which bypasses the ORM and tenant filtering)
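These checks, plus the commit and import-direction rules proposed under Edge Cases, can be expressed with Python's ast module instead of regular expressions, which avoids false hits in comments and docstrings. A minimal sketch: find_violations is a hypothetical helper that the lint test would call once per app/repositories/*.py file, and exempting base.py from the select( rule is an assumption about how the rule is meant to apply.

```python
import ast
from pathlib import PurePath
from typing import List, Tuple


def _call_name(node: ast.Call) -> str:
    """Name of the called function: 'select' for select(...), 'commit' for db.commit()."""
    func = node.func
    if isinstance(func, ast.Name):
        return func.id
    if isinstance(func, ast.Attribute):
        return func.attr
    return ""


def _imports_services(node: ast.AST) -> bool:
    """True for `from app.services... import ...` or `import app.services...`."""
    if isinstance(node, ast.ImportFrom):
        return (node.module or "").startswith("app.services")
    if isinstance(node, ast.Import):
        return any(alias.name.startswith("app.services") for alias in node.names)
    return False


def find_violations(filename: str, source: str) -> List[str]:
    """Return one message per rule violation in a repository module, in line order."""
    is_base = PurePath(filename).name == "base.py"  # base.py defines the scoped builders
    found: List[Tuple[int, str]] = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if isinstance(node, ast.Call):
            name = _call_name(node)
            if name == "select" and not is_base:
                found.append((node.lineno, "direct select(); start from _scoped_query"))
            elif name == "text":
                found.append((node.lineno, "raw text() SQL bypasses tenant filtering"))
            elif name == "commit":
                found.append((node.lineno, "repositories flush but never commit"))
        elif _imports_services(node):
            found.append((node.lineno, "repository imports from app.services"))
    return [f"{filename}:{line}: {msg}" for line, msg in sorted(found)]


sample = '''
from sqlalchemy import select, text
from app.services.case_service import create_case

async def bad(db):
    await db.execute(select(Case))
    await db.execute(text("SELECT 1"))
    await db.commit()
'''
for problem in find_violations("app/repositories/example_repository.py", sample):
    print(problem)
```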

Edge Cases

| Edge Case | Scenario | Handling | Severity |
|---|---|---|---|
| Repository method called without tenant_id | A service accidentally passes None or an empty string as tenant_id to a repository method (e.g., repo.get_by_id(patient_id, None)). _scoped_query would generate WHERE tenant_id IS NULL or WHERE tenant_id = '', silently returning no results instead of failing loudly. | Add a guard at the top of _scoped_query and _scoped_count: if not tenant_id: raise TenantIsolationViolation(). This turns a silent empty-result bug into an immediate, obvious failure. TenantIsolationViolation (already defined in app/errors.py) has retry_safe=False, so callers know not to retry. Add a unit test that asserts _scoped_query(None) and _scoped_query("") both raise. | Critical |
| _scoped_query used on a table without a tenant_id column | A developer creates a new repository for a table that has no tenant_id column (e.g., exchange_rates from the multicurrency spec) and inherits from BaseRepository. _scoped_query would raise AttributeError: type object 'ExchangeRate' has no attribute 'tenant_id'. | Two options: (1) Add a class-level tenant_scoped = True flag on BaseRepository (default True). Repositories for unscoped tables set tenant_scoped = False and override _scoped_query to raise NotImplementedError("This repository is not tenant-scoped; use direct queries"). (2) Don't inherit from BaseRepository for unscoped tables; create a separate BaseUnscopedRepository with audit logging but no tenant filtering. Option 2 is cleaner. Either way, the scoping lint in test_repository_scoping.py needs an explicit exemption for unscoped repositories. Document the choice in the BaseRepository docstring. | High |
| Concurrent writes to the same record from different services | Two services (e.g., case_service updating workflow state and ehr_builder_agent updating ehr_snapshot) read the same Case row in separate sessions and each flush a change. SQLAlchemy only emits UPDATE for attributes it sees as modified, so writes to different scalar columns do not clobber each other. The lost update happens when both writers read-modify-write the same column, most easily with a JSON column such as workflow_state (if stored as JSON), where each writer rewrites the whole document from its stale copy. This is an existing bug that the DAO layer inherits, not introduces. | Repository update methods should issue column-level statements, e.g. db.execute(update(Case).where(Case.id == case_id, Case.tenant_id == tenant_id).values(field=new_value)), rather than mutating a loaded ORM object and flushing. Column-level updates do not help when the new value is computed from a stale read of the same column (e.g., one key inside a JSON document); for those, lock the row with self._scoped_query(tenant_id).where(...).with_for_update() or enable optimistic locking via the mapper's version_id_col. Add a comment in BaseRepository documenting that callers should prefer column-level updates for hot tables. | High |
| Transaction rollback in one repository while another committed | A service calls PatientRepository.create() (flush) and then CaseRepository.create() (flush) in the same session. If case creation fails, the patient row has been flushed but not committed, so rolling back the session discards both. If the service instead called db.commit() between the two operations, the patient is already committed and the failed case leaves an orphaned patient. | Repositories only flush(), never commit(). The commit boundary is owned by the router, via FastAPI's dependency injection: get_db() commits on success and rolls back on exception. This is already the pattern; document it explicitly in the BaseRepository docstring: "Repositories flush but never commit. The session lifecycle is managed by the caller (router/endpoint)." Add a CI lint test that asserts no repository file contains db.commit(). | High |
| Repository import fails at startup (circular import) | patient_service.py imports PatientRepository, which imports the Patient model. If the Patient model ever imports something from patient_service.py (e.g., for a hybrid property that calls a service function), Python raises ImportError at startup. | Enforce a strict dependency direction: Models -> Repositories -> Services -> Routers, with no backward imports. The CI lint test (test_repository_scoping.py) should also scan imports: assert that no app/repositories/*.py file imports from app/services/ and no app/models/*.py file imports from app/repositories/. Document the dependency direction in app/repositories/__init__.py. | Medium |
| BaseRepository subclass forgets to call super().__init__ | A new repository overrides __init__ to add custom initialization but forgets super().__init__(db). Every later access to self.db raises AttributeError. | Validate the model in __init_subclass__: if cls.model is None: raise TypeError(f"{cls.__name__} must set 'model' class attribute"). That check runs when the subclass is defined, so it fires even if the subclass's __init__ skips super(). A check inside BaseRepository.__init__ cannot catch a forgotten super().__init__(db), because that is exactly the call that never runs; cover it with a unit test that instantiates every repository and asserts repo.db is the session passed in. Add a unit test that defines a subclass without model and asserts the TypeError. | Low |
| Query returns empty result: None vs empty list inconsistency | get_by_id returns None when not found and list_active returns ([], 0), but what does get_by_patient return for zero cases: None or []? If the contract is inconsistent, services have to handle both shapes. | Establish and document a clear contract: single-record lookups (get_by_id, get_by_auth_id, get_by_slug) return T \| None. Multi-record lookups (get_by_patient, list_by_patient, get_for_matching, search) return list[T] (an empty list, never None). Paginated methods (list_active, list_paginated) return tuple[list[T], int]. Add this contract to the BaseRepository class docstring and enforce it in type annotations. | Medium |
| tenant_id mismatch between URL param and the authenticated user's tenant | A request arrives with X-Tenant-ID: tenant-apollo-001, but the Clerk JWT's org_id resolves to a different tenant. The repository faithfully scopes to whatever tenant_id the service passes; it knows nothing about the auth layer. If the router passes the wrong one, tenant isolation is bypassed. | This is an auth-layer concern: the repository never sees the JWT, so it cannot detect the mismatch itself. The router's _get_tenant_id dependency (which already exists) must validate that the URL/header tenant matches the JWT tenant. Document in BaseRepository: "Repositories trust the tenant_id parameter. Callers (routers) are responsible for validating tenant authorization before calling repository methods." Add an integration test in test_tenant_isolation.py that verifies a repository call with a mismatched tenant returns no results. | Critical |
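The two guards proposed for the Critical "called without tenant_id" row and the Low "forgets super().__init__" row can be sketched in plain Python. This is a hypothetical, SQLAlchemy-free skeleton: ScopedRepositorySkeleton, _require_tenant, and PatientRepoSketch are illustrative names, and in the real BaseRepository the tenant guard would sit at the top of _scoped_query and _scoped_count. Checking model in __init_subclass__ fires when the subclass is defined, so it works even when a subclass's __init__ never calls super().__init__().

```python
class TenantIsolationViolation(Exception):
    """Stand-in for app.errors.TenantIsolationViolation (illustration only)."""


class ScopedRepositorySkeleton:
    """Hypothetical skeleton showing only the two guards; no query building."""

    model = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs at class-definition time, before any instance (or __init__) exists.
        if cls.model is None:
            raise TypeError(f"{cls.__name__} must set 'model' class attribute")

    def __init__(self, db):
        self.db = db

    @staticmethod
    def _require_tenant(tenant_id):
        # Rejects None and "" instead of letting them become WHERE tenant_id IS NULL / = ''.
        if not tenant_id:
            raise TenantIsolationViolation("Query attempted without tenant_id")
        return tenant_id


class Patient:  # placeholder model class
    pass


class PatientRepoSketch(ScopedRepositorySkeleton):
    model = Patient


try:
    class MissingModelRepo(ScopedRepositorySkeleton):
        pass
except TypeError as err:
    print(err)  # MissingModelRepo must set 'model' class attribute
```

If option 2 above is chosen, BaseUnscopedRepository would be a separate base rather than a subclass, so this check would not affect it; an abstract intermediate subclass would need its own opt-out.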

Implementation checklist

Opus tier (architectural decisions, design)

| Task | Description | Est. time |
|---|---|---|
| O1 | Design BaseRepository class with tenant scoping, audit logging, and error wrapping | 30 min |
| O2 | Design DataStoreError hierarchy (base + 5 subclasses, to_dict(), retry_safe semantics) | 20 min |
| O3 | Design PatientRepository public API (method signatures, return types, edge cases) | 20 min |
| O4 | Design tenant isolation enforcement strategy (lint test, CI check) | 15 min |
| O5 | Review service migration plan; verify no business logic leaks into repositories | 15 min |

Sonnet tier (implementation following established pattern)

Task Description Est. time
S1 Implement app/errors.py (DataStoreError hierarchy) 15 min
S2 Implement app/repositories/base.py (BaseRepository) 30 min
S3 Implement app/repositories/patient_repository.py 30 min
S4 Implement app/repositories/case_repository.py (including tenant-scoped case number fix) 30 min
S5 Implement app/repositories/fhir_repository.py 20 min
S6 Implement app/repositories/document_repository.py 20 min
S7 Implement app/repositories/provider_repository.py 20 min
S8 Implement app/repositories/__init__.py 5 min
S9 Migrate patient_service.py to use PatientRepository 20 min
S10 Migrate case_service.py to use CaseRepository 25 min
S11 Migrate fhir_service.py to use FhirRepository 15 min
S12 Migrate document_service.py to use DocumentRepository 15 min
S13 Migrate provider_service.py to use ProviderRepository 15 min
S14 Migrate match_service.py reads to use repositories 20 min
S15 Write tests/test_repositories/test_base_repository.py 25 min
S16 Write tests/test_repositories/test_patient_repository.py 25 min
S17 Write tests/test_repositories/test_case_repository.py 20 min
S18 Write tests/test_repositories/test_tenant_isolation.py 15 min
S19 Write tests/test_errors.py 10 min
S20 Write tests/test_repository_scoping.py (CI lint) 10 min
S21 Verify all 760+ existing tests still pass 10 min

Total estimated time: ~8 hours across 1 Opus + Sonnet session. The estimates above sum to 495 min (Opus 100 min + Sonnet 395 min).
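O4 and S20 call for a CI lint but do not fix its shape. One possible approach is an AST scan that flags calls to SQLAlchemy query builders (select, insert, update, delete, text) inside service modules. This is a sketch under stated assumptions: the rule itself, QUERY_BUILDERS, find_raw_queries() and scan_services() are all hypothetical names, not part of the plan.

```python
import ast
from pathlib import Path

# Hypothetical rule: these SQLAlchemy builders may only be called inside
# app/repositories/. A name is flagged only when it was imported from a
# sqlalchemy module, so unrelated calls such as payload.update() are ignored.
QUERY_BUILDERS = frozenset({"select", "insert", "update", "delete", "text"})


def find_raw_queries(source: str) -> list[tuple[str, int]]:
    """Return (local_name, line_number) for each call to an imported query builder."""
    tree = ast.parse(source)
    local_names: set[str] = set()
    for node in ast.walk(tree):
        if (
            isinstance(node, ast.ImportFrom)
            and node.module
            and node.module.split(".")[0] == "sqlalchemy"
        ):
            for alias in node.names:
                if alias.name in QUERY_BUILDERS:
                    local_names.add(alias.asname or alias.name)
    hits = [
        (node.func.id, node.lineno)
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and isinstance(node.func, ast.Name)
        and node.func.id in local_names
    ]
    return sorted(hits, key=lambda hit: hit[1])


def scan_services(services_dir: Path) -> dict[str, list[tuple[str, int]]]:
    """Map each offending .py file under services_dir to its raw-query call sites."""
    report: dict[str, list[tuple[str, int]]] = {}
    for path in sorted(services_dir.rglob("*.py")):
        hits = find_raw_queries(path.read_text(encoding="utf-8"))
        if hits:
            report[str(path)] = hits
    return report
```

Once all six services are migrated, test_repository_scoping.py could assert that scan_services(Path("app/services")) is empty. During the rollout, an allowlist could cover files that are not yet migrated. The scan only recognizes names brought in with "from sqlalchemy... import". A module alias such as "import sqlalchemy as sa" followed by sa.select(...) would slip through and would need an extra check if the codebase uses that style.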


Phase 2 outline — GraphDAO + CacheDAO

Scope (not detailed here):

  • app/dao/graph_dao.py wraps graph_service.py with:
      • pybreaker circuit breaker on the Neo4j driver (fail_threshold=3, reset=60s)
      • DataStoreError wrapping for all Neo4j exceptions
      • Health state exposed via get_store_health()
      • The 30+ existing functions in graph_service.py stay; GraphDAO is a thin wrapper, not a rewrite
  • app/dao/cache_dao.py wraps redis_pubsub.py cache functions with:
      • pybreaker circuit breaker on the Upstash REST endpoint (fail_threshold=5, reset=15s)
      • DataStoreError wrapping for httpx errors
      • Pub/sub functions stay in redis_pubsub.py unchanged
  • Circuit breaker registry, app/dao/breaker_registry.py:
      • Central registry of all circuit breakers
      • get_store_health() returns all breaker states
      • Wired into the /landscape health page
  • Modified files:
      • app/services/match_service.py: catch StoreUnavailableError from GraphDAO instead of bare Exception from graph_service
      • app/services/case_service.py: catch StoreUnavailableError from CacheDAO for Redis cache calls
      • app/agents/case_orchestrator.py: catch StoreUnavailableError from GraphDAO
      • app/routers/health.py (or landscape): use breaker registry for health status
      • 9 files that import redis_pubsub cache functions: switch to CacheDAO
  • Dependencies: pybreaker added to requirements.txt
  • Estimated time: 1 session
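The breaker registry and get_store_health() are described only at the bullet level. Below is a minimal sketch under a few assumptions:

  • The registry reads only a breaker's name and current_state. pybreaker.CircuitBreaker exposes both, and reports current_state as "closed", "open" or "half-open".
  • The plan's fail_threshold/reset map onto pybreaker's fail_max/reset_timeout keywords.
  • register_breaker(), the rollup "healthy" flag and the response shape are hypothetical.

```python
from typing import Protocol


class BreakerLike(Protocol):
    """The two read-only attributes the registry needs from a circuit breaker."""

    @property
    def name(self) -> str: ...

    @property
    def current_state(self) -> str: ...


_BREAKERS: dict[str, BreakerLike] = {}


def register_breaker(store: str, breaker: BreakerLike) -> BreakerLike:
    """Register one breaker per backing store; a duplicate is a wiring bug."""
    if store in _BREAKERS:
        raise ValueError(f"circuit breaker already registered for {store!r}")
    _BREAKERS[store] = breaker
    return breaker


def get_store_health() -> dict:
    """Snapshot for the /landscape health page: per-store state plus a rollup."""
    states = {store: breaker.current_state for store, breaker in _BREAKERS.items()}
    return {
        # Only a fully closed breaker counts as healthy; half-open is still recovering.
        "healthy": all(state == "closed" for state in states.values()),
        "stores": states,
    }


# Example wiring in graph_dao.py (assumed mapping: fail_threshold=3, reset=60s):
#   import pybreaker
#   neo4j_breaker = register_breaker(
#       "neo4j", pybreaker.CircuitBreaker(fail_max=3, reset_timeout=60, name="neo4j")
#   )
```

Keeping the registry typed against the two attributes rather than against pybreaker directly lets the health endpoint be unit-tested with fakes.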


Phase 3 outline — StorageDAO + QueueDAO

Scope (not detailed here):

  • app/dao/storage_dao.py wraps r2_client.py with:
      • pybreaker circuit breaker (fail_threshold=3, reset=30s)
      • Retry on transient 503 errors (1 retry with 2s backoff)
      • DataStoreError wrapping for BotoCoreError/ClientError
  • app/dao/queue_dao.py wraps qstash_client.py with:
      • pybreaker circuit breaker (fail_threshold=3, reset=60s)
      • Retry on 429 with exponential backoff (max 2 retries)
      • DataStoreError wrapping for httpx errors
  • Modified files:
      • app/services/document_service.py: switch R2 calls to StorageDAO
      • app/agents/case_orchestrator.py: switch R2 calls to StorageDAO
      • app/services/document_processing.py: switch QStash calls to QueueDAO
      • 3 files that import r2_client: switch to StorageDAO
      • 3 files that import qstash_client: switch to QueueDAO
  • Estimated time: Half a session
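The two retry policies above differ only in trigger status, retry count and backoff shape, so one helper can serve both DAOs. The following is a sketch with these assumptions:

  • UpstreamHTTPError is a hypothetical internal exception carrying the HTTP status.
  • The 1 s base delay for QStash is assumed, since the plan does not specify one.
  • The sleep function is injectable so the policy can be tested without waiting.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class UpstreamHTTPError(Exception):
    """Hypothetical: raised inside a DAO when the upstream returns a non-2xx status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"upstream returned HTTP {status}")
        self.status = status


async def call_with_retry(
    call: Callable[[], Awaitable[T]],
    *,
    retry_on: frozenset[int],
    max_retries: int,
    base_delay: float,
    exponential: bool,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Retry call() on the listed HTTP statuses; any other error propagates at once."""
    attempt = 0
    while True:
        try:
            return await call()
        except UpstreamHTTPError as exc:
            if exc.status not in retry_on or attempt >= max_retries:
                raise  # retries exhausted: the DAO wraps this as a DataStoreError
            delay = base_delay * (2 ** attempt) if exponential else base_delay
            attempt += 1
            await sleep(delay)


# StorageDAO (R2): one retry on 503 after a fixed 2 s wait.
R2_POLICY = dict(retry_on=frozenset({503}), max_retries=1, base_delay=2.0, exponential=False)
# QueueDAO (QStash): up to two retries on 429, waiting 1 s then 2 s (base delay assumed).
QSTASH_POLICY = dict(retry_on=frozenset({429}), max_retries=2, base_delay=1.0, exponential=True)
```

A DAO method would call something like await call_with_retry(lambda: client_call(...), **R2_POLICY). Statuses outside retry_on (for example a 500 from R2) are not retried.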


Session 41 New Tables — Future Repositories

Session 41 designed 6 new tables that will need repositories when implemented. These are NOT in Phase 1 scope but the DAO architecture must accommodate them.

Table Owner repository Phase Notes
coordinator_vendors CoordinatorVendorRepository Wave 4 Vendor CRUD, search by city + capabilities. Tenant-scoped (owning coordinator tenant).
coordinator_service_bookings BookingRepository Wave 4 Booking lifecycle (requested → confirmed → completed). Tenant-scoped via case_id → patient → tenant.
coordinator_audit_log CoordinatorAuditRepository Wave 4 Append-only. Tenant-scoped. Every coordinator action logged.
graph_node_audit_log GraphNodeAuditRepository Wave 1 (intake restructuring) Append-only. Per-turn graph execution trace. Tenant-scoped via case_id.
scoring_audit_log ScoringAuditRepository Wave 1 Append-only. PFS/HSS/FMS computation inputs + outputs. Tenant-scoped via case_id.
extraction_audit_log ExtractionAuditRepository Wave 1 Append-only. Per-layer extractor outputs. Tenant-scoped via case_id.

Design principle: All audit tables are append-only — repositories expose create() and list_by_case() only, never update() or delete(). This aligns with compliance requirements (tamper-evident).
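One way to make "create() and list_by_case() only" enforceable rather than a convention is to reject mutating methods when a subclass is defined. This is a hypothetical sketch. AppendOnlyRepository, FORBIDDEN_METHODS and the list_by_case(case_id, tenant_id) signature are assumptions, and in practice the class would be combined with BaseRepository.

```python
from typing import Any


class AppendOnlyRepository:
    """Hypothetical base for the audit-table repositories.

    A subclass that defines a mutating method raises TypeError as soon as the
    class statement runs (i.e. at import time), so update()/delete() cannot
    slip into an audit repository unnoticed.
    """

    FORBIDDEN_METHODS = frozenset({"update", "delete", "upsert", "bulk_update", "bulk_delete"})

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Read the set from this base class so a subclass cannot override it away.
        defined = AppendOnlyRepository.FORBIDDEN_METHODS & set(vars(cls))
        if defined:
            raise TypeError(f"{cls.__name__} is append-only; remove {sorted(defined)}")

    async def create(self, **fields: Any) -> Any:
        raise NotImplementedError

    async def list_by_case(self, case_id: str, tenant_id: str) -> list[Any]:
        # Expected to build its query with the case-scoped helper, _scoped_by_case().
        raise NotImplementedError
```

The check covers only methods defined on the subclass itself. If BaseRepository ships generic update/delete helpers, those would need the same guard, for example by having audit repositories inherit from a base that omits them.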

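BaseRepository compatibility: each of the 6 tables is tenant-scoped, either through its own tenant_id column or through case_id, which resolves to tenant_id via the case. That gives two patterns:
  1. Tables with a direct tenant_id column → standard _scoped_query(tenant_id) (coordinator_vendors, coordinator_audit_log)
  2. Tables scoped via case_id (coordinator_service_bookings, graph_node_audit_log, scoring_audit_log, extraction_audit_log) → add a _scoped_by_case(case_id, tenant_id) helper to BaseRepository that joins through Case: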

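# Method on BaseRepository. Assumes module-level imports along the lines of:
#   from sqlalchemy import select
#   from app.models import Case   # import path assumed
def _scoped_by_case(self, case_id: str, tenant_id: str):
    """Scope query by case_id, with tenant_id validation via join.

    The inner join on Case returns rows only when the case belongs to tenant_id;
    a case_id owned by another tenant yields an empty result, not an error.
    """
    return (
        select(self.model)
        .join(Case, self.model.case_id == Case.id)
        .where(self.model.case_id == case_id)
        .where(Case.tenant_id == tenant_id)
    )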

LangGraph Checkpointing — DB Access Strategy

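LangGraph's AsyncPostgresSaver handles checkpoint persistence independently of the DAO layer. It uses its own tables (checkpoints, checkpoint_blobs and checkpoint_writes, plus a checkpoint_migrations bookkeeping table). LangGraph's own schema migrations manage these tables, not our Alembic migrations.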

Decision: LangGraph checkpointing is NOT wrapped in a repository.

Rationale:
  • AsyncPostgresSaver is a battle-tested LangGraph component with its own connection management.
  • Checkpoints are keyed by thread_id (= case_id in our mapping), not by tenant_id directly.
  • Wrapping it would break LangGraph's internal contract with no benefit.
  • Tenant isolation is enforced by the fact that each case has a unique thread_id, and case access is already tenant-gated at the router level.

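What we DO need:
  • Configure AsyncPostgresSaver against the same Railway PostgreSQL database. The saver connects through psycopg rather than SQLAlchemy, so it needs a plain postgresql:// connection string. If DATABASE_URL carries a SQLAlchemy driver suffix (e.g. postgresql+asyncpg://), strip it first.
  • Map thread_id = case_id so checkpoints are case-scoped.
  • LangGraph's own migrations create the checkpoint tables when checkpointer.setup() is called, so no Alembic migration is needed. setup() must still run once before first use; it only applies migrations that have not yet run, so calling it on every startup is safe.
  • Add checkpoint cleanup to the GDPR erasure cascade (ADR-0019). The erasure must run DELETE ... WHERE thread_id = :case_id against checkpoint_writes and checkpoint_blobs as well as checkpoints, since all three hold per-thread data.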
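The thread_id mapping and the checkpoint step of the erasure cascade are small enough to centralize in one place. Below is a sketch. The helper names are hypothetical, and the table names are those created by the current langgraph-checkpoint-postgres migrations, so they should be re-checked on library upgrades.

```python
# Tables created by langgraph-checkpoint-postgres's setup() that hold per-thread
# data. checkpoint_migrations is omitted because it stores no case data.
CHECKPOINT_TABLES = ("checkpoint_writes", "checkpoint_blobs", "checkpoints")


def checkpoint_config(case_id: str) -> dict:
    """Run config for graph.ainvoke(...): thread_id = case_id keeps checkpoints case-scoped."""
    return {"configurable": {"thread_id": case_id}}


def checkpoint_erasure_statements() -> list[str]:
    """SQL for the checkpoint step of the GDPR cascade; bind :case_id when executing."""
    return [f"DELETE FROM {table} WHERE thread_id = :case_id" for table in CHECKPOINT_TABLES]
```

In data_subject_handler, each statement would be run through SQLAlchemy's text() with {"case_id": case_id}, inside the same transaction as the rest of the cascade.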

Where this is configured: In the Triage Agent graph setup (Wave 1, item 1.13):

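from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# from_conn_string() returns an async context manager, not a saver. Enter it once
# inside an async startup hook (e.g. the FastAPI lifespan) and keep it open.
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    await checkpointer.setup()  # runs LangGraph's checkpoint-table migrations
    graph = triage_agent_graph.compile(checkpointer=checkpointer)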

Service Migration Order

Phase 1 targets 6 services. Migration order matters — start with highest-value (most queries, known bugs):

Order Service Why first Known issues
1 case_service.py Has the tenant-scoping bug in _generate_case_number(). Most complex service. Fixes a real bug. Global case number sequence
2 patient_service.py Most called service. Clean migration template for the rest. None
3 fhir_service.py High volume (multiple FHIR resources per case). Tenant isolation critical for PHI. None
4 document_service.py Has raw SQL for supersede_previous. Raw text() SQL bypassing ORM
5 provider_service.py Read-heavy, simpler migration. None
6 match_service.py Depends on Patient + Provider repos being ready. Migrate last. None

Phase 1.5 — Additional Services (post-Phase 1, pre-Wave 1)

These services have inline DB queries but are lower priority. Migrate them between Wave 0 and Wave 1:

Service DB access pattern Effort
ehr_rebuild_service.py Reads FHIR, case, patient; writes EHR snapshot 1 hour
conversation_service.py Reads/writes messages, conversations 45 min
feedback_service.py Reads match_results, feedback_records 30 min
case_porting_service.py Cascading deletes across 6 tables 1 hour
patient_state.py Reads patient, FHIR, case, document, EHR 45 min
data_subject_handler.py GDPR cascade deletes across 10 tables 1 hour

Phase 1.5 total: ~5 hours additional effort.


RLS Integration Decision

Decision: DAO first (Wave 0.1), RLS second (Wave 0.2). Defense-in-depth, not either/or.

The DAO layer provides application-level tenant isolation. PostgreSQL RLS provides database-level enforcement as a second line of defense. Both are needed:

Layer What it catches When it fails
DAO (_scoped_query) Missing or wrong tenant_id in service code Bug in a new service that bypasses the repository
RLS (database policy) Any query from any source — even raw SQL, migration scripts, direct DB access Misconfigured current_setting('app.tenant_id')

Implementation sequence:
  1. DAO repositories ship first; they enforce tenant isolation in application code.
  2. RLS ships second, adding database-level enforcement on the same tables.
  3. When both are active, DAO filtering and RLS filtering are redundant by design. This is intentional: if one layer has a bug, the other catches it.

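Performance: Both layers filter by tenant_id, but not with identical expressions. The DAO compares against a bound value; the RLS policy compares against current_setting('app.tenant_id'). Whether the planner folds the two together depends on the plan. With an index on tenant_id, the extra comparison is cheap and the overhead is expected to be negligible. Confirm this with EXPLAIN ANALYZE once RLS is enabled rather than assuming it.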

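Connection setup for RLS: each request must set app.tenant_id on its database connection. SET cannot take a bind parameter such as :tid, so use SELECT set_config('app.tenant_id', :tid, true). The third argument scopes the value to the current transaction, so it cannot leak into the next request that reuses the pooled connection. If a request commits and keeps querying, re-apply the setting after the commit. This is done in the FastAPI middleware or get_db() dependency, NOT in the repository layer. Repositories are RLS-unaware.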


Cross-Tenant Access Pattern

For super-admin operations (GDPR deletion, analytics, system reports), a BaseUnscopedRepository provides access without tenant filtering:

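from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.ext.asyncio import AsyncSession

from app.errors import DataStoreError, StoreUnavailableError


class BaseUnscopedRepository:
    """For system tables and cross-tenant admin operations.

    WARNING: Only use for:
    - System-wide tables (providers, exchange_rates, treatment_categories)
    - Super-admin operations with explicit authorization check
    - GDPR erasure cascade (which must delete across all tenants)

    All cross-tenant admin and GDPR uses MUST be logged in audit_log
    (actor_type='system_admin', or 'gdpr_cascade' for the erasure cascade).
    """

    model = None  # set by each subclass to its ORM model class

    def __init__(self, db: AsyncSession):
        self.db = db

    def _query(self):
        """Unscoped query. No tenant filter."""
        return select(self.model)

    async def _execute(self, query):
        """Same error wrapping as BaseRepository."""
        # Error constructors are assumed to accept a message; see app/errors.py.
        try:
            return await self.db.execute(query)
        except PoolTimeoutError as exc:
            # Pool exhausted: transient, surfaced as the retry-safe error.
            # Must come first, since TimeoutError subclasses SQLAlchemyError.
            raise StoreUnavailableError(str(exc)) from exc
        except SQLAlchemyError as exc:
            raise DataStoreError(str(exc)) from exc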

Usage: ProviderRepository extends BaseUnscopedRepository (providers are globally visible). System tables (exchange_rates, treatment_categories, consent_purposes) use BaseUnscopedRepository. Admin reports use a specific AdminReportRepository extending BaseUnscopedRepository with mandatory audit logging.

GDPR data_subject_handler: Uses BaseUnscopedRepository to cascade deletes across all tenants for a given patient. Every delete is audit-logged with actor_type='gdpr_cascade'.
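Mandatory audit logging for admin reports and the GDPR cascade can be attached to each unscoped method rather than left to every caller. Below is a hypothetical sketch. The audited() decorator, the audit_sink attribute, the entry fields and PatientErasureRepository are illustrative names only. The entry is written before the unscoped call runs, so the operation cannot execute without first recording the attempt.

```python
import functools
from typing import Any, Awaitable, Callable

AuditSink = Callable[[dict[str, Any]], Awaitable[None]]


def audited(actor_type: str, action: str):
    """Hypothetical decorator: write an audit_log entry, then run the unscoped method."""

    def decorator(method: Callable[..., Awaitable[Any]]):
        @functools.wraps(method)
        async def wrapper(self, *args: Any, **kwargs: Any) -> Any:
            await self.audit_sink(
                {"actor_type": actor_type, "action": action, "method": method.__name__}
            )
            return await method(self, *args, **kwargs)

        return wrapper

    return decorator


class PatientErasureRepository:
    """Hypothetical unscoped repository used by data_subject_handler."""

    def __init__(self, audit_sink: AuditSink) -> None:
        self.audit_sink = audit_sink
        self.deleted: list[str] = []

    @audited(actor_type="gdpr_cascade", action="erase_patient")
    async def delete_patient(self, patient_id: str) -> int:
        self.deleted.append(patient_id)  # stands in for the cross-tenant DELETE
        return 1
```

If the audit sink writes through the same session as the delete, a rollback removes the audit entry too. Whether erasure audit entries should survive a failed cascade is a decision for the steer.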


Additional Edge Cases (Session 41 Audit)

Edge Case Scenario Handling Severity
Multi-repository transaction failure PatientRepository.create() flushes, then CaseRepository.create() fails. Both are in the same session. Repositories flush but never commit, so the patient row exists only inside the open transaction; rolling back the session on the exception discards it along with the failed case write. Document: "Always wrap multi-repository operations in a single session. Never call db.commit() between repository operations." High
Non-existent tenant_id (valid UUID format but no matching tenant) Service passes a well-formed UUID that doesn't exist in the tenants table. Repository returns empty results — no error raised. Repository trusts the tenant_id (it's a WHERE filter, not a JOIN). The router/auth layer validates tenant existence. Add a note: "Repositories do not validate tenant existence. A query with a non-existent tenant_id returns empty results by design — this is correct behavior." Medium
Connection pool exhaustion High traffic causes all connections to be in use. New repository calls block waiting for a connection. _execute() raises StoreUnavailableError when SQLAlchemy's pool times out (sqlalchemy.exc.TimeoutError, raised after pool_timeout seconds, 30 by default). Service catches StoreUnavailableError and returns 503. The error has retry_safe=True so clients can retry. High
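The "flush but never commit" rule can be demonstrated with a stand-in database. The sketch below swaps in the standard library's sqlite3 for PostgreSQL and the SQLAlchemy AsyncSession, so it shows the transaction semantics only. The table layout and function names are hypothetical. Two repository-style writes share one transaction, the second fails, and the rollback removes the first.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """In-memory stand-in for PostgreSQL (stdlib sqlite3, not SQLAlchemy)."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE patients (id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL);
        CREATE TABLE cases (id TEXT PRIMARY KEY, patient_id TEXT NOT NULL, tenant_id TEXT NOT NULL);
        """
    )
    return conn


def create_patient(conn: sqlite3.Connection, patient_id: str, tenant_id: str) -> None:
    # Repository-style write: executes but never commits (the analogue of flush()).
    conn.execute("INSERT INTO patients (id, tenant_id) VALUES (?, ?)", (patient_id, tenant_id))


def create_case(conn: sqlite3.Connection, case_id: str, patient_id: str, tenant_id: str) -> None:
    conn.execute(
        "INSERT INTO cases (id, patient_id, tenant_id) VALUES (?, ?, ?)",
        (case_id, patient_id, tenant_id),
    )


def open_case_for_new_patient(
    conn: sqlite3.Connection, patient_id: str, case_id: str, tenant_id: str
) -> None:
    """Service-level unit of work: one transaction, committed once at the end."""
    with conn:  # commits if the block succeeds, rolls back if anything raises
        create_patient(conn, patient_id, tenant_id)
        create_case(conn, case_id, patient_id, tenant_id)


def count(conn: sqlite3.Connection, table: str) -> int:
    # table is always one of the fixed names above, never user input.
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
```

If create_patient() were followed by its own commit, a failed case insert would leave an orphaned patient row behind. That is exactly what the "never call db.commit() between repository operations" rule prevents.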

Dependencies and prerequisites

Dependency Status Notes
pybreaker Not installed Required for Phase 2. Not needed for Phase 1.
config/service_registry.yaml Exists (per CLAUDE.md Rule 1) Add circuit_breakers section in Phase 2.
Flagsmith flags N/A No feature flag needed: the DAO layer is an internal refactor, not a user-facing feature.
PostgreSQL RLS Deferred to Wave 0.2 DAO ships first (Wave 0.1), RLS adds defense-in-depth after.
LangGraph AsyncPostgresSaver Not yet configured Configured in Wave 1 (item 1.13). Uses same Railway PostgreSQL. Not wrapped in a repository.

What changes for SD

Nothing changes in the development workflow:

  • Depends(get_db) still provides the database session
  • Service functions still have the same signatures
  • Routers are completely unaffected
  • Tests run the same way
  • The migration is done one service at a time — each PR is small and reviewable

The main benefit SD sees: when a new service is created, the repository pattern provides a template to copy. Instead of writing raw select(Model).where(...), copy patient_repository.py and change the model + method names. Tenant isolation comes for free.