ThinkAgain

A minimal, debuggable framework for async-first AI pipelines. Write small async functions, wrap them in Node objects, and chain them through a lazy Context that only runs when you need results. Contexts track dependencies via back-pointers, enabling DAG execution with automatic deduplication for fanout patterns. When pipelines need stateful helpers (LLMs, retrievers, tools) you can deploy them as replica pools that run locally or behind a gRPC server.

Why ThinkAgain?

  • Declarative – Build pipelines by chaining Node objects in plain Python
  • Lazy – Context collects pending nodes and executes them on demand
  • DAG Execution – Back-pointer tracking enables fanout with automatic deduplication
  • Multi-Input Nodes – Nodes can accept multiple context arguments
  • Minimal – Small surface area, no DSLs or schedulers
  • Async-first – Async nodes with sync and async entrypoints (run / arun)
  • Replica-aware – Optional distributed runtime for stateful service pools

Core Concepts

  • Context – Wrapper around user data that tracks parent contexts via back-pointers.
  • Node / @node – Wrap an async def so it can be lazily chained. Nodes can accept multiple inputs (see the fanout sketch after this list).
  • run / arun – Helpers that normalize inputs and materialize pending nodes via DAG traversal.
  • replica – Decorator that registers a class as a managed pool of workers.
  • distributed.runtime – Context manager that deploys registered replicas.
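
A quick sketch of multi-input nodes and DAG fanout (the node and field names here are illustrative, not part of the library; it assumes a multi-input node receives one positional argument per upstream context, per the bullets above):

from dataclasses import dataclass
from thinkagain import node, run

@dataclass
class Doc:
    text: str

@node
async def summarize(d: Doc) -> Doc:
    return Doc(text=f"summary({d.text})")

@node
async def critique(d: Doc) -> Doc:
    return Doc(text=f"critique({d.text})")

@node
async def merge(a: Doc, b: Doc) -> Doc:
    # Both inputs point back to the same upstream context, which the
    # DAG executor runs only once (automatic deduplication).
    return Doc(text=f"{a.text} + {b.text}")

def pipeline(ctx):
    return merge(summarize(ctx), critique(ctx))

result = run(pipeline, Doc(text="raw"))
print(result.data.text)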

Installation

pip install thinkagain
# or with uv
uv add thinkagain

Quick Start

from dataclasses import dataclass
from thinkagain import node, run

@dataclass
class State:
    query: str
    documents: list[str] | None = None
    answer: str = ""

@node
async def retrieve(s: State) -> State:
    return State(query=s.query, documents=["doc1", "doc2"])

@node
async def generate(s: State) -> State:
    docs = s.documents or []
    return State(query=s.query, documents=docs, answer=f"Answer based on {docs}")

def pipeline(ctx):
    ctx = retrieve(ctx)   # queues the node; nothing runs yet
    ctx = generate(ctx)
    return ctx

result = run(pipeline, State(query="What is ML?"))  # materializes the DAG
print(result.data.answer)

Nodes receive and return plain Python values (dataclasses, dicts, etc.), which are automatically wrapped in a Context. The context materializes pending nodes whenever you access ctx.data, call run or arun, or await ctx, so normal Python control flow (if, while, recursion) just works.
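
For example, an async entrypoint with a plain Python retry loop, reusing the Quick Start definitions (this assumes arun takes the same arguments as run, which the README does not spell out; the three-attempt cap is illustrative):

import asyncio
from thinkagain import arun

def answer_or_retry(ctx):
    ctx = retrieve(ctx)
    ctx = generate(ctx)
    attempts = 0
    # Reading ctx.data forces pending nodes to run, so `while` just works
    while not ctx.data.answer and attempts < 3:
        ctx = generate(ctx)
        attempts += 1
    return ctx

async def main():
    result = await arun(answer_or_retry, State(query="What is ML?"))
    print(result.data.answer)

asyncio.run(main())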

Distributed Replica Pools

Need a stateful helper (LLM, vector store, tool adapter)? Decorate the class with @replica and let ThinkAgain manage the pool. The decorator returns a ReplicaHandle with deploy/shutdown/get methods; the original class is available as handle.cls if needed.

from dataclasses import dataclass
from thinkagain import node, replica, distributed, run

@dataclass
class ChatState:
    prompt: str
    reply: str = ""

@replica(n=2)
class FakeLLM:
    def __init__(self, prefix="Bot"):
        self.prefix = prefix

    def invoke(self, prompt: str) -> str:
        return f"{self.prefix}: {prompt}"

@node
async def call_llm(s: ChatState) -> ChatState:
    llm = FakeLLM.get()
    return ChatState(prompt=s.prompt, reply=llm.invoke(s.prompt))

def pipeline(ctx):
    return call_llm(ctx)

with distributed.runtime():
    result = run(pipeline, ChatState(prompt="Hello"))
    print(result.data.reply)
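
distributed.runtime() deploys and tears down registered pools for you, but the handle can also be driven directly. A minimal sketch, assuming deploy/shutdown are plain synchronous calls (the README names the methods but not their signatures):

# FakeLLM is now the ReplicaHandle returned by @replica
FakeLLM.deploy()                 # start the pool of two workers
try:
    worker = FakeLLM.get()       # check out a pooled instance
    print(worker.invoke("ping"))
    print(FakeLLM.cls)           # the original, undecorated class
finally:
    FakeLLM.shutdown()           # tear the pool down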

Need isolation between different replica registries? Create a ReplicaManager and pass it to both @replica(manager=...) and distributed.runtime(manager=...).
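
A sketch of an isolated registry (the import path for ReplicaManager is an assumption; the README only names the class):

from thinkagain import replica, distributed, ReplicaManager  # import path assumed

manager = ReplicaManager()

@replica(n=1, manager=manager)
class Embedder:
    def embed(self, text: str) -> list[float]:
        return [float(len(text))]

with distributed.runtime(manager=manager):
    print(Embedder.get().embed("hello"))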

For remote deployments, run the bundled gRPC server next to your replica classes and call distributed.runtime(backend="grpc", address="host:port").
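
Client side, reusing the pipeline above (the address is a placeholder; starting the server itself is not covered in this README):

from thinkagain import distributed, run

with distributed.runtime(backend="grpc", address="llm-host:50051"):
    result = run(pipeline, ChatState(prompt="Hello"))
    print(result.data.reply)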

Examples

Run the declarative demo to see conditional branches and loops:

python examples/demo.py

Documentation

License

Apache 2.0 – see LICENSE
