protoflow

package module
v0.5.1 Latest
Published: Dec 3, 2025 License: MIT Imports: 13 Imported by: 0

README

Protoflow

Stop writing plumbing. Start shipping features.

Protoflow is a productivity layer for Watermill that simplifies event-driven architecture. It manages routers, publishers, subscribers, and middleware so you can focus on your domain logic.

Whether you are using Protobufs or JSON, Protoflow provides a type-safe, production-ready foundation for Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, and Go Channels.

Feature Highlights

  • Type-Safe Handlers: Generic RegisterProtoHandler and RegisterJSONHandler helpers keep your code clean.
  • 7 Built-in Transports: Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, File I/O, and Go Channels - switch with a single config change.
  • Batteries Included: Default middleware stack with correlation IDs, structured logging, protobuf validation, outbox pattern, OpenTelemetry tracing, Prometheus metrics, retries with exponential backoff, poison queues, and panic recovery.
  • Pluggable Logging: Bring your own logger (slog, logrus, zerolog) via ServiceLogger abstraction.
  • Safe Configuration: Built-in validation with credential redaction in logs.
  • Graceful Lifecycle: Clean shutdown of HTTP servers and message routers.
  • Extensible: Custom transport factories, middleware, validators, and outbox stores.

Quick Start

  1. Install: go get github.com/drblury/protoflow (Go 1.23+).
  2. Configure: Set up protoflow.Config.
  3. Launch: Create a Service, register your handlers, and Start.

```go
// 1. Configure your transport (Kafka, RabbitMQ, AWS, NATS, HTTP, IO, Channel)
cfg := &protoflow.Config{
	PubSubSystem: "channel", // Use in-memory channel for testing
	PoisonQueue:  "orders.poison",
}

// 2. Use your preferred logger (slog, logrus, zerolog, etc.)
logger := protoflow.NewSlogServiceLogger(slog.Default())
svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})

// 3. Register a strongly-typed handler
must(protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*models.OrderCreated]{
	Name:         "orders-created",
	ConsumeQueue: "orders.created",
	Handler: func(ctx context.Context, evt protoflow.ProtoMessageContext[*models.OrderCreated]) ([]protoflow.ProtoMessageOutput, error) {
		evt.Logger.Info("Order received", protoflow.LogFields{"id": evt.Payload.OrderId})
		return nil, nil
	},
}))

// 4. Start the service
go func() {
	_ = svc.Start(ctx)
}()
```

Want to emit events? Need metadata handling? Check out the Handlers Guide.

Supported Transports

| Transport | Config Value | Use Case |
| --- | --- | --- |
| Go Channels | "channel" | Testing, local development |
| Kafka | "kafka" | High-throughput streaming |
| RabbitMQ | "rabbitmq" | Traditional message queuing |
| AWS SNS/SQS | "aws" | Cloud-native pub/sub |
| NATS | "nats" | High-performance messaging |
| HTTP | "http" | Request/response patterns |
| File I/O | "io" | Simple message persistence |

Default Middleware Stack

  1. Correlation ID: Injects tracing IDs into message metadata
  2. Message Logging: Debug logging with payload and metadata
  3. Proto Validation: Schema validation for protobuf messages
  4. Outbox Pattern: Reliable message delivery via OutboxStore
  5. OpenTelemetry Tracing: Distributed tracing with span propagation
  6. Prometheus Metrics: Request counts and latencies
  7. Retry with Backoff: Configurable exponential backoff
  8. Poison Queue: Dead letter queue for unprocessable messages
  9. Panic Recovery: Converts panics to errors for graceful handling

Logging

protoflow.ServiceLogger unifies logging across the router, middleware, and transports. Wrap your favorite logger and pass it to NewService:

  • protoflow.NewSlogServiceLogger: Adapts log/slog (standard library).
  • protoflow.NewEntryServiceLogger: Adapts structured loggers (logrus, zerolog) via EntryLoggerAdapter[T].

```go
svc := protoflow.NewService(cfg,
	protoflow.NewEntryServiceLogger(myLogrusEntry),
	ctx,
	protoflow.ServiceDependencies{},
)
```

Error Handling

Protoflow provides TryNewService for error-returning service creation:

```go
svc, err := protoflow.TryNewService(cfg, logger, ctx, deps)
if err != nil {
	// Handle protoflow.ConfigValidationError, etc.
}
```

Documentation

Examples

Check out examples/ for runnable code:

  • simple: Basic Protoflow usage with Go Channels.
  • json: Typed JSON handlers with metadata enrichment.
  • proto: Protobuf handlers with generated models.
  • full: Comprehensive example with custom middleware, validators, and outbox.

Run them with: go run ./examples/<name>

Development Workflow

We use task (Taskfile) to manage development:

  • task lint: Run golangci-lint
  • task test: Run full test suite

See the Development Guide for details.

Contributing

  1. Fork the repo and branch from main.
  2. Run task lint and task test before opening a PR.
  3. Add or update docs in docs/ for new features.
  4. Keep commits focused with context in PR descriptions.

License

Protoflow is available under the MIT License.

Documentation

Overview

Package protoflow is a small layer on top of Watermill that wires routers, publishers, subscribers, and middleware for protobuf- or JSON-driven services. It reads the target transport (Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, I/O, SQLite, PostgreSQL, or Go Channels) from Config, bootstraps the Watermill router, and registers the default middleware chain for correlation IDs, logging, validation, outbox persistence, tracing, retries, and poison queue forwarding.

Service hosts the router and exposes typed helpers: RegisterProtoHandler and RegisterJSONHandler take care of marshaling, metadata cloning, and optional protobuf validation, while Service.PublishProto lets HTTP/RPC handlers emit events without touching low-level Watermill APIs. A minimal setup therefore involves filling Config, creating a Service, registering handlers, and calling Start; see README.md for a copy/paste quick start snippet.
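To make the "marshaling and metadata cloning" part concrete, here is a minimal standard-library sketch of what a typed JSON adapter might do. It is not the protoflow implementation: `rawHandler`, `typedJSON`, and the two order types are hypothetical, and metadata is assumed to be a plain `map[string]string`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rawHandler is the untyped shape a router would call: bytes in, bytes out.
type rawHandler func(payload []byte, metadata map[string]string) ([]byte, map[string]string, error)

// typedJSON adapts a typed function to rawHandler by decoding the input,
// cloning the metadata, and encoding the output. Hypothetical helper.
func typedJSON[In any, Out any](fn func(In, map[string]string) (Out, error)) rawHandler {
	return func(payload []byte, metadata map[string]string) ([]byte, map[string]string, error) {
		var in In
		if err := json.Unmarshal(payload, &in); err != nil {
			return nil, nil, fmt.Errorf("decode payload: %w", err)
		}
		// Clone metadata so the handler cannot mutate the incoming message.
		cloned := make(map[string]string, len(metadata))
		for k, v := range metadata {
			cloned[k] = v
		}
		out, err := fn(in, cloned)
		if err != nil {
			return nil, nil, err
		}
		encoded, err := json.Marshal(out)
		if err != nil {
			return nil, nil, fmt.Errorf("encode output: %w", err)
		}
		return encoded, cloned, nil
	}
}

type orderCreated struct {
	OrderID string `json:"order_id"`
}

type orderAccepted struct {
	OrderID string `json:"order_id"`
	Status  string `json:"status"`
}

func main() {
	h := typedJSON(func(evt orderCreated, md map[string]string) (orderAccepted, error) {
		md["handled_by"] = "orders-created"
		return orderAccepted{OrderID: evt.OrderID, Status: "accepted"}, nil
	})
	in := map[string]string{"correlation_id": "abc"}
	out, md, err := h([]byte(`{"order_id":"42"}`), in)
	fmt.Println(string(out), md["handled_by"], len(in), err)
}
```

The handler body only sees decoded values, which is the benefit the typed registration helpers aim for.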

Transports

Protoflow supports 9 message transports out of the box:

  • channel: In-memory Go channels for testing
  • kafka: High-throughput streaming with consumer groups
  • rabbitmq: AMQP-based durable queues
  • aws: AWS SNS/SQS with LocalStack support
  • nats: High-performance messaging
  • http: Request/response messaging
  • io: File-based persistence
  • sqlite: Embedded persistent queue with delayed messages and DLQ management
  • postgres: Production-ready PostgreSQL queue with SKIP LOCKED and DLQ

Middleware

The default middleware chain includes correlation ID injection, structured logging, protobuf validation, outbox persistence, OpenTelemetry tracing, Prometheus metrics, retry with exponential backoff, poison queue forwarding, and panic recovery. Custom middleware can be added via ServiceDependencies.Middlewares.

Job Hooks

JobHooksMiddleware provides OnJobStart, OnJobDone, and OnJobError callbacks for custom logging, metrics collection, and alerting around handler execution.

When you need more control, ServiceDependencies exposes well-scoped hooks: bring your own OutboxStore, ProtoValidator, middleware registrations, or even an entire TransportFactory to plug in custom brokers. The README organises these knobs by topic so you can dive into the exact setting you want to adjust without rereading the whole guide.

Index

Constants

const (
	MetadataKeyCorrelationID = handlerpkg.MetadataKeyCorrelationID
	MetadataKeyEventSchema   = handlerpkg.MetadataKeyEventSchema
	MetadataKeyQueueDepth    = handlerpkg.MetadataKeyQueueDepth
	MetadataKeyEnqueuedAt    = handlerpkg.MetadataKeyEnqueuedAt
	MetadataKeyTraceID       = handlerpkg.MetadataKeyTraceID
	MetadataKeySpanID        = handlerpkg.MetadataKeySpanID

	// MetadataKeyDelay is used by SQLite and PostgreSQL transports for delayed message processing.
	// Set to a duration string like "30s", "5m", "1h".
	MetadataKeyDelay = "protoflow_delay"
)

Metadata keys - use these constants for standard metadata fields.

const (
	// ExtAttempt is the current retry attempt number (1-based).
	ExtAttempt = ce.ExtAttempt

	// ExtMaxAttempts is the maximum number of retry attempts allowed.
	ExtMaxAttempts = ce.ExtMaxAttempts

	// ExtNextAttemptAt is the RFC3339 timestamp for the next retry.
	ExtNextAttemptAt = ce.ExtNextAttemptAt

	// ExtDeadLetter indicates the event has been moved to DLQ.
	ExtDeadLetter = ce.ExtDeadLetter

	// ExtTraceID is the distributed trace ID (W3C traceparent compatible).
	ExtTraceID = ce.ExtTraceID

	// ExtParentID is the parent span ID for trace correlation.
	ExtParentID = ce.ExtParentID

	// ExtDelayMs is the delay in milliseconds before processing.
	ExtDelayMs = ce.ExtDelayMs

	// ExtEventVersion is an optional version number for the event schema.
	ExtEventVersion = ce.ExtEventVersion

	// ExtOriginalTopic stores the original topic when moved to DLQ.
	ExtOriginalTopic = ce.ExtOriginalTopic

	// ExtErrorMessage stores the last error message when moved to DLQ.
	ExtErrorMessage = ce.ExtErrorMessage

	// ExtCorrelationID is a correlation identifier for request tracing.
	ExtCorrelationID = ce.ExtCorrelationID
)

CloudEvents extension keys for protoflow reliability semantics.

const (
	ErrorCategoryNone       = runtimepkg.ErrorCategoryNone
	ErrorCategoryValidation = runtimepkg.ErrorCategoryValidation
	ErrorCategoryTransport  = runtimepkg.ErrorCategoryTransport
	ErrorCategoryDownstream = runtimepkg.ErrorCategoryDownstream
	ErrorCategoryOther      = runtimepkg.ErrorCategoryOther
)

Error category constants for ErrorClassifier.

Variables

var (
	NewService     = runtimepkg.NewService
	TryNewService  = runtimepkg.TryNewService
	ValidateConfig = configpkg.ValidateConfig

	RegisterMessageHandler  = runtimepkg.RegisterMessageHandler
	WithPublishMessageTypes = handlerpkg.WithPublishMessageTypes

	DefaultMiddlewares      = runtimepkg.DefaultMiddlewares
	CorrelationIDMiddleware = runtimepkg.CorrelationIDMiddleware
	LogMessagesMiddleware   = runtimepkg.LogMessagesMiddleware
	ProtoValidateMiddleware = runtimepkg.ProtoValidateMiddleware
	OutboxMiddleware        = runtimepkg.OutboxMiddleware
	TracerMiddleware        = runtimepkg.TracerMiddleware
	MetricsMiddleware       = runtimepkg.MetricsMiddleware
	RetryMiddleware         = runtimepkg.RetryMiddleware
	PoisonQueueMiddleware   = runtimepkg.PoisonQueueMiddleware
	RecovererMiddleware     = runtimepkg.RecovererMiddleware

	// Job lifecycle hooks
	JobHooksMiddleware = runtimepkg.JobHooksMiddleware
	LoggingHooks       = runtimepkg.LoggingHooks
	MetricsHooks       = runtimepkg.MetricsHooks
	AlertingHooks      = runtimepkg.AlertingHooks

	// DLQ metrics
	NewDLQMetrics = runtimepkg.NewDLQMetrics

	// CloudEvents constructors and helpers
	NewCloudEvent       = ce.New
	NewCloudEventWithID = ce.NewWithID

	// CloudEvents extension helpers
	GetAttempt          = ce.GetAttempt
	SetAttempt          = ce.SetAttempt
	GetMaxAttempts      = ce.GetMaxAttempts
	SetMaxAttempts      = ce.SetMaxAttempts
	IncrementAttempt    = ce.IncrementAttempt
	ExceedsMaxAttempts  = ce.ExceedsMaxAttempts
	GetNextAttemptAt    = ce.GetNextAttemptAt
	SetNextAttemptAt    = ce.SetNextAttemptAt
	SetNextAttemptAfter = ce.SetNextAttemptAfter
	IsDeadLetter        = ce.IsDeadLetter
	SetDeadLetter       = ce.SetDeadLetter
	GetOriginalTopic    = ce.GetOriginalTopic
	SetOriginalTopic    = ce.SetOriginalTopic
	GetErrorMessage     = ce.GetErrorMessage
	SetErrorMessage     = ce.SetErrorMessage
	GetTraceID          = ce.GetTraceID
	SetTraceID          = ce.SetTraceID
	GetParentID         = ce.GetParentID
	SetParentID         = ce.SetParentID
	GetCorrelationID    = ce.GetCorrelationID
	SetCorrelationID    = ce.SetCorrelationID
	GetDelayMs          = ce.GetDelayMs
	SetDelayMs          = ce.SetDelayMs
	GetDelay            = ce.GetDelay
	SetDelay            = ce.SetDelay
	GetEventVersion     = ce.GetEventVersion
	SetEventVersion     = ce.SetEventVersion
	PrepareForRetry     = ce.PrepareForRetry
	PrepareForDLQ       = ce.PrepareForDLQ
	DLQTopic            = ce.DLQTopic
	CopyTracingContext  = ce.CopyTracingContext

	// CloudEvents error types
	ErrRetry                = ce.ErrRetry
	ErrDeadLetter           = ce.ErrDeadLetter
	ErrSkip                 = ce.ErrSkip
	ErrUnprocessable        = ce.ErrUnprocessable
	ErrRetryAfter           = ce.ErrRetryAfter
	ErrDeadLetterWithReason = ce.ErrDeadLetterWithReason
	ClassifyError           = ce.ClassifyError
	IsRetryable             = ce.IsRetryable
	ShouldDeadLetter        = ce.ShouldDeadLetter

	// CloudEvents API
	RegisterCloudEventsHandler = runtimepkg.RegisterCloudEventsHandler

	// Transport capabilities
	GetCapabilities = transportpkg.GetCapabilities

	// Modular transport registry (new package structure)
	// Use RegisterTransport and BuildTransport to work with the modular transport packages.
	// Import individual transports via: _ "github.com/drblury/protoflow/transport/kafka"
	DefaultTransportRegistry = newtransport.DefaultRegistry
	RegisterTransport        = newtransport.Register
	BuildTransport           = newtransport.Build

	// Publish options
	WithSubject         = runtimepkg.WithSubject
	WithDataContentType = runtimepkg.WithDataContentType
	WithDataSchema      = runtimepkg.WithDataSchema
	WithExtension       = runtimepkg.WithExtension
	WithMaxAttempts     = runtimepkg.WithMaxAttempts
	WithTracing         = runtimepkg.WithTracing
	WithCorrelationID   = runtimepkg.WithCorrelationID

	Marshal       = jsoncodec.Marshal
	MarshalIndent = jsoncodec.MarshalIndent
	Unmarshal     = jsoncodec.Unmarshal
	Encode        = jsoncodec.Encode
	Decode        = jsoncodec.Decode

	ErrServiceRequired             = errspkg.ErrServiceRequired
	ErrHandlerRequired             = errspkg.ErrHandlerRequired
	ErrConsumeQueueRequired        = errspkg.ErrConsumeQueueRequired
	ErrHandlerNameRequired         = errspkg.ErrHandlerNameRequired
	ErrConsumeMessageTypeRequired  = errspkg.ErrConsumeMessageTypeRequired
	ErrConsumeMessagePointerNeeded = errspkg.ErrConsumeMessagePointerNeeded
	ErrPublisherRequired           = errspkg.ErrPublisherRequired
	ErrTopicRequired               = errspkg.ErrTopicRequired
	ErrConfigRequired              = errspkg.ErrConfigRequired
	ErrLoggerRequired              = errspkg.ErrLoggerRequired
	ErrEventPayloadRequired        = errspkg.ErrEventPayloadRequired

	NewSlogServiceLogger = loggingpkg.NewSlogServiceLogger

	NewMetadata = metadatapkg.New

	CreateULID = idspkg.CreateULID

	// NewEventID generates a unique event ID using ULID.
	NewEventID = runtimepkg.NewEventID
)

Functions

func MustProtoMessage added in v0.2.3

func MustProtoMessage[T proto.Message]() T

func NewProtoMessage added in v0.2.3

func NewProtoMessage[T proto.Message]() (T, error)

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error

Types

type Capabilities added in v0.4.1

type Capabilities = transportpkg.Capabilities

Transport capabilities

type CloudEventsHandlerRegistration added in v0.4.1

type CloudEventsHandlerRegistration = runtimepkg.CloudEventsHandlerRegistration

type Config

type Config = configpkg.Config

type ConfigValidationError added in v0.4.1

type ConfigValidationError = errspkg.ConfigValidationError

type DLQMetrics added in v0.4.1

type DLQMetrics = runtimepkg.DLQMetrics

DLQ metrics

type DLQMetricsSnapshot added in v0.4.1

type DLQMetricsSnapshot = runtimepkg.DLQMetricsSnapshot

type DLQTopicMetrics added in v0.4.1

type DLQTopicMetrics = runtimepkg.DLQTopicMetrics

type EntryLogger added in v0.2.1

type EntryLogger = loggingpkg.EntryLogger

type EntryLoggerAdapter added in v0.2.2

type EntryLoggerAdapter[T any] = loggingpkg.EntryLoggerAdapter[T]

type ErrorCategory added in v0.4.1

type ErrorCategory = runtimepkg.ErrorCategory

type ErrorClassifier added in v0.4.1

type ErrorClassifier = runtimepkg.ErrorClassifier

Error classification

type Event added in v0.4.1

type Event = ce.Event

CloudEvents types

type EventHandler added in v0.4.1

type EventHandler = runtimepkg.EventHandler

type HandlerInfo added in v0.3.3

type HandlerInfo = runtimepkg.HandlerInfo

type HandlerStats added in v0.3.3

type HandlerStats = runtimepkg.HandlerStats

type JSONHandlerRegistration

type JSONHandlerRegistration[T any, O any] = handlerpkg.JSONHandlerRegistration[T, O]

type JSONMessageContext

type JSONMessageContext[T any] = handlerpkg.JSONMessageContext[T]

type JSONMessageHandler

type JSONMessageHandler[T any, O any] = handlerpkg.JSONMessageHandler[T, O]

type JSONMessageOutput

type JSONMessageOutput[T any] = handlerpkg.JSONMessageOutput[T]

type JobContext added in v0.4.1

type JobContext = runtimepkg.JobContext

Job lifecycle hooks

type JobHooks added in v0.4.1

type JobHooks = runtimepkg.JobHooks

type LogFields added in v0.2.0

type LogFields = loggingpkg.LogFields

type MessageContextBase added in v0.4.1

type MessageContextBase = handlerpkg.MessageContextBase

type MessageHandlerRegistration added in v0.2.0

type MessageHandlerRegistration = runtimepkg.MessageHandlerRegistration

type Metadata

type Metadata = metadatapkg.Metadata

func WithDelay added in v0.4.1

func WithDelay(delay time.Duration) Metadata

WithDelay returns a Metadata with the protoflow_delay key set for delayed message processing. This is a convenience wrapper for SQLite and PostgreSQL transports' delayed message feature. Example: protoflow.NewMetadata().Merge(protoflow.WithDelay(30 * time.Second))

type MiddlewareBuilder

type MiddlewareBuilder = runtimepkg.MiddlewareBuilder

type MiddlewareRegistration

type MiddlewareRegistration = runtimepkg.MiddlewareRegistration

type OutboxStore

type OutboxStore = runtimepkg.OutboxStore

type Producer

type Producer = runtimepkg.Producer

type ProtoHandlerOption

type ProtoHandlerOption = handlerpkg.ProtoHandlerOption

type ProtoHandlerRegistration

type ProtoHandlerRegistration[T proto.Message] = handlerpkg.ProtoHandlerRegistration[T]

type ProtoMessageContext

type ProtoMessageContext[T proto.Message] = handlerpkg.ProtoMessageContext[T]

type ProtoMessageHandler

type ProtoMessageHandler[T proto.Message] = handlerpkg.ProtoMessageHandler[T]

type ProtoMessageOutput

type ProtoMessageOutput = handlerpkg.ProtoMessageOutput

type ProtoValidator

type ProtoValidator = runtimepkg.ProtoValidator

type PublishOption added in v0.4.1

type PublishOption = runtimepkg.PublishOption

type RetryMiddlewareConfig

type RetryMiddlewareConfig = runtimepkg.RetryMiddlewareConfig

type Service

type Service = runtimepkg.Service

type ServiceDependencies

type ServiceDependencies = runtimepkg.ServiceDependencies

type ServiceLogger added in v0.2.0

type ServiceLogger = loggingpkg.ServiceLogger

func NewEntryServiceLogger added in v0.2.1

func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger

type Transport added in v0.3.0

type Transport = transportpkg.Transport

type TransportBuilder added in v0.4.1

type TransportBuilder = newtransport.Builder

Modular transport types (new package structure)

type TransportCapabilities added in v0.4.1

type TransportCapabilities = newtransport.Capabilities

type TransportConfig added in v0.4.1

type TransportConfig = newtransport.Config

type TransportDLQManager added in v0.4.1

type TransportDLQManager = newtransport.DLQManager

type TransportDelayedPub added in v0.4.1

type TransportDelayedPub = newtransport.DelayedPublisher

type TransportFactory added in v0.3.0

type TransportFactory = transportpkg.Factory

type TransportQueueIntrospect added in v0.4.1

type TransportQueueIntrospect = newtransport.QueueIntrospector

type TransportRegistry added in v0.4.1

type TransportRegistry = newtransport.Registry

type UnprocessableEventError

type UnprocessableEventError = runtimepkg.UnprocessableEventError

Directories

| Path | Synopsis |
| --- | --- |
| examples | |
| dlq_metrics (command) | |
| full (command) | |
| hooks (command) | |
| json (command) | |
| postgres (command) | Package main demonstrates using the PostgreSQL transport with protoflow. |
| proto (command) | |
| simple (command) | |
| sqlite (command) | |
| internal | |
| runtime | Package runtime provides the core event processing infrastructure for protoflow. |
| runtime/cloudevents | Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow. |
| runtime/transport | Package transport provides transport types and interfaces for the internal runtime. |
| transport | Package transport defines the core interfaces and types for protoflow transports. |
| aws | Package aws provides an AWS SNS/SQS transport for protoflow. |
| channel | Package channel provides an in-memory Go channel transport for protoflow. |
| http | Package http provides an HTTP transport for protoflow. |
| io | Package io provides a file-based I/O transport for protoflow. |
| jetstream | Package jetstream provides a NATS JetStream transport for protoflow. |
| kafka | Package kafka provides a Kafka transport for protoflow. |
| nats | Package nats provides a NATS Core transport for protoflow. |
| postgres | Package postgres provides a PostgreSQL-based transport for protoflow. |
| rabbitmq | Package rabbitmq provides a RabbitMQ/AMQP transport for protoflow. |
| sqlite | Package sqlite provides a SQLite-based transport for protoflow. |
| transports | Package transports imports all built-in transports for auto-registration. |
