Skip to content

shakacode/messagebus

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

71 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Message Bus

Async Message Bus for Rust

Inspired by Actix

Basics

  1. Can deliver messages between actors using receivers (usually a queue implementations)
  2. Messages distincts and delivers by TypeId
  3. Messages delivers ether in a broadcast fashion to many receivers (Cloned) or addressed by recevier id, balanced (depends on queue load) or random
  4. There are different kind of receivers implemented:
  • BufferUnordered Receiver (sync and async)
  • Synchronized (sync and async)
  • BatchedBufferUnordered Receiver (sync and async)
  • BatchedSynchronized (sync and async)
  1. Request/response api. There is an example is demo_req_resp.rs
  2. Task grouping - assign group IDs to messages and wait for group completion

Task Grouping

Task grouping allows you to track related messages and wait for all tasks in a group to complete. This is useful for job processing, batch operations, or any scenario where you need to know when a set of related tasks has finished.

Basic Usage

use messagebus::{Bus, GroupId, derive::Message};

// Define a message with a group_id
#[derive(Debug, Clone, Message)]
#[group_id(self.job_id)]
struct ProcessJob {
    job_id: i64,
    task_name: String,
}

async fn process_job(bus: &Bus) {
    let job_id: GroupId = 1001;

    // Send multiple messages with the same group_id
    bus.send(ProcessJob { job_id, task_name: "Task A".into() }).await.unwrap();
    bus.send(ProcessJob { job_id, task_name: "Task B".into() }).await.unwrap();

    // Wait for all tasks in the group to complete
    bus.flush_group(job_id).await;

    // Check if group is idle
    assert!(bus.is_group_idle(job_id));
}

Group ID Propagation

Group IDs propagate to child messages - when a handler sends new messages, they inherit the parent task's group ID unless the child message has its own #[group_id] attribute.

Flush Methods

There are three flush methods for groups, each with different use cases:

Method Use Case
flush_group(id) Flushes partial batches and waits for completion. Simple and sufficient for most cases.
flush_and_sync_group(id, force) Flushes, waits, then syncs receivers. Use for cascading messages or when you need sync.
flush_current_group() For use within handlers to flush child messages without blocking.

Use flush_and_sync_group() when handlers send cascading messages (handler A sends to handler B) that need the two-pass flush strategy to avoid deadlocks.

// Simple case - works for all handler types including batched:
bus.flush_group(job_id).await;

// For cascading messages or when you need sync:
bus.flush_and_sync_group(job_id, false).await;

// The `force` parameter skips waiting and proceeds directly to sync:
bus.flush_and_sync_group(job_id, true).await;  // Use when you know group is already idle

Triggering Child Message Processing from Within Handlers

When a handler sends child messages to batched receivers, those messages may sit in a buffer until the batch is full. Use flush_current_group() to trigger processing of buffered child messages:

#[async_trait]
impl AsyncHandler<ParentMessage> for MyHandler {
    async fn handle(&self, msg: ParentMessage, bus: &Bus) -> Result<(), Error> {
        // Send child messages (they inherit the parent's group_id)
        for item in msg.items {
            bus.send(ChildMessage { data: item }).await?;
        }

        // Trigger flush of child messages (non-blocking)
        bus.flush_current_group().await;

        Ok(())
    }
}

Important: flush_current_group() triggers non-blocking flushes via flush_nowait() - it does not wait for child messages to complete. This is intentional to prevent deadlocks (the calling handler is itself counted in processing_count). The actual waiting for all group messages to complete happens when the caller uses flush_group() or flush_and_sync_group() after sending the initial messages.

Batch Handler Behavior

Batched handlers have special group handling:

  • Messages are batched per group_id - messages from different groups are never mixed in the same batch
  • Each message's group counter is properly decremented after batch processing
  • Child messages sent via bus.send() from within a batch handler inherit the batch's group_id (unless they define their own #[group_id])

Group Cleanup

Groups remain in memory until explicitly removed. For long-running applications, clean up completed groups:

use messagebus::GroupRemovalResult;

// Safe removal - only removes if idle
match bus.remove_group(job_id) {
    GroupRemovalResult::Removed => println!("Group cleaned up"),
    GroupRemovalResult::NotIdle => println!("Group still has in-flight tasks"),
    GroupRemovalResult::NotFound => println!("Group doesn't exist"),
}

// Force removal (use with caution - can cause tracking issues)
bus.force_remove_group(job_id);

// Monitor tracked groups
let count = bus.tracked_group_count();

Utility Methods

// Check if a group has no in-flight tasks
bus.is_group_idle(group_id);

// Get current in-flight task count for a group
bus.group_processing_count(group_id);

// Get the current task's group ID from within a handler
Bus::current_group_id();

There is an example at demo_groups.rs

Here are the list of implmented handler kinds:

pub trait Handler<M: Message>: Send + Sync {
    type Error: StdSyncSendError;
    type Response: Message;

    fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError;
    type Response: Message;

    async fn handle(&self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait SynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError;
    type Response: Message;

    fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError;
    type Response: Message;

    async fn handle(&mut self, msg: M, bus: &Bus) -> Result<Self::Response, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait BatchHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncBatchHandler<M: Message>: Send + Sync {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    async fn handle(&self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    async fn sync(&self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

pub trait BatchSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}

#[async_trait]
pub trait AsyncBatchSynchronizedHandler<M: Message>: Send {
    type Error: StdSyncSendError + Clone;
    type Response: Message;
    type InBatch: FromIterator<M> + Send;
    type OutBatch: IntoIterator<Item = Self::Response> + Send;

    async fn handle(&mut self, msg: Self::InBatch, bus: &Bus) -> Result<Self::OutBatch, Self::Error>;
    async fn sync(&mut self, _bus: &Bus) -> Result<(), Self::Error> {
        Ok(())
    }
}
  1. Implemented handler kinds:

    1. No Synchronization needed (Handler implements Send and Sync)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
    2. Synchronization needed (Handler implements only Send but not implements Sync)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
  2. Not yet implemented handler kinds:

    1. Synchronization needed and thread dedicated (Handler is !Sync and !Send)
      • Not batched operations
        • sync (spawn_blocking)
        • async (spawn)
      • Batched
        • sync (spawn_blocking)
        • async (spawn)
  3. Example:

use messagebus::{error::Error, receivers, AsyncHandler, Bus};
use async_trait::async_trait;

struct TmpReceiver;

#[async_trait]
impl AsyncHandler<i32> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i32, bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i32 {}", msg);

        bus.send(2i64).await?;

        Ok(())
    }
}

#[async_trait]
impl AsyncHandler<i64> for TmpReceiver {
    type Error = Error;
    type Response = ();

    async fn handle(&self, msg: i64, _bus: &Bus) -> Result<Self::Response, Self::Error> {
        println!("---> i64 {}", msg);

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let (b, poller) = Bus::build()
        .register(TmpReceiver)
            .subscribe::<i32, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
            .subscribe::<i64, receivers::BufferUnorderedAsync<_>, _, _>(8, Default::default())
            .done()
        .build();

    b.send(1i32).await.unwrap();

    println!("flush");
    b.flush().await;

    println!("close");
    b.close().await;

    println!("closed");

    poller.await;
    println!("[done]");
}

About

Message bus - enables async communication between different parts of application using messages

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages