DAO Layer: Feature Spec
Status: Implemented in Session 41
Companion steer: ai-steer/dao-layer-steer.md
This is the implementation plan for Phase 1 (PostgreSQL repositories). Read the steer first — it covers the problem statement, design principles, error taxonomy, circuit breaker design, and migration strategy.
Overview
Phase 1 introduces a repository layer for PostgreSQL access. Five typed
repositories share a BaseRepository that enforces tenant isolation, wraps
errors in a unified taxonomy, and provides automatic audit logging for write
operations. Services are migrated one at a time — each migration is a separate
PR that replaces inline SQLAlchemy queries with repository method calls.
No changes to Neo4j, Redis, R2, or QStash in Phase 1.
File-by-file change list
New files
| File | Purpose | Lines (est.) |
|---|---|---|
| `app/errors.py` | `DataStoreError` hierarchy: base class + 5 subclasses | ~80 |
| `app/repositories/__init__.py` | Package init, re-exports all repositories | ~15 |
| `app/repositories/base.py` | `BaseRepository` with tenant scoping, audit logging, error wrapping | ~150 |
| `app/repositories/patient_repository.py` | `PatientRepository`: typed methods for Patient CRUD | ~180 |
| `app/repositories/case_repository.py` | `CaseRepository`: typed methods for Case + Conversation | ~160 |
| `app/repositories/fhir_repository.py` | `FhirRepository`: typed methods for FHIR resource CRUD | ~120 |
| `app/repositories/document_repository.py` | `DocumentRepository`: typed methods for DocumentReference | ~130 |
| `app/repositories/provider_repository.py` | `ProviderRepository`: typed methods for Provider reads | ~100 |
| `tests/test_repositories/test_base_repository.py` | Unit tests for `BaseRepository` tenant isolation + audit | ~120 |
| `tests/test_repositories/test_patient_repository.py` | Unit tests for `PatientRepository` | ~150 |
| `tests/test_repositories/test_case_repository.py` | Unit tests for `CaseRepository` | ~120 |
| `tests/test_repositories/test_tenant_isolation.py` | Integration test verifying cross-tenant access is blocked | ~80 |
| `tests/test_errors.py` | Unit tests for error taxonomy (serialization, `retry_safe` flags) | ~60 |
Modified files (Phase 1 service migrations)
| File | Change |
|---|---|
| `app/services/patient_service.py` | Replace inline `select(Patient).where(...)` with `PatientRepository` calls. Remove manual `AuditLog` / `Event` creation for CRUD ops: `BaseRepository._audit()` writes the AuditLog rows and `PatientRepository` writes the intake Events. Keep business logic (encryption, status transitions) in the service. |
| `app/services/case_service.py` | Replace inline `select(Case).where(...)` with `CaseRepository` calls. Fix `_generate_case_number()` to scope by `tenant_id`. Keep workflow state machine logic in the service. |
| `app/services/fhir_service.py` | Replace inline `select(FhirResource).where(...)` with `FhirRepository` calls. Keep FHIR validation logic in the service. |
| `app/services/document_service.py` | Replace inline `select(DocumentReference).where(...)` with `DocumentRepository` calls. R2 and QStash calls stay in the service (Phase 3). |
| `app/services/provider_service.py` | Replace inline `select(Provider).where(...)` with `ProviderRepository` calls. Neo4j auto-sync stays in the service (Phase 2). |
| `app/services/match_service.py` | Replace direct Patient/Provider/FHIR reads with repository calls. The match orchestration logic stays in the service. |
Unchanged files
| File | Why unchanged |
|---|---|
| `app/database.py` | Session factory and `get_db()` dependency stay as-is. Repositories receive sessions; they don't create them. |
| `app/services/graph_service.py` | Phase 2 (GraphDAO). |
| `app/integrations/redis_pubsub.py` | Phase 2 (CacheDAO). |
| `app/integrations/r2_client.py` | Phase 3 (StorageDAO). |
| `app/integrations/qstash_client.py` | Phase 3 (QueueDAO). |
| `app/integrations/qdrant_service.py` | Already well-abstracted. Not wrapped. |
| `app/services/semantic_search_service.py` | Already well-abstracted. Not wrapped. |
Detailed designs
app/errors.py: DataStoreError Taxonomy
```python
"""
Curaway — Unified Data Store Error Taxonomy.

All data access errors are wrapped in DataStoreError subclasses.
Services catch these instead of store-specific exceptions (SQLAlchemy,
neo4j, botocore, httpx).
"""


class DataStoreError(Exception):
    """Base class for all data store errors."""

    def __init__(
        self,
        message: str,
        *,
        store: str,  # "postgres", "neo4j", "redis", "r2", "qstash"
        operation: str,  # "read", "write", "delete", "connect"
        retry_safe: bool = False,
        original_error: Exception | None = None,
    ):
        super().__init__(message)
        self.store = store
        self.operation = operation
        self.retry_safe = retry_safe
        self.original_error = original_error

    def to_dict(self) -> dict:
        # original_error is not serialized.
        return {
            "error_type": type(self).__name__,
            "store": self.store,
            "operation": self.operation,
            "retry_safe": self.retry_safe,
            "message": str(self),
        }


class StoreUnavailableError(DataStoreError):
    """Store is down, connection refused, or circuit breaker is open."""

    def __init__(self, store: str, operation: str = "connect", **kwargs):
        super().__init__(
            f"{store} is unavailable",
            store=store,
            operation=operation,
            retry_safe=True,
            **kwargs,
        )


class TenantIsolationViolation(DataStoreError):
    """A query was attempted without a tenant_id filter."""

    def __init__(self, store: str = "postgres", operation: str = "read"):
        super().__init__(
            "Query attempted without tenant_id — this is a bug, not a user error",
            store=store,
            operation=operation,
            retry_safe=False,
        )


class RecordNotFoundError(DataStoreError):
    """Expected record does not exist (after tenant-scoped lookup)."""

    def __init__(self, store: str, resource_type: str, resource_id: str):
        super().__init__(
            f"{resource_type} {resource_id} not found",
            store=store,
            operation="read",
            retry_safe=False,
        )


class DuplicateRecordError(DataStoreError):
    """Unique constraint violation."""

    def __init__(self, store: str, detail: str = ""):
        super().__init__(
            f"Duplicate record: {detail}" if detail else "Duplicate record",
            store=store,
            operation="write",
            retry_safe=False,
        )


class QueryError(DataStoreError):
    """Malformed query or constraint violation (not connectivity)."""

    def __init__(self, store: str, detail: str, operation: str = "read", **kwargs):
        # operation is a parameter (default "read") so write paths can
        # label their failures as "write" instead of always reporting "read".
        super().__init__(
            f"Query error: {detail}",
            store=store,
            operation=operation,
            retry_safe=False,
            **kwargs,
        )
```
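Services and routers still need a policy for turning these errors into responses. The sketch below shows one possible mapping. The HTTP codes are assumptions, because the spec does not define them: 503 for `StoreUnavailableError`, 404 for `RecordNotFoundError`, 409 for `DuplicateRecordError`, and 500 for everything else. The helper name `to_http_response` is hypothetical, and the stand-in classes only mirror the shape of `app/errors.py` so the block runs on its own.

```python
from http import HTTPStatus


# Stand-ins that mirror app/errors.py just enough for this sketch to run.
class DataStoreError(Exception):
    def __init__(self, message: str, *, store: str, operation: str, retry_safe: bool = False):
        super().__init__(message)
        self.store = store
        self.operation = operation
        self.retry_safe = retry_safe

    def to_dict(self) -> dict:
        return {
            "error_type": type(self).__name__,
            "store": self.store,
            "operation": self.operation,
            "retry_safe": self.retry_safe,
            "message": str(self),
        }


class StoreUnavailableError(DataStoreError): ...
class RecordNotFoundError(DataStoreError): ...
class DuplicateRecordError(DataStoreError): ...


# Hypothetical policy; any error type not listed falls back to 500.
_STATUS_BY_ERROR: list[tuple[type[DataStoreError], HTTPStatus]] = [
    (StoreUnavailableError, HTTPStatus.SERVICE_UNAVAILABLE),
    (RecordNotFoundError, HTTPStatus.NOT_FOUND),
    (DuplicateRecordError, HTTPStatus.CONFLICT),
]


def to_http_response(err: DataStoreError) -> tuple[int, dict]:
    """Map a DataStoreError to (status_code, JSON body)."""
    body = err.to_dict()
    for error_cls, status in _STATUS_BY_ERROR:
        if isinstance(err, error_cls):
            return int(status), body
    # TenantIsolationViolation, QueryError and unexpected errors are server bugs;
    # replace the raw driver text before it reaches the client.
    body["message"] = "Internal data store error"
    return int(HTTPStatus.INTERNAL_SERVER_ERROR), body
```

Keeping `retry_safe` in the body lets a client or gateway decide whether to retry a 503. Driver text in `DuplicateRecordError` messages can still include column values, so a production handler may want to redact those as well.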
app/repositories/base.py: BaseRepository
```python
"""
Curaway — Base Repository.

All PostgreSQL repositories inherit from this. Provides:
1. Tenant-scoped query builder (enforced — no unscoped reads)
2. Automatic AuditLog creation on writes
3. Error wrapping (SQLAlchemy exceptions -> DataStoreError)
"""

import logging
import uuid

from sqlalchemy import func, select
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import AsyncSession

from app.errors import (
    DataStoreError,
    DuplicateRecordError,
    QueryError,
    StoreUnavailableError,
)
from app.models.audit import AuditLog

logger = logging.getLogger(__name__)


class BaseRepository:
    """Base class for all PostgreSQL repositories.

    Subclasses set `model` to the SQLAlchemy model class.
    All read methods must use `_scoped_query()` to enforce tenant isolation.
    """

    model = None  # Subclasses override: e.g., Patient, Case

    def __init__(self, db: AsyncSession):
        self.db = db

    # ── Tenant-scoped query builder ──

    def _scoped_query(self, tenant_id: str):
        """Return a SELECT pre-filtered by tenant_id.

        This is the ONLY way to start a query in a repository.
        Direct `select(self.model)` without tenant scoping is forbidden.
        """
        return select(self.model).where(self.model.tenant_id == tenant_id)

    def _scoped_count(self, tenant_id: str):
        """Return a COUNT query pre-filtered by tenant_id."""
        return select(func.count(self.model.id)).where(
            self.model.tenant_id == tenant_id
        )

    # ── Soft-delete filtering ──

    def _active_only(self, query):
        """Add is_deleted=False filter if the model supports soft-delete."""
        if hasattr(self.model, "is_deleted"):
            return query.where(self.model.is_deleted == False)  # noqa: E712
        return query

    # ── Audit logging ──

    def _audit(
        self,
        action: str,
        resource_id: str,
        tenant_id: str,
        actor_id: str = "system",
        actor_type: str = "user",
        changes: dict | None = None,
    ):
        """Create an AuditLog entry. Called automatically by write methods."""
        self.db.add(
            AuditLog(
                id=str(uuid.uuid4()),
                actor_id=actor_id,
                actor_type=actor_type,
                action=action,
                resource_type=self.model.__name__ if self.model else "unknown",
                resource_id=resource_id,
                tenant_id=tenant_id,
                changes=changes,
            )
        )

    # ── Error wrapping ──

    @staticmethod
    def _wrap_integrity_error(e: IntegrityError) -> DataStoreError:
        """Unique violations become DuplicateRecordError; others become QueryError."""
        error_msg = str(e.orig) if e.orig else str(e)
        if "unique" in error_msg.lower() or "duplicate" in error_msg.lower():
            return DuplicateRecordError("postgres", detail=error_msg)
        return QueryError("postgres", detail=error_msg, original_error=e)

    async def _execute(self, query, operation: str = "read"):
        """Execute a statement with error wrapping.

        Pass operation="write" when executing UPDATE or DELETE statements.
        """
        try:
            return await self.db.execute(query)
        except OperationalError as e:
            raise StoreUnavailableError(
                "postgres", operation=operation, original_error=e
            ) from e
        except IntegrityError as e:
            raise self._wrap_integrity_error(e) from e
        except Exception as e:
            raise DataStoreError(
                f"Unexpected database error: {e}",
                store="postgres",
                operation=operation,
                original_error=e,
            ) from e

    async def _flush(self):
        """Flush pending changes with error wrapping."""
        try:
            await self.db.flush()
        except OperationalError as e:
            raise StoreUnavailableError(
                "postgres", operation="write", original_error=e
            ) from e
        except IntegrityError as e:
            raise self._wrap_integrity_error(e) from e
        except Exception as e:
            raise DataStoreError(
                f"Unexpected database error: {e}",
                store="postgres",
                operation="write",
                original_error=e,
            ) from e

    # ── Common read patterns ──

    async def get_by_id(self, record_id: str, tenant_id: str):
        """Fetch a single record by ID, scoped to tenant. Returns None if not found."""
        query = self._scoped_query(tenant_id).where(self.model.id == record_id)
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def list_paginated(
        self,
        tenant_id: str,
        page: int = 1,
        page_size: int = 20,
        order_by=None,
    ) -> tuple[list, int]:
        """List records with pagination, scoped to tenant.

        Returns (records, total_count).
        """
        base = self._active_only(self._scoped_query(tenant_id))
        count_q = self._active_only(self._scoped_count(tenant_id))
        total = (await self._execute(count_q)).scalar() or 0

        if order_by is not None:
            base = base.order_by(order_by)
        elif hasattr(self.model, "created_at"):
            base = base.order_by(self.model.created_at.desc())
        base = base.offset((page - 1) * page_size).limit(page_size)
        result = await self._execute(base)
        return list(result.scalars().all()), total
```
app/repositories/patient_repository.py: PatientRepository
Public API (typed methods, not generic CRUD):
```python
"""
Curaway — Patient Repository.

Typed data access for the Patient model with tenant isolation,
audit logging, and error wrapping.
"""

import uuid
from datetime import datetime, timezone

from app.models.event import Event
from app.models.patient import Patient
from app.repositories.base import BaseRepository


class PatientRepository(BaseRepository):
    model = Patient

    async def get_by_id(self, patient_id: str, tenant_id: str) -> Patient | None:
        """Fetch a patient by ID, scoped to tenant. Excludes soft-deleted."""
        query = self._scoped_query(tenant_id).where(Patient.id == patient_id)
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def get_by_auth_id(
        self, external_auth_id: str, tenant_id: str
    ) -> Patient | None:
        """Fetch a patient by Clerk auth ID, scoped to tenant."""
        query = self._scoped_query(tenant_id).where(
            Patient.external_auth_id == external_auth_id
        )
        query = self._active_only(query)
        result = await self._execute(query)
        return result.scalar_one_or_none()

    async def list_active(
        self,
        tenant_id: str,
        intake_status: str | None = None,
        is_active: bool | None = True,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[list[Patient], int]:
        """List patients with filtering and pagination, scoped to tenant."""
        query = self._active_only(self._scoped_query(tenant_id))
        count_query = self._active_only(self._scoped_count(tenant_id))

        if intake_status:
            query = query.where(Patient.intake_status == intake_status)
            count_query = count_query.where(Patient.intake_status == intake_status)
        if is_active is not None:
            query = query.where(Patient.is_active == is_active)
            count_query = count_query.where(Patient.is_active == is_active)

        total = (await self._execute(count_query)).scalar() or 0

        query = query.order_by(Patient.created_at.desc())
        query = query.offset((page - 1) * page_size).limit(page_size)
        result = await self._execute(query)
        return list(result.scalars().all()), total

    async def create(
        self,
        patient: Patient,
        tenant_id: str,
        actor_id: str = "system",
    ) -> Patient:
        """Persist a new Patient. Auto-creates audit log and intake event."""
        # Assign the primary key before writing the audit row and event: a
        # column default is only applied at flush time, so patient.id could
        # still be None here. Assumes string UUID keys, like AuditLog and Event.
        if patient.id is None:
            patient.id = str(uuid.uuid4())
        self.db.add(patient)
        self._audit(
            action="patient.create",
            resource_id=patient.id,
            tenant_id=tenant_id,
            actor_id=actor_id,
        )
        self.db.add(
            Event(
                id=str(uuid.uuid4()),
                event_type="intake.started",
                tenant_id=tenant_id,
                actor_id=actor_id,
                patient_id=patient.id,
                payload={
                    "locale": patient.preferred_locale,
                    "source": patient.referral_source,
                },
                source_service="intake_api",
            )
        )
        await self._flush()
        return patient

    async def update(
        self,
        patient: Patient,
        changes: dict,
        actor_id: str = "system",
    ) -> Patient:
        """Update a patient and log changes. Caller sets fields before calling."""
        patient.updated_at = datetime.now(timezone.utc)
        if changes:
            self._audit(
                action="patient.update",
                resource_id=patient.id,
                tenant_id=patient.tenant_id,
                actor_id=actor_id,
                changes=changes,
            )
        await self._flush()
        return patient

    async def update_intake_status(
        self,
        patient: Patient,
        new_status: str,
        actor_id: str = "system",
    ) -> Patient:
        """Advance patient intake status with audit + event."""
        old_status = patient.intake_status
        patient.intake_status = new_status
        patient.updated_at = datetime.now(timezone.utc)
        self._audit(
            action="patient.intake_status_change",
            resource_id=patient.id,
            tenant_id=patient.tenant_id,
            actor_id=actor_id,
            changes={"intake_status": {"before": old_status, "after": new_status}},
        )
        self.db.add(
            Event(
                id=str(uuid.uuid4()),
                event_type=f"intake.{new_status}",
                tenant_id=patient.tenant_id,
                actor_id=actor_id,
                patient_id=patient.id,
                payload={"old_status": old_status, "new_status": new_status},
                source_service="intake_api",
            )
        )
        await self._flush()
        return patient
```
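`update()` expects the caller to pass a `changes` dict, and `update_intake_status()` records changes as `{field: {"before": old, "after": new}}`. A small helper that builds that shape from two snapshots would keep audit entries consistent across services. `build_changes` is hypothetical and not part of the spec. For encrypted PHI fields, a caller may prefer to record only the field name rather than the before/after values.

```python
from typing import Any


def build_changes(before: dict[str, Any], after: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return {field: {"before": old, "after": new}} for every field whose value changed.

    A field present in only one snapshot is recorded with None on the missing side.
    """
    changes: dict[str, dict[str, Any]] = {}
    for field in sorted(before.keys() | after.keys()):
        old, new = before.get(field), after.get(field)
        if old != new:
            changes[field] = {"before": old, "after": new}
    return changes
```

Because `update()` writes an audit row only when `changes` is non-empty, a no-op edit passed through this helper produces no AuditLog entry.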
app/repositories/case_repository.py: CaseRepository
Public API:
| Method | Signature | Notes |
|---|---|---|
| `get_by_id` | `(case_id: str, tenant_id: str) -> Case \| None` | Tenant-scoped by base class |
| `get_by_patient` | `(patient_id: str, tenant_id: str) -> list[Case]` | Ordered by `updated_at` desc |
| `create` | `(patient_id: str, tenant_id: str) -> dict` | Creates Case + Conversation. Generates case number scoped to tenant (fixes the current global sequence bug). Returns `{case_id, case_number, conversation_id, status, workflow_state}`. |
| `update_status` | `(case_id: str, tenant_id: str, new_status: str) -> Case \| None` | Validates transition via `is_valid_transition()`. Sets `completed_at` on close/forward. |
| `update_workflow_state` | `(case_id: str, tenant_id: str, phase: str, value: bool) -> Case \| None` | Updates a single workflow phase. |
| `set_procedure` | `(case_id: str, tenant_id: str, procedure_code: str, procedure_name: str) -> Case \| None` | Sets procedure and marks `procedure_identified=True`. |
| `update_ehr_snapshot` | `(case_id: str, tenant_id: str, ehr_data: dict) -> Case \| None` | Sets EHR data and marks `ehr_constructed=True`. |
| `set_selected_providers` | `(case_id: str, tenant_id: str, providers: list[dict]) -> Case \| None` | Sets provider selections and marks `providers_selected=True`. |
| `add_comorbidity` | `(case_id: str, tenant_id: str, comorbidity: dict) -> Case \| None` | Appends to comorbidity list. |
| `generate_case_number` | `(tenant_id: str) -> str` | Tenant-scoped sequence: `CRW-{YEAR}-{5-digit}`. Fixes the current bug where `_generate_case_number()` queries all tenants. |
Key fix in generate_case_number:
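```python
# CaseRepository method (model = Case). Uses datetime/timezone from the
# standard library and func from sqlalchemy.
async def generate_case_number(self, tenant_id: str) -> str:
    """Generate CRW-{YEAR}-{5-digit} case number, scoped to tenant."""
    year = datetime.now(timezone.utc).year
    prefix = f"CRW-{year}-"
    # Start from the tenant-scoped SELECT and swap its columns for
    # MAX(case_number); the tenant_id filter is kept, and no bare select()
    # is needed (which the CI scoping lint would reject).
    query = (
        self._scoped_query(tenant_id)
        .with_only_columns(func.max(Case.case_number))
        .where(Case.case_number.like(f"{prefix}%"))
    )
    max_number = (await self._execute(query)).scalar()
    seq = int(max_number.split("-")[-1]) + 1 if max_number else 1
    # Two concurrent calls can read the same MAX and return the same number.
    # A unique constraint on (tenant_id, case_number), if present, surfaces
    # that race as DuplicateRecordError. A global unique constraint on
    # case_number alone would reject overlapping per-tenant sequences.
    return f"{prefix}{seq:05d}"
```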
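To make the sequence rules concrete, the sketch below models the query over an in-memory list of `(tenant_id, case_number)` rows that stands in for the `cases` table. It mirrors the SQL: filter by tenant and year prefix, take the lexicographic maximum (as `MAX` over a text column does), and add one. The function name and row format are illustrative only.

```python
def generate_case_number(rows: list[tuple[str, str]], tenant_id: str, year: int) -> str:
    """Pure-Python model of CaseRepository.generate_case_number over (tenant_id, case_number) rows."""
    prefix = f"CRW-{year}-"
    # WHERE tenant_id = :tid AND case_number LIKE 'CRW-{year}-%'
    candidates = [num for tid, num in rows if tid == tenant_id and num.startswith(prefix)]
    # SELECT MAX(case_number): string comparison, not numeric.
    max_number = max(candidates, default=None)
    seq = int(max_number.split("-")[-1]) + 1 if max_number else 1
    return f"{prefix}{seq:05d}"
```

Two tenants get independent sequences, which is what the `test_case_repository.py` item in the test plan checks. The zero-padded format also sets a ceiling. Once a tenant passes 99,999 cases in a year, `CRW-2025-100000` sorts before `CRW-2025-99999`. From then on the string `MAX` stops advancing, and every call returns `CRW-2025-100000` again.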
app/repositories/fhir_repository.py: FhirRepository
Public API:
| Method | Signature | Notes |
|---|---|---|
| `get_by_id` | `(resource_id: str, tenant_id: str) -> FhirResource \| None` | Tenant-scoped |
| `list_by_patient` | `(patient_id: str, tenant_id: str, resource_type: str \| None, active_only: bool) -> list[FhirResource]` | Filters by `resource_type` and active status |
| `create` | `(resource: FhirResource, tenant_id: str, actor_id: str) -> FhirResource` | Auto-audit. FHIR validation stays in `fhir_service.py`. |
| `update` | `(resource: FhirResource, changes: dict, actor_id: str) -> FhirResource` | Auto-audit |
| `delete` | `(resource_id: str, tenant_id: str, actor_id: str) -> bool` | Soft-delete with audit |
| `count_by_patient` | `(patient_id: str, tenant_id: str) -> int` | For completeness scoring |
app/repositories/document_repository.py: DocumentRepository
Public API:
| Method | Signature | Notes |
|---|---|---|
| `get_by_id` | `(document_id: str, patient_id: str, tenant_id: str) -> DocumentReference \| None` | Triple-scoped: document + patient + tenant |
| `list_by_patient` | `(patient_id: str, tenant_id: str, category: str \| None) -> list[DocumentReference]` | Ordered by `uploaded_at` desc |
| `create` | `(doc: DocumentReference, tenant_id: str, actor_id: str) -> DocumentReference` | Auto-audit + event |
| `update_ocr_status` | `(document_id: str, tenant_id: str, status: str, extracted_data: dict \| None) -> DocumentReference \| None` | For OCR pipeline updates |
| `supersede_previous` | `(patient_id: str, tenant_id: str, filename: str) -> list[str]` | Marks previous versions as superseded. Replaces the raw SQL in `document_service.py:confirm_upload()`. |
app/repositories/provider_repository.py: ProviderRepository
Public API:
| Method | Signature | Notes |
|---|---|---|
| `get_by_id` | `(provider_id: str, tenant_id: str) -> Provider \| None` | Tenant-scoped |
| `get_by_slug` | `(slug: str, tenant_id: str) -> Provider \| None` | For public API |
| `list_active` | `(tenant_id: str, page: int, page_size: int) -> tuple[list[Provider], int]` | Paginated, active only |
| `get_for_matching` | `(tenant_id: str, specialties: list[str] \| None) -> list[Provider]` | For matching engine. Returns all active providers (optionally filtered by specialty). |
| `search` | `(tenant_id: str, query: str, country: str \| None) -> list[Provider]` | Text search on name, city, specialties |
Service migration examples
patient_service.py: Before and After
Before (current get_patient):
```python
async def get_patient(db: AsyncSession, patient_id: str, tenant_id: str) -> Patient | None:
    result = await db.execute(
        select(Patient).where(
            Patient.id == patient_id,
            Patient.tenant_id == tenant_id,
            Patient.is_deleted == False,
        )
    )
    return result.scalar_one_or_none()
```
After (using repository):
```python
from app.repositories.patient_repository import PatientRepository


async def get_patient(db: AsyncSession, patient_id: str, tenant_id: str) -> Patient | None:
    repo = PatientRepository(db)
    return await repo.get_by_id(patient_id, tenant_id)
```
The service function signature is unchanged. Callers (routers) see no difference. The migration is invisible to the API surface.
case_service.py: Before and After
Before (current _generate_case_number; bug: no tenant filter):
```python
async def _generate_case_number(db: AsyncSession) -> str:
    year = datetime.now(timezone.utc).year
    prefix = f"CRW-{year}-"
    result = await db.execute(
        text("SELECT MAX(case_number) FROM cases WHERE case_number LIKE :prefix"),
        {"prefix": f"{prefix}%"},
    )
    max_number = result.scalar()
    ...
```
After (using repository; fixed: tenant-scoped):
```python
async def create_case(db: AsyncSession, patient_id: str, tenant_id: str) -> dict:
    repo = CaseRepository(db)
    return await repo.create(patient_id, tenant_id)
```
`_generate_case_number` moves into `CaseRepository` as `generate_case_number`, which `CaseRepository.create` calls. Its query filters on `tenant_id`, so each tenant gets its own sequence.
Circuit breaker integration (Phase 1 prep)
Phase 1 does not add circuit breakers to PostgreSQL (SQLAlchemy's connection pool
handles reconnection). However, the app/errors.py module and the
StoreUnavailableError class are designed to be used by all stores in Phases 2
and 3. The error taxonomy is store-agnostic from day 1.
The BaseRepository._execute() method wraps OperationalError as
StoreUnavailableError(store="postgres"). If a service catches
StoreUnavailableError, it works the same whether the error came from Postgres,
Neo4j, or Redis.
Test plan
Unit tests for each repository
| Test file | Coverage |
|---|---|
| `test_base_repository.py` | `_scoped_query` always includes `tenant_id`. `_active_only` adds the soft-delete filter. `_audit` creates an `AuditLog`. `_execute` wraps `OperationalError` as `StoreUnavailableError`. `_execute` wraps unique-constraint `IntegrityError`s as `DuplicateRecordError` and other `IntegrityError`s as `QueryError`. |
| `test_patient_repository.py` | `get_by_id` returns `None` for the wrong tenant. `get_by_auth_id` scopes by tenant. `list_active` paginates correctly. `create` writes an audit log and an event. `update_intake_status` records old and new status. |
| `test_case_repository.py` | `generate_case_number` scopes by tenant (two tenants get independent sequences). `create` generates case + conversation. `update_status` validates transitions. |
| `test_errors.py` | `DataStoreError.to_dict()` serialization. `StoreUnavailableError.retry_safe` is `True`. `TenantIsolationViolation.retry_safe` is `False`. |
Integration test for tenant isolation
`test_tenant_isolation.py`: a single test that:
- Creates Patient A in tenant-1
- Creates Patient B in tenant-2
- Asserts `repo.get_by_id(patient_a_id, "tenant-2")` returns `None`
- Asserts `repo.list_active("tenant-1")` does not include Patient B
- Asserts `repo.list_active("tenant-2")` does not include Patient A
This test uses the real SQLAlchemy session (async SQLite in-memory) to verify actual query behavior, not mocks.
CI enforcement
A lint test (`test_repository_scoping.py`) scans all `app/repositories/*.py`
files and asserts that:
- No file other than `base.py` (which defines the two builders) calls `select(` directly; every query starts from `_scoped_query` or `_scoped_count`
- No file uses raw `text()` SQL (which bypasses the ORM and tenant filtering)
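One way to implement this lint without regular expressions is to parse each module with Python's `ast` module and inspect call sites. The sketch below flags:
- direct `select(...)` calls outside `base.py`
- any `text(...)` call
- any `.commit()` call
- imports from `app.services`

The last two follow the recommendations in the Edge Cases table. The sketch only recognizes `select` and `text` when called by bare name, so an aliased form such as `sa.select(...)` would slip through. `find_violations` is a hypothetical helper name.

```python
import ast
from pathlib import Path

# base.py defines _scoped_query/_scoped_count, so it may call select() directly.
SELECT_ALLOWED_IN = {"base.py"}


def find_violations(filename: str, source: str) -> list[str]:
    """Return lint messages for one repository module, ordered by line number."""
    allow_select = Path(filename).name in SELECT_ALLOWED_IN
    found: list[tuple[int, str]] = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id == "text":
                found.append((node.lineno, "raw text() SQL"))
            elif isinstance(func, ast.Name) and func.id == "select" and not allow_select:
                found.append((node.lineno, "select() outside _scoped_query/_scoped_count"))
            elif isinstance(func, ast.Attribute) and func.attr == "commit":
                found.append((node.lineno, "commit() inside a repository"))
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").startswith("app.services"):
                found.append((node.lineno, "import from app.services"))
        elif isinstance(node, ast.Import):
            if any(alias.name.startswith("app.services") for alias in node.names):
                found.append((node.lineno, "import from app.services"))
    return [f"{filename}:{line}: {msg}" for line, msg in sorted(found)]


def test_repository_scoping() -> None:
    """Pytest entry point: fail with one line per violation."""
    problems: list[str] = []
    for path in sorted(Path("app/repositories").glob("*.py")):
        problems += find_violations(str(path), path.read_text())
    assert not problems, "\n".join(problems)
```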
Edge Cases
| Edge Case | Scenario | Handling | Severity |
|---|---|---|---|
| Repository method called without `tenant_id` | A service accidentally passes `None` or an empty string as `tenant_id` to a repository method (e.g., `repo.get_by_id(patient_id, None)`). `_scoped_query` would generate `WHERE tenant_id IS NULL` or `WHERE tenant_id = ''`, silently returning no results instead of failing loudly. | Add a guard at the top of `_scoped_query` and `_scoped_count`: `if not tenant_id: raise TenantIsolationViolation()`. This turns a silent empty-result bug into an immediate, obvious crash. `TenantIsolationViolation` (already defined in `app/errors.py`) has `retry_safe=False`, so callers know not to retry. Add a unit test asserting that `_scoped_query(None)` and `_scoped_query("")` both raise. | Critical |
| `_scoped_query` used on a table without a `tenant_id` column | A developer creates a new repository for a table that has no `tenant_id` column (e.g., `exchange_rates` from the multicurrency spec) and inherits from `BaseRepository`. `_scoped_query` would raise `AttributeError: type object 'ExchangeRate' has no attribute 'tenant_id'`. | Two options: (1) Add a class-level `tenant_scoped = True` flag on `BaseRepository` (default `True`). Tables without tenant scoping set `tenant_scoped = False` and override `_scoped_query` to raise `NotImplementedError("This repository is not tenant-scoped — use direct queries")`. (2) Don't inherit from `BaseRepository` for unscoped tables; create a separate `BaseUnscopedRepository` with audit logging but no tenant filtering. Option 2 is cleaner. Document the choice in the `BaseRepository` docstring. | High |
| Concurrent writes to the same record from different services | Two services (e.g., `case_service` updating workflow state and `ehr_builder_agent` updating `ehr_snapshot`) each load the same Case row, modify it, and flush. SQLAlchemy's UPDATE includes only the attributes that changed, so edits to disjoint scalar columns both survive. The lost update happens when both write the same column. For example, if workflow flags such as `ehr_constructed` live in one JSON `workflow_state` column, each side rewrites the whole value with only its own flag changed, and the later flush wins. | This is an existing bug that the DAO layer inherits rather than introduces. Mitigation: repository update methods should issue column-level statements (`UPDATE ... SET field = value WHERE id = :id AND tenant_id = :tid`) instead of flushing a stale ORM object. For `update_workflow_state` and `update_ehr_snapshot`, use `db.execute(update(Case).where(...).values(field=new_value))` so unrelated columns are untouched. A column-level UPDATE still replaces a JSON value wholesale, so for JSON fields either merge server-side (e.g., PostgreSQL's jsonb `\|\|` operator) or add optimistic locking with SQLAlchemy's `version_id_col`. Add a comment in `BaseRepository` documenting that callers should prefer column-level updates for hot tables. | High |
| Transaction rollback in one repository while another committed | Service A calls `PatientRepository.create()` (flushes), then `CaseRepository.create()` (flushes). If the case creation fails, the patient row has been flushed but not committed; because both writes share one session, the rollback discards both. However, if the service explicitly called `db.commit()` between the two operations, the patient is committed and the case rollback leaves an orphaned patient. | Repositories only `flush()`, never `commit()`. The commit boundary is owned by the router via FastAPI's dependency injection: `get_db()` commits on success and rolls back on exception. This is already the pattern; document it explicitly in the `BaseRepository` docstring: "Repositories flush but never commit. The session lifecycle is managed by the caller (router/endpoint)." Add a CI lint test that asserts no repository file contains `db.commit()`. | High |
| Repository import fails at startup (circular import) | `patient_service.py` imports `PatientRepository`, which imports the `Patient` model. If the `Patient` model ever imports something from `patient_service.py` (e.g., for a hybrid property that calls a service function), Python raises `ImportError` at startup. | Enforce a strict dependency direction: Models -> Repositories -> Services -> Routers. No backward imports. The CI lint test (`test_repository_scoping.py`) should also scan imports: assert that no `app/repositories/*.py` file imports from `app/services/` and no `app/models/*.py` file imports from `app/repositories/`. Document the dependency direction in `app/repositories/__init__.py`. | Medium |
| `BaseRepository` subclass forgets to call `super().__init__` | A new repository overrides `__init__` to add custom initialization but forgets `super().__init__(db)`. Every later access to `self.db` raises `AttributeError`. | A check inside `BaseRepository.__init__` cannot catch this, because a subclass that skips `super().__init__(db)` never runs it. Instead, validate `model` in `__init_subclass__`, which runs when the subclass is defined (`if cls.model is None: raise TypeError(f"{cls.__name__} must set 'model' class attribute")`). Store the session as `self._db` behind a `db` property that raises a descriptive error when `_db` was never set. Both bugs then surface with a clear message instead of a bare `AttributeError`. Add unit tests for a subclass without `model` and for one that skips `super().__init__`. | Low |
| Query returns empty result: `None` vs empty list inconsistency | `get_by_id` returns `None` when not found. `list_active` returns `([], 0)`. But what does `get_by_patient` return for zero cases: `None` or `[]`? If the contract is inconsistent, services have to handle both shapes. | Establish and document a clear contract. Single-record lookups (`get_by_id`, `get_by_auth_id`, `get_by_slug`) return `T \| None`. Multi-record lookups (`get_by_patient`, `list_by_patient`, `get_for_matching`, `search`) return `list[T]` (an empty list, never `None`). Paginated methods (`list_active`, `list_paginated`) return `tuple[list[T], int]`. Add this contract to the `BaseRepository` class docstring and enforce it in type annotations. | Medium |
| `tenant_id` mismatch between the request's tenant (URL param or header) and the authenticated user's tenant | A request arrives with `X-Tenant-ID: tenant-apollo-001`, but the Clerk JWT's `org_id` resolves to a different tenant. The repository faithfully scopes to whatever `tenant_id` the service passes; it doesn't know about the auth layer. If the router passes the wrong one, tenant isolation is bypassed. | This is an auth-layer concern. The repository has no view of the caller's identity, so it cannot be the last line of defense against a mismatched tenant. The router's `_get_tenant_id` dependency (which already exists) must validate that the URL/header tenant matches the JWT tenant. Document in `BaseRepository`: "Repositories trust the tenant_id parameter. Callers (routers) are responsible for validating tenant authorization before calling repository methods." Add an integration test in `test_tenant_isolation.py` that verifies a repository call with a mismatched tenant returns no results. | Critical |
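The guard and subclass checks described in the table can be sketched without a database. The skeleton below shows three things:
- the empty-`tenant_id` guard
- a `model` check in `__init_subclass__`, which (unlike a check in `__init__`) still runs when a subclass skips `super().__init__()`
- a `db` property that makes a missing session fail with a clear message

`TenantIsolationViolation` here is a simplified stand-in, and `_require_tenant` is a hypothetical helper that `_scoped_query` and `_scoped_count` would call first. This is illustrative, not the shipped `BaseRepository`.

```python
class TenantIsolationViolation(Exception):
    """Simplified stand-in for app.errors.TenantIsolationViolation."""

    retry_safe = False


class BaseRepository:
    model = None  # subclasses set this to a SQLAlchemy model class

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Runs when the subclass is defined, even if it never calls super().__init__().
        # Abstract intermediate bases would need an explicit opt-out.
        if cls.model is None:
            raise TypeError(f"{cls.__name__} must set 'model' class attribute")

    def __init__(self, db):
        self._db = db

    @property
    def db(self):
        try:
            return self._db
        except AttributeError:
            raise RuntimeError(
                f"{type(self).__name__}.__init__ must call super().__init__(db)"
            ) from None

    @staticmethod
    def _require_tenant(tenant_id):
        """Hypothetical guard to call first in _scoped_query/_scoped_count."""
        if not tenant_id:  # rejects both None and ""
            raise TenantIsolationViolation("query attempted without tenant_id")
        return tenant_id
```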
Implementation checklist
Opus tier (architectural decisions, design)
| Task | Description | Est. time |
|---|---|---|
| O1 | Design `BaseRepository` class with tenant scoping, audit logging, and error wrapping | 30 min |
| O2 | Design `DataStoreError` hierarchy (base + 5 subclasses, `to_dict()`, `retry_safe` semantics) | 20 min |
| O3 | Design `PatientRepository` public API (method signatures, return types, edge cases) | 20 min |
| O4 | Design tenant isolation enforcement strategy (lint test, CI check) | 15 min |
| O5 | Review service migration plan: verify no business logic leaks into repositories | 15 min |
Sonnet tier (implementation following established pattern)
| Task | Description | Est. time |
|---|---|---|
| S1 | Implement `app/errors.py` (`DataStoreError` hierarchy) | 15 min |
| S2 | Implement `app/repositories/base.py` (`BaseRepository`) | 30 min |
| S3 | Implement `app/repositories/patient_repository.py` | 30 min |
| S4 | Implement `app/repositories/case_repository.py` (including tenant-scoped case number fix) | 30 min |
| S5 | Implement `app/repositories/fhir_repository.py` | 20 min |
| S6 | Implement `app/repositories/document_repository.py` | 20 min |
| S7 | Implement `app/repositories/provider_repository.py` | 20 min |
| S8 | Implement `app/repositories/__init__.py` | 5 min |
| S9 | Migrate `patient_service.py` to use `PatientRepository` | 20 min |
| S10 | Migrate `case_service.py` to use `CaseRepository` | 25 min |
| S11 | Migrate `fhir_service.py` to use `FhirRepository` | 15 min |
| S12 | Migrate `document_service.py` to use `DocumentRepository` | 15 min |
| S13 | Migrate `provider_service.py` to use `ProviderRepository` | 15 min |
| S14 | Migrate `match_service.py` reads to use repositories | 20 min |
| S15 | Write `tests/test_repositories/test_base_repository.py` | 25 min |
| S16 | Write `tests/test_repositories/test_patient_repository.py` | 25 min |
| S17 | Write `tests/test_repositories/test_case_repository.py` | 20 min |
| S18 | Write `tests/test_repositories/test_tenant_isolation.py` | 15 min |
| S19 | Write `tests/test_errors.py` | 10 min |
| S20 | Write `tests/test_repository_scoping.py` (CI lint) | 10 min |
| S21 | Verify all 760+ existing tests still pass | 10 min |
Total estimated time: ~8.25 hours (100 min Opus + 395 min Sonnet = 495 min), planned as 1 Opus + Sonnet session
Phase 2 outline — GraphDAO + CacheDAO¶
Scope (not detailed here):
app/dao/graph_dao.py— Wrapsgraph_service.pywith:pybreakercircuit breaker on the Neo4j driver (fail_threshold=3, reset=60s)DataStoreErrorwrapping for all Neo4j exceptions- Health state exposed via
get_store_health() -
The 30+ existing functions in
graph_service.pystay — GraphDAO is a thin wrapper, not a rewrite -
app/dao/cache_dao.py— Wrapsredis_pubsub.pycache functions with: pybreakercircuit breaker on the Upstash REST endpoint (fail_threshold=5, reset=15s)DataStoreErrorwrapping for httpx errors-
Pub/sub functions stay in
redis_pubsub.pyunchanged -
Circuit breaker registry —
app/dao/breaker_registry.py: - Central registry of all circuit breakers
get_store_health()returns all breaker states-
Wired into
/landscapehealth page -
Modified files:

- `app/services/match_service.py`: catch `StoreUnavailableError` from GraphDAO instead of a bare `Exception` from graph_service
- `app/services/case_service.py`: catch `StoreUnavailableError` from CacheDAO for Redis cache calls
- `app/agents/case_orchestrator.py`: catch `StoreUnavailableError` from GraphDAO
- `app/routers/health.py` (or `landscape`): use the breaker registry for health status
- 9 files that import `redis_pubsub` cache functions: switch to CacheDAO
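
The switch from a bare `Exception` to `StoreUnavailableError` can be sketched as follows: the DAO translates driver-specific exceptions into the taxonomy, and the service catches only the "store is down" case, so genuine bugs still surface. This is a minimal sketch under assumptions: `DataStoreError` and `StoreUnavailableError` are simplified stand-ins (the real classes live in `app/errors.py`, whose constructors are not shown here), and `GraphDAO.similar_providers()`, `driver_call`, and `graph_boost()` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List


class DataStoreError(Exception):
    """Stand-in for the base class in app/errors.py."""

    retry_safe = False

    def __init__(self, store: str, message: str) -> None:
        super().__init__(f"{store}: {message}")
        self.store = store


class StoreUnavailableError(DataStoreError):
    """The store could not be reached (breaker open, timeout, refused connection)."""

    retry_safe = True


class GraphDAO:
    """Hypothetical thin wrapper that translates driver errors into the taxonomy."""

    def __init__(self, driver_call: Callable[[str], Awaitable[List[str]]]) -> None:
        self._driver_call = driver_call

    async def similar_providers(self, patient_id: str) -> List[str]:
        try:
            return await self._driver_call(patient_id)
        # Stand-ins; the real DAO would catch e.g. neo4j.exceptions.ServiceUnavailable.
        except (ConnectionError, TimeoutError) as exc:
            raise StoreUnavailableError("neo4j", str(exc)) from exc


async def graph_boost(dao: GraphDAO, patient_id: str) -> List[str]:
    """match_service-style caller: degrade only when the graph store is down."""
    try:
        return await dao.similar_providers(patient_id)
    except StoreUnavailableError:
        return []  # score without the graph signal; real bugs still propagate
```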
Dependencies: `pybreaker` added to `requirements.txt`

Estimated time: 1 session
Phase 3 outline: StorageDAO + QueueDAO
Scope (not detailed here):

- `app/dao/storage_dao.py`: wraps `r2_client.py` with:
  - `pybreaker` circuit breaker (fail_threshold=3, reset=30s)
  - Retry on transient 503 errors (1 retry with 2s backoff)
  - `DataStoreError` wrapping for `BotoCoreError`/`ClientError`
- `app/dao/queue_dao.py`: wraps `qstash_client.py` with:
  - `pybreaker` circuit breaker (fail_threshold=3, reset=60s)
  - Retry on 429 with exponential backoff (max 2 retries)
  - `DataStoreError` wrapping for httpx errors
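
Both Phase 3 DAOs pair a breaker with a bounded retry. The two retry rules above (StorageDAO: one retry on 503 after 2 s; QueueDAO: up to two retries on 429 with exponential backoff) fit one small helper. A sketch under assumptions: `retry_on_status`, `HttpStatusError`, and the policy dicts are hypothetical; the 1 s base delay for QueueDAO is assumed because the plan gives none; real code would extract the status from the httpx or botocore exception instead of `HttpStatusError`. `sleep` is injectable so tests do not wait.

```python
import asyncio
from typing import Any, Awaitable, Callable, Collection, Dict, TypeVar

T = TypeVar("T")


class HttpStatusError(Exception):
    """Stand-in for an httpx/botocore error that carries an HTTP status code."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


async def retry_on_status(
    op: Callable[[], Awaitable[T]],
    statuses: Collection[int],
    max_retries: int,
    base_delay: float,
    exponential: bool,
    sleep: Callable[[float], Awaitable[Any]] = asyncio.sleep,
) -> T:
    """Run `op`; retry at most `max_retries` times, and only on `statuses`."""
    attempt = 0
    while True:
        try:
            return await op()
        except HttpStatusError as exc:
            if exc.status not in statuses or attempt >= max_retries:
                raise
            # Fixed delay, or base_delay * 2**attempt (1x, 2x, 4x...) if exponential.
            await sleep(base_delay * (2 ** attempt if exponential else 1))
            attempt += 1


# Policies from the plan. The 1 s base delay for QueueDAO is an assumption.
STORAGE_POLICY: Dict[str, Any] = {"statuses": {503}, "max_retries": 1,
                                  "base_delay": 2.0, "exponential": False}
QUEUE_POLICY: Dict[str, Any] = {"statuses": {429}, "max_retries": 2,
                                "base_delay": 1.0, "exponential": True}
```

One design choice the plan leaves open is nesting: running the retry inside the breaker call means one logical operation counts as at most one breaker failure, whereas retrying outside it lets each attempt trip the breaker.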
Modified files:

- `app/services/document_service.py`: switch R2 calls to StorageDAO
- `app/agents/case_orchestrator.py`: switch R2 calls to StorageDAO
- `app/services/document_processing.py`: switch QStash calls to QueueDAO
- 3 files that import `r2_client`: switch to StorageDAO
- 3 files that import `qstash_client`: switch to QueueDAO
Estimated time: Half a session
Session 41 New Tables: Future Repositories
Session 41 designed 6 new tables that will need repositories when implemented. These are NOT in Phase 1 scope, but the DAO architecture must accommodate them.

| Table | Owner repository | Phase | Notes |
|---|---|---|---|
| `coordinator_vendors` | `CoordinatorVendorRepository` | Wave 4 | Vendor CRUD, search by city + capabilities. Tenant-scoped (owning coordinator tenant). |
| `coordinator_service_bookings` | `BookingRepository` | Wave 4 | Booking lifecycle (requested → confirmed → completed). Tenant-scoped via case_id → patient → tenant. |
| `coordinator_audit_log` | `CoordinatorAuditRepository` | Wave 4 | Append-only. Tenant-scoped. Every coordinator action logged. |
| `graph_node_audit_log` | `GraphNodeAuditRepository` | Wave 1 (intake restructuring) | Append-only. Per-turn graph execution trace. Tenant-scoped via case_id. |
| `scoring_audit_log` | `ScoringAuditRepository` | Wave 1 | Append-only. PFS/HSS/FMS computation inputs + outputs. Tenant-scoped via case_id. |
| `extraction_audit_log` | `ExtractionAuditRepository` | Wave 1 | Append-only. Per-layer extractor outputs. Tenant-scoped via case_id. |

Design principle: all audit tables are append-only. Their repositories expose `create()` and `list_by_case()` only, never `update()` or `delete()`, which supports the compliance requirement that audit records be tamper-evident.
BaseRepository compatibility: every one of the 6 tables resolves to a tenant_id, either through its own `tenant_id` column or through `case_id` and the owning case. Two patterns:
1. Tables with a direct `tenant_id` column → standard `_scoped_query(tenant_id)` (`coordinator_vendors`, `coordinator_audit_log`)
2. Tables scoped via `case_id` → add a `_scoped_by_case(case_id, tenant_id)` helper to `BaseRepository` that joins through the case:
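```python
from sqlalchemy import select

from app.models import Case  # assumed import path for the Case model


class BaseRepository:  # excerpt: only the new helper is shown
    def _scoped_by_case(self, case_id: str, tenant_id: str):
        """Scope query by case_id, with tenant_id validation via join."""
        return (
            select(self.model)
            .join(Case, self.model.case_id == Case.id)
            .where(self.model.case_id == case_id)
            .where(Case.tenant_id == tenant_id)
        )
```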
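
To see the append-only rule and the join-through-case scoping working together, here is a runnable sketch that uses stdlib `sqlite3` in place of the async SQLAlchemy session. Everything here is hypothetical for illustration: the `make_demo_db()` schema, the `scoring_audit_log` columns (`id`, `case_id`, `payload`), and passing `tenant_id` to `create()` so writes are tenant-checked too. The SQL mirrors what `_scoped_by_case` builds: join to `cases`, then filter on both `case_id` and `tenant_id`.

```python
import sqlite3
from typing import List


def make_demo_db() -> sqlite3.Connection:
    """In-memory stand-in schema: a cases table plus one audit table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE cases (id TEXT PRIMARY KEY, tenant_id TEXT NOT NULL);
        CREATE TABLE scoring_audit_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            case_id TEXT NOT NULL REFERENCES cases(id),
            payload TEXT NOT NULL);
        INSERT INTO cases VALUES ('case-1', 'tenant-a');
    """)
    return conn


class ScoringAuditRepository:
    """Append-only: create() and list_by_case() are the only public methods."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def create(self, case_id: str, tenant_id: str, payload: str) -> None:
        # Insert only if the case belongs to the caller's tenant.
        cur = self.conn.execute(
            "INSERT INTO scoring_audit_log (case_id, payload) "
            "SELECT id, ? FROM cases WHERE id = ? AND tenant_id = ?",
            (payload, case_id, tenant_id),
        )
        if cur.rowcount != 1:
            raise LookupError(f"case {case_id!r} not found for this tenant")

    def list_by_case(self, case_id: str, tenant_id: str) -> List[str]:
        # Same shape as _scoped_by_case: join through cases, filter on both ids.
        rows = self.conn.execute(
            "SELECT a.payload FROM scoring_audit_log a "
            "JOIN cases c ON a.case_id = c.id "
            "WHERE a.case_id = ? AND c.tenant_id = ? ORDER BY a.id",
            (case_id, tenant_id),
        )
        return [payload for (payload,) in rows]
```

A cross-tenant read returns an empty list rather than an error, which matches how the spec treats a non-existent tenant_id.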
LangGraph Checkpointing: DB Access Strategy
LangGraph's `AsyncPostgresSaver` handles checkpoint persistence independently of the DAO layer. It uses its own tables (`checkpoints`, `checkpoint_blobs`, `checkpoint_writes`, plus a `checkpoint_migrations` version table) managed by LangGraph's schema, not by our Alembic migrations.
Decision: LangGraph checkpointing is NOT wrapped in a repository.
Rationale:
- AsyncPostgresSaver is a battle-tested LangGraph component with its own connection management
- Checkpoints are keyed by thread_id (= case_id in our mapping), not by tenant_id directly
- Wrapping it would break LangGraph's internal contract with no benefit
- Tenant isolation is enforced by the fact that each case has a unique thread_id, and case access is already tenant-gated at the router level
What we DO need:
- Configure AsyncPostgresSaver to use the same Railway PostgreSQL connection string
- Map thread_id = case_id so checkpoints are case-scoped
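- The checkpoint tables are created by LangGraph's own migrations when `checkpointer.setup()` is called (once at startup; re-running it is safe), so no Alembic migration is needed
- Add checkpoint cleanup to the GDPR erasure cascade (ADR-0019): delete the case's rows from `checkpoints`, `checkpoint_blobs`, and `checkpoint_writes` (`DELETE FROM <table> WHERE thread_id = :case_id`), or call the saver's `adelete_thread(case_id)` if the installed version provides it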
Where this is configured: In the Triage Agent graph setup (Wave 1, item 1.13):
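```python
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
# from_conn_string() is an async context manager: keep it open for the app's lifetime (e.g. in the
# FastAPI lifespan). It takes a psycopg-style postgresql:// URL, not a SQLAlchemy "+asyncpg" URL.
async with AsyncPostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    await checkpointer.setup()  # creates LangGraph's checkpoint tables; safe to re-run
    graph = triage_agent_graph.compile(checkpointer=checkpointer)
```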
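
The two case-scoping rules for checkpoints (one LangGraph thread per case, and erasure by thread_id) fit in a pair of small helpers. A sketch under assumptions: `case_thread_config` and `erase_case_checkpoints` are hypothetical names, and an in-memory sqlite3 database with simplified versions of the saver's per-thread tables stands in for PostgreSQL. In the real erasure cascade the same DELETEs would run on the request's AsyncSession via `text()`, and the config would be passed as `await graph.ainvoke(state, config=case_thread_config(case_id))`.

```python
import sqlite3
from typing import Any, Dict

# LangGraph's Postgres saver keeps per-thread data in all three of these tables.
CHECKPOINT_TABLES = ("checkpoints", "checkpoint_blobs", "checkpoint_writes")


def case_thread_config(case_id: str) -> Dict[str, Any]:
    """Run config for the compiled graph: one LangGraph thread per case."""
    return {"configurable": {"thread_id": case_id}}


def erase_case_checkpoints(conn: sqlite3.Connection, case_id: str) -> int:
    """GDPR cascade step: delete every checkpoint row for the case's thread."""
    deleted = 0
    for table in CHECKPOINT_TABLES:  # fixed table names, never user input
        cur = conn.execute(f"DELETE FROM {table} WHERE thread_id = :case_id",
                           {"case_id": case_id})
        deleted += cur.rowcount
    return deleted
```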
Service Migration Order
Phase 1 targets 6 services. Migration order matters: start with the highest-value services (most queries, known bugs):

| Order | Service | Rationale | Known issues |
|---|---|---|---|
| 1 | `case_service.py` | Has the tenant-scoping bug in `_generate_case_number()`. Most complex service. Fixes a real bug. | Global case number sequence |
| 2 | `patient_service.py` | Most called service. Clean migration template for the rest. | None |
| 3 | `fhir_service.py` | High volume (multiple FHIR resources per case). Tenant isolation critical for PHI. | None |
| 4 | `document_service.py` | Has raw SQL for `supersede_previous`. | Raw `text()` SQL bypassing ORM |
| 5 | `provider_service.py` | Read-heavy, simpler migration. | None |
| 6 | `match_service.py` | Depends on Patient + Provider repos being ready. Migrate last. | None |

Phase 1.5: Additional Services (post-Phase 1, pre-Wave 1)
These services have inline DB queries but are lower priority. Migrate them between Wave 0 and Wave 1:

| Service | DB access pattern | Effort |
|---|---|---|
| `ehr_rebuild_service.py` | Reads FHIR, case, patient; writes EHR snapshot | 1 hour |
| `conversation_service.py` | Reads/writes messages, conversations | 45 min |
| `feedback_service.py` | Reads match_results, feedback_records | 30 min |
| `case_porting_service.py` | Cascading deletes across 6 tables | 1 hour |
| `patient_state.py` | Reads patient, FHIR, case, document, EHR | 45 min |
| `data_subject_handler.py` | GDPR cascade deletes across 10 tables | 1 hour |

Phase 1.5 total: ~5 hours additional effort.
RLS Integration Decision
Decision: DAO first (Wave 0.1), RLS second (Wave 0.2). Defense-in-depth, not either/or.
The DAO layer provides application-level tenant isolation. PostgreSQL RLS provides database-level enforcement as a second line of defense. Both are needed:

| Layer | What it catches | When it fails |
|---|---|---|
| DAO (`_scoped_query`) | Missing or wrong tenant_id in service code | Bug in a new service that bypasses the repository |
| RLS (database policy) | Any query from a role subject to the policy, including raw SQL and direct DB access | Misconfigured `current_setting('app.tenant_id')`, or queries run as a superuser, a `BYPASSRLS` role, or the table owner (owners bypass RLS unless `FORCE ROW LEVEL SECURITY` is set) |

Implementation sequence:

1. DAO repositories ship first; they enforce tenant isolation in application code.
2. RLS ships second, adding database-level enforcement on the same tables.
3. When both are active, DAO filtering and RLS filtering are redundant by design. This is intentional: if one layer has a bug, the other catches it.
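Performance: Both layers filter by tenant_id, so expect the condition to be applied twice: the DAO's bound parameter and the policy's `current_setting('app.tenant_id')` expression are different expressions, so the planner cannot be relied on to merge them. An equality check on an indexed `tenant_id` column is cheap, so the overhead is expected to be small; confirm with `EXPLAIN ANALYZE` on representative queries once RLS ships.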
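Connection setup for RLS: Each request must set `app.tenant_id` on its database connection before any query runs. `SET` does not accept bind parameters, so use `SELECT set_config('app.tenant_id', :tid, true)` inside the request's transaction; the third argument makes the setting transaction-local (like `SET LOCAL`), so it cannot leak to the next request that reuses the pooled connection. This is done in the FastAPI middleware or `get_db()` dependency, NOT in the repository layer. Repositories are RLS-unaware.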
Cross-Tenant Access Pattern
For super-admin operations (GDPR deletion, analytics, system reports), a BaseUnscopedRepository provides access without tenant filtering:
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


class BaseUnscopedRepository:
    """For system tables and cross-tenant admin operations.

    WARNING: Only use for:
    - System-wide tables (providers, exchange_rates, treatment_categories)
    - Super-admin operations with explicit authorization check
    - GDPR erasure cascade (which must delete across all tenants)

    All uses MUST be logged in audit_log with actor_type='system_admin'.
    """

    model = None  # set by each concrete repository

    def __init__(self, db: AsyncSession):
        self.db = db

    def _query(self):
        """Unscoped query. No tenant filter."""
        return select(self.model)

    async def _execute(self, query):
        """Same error wrapping as BaseRepository."""
        ...  # same try/except pattern as BaseRepository._execute
```
Usage: ProviderRepository extends BaseUnscopedRepository (providers are globally visible). System tables (exchange_rates, treatment_categories, consent_purposes) use BaseUnscopedRepository. Admin reports use a specific AdminReportRepository extending BaseUnscopedRepository with mandatory audit logging.
GDPR data_subject_handler: Uses BaseUnscopedRepository to cascade deletes across all tenants for a given patient. Every delete is audit-logged with actor_type='gdpr_cascade'.
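
The cascade described above can be sketched as ordered, unscoped deletes, each followed by an audit row with `actor_type='gdpr_cascade'`, all inside one transaction so a failure cannot leave a patient half-erased. Assumptions: sqlite3 stands in for the AsyncSession; the three-table `CASCADE` list, the `audit_log` columns, and `erase_patient()` are hypothetical; the real handler covers 10 tables and would go through `BaseUnscopedRepository` subclasses.

```python
import sqlite3
from typing import Tuple

# Hypothetical subset of the tables the cascade covers, children before parents.
CASCADE: Tuple[Tuple[str, str], ...] = (
    ("documents", "patient_id"),
    ("cases", "patient_id"),
    ("patients", "id"),
)


def erase_patient(conn: sqlite3.Connection, patient_id: str) -> None:
    """Unscoped cascade (no tenant filter); every delete gets an audit row."""
    with conn:  # one transaction: commit at the end, roll back on any error
        for table, column in CASCADE:  # fixed identifiers, never user input
            cur = conn.execute(f"DELETE FROM {table} WHERE {column} = ?", (patient_id,))
            conn.execute(
                "INSERT INTO audit_log (actor_type, action, target_table, rows_affected) "
                "VALUES ('gdpr_cascade', 'delete', ?, ?)",
                (table, cur.rowcount),
            )
```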
Additional Edge Cases (Session 41 Audit)

| Edge Case | Scenario | Handling | Severity |
|---|---|---|---|
| Multi-repository transaction failure | `PatientRepository.create()` flushes, then `CaseRepository.create()` fails. Both are in the same session. | Both repositories flush but never commit, so the session rolls back entirely on exception. Document: "Always wrap multi-repository operations in a single session. Never call `db.commit()` between repository operations." | High |
| Non-existent tenant_id (valid UUID format but no matching tenant) | Service passes a well-formed UUID that doesn't exist in the tenants table. Repository returns empty results; no error is raised. | Repository trusts the tenant_id (it's a WHERE filter, not a JOIN). The router/auth layer validates tenant existence. Add a note: "Repositories do not validate tenant existence. A query with a non-existent tenant_id returns empty results by design; this is correct behavior." | Medium |
| Connection pool exhaustion | High traffic causes all connections to be in use. New repository calls block waiting for a connection. | `StoreUnavailableError` raised by `_execute()` when SQLAlchemy's pool times out. Service catches `StoreUnavailableError` and returns 503. The error has `retry_safe=True` so clients can retry. | High |
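
The multi-repository rule can be demonstrated with sqlite3 standing in for the shared session: both repository writes run inside one transaction, and a failure in the second rolls back the first. `PatientRepo`, `CaseRepo`, and `open_case_for_new_patient()` are hypothetical names. With SQLAlchemy, the equivalent service code would wrap both calls in a single `async with db.begin():` block (or let `get_db()` commit once at the end of the request) and never call `db.commit()` in between.

```python
import sqlite3


class PatientRepo:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def create(self, patient_id: str, tenant_id: str) -> None:
        # Like session.flush(): written inside the open transaction, not committed.
        self.conn.execute("INSERT INTO patients VALUES (?, ?)", (patient_id, tenant_id))


class CaseRepo:
    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def create(self, case_id: str, patient_id: str) -> None:
        self.conn.execute("INSERT INTO cases VALUES (?, ?)", (case_id, patient_id))


def open_case_for_new_patient(conn: sqlite3.Connection, patient_id: str,
                              tenant_id: str, case_id: str) -> None:
    """Service-level unit of work: one transaction, no commit between repositories."""
    with conn:  # commit once at the end, or roll back both writes on error
        PatientRepo(conn).create(patient_id, tenant_id)
        CaseRepo(conn).create(case_id, patient_id)
```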

Dependencies and prerequisites

| Dependency | Status | Notes |
|---|---|---|
| `pybreaker` | Not installed | Required for Phase 2. Not needed for Phase 1. |
| `config/service_registry.yaml` | Exists (per CLAUDE.md Rule 1) | Add `circuit_breakers` section in Phase 2. |
| Flagsmith flags | N/A | No feature flag needed: the DAO layer is an internal refactor, not a user-facing feature. |
| PostgreSQL RLS | Deferred to Wave 0.2 | DAO ships first (Wave 0.1), RLS adds defense-in-depth after. |
| LangGraph AsyncPostgresSaver | Not yet configured | Configured in Wave 1 (item 1.13). Uses same Railway PostgreSQL. Not wrapped in a repository. |

What changes for SD
Nothing changes in the development workflow:

- `Depends(get_db)` still provides the database session
- Service functions still have the same signatures
- Routers are completely unaffected
- Tests run the same way
- The migration is done one service at a time; each PR is small and reviewable
The main benefit SD sees: when a new service is created, the repository pattern
provides a template to copy. Instead of writing raw select(Model).where(...),
copy patient_repository.py and change the model + method names. Tenant
isolation comes free.