0xb0b1/taskqueue

TaskQueue

A distributed task queue written in Go with a PostgreSQL backend, supporting job scheduling, retries with exponential backoff, and worker pools.

Features

  • Worker Pools: Configurable concurrency with graceful shutdown
  • Job Scheduling: Immediate, delayed, and scheduled job execution
  • Retry Logic: Exponential backoff with jitter to avoid thundering-herd retries
  • Dead Letter Queue: Failed jobs are moved to the DLQ once max retries are exhausted
  • Priority Queues: Jobs are processed in priority order within each queue
  • REST API: Full-featured HTTP API for job management
  • gRPC API: High-performance RPC interface
  • Observability: Prometheus metrics and structured JSON logging
  • PostgreSQL Backend: ACID-compliant storage with FOR UPDATE SKIP LOCKED for concurrent safety
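
To make the retry behavior listed above concrete, here is a minimal sketch of exponential backoff with jitter. It is not taken from TaskQueue's source: the function names, the 1s base, the 30s cap, and the choice of "full jitter" (a uniform delay between zero and the exponential ceiling) are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoffCeiling returns the un-jittered delay before retry number attempt
// (1-based): base * 2^(attempt-1), capped at max. Hypothetical helper.
func backoffCeiling(attempt int, base, max time.Duration) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	d := base
	for i := 1; i < attempt; i++ {
		if d >= max/2 {
			return max // doubling again would reach or pass the cap
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

// backoffWithJitter picks a uniformly random delay in [0, ceiling].
// Spreading retries over the whole window keeps workers that failed at the
// same moment from retrying at the same moment.
func backoffWithJitter(attempt int, base, max time.Duration) time.Duration {
	ceiling := backoffCeiling(attempt, base, max)
	if ceiling <= 0 {
		return 0
	}
	return time.Duration(rand.Int63n(int64(ceiling) + 1))
}

func main() {
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: ceiling=%v delay=%v\n",
			attempt,
			backoffCeiling(attempt, time.Second, 30*time.Second),
			backoffWithJitter(attempt, time.Second, 30*time.Second))
	}
}
```

The cap matters as much as the jitter: without it, a job with a large max_retries value could end up waiting hours between attempts.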

Quick Start

Using Docker Compose

# Start all services (PostgreSQL, TaskQueue, Prometheus, Grafana)
docker-compose up -d

# Check logs
docker-compose logs -f taskqueue

Local Development

# Start PostgreSQL and run migrations
make dev-db

# Run the server
make dev

API Usage

Create a Job

curl -X POST http://localhost:8080/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "queue": "default",
    "type": "echo",
    "payload": {"message": "Hello, World!"},
    "priority": 10,
    "max_retries": 3
  }'
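
The same request can be sent from Go. The sketch below mirrors the fields of the curl example; the struct and function names are hypothetical, and because the response format is not documented here, only the HTTP status is printed.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// CreateJobRequest mirrors the JSON fields used in the curl example.
// The type itself is an illustration, not part of TaskQueue.
type CreateJobRequest struct {
	Queue      string          `json:"queue"`
	Type       string          `json:"type"`
	Payload    json.RawMessage `json:"payload"`
	Priority   int             `json:"priority,omitempty"`
	MaxRetries int             `json:"max_retries,omitempty"`
}

// newCreateJobRequest builds a POST /api/v1/jobs request for the given server.
func newCreateJobRequest(baseURL string, job CreateJobRequest) (*http.Request, error) {
	body, err := json.Marshal(job)
	if err != nil {
		return nil, fmt.Errorf("encode job: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/api/v1/jobs", bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	req, err := newCreateJobRequest("http://localhost:8080", CreateJobRequest{
		Queue:      "default",
		Type:       "echo",
		Payload:    json.RawMessage(`{"message": "Hello, World!"}`),
		Priority:   10,
		MaxRetries: 3,
	})
	if err != nil {
		fmt.Println("could not build request:", err)
		return
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```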

Create a Scheduled Job

curl -X POST http://localhost:8080/api/v1/jobs \
  -H "Content-Type: application/json" \
  -d '{
    "queue": "default",
    "type": "echo",
    "payload": {"message": "Scheduled job"},
    "scheduled_at": "2024-12-06T10:00:00Z"
  }'
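
The scheduled_at value in the example is an RFC 3339 timestamp in UTC. When the time is computed rather than hard-coded, a small helper such as the following can build the request body. The function names are hypothetical, and whether the server accepts offsets other than Z is not stated here, so the sketch always converts to UTC.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// scheduledAtAfter returns the RFC 3339 UTC timestamp for "now plus delay",
// matching the shape of the scheduled_at value in the curl example.
func scheduledAtAfter(delay time.Duration) string {
	return time.Now().Add(delay).UTC().Format(time.RFC3339)
}

// scheduledJobBody encodes a job that should run after the given delay.
func scheduledJobBody(queue, jobType string, payload interface{}, delay time.Duration) ([]byte, error) {
	return json.Marshal(map[string]interface{}{
		"queue":        queue,
		"type":         jobType,
		"payload":      payload,
		"scheduled_at": scheduledAtAfter(delay),
	})
}

func main() {
	body, err := scheduledJobBody("default", "echo",
		map[string]string{"message": "Scheduled job"}, 15*time.Minute)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(body)) // send this as the body of POST /api/v1/jobs
}
```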

Get Job Status

curl http://localhost:8080/api/v1/jobs/{job_id}

Get Job Result

curl http://localhost:8080/api/v1/jobs/{job_id}/result

List Queue Jobs

curl "http://localhost:8080/api/v1/queues/default/jobs?limit=10&offset=0"

Get Queue Stats

curl http://localhost:8080/api/v1/queues/default/stats

Health Check

curl http://localhost:8080/api/v1/health

Configuration

| Flag | Environment Variable | Default | Description |
|------|----------------------|---------|-------------|
| -http-addr | HTTP_ADDR | :8080 | HTTP server address |
| -metrics-addr | METRICS_ADDR | :9090 | Metrics server address |
| -database-url | DATABASE_URL | postgres://localhost:5432/taskqueue?sslmode=disable | PostgreSQL connection URL |
| -concurrency | - | 10 | Number of concurrent workers |
| -queue | - | default | Queue name to process |
| -poll-interval | - | 1s | Job polling interval |
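
One common way to wire up flags that fall back to environment variables is sketched below with the standard flag package. The flag names and defaults come from the table; the precedence (flag, then environment variable, then default) and all Go identifiers are assumptions, not TaskQueue's actual implementation.

```go
package main

import (
	"flag"
	"fmt"
	"os"
	"time"
)

// envOr returns the value of the environment variable key,
// or fallback when the variable is unset or empty.
func envOr(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// config holds the settings from the table. Hypothetical type.
type config struct {
	HTTPAddr     string
	MetricsAddr  string
	DatabaseURL  string
	Concurrency  int
	Queue        string
	PollInterval time.Duration
}

// parseConfig parses args, using environment variables as flag defaults
// for the three settings that have one.
func parseConfig(args []string) (config, error) {
	var c config
	fs := flag.NewFlagSet("taskqueue", flag.ContinueOnError)
	fs.StringVar(&c.HTTPAddr, "http-addr", envOr("HTTP_ADDR", ":8080"), "HTTP server address")
	fs.StringVar(&c.MetricsAddr, "metrics-addr", envOr("METRICS_ADDR", ":9090"), "Metrics server address")
	fs.StringVar(&c.DatabaseURL, "database-url",
		envOr("DATABASE_URL", "postgres://localhost:5432/taskqueue?sslmode=disable"),
		"PostgreSQL connection URL")
	fs.IntVar(&c.Concurrency, "concurrency", 10, "Number of concurrent workers")
	fs.StringVar(&c.Queue, "queue", "default", "Queue name to process")
	fs.DurationVar(&c.PollInterval, "poll-interval", time.Second, "Job polling interval")
	err := fs.Parse(args)
	return c, err
}

func main() {
	c, err := parseConfig(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	fmt.Printf("%+v\n", c)
}
```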

Architecture

┌─────────────────────────────────────────────────────────────┐
│                        TaskQueue                             │
├──────────────┬──────────────┬──────────────┬────────────────┤
│   REST API   │   gRPC API   │  Scheduler   │  Worker Pool   │
│   (HTTP)     │   (Proto)    │              │                │
├──────────────┴──────────────┴──────────────┴────────────────┤
│                       Job Store                              │
├─────────────────────────────────────────────────────────────┤
│                      PostgreSQL                              │
└─────────────────────────────────────────────────────────────┘
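
The Job Store layer relies on FOR UPDATE SKIP LOCKED so that concurrent workers never claim the same row. The sketch below shows the general pattern with database/sql. The jobs table and its columns (id, queue, status, priority, scheduled_at, created_at), the "higher number runs first" ordering, and the function name are all assumptions; TaskQueue's real schema and queries may differ.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
)

// claimJobSQL atomically marks one due, pending job as running.
// Rows locked by another worker's in-flight claim are skipped rather than
// waited on. Table and column names are hypothetical.
const claimJobSQL = `
UPDATE jobs
SET status = 'running'
WHERE id = (
    SELECT id
    FROM jobs
    WHERE queue = $1
      AND status = 'pending'
      AND scheduled_at <= now()
    ORDER BY priority DESC, created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id`

// claimNextJob returns the id of the claimed job, or ok=false when the
// queue has nothing ready.
func claimNextJob(ctx context.Context, db *sql.DB, queue string) (id string, ok bool, err error) {
	err = db.QueryRowContext(ctx, claimJobSQL, queue).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return "", false, nil
	}
	if err != nil {
		return "", false, fmt.Errorf("claim job: %w", err)
	}
	return id, true, nil
}

func main() {
	// Opening a real connection needs a PostgreSQL driver registered with
	// database/sql, which is outside this sketch; print the query instead.
	fmt.Println(claimJobSQL)
}
```

A worker would call claimNextJob once per poll interval and sleep when ok is false.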

Job Lifecycle

Created → Pending → Running → Completed
                 ↓          ↓
           Scheduled    Failed → Pending (retry)
                              ↓
                            Dead (max retries exceeded)
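
The lifecycle can be written down as a transition table. The sketch below is one reading of the diagram, not code from the project: it assumes a delayed job goes Created → Scheduled → Pending (the diagram leaves the exact position of Scheduled ambiguous), and it assumes a failed job is retried while the retries used so far are below max_retries.

```go
package main

import "fmt"

// State is a job lifecycle state. All identifiers here are hypothetical.
type State string

const (
	Created   State = "created"
	Scheduled State = "scheduled"
	Pending   State = "pending"
	Running   State = "running"
	Completed State = "completed"
	Failed    State = "failed"
	Dead      State = "dead"
)

// transitions lists the states reachable from each state.
// Completed and Dead are terminal, so they have no entries.
var transitions = map[State][]State{
	Created:   {Pending, Scheduled},
	Scheduled: {Pending},
	Pending:   {Running},
	Running:   {Completed, Failed},
	Failed:    {Pending, Dead},
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to State) bool {
	for _, s := range transitions[from] {
		if s == to {
			return true
		}
	}
	return false
}

// nextAfterFailure decides where a failed job goes: back to Pending for a
// retry, or to Dead once its retries are used up.
func nextAfterFailure(retriesUsed, maxRetries int) State {
	if retriesUsed < maxRetries {
		return Pending
	}
	return Dead
}

func main() {
	fmt.Println(canTransition(Running, Completed)) // allowed
	fmt.Println(canTransition(Completed, Running)) // not allowed: terminal state
	fmt.Println(nextAfterFailure(1, 3), nextAfterFailure(3, 3))
}
```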

Metrics

Available at /metrics on the metrics port (default 9090):

  • taskqueue_jobs_created_total - Total jobs created by queue and type
  • taskqueue_jobs_processed_total - Total jobs processed, by status
  • taskqueue_jobs_in_flight - Currently processing jobs
  • taskqueue_job_duration_seconds - Job processing duration histogram
  • taskqueue_job_retries_total - Total retry attempts
  • taskqueue_queue_depth - Pending jobs per queue
  • taskqueue_store_latency_seconds - Database operation latency
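
For a quick check without a Prometheus server, the endpoint can be scraped directly. The sketch below reads the text exposition format with the standard library only and pulls out the samples of one metric. The parser is deliberately simple (it does not handle a } inside a quoted label value), and the queue label in the comments is an assumption based on the "per queue" description.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
)

// parseSamples extracts the samples of one metric from Prometheus text
// output, keyed by the raw label set (e.g. `{queue="default"}`, or "" when
// the sample has no labels).
func parseSamples(text, name string) map[string]float64 {
	out := make(map[string]float64)
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") || !strings.HasPrefix(line, name) {
			continue // blank line, HELP/TYPE comment, or another metric
		}
		rest := line[len(name):]
		labels := ""
		if strings.HasPrefix(rest, "{") {
			end := strings.Index(rest, "}")
			if end < 0 {
				continue
			}
			labels, rest = rest[:end+1], rest[end+1:]
		} else if !strings.HasPrefix(rest, " ") {
			continue // a longer metric name that merely shares the prefix
		}
		fields := strings.Fields(rest) // value, then an optional timestamp
		if len(fields) == 0 {
			continue
		}
		if v, err := strconv.ParseFloat(fields[0], 64); err == nil {
			out[labels] = v
		}
	}
	return out
}

func main() {
	resp, err := http.Get("http://localhost:9090/metrics")
	if err != nil {
		fmt.Println("scrape failed:", err)
		return
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	for labels, depth := range parseSamples(string(body), "taskqueue_queue_depth") {
		fmt.Printf("queue depth %s = %v\n", labels, depth)
	}
}
```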

Registering Custom Handlers

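// Requires the standard packages context, encoding/json, and fmt.
// EmailPayload, emailService, and ImageProcessor are application-defined.
registry := domain.NewHandlerRegistry()

// Register a function handler
registry.RegisterFunc("send-email", func(ctx context.Context, job *domain.Job) error {
    var payload EmailPayload
    if err := json.Unmarshal(job.Payload, &payload); err != nil {
        // Wrap the error so the failure reason names the job type.
        return fmt.Errorf("decode send-email payload: %w", err)
    }
    return emailService.Send(payload.To, payload.Subject, payload.Body)
})

// Register a struct handler
registry.Register("process-image", &ImageProcessor{})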
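
The snippet above does not show what a struct handler must implement. The self-contained sketch below illustrates one common Go pattern for supporting both registration styles, modeled on http.Handler and http.HandlerFunc. Every type in it (Job, Handler, HandlerFunc, Registry) is a simplified stand-in written for this example, and the Handle method name is an assumption; check the domain package for the real interface.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Job is a stand-in for domain.Job with only the fields used here.
type Job struct {
	Type    string
	Payload json.RawMessage
}

// Handler is what a struct handler would implement (assumed shape).
type Handler interface {
	Handle(ctx context.Context, job *Job) error
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(ctx context.Context, job *Job) error

// Handle calls the wrapped function.
func (f HandlerFunc) Handle(ctx context.Context, job *Job) error { return f(ctx, job) }

// Registry maps job types to handlers. Not safe for concurrent registration.
type Registry struct{ handlers map[string]Handler }

func NewRegistry() *Registry { return &Registry{handlers: make(map[string]Handler)} }

func (r *Registry) Register(jobType string, h Handler) { r.handlers[jobType] = h }

func (r *Registry) RegisterFunc(jobType string, f func(context.Context, *Job) error) {
	r.Register(jobType, HandlerFunc(f))
}

// Dispatch runs the handler registered for the job's type.
func (r *Registry) Dispatch(ctx context.Context, job *Job) error {
	h, ok := r.handlers[job.Type]
	if !ok {
		return fmt.Errorf("no handler registered for job type %q", job.Type)
	}
	return h.Handle(ctx, job)
}

// ImageProcessor is a struct handler that keeps state between jobs.
type ImageProcessor struct{ Processed int }

func (p *ImageProcessor) Handle(ctx context.Context, job *Job) error {
	p.Processed++
	return nil
}

func main() {
	r := NewRegistry()
	r.Register("process-image", &ImageProcessor{})
	r.RegisterFunc("echo", func(ctx context.Context, job *Job) error {
		fmt.Println("echo:", string(job.Payload))
		return nil
	})
	err := r.Dispatch(context.Background(), &Job{Type: "echo", Payload: json.RawMessage(`{"message":"hi"}`)})
	fmt.Println("dispatch error:", err)
}
```

A struct handler is the better fit when the handler needs dependencies or state, such as a client or a counter; a function handler is enough for stateless work.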

Development

# Run tests
make test

# Run tests with coverage
make test-coverage

# Lint code
make lint

# Build binary
make build

# Build Docker image
make docker-build

License

MIT
