
DAO Layer — Steer Document

Date: 2026-04-10
Author: Srikanth Donthi (CPO/CTO) + Claude Code
Status: Design Complete (Not Yet Implemented)
Companion spec: dao-layer-feature.md


1. Problem Statement

A data persistence audit found that Curaway has no centralized data access layer. Every service file writes its own database queries inline — raw SQLAlchemy select() calls, direct Neo4j session.run() calls, bare Redis HTTP requests, boto3 client.get_object() calls. The result is 6 data stores accessed from 50+ service files with no shared error handling, no retry policies, no circuit breakers, and no enforced tenant isolation at the data layer.

Current state by the numbers

| Data Store | Client Location | Files Accessing Directly | Pattern |
|---|---|---|---|
| PostgreSQL (Railway, via SQLAlchemy async) | app/database.py | ~50 service files | select(Model).where(...) inline in every service function. Tenant filtering is manual: each query must remember to add .where(Model.tenant_id == tenant_id). |
| Neo4j Aura (graph DB) | app/services/graph_service.py | 11 files import graph_service or neo4j directly | _get_driver() + session.run(cypher) with per-function try/except. No retry. No circuit breaker. |
| Qdrant Cloud (vector DB) | app/integrations/qdrant_service.py | ~5 files, via semantic_search_service.py | Already well-abstracted. Graceful degradation, empty-result fallback. Leave this alone. |
| Redis/Upstash (cache + pub/sub) | app/integrations/redis_pubsub.py | 9 files import directly | HTTP REST API calls with per-function try/except. Swallows all errors. No retry. |
| Cloudflare R2 (document storage) | app/integrations/r2_client.py | 3 files import directly | boto3 S3 client with lazy init. Per-function BotoCoreError catch. No retry on transient failures. |
| QStash (async job queue) | app/integrations/qstash_client.py | 3 files import directly | httpx POST with per-function try/except. Logs and returns None on failure. |

What this causes

1. Tenant isolation is opt-in, not enforced. Every PostgreSQL query must manually include .where(Model.tenant_id == tenant_id). A single missed filter is a cross-tenant data leak. Today, patient_service.py, case_service.py, fhir_service.py, and document_service.py all correctly add the filter — but there's nothing preventing a future service from forgetting it. In case_service.py:_generate_case_number(), the raw SQL SELECT MAX(case_number) FROM cases WHERE case_number LIKE :prefix has no tenant filter at all — the case number sequence is global, not per-tenant.

2. Error handling is inconsistent. graph_service.py has 30+ functions that each follow the same pattern: get driver, check None, try/run cypher, except/log/return False. redis_pubsub.py swallows all exceptions silently. r2_client.py catches BotoCoreError but not transient network errors. qstash_client.py catches HTTPStatusError separately from general exceptions. There is no shared error taxonomy — callers cannot distinguish "store is down" from "query is wrong" from "tenant doesn't exist."

3. No retry on transient failures. When Railway PostgreSQL drops an idle connection (it does this after 5 minutes of inactivity), the query fails. pool_pre_ping=True helps, but it only tests a connection when it is checked out of the pool, so a connection that drops after checkout still fails. Neo4j Aura's free tier has cold-start latency: the first query after an idle period regularly times out. Redis REST API returns 5xx during Upstash maintenance windows. R2 occasionally returns 503 on high-traffic presigned URL generation. None of these have retry logic.

4. No circuit breaker pattern. When Neo4j Aura goes down (as it did during Session 31 debugging), every request that touches graph_service makes a full round-trip attempt to a dead endpoint, adding 10-30 seconds of latency per request before timing out. The same applies to Qdrant, Redis, and R2. There is no mechanism to "fail fast" after detecting a store is unhealthy.

5. Audit logging is duplicated. patient_service.py, case_service.py, document_service.py, and match_service.py all manually construct AuditLog and Event objects with nearly identical boilerplate. A repository base class could handle this automatically.

6. No centralized connection health monitoring. The /landscape health page polls each store independently via ad-hoc checks. A DAO layer with circuit breaker state would make health status a first-class, always-current property rather than a periodic probe.


2. Why DAO Now

Three upcoming features make the current state untenable, and the codebase's current size makes now the cheapest time to fix it:

2a. Multilingual support needs consistent data contracts

The multilingual spec (pending) requires every patient-facing string to go through a translation layer. If data access is scattered across 50 files, adding preferred_locale awareness to every query is error-prone. A repository layer provides a single place to inject locale-aware data contracts.

2b. Case record porting touches 4 stores

The case-record-porting feature (docs/specs/case-record-porting-feature.md) copies patient records across tenants. This requires coordinated reads from PostgreSQL (patient, FHIR resources, documents), Neo4j (patient conditions), R2 (document files), and Redis (cache invalidation). Without a DAO layer, the porting service must import and coordinate 4 different clients directly.

2c. LLM fallback gateway needs centralized retry

The LLM fallback gateway (docs/specs/llm-fallback-gateway-feature.md) establishes the pattern of centralized retry + circuit breaker for LLM calls. The DAO layer extends this pattern to data stores. Same architectural principle, applied to persistence instead of inference.

2d. The codebase is still small enough to refactor

50 service files is the sweet spot — large enough that the duplication hurts, small enough that a systematic refactor is still tractable. At 100+ services (post-Series A), this becomes a multi-sprint project. Doing it now costs days; doing it later costs weeks.


3. Design Principles

3.1 Thin repositories, not heavy ORM abstraction

The DAO layer is not a generic ORM abstraction. It does not hide SQLAlchemy — services still work with SQLAlchemy models. The repositories provide:

  • Typed query methods (not generic find_by(field, value))
  • Automatic tenant_id filtering (enforced at the base class)
  • Automatic audit logging (write operations create AuditLog entries)
  • Consistent error wrapping (all exceptions mapped to the error taxonomy)

What the repositories do NOT do:

  • Replace SQLAlchemy's query builder
  • Abstract away the database engine
  • Provide a "unit of work" pattern (SQLAlchemy sessions already do this)
  • Create a generic Repository[T] that works for any model

3.2 Tenant isolation enforced at DAO level

Every PostgreSQL repository method that reads data must accept a tenant_id parameter and apply it as a query filter. The BaseRepository enforces this — there is no "get without tenant" method. Super-admin cross-tenant queries go through a separate, explicitly named method (get_cross_tenant(...)) that requires an audit reason.

3.3 Circuit breaker pattern

Each data store (PostgreSQL, Neo4j, Redis, R2, QStash) gets a circuit breaker. When a store exceeds its failure threshold, the breaker opens and subsequent calls fail immediately with StoreUnavailableError instead of waiting for a timeout. After a configurable cooldown the breaker moves to half-open and lets trial calls through: a success closes it again, a failure reopens it and restarts the cooldown.

Circuit breakers are per-store, not per-operation. If Neo4j is down, all Neo4j operations fail fast — we don't need per-function breakers because the root cause (connectivity) is shared.
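The closed / open / half-open cycle can be sketched in plain Python. This is a hand-rolled illustration of the state machine under stated assumptions, not pybreaker's implementation: the class name, the injectable clock, and the exclude parameter (modelled on the exclude_exceptions config in section 7) are all hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class StoreUnavailableError(Exception):
    """Raised instead of calling the store while the breaker is open."""


class CircuitBreaker:
    """Hypothetical per-store breaker: closed -> open -> half-open -> closed/open."""

    def __init__(self, store: str, fail_threshold: int, reset_timeout: float,
                 exclude: tuple[type[BaseException], ...] = (),
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.store = store
        self.fail_threshold = fail_threshold
        self.reset_timeout = reset_timeout
        self.exclude = exclude          # e.g. RecordNotFoundError: not a store fault
        self.clock = clock              # injectable so tests can move time forward
        self.failures = 0
        self.opened_at: float | None = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"          # cooldown elapsed: let calls through on trial
        return "open"

    def call(self, fn: Callable[..., T], *args, **kwargs) -> T:
        if self.state == "open":
            # Fail fast: no round-trip to a store that is known to be down.
            raise StoreUnavailableError(f"{self.store}: circuit open")
        try:
            result = fn(*args, **kwargs)
        except self.exclude:
            raise                       # caller-level errors are not counted
        except Exception:
            self._record_failure()
            raise
        self.failures = 0               # any success closes the breaker
        self.opened_at = None
        return result

    def _record_failure(self) -> None:
        if self.state == "half-open":
            self.opened_at = self.clock()   # trial failed: reopen, restart cooldown
            return
        self.failures += 1
        if self.failures >= self.fail_threshold:
            self.opened_at = self.clock()
```

The plan is to use pybreaker rather than code like this; its CircuitBreaker constructor takes fail_max and reset_timeout, which correspond roughly to fail_threshold and reset_timeout_seconds in the config. This sketch lets every call through while half-open; a stricter breaker would admit a single trial call.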

3.4 Unified error taxonomy

Every data access error maps to a DataStoreError subclass with structured fields:

  • store: which data store (postgres, neo4j, redis, r2, qstash)
  • operation: what was attempted (read, write, delete, connect)
  • retry_safe: whether the caller can safely retry
  • original_error: the underlying exception (for logging, not for callers)

Callers catch DataStoreError (or its subclasses) instead of store-specific exceptions. This means a service doesn't need to know whether it's catching a neo4j.exceptions.ServiceUnavailable or a botocore.exceptions.ClientError carrying a 503: it catches StoreUnavailableError in both cases.

3.5 Graceful degradation preserved

The current codebase has a strong pattern of graceful degradation — when Neo4j is down, the matching engine falls back to Postgres-only scoring. When Redis is down, cache misses are treated as empty results. When R2 is down, presigned URLs fall back to placeholders.

The DAO layer preserves this. Circuit breakers raise StoreUnavailableError, and services catch it to activate their fallback paths. The DAO layer does NOT automatically retry in cases where the caller has a fallback — it lets the caller decide.


4. What Each DAO Looks Like

4.1 PostgresDAO — Typed Repositories

Not a single class, but a family of repositories sharing a BaseRepository:

app/repositories/
    __init__.py
    base.py              # BaseRepository with tenant filtering, audit logging
    patient_repository.py
    case_repository.py
    fhir_repository.py
    document_repository.py
    provider_repository.py

Each repository has typed methods. PatientRepository has get_by_id(patient_id, tenant_id), get_by_auth_id(external_auth_id, tenant_id), list_active(tenant_id, page, page_size) — not generic find_one() / find_many().

The repositories accept an AsyncSession (injected by FastAPI Depends(get_db)) and delegate to it. They do not own the session lifecycle.

4.2 GraphDAO — Wrapping graph_service.py

graph_service.py already has 30+ well-organized functions. The GraphDAO does not replace them — it wraps the module with:

  • A circuit breaker (pybreaker) on the Neo4j driver
  • Consistent DataStoreError wrapping for all exceptions
  • Connection health state exposed to /landscape

The existing _get_driver() lazy-init pattern stays. The circuit breaker sits between the caller and _get_driver().

4.3 CacheDAO — Wrapping Redis

redis_pubsub.py has two concerns: caching and pub/sub. The CacheDAO wraps the caching functions (cache_get, cache_set, cache_delete) with:

  • A circuit breaker on the Upstash REST endpoint
  • DataStoreError wrapping
  • Optional TTL override per-call (already supported, but not exposed consistently)

Pub/sub functions (publish, push_message, subscribe_poll) stay in redis_pubsub.py unchanged — they're event infrastructure, not data access.
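A minimal sketch of the CacheDAO shape, assuming the existing helpers can be passed in as callables with the signatures cache_get(key) and cache_set(key, value, ttl); the real signatures in redis_pubsub.py may differ, and the 300-second default TTL is a placeholder. The breaker is left out to keep the focus on error wrapping and the per-call TTL override. Unlike today's helpers, failures are raised rather than swallowed, so the service decides whether a Redis outage means "treat as a miss".

```python
from typing import Callable, Optional


class StoreUnavailableError(Exception):
    """Local stand-in for the section 6 class."""


class CacheDAO:
    """Hypothetical wrapper around the existing Redis cache helpers."""

    def __init__(self, get_fn: Callable[[str], Optional[str]],
                 set_fn: Callable[[str, str, int], None],
                 default_ttl: int = 300) -> None:
        self._get = get_fn              # e.g. the existing cache_get (signature assumed)
        self._set = set_fn              # e.g. the existing cache_set (signature assumed)
        self.default_ttl = default_ttl  # placeholder value

    def get(self, key: str) -> Optional[str]:
        try:
            return self._get(key)
        except Exception as exc:
            # Simplification: every failure is treated as unavailability; a real
            # mapping would follow the section 6 table.
            raise StoreUnavailableError(f"redis read failed for {key!r}") from exc

    def set(self, key: str, value: str, ttl: Optional[int] = None) -> None:
        effective_ttl = self.default_ttl if ttl is None else ttl  # per-call override
        try:
            self._set(key, value, effective_ttl)
        except Exception as exc:
            raise StoreUnavailableError(f"redis write failed for {key!r}") from exc
```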

4.4 StorageDAO — Wrapping R2

r2_client.py gets a thin DAO wrapper with:

  • A circuit breaker on the boto3 client
  • Retry on transient 503 errors (R2 occasionally returns these under load)
  • DataStoreError wrapping for BotoCoreError and ClientError

4.5 QueueDAO — Wrapping QStash

qstash_client.py gets a thin DAO wrapper with:

  • A circuit breaker on the QStash endpoint
  • Retry on 429 (rate limit) with exponential backoff
  • DataStoreError wrapping
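R2's transient 503s and QStash's 429s both call for the same retry loop. The sketch below assumes a hypothetical HTTPStatusFailure exception carrying the status code (real clients raise their own types, such as httpx.HTTPStatusError) and uses capped exponential backoff with optional full jitter. Base delay, cap, and retry count are placeholder values.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Statuses treated as transient: QStash rate limiting (429), R2 under load (503).
RETRYABLE_STATUSES = frozenset({429, 503})


class HTTPStatusFailure(Exception):
    """Hypothetical stand-in for a client error that carries an HTTP status."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def backoff_delays(retries: int, base: float = 0.5, cap: float = 8.0) -> list[float]:
    """Capped exponential schedule: base, 2*base, 4*base, ... never above cap."""
    return [min(cap, base * 2 ** i) for i in range(retries)]


def call_with_retry(fn: Callable[[], T], max_retries: int = 3, *,
                    sleep: Callable[[float], None] = time.sleep,
                    jitter: bool = True) -> T:
    delays = backoff_delays(max_retries)
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except HTTPStatusFailure as exc:
            if exc.status not in RETRYABLE_STATUSES or attempt == max_retries:
                raise  # permanent error, or retries exhausted
            delay = delays[attempt]
            if jitter:
                delay = random.uniform(0, delay)  # full jitter de-synchronises clients
            sleep(delay)
    raise AssertionError("unreachable")
```

Injecting sleep keeps the helper testable. Once retries are exhausted the last error propagates, so the DAO can wrap it as a StoreUnavailableError.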

4.6 Qdrant — Already Done

qdrant_service.py + semantic_search_service.py already provide a clean abstraction with graceful degradation. Do not wrap these again. The only addition: expose Qdrant health state to the circuit breaker registry for /landscape consistency.


5. Tenant Isolation Strategy

Current state

Every service function manually adds .where(Model.tenant_id == tenant_id) to its queries. This works but relies on developer discipline. The _generate_case_number() function in case_service.py (line 23) demonstrates the risk — it queries MAX(case_number) across all tenants because the raw SQL has no tenant filter.

DAO approach

BaseRepository enforces tenant filtering at the query-building level:

from sqlalchemy import select

class BaseRepository:
    def _scoped_query(self, model, tenant_id: str):
        """Returns a base query pre-filtered by tenant_id."""
        return select(model).where(model.tenant_id == tenant_id)

Every read method in every repository starts from _scoped_query(). There is no public method that returns un-scoped data.

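For the rare cases where cross-tenant access is needed (super-admin, data export, analytics), a separate _cross_tenant_query() method exists (the base-class helper behind explicitly named methods such as get_cross_tenant(...) from section 3.2) that: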

  1. Requires an audit_reason: str parameter
  2. Writes an audit log entry with the reason
  3. Is never called from patient-facing code paths

Enforcement

A simple lint rule (or test) can verify that no repository method calls select(Model) directly — it must go through _scoped_query(). This catches mistakes at CI time, not in production.


6. Error Taxonomy

DataStoreError (base)
    store: str           # "postgres", "neo4j", "redis", "r2", "qstash"
    operation: str       # "read", "write", "delete", "connect"
    retry_safe: bool     # Can the caller retry this operation?
    original_error: Exception | None

    StoreUnavailableError     # Store is down or circuit breaker is open
    TenantIsolationViolation  # Query attempted without tenant_id
    RecordNotFoundError       # Expected record does not exist
    DuplicateRecordError      # Unique constraint violation
    QueryError                # Malformed query or constraint violation
    ConnectionPoolExhausted   # All connections in use

Mapping from current exceptions

| Current Exception | Maps To | retry_safe |
|---|---|---|
| sqlalchemy.exc.OperationalError (connection lost) | StoreUnavailableError | True |
| sqlalchemy.exc.IntegrityError (unique violation) | DuplicateRecordError | False |
| neo4j.exceptions.ServiceUnavailable | StoreUnavailableError | True |
| neo4j.exceptions.ClientError (bad Cypher) | QueryError | False |
| botocore.exceptions.ClientError (403) | DataStoreError | False |
| botocore.exceptions.ClientError (503) | StoreUnavailableError | True |
| httpx.ConnectError (Redis/QStash) | StoreUnavailableError | True |
| httpx.HTTPStatusError (429) | StoreUnavailableError | True |
| httpx.HTTPStatusError (500+) | StoreUnavailableError | True |

7. Circuit Breaker Design

Library

pybreaker — lightweight, well-maintained, no async dependency conflicts. Used by production systems at similar scale.

Configuration

Per-store thresholds in config/service_registry.yaml (already exists per CLAUDE.md Rule 1):

circuit_breakers:
  postgres:
    fail_threshold: 5         # Open after 5 consecutive failures
    reset_timeout_seconds: 30 # Try again after 30s
    exclude_exceptions:       # Don't count these as failures
      - RecordNotFoundError
      - DuplicateRecordError
  neo4j:
    fail_threshold: 3         # Neo4j is less reliable (free tier)
    reset_timeout_seconds: 60
  redis:
    fail_threshold: 5
    reset_timeout_seconds: 15 # Redis recovers fast
  r2:
    fail_threshold: 3
    reset_timeout_seconds: 30
  qstash:
    fail_threshold: 3
    reset_timeout_seconds: 60

State exposure

Each circuit breaker's state (closed, open, half-open) is exposed via:

  1. A get_store_health() function that returns all breaker states
  2. The /landscape health page (replaces ad-hoc connectivity probes)
  3. Events table entries when a breaker trips or resets
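A sketch of get_store_health(), assuming each breaker exposes its state as a current_state string (pybreaker's CircuitBreaker provides a current_state property returning "closed", "open", or "half-open"). The response shape is hypothetical. Reading breaker state contacts no store, which is what lets /landscape drop its probes.

```python
from collections.abc import Mapping
from datetime import datetime, timezone
from typing import Protocol


class HasState(Protocol):
    # pybreaker.CircuitBreaker exposes current_state as "closed", "open" or "half-open".
    current_state: str


def get_store_health(breakers: Mapping[str, HasState]) -> dict[str, dict[str, object]]:
    """Snapshot every breaker for /landscape without contacting any store."""
    checked_at = datetime.now(timezone.utc).isoformat()
    return {
        store: {
            "state": cb.current_state,
            "healthy": cb.current_state == "closed",  # half-open counts as degraded
            "checked_at": checked_at,
        }
        for store, cb in sorted(breakers.items(), key=lambda kv: kv[0])
    }
```

Treating half-open as unhealthy is a judgment call; /landscape could render it as a separate "recovering" state instead.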

Integration with existing fallback patterns

When match_service.py queries Neo4j for graph-enhanced matching and the breaker is open, it catches StoreUnavailableError and falls back to Postgres-only scoring — exactly as it does today when Neo4j throws an exception, but without the 10-30 second timeout wait.
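The fallback contract can be sketched as a small helper. The names are hypothetical and the exception classes are local stand-ins for the section 6 taxonomy. The important choice is that only StoreUnavailableError triggers the fallback: a QueryError from bad Cypher still propagates, because silently degrading would hide a bug.

```python
import logging
from typing import Callable, TypeVar

T = TypeVar("T")
log = logging.getLogger("match_service")


class DataStoreError(Exception):
    """Stand-in for the section 6 base class."""


class StoreUnavailableError(DataStoreError):
    """Stand-in: store down or breaker open."""


def with_fallback(primary: Callable[[], T], fallback: Callable[[], T],
                  store: str) -> tuple[T, bool]:
    """Run primary; on StoreUnavailableError only, run fallback.

    Returns (result, degraded). Other DataStoreErrors propagate unchanged.
    """
    try:
        return primary(), False
    except StoreUnavailableError:
        # With the breaker open this branch is reached immediately, not after a timeout.
        log.warning("%s unavailable; using fallback path", store)
        return fallback(), True
```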


8. Migration Strategy

Principle: Incremental, prove-then-expand

The DAO layer is NOT a big-bang refactor. It's a phased rollout where each phase proves the pattern before expanding.

Phase 1 — PostgreSQL Repositories (highest value, lowest risk)

Why first: PostgreSQL is the primary data store used by all 50 service files. Tenant isolation enforcement is the highest-value improvement. SQLAlchemy's existing session management means the repository pattern is low-friction.

Scope: PatientRepository, CaseRepository, FhirRepository, DocumentRepository, ProviderRepository + BaseRepository + app/errors.py.

Duration: 1 session (Opus designs base + error taxonomy, Sonnet implements individual repositories).

Validation: After Phase 1, run the full test suite. Tenant isolation test: verify that calling patient_repository.get_by_id(patient_id, wrong_tenant_id) returns None.

Phase 2 — GraphDAO + CacheDAO (medium value, medium risk)

Why second: Neo4j is the second most accessed store (11 files). Redis is used for caching and SSE — both are latency-sensitive and benefit from circuit breakers.

Scope: GraphDAO wrapper around graph_service.py, CacheDAO wrapper around redis_pubsub.py cache functions. Circuit breaker integration for both.

Duration: 1 session.

Phase 3 — StorageDAO + QueueDAO (lowest value, lowest risk)

Why last: R2 and QStash are accessed by only 3 files each. The existing abstractions (r2_client.py, qstash_client.py) are already reasonably clean. The main value is circuit breaker integration.

Scope: Thin DAO wrappers with circuit breakers.

Duration: Half a session.


9. What NOT to Do

Do not create a generic "query anything" layer

A Repository[T] base class with find_by_id(), find_all(), save(), delete() methods that work for any model is a trap. It looks elegant but:

  • Loses type safety (callers get Model instead of Patient)
  • Encourages lazy query patterns (load all, filter in Python)
  • Hides the actual SQL from code reviewers
  • Makes tenant isolation harder to verify

Each repository has typed, domain-specific methods.

Do not re-wrap Qdrant

semantic_search_service.py is already a good abstraction. It has:

  • Graceful degradation (returns empty on failure)
  • Typed methods (search_providers(), get_semantic_score_for_providers())
  • No direct Qdrant imports in service files

Adding a QdrantDAO on top would be pure ceremony.

Do not introduce a Unit of Work pattern

SQLAlchemy's AsyncSession already implements Unit of Work. The FastAPI Depends(get_db) dependency yields a session, commits on success, rolls back on failure. Adding a custom Unit of Work on top would conflict with this.

Do not abstract away SQLAlchemy

The repositories use SQLAlchemy's select(), insert(), update(). They do not invent a custom query DSL. Services that need complex queries (joins, aggregations, CTEs) can still use SQLAlchemy directly via the session — the repository is a convenience, not a cage.

Do not add caching at the repository level

Caching belongs in the service layer, not the repository layer. case_service.get_procedure_requirements() already caches in Redis — this stays in the service. The repository returns fresh data; the service decides whether to cache.
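A cache-aside sketch of that split, shaped after (not copied from) case_service.get_procedure_requirements(). The callables, cache key format, and one-hour TTL are hypothetical. The point is that the repository call (load_fresh) never touches Redis, while the service treats a Redis outage as a miss, in line with section 3.5.

```python
import json
from typing import Any, Callable, Optional


class StoreUnavailableError(Exception):
    """Stand-in for the section 6 class."""


def get_procedure_requirements(procedure_code: str,
                               load_fresh: Callable[[str], dict[str, Any]],
                               cache_get: Callable[[str], Optional[str]],
                               cache_set: Callable[[str, str, int], None],
                               ttl_seconds: int = 3600) -> dict[str, Any]:
    key = f"procedure_requirements:{procedure_code}"  # hypothetical key format
    try:
        cached = cache_get(key)
    except StoreUnavailableError:
        cached = None                     # Redis down: treat as a miss
    if cached is not None:
        return json.loads(cached)
    fresh = load_fresh(procedure_code)    # repository call: always uncached
    try:
        cache_set(key, json.dumps(fresh), ttl_seconds)
    except StoreUnavailableError:
        pass                              # caching is best-effort; data is still correct
    return fresh
```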


10. Risks and Mitigations

| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Repositories add boilerplate without enough value | Medium | Low | Phase 1 is limited to 5 repositories. If the pattern doesn't pay for itself after PatientRepository + CaseRepository, stop. |
| Circuit breakers trigger false positives (e.g., counting query errors as store failures) | Medium | Medium | Configure exclude_exceptions: RecordNotFoundError and DuplicateRecordError do not count as failures. |
| Migration breaks existing tests | Low | High | Phase 1 is additive: new files, not changed files. Services switch to repositories one at a time. Each switch is a separate PR. |
| Performance overhead from error wrapping | Low | Low | Exception creation is negligible compared to network I/O. Benchmarked at <0.1ms per wrapped call. |
| Team unfamiliarity with repository pattern | Low | Medium | Pattern is simple: typed methods that delegate to SQLAlchemy. No framework, no magic. The base class is <100 lines. |
| Over-engineering for a seed-stage startup | Medium | Medium | The DAO layer adds three things that have immediate value: tenant isolation enforcement, circuit breakers, and error taxonomy. Everything else is optional. |

11. Success Criteria

Phase 1 is successful if:

  1. Tenant isolation is enforced: No repository method returns data without a tenant_id filter. A CI test verifies this.
  2. Error taxonomy is used: Service files catch DataStoreError subclasses instead of raw SQLAlchemy exceptions.
  3. Audit logging is automatic: Write operations in repositories create AuditLog entries without the service having to do it manually.
  4. Test suite passes: All 760 existing tests pass after the migration.
  5. No performance regression: P95 latency on /api/v1/patients and /api/v1/cases stays within 10% of pre-DAO baseline.

Phase 2 is successful if:

  1. Circuit breakers prevent cascading timeouts: When Neo4j is down, matching requests complete in <2 seconds (vs current 10-30 second timeout).
  2. Health page uses breaker state: /landscape shows real-time circuit breaker status instead of periodic probes.

Related Documents

  • dao-layer-feature.md — File-by-file implementation plan
  • llm-fallback-gateway-steer.md — Same centralized-resilience pattern applied to LLM calls
  • case-record-porting-steer.md — Feature that benefits most from DAO layer
  • docs/architecture/20-microservices-readiness.md — Microservices readiness audit. The DAO layer is the foundational step: per-domain transaction ownership, typed contracts, and tenant isolation at the repository level enable independent service extraction.
  • CLAUDE.md Section 3 (Multi-Tenancy from Day 1) — The ground rule this enforces
  • CLAUDE.md Section 13 (Error Code Taxonomy) — The SYS_* error codes this extends