A Rust framework for building distributed, stateful systems with durable workflows.
Status: Under active development. API may change.
- Stateless Entities -- Addressable RPC handlers with automatic sharding and routing. State is your responsibility (use your database directly).
- Durable Workflows -- Long-running orchestrations that survive crashes. Activities are journaled and replay automatically.
- Activity Groups -- Reusable bundles of transactional activities, composable across workflows.
- RPC Groups -- Reusable bundles of RPC methods, composable across entities.
- Singletons -- Cluster-wide unique tasks with automatic failover.
- Cron Scheduling -- Distributed cron jobs running as singletons.
- gRPC Transport -- Inter-node communication with connection pooling and streaming.
Protocol Buffers compiler is required:
# Debian/Ubuntu
sudo apt-get install protobuf-compiler
# macOS
brew install protobuf

Add the crate to your Cargo.toml:

[dependencies]
cruster = "version"

Entities are stateless RPC handlers. You manage state yourself via your database.
use cruster::prelude::*;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
#[derive(Serialize, Deserialize)]
struct IncrementRequest {
entity_id: String,
amount: i64,
}
#[derive(Serialize, Deserialize)]
struct GetCounterRequest {
entity_id: String,
}
#[entity]
#[derive(Clone)]
struct Counter {
pool: PgPool,
}
#[entity_impl]
impl Counter {
// Persisted RPC: at-least-once delivery for writes
#[rpc(persisted)]
pub async fn increment(&self, request: IncrementRequest) -> Result<i64, ClusterError> {
let (value,): (i64,) = sqlx::query_as(
"INSERT INTO counters (id, value) VALUES ($1, $2)
ON CONFLICT (id) DO UPDATE SET value = counters.value + $2
RETURNING value"
)
.bind(&request.entity_id)
.bind(request.amount)
.fetch_one(&self.pool)
.await
.map_err(|e| ClusterError::PersistenceError {
reason: e.to_string(),
source: Some(Box::new(e)),
})?;
Ok(value)
}
// Non-persisted RPC: best-effort delivery for reads
#[rpc]
pub async fn get(&self, request: GetCounterRequest) -> Result<i64, ClusterError> {
let result: Option<(i64,)> = sqlx::query_as(
"SELECT value FROM counters WHERE id = $1"
)
.bind(&request.entity_id)
.fetch_optional(&self.pool)
.await
.map_err(|e| ClusterError::PersistenceError {
reason: e.to_string(),
source: Some(Box::new(e)),
})?;
Ok(result.map(|r| r.0).unwrap_or(0))
}
}

use cruster::types::EntityId;
// Register entity and get a typed client (CounterClient is generated by the macro)
let counter = Counter { pool: pool.clone() }
.register(sharding.clone())
.await?;
// Call methods -- entity is created on first access, routed by shard
let entity_id = EntityId::new("counter-1");
let value = counter.increment(&entity_id, &IncrementRequest {
entity_id: "counter-1".into(),
amount: 5,
}).await?;
assert_eq!(value, 5);
let value = counter.get(&entity_id, &GetCounterRequest {
entity_id: "counter-1".into(),
}).await?;
assert_eq!(value, 5);

| Attribute | Purpose | Delivery |
|---|---|---|
| #[rpc] | Best-effort queries/reads | At-most-once |
| #[rpc(persisted)] | Durable writes/mutations | At-least-once |
Both use &self -- entities are stateless. State lives in your database.
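At-least-once delivery means a persisted RPC can be redelivered after a crash, so persisted handlers should be idempotent. A minimal, framework-free sketch of the deduplication pattern (plain Rust with a HashMap standing in for the database; all names here are illustrative, not cruster APIs):

```rust
use std::collections::HashMap;

// Non-idempotent: replaying the same delivery double-counts.
fn increment(state: &mut HashMap<String, i64>, id: &str, amount: i64) -> i64 {
    let v = state.entry(id.to_string()).or_insert(0);
    *v += amount;
    *v
}

// Idempotent: deduplicate by a per-request key, so redelivery returns
// the original result instead of applying the write again.
fn increment_once(
    state: &mut HashMap<String, i64>,
    seen: &mut HashMap<String, i64>,
    request_id: &str,
    id: &str,
    amount: i64,
) -> i64 {
    if let Some(&v) = seen.get(request_id) {
        return v; // duplicate delivery: cached result
    }
    let v = increment(state, id, amount);
    seen.insert(request_id.to_string(), v);
    v
}

fn main() {
    let mut state = HashMap::new();
    let mut seen = HashMap::new();
    // The same logical request delivered twice:
    let a = increment_once(&mut state, &mut seen, "req-1", "counter-1", 5);
    let b = increment_once(&mut state, &mut seen, "req-1", "counter-1", 5);
    assert_eq!((a, b), (5, 5)); // no double count
}
```

In practice the dedup table is your database (e.g. a unique constraint on a request id column), committed in the same transaction as the write.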
#[entity(
name = "user", // Custom entity type name (default: struct name)
shard_group = "premium", // Shard group for isolation
max_idle_time_secs = 300, // Eviction timeout
mailbox_capacity = 50, // Message queue size
concurrency = 4, // Parallel request handling (default: 1)
)]
#[derive(Clone)]
struct User {
pool: PgPool,
}

#[entity_impl]
impl MyEntity {
// Public: on generated client, callable by external callers (default for #[rpc])
#[rpc]
pub async fn query(&self, req: QueryRequest) -> Result<Data, ClusterError> { ... }
// Protected: dispatchable entity-to-entity, but NOT on the generated client
#[rpc]
#[protected]
pub async fn internal_sync(&self, req: SyncRequest) -> Result<(), ClusterError> { ... }
// Private: not dispatchable, not on client -- internal helper only
#[rpc]
#[private]
async fn validate(&self, req: ValidateRequest) -> Result<bool, ClusterError> { ... }
}

Workflows are standalone durable orchestration constructs backed by hidden entities. Each workflow has an execute entry point and #[activity] side effects. Activities are journaled -- on crash recovery, completed activities return their cached results instead of running again.
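The replay mechanics can be sketched framework-free: a journal records each completed step's result, and on re-execution a journaled step returns its cached result rather than running its side effect again (names here are illustrative, not cruster internals):

```rust
use std::collections::HashMap;

// Journal of completed steps: step name -> cached result.
type Journal = HashMap<String, i64>;

// Run a step through the journal: on replay the cached result is
// returned; on first execution the side effect runs and is recorded.
fn run_step(journal: &mut Journal, name: &str, side_effect: impl FnOnce() -> i64) -> i64 {
    if let Some(&cached) = journal.get(name) {
        return cached; // replay: skip the side effect
    }
    let result = side_effect();
    journal.insert(name.to_string(), result);
    result
}

fn main() {
    let mut journal = Journal::new();
    let mut executions = 0;

    // First run: the side effect executes.
    run_step(&mut journal, "charge_payment", || { executions += 1; 9900 });
    // Simulated crash + replay: cached result, no re-execution.
    let replayed = run_step(&mut journal, "charge_payment", || { executions += 1; 9900 });

    assert_eq!(replayed, 9900);
    assert_eq!(executions, 1);
}
```

cruster persists this journal durably (in PostgreSQL) and keys it per execution; the sketch only shows why completed steps are safe to skip on replay.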
use cruster::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct ProcessOrderRequest {
order_id: String,
item_count: u32,
amount: i64,
}
#[derive(Serialize, Deserialize)]
struct OrderResult {
order_id: String,
status: String,
}
#[workflow]
#[derive(Clone)]
struct OrderWorkflow;
#[workflow_impl(
key = |req: &ProcessOrderRequest| req.order_id.clone(),
hash = false,
)]
impl OrderWorkflow {
async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
// Each activity is journaled. On replay, completed steps return cached results.
self.reserve_inventory(request.order_id.clone(), request.item_count).await?;
self.charge_payment(request.order_id.clone(), request.amount).await?;
self.confirm_order(request.order_id.clone()).await?;
Ok(OrderResult {
order_id: request.order_id,
status: "completed".into(),
})
}
#[activity]
async fn reserve_inventory(&self, order_id: String, count: u32) -> Result<(), ClusterError> {
// self.tx is an ActivityTx -- a SQL transaction committed atomically with the journal
sqlx::query("INSERT INTO reservations (order_id, count) VALUES ($1, $2)")
.bind(&order_id)
.bind(count as i64)
.execute(&self.tx)
.await
.map_err(|e| ClusterError::PersistenceError {
reason: e.to_string(),
source: Some(Box::new(e)),
})?;
Ok(())
}
#[activity]
async fn charge_payment(&self, order_id: String, amount: i64) -> Result<(), ClusterError> {
sqlx::query("INSERT INTO payments (order_id, amount) VALUES ($1, $2)")
.bind(&order_id)
.bind(amount)
.execute(&self.tx)
.await?;
Ok(())
}
#[activity]
async fn confirm_order(&self, order_id: String) -> Result<(), ClusterError> {
sqlx::query("UPDATE orders SET status = 'confirmed' WHERE id = $1")
.bind(&order_id)
.execute(&self.tx)
.await?;
Ok(())
}
}

// Register and get typed client (OrderWorkflowClient is generated)
let client = OrderWorkflow.register(sharding.clone()).await?;
// Execute synchronously -- blocks until workflow completes
let result = client.execute(&ProcessOrderRequest {
order_id: "order-42".into(),
item_count: 3,
amount: 9900,
}).await?;
// Start asynchronously -- returns execution ID immediately
let exec_id = client.start(&request).await?;
// Poll for result later
let result: Option<OrderResult> = client.poll(&exec_id).await?;
// Idempotency keys
let result = client.with_key("my-key").execute(&request).await?; // hashed
let result = client.with_key_raw("raw-id").execute(&request).await?; // used as-is

Inside #[activity] methods, self.tx is an ActivityTx that implements sqlx::Executor. All SQL writes through self.tx are committed atomically with the workflow journal entry. If the activity fails, both the journal entry and your SQL writes roll back together.
#[activity]
async fn transfer_funds(&self, from: String, to: String, amount: i64) -> Result<(), ClusterError> {
sqlx::query("UPDATE accounts SET balance = balance - $2 WHERE id = $1")
.bind(&from)
.bind(amount)
.execute(&self.tx) // committed with journal
.await?;
sqlx::query("UPDATE accounts SET balance = balance + $2 WHERE id = $1")
.bind(&to)
.bind(amount)
.execute(&self.tx) // same transaction
.await?;
Ok(())
}

#[activity(retries = 3)]
async fn call_external_api(&self, url: String) -> Result<String, ClusterError> {
// Retried up to 3 times with exponential backoff on failure
...
}
#[activity(retries = 5, backoff = "constant")]
async fn idempotent_write(&self, data: String) -> Result<(), ClusterError> {
// Retried with constant backoff
...
}

#[workflow_impl(...)]
impl ReminderWorkflow {
async fn execute(&self, request: ReminderRequest) -> Result<(), ClusterError> {
// Durable sleep -- survives crashes, resumes where it left off
self.sleep(Duration::from_secs(3600)).await?;
self.send_reminder(request.user_id).await?;
Ok(())
}
}

Reusable bundles of transactional activities that can be composed into multiple workflows.
#[activity_group]
#[derive(Clone)]
struct Inventory;
#[activity_group_impl]
impl Inventory {
#[activity]
async fn reserve(&self, order_id: String, count: u32) -> Result<String, ClusterError> {
let id = format!("res-{order_id}-{count}");
sqlx::query("INSERT INTO reservations (id, order_id, count) VALUES ($1, $2, $3)")
.bind(&id)
.bind(&order_id)
.bind(count as i64)
.execute(&self.tx)
.await?;
Ok(id)
}
}
#[activity_group]
#[derive(Clone)]
struct Payments;
#[activity_group_impl]
impl Payments {
#[activity]
async fn charge(&self, order_id: String, amount: i64) -> Result<String, ClusterError> {
let tx_id = format!("tx-{order_id}");
sqlx::query("INSERT INTO payments (id, order_id, amount) VALUES ($1, $2, $3)")
.bind(&tx_id)
.bind(&order_id)
.bind(amount)
.execute(&self.tx)
.await?;
Ok(tx_id)
}
}

#[workflow]
#[derive(Clone)]
struct OrderWorkflow;
#[workflow_impl(
key = |req: &ProcessOrderRequest| req.order_id.clone(),
hash = false,
activity_groups(Inventory, Payments)
)]
impl OrderWorkflow {
async fn execute(&self, request: ProcessOrderRequest) -> Result<OrderResult, ClusterError> {
// Call activities from composed groups
let reservation_id = self.reserve(request.order_id.clone(), request.item_count).await?;
let tx_id = self.charge(request.order_id.clone(), request.amount).await?;
// Local activities work alongside group activities
self.finalize(request.order_id.clone(), reservation_id, tx_id).await?;
Ok(OrderResult { ... })
}
#[activity]
async fn finalize(&self, order_id: String, res_id: String, tx_id: String) -> Result<(), ClusterError> {
sqlx::query("UPDATE orders SET status = 'done', reservation = $2, payment = $3 WHERE id = $1")
.bind(&order_id)
.bind(&res_id)
.bind(&tx_id)
.execute(&self.tx)
.await?;
Ok(())
}
}

let client = OrderWorkflow
.register(sharding.clone(), Inventory, Payments)
.await?;

Reusable bundles of RPC methods that can be composed into multiple entities.
#[rpc_group]
#[derive(Clone)]
struct Auditable {
pool: PgPool,
}
#[rpc_group_impl]
impl Auditable {
#[rpc(persisted)]
pub async fn log_action(&self, request: LogActionRequest) -> Result<AuditEntry, ClusterError> {
sqlx::query_as("INSERT INTO audit_log (...) VALUES (...) RETURNING *")
.bind(&request.action)
.fetch_one(&self.pool)
.await
.map_err(|e| ClusterError::PersistenceError {
reason: e.to_string(),
source: Some(Box::new(e)),
})
}
#[rpc]
pub async fn get_audit_log(&self, request: GetAuditLogRequest) -> Result<Vec<AuditEntry>, ClusterError> {
...
}
}

#[entity]
#[derive(Clone)]
struct User {
pool: PgPool,
}
#[entity_impl(rpc_groups(Auditable))]
impl User {
#[rpc(persisted)]
pub async fn update_email(&self, request: UpdateEmailRequest) -> Result<(), ClusterError> {
...
}
}

// Pass RPC group instances during registration
let client = User { pool: pool.clone() }
.register(sharding.clone(), Auditable { pool: pool.clone() })
.await?;
// Generated client includes both entity methods and RPC group methods
client.update_email(&entity_id, &request).await?;
client.log_action(&entity_id, &log_request).await?;
client.get_audit_log(&entity_id, &query).await?;

Cluster-wide unique tasks. If the owning node dies, another takes over.
use cruster::singleton::{register_singleton, SingletonContext};
register_singleton(sharding.as_ref(), "leader-election", |ctx: SingletonContext| async move {
// Opt-in to graceful shutdown by calling ctx.cancellation()
let cancel = ctx.cancellation();
loop {
tokio::select! {
_ = cancel.cancelled() => {
// Graceful shutdown -- clean up
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
perform_leader_duties().await;
}
}
}
Ok(())
}).await?;

If a singleton never calls ctx.cancellation(), it is force-cancelled on shutdown. A builder-style registration is also available:
use cruster::singleton::singleton;
singleton("metrics-collector", |ctx: SingletonContext| async move {
...
Ok(())
})
.register(sharding.as_ref())
.await?;

Distributed cron jobs that run as cluster singletons:
use cruster::cron::{ClusterCron, CronSchedule};
let cron = ClusterCron::new(
"cleanup-job",
CronSchedule::parse("0 */5 * * * * *").unwrap(), // every 5 minutes
|scheduled_time| Box::pin(async move {
// Perform cleanup
Ok(())
}),
)
.with_calculate_next_from_previous(true)
.with_skip_if_older_than(Duration::from_secs(3600));
cron.register(sharding.as_ref()).await?;

The generated typed clients provide ergonomic methods. The underlying EntityClient supports:
| Method | Delivery | Use Case |
|---|---|---|
| send(id, tag, req) | Best-effort, await reply | Reads |
| send_persisted(id, tag, req, uninterruptible) | At-least-once, await reply | Writes |
| notify(id, tag, req) | Best-effort, fire-and-forget | Events |
| notify_persisted(id, tag, req) | At-least-once, fire-and-forget | Durable events |
| send_at(id, tag, req, deliver_at) | Scheduled delivery | Timers |
| notify_at(id, tag, req, deliver_at) | Scheduled fire-and-forget | Scheduled events |
| send_stream(id, tag, req) | Streaming response | Large result sets |
use cruster::config::ShardingConfig;
use cruster::metrics::ClusterMetrics;
use cruster::sharding_impl::ShardingImpl;
use cruster::storage::etcd_runner::EtcdRunnerStorage;
use cruster::storage::sql_message::SqlMessageStorage;
use cruster::storage::sql_workflow::SqlWorkflowStorage;
use cruster::storage::sql_workflow_engine::SqlWorkflowEngine;
use cruster::transport::grpc::{GrpcRunnerHealth, GrpcRunnerServer, GrpcRunners};
// 1. Storage backends
let pool = PgPoolOptions::new().max_connections(10).connect(&postgres_url).await?;
let message_storage = Arc::new(SqlMessageStorage::new(pool.clone()));
message_storage.migrate().await?;
let state_storage = Arc::new(SqlWorkflowStorage::new(pool.clone()));
let workflow_engine = Arc::new(SqlWorkflowEngine::new(pool.clone()));
// 2. Runner discovery via etcd
let etcd_client = etcd_client::Client::connect(endpoints, None).await?;
let runner_storage = Arc::new(EtcdRunnerStorage::new(etcd_client, "/my-cluster/", 30));
// 3. gRPC transport
let grpc_runners = Arc::new(GrpcRunners::new());
let runner_health = Arc::new(GrpcRunnerHealth::new(grpc_runners.clone()));
// 4. Configure and create
let config = Arc::new(ShardingConfig {
runner_address: "10.0.0.1:9000".parse()?,
shards_per_group: 300,
..Default::default()
});
let sharding = ShardingImpl::new_with_engines(
config,
grpc_runners,
Some(runner_storage),
Some(runner_health),
Some(message_storage),
Some(state_storage),
Some(workflow_engine),
Arc::new(ClusterMetrics::unregistered()),
)?;
// 5. Register entities and workflows
let counter = Counter { pool: pool.clone() }.register(sharding.clone()).await?;
let workflow = OrderWorkflow.register(sharding.clone()).await?;
// 6. Start background loops + gRPC server
sharding.start().await?;
let grpc_server = GrpcRunnerServer::new(sharding.clone());
tonic::transport::Server::builder()
.add_service(grpc_server.into_service())
.serve(grpc_addr)
.await?;

| Component | Purpose |
|---|---|
| PostgreSQL | Message storage, workflow journals, activity transactions |
| etcd | Runner discovery, shard locks, health monitoring |
| Option | Default | Description |
|---|---|---|
| shards_per_group | 300 | Shards per shard group |
| entity_max_idle_time | 60s | Idle timeout before eviction |
| entity_mailbox_capacity | 100 | Per-entity message queue size |
| entity_max_concurrent_requests | 1 | Concurrent requests per entity (0 = unbounded) |
| storage_poll_interval | 500ms | Message storage polling frequency |
| storage_message_max_retries | 10 | Max delivery attempts before dead-letter |
| runner_lock_ttl | 30s | Runner lock TTL in etcd |
| send_retry_count | 3 | Retries on routing failures during rebalancing |
| singleton_crash_backoff_base | 1s | Base backoff for singleton crash recovery |
| detachment_enabled | false | Enable automatic detachment on storage errors |
┌─────────────────────────────────────────────────────────┐
│ Clients │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Sharding Layer │
│ Rendezvous hashing + request routing │
└─────────────────────────┬───────────────────────────────┘
│
┌───────────────┼───────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Runner 1 │ │ Runner 2 │ │ Runner 3 │
│ Shards │ │ Shards │ │ Shards │
│ 0-99 │ │ 100-199 │ │ 200-299 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└───────────────┴───────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌───────────┐ ┌───────────┐
│ PostgreSQL│ │ etcd │
│ - Messages│ │ - Runners │
│ - Journals│ │ - Locks │
│ - Tx data │ │ - Health │
└───────────┘ └───────────┘
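The sharding layer assigns shards to runners with rendezvous hashing: every runner scores each shard, and the highest score wins, so when a runner leaves only its own shards are reassigned. A minimal framework-free illustration (DefaultHasher here is a stand-in, not the hash cruster actually uses):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Score a (shard, runner) pair; the runner with the highest score owns the shard.
fn score(shard: u32, runner: &str) -> u64 {
    let mut h = DefaultHasher::new();
    shard.hash(&mut h);
    runner.hash(&mut h);
    h.finish()
}

// Rendezvous (highest-random-weight) hashing over the current runner set.
fn pick_runner<'a>(shard: u32, runners: &[&'a str]) -> Option<&'a str> {
    runners.iter().copied().max_by_key(|r| score(shard, r))
}

fn main() {
    let runners = ["runner-1", "runner-2", "runner-3"];

    // Deterministic: the same shard always routes to the same runner.
    let owner = pick_runner(7, &runners).unwrap();
    assert_eq!(pick_runner(7, &runners), Some(owner));

    // When a runner leaves, shards it did not own keep their assignment.
    let survivors: Vec<&str> =
        runners.iter().copied().filter(|r| *r != "runner-3").collect();
    for shard in 0..300u32 {
        let before = pick_runner(shard, &runners).unwrap();
        if before != "runner-3" {
            assert_eq!(pick_runner(shard, &survivors), Some(before));
        }
    }
}
```

The minimal-movement property falls out of the math: the maximum over a subset equals the maximum over the full set whenever the full-set winner is still present.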
See examples/ for a complete working example:
- cluster-tests -- End-to-end test suite covering entities, workflows, RPC groups, activity groups, singletons, and timers.
MIT OR Apache-2.0