
GAP-M3: Workflow State Persistence - EPCC Implementation Plan

Version: 1.1.0
Created: 2025-12-27
Completed: 2025-12-27
Gap ID: GAP-M3
Severity: MEDIUM (Production Blocker)
Actual Effort: ~10 hours
Related ADR: ADR-001-workflow-persistence (Accepted)
Status: ✅ IMPLEMENTED


Executive Summary

This plan implements durable workflow state persistence for AEGIS LIBERTAS workflows using SQLAlchemy 2.0 async with PostgreSQL (production) and SQLite (testing). The implementation enables crash recovery, audit trail persistence, and horizontal scaling for governance workflows.

Current State

  • Workflows are ephemeral (in-memory only)
  • State lost on process restart
  • No audit trail for in-progress workflows
  • Cannot scale horizontally

Target State

  • All workflow state persisted to database
  • Automatic resume from last checkpoint on restart
  • Complete audit trail for compliance
  • Support for horizontal scaling

1. Research & Validation Summary

1.1 Research Sources

| Source | Key Findings | Applicability |
|---|---|---|
| Temporal.io | Industry-leading durable execution, requires external server | Rejected - too heavy |
| DBOS | Lightweight Python decorators, PostgreSQL backing | Rejected - too opinionated |
| SQLAlchemy 2.0 Async | First-class async support, AsyncSession, asyncpg driver | Selected |
| asyncpg | High-performance PostgreSQL driver, 3x faster than psycopg2 | Selected |
| LangGraph | Checkpoint-based persistence pattern | Pattern adopted |
| FSM-Workflow | Lightweight async workflow with PostgreSQL | Pattern adopted |
| python-statemachine | Abstract persistence layer pattern | Pattern adopted |

1.2 Architecture Decision

Selected Approach: Custom SQLAlchemy Async Persistence with checkpoint pattern

Rationale:

1. Zero external dependencies beyond the database
2. Full control over the audit trail schema
3. Proven SQLAlchemy ecosystem
4. asyncpg for high performance
5. SQLite for testing simplicity


2. Architecture Overview

2.1 Component Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                        Application Layer                             │
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐        │
│  │ ProposalWorkflow│ │ConsensusWorkflow│ │ OverrideWorkflow│        │
│  │   (Stateful)    │ │   (Stateful)    │ │   (Stateful)    │        │
│  └────────┬────────┘ └────────┬────────┘ └────────┬────────┘        │
└───────────┼────────────────────┼────────────────────┼───────────────┘
            │                    │                    │
            ▼                    ▼                    ▼
┌─────────────────────────────────────────────────────────────────────┐
│                     DurableWorkflowEngine                            │
│  ┌─────────────────────────────────────────────────────────────┐    │
│  │  run()                                                       │    │
│  │  ├── load_checkpoint() → Restore from DB if exists          │    │
│  │  ├── execute_task() → Run workflow step                     │    │
│  │  ├── save_checkpoint() → Persist after each step            │    │
│  │  └── finalize() → Mark complete                             │    │
│  └─────────────────────────────────────────────────────────────┘    │
└───────────────────────────────────┬─────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                     WorkflowPersistence                              │
│  ┌────────────────────────────────────────────────────────────────┐ │
│  │ Async Methods:                                                  │ │
│  │  • save_checkpoint(workflow, actor_id)                         │ │
│  │  • load_checkpoint(workflow_id) → Optional[dict]               │ │
│  │  • record_transition(workflow_id, transition)                  │ │
│  │  • list_pending(workflow_type) → list[dict]                    │ │
│  │  • get_audit_trail(workflow_id) → list[Transition]             │ │
│  │  • mark_complete(workflow_id)                                  │ │
│  └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────┬─────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│                   SQLAlchemy 2.0 Async ORM                           │
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐        │
│  │ WorkflowInstance│ │WorkflowTransition│ │WorkflowCheckpoint│       │
│  │    (Model)      │ │     (Model)      │ │     (Model)      │       │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘        │
└───────────────────────────────────┬─────────────────────────────────┘
            ┌───────────────────────┴───────────────────────┐
            ▼                                               ▼
┌───────────────────────────┐               ┌───────────────────────────┐
│   PostgreSQL (asyncpg)    │               │   SQLite (aiosqlite)      │
│      Production           │               │      Testing              │
│  • Connection pooling     │               │  • In-memory option       │
│  • JSONB for state_data   │               │  • JSON for state_data    │
│  • Concurrent writes      │               │  • Single connection      │
└───────────────────────────┘               └───────────────────────────┘
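The engine loop in the diagram (load, step, checkpoint, finalize) can be sketched as follows. This is an illustrative synchronous sketch with a hypothetical in-memory persistence stub; the real engine is async and backed by `WorkflowPersistence`, and the method shapes here are assumptions.

```python
from typing import Any, Callable, Optional

class InMemoryPersistence:
    """Hypothetical stand-in for WorkflowPersistence, for illustration only."""
    def __init__(self) -> None:
        self._checkpoints: dict[str, dict[str, Any]] = {}

    def load_checkpoint(self, workflow_id: str) -> Optional[dict[str, Any]]:
        return self._checkpoints.get(workflow_id)

    def save_checkpoint(self, workflow_id: str, state: dict[str, Any]) -> None:
        self._checkpoints[workflow_id] = dict(state)

class DurableWorkflowEngine:
    """Sketch of the run() loop: restore, execute, checkpoint, finalize."""
    def __init__(self, persistence: InMemoryPersistence) -> None:
        self.persistence = persistence

    def run(
        self,
        workflow_id: str,
        steps: list[Callable[[dict[str, Any]], dict[str, Any]]],
    ) -> dict[str, Any]:
        # load_checkpoint(): resume from the last persisted step, if any
        state = self.persistence.load_checkpoint(workflow_id) or {"step": 0}
        for i in range(state["step"], len(steps)):
            state = steps[i](state)  # execute_task(): run one workflow step
            state["step"] = i + 1
            # save_checkpoint(): persist after each step so a crash loses
            # at most the current step
            self.persistence.save_checkpoint(workflow_id, state)
        state["completed"] = True  # finalize(): mark complete
        self.persistence.save_checkpoint(workflow_id, state)
        return state
```

Because the checkpoint records the next step index, a restarted process re-enters the loop exactly where the previous run left off, and already-completed steps are never re-executed.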

2.2 Database Schema

-- workflow_instances: Core workflow state
CREATE TABLE workflow_instances (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_type VARCHAR(50) NOT NULL,      -- 'proposal', 'consensus', 'override'
    workflow_id VARCHAR(100) NOT NULL UNIQUE, -- External ID
    current_state VARCHAR(50) NOT NULL,       -- State enum value
    state_data JSONB NOT NULL DEFAULT '{}',   -- Full serialized state
    metadata JSONB DEFAULT '{}',              -- Additional metadata
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    completed_at TIMESTAMPTZ,                 -- NULL = in-progress
    parent_workflow_id UUID REFERENCES workflow_instances(id),

    CONSTRAINT idx_workflow_unique UNIQUE (workflow_type, workflow_id)
);

CREATE INDEX idx_workflow_type ON workflow_instances(workflow_type);
CREATE INDEX idx_workflow_state ON workflow_instances(current_state);
CREATE INDEX idx_workflow_pending ON workflow_instances(workflow_type)
    WHERE completed_at IS NULL;

-- workflow_transitions: Audit trail
CREATE TABLE workflow_transitions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
    sequence_number INTEGER NOT NULL,          -- Order within workflow
    from_state VARCHAR(50) NOT NULL,
    to_state VARCHAR(50) NOT NULL,
    actor_id VARCHAR(100) NOT NULL,
    actor_type VARCHAR(20) NOT NULL,           -- 'AI', 'HUMAN', 'GOVERNANCE'
    reason TEXT,
    metadata JSONB DEFAULT '{}',
    transition_hash VARCHAR(64) NOT NULL,      -- SHA-256 for integrity
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT idx_transition_sequence UNIQUE (workflow_instance_id, sequence_number)
);

CREATE INDEX idx_transition_workflow ON workflow_transitions(workflow_instance_id);
CREATE INDEX idx_transition_actor ON workflow_transitions(actor_id);
CREATE INDEX idx_transition_created ON workflow_transitions(created_at);

-- workflow_checkpoints: Resume points
CREATE TABLE workflow_checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
    checkpoint_number INTEGER NOT NULL,
    state_snapshot JSONB NOT NULL,
    checkpoint_hash VARCHAR(64) NOT NULL,      -- SHA-256 for integrity
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT idx_checkpoint_unique UNIQUE (workflow_instance_id, checkpoint_number)
);

CREATE INDEX idx_checkpoint_workflow ON workflow_checkpoints(workflow_instance_id);
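To make the schema concrete, here is a deliberately simplified sketch using stdlib `sqlite3` (synchronous, TEXT columns instead of UUID/JSONB/TIMESTAMPTZ, only a subset of columns). The production path uses SQLAlchemy 2.0 async with asyncpg/aiosqlite; the query at the end mirrors what `list_pending()` does with the partial index on `completed_at IS NULL`.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE workflow_instances (
        id TEXT PRIMARY KEY,
        workflow_type TEXT NOT NULL,
        workflow_id TEXT NOT NULL UNIQUE,
        current_state TEXT NOT NULL,
        state_data TEXT NOT NULL DEFAULT '{}',
        completed_at TEXT,
        UNIQUE (workflow_type, workflow_id)
    )
""")
conn.execute(
    "INSERT INTO workflow_instances"
    " (id, workflow_type, workflow_id, current_state, state_data)"
    " VALUES (?, ?, ?, ?, ?)",
    (str(uuid.uuid4()), "proposal", "prop-001", "DRAFT",
     json.dumps({"title": "demo"})),
)
# Equivalent of list_pending(): in-progress rows have completed_at IS NULL
rows = conn.execute(
    "SELECT workflow_id, current_state FROM workflow_instances"
    " WHERE workflow_type = ? AND completed_at IS NULL",
    ("proposal",),
).fetchall()
```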

2.3 API Contract

from abc import ABC, abstractmethod
from typing import Optional, TypeVar, Generic
from dataclasses import dataclass

T = TypeVar('T', bound='BaseWorkflow')

@dataclass
class CheckpointResult:
    """Result of checkpoint operation."""
    success: bool
    checkpoint_id: str
    checkpoint_number: int
    transition_count: int

@dataclass
class LoadResult(Generic[T]):
    """Result of loading a workflow."""
    workflow: Optional[T]
    checkpoint_number: int
    transitions_loaded: int

class WorkflowPersistenceProtocol(ABC):
    """Protocol for workflow persistence implementations."""

    @abstractmethod
    async def save_checkpoint(
        self,
        workflow: 'BaseWorkflow',
        actor_id: str,
        force: bool = False
    ) -> CheckpointResult:
        """Save current workflow state as checkpoint."""
        ...

    @abstractmethod
    async def load_checkpoint(
        self,
        workflow_id: str,
        workflow_type: type[T]
    ) -> LoadResult[T]:
        """Load workflow from last checkpoint."""
        ...

    @abstractmethod
    async def list_pending(
        self,
        workflow_type: str,
        limit: int = 100
    ) -> list[dict]:
        """List all non-completed workflows."""
        ...

    @abstractmethod
    async def get_audit_trail(
        self,
        workflow_id: str
    ) -> list[dict]:
        """Get complete audit trail for workflow."""
        ...

    @abstractmethod
    async def mark_complete(
        self,
        workflow_id: str,
        actor_id: str,
        final_state: str
    ) -> bool:
        """Mark workflow as completed."""
        ...

3. Implementation Strategy (EPCC)

3.1 Phase 1: Explore (2 hours)

| Task | Duration | Deliverable | Owner |
|---|---|---|---|
| E1.1 Review existing workflow implementations | 30m | Notes on serialization needs | Dev |
| E1.2 Design serialization protocol for each workflow | 30m | to_dict()/from_dict() specs | Dev |
| E1.3 Validate SQLAlchemy async patterns | 30m | Working async session example | Dev |
| E1.4 Define test strategy | 30m | Test plan document | Dev |

Deliverables:

  • Serialization protocol for ProposalWorkflow, ConsensusWorkflow, OverrideWorkflow
  • Database connection configuration pattern
  • Test fixtures design

3.2 Phase 2: Plan (2 hours)

| Task | Duration | Deliverable | Owner |
|---|---|---|---|
| P2.1 Create detailed task breakdown | 30m | GitHub issues or task list | Dev |
| P2.2 Design migration strategy | 30m | Alembic migration plan | Dev |
| P2.3 Define integration points | 30m | Workflow modification spec | Dev |
| P2.4 Create test matrix | 30m | Test case specifications | Dev |

Deliverables:

  • Task breakdown with dependencies
  • Migration scripts outline
  • Integration test specifications

3.3 Phase 3: Code (6 hours)

3.3.1 Core Persistence Layer (2 hours)

# src/workflows/persistence/__init__.py
# src/workflows/persistence/models.py
# src/workflows/persistence/repository.py
# src/workflows/persistence/engine.py
| File | Purpose | LOC Estimate |
|---|---|---|
| models.py | SQLAlchemy ORM models | ~100 |
| repository.py | WorkflowPersistence implementation | ~200 |
| engine.py | DurableWorkflowEngine wrapper | ~150 |
| serializers.py | Workflow serialization | ~100 |

3.3.2 Workflow Integration (2 hours)

# Modify existing workflows to support persistence
# src/workflows/proposal.py - Add to_dict(), from_dict()
# src/workflows/consensus.py - Add to_dict(), from_dict()
# src/workflows/override.py - Add to_dict(), from_dict()
| Workflow | Changes Required |
|---|---|
| ProposalWorkflow | Add serialization, integrate persistence calls |
| ConsensusWorkflow | Add serialization, integrate persistence calls |
| OverrideWorkflow | Add serialization, integrate persistence calls |
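The to_dict()/from_dict() contract can be illustrated with a minimal stand-in. This is a sketch only: the field names (`current_state`, `votes`) are assumptions, not the actual ProposalWorkflow attributes. The key constraint is that to_dict() must emit only JSON-serializable values, since the result lands in the JSONB state_data column.

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ProposalWorkflowSketch:
    """Illustrative stand-in for ProposalWorkflow; fields are assumptions."""
    workflow_id: str
    current_state: str = "DRAFT"
    votes: dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Must be JSON-serializable: this dict becomes the state_data payload
        return {
            "workflow_id": self.workflow_id,
            "current_state": self.current_state,
            "votes": dict(self.votes),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ProposalWorkflowSketch":
        # Inverse of to_dict(): rebuild the workflow from a checkpoint payload
        return cls(
            workflow_id=data["workflow_id"],
            current_state=data["current_state"],
            votes=dict(data.get("votes", {})),
        )
```

A round trip (`from_dict(to_dict(w)) == w`) is the natural unit test for each workflow's serialization.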

3.3.3 Database Migrations (1 hour)

# migrations/versions/001_add_workflow_tables.py
  • Create Alembic configuration
  • Write initial migration for all three tables
  • Add indexes and constraints

3.3.4 Testing (1 hour)

# tests/workflows/test_persistence.py
# tests/workflows/test_durable_engine.py
| Test Category | Test Cases |
|---|---|
| Unit: Persistence | save_checkpoint, load_checkpoint, list_pending |
| Unit: Serialization | to_dict, from_dict for each workflow |
| Integration: Crash Recovery | Simulate restart, verify resume |
| Integration: Audit Trail | Verify all transitions recorded |

3.4 Phase 4: Commit (2 hours)

| Task | Duration | Deliverable | Owner |
|---|---|---|---|
| C4.1 Run full test suite | 30m | All tests passing | Dev |
| C4.2 Update documentation | 30m | README, API docs | Dev |
| C4.3 Update gap-analysis.md | 15m | Mark GAP-M3 complete | Dev |
| C4.4 Create PR with review checklist | 45m | PR ready for review | Dev |

4. Technical Excellence

4.1 Error Handling

import asyncio

class WorkflowPersistenceError(Exception):
    """Base exception for persistence errors."""
    pass

class CheckpointError(WorkflowPersistenceError):
    """Failed to save checkpoint."""
    pass

class LoadError(WorkflowPersistenceError):
    """Failed to load workflow."""
    pass

class IntegrityError(WorkflowPersistenceError):
    """Checkpoint or transition integrity check failed."""
    pass

# Usage pattern with retry
async def save_with_retry(
    persistence: WorkflowPersistence,
    workflow: BaseWorkflow,
    actor_id: str,
    max_retries: int = 3
) -> CheckpointResult:
    """Save checkpoint with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await persistence.save_checkpoint(workflow, actor_id)
        except CheckpointError:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the final failure
            await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s
    raise CheckpointError("unreachable: loop always returns or raises")

4.2 Performance Considerations

| Aspect | Target | Implementation |
|---|---|---|
| Checkpoint latency | < 10ms | Async non-blocking writes |
| Load latency | < 5ms | Index on workflow_id |
| Connection pool | 10-20 connections | asyncpg pool configuration |
| Batch operations | Support bulk loads | list_pending() with pagination |
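The pool target above maps onto a `create_async_engine` configuration roughly like the following. This is a sketch, not the project's actual configuration: the DSN is a placeholder and the recycle/pre-ping values are deployment-specific assumptions.

```python
from sqlalchemy.ext.asyncio import create_async_engine

# Pool sizing per the 10-20 connection target; tune per deployment.
engine = create_async_engine(
    "postgresql+asyncpg://aegis:***@db:5432/aegis",  # placeholder DSN
    pool_size=10,        # steady-state connections
    max_overflow=10,     # burst headroom (total <= 20)
    pool_pre_ping=True,  # detect dead connections before use
    pool_recycle=1800,   # recycle connections every 30 minutes
)
```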

4.3 Security Considerations

| Concern | Mitigation |
|---|---|
| SQL injection | SQLAlchemy parameterized queries |
| Data integrity | SHA-256 hash for checkpoints and transitions |
| Access control | Actor ID required for all mutations |
| Audit completeness | Foreign key constraints, no orphan transitions |
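One way to compute the `transition_hash` column is a SHA-256 over a canonical JSON encoding of the transition fields, chained through the previous hash so that tampering with any earlier row invalidates every later one. The exact field selection and chaining scheme here are assumptions, not the implemented format.

```python
import hashlib
import json

def transition_hash(from_state: str, to_state: str, actor_id: str,
                    sequence_number: int, prev_hash: str = "") -> str:
    """SHA-256 over a canonical JSON payload (sorted keys, no whitespace).

    Chaining prev_hash makes the audit trail tamper-evident: rewriting any
    transition changes every subsequent hash. Sketch only; the production
    scheme may hash different fields.
    """
    payload = json.dumps(
        {
            "from": from_state,
            "to": to_state,
            "actor": actor_id,
            "seq": sequence_number,
            "prev": prev_hash,
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

The 64-character hex digest fits the `VARCHAR(64)` column, and canonical encoding (sorted keys, fixed separators) guarantees the hash is reproducible at verification time.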

4.4 Observability

# Telemetry integration
import time

from telemetry import TelemetryEmitter

class WorkflowPersistence:
    def __init__(self, database_url: str, telemetry: TelemetryEmitter):
        self.database_url = database_url
        self.telemetry = telemetry

    async def save_checkpoint(self, workflow, actor_id):
        start = time.monotonic()
        try:
            result = await self._save_checkpoint_impl(workflow, actor_id)
            self.telemetry.emit({
                "event": "workflow_checkpoint_saved",
                "workflow_id": workflow.workflow_id,
                "checkpoint_number": result.checkpoint_number,
                "latency_ms": (time.monotonic() - start) * 1000,
            })
            return result
        except Exception as e:
            self.telemetry.emit({
                "event": "workflow_checkpoint_failed",
                "workflow_id": workflow.workflow_id,
                "error": str(e),
            })
            raise

5. Development Workflow

5.1 Branch Strategy

main
  └── feature/gap-m3-workflow-persistence
        ├── feat/persistence-models
        ├── feat/persistence-repository
        ├── feat/workflow-serialization
        └── feat/durable-engine

5.2 PR Checklist

  • [ ] All new code has type hints
  • [ ] Unit tests for persistence layer (>90% coverage)
  • [ ] Integration tests for crash recovery
  • [ ] Documentation updated
  • [ ] ADR reviewed and approved
  • [ ] No security vulnerabilities (bandit scan)
  • [ ] Performance benchmarks meet targets

5.3 Review Criteria

| Category | Requirement |
|---|---|
| Code quality | Passes ruff, mypy, black |
| Test coverage | >90% for new code |
| Documentation | API docstrings complete |
| Security | No SQL injection, proper hashing |
| Performance | Checkpoint < 10ms latency |

6. Success Criteria & Metrics

6.1 Functional Criteria

| Criterion | Measurement | Target |
|---|---|---|
| Workflow resume | Restart process, workflow continues | 100% success |
| Audit trail complete | All transitions recorded | 100% |
| State integrity | Hash verification passes | 100% |
| Concurrent workflows | Multiple workflows in parallel | Supported |

6.2 Non-Functional Criteria

| Criterion | Measurement | Target |
|---|---|---|
| Checkpoint latency | P99 latency | < 10ms |
| Load latency | P99 latency | < 5ms |
| Storage overhead | Per workflow instance | < 2KB |
| Test coverage | Line coverage for new code | > 90% |

6.3 Definition of Done

  • [ ] All functional criteria met
  • [ ] All non-functional criteria met
  • [ ] ADR-001 status changed to "Accepted"
  • [ ] GAP-M3 marked as "Implemented" in gap-analysis.md
  • [ ] CI/CD pipeline passing
  • [ ] Code reviewed and approved
  • [ ] Documentation complete

7. Dependencies

7.1 New Dependencies

# pyproject.toml additions
[project.optional-dependencies]
persistence = [
    "sqlalchemy[asyncio]>=2.0.0",
    "asyncpg>=0.29.0",      # PostgreSQL async driver
    "aiosqlite>=0.19.0",    # SQLite async driver (testing)
    "alembic>=1.13.0",      # Database migrations
]

7.2 Existing Dependencies

  • No changes to core dependencies
  • Persistence is optional (graceful degradation)

8. Rollback Plan

If issues arise post-deployment:

  1. Disable persistence: Set AEGIS_PERSISTENCE_ENABLED=false
  2. Workflows fall back to ephemeral mode: In-memory only
  3. No data loss: Existing database records preserved
  4. Fix and re-enable: Deploy fix, set AEGIS_PERSISTENCE_ENABLED=true
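The kill switch above can be read with a small helper at startup. The environment variable name comes from this plan; the helper itself and the accepted truthy values are a sketch.

```python
import os

def persistence_enabled(default: bool = True) -> bool:
    """Read the AEGIS_PERSISTENCE_ENABLED kill switch; unset means default.

    Sketch: accepted truthy spellings are an assumption, not a documented
    contract. When this returns False, workflows run in the pre-GAP-M3
    ephemeral (in-memory) mode.
    """
    raw = os.environ.get("AEGIS_PERSISTENCE_ENABLED")
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}
```

Gating all persistence calls behind one flag check keeps the rollback a pure configuration change, with no redeploy or data migration required.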

9. Timeline

| Phase | Start | End | Duration |
|---|---|---|---|
| Explore | T+0h | T+2h | 2 hours |
| Plan | T+2h | T+4h | 2 hours |
| Code | T+4h | T+10h | 6 hours |
| Commit | T+10h | T+12h | 2 hours |
| Total | | | 12 hours |

Changelog

| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0.0 | 2025-12-27 | Claude Code | Initial EPCC plan |