Cruster

A Rust framework for building distributed, stateful systems with durable workflows.

Status: Under active development. API may change.

Features

  • Stateless Entities -- Addressable RPC handlers with automatic sharding and routing. State is your responsibility (use your database directly).
  • Durable Workflows -- Long-running orchestrations that survive crashes. Activities are journaled and replay automatically.
  • Activity Groups -- Reusable bundles of transactional activities, composable across workflows.
  • RPC Groups -- Reusable bundles of RPC methods, composable across entities.
  • Singletons -- Cluster-wide unique tasks with automatic failover.
  • Cron Scheduling -- Distributed cron jobs running as singletons.
  • gRPC Transport -- Inter-node communication with connection pooling and streaming.

Installation

Prerequisites

The Protocol Buffers compiler (protoc) is required:

# Debian/Ubuntu
sudo apt-get install protobuf-compiler

# macOS
brew install protobuf

Cargo

[dependencies]
cruster = "version"

Quick Start

Defining an Entity

Entities are stateless RPC handlers. You manage state yourself via your database.

use cruster::prelude::*;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

#[derive(Serialize, Deserialize)]
struct IncrementRequest {
    entity_id: String,
    amount: i64,
}

#[derive(Serialize, Deserialize)]
struct GetCounterRequest {
    entity_id: String,
}

#[entity]
#[derive(Clone)]
struct Counter {
    pool: PgPool,
}

#[entity_impl]
impl Counter {
    // Persisted RPC: at-least-once delivery for writes
    #[rpc(persisted)]
    pub async fn increment(&self, request: IncrementRequest) -> Result<i64, ClusterError> {
        let (value,): (i64,) = sqlx::query_as(
            "INSERT INTO counters (id, value) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET value = counters.value + $2
             RETURNING value"
        )
        .bind(&request.entity_id)
        .bind(request.amount)
        .fetch_one(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: e.to_string(),
            source: Some(Box::new(e)),
        })?;
        Ok(value)
    }

    // Non-persisted RPC: best-effort delivery for reads
    #[rpc]
    pub async fn get(&self, request: GetCounterRequest) -> Result<i64, ClusterError> {
        let result: Option<(i64,)> = sqlx::query_as(
            "SELECT value FROM counters WHERE id = $1"
        )
        .bind(&request.entity_id)
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| ClusterError::PersistenceError {
            reason: e.to_string(),
            source: Some(Box::new(e)),
        })?;
        Ok(result.map(|r| r.0).unwrap_or(0))
    }
}

Using the Entity

use cruster::types::EntityId;

// Register entity and get a typed client (CounterClient is generated by the macro)
let counter = Counter { pool: pool.clone() }
    .register(sharding.clone())
    .await?;

// Call methods -- entity is created on first access, routed by shard
let entity_id = EntityId::new("counter-1");
let value = counter.increment(&entity_id, &IncrementRequest {
    entity_id: "counter-1".into(),
    amount: 5,
}).await?;
assert_eq!(value, 5);

let value = counter.get(&entity_id, &GetCounterRequest {
    entity_id: "counter-1".into(),
}).await?;
assert_eq!(value, 5);

Core Concepts

Entity Method Types

| Attribute         | Purpose                   | Delivery      |
|-------------------|---------------------------|---------------|
| #[rpc]            | Best-effort queries/reads | At-most-once  |
| #[rpc(persisted)] | Durable writes/mutations  | At-least-once |

Both use &self -- entities are stateless. State lives in your database.
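Because #[rpc(persisted)] is delivered at least once, a handler may see the same message more than once. The README does not say whether Cruster deduplicates redeliveries itself, so the following is only a minimal sketch of making a write idempotent on the application side. `CounterStore` and the `request_id` parameter are hypothetical and stand in for database tables; they are not part of Cruster.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical in-memory stand-in for a counters table plus a table of
/// request IDs that have already been applied.
#[derive(Default)]
struct CounterStore {
    values: HashMap<String, i64>,
    applied: HashSet<String>,
}

impl CounterStore {
    /// Applies an increment at most once per `request_id`, so a redelivered
    /// message leaves the counter unchanged and returns the current value.
    fn increment(&mut self, request_id: &str, entity_id: &str, amount: i64) -> i64 {
        let value = self.values.entry(entity_id.to_string()).or_insert(0);
        // `insert` returns true only the first time this request ID is seen.
        if self.applied.insert(request_id.to_string()) {
            *value += amount;
        }
        *value
    }
}
```

In a real handler the same idea would typically be expressed as a unique constraint on the request ID inside the same SQL transaction as the write.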

Entity Configuration

#[entity(
    name = "user",               // Custom entity type name (default: struct name)
    shard_group = "premium",     // Shard group for isolation
    max_idle_time_secs = 300,    // Eviction timeout
    mailbox_capacity = 50,       // Message queue size
    concurrency = 4,             // Parallel request handling (default: 1)
)]
#[derive(Clone)]
struct User {
    pool: PgPool,
}
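To illustrate what a bounded mailbox such as mailbox_capacity implies, here is a small sketch using the standard library's bounded channel. It is not Cruster's implementation, and how Cruster reacts to a full mailbox is not stated here; the `mailbox` and `try_deliver` helpers are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Creates a bounded mailbox that holds at most `capacity` queued messages.
fn mailbox(capacity: usize) -> (SyncSender<String>, Receiver<String>) {
    sync_channel(capacity)
}

/// Tries to enqueue without blocking; returns `false` when the mailbox is
/// full or the receiving side is gone.
fn try_deliver(tx: &SyncSender<String>, message: &str) -> bool {
    match tx.try_send(message.to_string()) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
    }
}
```

With a capacity of 2, the third `try_deliver` call fails until the receiver takes a message off the queue, which is the backpressure a bounded queue provides.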

Visibility Modifiers

#[entity_impl]
impl MyEntity {
    // Public: on generated client, callable by external callers (default for #[rpc])
    #[rpc]
    pub async fn query(&self, req: QueryRequest) -> Result<Data, ClusterError> { ... }

    // Protected: dispatchable entity-to-entity, but NOT on the generated client
    #[rpc]
    #[protected]
    pub async fn internal_sync(&self, req: SyncRequest) -> Result<(), ClusterError> { ... }

    // Private: not dispatchable, not on client -- internal helper only
    #[rpc]
    #[private]
    async fn validate(&self, req: ValidateRequest) -> Result<bool, ClusterError> { ... }
}

Durable Workflows

Workflows are standalone durable orchestration constructs backed by hidden entities. Each workflow has an execute entry point and performs its side effects through #[activity] methods. Activities are journaled -- on crash recovery, completed activities return their cached results instead of running again.
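The replay behaviour can be pictured with a tiny in-memory journal. This is a sketch of the general technique only, not Cruster's storage format: the `Journal` type and keying by activity name are assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory journal: activity name -> recorded result.
#[derive(Default)]
struct Journal {
    completed: HashMap<String, String>,
}

impl Journal {
    /// Runs `activity` only if no result is recorded for `name`;
    /// otherwise returns the recorded result without re-running it.
    fn run<F>(&mut self, name: &str, activity: F) -> String
    where
        F: FnOnce() -> String,
    {
        if let Some(cached) = self.completed.get(name) {
            return cached.clone();
        }
        let result = activity();
        self.completed.insert(name.to_string(), result.clone());
        result
    }
}
```

Calling `run` twice with the same name executes the closure once; the second call returns the first result, which is what lets a restarted workflow skip steps it already finished.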

Defining a Workflow

use cruster::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct ProcessOrderRequest {
    order_id: String,
    item_count: u32,
    amount: i64,
}

#[derive(Serialize, Deserialize)]
struct OrderResult {
    order_id: String,
    status: String,
}

#[workflow]
#[derive(Clone)]
struct OrderWorkflow;

#[workflow_impl(
    key = |req: &ProcessOrderRequest| req.order_id.clone(),
    hash = false,
)]
impl OrderWorkflow {
    async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
        // Each activity is journaled. On replay, completed steps return cached results.
        self.reserve_inventory(request.order_id.clone(), request.item_count).await?;
        self.charge_payment(request.order_id.clone(), request.amount).await?;
        self.confirm_order(request.order_id.clone()).await?;

        Ok(OrderResult {
            order_id: request.order_id,
            status: "completed".into(),
        })
    }

    #[activity]
    async fn reserve_inventory(&self, order_id: String, count: u32) -> Result<(), ClusterError> {
        // self.tx is an ActivityTx -- a SQL transaction committed atomically with the journal
        sqlx::query("INSERT INTO reservations (order_id, count) VALUES ($1, $2)")
            .bind(&order_id)
            .bind(count as i64)
            .execute(&self.tx)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: e.to_string(),
                source: Some(Box::new(e)),
            })?;
        Ok(())
    }

    #[activity]
    async fn charge_payment(&self, order_id: String, amount: i64) -> Result<(), ClusterError> {
        sqlx::query("INSERT INTO payments (order_id, amount) VALUES ($1, $2)")
            .bind(&order_id)
            .bind(amount)
            .execute(&self.tx)
            .await?;
        Ok(())
    }

    #[activity]
    async fn confirm_order(&self, order_id: String) -> Result<(), ClusterError> {
        sqlx::query("UPDATE orders SET status = 'confirmed' WHERE id = $1")
            .bind(&order_id)
            .execute(&self.tx)
            .await?;
        Ok(())
    }
}

Using a Workflow

// Register and get typed client (OrderWorkflowClient is generated)
let client = OrderWorkflow.register(sharding.clone()).await?;

// Build the request once so it can be reused by the calls below
let request = ProcessOrderRequest {
    order_id: "order-42".into(),
    item_count: 3,
    amount: 9900,
};

// Execute synchronously -- blocks until workflow completes
let result = client.execute(&request).await?;

// Start asynchronously -- returns execution ID immediately
let exec_id = client.start(&request).await?;

// Poll for result later
let result: Option<OrderResult> = client.poll(&exec_id).await?;

// Idempotency keys
let result = client.with_key("my-key").execute(&request).await?;       // hashed
let result = client.with_key_raw("raw-id").execute(&request).await?;   // used as-is
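The difference between a hashed key and a raw key can be sketched as follows. The hash function Cruster actually uses and the format of its execution IDs are not given here; FNV-1a and the `execution_id` helper are assumptions chosen only because they are short and deterministic.

```rust
/// 64-bit FNV-1a, used here only as a stable stand-in hash.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Hypothetical key derivation: hashed keys become fixed-width hex,
/// raw keys are passed through unchanged.
fn execution_id(key: &str, hash: bool) -> String {
    if hash {
        format!("{:016x}", fnv1a_64(key.as_bytes()))
    } else {
        key.to_string()
    }
}
```

Either way the same key always maps to the same ID, which is what makes a repeated call with that key refer to the same execution rather than start a new one.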

self.tx -- Transactional Activities

Inside #[activity] methods, self.tx is an ActivityTx that implements sqlx::Executor. All SQL writes through self.tx are committed atomically with the workflow journal entry. If the activity fails, both the journal and your SQL writes roll back together.

#[activity]
async fn transfer_funds(&self, from: String, to: String, amount: i64) -> Result<(), ClusterError> {
    sqlx::query("UPDATE accounts SET balance = balance - $2 WHERE id = $1")
        .bind(&from)
        .bind(amount)
        .execute(&self.tx)  // committed with journal
        .await?;

    sqlx::query("UPDATE accounts SET balance = balance + $2 WHERE id = $1")
        .bind(&to)
        .bind(amount)
        .execute(&self.tx)  // same transaction
        .await?;

    Ok(())
}
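The all-or-nothing guarantee can be modelled in memory with a clone-and-swap. This is only an illustration of the semantics described above; a real ActivityTx relies on a SQL transaction, and the `Store` type below is hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical committed state: application rows plus the workflow journal.
#[derive(Default, Clone)]
struct Store {
    balances: HashMap<String, i64>,
    journal: Vec<String>,
}

impl Store {
    /// Runs `activity` against a scratch copy and publishes the copy only on
    /// success, so data writes and the journal entry land together or not at all.
    fn run_activity<F>(&mut self, name: &str, activity: F) -> Result<(), String>
    where
        F: FnOnce(&mut HashMap<String, i64>) -> Result<(), String>,
    {
        let mut staged = self.clone();
        activity(&mut staged.balances)?; // on error the staged copy is dropped
        staged.journal.push(name.to_string());
        *self = staged; // "commit": both changes become visible at once
        Ok(())
    }
}
```

If the closure debits one account and then returns an error, neither the debit nor a journal entry is visible afterwards, so a retry starts from a clean state.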

Activity Retries

#[activity(retries = 3)]
async fn call_external_api(&self, url: String) -> Result<String, ClusterError> {
    // Retried up to 3 times with exponential backoff on failure
    ...
}

#[activity(retries = 5, backoff = "constant")]
async fn idempotent_write(&self, data: String) -> Result<(), ClusterError> {
    // Retried with constant backoff
    ...
}
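To make the two backoff modes concrete, here is a sketch of how retry delays might be computed. The base delay and the doubling factor are assumptions; the README does not state the values Cruster uses, and `retry_delay` is a hypothetical helper.

```rust
use std::time::Duration;

/// Backoff strategies mirroring the exponential default and "constant".
#[derive(Clone, Copy)]
enum Backoff {
    Exponential,
    Constant,
}

/// Delay before retry number `attempt` (1-based). The doubling factor and the
/// caller-supplied base delay are assumptions for illustration.
fn retry_delay(backoff: Backoff, base: Duration, attempt: u32) -> Duration {
    match backoff {
        Backoff::Constant => base,
        Backoff::Exponential => {
            let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
            base.saturating_mul(factor)
        }
    }
}
```

With a 100 ms base, three exponential retries wait 100 ms, 200 ms, and 400 ms, while constant backoff waits 100 ms every time.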

Durable Timers

use std::time::Duration;

#[workflow_impl(...)]
impl ReminderWorkflow {
    async fn execute(&self, request: ReminderRequest) -> Result<(), ClusterError> {
        // Durable sleep -- survives crashes, resumes where it left off
        self.sleep(Duration::from_secs(3600)).await?;

        self.send_reminder(request.user_id).await?;
        Ok(())
    }
}
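One common way to make a sleep survive restarts is to persist an absolute deadline rather than a relative duration. The sketch below shows that idea under the assumption that time is tracked as seconds since the Unix epoch; `DurableTimer` is hypothetical and says nothing about how Cruster stores its timers.

```rust
use std::time::Duration;

/// Hypothetical persisted timer: stores an absolute deadline (seconds since
/// the Unix epoch) instead of a relative duration.
struct DurableTimer {
    fire_at_unix_secs: u64,
}

impl DurableTimer {
    /// Records the deadline once, when the sleep is first requested.
    fn start(now_unix_secs: u64, delay: Duration) -> Self {
        Self { fire_at_unix_secs: now_unix_secs.saturating_add(delay.as_secs()) }
    }

    /// Time still left to sleep when observed at `now_unix_secs`;
    /// zero once the deadline has passed.
    fn remaining(&self, now_unix_secs: u64) -> Duration {
        Duration::from_secs(self.fire_at_unix_secs.saturating_sub(now_unix_secs))
    }
}
```

A process that restarts 30 minutes into a one-hour sleep reloads the deadline and waits only the remaining 30 minutes.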

Activity Groups

Reusable bundles of transactional activities that can be composed into multiple workflows.

Defining Activity Groups

#[activity_group]
#[derive(Clone)]
struct Inventory;

#[activity_group_impl]
impl Inventory {
    #[activity]
    async fn reserve(&self, order_id: String, count: u32) -> Result<String, ClusterError> {
        let id = format!("res-{order_id}-{count}");
        sqlx::query("INSERT INTO reservations (id, order_id, count) VALUES ($1, $2, $3)")
            .bind(&id)
            .bind(&order_id)
            .bind(count as i64)
            .execute(&self.tx)
            .await?;
        Ok(id)
    }
}

#[activity_group]
#[derive(Clone)]
struct Payments;

#[activity_group_impl]
impl Payments {
    #[activity]
    async fn charge(&self, order_id: String, amount: i64) -> Result<String, ClusterError> {
        let tx_id = format!("tx-{order_id}");
        sqlx::query("INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)")
            .bind(&tx_id)
            .bind(&order_id)
            .bind(amount)
            .execute(&self.tx)
            .await?;
        Ok(tx_id)
    }
}

Composing into a Workflow

#[workflow]
#[derive(Clone)]
struct OrderWorkflow;

#[workflow_impl(
    key = |req: &ProcessOrderRequest| req.order_id.clone(),
    hash = false,
    activity_groups(Inventory, Payments)
)]
impl OrderWorkflow {
    async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
        // Call activities from composed groups
        let reservation_id = self.reserve(request.order_id.clone(), request.item_count).await?;
        let tx_id = self.charge(request.order_id.clone(), request.amount).await?;

        // Local activities work alongside group activities
        self.finalize(request.order_id.clone(), reservation_id, tx_id).await?;

        Ok(OrderResult { ... })
    }

    #[activity]
    async fn finalize(&self, order_id: String, res_id: String, tx_id: String) -> Result<(), ClusterError> {
        sqlx::query("UPDATE orders SET status = 'done', reservation = $2, payment = $3 WHERE id = $1")
            .bind(&order_id)
            .bind(&res_id)
            .bind(&tx_id)
            .execute(&self.tx)
            .await?;
        Ok(())
    }
}

Registration with Activity Groups

let client = OrderWorkflow
    .register(sharding.clone(), Inventory, Payments)
    .await?;

RPC Groups

Reusable bundles of RPC methods that can be composed into multiple entities.

Defining an RPC Group

#[rpc_group]
#[derive(Clone)]
struct Auditable {
    pool: PgPool,
}

#[rpc_group_impl]
impl Auditable {
    #[rpc(persisted)]
    pub async fn log_action(&self, request: LogActionRequest) -> Result<AuditEntry, ClusterError> {
        sqlx::query_as("INSERT INTO audit_log (...) VALUES (...) RETURNING *")
            .bind(&request.action)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| ClusterError::PersistenceError {
                reason: e.to_string(),
                source: Some(Box::new(e)),
            })
    }

    #[rpc]
    pub async fn get_audit_log(&self, request: GetAuditLogRequest) -> Result<Vec<AuditEntry>, ClusterError> {
        ...
    }
}

Composing into an Entity

#[entity]
#[derive(Clone)]
struct User {
    pool: PgPool,
}

#[entity_impl(rpc_groups(Auditable))]
impl User {
    #[rpc(persisted)]
    pub async fn update_email(&self, request: UpdateEmailRequest) -> Result<(), ClusterError> {
        ...
    }
}

Registration with RPC Groups

// Pass RPC group instances during registration
let client = User { pool: pool.clone() }
    .register(sharding.clone(), Auditable { pool: pool.clone() })
    .await?;

// Generated client includes both entity methods and RPC group methods
client.update_email(&entity_id, &request).await?;
client.log_action(&entity_id, &log_request).await?;
client.get_audit_log(&entity_id, &query).await?;

Singletons

Cluster-wide unique tasks. If the owning node dies, another takes over.

use std::time::Duration;

use cruster::singleton::{register_singleton, SingletonContext};

register_singleton(sharding.as_ref(), "leader-election", |ctx: SingletonContext| async move {
    // Opt-in to graceful shutdown by calling ctx.cancellation()
    let cancel = ctx.cancellation();

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                // Graceful shutdown -- clean up
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                perform_leader_duties().await;
            }
        }
    }
    Ok(())
}).await?;

If a singleton does not call ctx.cancellation(), it is force-cancelled on shutdown.
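The cooperative shutdown pattern (wait for either a cancel signal or the next tick) can be sketched with only the standard library. Here a channel stands in for the cancellation token and `recv_timeout` plays the role of `tokio::select!`; `run_until_cancelled` is a hypothetical helper, not a Cruster API.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Runs `duty` every `interval` until a cancel signal arrives (or the sender
/// is dropped), then returns how many times the duty ran.
fn run_until_cancelled<F: FnMut()>(cancel: Receiver<()>, interval: Duration, mut duty: F) -> u32 {
    let mut runs = 0;
    loop {
        match cancel.recv_timeout(interval) {
            // Cancellation requested: leave the loop so cleanup can run.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
            // Interval elapsed without a cancel signal: do one unit of work.
            Err(RecvTimeoutError::Timeout) => {
                duty();
                runs += 1;
            }
        }
    }
    runs
}
```

A cancel signal that is already pending when the loop starts ends it before any work is done, so the task never begins a unit of work it would have to abandon.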

Builder API

use cruster::singleton::singleton;

singleton("metrics-collector", |ctx: SingletonContext| async move {
    ...
    Ok(())
})
.register(sharding.as_ref())
.await?;

Cron Jobs

Distributed cron jobs that run as cluster singletons:

use std::time::Duration;

use cruster::cron::{ClusterCron, CronSchedule};

let cron = ClusterCron::new(
    "cleanup-job",
    CronSchedule::parse("0 */5 * * * * *").unwrap(), // every 5 minutes
    |scheduled_time| Box::pin(async move {
        // Perform cleanup
        Ok(())
    }),
)
.with_calculate_next_from_previous(true)
.with_skip_if_older_than(Duration::from_secs(3600));

cron.register(sharding.as_ref()).await?;
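The exact semantics of with_skip_if_older_than are not spelled out here. Assuming it means "drop a run whose scheduled time is further in the past than the given duration" (for example after a long outage), the check could look like this sketch. `should_run` is a hypothetical helper and times are assumed to be seconds since the Unix epoch.

```rust
use std::time::Duration;

/// Decides whether a run scheduled for `scheduled_unix_secs` should still
/// execute when the scheduler gets to it at `now_unix_secs`.
/// Assumption: runs later than `skip_if_older_than` are skipped.
fn should_run(scheduled_unix_secs: u64, now_unix_secs: u64, skip_if_older_than: Duration) -> bool {
    let lateness_secs = now_unix_secs.saturating_sub(scheduled_unix_secs);
    lateness_secs <= skip_if_older_than.as_secs()
}
```

Under this reading, a one-hour threshold lets a run that is a minute late proceed and discards one that is more than an hour late.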

Client Methods

The generated typed clients wrap the underlying EntityClient, which supports the following methods:

| Method                                        | Delivery                      | Use Case         |
|-----------------------------------------------|-------------------------------|------------------|
| send(id, tag, req)                            | Best-effort, await reply      | Reads            |
| send_persisted(id, tag, req, uninterruptible) | At-least-once, await reply    | Writes           |
| notify(id, tag, req)                          | Best-effort, fire-and-forget  | Events           |
| notify_persisted(id, tag, req)                | At-least-once, fire-and-forget | Durable events  |
| send_at(id, tag, req, deliver_at)             | Scheduled delivery            | Timers           |
| notify_at(id, tag, req, deliver_at)           | Scheduled fire-and-forget     | Scheduled events |
| send_stream(id, tag, req)                     | Streaming response            | Large result sets |

Production Deployment

Cluster Setup

use std::sync::Arc;

use cruster::config::ShardingConfig;
use cruster::metrics::ClusterMetrics;
use cruster::sharding_impl::ShardingImpl;
use cruster::storage::etcd_runner::EtcdRunnerStorage;
use cruster::storage::sql_message::SqlMessageStorage;
use cruster::storage::sql_workflow::SqlWorkflowStorage;
use cruster::storage::sql_workflow_engine::SqlWorkflowEngine;
use cruster::transport::grpc::{GrpcRunnerHealth, GrpcRunnerServer, GrpcRunners};
use sqlx::postgres::PgPoolOptions;

// 1. Storage backends
let pool = PgPoolOptions::new().max_connections(10).connect(&postgres_url).await?;
let message_storage = Arc::new(SqlMessageStorage::new(pool.clone()));
message_storage.migrate().await?;
let state_storage = Arc::new(SqlWorkflowStorage::new(pool.clone()));
let workflow_engine = Arc::new(SqlWorkflowEngine::new(pool.clone()));

// 2. Runner discovery via etcd
let etcd_client = etcd_client::Client::connect(endpoints, None).await?;
let runner_storage = Arc::new(EtcdRunnerStorage::new(etcd_client, "/my-cluster/", 30));

// 3. gRPC transport
let grpc_runners = Arc::new(GrpcRunners::new());
let runner_health = Arc::new(GrpcRunnerHealth::new(grpc_runners.clone()));

// 4. Configure and create
let config = Arc::new(ShardingConfig {
    runner_address: "10.0.0.1:9000".parse()?,
    shards_per_group: 300,
    ..Default::default()
});

let sharding = ShardingImpl::new_with_engines(
    config,
    grpc_runners,
    Some(runner_storage),
    Some(runner_health),
    Some(message_storage),
    Some(state_storage),
    Some(workflow_engine),
    Arc::new(ClusterMetrics::unregistered()),
)?;

// 5. Register entities and workflows
let counter = Counter { pool: pool.clone() }.register(sharding.clone()).await?;
let workflow = OrderWorkflow.register(sharding.clone()).await?;

// 6. Start background loops + gRPC server
sharding.start().await?;

let grpc_server = GrpcRunnerServer::new(sharding.clone());
tonic::transport::Server::builder()
    .add_service(grpc_server.into_service())
    .serve(grpc_addr)
    .await?;

Infrastructure Requirements

| Component  | Purpose                                                  |
|------------|----------------------------------------------------------|
| PostgreSQL | Message storage, workflow journals, activity transactions |
| etcd       | Runner discovery, shard locks, health monitoring         |

Configuration Reference

| Option                         | Default | Description                                       |
|--------------------------------|---------|---------------------------------------------------|
| shards_per_group               | 300     | Shards per shard group                            |
| entity_max_idle_time           | 60s     | Idle timeout before eviction                      |
| entity_mailbox_capacity        | 100     | Per-entity message queue size                     |
| entity_max_concurrent_requests | 1       | Concurrent requests per entity (0 = unbounded)    |
| storage_poll_interval          | 500ms   | Message storage polling frequency                 |
| storage_message_max_retries    | 10      | Max delivery attempts before dead-letter          |
| runner_lock_ttl                | 30s     | Runner lock TTL in etcd                           |
| send_retry_count               | 3       | Retries on routing failures during rebalancing    |
| singleton_crash_backoff_base   | 1s      | Base backoff for singleton crash recovery         |
| detachment_enabled             | false   | Enable automatic detachment on storage errors     |
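As an illustration of a limit like storage_message_max_retries, the sketch below retries a handler up to a maximum number of attempts and then reports the message as dead-lettered. Whether Cruster counts the first delivery towards the limit is not stated here; this sketch assumes it does, and the `Delivery` type and `deliver` function are hypothetical.

```rust
/// Outcome of trying to deliver one stored message.
#[derive(Debug, PartialEq)]
enum Delivery {
    Delivered { attempts: u32 },
    DeadLettered { attempts: u32 },
}

/// Calls `handler` until it succeeds or `max_attempts` attempts have failed.
/// The handler receives the 1-based attempt number.
fn deliver<F>(max_attempts: u32, mut handler: F) -> Delivery
where
    F: FnMut(u32) -> Result<(), String>,
{
    for attempt in 1..=max_attempts {
        if handler(attempt).is_ok() {
            return Delivery::Delivered { attempts: attempt };
        }
    }
    Delivery::DeadLettered { attempts: max_attempts }
}
```

A handler that recovers on its third attempt yields `Delivered { attempts: 3 }`; one that never succeeds ends as `DeadLettered` once the limit is reached.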

Architecture

┌─────────────────────────────────────────────────────────┐
│                       Clients                           │
└─────────────────────────┬───────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────┐
│                   Sharding Layer                        │
│         Rendezvous hashing + request routing            │
└─────────────────────────┬───────────────────────────────┘
                          │
          ┌───────────────┼───────────────┐
          ▼               ▼               ▼
    ┌──────────┐    ┌──────────┐    ┌──────────┐
    │ Runner 1 │    │ Runner 2 │    │ Runner 3 │
    │ Shards   │    │ Shards   │    │ Shards   │
    │  0-99    │    │ 100-199  │    │ 200-299  │
    └────┬─────┘    └────┬─────┘    └────┬─────┘
         │               │               │
         └───────────────┴───────────────┘
                         │
         ┌───────────────┴───────────────┐
         ▼                               ▼
   ┌───────────┐                   ┌───────────┐
   │ PostgreSQL│                   │   etcd    │
   │ - Messages│                   │ - Runners │
   │ - Journals│                   │ - Locks   │
   │ - Tx data │                   │ - Health  │
   └───────────┘                   └───────────┘
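The routing step in the diagram can be sketched in two parts: an entity ID is hashed onto a shard, and rendezvous hashing picks the runner that owns each shard. The hash function (FNV-1a), the key format, and both helper names are assumptions for illustration; Cruster's actual choices are not documented here.

```rust
/// 64-bit FNV-1a, a stand-in for whatever hash the framework uses.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Maps an entity ID onto one of `shards_per_group` shards
/// (a group size of zero is treated as one shard).
fn shard_for(entity_id: &str, shards_per_group: u64) -> u64 {
    fnv1a_64(entity_id.as_bytes()) % shards_per_group.max(1)
}

/// Rendezvous (highest-random-weight) hashing: every runner gets a score for
/// the shard, and the runner with the highest score owns it. Ties are broken
/// by runner name so the result is deterministic.
fn owner_of<'a>(shard: u64, runners: &[&'a str]) -> Option<&'a str> {
    runners
        .iter()
        .copied()
        .max_by_key(|runner| (fnv1a_64(format!("{runner}/{shard}").as_bytes()), *runner))
}
```

The useful property is that removing a runner only reassigns the shards that runner owned; every other shard keeps its owner because its highest-scoring runner is still present.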

Examples

See examples/ for a complete working example:

  • cluster-tests -- End-to-end test suite covering entities, workflows, RPC groups, activity groups, singletons, and timers.

License

MIT OR Apache-2.0
