goworker2 is a Go-based background job processing library with pluggable components. It provides a clean, modular architecture supporting multiple queue backends, serializers, and statistics providers.
Originally inspired by Resque-compatible job processing, goworker2 has evolved into a flexible framework that can work with Redis, RabbitMQ, and custom backends.
Note: This is a complete rewrite and modernization of the original goworker library by Benjamin Manns, designed as a new project rather than a backwards-compatible upgrade. We're grateful for the inspiration and foundation provided by the original work.
- Multiple Queue Backends: Redis, RabbitMQ, or bring your own
- Pluggable Serializers: JSON, Resque, Sneakers/ActiveJob, or custom formats
- Statistics Providers: Resque-compatible, NoOp, or custom monitoring
- Pre-configured Engines: Ready-to-use setups for common scenarios
- Graceful Shutdown: Proper signal handling and worker cleanup
- Concurrent Processing: Configurable worker pools with job distribution
- Health Monitoring: Built-in health checks and statistics
The easiest way to get started is with pre-configured engines:
package main
import (
"context"
"log"
"time"
"github.com/BranchIntl/goworker2/engines"
)
func emailJob(queue string, args ...interface{}) error {
// Process email job
return nil
}
func main() {
options := engines.DefaultResqueOptions()
options.Queues = []string{"email", "default"}
options.PollInterval = 3 * time.Second
engine := engines.NewResqueEngine(options)
engine.Register("EmailJob", emailJob)
if err := engine.Run(context.Background()); err != nil {
log.Fatal(err)
}
}

A RabbitMQ-backed equivalent uses the Sneakers engine:

package main
import (
"context"
"log"
"github.com/BranchIntl/goworker2/engines"
)
func imageProcessor(queue string, args ...interface{}) error {
// Process image
return nil
}
func main() {
options := engines.DefaultSneakersOptions()
options.Queues = []string{"images", "default"}
engine := engines.NewSneakersEngine(options)
engine.Register("ImageProcessor", imageProcessor)
if err := engine.Run(context.Background()); err != nil {
log.Fatal(err)
}
}

For more control, you can configure components manually:
package main
import (
"context"
"log"
"time"
"github.com/BranchIntl/goworker2/brokers/redis"
"github.com/BranchIntl/goworker2/core"
"github.com/BranchIntl/goworker2/registry"
// Both packages are named resque, so each import needs a distinct alias.
resqueSerializer "github.com/BranchIntl/goworker2/serializers/resque"
resqueStats "github.com/BranchIntl/goworker2/statistics/resque"
)
func main() {
// Configure broker with queues
brokerOpts := redis.DefaultOptions()
brokerOpts.Queues = []string{"critical", "default"}
brokerOpts.PollInterval = 5 * time.Second
// Create components
serializer := resqueSerializer.NewSerializer()
broker := redis.NewBroker(brokerOpts, serializer)
stats := resqueStats.NewStatistics(resqueStats.DefaultOptions())
registry := registry.NewRegistry()
// Create engine with custom options
engine := core.NewEngine(
broker,
stats,
registry,
core.WithConcurrency(10),
core.WithShutdownTimeout(30*time.Second),
core.WithJobBufferSize(200),
)
// Register workers
registry.Register("MyJob", func(queue string, args ...interface{}) error {
// Handle job
return nil
})
// Start processing
if err := engine.Run(context.Background()); err != nil {
log.Fatal(err)
}
}

Install the library with go get:

go get github.com/BranchIntl/goworker2

This project was inspired by and builds upon the concepts from the original goworker library by Benjamin Manns. While this is a complete rewrite with a different architecture and capabilities, we acknowledge and appreciate the foundational work that made this project possible.
goworker2 uses a modular architecture with dependency injection:
┌─────────────────┐
│ Engine │ ← Orchestrates components
├─────────────────┤
│ Broker │ ← Queue backend with job consumption
│ Statistics │ ← Metrics and monitoring
│ Registry │ ← Worker function registry
│ Serializer │ ← Job serialization format
│ WorkerPool │ ← Manages concurrent workers
└─────────────────┘
- Broker: Handles queue operations and job consumption (enqueue, ack/nack, polling/pushing)
- Statistics: Records metrics and worker information
- Registry: Maps job classes to worker functions
- Serializer: Converts jobs to/from bytes
- Engine: Orchestrates all components and handles lifecycle
Two pre-configured engines bundle these components:
- ResqueEngine: Redis + Resque serializer + Resque statistics (Ruby Resque compatibility)
- SneakersEngine: RabbitMQ + ActiveJob serializer + NoOp statistics (Rails ActiveJob compatibility)
See engines/ directory for detailed engine documentation.
The engine accepts functional options at construction:

engine := core.NewEngine(
broker, stats, registry, serializer,
core.WithConcurrency(25), // Number of workers
core.WithShutdownTimeout(30*time.Second), // Graceful shutdown timeout
core.WithJobBufferSize(100), // Job channel buffer
)

Configure the Redis broker through its options:

options := redis.DefaultOptions()
options.URI = "redis://localhost:6379/"
options.Namespace = "jobs:"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PollInterval = 5 * time.Second // Polling frequency
options.MaxConnections = 10

The RabbitMQ broker is configured the same way:

options := rabbitmq.DefaultOptions()
options.URI = "amqp://guest:guest@localhost:5672/"
options.Exchange = "jobs"
options.Queues = []string{"high", "low"} // Queues to consume from
options.PrefetchCount = 1

goworker2 uses Go's standard log/slog library for structured logging and, by default, logs through slog's default logger. To customize logging, call slog.SetDefault(logger) before creating the engine. For example: slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))).
Worker functions must match this signature:
func(queue string, args ...interface{}) error

Use Go type assertions to handle job arguments:
func processUser(queue string, args ...interface{}) error {
if len(args) != 2 {
return fmt.Errorf("expected 2 arguments, got %d", len(args))
}
userID, ok := args[0].(float64) // JSON numbers are float64
if !ok {
return fmt.Errorf("invalid user ID type")
}
action, ok := args[1].(string)
if !ok {
return fmt.Errorf("invalid action type")
}
// Process user
return processUserAction(int(userID), action)
}

goworker2 handles these signals automatically:
- SIGINT/SIGTERM: Graceful shutdown
- Custom signals: Not handled automatically; use manual Start/Stop control to handle them yourself
// Automatic signal handling
engine.Run(ctx) // Blocks until SIGINT/SIGTERM
// Manual control
engine.Start(ctx)
// ... custom signal handling ...
engine.Stop()

For testing, use mocks or lightweight alternatives like miniredis for Redis, or run actual brokers in Docker containers for integration tests.
Complete working examples are available in the examples/ directory covering both pre-configured engines and manual component setup.
Check engine health at runtime:

health := engine.Health()
fmt.Printf("Healthy: %v\n", health.Healthy)
fmt.Printf("Active Workers: %d\n", health.ActiveWorkers)
for queue, count := range health.QueuedJobs {
fmt.Printf("Queue %s: %d jobs\n", queue, count)
}

Query aggregate statistics:

stats, err := engine.GetStats().GetGlobalStats(ctx)
if err == nil {
fmt.Printf("Total Processed: %d\n", stats.TotalProcessed)
fmt.Printf("Total Failed: %d\n", stats.TotalFailed)
}

To contribute:
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -am 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.