A distributed task queue written in Go with PostgreSQL backend, supporting job scheduling, retries with exponential backoff, and worker pools.
- Worker Pools: Configurable concurrency with graceful shutdown
- Job Scheduling: Immediate, delayed, and scheduled job execution
- Retry Logic: Exponential backoff with jitter to prevent thundering herd
- Dead Letter Queue: Failed jobs moved to DLQ after max retries
- Priority Queues: Jobs processed by priority within each queue
- REST API: Full-featured HTTP API for job management
- gRPC API: High-performance RPC interface
- Observability: Prometheus metrics and structured JSON logging
- PostgreSQL Backend: ACID-compliant storage with
FOR UPDATE SKIP LOCKEDfor concurrent safety
# Start all services (PostgreSQL, TaskQueue, Prometheus, Grafana)
docker-compose up -d
# Check logs
docker-compose logs -f taskqueue# Start PostgreSQL and run migrations
make dev-db
# Run the server
make devcurl -X POST http://localhost:8080/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{
"queue": "default",
"type": "echo",
"payload": {"message": "Hello, World!"},
"priority": 10,
"max_retries": 3
}'curl -X POST http://localhost:8080/api/v1/jobs \
-H "Content-Type: application/json" \
-d '{
"queue": "default",
"type": "echo",
"payload": {"message": "Scheduled job"},
"scheduled_at": "2024-12-06T10:00:00Z"
}'curl http://localhost:8080/api/v1/jobs/{job_id}curl http://localhost:8080/api/v1/jobs/{job_id}/resultcurl "http://localhost:8080/api/v1/queues/default/jobs?limit=10&offset=0"curl http://localhost:8080/api/v1/queues/default/statscurl http://localhost:8080/api/v1/health| Flag | Environment Variable | Default | Description |
|---|---|---|---|
-http-addr |
HTTP_ADDR |
:8080 |
HTTP server address |
-metrics-addr |
METRICS_ADDR |
:9090 |
Metrics server address |
-database-url |
DATABASE_URL |
postgres://localhost:5432/taskqueue?sslmode=disable |
PostgreSQL connection URL |
-concurrency |
- | 10 |
Number of concurrent workers |
-queue |
- | default |
Queue name to process |
-poll-interval |
- | 1s |
Job polling interval |
┌─────────────────────────────────────────────────────────────┐
│ TaskQueue │
├──────────────┬──────────────┬──────────────┬────────────────┤
│ REST API │ gRPC API │ Scheduler │ Worker Pool │
│ (HTTP) │ (Proto) │ │ │
├──────────────┴──────────────┴──────────────┴────────────────┤
│ Job Store │
├─────────────────────────────────────────────────────────────┤
│ PostgreSQL │
└─────────────────────────────────────────────────────────────┘
Created → Pending → Running → Completed
↓ ↓
Scheduled Failed → Pending (retry)
↓
Dead (max retries exceeded)
Available at /metrics on the metrics port (default 9090):
taskqueue_jobs_created_total- Total jobs created by queue and typetaskqueue_jobs_processed_total- Total jobs processed with statustaskqueue_jobs_in_flight- Currently processing jobstaskqueue_job_duration_seconds- Job processing duration histogramtaskqueue_job_retries_total- Total retry attemptstaskqueue_queue_depth- Pending jobs per queuetaskqueue_store_latency_seconds- Database operation latency
registry := domain.NewHandlerRegistry()
// Register a function handler
registry.RegisterFunc("send-email", func(ctx context.Context, job *domain.Job) error {
var payload EmailPayload
if err := json.Unmarshal(job.Payload, &payload); err != nil {
return err
}
return emailService.Send(payload.To, payload.Subject, payload.Body)
})
// Register a struct handler
registry.Register("process-image", &ImageProcessor{})# Run tests
make test
# Run tests with coverage
make test-coverage
# Lint code
make lint
# Build binary
make build
# Build Docker image
make docker-buildMIT