
ADR-001: Workflow State Persistence Architecture

Status: Implemented
Date: 2025-12-27
Decision Makers: AEGIS Team
Related Gap: GAP-M3 (Workflow Persistence)
Implementation: src/workflows/persistence/ (completed 2025-12-27)


Context

AEGIS LIBERTAS workflows are currently ephemeral: workflow state exists only in memory and is lost when the process restarts. This creates several production issues:

  1. Workflow state lost on restart: Human review workflows spanning days lose all progress
  2. Cannot resume interrupted workflows: No recovery mechanism for failed processes
  3. No audit trail for in-progress workflows: Compliance gap for governance requirements
  4. No cross-instance coordination: Cannot scale horizontally

Current Implementation

# src/workflows/proposal.py - Ephemeral state
class ProposalWorkflow:
    def __init__(self, metadata: ProposalMetadata):
        self.state = ProposalState.DRAFT  # Lost on restart
        self.transitions: list[StateTransition] = []  # In-memory only

Requirements

  • R1: Persist workflow state across process restarts
  • R2: Support PostgreSQL (production) and SQLite (testing)
  • R3: Async-native for high concurrency
  • R4: Maintain audit trail for all state transitions
  • R5: Support workflow resume from last checkpoint
  • R6: Zero external dependencies beyond database

Decision Drivers

| Driver                             | Weight | Notes                               |
|------------------------------------|--------|-------------------------------------|
| Zero external runtime dependencies | HIGH   | No separate Temporal/Cadence server |
| Async-native performance           | HIGH   | Non-blocking database operations    |
| Type safety                        | MEDIUM | SQLAlchemy ORM integration          |
| Testing simplicity                 | MEDIUM | SQLite for unit tests               |
| Implementation complexity          | LOW    | Minimal additional code             |

Options Considered

Option 1: Temporal.io

Approach: Full durable execution platform with external server

Pros:
  • Industry-leading durability guarantees (Stripe, Netflix, Uber)
  • Built-in retries, timeouts, and visibility
  • Time-travel debugging

Cons:
  • Requires an external Temporal server (Docker/Kubernetes)
  • Heavy operational burden
  • 10-20x more complex than needed for AEGIS workflows
  • Python SDK less mature than the Go/Java SDKs

Verdict: REJECTED - Excessive complexity for governance workflows

Option 2: DBOS (Database-backed Durable Execution)

Approach: Lightweight Python decorators with PostgreSQL/SQLite backing

Pros:
  • Decorator-based, minimal code changes
  • PostgreSQL-native, no external server
  • 1.1k GitHub stars, active development

Cons:
  • New library (2024), less battle-tested
  • Opinionated workflow structure
  • Limited customization for audit trails

Verdict: REJECTED - Too opinionated, limited audit control

Option 3: Custom SQLAlchemy Async Persistence (SELECTED)

Approach: Checkpoint-based persistence layer using SQLAlchemy 2.0 async + asyncpg

Pros:
  • Full control over schema and audit trail
  • Uses the proven SQLAlchemy ecosystem
  • asyncpg driver: 3x faster than psycopg2
  • SQLite for testing, PostgreSQL for production
  • Minimal additional dependencies (already in requirements)
  • Follows the LangGraph checkpoint pattern (proven at scale)

Cons:
  • More initial implementation work (~12 hours)
  • Retry/timeout logic must be implemented manually

Verdict: SELECTED - Best fit for AEGIS requirements
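Option 3 leaves retry and timeout handling to AEGIS. The sketch below shows one way that could look, using only asyncio. The helper name `with_retry` and its defaults are hypothetical: three attempts, a 5 s per-attempt timeout, and exponential backoff with jitter. None of these come from src/workflows/persistence/.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retry(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    timeout_s: float = 5.0,
    base_delay_s: float = 0.1,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
) -> T:
    """Hypothetical helper: run operation() with a timeout, retrying transient errors."""
    for attempt in range(1, attempts + 1):
        try:
            # A fresh awaitable per attempt; wait_for cancels it on timeout.
            return await asyncio.wait_for(operation(), timeout=timeout_s)
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff (base, 2*base, 4*base, ...) with +/-50% jitter.
            delay = base_delay_s * 2 ** (attempt - 1) * random.uniform(0.5, 1.5)
            await asyncio.sleep(delay)
    raise ValueError("attempts must be at least 1")
```

A call such as `await with_retry(lambda: persistence.load_checkpoint(workflow_id))` would wrap a single persistence operation. Retrying writes is only safe when the write is idempotent, because a timeout may fire after the database has already committed.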


Decision

Implement Custom SQLAlchemy Async Persistence with checkpoint-based state management:

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    LIBERTAS Workflow Layer                       │
├─────────────────────────────────────────────────────────────────┤
│  ProposalWorkflow  │  ConsensusWorkflow  │  OverrideWorkflow    │
└─────────┬──────────┴──────────┬──────────┴──────────┬───────────┘
          │                     │                     │
          ▼                     ▼                     ▼
┌─────────────────────────────────────────────────────────────────┐
│                  WorkflowPersistence Layer                       │
├─────────────────────────────────────────────────────────────────┤
│  save_checkpoint()  │  load_checkpoint()  │  list_workflows()   │
│  update_state()     │  record_transition()│  get_audit_trail()  │
└─────────────────────────────┬───────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│              SQLAlchemy 2.0 Async ORM                            │
├─────────────────────────────────────────────────────────────────┤
│  AsyncSession  │  async_sessionmaker  │  create_async_engine    │
└─────────────────────────────┬───────────────────────────────────┘
              ┌───────────────┴───────────────┐
              ▼                               ▼
┌─────────────────────────┐     ┌─────────────────────────┐
│  PostgreSQL (asyncpg)   │     │   SQLite (aiosqlite)    │
│     Production          │     │     Testing             │
└─────────────────────────┘     └─────────────────────────┘
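The diagram shows two async backends behind one ORM layer: PostgreSQL via asyncpg and SQLite via aiosqlite. Which one is used comes down to the URL passed to `create_async_engine`. The sketch below is one way to select and validate that URL. The environment variable name `AEGIS_DATABASE_URL` and the function `resolve_database_url` are hypothetical. Only the `postgresql+asyncpg` and `sqlite+aiosqlite` URL schemes are SQLAlchemy's own.

```python
import os
from collections.abc import Mapping
from typing import Optional

# Hypothetical variable name; the ADR does not say how the URL is configured.
DATABASE_URL_ENV = "AEGIS_DATABASE_URL"

# The two async drivers shown in the architecture diagram.
SUPPORTED_SCHEMES = {"postgresql+asyncpg", "sqlite+aiosqlite"}

# In-memory SQLite database for unit tests.
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"


def resolve_database_url(
    environ: Optional[Mapping[str, str]] = None, *, testing: bool = False
) -> str:
    """Pick the URL to hand to create_async_engine(); fail fast on sync drivers."""
    env = os.environ if environ is None else environ
    url = env.get(DATABASE_URL_ENV)
    if url is None:
        if testing:
            return TEST_DATABASE_URL
        raise RuntimeError(f"{DATABASE_URL_ENV} must be set outside tests")
    scheme = url.split("://", 1)[0]
    if scheme not in SUPPORTED_SCHEMES:
        # e.g. a plain "postgresql://" URL selects psycopg2, a synchronous driver
        raise ValueError(
            f"unsupported driver {scheme!r}; expected one of {sorted(SUPPORTED_SCHEMES)}"
        )
    return url
```

Refusing to fall back to SQLite outside tests is a deliberate choice in this sketch. It keeps a missing production setting from silently producing a throwaway in-memory database.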

Database Schema

-- Workflow instances table
CREATE TABLE workflow_instances (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_type VARCHAR(50) NOT NULL,  -- 'proposal', 'consensus', 'override'
    workflow_id VARCHAR(100) NOT NULL UNIQUE,
    current_state VARCHAR(50) NOT NULL,
    state_data JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    completed_at TIMESTAMP WITH TIME ZONE,
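    parent_workflow_id UUID REFERENCES workflow_instances(id)
);

-- PostgreSQL does not accept inline INDEX clauses; create indexes separately
CREATE INDEX idx_workflow_type ON workflow_instances (workflow_type);
CREATE INDEX idx_workflow_state ON workflow_instances (current_state);
CREATE INDEX idx_workflow_created ON workflow_instances (created_at);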

-- State transitions table (audit trail)
CREATE TABLE workflow_transitions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id),
    from_state VARCHAR(50) NOT NULL,
    to_state VARCHAR(50) NOT NULL,
    actor_id VARCHAR(100) NOT NULL,
    reason TEXT,
    metadata JSONB DEFAULT '{}',
    transition_hash VARCHAR(64) NOT NULL,  -- SHA-256 for integrity
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_transition_workflow ON workflow_transitions (workflow_instance_id);
CREATE INDEX idx_transition_actor ON workflow_transitions (actor_id);

-- Checkpoints table (for resume)
CREATE TABLE workflow_checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id),
    checkpoint_number INTEGER NOT NULL,
    state_snapshot JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    UNIQUE (workflow_instance_id, checkpoint_number)
);
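The UNIQUE (workflow_instance_id, checkpoint_number) constraint implies an append-only sequence of numbered snapshots per workflow. Resuming means reading the highest-numbered one. The sketch below illustrates that pattern on SQLite, the testing backend. It uses the stdlib `sqlite3` module rather than aiosqlite so it runs without extra packages. It also assumes UUID and JSONB columns become TEXT. The function names are hypothetical.

```python
import json
import sqlite3
from typing import Any, Optional

# SQLite stand-in for the PostgreSQL DDL: UUID and JSONB become TEXT.
CHECKPOINTS_DDL = """
CREATE TABLE workflow_checkpoints (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_instance_id TEXT NOT NULL,
    checkpoint_number INTEGER NOT NULL,
    state_snapshot TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (workflow_instance_id, checkpoint_number)
)
"""


def open_test_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(CHECKPOINTS_DDL)
    return conn


def append_checkpoint(conn: sqlite3.Connection, instance_id: str, snapshot: dict[str, Any]) -> int:
    """Store snapshot under the next checkpoint number and return that number."""
    with conn:  # commits on success, rolls back on exception
        (last,) = conn.execute(
            "SELECT COALESCE(MAX(checkpoint_number), 0) FROM workflow_checkpoints"
            " WHERE workflow_instance_id = ?",
            (instance_id,),
        ).fetchone()
        # If two writers race to the same number, the UNIQUE constraint raises
        # sqlite3.IntegrityError instead of silently storing a duplicate.
        conn.execute(
            "INSERT INTO workflow_checkpoints"
            " (workflow_instance_id, checkpoint_number, state_snapshot) VALUES (?, ?, ?)",
            (instance_id, last + 1, json.dumps(snapshot)),
        )
    return last + 1


def latest_checkpoint(conn: sqlite3.Connection, instance_id: str) -> Optional[dict[str, Any]]:
    """Return the most recent snapshot for a workflow, or None if it has none."""
    row = conn.execute(
        "SELECT state_snapshot FROM workflow_checkpoints"
        " WHERE workflow_instance_id = ? ORDER BY checkpoint_number DESC LIMIT 1",
        (instance_id,),
    ).fetchone()
    return json.loads(row[0]) if row else None
```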

Core Implementation Pattern

# src/workflows/persistence/repository.py
import uuid
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import JSON, DateTime, String, select
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


def utcnow() -> datetime:
    return datetime.now(timezone.utc)


class Base(DeclarativeBase):
    pass


class WorkflowInstance(Base):
    __tablename__ = "workflow_instances"

    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    workflow_type: Mapped[str] = mapped_column(String(50))
    workflow_id: Mapped[str] = mapped_column(String(100), unique=True)
    current_state: Mapped[str] = mapped_column(String(50))
    # JSONB on PostgreSQL, generic JSON on SQLite (testing)
    state_data: Mapped[dict] = mapped_column(
        JSON().with_variant(JSONB(), "postgresql"), default=dict
    )
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
    completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))


class WorkflowPersistence:
    """Async workflow state persistence layer."""

    def __init__(self, database_url: str):
        self.engine = create_async_engine(database_url)
        # expire_on_commit=False keeps loaded attributes readable after commit
        self.session_factory = async_sessionmaker(self.engine, expire_on_commit=False)

    async def save_checkpoint(
        self,
        workflow: "ProposalWorkflow",
        actor_id: str
    ) -> None:
        """Persist current workflow state as checkpoint."""
        async with self.session_factory() as session:
            async with session.begin():  # commits on success, rolls back on error
                instance = await self._get_or_create_instance(session, workflow)
                previous_state = instance.current_state
                instance.current_state = workflow.state.value
                instance.state_data = workflow.to_dict()
                instance.updated_at = utcnow()

                # Record the latest transition only if the persisted state changed;
                # otherwise every re-save would duplicate the last audit row.
                if workflow.transitions and previous_state != instance.current_state:
                    last_transition = workflow.transitions[-1]
                    await self._record_transition(
                        session, instance.id, last_transition, actor_id
                    )

    # _get_or_create_instance() and _record_transition() are not shown in this excerpt.

    async def load_checkpoint(
        self,
        workflow_id: str
    ) -> Optional[dict]:
        """Load workflow state from last checkpoint."""
        async with self.session_factory() as session:
            result = await session.execute(
                select(WorkflowInstance).where(WorkflowInstance.workflow_id == workflow_id)
            )
            instance = result.scalar_one_or_none()
            if instance:
                return instance.state_data
            return None

    async def list_pending_workflows(
        self,
        workflow_type: str
    ) -> list[dict]:
        """List all non-terminal workflows of given type."""
        async with self.session_factory() as session:
            result = await session.execute(
                select(WorkflowInstance)
                .where(WorkflowInstance.workflow_type == workflow_type)
                .where(WorkflowInstance.completed_at.is_(None))
            )
            return [row.state_data for row in result.scalars()]
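The persistence layer relies on two things from each workflow. `save_checkpoint` calls `workflow.to_dict()`. `load_checkpoint` and `list_pending_workflows` return plain dicts, so something must turn a dict back into a workflow object. The sketch below is a dataclass-based version of that round trip. The `workflow_id` field, the `transition_to` method, the `from_dict` classmethod, and the `IN_REVIEW`/`APPROVED` states are hypothetical. Only `ProposalState.DRAFT`, `StateTransition`, `state`, `transitions`, and `to_dict()` appear in the ADR.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any


class ProposalState(Enum):
    # Only DRAFT appears in the ADR; the other members are hypothetical.
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"


@dataclass
class StateTransition:
    from_state: str
    to_state: str
    actor_id: str
    reason: str = ""


@dataclass
class ProposalWorkflow:
    workflow_id: str
    state: ProposalState = ProposalState.DRAFT
    transitions: list[StateTransition] = field(default_factory=list)

    def transition_to(self, to_state: ProposalState, actor_id: str, reason: str = "") -> None:
        """Append an audit record, then move to the new state."""
        self.transitions.append(
            StateTransition(self.state.value, to_state.value, actor_id, reason)
        )
        self.state = to_state

    def to_dict(self) -> dict[str, Any]:
        """JSON-safe snapshot suitable for workflow_instances.state_data."""
        return {
            "workflow_id": self.workflow_id,
            "state": self.state.value,
            "transitions": [asdict(t) for t in self.transitions],
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ProposalWorkflow":
        """Rebuild a workflow from a snapshot such as load_checkpoint() returns."""
        return cls(
            workflow_id=data["workflow_id"],
            state=ProposalState(data["state"]),
            transitions=[StateTransition(**t) for t in data["transitions"]],
        )
```

With a pair like this, crash recovery at startup could be a comprehension over `await persistence.list_pending_workflows("proposal")` that calls `ProposalWorkflow.from_dict` on each snapshot. Storing enum values as strings keeps `state_data` valid JSON on both PostgreSQL and SQLite.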

Consequences

Positive

  1. Full audit trail: Every state transition persisted with actor, timestamp, hash
  2. Crash recovery: Workflows resume from last checkpoint automatically
  3. Horizontal scaling: Multiple instances can process different workflows
  4. Testing simplicity: SQLite in-memory for unit tests
  5. No operational burden: No external servers to manage
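The schema stores a SHA-256 `transition_hash` per audit row "for integrity", but the ADR does not say what is hashed. One common option, sketched below as an assumption, chains each row's hash to the previous row's hash. With chaining, editing or reordering a stored transition breaks verification. The field set, the canonical-JSON encoding, `GENESIS_HASH`, and both function names are hypothetical.

```python
import hashlib
import json

# Hypothetical seed used as the "previous hash" for a workflow's first transition.
GENESIS_HASH = "0" * 64


def transition_hash(previous_hash: str, transition: dict) -> str:
    """SHA-256 over the previous hash plus a canonical JSON encoding (64 hex chars)."""
    canonical = json.dumps(transition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((previous_hash + canonical).encode("utf-8")).hexdigest()


def verify_chain(transitions: list[dict], stored_hashes: list[str]) -> bool:
    """Recompute hashes in order and compare them with the stored values.

    Editing or reordering a row, or deleting one that has successors, makes
    verification fail. Truncating the tail is not detected by this check alone.
    """
    if len(transitions) != len(stored_hashes):
        return False
    previous = GENESIS_HASH
    for transition, stored in zip(transitions, stored_hashes):
        if transition_hash(previous, transition) != stored:
            return False
        previous = stored
    return True
```

`hexdigest()` returns 64 hex characters, which matches the `VARCHAR(64)` column. Sorting keys keeps the hash independent of dict insertion order.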

Negative

  1. Initial implementation: ~12 hours of development work
  2. Schema migrations: Need Alembic for production schema changes
  3. Database dependency: Requires PostgreSQL for production

Neutral

  1. Performance: ~1-5ms overhead per checkpoint (acceptable for governance workflows)
  2. Storage: ~1KB per workflow instance + ~100B per transition
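The storage estimate turns into a simple formula: rows × per-row size. The per-row figures below are the ADR's own estimates, with 1KB taken as 1,000 bytes. The workload in the example is hypothetical. For 10,000 workflows with 8 transitions each, the result is 10,000 × (1,000 + 8 × 100) = 18,000,000 bytes, roughly 18 MB.

```python
# Per-row sizes are the ADR's estimates; any workload passed in is illustrative.
BYTES_PER_INSTANCE = 1_000   # ~1KB per workflow instance
BYTES_PER_TRANSITION = 100   # ~100B per transition


def estimated_storage_bytes(workflows: int, transitions_per_workflow: int) -> int:
    """Rough table-data size; ignores indexes, checkpoint rows, and DB overhead."""
    return workflows * (BYTES_PER_INSTANCE + transitions_per_workflow * BYTES_PER_TRANSITION)
```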

Implementation Plan

See docs/implementation-plans/gap-m3-workflow-persistence.md for the detailed EPCC (Explore, Plan, Code, Commit) plan.

Phase Summary

| Phase   | Duration | Deliverables                            |
|---------|----------|-----------------------------------------|
| Explore | 2 hours  | Schema design, API contract             |
| Plan    | 2 hours  | Implementation tasks, test strategy     |
| Code    | 6 hours  | Persistence layer, workflow integration |
| Commit  | 2 hours  | Testing, documentation, PR              |


Changelog

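| Date       | Author      | Changes                                                                                                  |
|------------|-------------|----------------------------------------------------------------------------------------------------------|
| 2025-12-27 | Claude Code | ADR accepted: full implementation complete in src/workflows/persistence/. 51 tests, 90.09% coverage.      |
| 2025-12-27 | Claude Code | Initial ADR creation                                                                                     |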