Summary
Add saga orchestration infrastructure to coordinate multi-step, event-driven processes (e.g. convention initialization) with typed event flow, compensation on failure, and a Result monad (Ok/Err) for all event handlers.
Motivation
Convention initialization currently works as fire-and-forget event choreography (ConventionRegistered → CreateFeatureTables → ConventionReady). There is no:
- Progress tracking: no way to know if initialization stalled
- Failure handling: if `CreateFeatureTables` fails after max retries, nothing happens
- Compensation: no rollback of partially completed steps
- Timeout detection: stuck processes go unnoticed
As we add more multi-step processes (deposition workflow, source ingestion), this gap becomes a real problem. We need reusable orchestration infrastructure — not ad-hoc status fields sprinkled across aggregates.
Design decisions
- Every handler returns `Ok[Event] | Err[Event]`. This is a Result monad, not exceptions, for domain outcomes. Infrastructure crashes (DB down, OOM) remain exceptions handled by Worker retry/dead-letter. Every handler declares both success and failure event types; no silent `None` returns.
- Handlers build per-step events. The handler just did the work and has the data, so it returns a typed event wrapped in `Ok` or `Err`. Handlers never touch the outbox for saga-significant events.
- The orchestrator owns all saga event emission. The Worker does NOT emit events for saga handlers; it passes the handler result to the orchestrator via `handle_result(handler_type, result)`. The orchestrator validates the event against the step definition, emits it, and updates saga state. For non-saga handlers, the Worker emits directly.
- Two levels of events: per-step and saga-level. Per-step events (`SchemaCreated`, `FeatureTablesFailed`) are built by handlers and describe what a step did. Saga-level events (`ConventionReady`, `ConventionInitFailed`) are built by the orchestrator via `on_completed` / `on_failed` and describe what the process as a whole achieved. Both are emitted. This separation exists because step handlers live in their own domains (e.g. the feature domain) and shouldn't emit events about cross-domain processes (e.g. convention initialization).
- Compensation events are commands, not observations. Success/failure events are bottom-up ("this happened") and built by handlers. Compensation events are top-down ("undo this") and built by the orchestrator via `on_compensate`. The orchestrator initiates compensation; no handler is running at that point. Compensation events are lean (just the identifier) because compensation handlers should act on current state, not a stale snapshot.
- Side-effect events go through an injected outbox. The returned event is the saga-significant one; anything else (audit logs, notifications) goes through the outbox directly.
- `SagaStep` is a frozen dataclass with class references. `handler`, `success`, `failure`, and `compensation` are all `type[EventHandler]` or `type[Event]`. No lambdas, no callables. The step has `check_success()` / `check_failure()` validation methods (`isinstance` checks).
- `SagaDef[TTrigger, TId]` is generic over the trigger event and a typed identifier. `TTrigger` types `extract_id()` for LSP-safe type narrowing. `TId` types all factory methods so they receive e.g. `ConventionSRN`, not `str`; string serialization happens only at the storage boundary. Both type parameters are load-bearing for type safety.
- `correlation_field` on `SagaDef`. All events in a saga share a common identifier field (e.g. `convention_srn`). The orchestrator uses it to extract the process ID from any event in the chain.
- Inter-step flow via event type chaining. Step 1's `success` type is step 2's handler `TIn` type. The orchestrator emits the step's success event via the outbox; the next handler picks it up via normal event dispatch.
- Handlers are saga-unaware. `HandlerSagaMapping` is built at startup from saga definitions, so handlers have zero saga-related code. The Worker checks the mapping to decide whether to route results to the orchestrator.
- Saga state lives in its own table. `saga_state` is keyed by `(saga_type, process_id)`, not stored on domain aggregates. Clean separation.
Core types
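domain/shared/result.py

```python
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")

@dataclass(frozen=True, slots=True)
class Ok(Generic[T]):
    event: T

@dataclass(frozen=True, slots=True)
class Err(Generic[T]):
    event: T
```

domain/shared/event.py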
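```python
from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from domain.shared.result import Err, Ok

# `Event`, the domain event base class, is assumed to be defined in this module.
TIn = TypeVar("TIn", bound=Event)
TSuccess = TypeVar("TSuccess", bound=Event)
TFailure = TypeVar("TFailure", bound=Event)

class EventHandler(ABC, Generic[TIn, TSuccess, TFailure]):
    @abstractmethod
    async def handle(self, event: TIn) -> Ok[TSuccess] | Err[TFailure]: ...
```

domain/shared/saga.py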
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from typing import ClassVar, Generic, NewType, Protocol, TypeVar

from domain.shared.event import Event, EventHandler

TTrigger = TypeVar("TTrigger", bound=Event)
TId = TypeVar("TId")


class SagaContractViolation(Exception):
    """A saga handler returned an event type its step does not declare."""


@dataclass(frozen=True)
class SagaStep:
    name: str
    handler: type[EventHandler]
    success: type[Event]
    failure: type[Event] | None = None
    compensation: type[Event] | None = None

    def check_success(self, event: Event) -> Event:
        if not isinstance(event, self.success):
            raise SagaContractViolation(
                f"{self.name}: expected {self.success.__name__}, "
                f"got {type(event).__name__}"
            )
        return event

    def check_failure(self, event: Event) -> Event:
        if self.failure is None or not isinstance(event, self.failure):
            raise SagaContractViolation(
                f"{self.name}: unexpected failure {type(event).__name__}"
            )
        return event


class SagaStatus(StrEnum):
    RUNNING = "running"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaState:
    saga_id: str
    saga_type: str
    process_id: str
    status: SagaStatus = SagaStatus.RUNNING
    current_step: int = 0
    completed_steps: list[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    error: str | None = None


class SagaStore(Protocol):
    async def save(self, state: SagaState) -> None: ...
    async def get(self, saga_type: str, process_id: str) -> SagaState | None: ...


class SagaDef(ABC, Generic[TTrigger, TId]):
    correlation_field: ClassVar[str]

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def steps(self) -> tuple[SagaStep, ...]: ...

    # ── Identity ──

    @abstractmethod
    def extract_id(self, event: TTrigger) -> TId: ...

    @abstractmethod
    def serialize_id(self, id: TId) -> str: ...

    @abstractmethod
    def deserialize_id(self, raw: str) -> TId: ...

    def get_process_id(self, event: Event) -> str:
        """Extract process_id from any event in the saga via correlation_field."""
        return self.serialize_id(getattr(event, self.correlation_field))

    # ── Saga-level event factories (orchestrator calls these) ──

    @abstractmethod
    async def on_completed(self, id: TId) -> Event: ...

    @abstractmethod
    async def on_failed(self, id: TId, error: str) -> Event: ...

    @abstractmethod
    async def on_compensate(self, id: TId, step: SagaStep) -> Event | None: ...


# ── Wiring (built at startup from saga definitions) ──

HandlerSagaMapping = NewType(
    "HandlerSagaMapping",
    dict[type[EventHandler], tuple[str, str]],  # handler → (saga_type, step_name)
)
SagaRegistry = NewType("SagaRegistry", dict[str, SagaDef])


def build_handler_saga_mapping(sagas: list[SagaDef]) -> HandlerSagaMapping:
    mapping: dict[type[EventHandler], tuple[str, str]] = {}
    for saga in sagas:
        for step in saga.steps:
            if step.handler in mapping:
                raise ValueError(
                    f"{step.handler.__name__} mapped to more than one saga step"
                )
            mapping[step.handler] = (saga.name, step.name)
    return HandlerSagaMapping(mapping)
```

infrastructure/saga/orchestrator.py
The orchestrator owns all event emission for saga handlers. The Worker passes it `handler_type` and `result`; that is the entire interface.
```python
from uuid import uuid4

from domain.shared.event import Event, EventHandler
from domain.shared.result import Err, Ok
from domain.shared.saga import (
    HandlerSagaMapping,
    SagaRegistry,
    SagaState,
    SagaStatus,
    SagaStore,
)

# `Outbox` is the existing outbox port; its import path is not part of this issue.


class SagaOrchestrator:
    def __init__(
        self,
        store: SagaStore,
        outbox: Outbox,
        registry: SagaRegistry,
        handler_mapping: HandlerSagaMapping,
    ) -> None:
        self._store = store
        self._outbox = outbox
        self._registry = registry
        self._handler_mapping = handler_mapping

    async def handle_result(
        self,
        handler_type: type[EventHandler],
        result: Ok[Event] | Err[Event],
    ) -> None:
        """Single entry point from the Worker. Validates, emits, and tracks."""
        saga_type, step_name = self._handler_mapping[handler_type]
        defn = self._registry[saga_type]
        step = next(s for s in defn.steps if s.name == step_name)

        match result:
            case Ok(event=out):
                step.check_success(out)
                await self._outbox.append(out)  # per-step success event
                process_id = defn.get_process_id(out)
                state = await self._get_or_create(saga_type, process_id)
                if state.status != SagaStatus.RUNNING:
                    return
                state.completed_steps.append(step_name)
                state.current_step += 1
                if state.current_step >= len(defn.steps):
                    state.status = SagaStatus.COMPLETED
                    await self._store.save(state)
                    typed_id = defn.deserialize_id(process_id)
                    await self._outbox.append(await defn.on_completed(typed_id))
                else:
                    await self._store.save(state)

            case Err(event=out):
                step.check_failure(out)
                await self._outbox.append(out)  # per-step failure event
                process_id = defn.get_process_id(out)
                state = await self._get_or_create(saga_type, process_id)
                if state.status != SagaStatus.RUNNING:
                    return
                typed_id = defn.deserialize_id(process_id)
                state.status = SagaStatus.COMPENSATING
                await self._store.save(state)
                # Undo completed steps newest first. The failing step never
                # reached completed_steps, so it is not compensated here.
                for name in reversed(state.completed_steps):
                    s = next(s for s in defn.steps if s.name == name)
                    event = await defn.on_compensate(typed_id, s)
                    if event is not None:
                        await self._outbox.append(event)
                state.status = SagaStatus.FAILED
                state.error = step_name  # the step that failed
                await self._store.save(state)
                await self._outbox.append(
                    await defn.on_failed(typed_id, step_name)
                )

    async def _get_or_create(
        self, saga_type: str, process_id: str,
    ) -> SagaState:
        # State is created lazily on the first step result; callers save it.
        state = await self._store.get(saga_type, process_id)
        if state is None:
            state = SagaState(
                saga_id=str(uuid4()),
                saga_type=saga_type,
                process_id=process_id,
            )
        return state
```

Worker integration
The Worker is minimal — it runs the handler and routes the result. For saga handlers it delegates entirely to the orchestrator. For non-saga handlers it emits directly.
```python
# Excerpt from the Worker's dispatch method (surrounding code not shown)
result = await handler.handle(event)

if type(handler) in self._handler_saga_mapping:
    # Saga handler: orchestrator validates, emits, and tracks
    await self._orchestrator.handle_result(type(handler), result)
else:
    # Non-saga handler: emit directly
    match result:
        case Ok(event=out):
            await self._outbox.append(out)
        case Err(event=out):
            await self._outbox.append(out)
```

saga_state table
```python
import sqlalchemy as sa

# `mapper_registry` is the project's existing SQLAlchemy registry (not shown).
saga_state_table = sa.Table(
    "saga_state",
    mapper_registry.metadata,
    sa.Column("saga_id", sa.String, nullable=False, unique=True),
    sa.Column("saga_type", sa.String(128), nullable=False),
    sa.Column("process_id", sa.String, nullable=False),
    sa.Column("status", sa.String(32), nullable=False),
    sa.Column("current_step", sa.Integer, nullable=False),
    sa.Column("completed_steps", sa.JSON, nullable=False),
    sa.Column("started_at", sa.DateTime(timezone=True), nullable=False),
    sa.Column("error", sa.Text, nullable=True),
    sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    sa.PrimaryKeyConstraint("saga_type", "process_id"),
)
```

Example: ConventionInit saga
Events
Per-step events (built by handlers):
```python
class SchemaCreated(Event):
    convention_srn: ConventionSRN
    schema_srn: SchemaSRN
    field_count: int

class SchemaCreationFailed(Event):
    convention_srn: ConventionSRN
    error: str

class FeatureTablesCreated(Event):
    convention_srn: ConventionSRN
    table_count: int

class FeatureTablesFailed(Event):
    convention_srn: ConventionSRN
    failed_hook: str
```

Saga-level events (built by orchestrator via SagaDef):
```python
class ConventionReady(Event):
    convention_srn: ConventionSRN

class ConventionInitFailed(Event):
    convention_srn: ConventionSRN
    error: str
```

Compensation commands (built by orchestrator via SagaDef):
```python
class CompensateSchema(Event):
    convention_srn: ConventionSRN

class CompensateFeatureTables(Event):
    convention_srn: ConventionSRN
```

Saga definition
```python
class ConventionInit(SagaDef[ConventionRegistered, ConventionSRN]):
    # Plain class attributes satisfy the abstract `name` and `steps` properties.
    name = "convention_init"
    correlation_field = "convention_srn"
    steps = (
        SagaStep(
            name="create_schema",
            handler=CreateConventionSchema,
            success=SchemaCreated,
            failure=SchemaCreationFailed,
            compensation=CompensateSchema,
        ),
        SagaStep(
            name="create_feature_tables",
            handler=CreateFeatureTables,
            success=FeatureTablesCreated,
            failure=FeatureTablesFailed,
            compensation=CompensateFeatureTables,
        ),
    )

    def extract_id(self, event: ConventionRegistered) -> ConventionSRN:
        return event.convention_srn

    def serialize_id(self, id: ConventionSRN) -> str:
        return str(id)

    def deserialize_id(self, raw: str) -> ConventionSRN:
        return ConventionSRN.parse(raw)

    async def on_completed(self, id: ConventionSRN) -> ConventionReady:
        return ConventionReady(id=EventId(uuid4()), convention_srn=id)

    async def on_failed(self, id: ConventionSRN, error: str) -> ConventionInitFailed:
        return ConventionInitFailed(
            id=EventId(uuid4()), convention_srn=id, error=error,
        )

    async def on_compensate(
        self, id: ConventionSRN, step: SagaStep,
    ) -> Event | None:
        if step.compensation is None:
            return None
        return step.compensation(id=EventId(uuid4()), convention_srn=id)
```

Saga handlers
```python
class CreateConventionSchema(
    EventHandler[ConventionRegistered, SchemaCreated, SchemaCreationFailed]
):
    schema_service: SchemaService

    async def handle(
        self, event: ConventionRegistered,
    ) -> Ok[SchemaCreated] | Err[SchemaCreationFailed]:
        try:
            schema = await self.schema_service.create_schema(
                title=event.title,
                fields=event.schema_fields,
            )
        except DomainError as e:
            # Domain outcome: reported as a failure event, not raised
            return Err(SchemaCreationFailed(
                id=EventId(uuid4()),
                convention_srn=event.convention_srn,
                error=str(e),
            ))
        return Ok(SchemaCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            schema_srn=schema.srn,
            field_count=len(event.schema_fields),
        ))


class CreateFeatureTables(
    EventHandler[SchemaCreated, FeatureTablesCreated, FeatureTablesFailed]
):
    convention_repo: ConventionRepository
    feature_service: FeatureService

    async def handle(
        self, event: SchemaCreated,
    ) -> Ok[FeatureTablesCreated] | Err[FeatureTablesFailed]:
        convention = await self.convention_repo.get(event.convention_srn)
        for hook in convention.hooks:
            try:
                await self.feature_service.create_table(hook)
            except ConflictError:
                # Tables already created for earlier hooks stay in place: this
                # step never reaches completed_steps, so the orchestrator does
                # not emit CompensateFeatureTables for it.
                return Err(FeatureTablesFailed(
                    id=EventId(uuid4()),
                    convention_srn=event.convention_srn,
                    failed_hook=hook.name,
                ))
        return Ok(FeatureTablesCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            table_count=len(convention.hooks),
        ))
```

Compensation handler
```python
# Not a saga step handler (absent from HandlerSagaMapping), so the Worker
# emits its result directly.
class UndoFeatureTables(
    EventHandler[CompensateFeatureTables, TablesCompensated, CompensationFailed]
):
    convention_repo: ConventionRepository
    feature_service: FeatureService

    async def handle(
        self, event: CompensateFeatureTables,
    ) -> Ok[TablesCompensated] | Err[CompensationFailed]:
        # Compensation event is lean: query the repo for current state to undo
        convention = await self.convention_repo.get(event.convention_srn)
        for hook in convention.hooks:
            await self.feature_service.drop_table(hook)
        return Ok(TablesCompensated(
            id=EventId(uuid4()), convention_srn=event.convention_srn,
        ))
```

Non-saga handler
```python
class IndexPublishedRecord(
    EventHandler[RecordPublished, RecordIndexed, IndexingFailed]
):
    search_service: SearchService

    async def handle(
        self, event: RecordPublished,
    ) -> Ok[RecordIndexed] | Err[IndexingFailed]:
        await self.search_service.index(event.record_srn, event.metadata)
        return Ok(RecordIndexed(
            id=EventId(uuid4()), record_srn=event.record_srn,
        ))
```

Handler with side-effect events
```python
class CreateConventionSchema(
    EventHandler[ConventionRegistered, SchemaCreated, SchemaCreationFailed]
):
    schema_service: SchemaService
    outbox: Outbox  # for side effects only

    async def handle(
        self, event: ConventionRegistered,
    ) -> Ok[SchemaCreated] | Err[SchemaCreationFailed]:
        schema = await self.schema_service.create_schema(...)

        # Side effect: not saga-significant, emitted directly
        await self.outbox.append(AuditEntry(
            id=EventId(uuid4()),
            action="schema_created",
            resource=str(event.convention_srn),
        ))

        # Saga-significant: returned, then validated and emitted by the orchestrator
        return Ok(SchemaCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            schema_srn=schema.srn,
            field_count=len(event.schema_fields),
        ))
```

Event flow
Happy path

```
ConventionService.create_convention()
  → saves Convention aggregate
  → outbox.append(ConventionRegistered(srn, hooks, schema_fields))

Worker picks up ConventionRegistered
  → dispatches to CreateConventionSchema
  → handler returns Ok(SchemaCreated(srn, schema_srn, field_count))
  → Worker calls orchestrator.handle_result(CreateConventionSchema, result)
  → Orchestrator validates via step.check_success()
  → Orchestrator emits SchemaCreated via outbox (per-step event)
  → Orchestrator creates saga state, marks step 1 done

Worker picks up SchemaCreated
  → dispatches to CreateFeatureTables
  → handler returns Ok(FeatureTablesCreated(srn, table_count))
  → Worker calls orchestrator.handle_result(CreateFeatureTables, result)
  → Orchestrator validates, emits FeatureTablesCreated (per-step event)
  → Orchestrator sees all steps done, marks saga COMPLETED
  → Orchestrator emits ConventionReady (saga-level event via on_completed)
```
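To make the state transitions in the happy path concrete, here is a toy, synchronous re-implementation of only the orchestrator's `Ok` branch. Everything in it is a simplification for illustration, not the proposed classes: events are plain dataclasses, the outbox is a list, saga state lives in a dict keyed by process ID (a single saga type is assumed), and `ToyOrchestrator`, `STEPS`, and the SRN string are hypothetical.

```python
from dataclasses import dataclass, field

# Hypothetical stand-ins: plain event classes carrying the correlation field.
@dataclass(frozen=True)
class SchemaCreated:
    convention_srn: str

@dataclass(frozen=True)
class FeatureTablesCreated:
    convention_srn: str

@dataclass(frozen=True)
class ConventionReady:
    convention_srn: str

# (step name, expected success type), in saga order
STEPS: list[tuple[str, type]] = [
    ("create_schema", SchemaCreated),
    ("create_feature_tables", FeatureTablesCreated),
]
CORRELATION_FIELD = "convention_srn"

@dataclass
class SagaState:
    status: str = "running"
    current_step: int = 0
    completed_steps: list[str] = field(default_factory=list)

class ToyOrchestrator:
    def __init__(self) -> None:
        self.outbox: list[object] = []          # stands in for Outbox.append
        self.store: dict[str, SagaState] = {}   # stands in for SagaStore

    def handle_ok(self, step_name: str, event: object) -> None:
        expected = dict(STEPS)[step_name]
        if not isinstance(event, expected):     # mirrors step.check_success()
            raise TypeError(f"{step_name}: expected {expected.__name__}")
        self.outbox.append(event)               # per-step event
        process_id = getattr(event, CORRELATION_FIELD)
        state = self.store.setdefault(process_id, SagaState())  # get-or-create
        if state.status != "running":
            return
        state.completed_steps.append(step_name)
        state.current_step += 1
        if state.current_step >= len(STEPS):
            state.status = "completed"
            self.outbox.append(ConventionReady(process_id))  # mirrors on_completed()

orch = ToyOrchestrator()
orch.handle_ok("create_schema", SchemaCreated("srn:conv:1"))
orch.handle_ok("create_feature_tables", FeatureTablesCreated("srn:conv:1"))
print([type(e).__name__ for e in orch.outbox])
# ['SchemaCreated', 'FeatureTablesCreated', 'ConventionReady']
```

Both per-step events reach the outbox before the saga-level `ConventionReady`, in the same order as the happy-path flow.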
Failure path

```
Worker picks up SchemaCreated
  → dispatches to CreateFeatureTables
  → handler returns Err(FeatureTablesFailed(srn, failed_hook))
  → Worker calls orchestrator.handle_result(CreateFeatureTables, result)
  → Orchestrator validates via step.check_failure()
  → Orchestrator emits FeatureTablesFailed (per-step failure event)
  → Orchestrator marks saga COMPENSATING
  → Orchestrator compensates completed steps in reverse:
      → on_compensate(srn, create_schema step) → emits CompensateSchema
  → Orchestrator marks saga FAILED (error = "create_feature_tables")
  → Orchestrator emits ConventionInitFailed (saga-level failure event via on_failed)
```
Scope
In scope
- `Ok` / `Err` Result types
- `EventHandler[TIn, TSuccess, TFailure]`: updated contract for all handlers
- `SagaStep`, `SagaDef[TTrigger, TId]`, `SagaState`, `SagaStore`, `SagaOrchestrator`
- `HandlerSagaMapping` + `build_handler_saga_mapping()` startup wiring
- `saga_state` table + Alembic migration + `PostgresSagaStore` adapter
- `SagaOrchestrator.handle_result(handler_type, result)`: minimal Worker→Orchestrator interface
- Worker integration (route saga handler results to the orchestrator, emit directly for non-saga handlers)
- `ConventionInit` saga definition (first concrete saga)
- Migrate the existing `CreateFeatureTables` handler to the new contract
- Migrate all existing event handlers to return `Ok | Err`
Out of scope
- Timeout detection / stuck saga reaper (follow-up issue)
- Saga dashboard / observability UI
- Additional saga definitions beyond ConventionInit