GAP-M3: Workflow State Persistence - EPCC Implementation Plan
- Version: 1.1.0
- Created: 2025-12-27
- Completed: 2025-12-27
- Gap ID: GAP-M3
- Severity: MEDIUM (Production Blocker)
- Actual Effort: ~10 hours
- Related ADR: ADR-001-workflow-persistence (Accepted)
- Status: ✅ IMPLEMENTED
Executive Summary
This plan implements durable workflow state persistence for AEGIS LIBERTAS workflows using SQLAlchemy 2.0 async with PostgreSQL (production) and SQLite (testing). The implementation enables crash recovery, audit trail persistence, and horizontal scaling for governance workflows.
Current State
- Workflows are ephemeral (in-memory only)
- State lost on process restart
- No audit trail for in-progress workflows
- Cannot scale horizontally
Target State
- All workflow state persisted to database
- Automatic resume from last checkpoint on restart
- Complete audit trail for compliance
- Support for horizontal scaling
1. Research & Validation Summary
1.1 Research Sources
| Source | Key Findings | Applicability |
| Temporal.io | Industry-leading durable execution, requires external server | Rejected - too heavy |
| DBOS | Lightweight Python decorators, PostgreSQL backing | Rejected - too opinionated |
| SQLAlchemy 2.0 Async | First-class async support, AsyncSession, asyncpg driver | Selected |
| asyncpg | High-performance PostgreSQL driver, 3x faster than psycopg2 | Selected |
| LangGraph | Checkpoint-based persistence pattern | Pattern adopted |
| FSM-Workflow | Lightweight async workflow with PostgreSQL | Pattern adopted |
| python-statemachine | Abstract persistence layer pattern | Pattern adopted |
1.2 Architecture Decision
Selected Approach: Custom SQLAlchemy Async Persistence with checkpoint pattern
Rationale:
1. Zero external dependencies beyond the database
2. Full control over the audit trail schema
3. Proven SQLAlchemy ecosystem
4. asyncpg for high performance
5. SQLite for testing simplicity
2. Architecture Overview
2.1 Component Diagram
┌─────────────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ ProposalWorkflow│ │ConsensusWorkflow│ │ OverrideWorkflow│ │
│ │ (Stateful) │ │ (Stateful) │ │ (Stateful) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
└───────────┼────────────────────┼────────────────────┼───────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ DurableWorkflowEngine │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ run() │ │
│ │ ├── load_checkpoint() → Restore from DB if exists │ │
│ │ ├── execute_task() → Run workflow step │ │
│ │ ├── save_checkpoint() → Persist after each step │ │
│ │ └── finalize() → Mark complete │ │
│ └─────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ WorkflowPersistence │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Async Methods: │ │
│ │ • save_checkpoint(workflow, actor_id) │ │
│ │ • load_checkpoint(workflow_id) → Optional[dict] │ │
│ │ • record_transition(workflow_id, transition) │ │
│ │ • list_pending(workflow_type) → list[dict] │ │
│ │ • get_audit_trail(workflow_id) → list[Transition] │ │
│ │ • mark_complete(workflow_id) │ │
│ └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────────┬─────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────┐
│ SQLAlchemy 2.0 Async ORM │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ WorkflowInstance│ │WorkflowTransition│ │WorkflowCheckpoint│ │
│ │ (Model) │ │ (Model) │ │ (Model) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└───────────────────────────────────┬─────────────────────────────────┘
│
┌───────────────────────┴───────────────────────┐
▼ ▼
┌───────────────────────────┐ ┌───────────────────────────┐
│ PostgreSQL (asyncpg) │ │ SQLite (aiosqlite) │
│ Production │ │ Testing │
│ • Connection pooling │ │ • In-memory option │
│ • JSONB for state_data │ │ • JSON for state_data │
│ • Concurrent writes │ │ • Single connection │
└───────────────────────────┘ └───────────────────────────┘
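The engine loop shown in the diagram (load checkpoint, execute, checkpoint, finalize) can be sketched as below. This is an illustrative stand-in, not the project's actual classes: the in-memory persistence, the step-index state shape, and the method signatures are assumptions for the sketch.

```python
import asyncio
from typing import Optional

class InMemoryPersistence:
    """Stand-in for WorkflowPersistence (assumed interface, dict-backed)."""
    def __init__(self) -> None:
        self._checkpoints: dict[str, dict] = {}

    async def load_checkpoint(self, workflow_id: str) -> Optional[dict]:
        return self._checkpoints.get(workflow_id)

    async def save_checkpoint(self, workflow_id: str, state: dict) -> None:
        self._checkpoints[workflow_id] = dict(state)

class DurableWorkflowEngine:
    """Sketch of run(): load_checkpoint -> execute_task -> save_checkpoint -> finalize."""
    def __init__(self, persistence: InMemoryPersistence) -> None:
        self.persistence = persistence

    async def run(self, workflow_id: str, steps) -> dict:
        # Restore from the last checkpoint if one exists.
        state = await self.persistence.load_checkpoint(workflow_id) or {"step": 0}
        for i, step in enumerate(steps):
            if i < state["step"]:
                continue  # already executed before the restart
            step(state)                                          # execute_task()
            state["step"] = i + 1
            await self.persistence.save_checkpoint(workflow_id, state)
        state["complete"] = True                                 # finalize()
        await self.persistence.save_checkpoint(workflow_id, state)
        return state
```

On restart, a second `run()` with the same persistence skips already-checkpointed steps, which is the crash-recovery property the plan targets.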
2.2 Database Schema
-- workflow_instances: Core workflow state
CREATE TABLE workflow_instances (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_type VARCHAR(50) NOT NULL,        -- 'proposal', 'consensus', 'override'
    workflow_id VARCHAR(100) NOT NULL UNIQUE,  -- External ID
    current_state VARCHAR(50) NOT NULL,        -- State enum value
    state_data JSONB NOT NULL DEFAULT '{}',    -- Full serialized state
    metadata JSONB DEFAULT '{}',               -- Additional metadata
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),
    completed_at TIMESTAMPTZ,                  -- NULL = in-progress
    parent_workflow_id UUID REFERENCES workflow_instances(id),
    CONSTRAINT idx_workflow_unique UNIQUE (workflow_type, workflow_id)
);

CREATE INDEX idx_workflow_type ON workflow_instances(workflow_type);
CREATE INDEX idx_workflow_state ON workflow_instances(current_state);
CREATE INDEX idx_workflow_pending ON workflow_instances(workflow_type)
    WHERE completed_at IS NULL;

-- workflow_transitions: Audit trail
CREATE TABLE workflow_transitions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
    sequence_number INTEGER NOT NULL,      -- Order within workflow
    from_state VARCHAR(50) NOT NULL,
    to_state VARCHAR(50) NOT NULL,
    actor_id VARCHAR(100) NOT NULL,
    actor_type VARCHAR(20) NOT NULL,       -- 'AI', 'HUMAN', 'GOVERNANCE'
    reason TEXT,
    metadata JSONB DEFAULT '{}',
    transition_hash VARCHAR(64) NOT NULL,  -- SHA-256 for integrity
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT idx_transition_sequence UNIQUE (workflow_instance_id, sequence_number)
);

CREATE INDEX idx_transition_workflow ON workflow_transitions(workflow_instance_id);
CREATE INDEX idx_transition_actor ON workflow_transitions(actor_id);
CREATE INDEX idx_transition_created ON workflow_transitions(created_at);

-- workflow_checkpoints: Resume points
CREATE TABLE workflow_checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workflow_instance_id UUID NOT NULL REFERENCES workflow_instances(id) ON DELETE CASCADE,
    checkpoint_number INTEGER NOT NULL,
    state_snapshot JSONB NOT NULL,
    checkpoint_hash VARCHAR(64) NOT NULL,  -- SHA-256 for integrity
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT idx_checkpoint_unique UNIQUE (workflow_instance_id, checkpoint_number)
);

CREATE INDEX idx_checkpoint_workflow ON workflow_checkpoints(workflow_instance_id);
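For the SQLite test backend, the same checkpoint table can be rendered with simpler types (UUID → TEXT, JSONB → TEXT, TIMESTAMPTZ → TEXT). The sketch below, an assumption about how the test schema might look rather than the project's actual migration, shows the per-workflow checkpoint-number uniqueness constraint carrying over:

```python
import sqlite3

# SQLite rendering of workflow_checkpoints for the test backend.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE workflow_checkpoints (
    id TEXT PRIMARY KEY,
    workflow_instance_id TEXT NOT NULL,
    checkpoint_number INTEGER NOT NULL,
    state_snapshot TEXT NOT NULL,      -- JSON as text
    checkpoint_hash TEXT NOT NULL,     -- SHA-256 hex digest
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    UNIQUE (workflow_instance_id, checkpoint_number)
);
""")
conn.execute(
    "INSERT INTO workflow_checkpoints "
    "(id, workflow_instance_id, checkpoint_number, state_snapshot, checkpoint_hash) "
    "VALUES (?, ?, ?, ?, ?)",
    ("c1", "wf-1", 1, "{}", "00" * 32),
)
```

Attempting to insert a second checkpoint with the same (workflow_instance_id, checkpoint_number) pair raises sqlite3.IntegrityError, matching the PostgreSQL constraint's behavior.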
2.3 API Contract
from abc import ABC, abstractmethod
from typing import Optional, TypeVar, Generic
from dataclasses import dataclass

T = TypeVar('T', bound='BaseWorkflow')

@dataclass
class CheckpointResult:
    """Result of a checkpoint operation."""
    success: bool
    checkpoint_id: str
    checkpoint_number: int
    transition_count: int

@dataclass
class LoadResult(Generic[T]):
    """Result of loading a workflow."""
    workflow: Optional[T]
    checkpoint_number: int
    transitions_loaded: int

class WorkflowPersistenceProtocol(ABC):
    """Protocol for workflow persistence implementations."""

    @abstractmethod
    async def save_checkpoint(
        self,
        workflow: 'BaseWorkflow',
        actor_id: str,
        force: bool = False
    ) -> CheckpointResult:
        """Save current workflow state as a checkpoint."""
        ...

    @abstractmethod
    async def load_checkpoint(
        self,
        workflow_id: str,
        workflow_type: type[T]
    ) -> LoadResult[T]:
        """Load a workflow from its last checkpoint."""
        ...

    @abstractmethod
    async def list_pending(
        self,
        workflow_type: str,
        limit: int = 100
    ) -> list[dict]:
        """List all non-completed workflows."""
        ...

    @abstractmethod
    async def get_audit_trail(
        self,
        workflow_id: str
    ) -> list[dict]:
        """Get the complete audit trail for a workflow."""
        ...

    @abstractmethod
    async def mark_complete(
        self,
        workflow_id: str,
        actor_id: str,
        final_state: str
    ) -> bool:
        """Mark a workflow as completed."""
        ...
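To make the contract concrete, here is a minimal in-memory stand-in for the list_pending / mark_complete portion of the protocol. It is a sketch only: the row shape mirrors the workflow_instances columns, but the class and its storage are illustrative assumptions, not the SQL-backed implementation the plan calls for.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class _Row:
    """Mirrors the workflow_instances columns this sketch needs."""
    workflow_id: str
    workflow_type: str
    current_state: str
    completed_at: Optional[str] = None  # None = in-progress

class InMemoryStore:
    """Dict-backed stand-in for the SQL repository (illustrative only)."""
    def __init__(self) -> None:
        self._rows: dict[str, _Row] = {}

    def add(self, workflow_id: str, workflow_type: str, state: str) -> None:
        self._rows[workflow_id] = _Row(workflow_id, workflow_type, state)

    async def list_pending(self, workflow_type: str, limit: int = 100) -> list[dict]:
        pending = [
            {"workflow_id": r.workflow_id, "current_state": r.current_state}
            for r in self._rows.values()
            if r.workflow_type == workflow_type and r.completed_at is None
        ]
        return pending[:limit]

    async def mark_complete(self, workflow_id: str, actor_id: str, final_state: str) -> bool:
        row = self._rows.get(workflow_id)
        if row is None or row.completed_at is not None:
            return False  # unknown or already completed
        row.current_state = final_state
        row.completed_at = "now"
        return True
```

Note that mark_complete is idempotent-safe: a second call for the same workflow returns False rather than rewriting the final state.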
3. Implementation Strategy (EPCC)
3.1 Phase 1: Explore (2 hours)
| Task | Duration | Deliverable | Owner |
| E1.1 Review existing workflow implementations | 30m | Notes on serialization needs | Dev |
| E1.2 Design serialization protocol for each workflow | 30m | to_dict()/from_dict() specs | Dev |
| E1.3 Validate SQLAlchemy async patterns | 30m | Working async session example | Dev |
| E1.4 Define test strategy | 30m | Test plan document | Dev |
Deliverables: - Serialization protocol for ProposalWorkflow, ConsensusWorkflow, OverrideWorkflow - Database connection configuration pattern - Test fixtures design
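The to_dict()/from_dict() protocol from E1.2 might look like the sketch below. The ProposalState values and field names are hypothetical placeholders for whatever the real workflows carry; the point is the roundtrip: enums serialize to their string values so the snapshot is plain JSON.

```python
import json
from dataclasses import dataclass, asdict
from enum import Enum

class ProposalState(str, Enum):
    """Illustrative state enum; the real workflow defines its own states."""
    DRAFT = "draft"
    UNDER_REVIEW = "under_review"
    APPROVED = "approved"

@dataclass
class ProposalWorkflow:
    """Hypothetical minimal shape showing the serialization protocol."""
    workflow_id: str
    state: ProposalState
    payload: dict

    def to_dict(self) -> dict:
        d = asdict(self)
        d["state"] = self.state.value  # enum -> plain string for JSON
        return d

    @classmethod
    def from_dict(cls, data: dict) -> "ProposalWorkflow":
        return cls(
            workflow_id=data["workflow_id"],
            state=ProposalState(data["state"]),  # string -> enum member
            payload=data["payload"],
        )
```

A workflow serialized with to_dict(), stored as JSON, and rebuilt with from_dict() compares equal to the original, which is the invariant the unit tests in 3.3.4 check.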
3.2 Phase 2: Plan (2 hours)
| Task | Duration | Deliverable | Owner |
| P2.1 Create detailed task breakdown | 30m | GitHub issues or task list | Dev |
| P2.2 Design migration strategy | 30m | Alembic migration plan | Dev |
| P2.3 Define integration points | 30m | Workflow modification spec | Dev |
| P2.4 Create test matrix | 30m | Test case specifications | Dev |
Deliverables: - Task breakdown with dependencies - Migration scripts outline - Integration test specifications
3.3 Phase 3: Code (6 hours)
3.3.1 Core Persistence Layer (2 hours)
# src/workflows/persistence/__init__.py
# src/workflows/persistence/models.py
# src/workflows/persistence/repository.py
# src/workflows/persistence/engine.py
| File | Purpose | LOC Estimate |
| models.py | SQLAlchemy ORM models | ~100 |
| repository.py | WorkflowPersistence implementation | ~200 |
| engine.py | DurableWorkflowEngine wrapper | ~150 |
| serializers.py | Workflow serialization | ~100 |
3.3.2 Workflow Integration (2 hours)
# Modify existing workflows to support persistence
# src/workflows/proposal.py - Add to_dict(), from_dict()
# src/workflows/consensus.py - Add to_dict(), from_dict()
# src/workflows/override.py - Add to_dict(), from_dict()
| Workflow | Changes Required |
| ProposalWorkflow | Add serialization, integrate persistence calls |
| ConsensusWorkflow | Add serialization, integrate persistence calls |
| OverrideWorkflow | Add serialization, integrate persistence calls |
3.3.3 Database Migrations (1 hour)
# migrations/versions/001_add_workflow_tables.py
- Create Alembic configuration
- Write initial migration for all three tables
- Add indexes and constraints
3.3.4 Testing (1 hour)
# tests/workflows/test_persistence.py
# tests/workflows/test_durable_engine.py
| Test Category | Test Cases |
| Unit: Persistence | save_checkpoint, load_checkpoint, list_pending |
| Unit: Serialization | to_dict, from_dict for each workflow |
| Integration: Crash Recovery | Simulate restart, verify resume |
| Integration: Audit Trail | Verify all transitions recorded |
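The crash-recovery integration test can follow the pattern sketched here: checkpoint, discard the live object to simulate a restart, rebuild from the snapshot, and assert the state resumed. DummyWorkflow is a stand-in for the real workflows, which would go through the persistence layer rather than a local string.

```python
import json

class DummyWorkflow:
    """Stand-in workflow with the assumed to_dict()/from_dict() protocol."""
    def __init__(self, workflow_id: str, step: int = 0):
        self.workflow_id = workflow_id
        self.step = step

    def to_dict(self) -> dict:
        return {"workflow_id": self.workflow_id, "step": self.step}

    @classmethod
    def from_dict(cls, data: dict) -> "DummyWorkflow":
        return cls(data["workflow_id"], data["step"])

def test_resume_after_restart():
    wf = DummyWorkflow("wf-1")
    wf.step = 2                                  # progress before the "crash"
    snapshot = json.dumps(wf.to_dict())          # checkpoint persisted
    del wf                                       # process restart
    resumed = DummyWorkflow.from_dict(json.loads(snapshot))
    assert resumed.step == 2                     # continues from the checkpoint
    assert resumed.workflow_id == "wf-1"
```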
3.4 Phase 4: Commit (2 hours)
| Task | Duration | Deliverable | Owner |
| C4.1 Run full test suite | 30m | All tests passing | Dev |
| C4.2 Update documentation | 30m | README, API docs | Dev |
| C4.3 Update gap-analysis.md | 15m | Mark GAP-M3 complete | Dev |
| C4.4 Create PR with review checklist | 45m | PR ready for review | Dev |
4. Technical Excellence
4.1 Error Handling
class WorkflowPersistenceError(Exception):
    """Base exception for persistence errors."""

class CheckpointError(WorkflowPersistenceError):
    """Failed to save a checkpoint."""

class LoadError(WorkflowPersistenceError):
    """Failed to load a workflow."""

class IntegrityError(WorkflowPersistenceError):
    """Checkpoint or transition integrity check failed."""
# Usage pattern with retry
import asyncio

async def save_with_retry(
    persistence: WorkflowPersistence,
    workflow: BaseWorkflow,
    actor_id: str,
    max_retries: int = 3
) -> CheckpointResult:
    """Save a checkpoint with exponential-backoff retry."""
    for attempt in range(max_retries):
        try:
            return await persistence.save_checkpoint(workflow, actor_id)
        except CheckpointError:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s
4.2 Performance Targets
| Aspect | Target | Implementation |
| Checkpoint latency | < 10ms | Async non-blocking writes |
| Load latency | < 5ms | Index on workflow_id |
| Connection pool | 10-20 connections | asyncpg pool configuration |
| Batch operations | Support bulk loads | list_pending() with pagination |
4.3 Security Considerations
| Concern | Mitigation |
| SQL injection | SQLAlchemy parameterized queries |
| Data integrity | SHA-256 hash for checkpoints and transitions |
| Access control | Actor ID required for all mutations |
| Audit completeness | Foreign key constraints, no orphan transitions |
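The SHA-256 integrity check can be computed over a canonical JSON rendering of the transition fields, so any tampering with a stored row is detectable on read. The exact field set and canonicalization are assumptions here; the plan only specifies a SHA-256 transition_hash.

```python
import hashlib
import json

def transition_hash(transition: dict) -> str:
    """Hash a canonical JSON rendering of the transition fields.

    sort_keys + compact separators make the rendering deterministic,
    so the same transition always produces the same digest.
    """
    canonical = json.dumps(transition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def verify_transition(transition: dict, stored_hash: str) -> bool:
    """Recompute and compare against the stored transition_hash column."""
    return transition_hash(transition) == stored_hash
```

The 64-character hex digest fits the VARCHAR(64) transition_hash and checkpoint_hash columns in the schema.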
4.4 Observability
# Telemetry integration
import time

from telemetry import TelemetryEmitter

class WorkflowPersistence:
    def __init__(self, database_url: str, telemetry: TelemetryEmitter):
        self.database_url = database_url
        self.telemetry = telemetry

    async def save_checkpoint(self, workflow, actor_id):
        start = time.monotonic()
        try:
            result = await self._save_checkpoint_impl(workflow, actor_id)
            self.telemetry.emit({
                "event": "workflow_checkpoint_saved",
                "workflow_id": workflow.workflow_id,
                "checkpoint_number": result.checkpoint_number,
                "latency_ms": (time.monotonic() - start) * 1000,
            })
            return result
        except Exception as e:
            self.telemetry.emit({
                "event": "workflow_checkpoint_failed",
                "workflow_id": workflow.workflow_id,
                "error": str(e),
            })
            raise
5. Development Workflow
5.1 Branch Strategy
main
└── feature/gap-m3-workflow-persistence
├── feat/persistence-models
├── feat/persistence-repository
├── feat/workflow-serialization
└── feat/durable-engine
5.2 PR Checklist
- [ ] All new code has type hints
- [ ] Unit tests for persistence layer (>90% coverage)
- [ ] Integration tests for crash recovery
- [ ] Documentation updated
- [ ] ADR reviewed and approved
- [ ] No security vulnerabilities (bandit scan)
- [ ] Performance benchmarks meet targets
5.3 Review Criteria
| Category | Requirement |
| Code quality | Passes ruff, mypy, black |
| Test coverage | >90% for new code |
| Documentation | API docstrings complete |
| Security | No SQL injection, proper hashing |
| Performance | Checkpoint < 10ms latency |
6. Success Criteria & Metrics
6.1 Functional Criteria
| Criterion | Measurement | Target |
| Workflow resume | Restart process, workflow continues | 100% success |
| Audit trail complete | All transitions recorded | 100% |
| State integrity | Hash verification passes | 100% |
| Concurrent workflows | Multiple workflows in parallel | Supported |
6.2 Non-Functional Criteria
| Criterion | Measurement | Target |
| Checkpoint latency | P99 latency | < 10ms |
| Load latency | P99 latency | < 5ms |
| Storage overhead | Per workflow instance | < 2KB |
| Test coverage | Line coverage for new code | > 90% |
6.3 Definition of Done
- [ ] All functional criteria met
- [ ] All non-functional criteria met
- [ ] ADR-001 status changed to "Accepted"
- [ ] GAP-M3 marked as "Implemented" in gap-analysis.md
- [ ] CI/CD pipeline passing
- [ ] Code reviewed and approved
- [ ] Documentation complete
7. Dependencies
7.1 New Dependencies
# pyproject.toml additions
[project.optional-dependencies]
persistence = [
    "sqlalchemy[asyncio]>=2.0.0",
    "asyncpg>=0.29.0",    # PostgreSQL async driver
    "aiosqlite>=0.19.0",  # SQLite async driver (testing)
    "alembic>=1.13.0",    # Database migrations
]
7.2 Existing Dependencies
- No changes to core dependencies
- Persistence is optional (graceful degradation)
8. Rollback Plan
If issues arise post-deployment:
- Disable persistence: set AEGIS_PERSISTENCE_ENABLED=false
- Workflows fall back to ephemeral mode (in-memory only)
- No data loss: existing database records are preserved
- Fix and re-enable: deploy the fix, then set AEGIS_PERSISTENCE_ENABLED=true
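The toggle above can be read with a sketch like this; the "anything except an explicit false means enabled" semantics is an assumption, chosen so the flag is safe to leave unset in existing deployments.

```python
import os
from typing import Mapping, Optional

def persistence_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Return True unless AEGIS_PERSISTENCE_ENABLED is explicitly 'false'.

    Defaulting to enabled means unset environments keep durable state,
    and the rollback path only requires setting one variable.
    """
    env = os.environ if env is None else env
    return str(env.get("AEGIS_PERSISTENCE_ENABLED", "true")).lower() != "false"
```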
9. Timeline
| Phase | Start | End | Duration |
| Explore | T+0h | T+2h | 2 hours |
| Plan | T+2h | T+4h | 2 hours |
| Code | T+4h | T+10h | 6 hours |
| Commit | T+10h | T+12h | 2 hours |
| Total | | | 12 hours |
Changelog
| Version | Date | Author | Changes |
| 1.0.0 | 2025-12-27 | Claude Code | Initial EPCC plan |