A modular Python framework for building ML infrastructure: microservices, artifact registries, job orchestration, hardware integrations, and more.
Docs · Samples · Contributing
```shell
pip install mindtrace
# or
uv add mindtrace
```

Or install only what you need:

```shell
pip install mindtrace-services  # Microservices
pip install mindtrace-registry  # Artifact storage
pip install mindtrace-cluster   # Distributed workers
```

```python
from mindtrace.core import Mindtrace

class MyProcessor(Mindtrace):
    def run(self):
        # self.config and self.logger are provided automatically
        self.logger.error(f"Cache dir: {self.config.MINDTRACE_DIR_PATHS.ROOT}")

processor = MyProcessor()
processor.run()
# [2026-01-08 10:39:42] ERROR: MyProcessor: Cache dir: ~/.cache/mindtrace
```
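The automatic `self.logger` and `self.config` wiring can be pictured with a plain-Python base class. This is a hypothetical, dependency-free sketch of the pattern, not Mindtrace's actual implementation:

```python
import logging

class AutoBase:
    """Toy base class: subclasses get a named logger and shared config for free."""

    config = {"ROOT": "~/.cache/app"}  # stand-in for a real config object

    def __init__(self):
        # The logger is named after the concrete subclass, matching the
        # "MyProcessor: ..." prefix in the log line above
        self.logger = logging.getLogger(type(self).__name__)

class MyProcessor(AutoBase):
    def run(self):
        self.logger.error("Cache dir: %s", self.config["ROOT"])

MyProcessor().run()  # logs under the "MyProcessor" logger name
```

Because the base class does the wiring in `__init__`, every subclass inherits it without boilerplate.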
```python
from mindtrace.services.samples.echo_service import EchoService

# Launch service and get auto-generated client
client = EchoService.launch(port=8080)
result = client.echo(message="Hello, world!")
print(result.echoed)  # "Hello, world!"
client.shutdown()
```

Define your own service (must be in an importable module):
```python
# mypackage/predictor.py
from pydantic import BaseModel

from mindtrace.services import Service
from mindtrace.core import TaskSchema

class PredictInput(BaseModel):
    text: str

class PredictOutput(BaseModel):
    label: str
    confidence: float

predict_schema = TaskSchema(
    name="predict",
    input_schema=PredictInput,
    output_schema=PredictOutput,
)

class PredictorService(Service):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.add_endpoint("predict", self.predict, schema=predict_schema)

    def predict(self, payload: PredictInput) -> PredictOutput:
        return PredictOutput(label="positive", confidence=0.95)
```
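The `add_endpoint` pattern — registering a named handler together with its input type — can be sketched with stdlib dataclasses. This is a hypothetical, mindtrace-free illustration of the dispatch idea, not the framework's real API:

```python
from dataclasses import dataclass

@dataclass
class PredictInput:
    text: str

@dataclass
class PredictOutput:
    label: str
    confidence: float

class MiniService:
    """Toy service: endpoints map names to (handler, input type)."""

    def __init__(self):
        self._endpoints = {}

    def add_endpoint(self, name, handler, input_type):
        self._endpoints[name] = (handler, input_type)

    def call(self, name, **payload):
        handler, input_type = self._endpoints[name]
        # Constructing the input type validates the payload's field names
        return handler(input_type(**payload))

class ToyPredictor(MiniService):
    def __init__(self):
        super().__init__()
        self.add_endpoint("predict", self.predict, PredictInput)

    def predict(self, payload: PredictInput) -> PredictOutput:
        return PredictOutput(label="positive", confidence=0.95)

svc = ToyPredictor()
result = svc.call("predict", text="great library!")
print(result.label, result.confidence)  # positive 0.95
```

Keeping the endpoint table as plain data is what makes auto-generated clients possible: a client only needs the names and schemas, not the handler code.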
```python
import numpy as np

from mindtrace.registry import Registry

registry = Registry()

# Save anything: arrays, datasets, configs, dicts
embeddings = np.random.rand(100, 768).astype(np.float32)
registry.save("data:embeddings:v1", embeddings)

# Load it back (with automatic versioning)
loaded = registry.load("data:embeddings:v1")
print(f"Loaded: {loaded.shape}, {loaded.dtype}")
# Loaded: (100, 768), float32
```
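The `namespace:name:version` key scheme can be illustrated with a toy in-memory store. This sketch only shows the key convention; Mindtrace's actual Registry handles serialization and storage backends:

```python
class MiniRegistry:
    """Toy artifact store keyed by 'namespace:name:version' strings."""

    def __init__(self):
        self._store = {}

    def save(self, key, obj):
        self._store[key] = obj

    def load(self, key):
        return self._store[key]

    def versions(self, prefix):
        # List every saved version under e.g. "data:embeddings"
        return sorted(k for k in self._store if k.startswith(prefix + ":"))

reg = MiniRegistry()
reg.save("data:embeddings:v1", [[0.1, 0.2], [0.3, 0.4]])
reg.save("data:embeddings:v2", [[0.5, 0.6]])
print(reg.versions("data:embeddings"))
# ['data:embeddings:v1', 'data:embeddings:v2']
```

Because versions are part of the key, older artifacts stay addressable after new ones are saved.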
```python
from mindtrace.core import ObservableContext

@ObservableContext(vars=["status", "progress"])
class Pipeline:
    def __init__(self):
        self.status = "idle"
        self.progress = 0

def on_change(source, var, old, new):
    print(f"{var}: {old} → {new}")

pipeline = Pipeline()
pipeline.subscribe(on_change, "context_updated")
pipeline.status = "running"  # prints: status: idle → running
pipeline.progress = 50       # prints: progress: 0 → 50
```
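Under the hood, this kind of observable can be built on `__setattr__` interception. Here is a minimal stdlib sketch of the idea (a hypothetical illustration, not how `ObservableContext` is actually implemented):

```python
class WatchedPipeline:
    """Toy observable: calls subscribers when a watched attribute changes."""

    _watched = {"status", "progress"}

    def __init__(self):
        # object.__setattr__ bypasses notification during initialization
        object.__setattr__(self, "_subscribers", [])
        object.__setattr__(self, "status", "idle")
        object.__setattr__(self, "progress", 0)

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def __setattr__(self, name, value):
        old = getattr(self, name, None)
        object.__setattr__(self, name, value)
        if name in self._watched and old != value:
            for cb in self._subscribers:
                cb(self, name, old, value)

events = []
p = WatchedPipeline()
p.subscribe(lambda src, var, old, new: events.append((var, old, new)))
p.status = "running"
p.progress = 50
print(events)  # [('status', 'idle', 'running'), ('progress', 0, 50)]
```

A decorator like `ObservableContext` lets the framework inject this machinery without the class author writing any of it.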
| Module | Description |
|---|---|
| `core` | Config, logging, observables, base classes |
| `services` | Microservice framework with auto-generated clients |
| `registry` | Versioned artifact storage (models, datasets, configs) |
| `database` | Redis & MongoDB ODM with async support |
| `cluster` | Distributed worker orchestration |
| `jobs` | Job schemas and execution backends |
| `hardware` | Camera, PLC, and sensor integrations |
| `datalake` | Query and manage datasets, models, labels, and datums |
| `models` | Model definitions, inference, and leaderboards |
| `storage` | Cloud storage interfaces (GCS, S3) |
| `automation` | Pipeline orchestration and Label Studio integration |
| `ui` | UI components and visualization |
| `apps` | End-user applications and demos |
Modules are organized into levels based on dependency direction: each level only depends on modules in lower levels.
| Level | Modules |
|---|---|
| 1. Foundation | core |
| 2. Core Consumers | jobs, registry, database, services, storage, ui |
| 3. Infrastructure | hardware, cluster, datalake, models |
| 4. Automation | automation |
| 5. Applications | apps |
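The layering rule can be checked mechanically. This sketch uses hypothetical dependency edges (the real import graph may differ) to show what such a check looks like:

```python
LEVELS = {
    "core": 1,
    "jobs": 2, "registry": 2, "database": 2, "services": 2, "storage": 2, "ui": 2,
    "hardware": 3, "cluster": 3, "datalake": 3, "models": 3,
    "automation": 4,
    "apps": 5,
}

# Hypothetical edges: module -> modules it imports from
DEPS = {
    "services": ["core"],
    "cluster": ["core", "jobs", "services"],
    "automation": ["core", "datalake", "models"],
}

def violations(levels, deps):
    # A dependency is legal only if it points to a strictly lower level
    return [(mod, dep) for mod, targets in deps.items()
            for dep in targets if levels[dep] >= levels[mod]]

print(violations(LEVELS, DEPS))  # [] -> the sketched graph respects the layering
```

A check like this can run in CI so that a new cross-level import fails the build instead of silently eroding the architecture.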