A Rust framework for building parallel execution engines with dependency management, type-safe method dispatch, and event observation.
Run the following Cargo command in your project directory:
cargo add sandl
Or add the following line to your Cargo.toml:
sandl = "0.1.0"
use sandl::*;
fn main() -> Result<()> {
// Define a computational layer with a method
let compute = Layer::builder("L_compute")
.method("M_double")
.args::<i64>()
.bind(|&x, _ctx| Ok(value!(x * 2)))
.build();
// Create slices that call methods with some args
let mut slices: Vec<Slice> = vec![];
for i in 0..1000000 {
let slice_name = format!("task_{}", i);
let slice = Slice::builder(slice_name)
.layer("L_compute", |m| m.call("M_double", i))
.build();
slices.push(slice);
}
// Configure the engine
let config = EngineConfig::new().batch_size(1000);
// Build it
let engine = Engine::builder()
.add_layer(compute)
.add_slices(&mut slices)
.config(config)
.build()?;
// Run with flags of choice
let results = engine.run(RunFlags::default());
println!("{}", results.summary());
Ok(())
}
sandl enables you to define reusable computational Layers with typed methods, compose them into execution Slices, and run them in parallel with automatic dependency resolution. It's designed for scenarios where you need to:
- Execute the same operations across many similar workloads in parallel
- Manage complex dependencies between computation stages
- Maintain type safety across dynamic execution boundaries
- Observe and monitor execution progress
Some use cases:
- Data pipeline processing
- Parallel API request handling
- Batch image processing
- Monte-Carlo simulations
- Generic ETL operations
A Layer is a collection of independent methods that perform related operations. Each method has:
- A name
- Type-safe arguments (via the FromValue/ToValue traits)
- Optional default arguments
- An implementation function
#[derive(Args)]
struct TransformArgs {
value: f64
}
let process_layer = Layer::builder("process")
.method("transform")
// The type is optional
.args_with_default::<TransformArgs>(value!({ "value": 16 })) // Defaults are optional; just call args() if you don't need them
.bind(|args, _ctx| {
let x = args.value * 2; // Type-safe!
Ok(value!({ "value": x })) // { "value": 32 }
})
.build();
Slices have access to a thread-safe get/set context by default. For methods that don't need it, use bind_pure:
let process_layer = Layer::builder("process")
.method("transform")
// The type is optional
.args_with_default::<TransformArgs>(value!({ "value": 16 }))
.bind_pure(|args| { // Transform is a pure function!
let x = args.value * 2;
Ok(value!({ "value": x }))
})
.build();
A Slice specifies which layer methods to execute and with what arguments. Slices are the units of work that get executed in parallel:
let slice = Slice::builder("slice")
.layer("process", |methods| {
methods.call("transform", value!({ "value": 64 })) // On the "process" layer, call "transform" with these args.
}) // With multiple layers and/or methods, a slice only interacts
// with the ones you configure it to use.
.build();
Default arguments can be overridden at the slice level, with automatic merging for object types:
// Layer defines defaults
.args_with_default(value!({ "timeout": 30, "retries": 3 }))
// Slice overrides one field
.call("fetch", value!({ "retries": 5 }))
// Merged: { "timeout": 30, "retries": 5 }
The Engine orchestrates execution:
- Registers layers and slices
- Manages layer dependencies
- Executes slices in parallel using rayon
- Performs topological sorting for proper execution order
- Provides event observation hooks
let engine = Engine::builder()
.add_layer(process_layer)
.add_slice(slice1)
.add_slice(slice2)
.build()?;
let results = engine.run(RunFlags::default());
sandl relies on rayon for its parallelism, whose parameters can be configured with EngineConfig:
let config = EngineConfig::new()
// Rayon configuration
.num_threads(4)
.stack_size(MiB!(2)) // Expands to 2 * 1024 * 1024
.chunk_size(100)
// sandl-specific: set this to enable batching
.batch_size(1000);
let engine = Engine::builder()
.config(config)
.build()?;
let results = engine.run(RunFlags::default());
You can also pass run flags to the engine:
let results = engine.run(RunFlags::TRACKED); // Default. Tracks runtime data and prints to stdout.
let results = engine.run(RunFlags::SILENT); // Prints nothing (still tracks timing data).
let results = engine.run(RunFlags::SILENT_NO_OBSERVER); // Minimal overhead - Nothing but runtime.
// run() still returns RunResults in all cases, so you can do your own processing *after* it's done.
Overhead from stdout writes and from observer hooks is minimal, but it exists.
A Context provides thread-safe, per-slice shared state during execution. Methods can read from and write to the context:
.bind(|args, ctx| {
ctx.set("result", value!(42));
let prev = ctx.get("result");
Ok(value!(null))
})
Beware the shared state: methods within a slice run in parallel, so their execution order is undefined unless you declare dependencies. You can set dependencies amongst layers in the engine builder:
// Use the builder...
let engine = Engine::builder()
.add_layer(process_layer)
.add_layer(show_layer)
.dependency("show", "process") // show depends on process
.add_slice(slice1)
.add_slice(slice2)
.build()?;
// ...or macros
let engine = dependencies!(
add_layers!(Engine::builder(), layer1, layer2, layer3, layer4),
"layer2" => ["layer1"], // layer => depends_on[layers]
"layer3" => ["layer2", "layer4"]
)
.add_slice(slice)
.build()?;
You can also set an initialization layer; every other layer will depend on it:
// Here, we're using the quick_layer! macro to create a layer with a single method and some arg type.
let init = quick_layer!("init", "setup", Value, |_args, ctx| {
ctx.set("config", Value::from("test")); // Values set in the init layer...
ctx.set("version", Value::from(1));
Ok(value!({}))
});
let verify = quick_layer!("verify", "check", Value, |_args, ctx| {
let config: String = ctx.get_as("config")?; // ...can be accessed in other layers, safely!
let version: i64 = ctx.get_as("version")?;
assert_eq!(config, "test");
assert_eq!(version, 1);
Ok(value!({}))
});
let slice = Slice::builder("test")
.layer("init", |m| m.call_default("setup"))
.layer("verify", |m| m.call_default("check"))
.build();
let engine = Engine::builder()
.add_layer(init)
.add_layer(verify)
.init_layer("init") // Every other layer now depends on "init"
.add_slice(slice)
.build()?;
You can inspect the runtime by creating an observer:
// Create an observer with various event handlers
let mut observer = Observer::new();
// Track when slices start and complete
observer.on_slice_start(|slice_name| {
println!("Starting slice: {}", slice_name);
});
observer.on_slice_complete(|slice_name, duration| {
println!("Completed slice: {} in {:?}", slice_name, duration);
});
// Track when methods are called
observer.on_method_start(|slice_name, layer, method| {
println!(" Executing: {}.{}.{}", slice_name, layer, method);
});
observer.on_method_complete(|slice_name, layer, method, duration| {
println!(" {}.{}.{} took {:?}", slice_name, layer, method, duration);
});
// Hook into failures
observer.on_method_failed(|slice_name, layer, method, error| {
println!(" {}.{}.{} failed: {}", slice_name, layer, method, error);
});
// Define a simple computational layer
let compute = Layer::builder("calculator")
.method("divide")
.args::<CalculatorArgs>() // {x: i32, y: i32}, derives Args
.bind(|args, _ctx| {
if args.y == 0 {
Err(execution_error!("Division by zero")) // sandl handles failure gracefully
} else {
Ok(value!(args.x / args.y))
}
})
.build();
let mut slices = vec![
// Successful slices
Slice::builder("slice_1")
.layer("calculator", |m| {
m.call("divide", CalculatorArgs { x: 6, y: 7 })
})
.build()
];
let engine = Engine::builder()
.add_layer(compute)
.add_slices(&mut slices)
.observer(observer) // Set the observer
.build()?;
let results = engine.run(RunFlags::default());
// Inspect errors with one of the many helper methods
let failures = results.get_all_method_errors();
Quickly define a single-method layer:
let init = quick_layer!("init", "setup", Value, |_args, ctx| {
ctx.set("ready", Value::from(true));
Ok(value!({}))
});
Construct Value instances easily:
let v = value!({ "name": "Alice", "age": 30, "scores": [85, 90, 95] });
sandl Value is fully compatible with serde_json::Value.
Fluently add multiple items:
let builder = add_slices!(Engine::builder(), slice1, slice2, slice3);
let builder = add_layers!(Engine::builder(), layer1, layer2);
Define multiple dependencies concisely:
let builder = dependencies!(
Engine::builder(),
"layer2" => ["layer1"]
);
When using serde, wrap types with a sandl compatibility layer:
json_wrapper!(pub WrapperType, HashMap<String, CustomType>); // pub is optional!
Inside a method, quickly return an error:
.args::<CalculatorArgs>()
.bind(|args, _ctx| {
if args.y == 0 {
Err(execution_error!("Division by zero"))
} else {
Ok(value!(args.x / args.y))
}
})
This error is automatically wrapped with runtime context:
MethodExecutionFailed {
slice: String,
layer: String,
method: String,
args: Value,
cause: Box<Error>,
},
Useful when configuring stack_size for rayon:
let two_kilobytes = KiB!(2);
let two_megabytes = MiB!(2);
let two_gigabytes = GiB!(2);
The RunResults type provides rich analysis:
let results = engine.run(RunFlags::default());
println!("Total slices: {}", results.total_slices());
println!("Successful: {}", results.successful_slices());
println!("Failed methods: {}", results.failed_methods());
// fn total_slices(&self) -> usize;
// fn successful_slices(&self) -> usize;
// fn failed_slices(&self) -> usize;
// fn total_methods(&self) -> usize;
// fn successful_methods(&self) -> usize;
// fn failed_methods(&self) -> usize;
// fn is_all_success(&self) -> bool;
// fn has_failures(&self) -> bool;
// fn summary(&self) -> String;
// fn get_slice_errors(&self) -> Vec<(&String, &Error)>;
// fn get_all_method_errors(&self) -> Vec<(&String, &String, &String, &Error)>;
// fn get_execution_errors(&self) -> Vec<(&String, &String, &String, &Error)>;
// fn from_slice(&self, slice_name: &str) -> Option<&Result<SliceResults>>;
// fn slice_names(&self) -> Vec<&String>;
// fn average_slice_duration(&self) -> Option<Duration>;
// fn min_slice_duration(&self) -> Option<Duration>;
// fn max_slice_duration(&self) -> Option<Duration>;
// fn timing_summary(&self) -> String;
if results.has_failures() {
for (slice, layer, method, error) in results.get_execution_errors() {
eprintln!("Error in {}.{}.{}: {}", slice, layer, method, error);
}
}
// Get results for a specific slice
if let Some(slice_result) = results.from_slice("slice1") {
// Process slice-specific results
}
Slice results contain whatever was returned from each method that was run, as well as how long it took to run the whole slice:
pub struct SliceResults {
// HashMap<(layer, method), Result>
pub method_results: HashMap<(String, String), Result<Value>>,
pub duration: Duration,
}
sandl adds minimal overhead over rayon. For maximum performance:
- Use RunFlags::SILENT_NO_OBSERVER
- Consider larger batch sizes and smaller chunks for ~10ms workloads
- Limit stack size per worker thread
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
MIT OR Apache-2.0