
dagx

A minimal, type-safe, runtime-agnostic async DAG (Directed Acyclic Graph) executor with compile-time cycle prevention and true parallel execution

7 releases

0.3.1 Oct 10, 2025
0.3.0 Oct 8, 2025
0.2.3 Oct 8, 2025
0.1.0 Oct 7, 2025

#145 in Concurrency

MIT license

200KB
3K SLoC

Rust 3K SLoC // 0.1% comments Shell 350 SLoC // 0.1% comments

Why dagx?

🚀 Blazing Fast: 1.04-129x faster than dagrs

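| Workload          | Tasks  | dagx    | dagrs   | Speedup        |
|-------------------|--------|---------|---------|----------------|
| Sequential chain  | 5      | 3.0 µs  | 385 µs  | 129x faster 🚀 |
| Sequential chain  | 100    | 79 µs   | 703 µs  | 8.9x faster    |
| Diamond pattern   | 4      | 11 µs   | 387 µs  | 34x faster     |
| Fan-out (1→100)   | 101    | 155 µs  | 595 µs  | 3.85x faster   |
| Independent tasks | 10,000 | 12.7 ms | 13.3 ms | 1.04x faster   |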

Per-task overhead:

  • Construction: ~100 ns/task
  • Inline execution (sequential): ~790 ns/task
  • Parallel execution: ~1.3 µs/task
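As a rough sanity check, the per-task figures can be multiplied out and compared with the benchmark table above. The sketch below is plain arithmetic and not part of dagx; the constant and function names are invented for illustration.

```rust
/// Per-task overhead figures quoted in this README, in nanoseconds.
const INLINE_NS_PER_TASK: u64 = 790;
const PARALLEL_NS_PER_TASK: u64 = 1_300;

/// Estimated total overhead in microseconds for `tasks` tasks
/// at a given per-task cost. (Hypothetical helper, not a dagx API.)
fn estimated_overhead_us(tasks: u64, ns_per_task: u64) -> f64 {
    (tasks * ns_per_task) as f64 / 1_000.0
}

fn main() {
    // 100-task sequential chain: 100 * 790 ns = 79 µs,
    // which lines up with the 79 µs measured for that workload.
    let chain = estimated_overhead_us(100, INLINE_NS_PER_TASK);

    // 10,000 independent tasks: 10,000 * 1.3 µs = 13 ms,
    // close to the 12.7 ms measured for that workload.
    let independent = estimated_overhead_us(10_000, PARALLEL_NS_PER_TASK);

    println!("chain: {chain} µs, independent: {independent} µs");
}
```

The estimate only covers scheduling overhead; real tasks add their own work on top.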

🛡️ Compile-Time Safety

  • Cycles are impossible — the type system prevents them at compile time, zero runtime overhead
  • No runtime type errors — dependencies validated at compile time
  • Compiler-verified correctness — no surprise failures in production

See how it works.
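One way to picture why cycles cannot be expressed: a task can only depend on handles that already exist, so every edge points from an older node to a newer one. The following is a minimal std-only sketch of that idea. It is not dagx's actual implementation, and `Graph`, `Handle`, `source`, and `join` are hypothetical names.

```rust
use std::marker::PhantomData;

/// Typed reference to the output of a node that already exists.
struct Handle<T> {
    index: usize,
    _output: PhantomData<T>,
}

/// Hypothetical graph that records, per node, the indices it depends on.
struct Graph {
    deps: Vec<Vec<usize>>,
}

impl Graph {
    fn new() -> Self {
        Graph { deps: Vec::new() }
    }

    /// Appends a node and returns the only way to refer to it.
    fn push<T>(&mut self, deps: Vec<usize>) -> Handle<T> {
        self.deps.push(deps);
        Handle { index: self.deps.len() - 1, _output: PhantomData }
    }

    /// A node with no inputs.
    fn source<T>(&mut self) -> Handle<T> {
        self.push(Vec::new())
    }

    /// A node with two inputs. Both handles must exist before this call,
    /// so the new node can never be one of its own ancestors.
    fn join<A, B, T>(&mut self, a: &Handle<A>, b: &Handle<B>) -> Handle<T> {
        self.push(vec![a.index, b.index])
    }

    /// True if every dependency refers to an earlier node.
    fn edges_point_backwards(&self) -> bool {
        self.deps
            .iter()
            .enumerate()
            .all(|(node, deps)| deps.iter().all(|&d| d < node))
    }
}

fn main() {
    let mut g = Graph::new();
    let x: Handle<i32> = g.source();
    let y: Handle<i32> = g.source();
    let sum: Handle<i32> = g.join(&x, &y);

    // A cycle would need a handle before its node exists, e.g.
    // `let bad: Handle<i32> = g.join(&bad, &x);`, which does not compile.
    assert!(g.edges_point_backwards());
    println!("node {} depends on {:?}", sum.index, g.deps[sum.index]);
}
```

The sketch ignores details a real library has to handle, such as handles from a different graph.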

✨ Simple API

let sum = dag.add_task(Add).depends_on((&x, &y));
dag.run(|fut| { tokio::spawn(fut); }).await?;

That's it. No trait boilerplate, no manual channels, no node IDs.

Quick Start

Add to your Cargo.toml:

[dependencies]
dagx = "0.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Basic example:

use dagx::{task, DagRunner, Task};

// Define tasks with the #[task] macro

struct Value(i32);

#[task]
impl Value {
    async fn run(&self) -> i32 {
        self.0
    }
}

struct Add;

#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 {
        a + b
    }
}

#[tokio::main]
async fn main() {
    let dag = DagRunner::new();

    // Add source tasks with no dependencies
    let x = dag.add_task(Value(2));
    let y = dag.add_task(Value(3));

    // Add task that depends on both x and y
    let sum = dag.add_task(Add).depends_on((&x, &y));

    // Execute with true parallelism
    dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();

    // Retrieve results
    assert_eq!(dag.get(sum).unwrap(), 5);
}

Performance

dagx provides true parallel execution with low per-task overhead: roughly 790 ns per task when run inline and 1.3 µs per task when spawned in parallel.

Why is dagx so fast?

  • Inline fast-path: Sequential chains execute inline without spawning (8.9-129x faster)
  • Primitives as scheduler: No custom scheduler — channels coordinate execution
  • Adaptive execution: Inline for sequential work, true parallelism for concurrent work
  • Zero-cost abstractions: Generics and monomorphization eliminate overhead

See design philosophy for details.
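The "primitives as scheduler" idea can be illustrated without any scheduler at all: each task waits on a channel for its inputs and sends its output to whoever depends on it. The sketch below wires a diamond this way using OS threads and `std::sync::mpsc`. It is only an analogy under those assumptions; dagx itself works with async tasks, and `run_diamond` is a made-up name.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a diamond: source -> (left, right) -> sink.
/// Channels alone decide when each stage can proceed.
fn run_diamond(input: i32) -> i32 {
    let (to_left, left_in) = mpsc::channel::<i32>();
    let (to_right, right_in) = mpsc::channel::<i32>();
    let (left_out, sink_in_left) = mpsc::channel::<i32>();
    let (right_out, sink_in_right) = mpsc::channel::<i32>();

    // Source: fans its value out to both branches.
    let source = thread::spawn(move || {
        to_left.send(input).unwrap();
        to_right.send(input).unwrap();
    });

    // Left and right branches run concurrently once their input arrives.
    let left = thread::spawn(move || {
        let v = left_in.recv().unwrap();
        left_out.send(v + 1).unwrap();
    });
    let right = thread::spawn(move || {
        let v = right_in.recv().unwrap();
        right_out.send(v * 10).unwrap();
    });

    // Sink: blocks until both branches have produced a value.
    let sink = thread::spawn(move || {
        sink_in_left.recv().unwrap() + sink_in_right.recv().unwrap()
    });

    source.join().unwrap();
    left.join().unwrap();
    right.join().unwrap();
    sink.join().unwrap()
}

fn main() {
    // (2 + 1) + (2 * 10)
    println!("diamond result: {}", run_diamond(2));
}
```

No stage is told when to start; readiness falls out of when its inputs arrive on the channels.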

Features

  • Compile-time cycle prevention: Type system makes cycles impossible — no runtime checks
  • Compile-time type safety: Dependencies validated at compile time, no runtime type errors
  • Works with ANY type: Custom types work automatically. Any type that is Clone + Send + Sync + 'static qualifies, with no trait implementations needed
  • Runtime-agnostic: Works with Tokio, async-std, smol, or any async runtime
  • True parallelism: Tasks spawn to multiple threads for genuine parallel execution
  • Type-state pattern: API prevents incorrect wiring through compile-time errors
  • Zero-cost abstractions: Leverages generics and monomorphization for minimal overhead
  • Flexible task patterns: Supports stateless, read-only, and mutable state tasks
  • Simple API: Just #[task], DagRunner, TaskHandle, and TaskBuilder
  • Comprehensive error handling: Result-based errors with actionable messages
  • Optional tracing: Zero-cost observability via optional tracing feature flag
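The type-state pattern mentioned in the list above can be sketched in a few lines: the state of a value is encoded in its type, so misuse becomes a compile error rather than a runtime check. This is a generic illustration, not dagx's code; `Node`, `Unwired`, and `Wired` are hypothetical names.

```rust
use std::marker::PhantomData;

/// Marker types for the two states a node can be in.
struct Unwired;
struct Wired;

struct Node<State> {
    name: &'static str,
    deps: Vec<&'static str>,
    _state: PhantomData<State>,
}

impl Node<Unwired> {
    fn new(name: &'static str) -> Self {
        Node { name, deps: Vec::new(), _state: PhantomData }
    }

    /// Consumes the unwired node, so dependencies can be set only once,
    /// and accepts only nodes that are already wired.
    fn depends_on(self, deps: &[&Node<Wired>]) -> Node<Wired> {
        Node {
            name: self.name,
            deps: deps.iter().map(|d| d.name).collect(),
            _state: PhantomData,
        }
    }
}

impl Node<Wired> {
    /// Only available once the node has been wired.
    fn describe(&self) -> String {
        format!("{} <- [{}]", self.name, self.deps.join(", "))
    }
}

fn main() {
    let x = Node::new("x").depends_on(&[]);
    let y = Node::new("y").depends_on(&[]);
    let sum = Node::new("sum").depends_on(&[&x, &y]);
    println!("{}", sum.describe());

    // Both of these are rejected by the compiler:
    // Node::new("z").describe();            // no `describe` on Node<Unwired>
    // let s = Node::new("s");
    // s.depends_on(&[]); s.depends_on(&[]); // `s` was moved by the first call
}
```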

Core Concepts

Task Patterns

dagx supports three task patterns:

1. Stateless - Pure functions with no state:

struct Add;

#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 { a + b }
}

2. Read-only state - Configuration accessed via &self:

struct Multiplier(i32);

#[task]
impl Multiplier {
    async fn run(&self, input: &i32) -> i32 { input * self.0 }
}

3. Mutable state - State modification via &mut self:

struct Counter(i32);

#[task]
impl Counter {
    async fn run(&mut self, value: &i32) -> i32 {
        self.0 += value;
        self.0
    }
}

DagRunner

The DagRunner orchestrates task execution:

let dag = DagRunner::new();
let handle = dag.add_task(MyTask::new());

TaskHandle

A TaskHandle<T> is a typed reference to a task's output. Use it to wire dependencies and retrieve results:

// Single dependency
let task = dag.add_task(my_task).depends_on(&upstream);

// Multiple dependencies (order matters!)
let task = dag.add_task(my_task).depends_on((&upstream1, &upstream2));

Custom Types

dagx works with ANY type automatically! As long as your type implements Clone + Send + Sync + 'static, it works seamlessly:

#[derive(Clone)]  // Just derive Clone!
struct User {
    name: String,
    age: u32,
}

// Unit structs that the #[task] impls below attach to
struct CreateUser;
struct ProcessUser;

#[task]
impl CreateUser {
    async fn run(&self) -> User {
        User { name: "Alice".to_string(), age: 30 }
    }
}

#[task]
impl ProcessUser {
    async fn run(user: &User) -> String {
        format!("{} is {} years old", user.name, user.age)
    }
}

No trait implementations needed! The #[task] macro generates type-specific extraction logic automatically. This includes nested structs, collections, enums, and any other type you define.

See custom_types.rs for a complete example with nested custom types.

Runtime Agnostic

dagx works with any async runtime. Provide a spawner function to run():

// With Tokio
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();

// With async-std
dag.run(|fut| { async_std::task::spawn(fut); }).await.unwrap();

// With smol
dag.run(|fut| { smol::spawn(fut).detach(); }).await.unwrap();
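To show why taking a spawner closure keeps the executor independent of any runtime, here is a std-only sketch. It assumes a simplified, synchronous `run_all` function and a tiny `block_on` helper, both invented for this example; dagx's real `run()` is async and its signature may differ.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: polls one future to completion on the current thread.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical runner: squares each input in its own future and hands
/// that future to `spawn`. It never names a runtime itself.
fn run_all<S>(inputs: Vec<i32>, mut spawn: S) -> Vec<i32>
where
    S: FnMut(BoxFuture),
{
    let (tx, rx) = mpsc::channel();
    for (i, n) in inputs.into_iter().enumerate() {
        let tx = tx.clone();
        spawn(Box::pin(async move {
            tx.send((i, n * n)).unwrap();
        }));
    }
    drop(tx); // the receiver ends once every spawned future has finished

    let mut results: Vec<(usize, i32)> = rx.iter().collect();
    results.sort(); // restore input order
    results.into_iter().map(|(_, v)| v).collect()
}

fn main() {
    // Spawner 1: one OS thread per future.
    let threaded = run_all(vec![1, 2, 3], |fut| {
        thread::spawn(move || block_on(fut));
    });
    // Spawner 2: run each future inline on the calling thread.
    let inline = run_all(vec![1, 2, 3], |fut| block_on(fut));

    assert_eq!(threaded, inline);
    println!("{threaded:?}");
}
```

Swapping the closure is the only change needed to move between execution strategies, which is the same shape as the Tokio, async-std, and smol calls above.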

Tutorials & Examples

Tutorials (Start Here)

A step-by-step introduction to dagx. Run the tutorial examples:

cargo run --example 01_basic
cargo run --example 02_fan_out
cargo run --example 03_fan_in
cargo run --example 04_parallel_computation

Advanced Examples

Real-world patterns. Run any example by name, for instance:

cargo run --example custom_types

Advanced Topics

Detailed documentation on dagx internals and advanced features:

Important Limitations

Tasks Cannot Return Bare Tuples

Tasks cannot return bare tuples as output types. If you need to return multiple values, use one of these workarounds:

Option 1: Use a struct (recommended)

#[derive(Clone)]  // task outputs must be Clone + Send + Sync + 'static
struct UserData {
    name: String,
    age: i32,
}

struct FetchUser;

#[task]
impl FetchUser {
    async fn run(id: &i32) -> UserData {
        UserData {
            name: "Alice".to_string(),
            age: 30,
        }
    }
}

Option 2: Wrap in Result

struct FetchData;

#[task]
impl FetchData {
    async fn run(id: &i32) -> Result<(String, i32), String> {
        Ok(("Alice".to_string(), 30))
    }
}

Structs are preferred because they're self-documenting and easier to refactor.

When to Use dagx

dagx is ideal for:

  • Data pipelines with complex dependencies between stages
  • Build systems where tasks depend on outputs of other tasks
  • Parallel computation where work can be split and aggregated
  • Workflow engines with typed data flow between stages
  • ETL processes with validation and transformation steps

Benchmarks

Run the full benchmark suite:

cargo bench

View detailed HTML reports:

# macOS
open target/criterion/report/index.html

# Linux
xdg-open target/criterion/report/index.html

# Windows
start target/criterion/report/index.html

Benchmarks were run on an AMD Ryzen 7 7840U (Zen 4) @ 3.3 GHz.

Code of Conduct

This project follows the Builder's Code of Conduct.

Documentation

Full API documentation is available at docs.rs/dagx.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for guidelines.

For security issues, see SECURITY.md.

License

Licensed under the MIT License. See LICENSE for details.

Copyright (c) 2025 Stephen Waits steve@waits.net
