
feat: saga orchestration with Result monad and typed event flow #78

@rorybyrne

Summary

Add saga orchestration infrastructure to coordinate multi-step, event-driven processes (e.g. convention initialization) with typed event flow, compensation on failure, and a Result monad (Ok/Err) for all event handlers.

Motivation

Convention initialization currently works as fire-and-forget event choreography (ConventionRegistered → CreateFeatureTables → ConventionReady). There is no:

  • Progress tracking — no way to know if initialization stalled
  • Failure handling — if CreateFeatureTables fails after max retries, nothing happens
  • Compensation — no rollback of partially completed steps
  • Timeout detection — stuck processes go unnoticed

As we add more multi-step processes (deposition workflow, source ingestion), this gap becomes a real problem. We need reusable orchestration infrastructure — not ad-hoc status fields sprinkled across aggregates.

Design decisions

  1. Every handler returns Ok[Event] | Err[Event] — Result monad, not exceptions, for domain outcomes. Infrastructure crashes (DB down, OOM) remain exceptions handled by Worker retry/dead-letter. Every handler declares both success and failure event types — no silent None returns.

  2. Handlers build per-step events — the handler just did the work and has the data. It returns a typed event wrapped in Ok or Err. Handlers never touch the outbox for saga-significant events.

  3. Orchestrator owns all saga event emission — the Worker does NOT emit events for saga handlers. It passes the handler result to the orchestrator via handle_result(handler_type, result). The orchestrator validates the event against the step definition, emits it, and updates saga state. For non-saga handlers, the Worker emits directly.

  4. Two levels of events: per-step and saga-level — per-step events (SchemaCreated, FeatureTablesFailed) are built by handlers and describe what a step did. Saga-level events (ConventionReady, ConventionInitFailed) are built by the orchestrator via on_completed/on_failed and describe what the process as a whole achieved. Both are emitted. This separation exists because step handlers live in their own domains (e.g. feature domain) and shouldn't emit events about cross-domain processes (e.g. convention initialization).

  5. Compensation events are commands, not observations — success/failure events are bottom-up ("this happened"), built by handlers. Compensation events are top-down ("undo this"), built by the orchestrator via on_compensate. The orchestrator initiates compensation — no handler is running at that point. Compensation events are lean (just the identifier) because compensation handlers should act on current state, not a stale snapshot.

  6. Side-effect events via injected outbox — the returned event is the saga-significant one. Anything else (audit logs, notifications) goes through the outbox directly.

  7. SagaStep is a frozen dataclass holding class references: handler is a type[EventHandler]; success, failure, and compensation are type[Event] (failure and compensation may be None). No lambdas, no callables. Step has check_success() / check_failure() validation methods (isinstance checks).

  8. SagaDef[TTrigger, TId] is generic over the trigger event and a typed identifier. TTrigger types extract_id() for LSP-safe type narrowing. TId types all factory methods so they receive e.g. ConventionSRN, not str. String serialization happens only at the storage boundary. Both are load-bearing for type safety.

  9. correlation_field on SagaDef — all events in a saga share a common identifier field (e.g. convention_srn). The orchestrator uses this to extract the process ID from any event in the chain.

  10. Inter-step flow via event type chaining — step 1's success type is step 2's handler TIn type. The orchestrator emits the step's success event via the outbox; the next handler picks it up via normal event dispatch.

  11. Handlers are saga-unaware: HandlerSagaMapping is built at startup from saga definitions. Handlers have zero saga-related code. The Worker checks the mapping to decide whether to route results to the orchestrator.

  12. Saga state in its own table: saga_state, keyed by (saga_type, process_id), not on domain aggregates. Clean separation.

Core types

domain/shared/result.py

from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True, slots=True)
class Ok(Generic[T]):
    event: T


@dataclass(frozen=True, slots=True)
class Err(Generic[T]):
    event: T

domain/shared/event.py

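from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from .result import Err, Ok

TIn = TypeVar("TIn", bound=Event)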
TSuccess = TypeVar("TSuccess", bound=Event)
TFailure = TypeVar("TFailure", bound=Event)


class EventHandler(ABC, Generic[TIn, TSuccess, TFailure]):
    @abstractmethod
    async def handle(self, event: TIn) -> Ok[TSuccess] | Err[TFailure]: ...

domain/shared/saga.py

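from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
from typing import ClassVar, Generic, NewType, Protocol, TypeVar

from .event import Event, EventHandler

TTrigger = TypeVar("TTrigger", bound=Event)
TId = TypeVar("TId")


class SagaContractViolation(Exception):
    """A handler returned an event type its SagaStep does not declare."""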


@dataclass(frozen=True)
class SagaStep:
    name: str
    handler: type[EventHandler]
    success: type[Event]
    failure: type[Event] | None = None
    compensation: type[Event] | None = None

    def check_success(self, event: Event) -> Event:
        if not isinstance(event, self.success):
            raise SagaContractViolation(
                f"{self.name}: expected {self.success.__name__}, "
                f"got {type(event).__name__}"
            )
        return event

    def check_failure(self, event: Event) -> Event:
        if self.failure is None or not isinstance(event, self.failure):
            raise SagaContractViolation(
                f"{self.name}: unexpected failure {type(event).__name__}"
            )
        return event


class SagaStatus(StrEnum):
    RUNNING = "running"
    COMPENSATING = "compensating"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class SagaState:
    saga_id: str
    saga_type: str
    process_id: str
    status: SagaStatus = SagaStatus.RUNNING
    current_step: int = 0
    completed_steps: list[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    error: str | None = None


class SagaStore(Protocol):
    async def save(self, state: SagaState) -> None: ...
    async def get(self, saga_type: str, process_id: str) -> SagaState | None: ...


class SagaDef(ABC, Generic[TTrigger, TId]):
    correlation_field: ClassVar[str]

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def steps(self) -> tuple[SagaStep, ...]: ...

    # ── Identity ──

    @abstractmethod
    def extract_id(self, event: TTrigger) -> TId: ...

    @abstractmethod
    def serialize_id(self, id: TId) -> str: ...

    @abstractmethod
    def deserialize_id(self, raw: str) -> TId: ...

    def get_process_id(self, event: Event) -> str:
        """Extract process_id from any event in the saga via correlation_field."""
        return self.serialize_id(getattr(event, self.correlation_field))

    # ── Saga-level event factories (orchestrator calls these) ──

    @abstractmethod
    async def on_completed(self, id: TId) -> Event: ...

    @abstractmethod
    async def on_failed(self, id: TId, error: str) -> Event: ...

    @abstractmethod
    async def on_compensate(self, id: TId, step: SagaStep) -> Event | None: ...


# ── Wiring (built at startup from saga definitions) ──

HandlerSagaMapping = NewType(
    "HandlerSagaMapping",
    dict[type[EventHandler], tuple[str, str]],  # handler → (saga_type, step_name)
)

SagaRegistry = NewType("SagaRegistry", dict[str, SagaDef])


def build_handler_saga_mapping(sagas: list[SagaDef]) -> HandlerSagaMapping:
    mapping: dict[type[EventHandler], tuple[str, str]] = {}
    for saga in sagas:
        for step in saga.steps:
            if step.handler in mapping:
                raise ValueError(f"{step.handler.__name__} registered in multiple sagas")
            mapping[step.handler] = (saga.name, step.name)
    return HandlerSagaMapping(mapping)

infrastructure/saga/orchestrator.py

The orchestrator owns all event emission for saga handlers. The Worker passes it handler_type and result — that's the entire interface.

class SagaOrchestrator:
    def __init__(
        self,
        store: SagaStore,
        outbox: Outbox,
        registry: SagaRegistry,
        handler_mapping: HandlerSagaMapping,
    ) -> None:
        self._store = store
        self._outbox = outbox
        self._registry = registry
        self._handler_mapping = handler_mapping

    async def handle_result(
        self,
        handler_type: type[EventHandler],
        result: Ok[Event] | Err[Event],
    ) -> None:
        """Single entry point from Worker. Validates, emits, and tracks."""
        saga_type, step_name = self._handler_mapping[handler_type]
        defn = self._registry[saga_type]
        step = next(s for s in defn.steps if s.name == step_name)

        match result:
            case Ok(event=out):
                step.check_success(out)
                await self._outbox.append(out)

                process_id = defn.get_process_id(out)
                state = await self._get_or_create(saga_type, process_id)

                if state.status != SagaStatus.RUNNING:
                    return
                if step_name in state.completed_steps:
                    return  # redelivered result; the step is already recorded

                state.completed_steps.append(step_name)
                state.current_step += 1

                if state.current_step >= len(defn.steps):
                    state.status = SagaStatus.COMPLETED
                    await self._store.save(state)
                    typed_id = defn.deserialize_id(process_id)
                    await self._outbox.append(await defn.on_completed(typed_id))
                else:
                    await self._store.save(state)

            case Err(event=out):
                step.check_failure(out)
                await self._outbox.append(out)

                process_id = defn.get_process_id(out)
                state = await self._get_or_create(saga_type, process_id)

                if state.status != SagaStatus.RUNNING:
                    return

                typed_id = defn.deserialize_id(process_id)

                state.status = SagaStatus.COMPENSATING
                await self._store.save(state)

                for name in reversed(state.completed_steps):
                    s = next(s for s in defn.steps if s.name == name)
                    event = await defn.on_compensate(typed_id, s)
                    if event:
                        await self._outbox.append(event)

                state.status = SagaStatus.FAILED
                state.error = step_name
                await self._store.save(state)
                await self._outbox.append(
                    await defn.on_failed(typed_id, step_name)
                )

    async def _get_or_create(
        self, saga_type: str, process_id: str,
    ) -> SagaState:
        state = await self._store.get(saga_type, process_id)
        if state is None:
            state = SagaState(
                saga_id=str(uuid4()),
                saga_type=saga_type,
                process_id=process_id,
            )
        return state

Worker integration

The Worker is minimal — it runs the handler and routes the result. For saga handlers it delegates entirely to the orchestrator. For non-saga handlers it emits directly.

result = await handler.handle(event)

if type(handler) in self._handler_saga_mapping:
    # Saga handler — orchestrator validates, emits, and tracks
    await self._orchestrator.handle_result(type(handler), result)
else:
    # Non-saga handler — emit directly
    match result:
        case Ok(event=out):
            await self._outbox.append(out)
        case Err(event=out):
            await self._outbox.append(out)
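A runnable sketch of this routing rule, with hypothetical stand-ins for the handlers, the orchestrator (a fake that only records calls), and the outbox (a list). Dispatch is a plain dict from input event type to handler, which is an assumption about how the Worker finds handlers:

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    event: object


@dataclass(frozen=True)
class Err:
    event: object


# Hypothetical, simplified events.
@dataclass(frozen=True)
class RecordPublished:
    record_srn: str


@dataclass(frozen=True)
class RecordIndexed:
    record_srn: str


@dataclass(frozen=True)
class SchemaCreated:
    convention_srn: str


@dataclass(frozen=True)
class FeatureTablesFailed:
    convention_srn: str
    failed_hook: str


class IndexPublishedRecord:
    """Non-saga handler stand-in: always succeeds."""
    async def handle(self, event: RecordPublished) -> Ok:
        return Ok(RecordIndexed(event.record_srn))


class CreateFeatureTables:
    """Saga handler stand-in: always fails on a hypothetical hook."""
    async def handle(self, event: SchemaCreated) -> Err:
        return Err(FeatureTablesFailed(event.convention_srn, "hook_a"))


class RecordingOrchestrator:
    """Fake orchestrator that records what the Worker hands over."""
    def __init__(self) -> None:
        self.calls: list[tuple[str, object]] = []

    async def handle_result(self, handler_type: type, result: object) -> None:
        self.calls.append((handler_type.__name__, result))


class Worker:
    def __init__(self, handlers, handler_saga_mapping, orchestrator) -> None:
        self._handlers = handlers  # input event type -> handler instance
        self._handler_saga_mapping = handler_saga_mapping
        self._orchestrator = orchestrator
        self.outbox: list[object] = []  # in-memory outbox stand-in

    async def process(self, event: object) -> None:
        handler = self._handlers[type(event)]
        result = await handler.handle(event)
        if type(handler) in self._handler_saga_mapping:
            # Saga handler: the orchestrator validates, emits, and tracks
            await self._orchestrator.handle_result(type(handler), result)
        else:
            # Non-saga handler: Ok and Err events are both emitted as-is
            self.outbox.append(result.event)


async def main() -> Worker:
    worker = Worker(
        handlers={
            RecordPublished: IndexPublishedRecord(),
            SchemaCreated: CreateFeatureTables(),
        },
        handler_saga_mapping={
            CreateFeatureTables: ("convention_init", "create_feature_tables"),
        },
        orchestrator=RecordingOrchestrator(),
    )
    await worker.process(RecordPublished("rec-1"))
    await worker.process(SchemaCreated("conv-1"))
    return worker


worker = asyncio.run(main())
```

The saga handler's `Err` never reaches the Worker's outbox; only the orchestrator sees it.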

saga_state table

saga_state_table = sa.Table(
    "saga_state",
    mapper_registry.metadata,
    sa.Column("saga_id", sa.String, nullable=False, unique=True),
    sa.Column("saga_type", sa.String(128), nullable=False),
    sa.Column("process_id", sa.String, nullable=False),
    sa.Column("status", sa.String(32), nullable=False),
    sa.Column("current_step", sa.Integer, nullable=False),
    sa.Column("completed_steps", sa.JSON, nullable=False),
    sa.Column("started_at", sa.DateTime(timezone=True), nullable=False),
    sa.Column("error", sa.Text, nullable=True),
    sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    sa.PrimaryKeyConstraint("saga_type", "process_id"),
)

Example: ConventionInit saga

Events

Per-step events (built by handlers):

class SchemaCreated(Event):
    convention_srn: ConventionSRN
    schema_srn: SchemaSRN
    field_count: int

class SchemaCreationFailed(Event):
    convention_srn: ConventionSRN
    error: str

class FeatureTablesCreated(Event):
    convention_srn: ConventionSRN
    table_count: int

class FeatureTablesFailed(Event):
    convention_srn: ConventionSRN
    failed_hook: str

Saga-level events (built by orchestrator via SagaDef):

class ConventionReady(Event):
    convention_srn: ConventionSRN

class ConventionInitFailed(Event):
    convention_srn: ConventionSRN
    error: str

Compensation commands (built by orchestrator via SagaDef):

class CompensateSchema(Event):
    convention_srn: ConventionSRN

class CompensateFeatureTables(Event):
    convention_srn: ConventionSRN

Saga definition

class ConventionInit(SagaDef[ConventionRegistered, ConventionSRN]):
    name = "convention_init"
    correlation_field = "convention_srn"

    steps = (
        SagaStep(
            name="create_schema",
            handler=CreateConventionSchema,
            success=SchemaCreated,
            failure=SchemaCreationFailed,
            compensation=CompensateSchema,
        ),
        SagaStep(
            name="create_feature_tables",
            handler=CreateFeatureTables,
            success=FeatureTablesCreated,
            failure=FeatureTablesFailed,
            compensation=CompensateFeatureTables,
        ),
    )

    def extract_id(self, event: ConventionRegistered) -> ConventionSRN:
        return event.convention_srn

    def serialize_id(self, id: ConventionSRN) -> str:
        return str(id)

    def deserialize_id(self, raw: str) -> ConventionSRN:
        return ConventionSRN.parse(raw)

    async def on_completed(self, id: ConventionSRN) -> ConventionReady:
        return ConventionReady(id=EventId(uuid4()), convention_srn=id)

    async def on_failed(self, id: ConventionSRN, error: str) -> ConventionInitFailed:
        return ConventionInitFailed(
            id=EventId(uuid4()), convention_srn=id, error=error,
        )

    async def on_compensate(
        self, id: ConventionSRN, step: SagaStep,
    ) -> Event | None:
        if step.compensation is None:
            return None
        return step.compensation(id=EventId(uuid4()), convention_srn=id)
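Because each step's success type must be the next step's handler input type, the chain declared in `steps` can be checked once at startup instead of surfacing as a stalled saga at runtime. A sketch, assuming handlers subclass `EventHandler[...]` directly (so the type arguments are readable from `__orig_bases__`) and that every step declares a failure type, as both ConventionInit steps do; `handler_types` and `validate_chain` are hypothetical helpers:

```python
import typing
from dataclasses import dataclass
from typing import Generic, TypeVar


class Event:
    """Stand-in for the project's Event base class."""


TIn = TypeVar("TIn", bound=Event)
TSuccess = TypeVar("TSuccess", bound=Event)
TFailure = TypeVar("TFailure", bound=Event)


class EventHandler(Generic[TIn, TSuccess, TFailure]):
    """Simplified: the real class is an ABC with an abstract handle()."""


@dataclass(frozen=True)
class SagaStep:
    name: str
    handler: type
    success: type
    failure: type


def handler_types(handler: type) -> tuple:
    """Return (TIn, TSuccess, TFailure) from the handler's EventHandler[...] base."""
    for base in handler.__dict__.get("__orig_bases__", ()):
        if typing.get_origin(base) is EventHandler:
            return typing.get_args(base)
    raise TypeError(f"{handler.__name__} does not subclass EventHandler[...]")


def validate_chain(trigger: type, steps: tuple[SagaStep, ...]) -> None:
    """Check that each step consumes the event the previous step emits."""
    produced = trigger
    for step in steps:
        consumes, success, failure = handler_types(step.handler)
        if not issubclass(produced, consumes):
            raise TypeError(
                f"{step.name}: handler consumes {consumes.__name__}, "
                f"but the previous step emits {produced.__name__}"
            )
        if (success, failure) != (step.success, step.failure):
            raise TypeError(f"{step.name}: step and handler disagree on result types")
        produced = step.success


# Placeholder events and handlers mirroring ConventionInit.
class ConventionRegistered(Event): ...
class SchemaCreated(Event): ...
class SchemaCreationFailed(Event): ...
class FeatureTablesCreated(Event): ...
class FeatureTablesFailed(Event): ...


class CreateConventionSchema(
    EventHandler[ConventionRegistered, SchemaCreated, SchemaCreationFailed]
): ...


class CreateFeatureTables(
    EventHandler[SchemaCreated, FeatureTablesCreated, FeatureTablesFailed]
): ...


STEPS = (
    SagaStep("create_schema", CreateConventionSchema,
             SchemaCreated, SchemaCreationFailed),
    SagaStep("create_feature_tables", CreateFeatureTables,
             FeatureTablesCreated, FeatureTablesFailed),
)

validate_chain(ConventionRegistered, STEPS)  # correct order: no error

try:
    validate_chain(ConventionRegistered, STEPS[::-1])  # steps swapped
except TypeError as exc:
    reversed_error = str(exc)
```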

Saga handlers

class CreateConventionSchema(
    EventHandler[ConventionRegistered, SchemaCreated, SchemaCreationFailed]
):
    schema_service: SchemaService

    async def handle(
        self, event: ConventionRegistered,
    ) -> Ok[SchemaCreated] | Err[SchemaCreationFailed]:
        try:
            schema = await self.schema_service.create_schema(
                title=event.title,
                fields=event.schema_fields,
            )
        except DomainError as e:
            return Err(SchemaCreationFailed(
                id=EventId(uuid4()),
                convention_srn=event.convention_srn,
                error=str(e),
            ))

        return Ok(SchemaCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            schema_srn=schema.srn,
            field_count=len(event.schema_fields),
        ))


class CreateFeatureTables(
    EventHandler[SchemaCreated, FeatureTablesCreated, FeatureTablesFailed]
):
    convention_repo: ConventionRepository
    feature_service: FeatureService

    async def handle(
        self, event: SchemaCreated,
    ) -> Ok[FeatureTablesCreated] | Err[FeatureTablesFailed]:
        convention = await self.convention_repo.get(event.convention_srn)

        for hook in convention.hooks:
            try:
                await self.feature_service.create_table(hook)
            except ConflictError:
                return Err(FeatureTablesFailed(
                    id=EventId(uuid4()),
                    convention_srn=event.convention_srn,
                    failed_hook=hook.name,
                ))

        return Ok(FeatureTablesCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            table_count=len(convention.hooks),
        ))

Compensation handler

class UndoFeatureTables(
    EventHandler[CompensateFeatureTables, TablesCompensated, CompensationFailed]
):
    convention_repo: ConventionRepository
    feature_service: FeatureService

    async def handle(
        self, event: CompensateFeatureTables,
    ) -> Ok[TablesCompensated] | Err[CompensationFailed]:
        # Compensation event is lean — query repo for current state to undo
        convention = await self.convention_repo.get(event.convention_srn)
        for hook in convention.hooks:
            await self.feature_service.drop_table(hook)
        return Ok(TablesCompensated(
            id=EventId(uuid4()), convention_srn=event.convention_srn,
        ))
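The compensation command carries only the identifier, so the handler decides what to undo from current state. A sketch of doing that defensively, so a redelivered `CompensateFeatureTables` (for example after a Worker retry) still succeeds instead of failing on a table that is already gone. `table_exists` and the in-memory service are assumptions (the real `FeatureService` API may differ), and the `dropped` field on `TablesCompensated` is invented for the example:

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    event: object


@dataclass(frozen=True)
class CompensateFeatureTables:
    convention_srn: str


@dataclass(frozen=True)
class TablesCompensated:
    convention_srn: str
    dropped: int  # hypothetical field, for the example only


class InMemoryFeatureService:
    """Hypothetical stand-in for FeatureService; table_exists is assumed."""

    def __init__(self, tables: set[str]) -> None:
        self.tables = tables

    async def table_exists(self, hook: str) -> bool:
        return hook in self.tables

    async def drop_table(self, hook: str) -> None:
        self.tables.remove(hook)  # KeyError if the table is already gone


class UndoFeatureTables:
    def __init__(self, hooks: list[str], feature_service: InMemoryFeatureService) -> None:
        self._hooks = hooks  # stands in for convention_repo.get(...).hooks
        self._features = feature_service

    async def handle(self, event: CompensateFeatureTables) -> Ok:
        dropped = 0
        for hook in self._hooks:
            # Act on current state: skip tables that no longer (or never) exist.
            if await self._features.table_exists(hook):
                await self._features.drop_table(hook)
                dropped += 1
        return Ok(TablesCompensated(event.convention_srn, dropped))


async def main() -> tuple[Ok, Ok]:
    service = InMemoryFeatureService({"hook_a"})  # hook_b was never created
    undo = UndoFeatureTables(["hook_a", "hook_b"], service)
    command = CompensateFeatureTables("conv-1")
    first = await undo.handle(command)
    second = await undo.handle(command)  # simulated redelivery
    return first, second


first, second = asyncio.run(main())
```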

Non-saga handler

class IndexPublishedRecord(
    EventHandler[RecordPublished, RecordIndexed, IndexingFailed]
):
    search_service: SearchService

    async def handle(
        self, event: RecordPublished,
    ) -> Ok[RecordIndexed] | Err[IndexingFailed]:
        await self.search_service.index(event.record_srn, event.metadata)
        return Ok(RecordIndexed(
            id=EventId(uuid4()), record_srn=event.record_srn,
        ))

Handler with side-effect events

class CreateConventionSchema(
    EventHandler[ConventionRegistered, SchemaCreated, SchemaCreationFailed]
):
    schema_service: SchemaService
    outbox: Outbox  # for side effects only

    async def handle(
        self, event: ConventionRegistered,
    ) -> Ok[SchemaCreated] | Err[SchemaCreationFailed]:
        schema = await self.schema_service.create_schema(...)

        # Side effect — not saga-significant, emitted directly
        await self.outbox.append(AuditEntry(
            id=EventId(uuid4()),
            action="schema_created",
            resource=str(event.convention_srn),
        ))

        # Saga-significant — returned, validated and emitted by orchestrator
        return Ok(SchemaCreated(
            id=EventId(uuid4()),
            convention_srn=event.convention_srn,
            schema_srn=schema.srn,
            field_count=len(event.schema_fields),
        ))

Event flow

Happy path

ConventionService.create_convention()
  → saves Convention aggregate
  → outbox.append(ConventionRegistered(srn, hooks, schema_fields))

Worker picks up ConventionRegistered
  → dispatches to CreateConventionSchema
  → handler returns Ok(SchemaCreated(srn, schema_srn, field_count))
  → Worker calls orchestrator.handle_result(CreateConventionSchema, result)
  → Orchestrator validates via step.check_success()
  → Orchestrator emits SchemaCreated via outbox (per-step event)
  → Orchestrator creates saga state, marks step 1 done

Worker picks up SchemaCreated
  → dispatches to CreateFeatureTables
  → handler returns Ok(FeatureTablesCreated(srn, table_count))
  → Worker calls orchestrator.handle_result(CreateFeatureTables, result)
  → Orchestrator validates, emits FeatureTablesCreated (per-step event)
  → Orchestrator sees all steps done
  → Orchestrator emits ConventionReady (saga-level event via on_completed)

Failure path

Worker picks up SchemaCreated
  → dispatches to CreateFeatureTables
  → handler returns Err(FeatureTablesFailed(srn, failed_hook))
  → Worker calls orchestrator.handle_result(CreateFeatureTables, result)
  → Orchestrator validates via step.check_failure()
  → Orchestrator emits FeatureTablesFailed (per-step failure event)
  → Orchestrator compensates completed steps in reverse:
    → on_compensate(srn, create_schema step) → emits CompensateSchema
  → Orchestrator emits ConventionInitFailed (saga-level failure event via on_failed)

Scope

In scope

  • Ok/Err Result types
  • EventHandler[TIn, TSuccess, TFailure] — updated contract for all handlers
  • SagaStep, SagaDef[TTrigger, TId], SagaState, SagaStore, SagaOrchestrator
  • HandlerSagaMapping + build_handler_saga_mapping() startup wiring
  • saga_state table + Alembic migration + PostgresSagaStore adapter
  • SagaOrchestrator.handle_result(handler_type, result) — minimal Worker→Orchestrator interface
  • Worker integration (route saga handler results to orchestrator, emit directly for non-saga)
  • ConventionInit saga definition (first concrete saga)
  • Migrate existing CreateFeatureTables handler to new contract
  • Migrate all existing event handlers to return Ok | Err

Out of scope

  • Timeout detection / stuck saga reaper (follow-up issue)
  • Saga dashboard / observability UI
  • Additional saga definitions beyond ConventionInit

Metadata

Assignees

No one assigned

    Labels

    design-needed (Needs architectural discussion before implementation)
    feature (New functionality)
    infrastructure (CI, Docker, deployment, migrations)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests