Documentation ¶
Overview ¶
Package protoflow is a small layer on top of Watermill that wires routers, publishers, subscribers, and middleware for protobuf- or JSON-driven services. It reads the target transport (Kafka, RabbitMQ, AWS SNS/SQS, NATS, HTTP, I/O, SQLite, PostgreSQL, or Go Channels) from Config, bootstraps the Watermill router, and registers the default middleware chain for correlation IDs, logging, validation, outbox persistence, tracing, retries, and poison queue forwarding.
Service hosts the router and exposes typed helpers: RegisterProtoHandler and RegisterJSONHandler take care of marshaling, metadata cloning, and optional protobuf validation, while Service.PublishProto lets HTTP/RPC handlers emit events without touching low-level Watermill APIs. A minimal setup therefore involves filling Config, creating a Service, registering handlers, and calling Start; see README.md for a copy/paste quick start snippet.
Transports ¶
Protoflow supports 9 message transports out of the box:
- channel: In-memory Go channels for testing
- kafka: High-throughput streaming with consumer groups
- rabbitmq: AMQP-based durable queues
- aws: AWS SNS/SQS with LocalStack support
- nats: High-performance messaging
- http: Request/response messaging
- io: File-based persistence
- sqlite: Embedded persistent queue with delayed messages and DLQ management
- postgres: Production-ready PostgreSQL queue with SKIP LOCKED and DLQ
Middleware ¶
The default middleware chain includes correlation ID injection, structured logging, protobuf validation, outbox persistence, OpenTelemetry tracing, Prometheus metrics, retry with exponential backoff, poison queue forwarding, and panic recovery. Custom middleware can be added via ServiceDependencies.Middlewares.
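The chain described above follows the common wrap-the-handler pattern. The sketch below is a self-contained illustration of that pattern, not protoflow's implementation: `Handler`, `Middleware`, `chain`, `recoverer`, `retry`, and `flakyHandler` are hypothetical names, and the real middleware operates on Watermill messages rather than strings.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Handler and Middleware are hypothetical stand-ins, not protoflow types.
type Handler func(msg string) error
type Middleware func(Handler) Handler

// chain wraps h so that the first middleware in mws runs outermost.
func chain(h Handler, mws ...Middleware) Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// recoverer converts a panic in the wrapped handler into an error.
func recoverer(next Handler) Handler {
	return func(msg string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered panic: %v", r)
			}
		}()
		return next(msg)
	}
}

// retry calls the handler up to maxAttempts times and doubles the wait
// between attempts (exponential backoff).
func retry(maxAttempts int, initial time.Duration) Middleware {
	return func(next Handler) Handler {
		return func(msg string) error {
			wait := initial
			var err error
			for attempt := 1; attempt <= maxAttempts; attempt++ {
				if err = next(msg); err == nil {
					return nil
				}
				if attempt < maxAttempts {
					time.Sleep(wait)
					wait *= 2
				}
			}
			return err
		}
	}
}

// flakyHandler fails the first `failures` calls, then succeeds.
// The returned pointer exposes the call count.
func flakyHandler(failures int) (Handler, *int) {
	calls := 0
	return func(string) error {
		calls++
		if calls <= failures {
			return errors.New("transient failure")
		}
		return nil
	}, &calls
}

func main() {
	h, calls := flakyHandler(2)
	wrapped := chain(h, recoverer, retry(5, time.Millisecond))
	err := wrapped("hello")
	fmt.Println("error:", err, "calls:", *calls)
}
```

In this sketch the recoverer sits outside the retry, so a panic is reported once instead of being retried. Whether protoflow orders its defaults the same way is not stated here.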
Job Hooks ¶
JobHooksMiddleware provides OnJobStart, OnJobDone, and OnJobError callbacks for custom logging, metrics collection, and alerting around handler execution.
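As a rough illustration of how such callbacks wrap a handler, the sketch below uses a hypothetical `hooks` struct. The callback names come from the description above, but their signatures, the `withHooks` wrapper, and the choice to fire `OnJobDone` only on success are assumptions rather than protoflow's actual API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// hooks is a hypothetical stand-in for a job-hooks value; the callback
// signatures are assumptions made for this example.
type hooks struct {
	OnJobStart func(handler string)
	OnJobDone  func(handler string, elapsed time.Duration)
	OnJobError func(handler string, err error)
}

type handlerFunc func() error

// errExample is a sample failure used by main.
var errExample = errors.New("db down")

// withHooks wraps next and fires whichever callbacks are set.
// In this sketch OnJobDone fires only when next succeeds.
func withHooks(name string, h hooks, next handlerFunc) handlerFunc {
	return func() error {
		if h.OnJobStart != nil {
			h.OnJobStart(name)
		}
		start := time.Now()
		if err := next(); err != nil {
			if h.OnJobError != nil {
				h.OnJobError(name, err)
			}
			return err
		}
		if h.OnJobDone != nil {
			h.OnJobDone(name, time.Since(start))
		}
		return nil
	}
}

// recordingHooks appends one entry per fired callback to log.
func recordingHooks(log *[]string) hooks {
	return hooks{
		OnJobStart: func(name string) { *log = append(*log, "start:"+name) },
		OnJobDone:  func(name string, _ time.Duration) { *log = append(*log, "done:"+name) },
		OnJobError: func(name string, _ error) { *log = append(*log, "error:"+name) },
	}
}

func main() {
	var log []string
	h := recordingHooks(&log)
	_ = withHooks("orders", h, func() error { return nil })()
	_ = withHooks("orders", h, func() error { return errExample })()
	fmt.Println(log)
}
```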
When you need more control, ServiceDependencies exposes well-scoped hooks: bring your own OutboxStore, ProtoValidator, middleware registrations, or even an entire TransportFactory to plug in custom brokers. The README organises these knobs by topic so you can dive into the exact setting you want to adjust without rereading the whole guide.
Index ¶
- Constants
- Variables
- func MustProtoMessage[T proto.Message]() T
- func NewProtoMessage[T proto.Message]() (T, error)
- func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
- func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error
- type Capabilities
- type CloudEventsHandlerRegistration
- type Config
- type ConfigValidationError
- type DLQMetrics
- type DLQMetricsSnapshot
- type DLQTopicMetrics
- type EntryLogger
- type EntryLoggerAdapter
- type ErrorCategory
- type ErrorClassifier
- type Event
- type EventHandler
- type HandlerInfo
- type HandlerStats
- type JSONHandlerRegistration
- type JSONMessageContext
- type JSONMessageHandler
- type JSONMessageOutput
- type JobContext
- type JobHooks
- type LogFields
- type MessageContextBase
- type MessageHandlerRegistration
- type Metadata
- type MiddlewareBuilder
- type MiddlewareRegistration
- type OutboxStore
- type Producer
- type ProtoHandlerOption
- type ProtoHandlerRegistration
- type ProtoMessageContext
- type ProtoMessageHandler
- type ProtoMessageOutput
- type ProtoValidator
- type PublishOption
- type RetryMiddlewareConfig
- type Service
- type ServiceDependencies
- type ServiceLogger
- type Transport
- type TransportBuilder
- type TransportCapabilities
- type TransportConfig
- type TransportDLQManager
- type TransportDelayedPub
- type TransportFactory
- type TransportQueueIntrospect
- type TransportRegistry
- type UnprocessableEventError
Constants ¶

    const (
    	MetadataKeyCorrelationID = handlerpkg.MetadataKeyCorrelationID
    	MetadataKeyEventSchema = handlerpkg.MetadataKeyEventSchema
    	MetadataKeyQueueDepth = handlerpkg.MetadataKeyQueueDepth
    	MetadataKeyEnqueuedAt = handlerpkg.MetadataKeyEnqueuedAt
    	MetadataKeyTraceID = handlerpkg.MetadataKeyTraceID
    	MetadataKeySpanID = handlerpkg.MetadataKeySpanID
    	// MetadataKeyDelay is used by SQLite and PostgreSQL transports for delayed message processing.
    	// Set to a duration string like "30s", "5m", "1h".
    	MetadataKeyDelay = "protoflow_delay"
    )

Metadata keys - use these constants for standard metadata fields.
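The duration strings mentioned for MetadataKeyDelay ("30s", "5m", "1h") happen to match the format accepted by Go's `time.ParseDuration`. The sketch below assumes that format and models metadata as a plain `map[string]string`. The `delayFrom` helper is hypothetical, and the way the SQLite and PostgreSQL transports actually parse the value may differ.

```go
package main

import (
	"fmt"
	"time"
)

// metadataKeyDelay mirrors the value of MetadataKeyDelay shown above.
const metadataKeyDelay = "protoflow_delay"

// delayFrom reads the requested delay from message metadata.
// A missing or empty key means "no delay"; a malformed or negative
// value is reported as an error. This helper is illustrative only.
func delayFrom(md map[string]string) (time.Duration, error) {
	raw, ok := md[metadataKeyDelay]
	if !ok || raw == "" {
		return 0, nil
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0, fmt.Errorf("invalid %s %q: %w", metadataKeyDelay, raw, err)
	}
	if d < 0 {
		return 0, fmt.Errorf("negative %s %q", metadataKeyDelay, raw)
	}
	return d, nil
}

func main() {
	for _, v := range []string{"30s", "5m", "1h", "soon"} {
		d, err := delayFrom(map[string]string{metadataKeyDelay: v})
		fmt.Println(v, d, err)
	}
}
```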

    const (
    	// ExtAttempt is the current retry attempt number (1-based).
    	ExtAttempt = ce.ExtAttempt
    	// ExtMaxAttempts is the maximum number of retry attempts allowed.
    	ExtMaxAttempts = ce.ExtMaxAttempts
    	// ExtNextAttemptAt is the RFC3339 timestamp for the next retry.
    	ExtNextAttemptAt = ce.ExtNextAttemptAt
    	// ExtDeadLetter indicates the event has been moved to DLQ.
    	ExtDeadLetter = ce.ExtDeadLetter
    	// ExtTraceID is the distributed trace ID (W3C traceparent compatible).
    	ExtTraceID = ce.ExtTraceID
    	// ExtParentID is the parent span ID for trace correlation.
    	ExtParentID = ce.ExtParentID
    	// ExtDelayMs is the delay in milliseconds before processing.
    	ExtDelayMs = ce.ExtDelayMs
    	// ExtEventVersion is an optional version number for the event schema.
    	ExtEventVersion = ce.ExtEventVersion
    	// ExtOriginalTopic stores the original topic when moved to DLQ.
    	ExtOriginalTopic = ce.ExtOriginalTopic
    	// ExtErrorMessage stores the last error message when moved to DLQ.
    	ExtErrorMessage = ce.ExtErrorMessage
    	// ExtCorrelationID is a correlation identifier for request tracing.
    	ExtCorrelationID = ce.ExtCorrelationID
    )

CloudEvents extension keys for protoflow reliability semantics.
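To make the retry-related extensions concrete, here is a small sketch of the bookkeeping they imply: a 1-based attempt counter, a maximum, and an RFC3339 timestamp for the next attempt. The key strings, the `extensions` map type, and the `attempt` and `scheduleRetry` helpers are all hypothetical. The real key values sit behind the constants above, and the library's own helpers may behave differently.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// Hypothetical key strings; the real values are defined by ExtAttempt,
// ExtMaxAttempts, and ExtNextAttemptAt.
const (
	keyAttempt       = "attempt"
	keyMaxAttempts   = "maxattempts"
	keyNextAttemptAt = "nextattemptat"
)

// Fixed sample inputs so the example is deterministic.
var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)

const backoff = 30 * time.Second

type extensions map[string]string

// attempt returns the 1-based attempt number, defaulting to 1 when the
// key is unset or unparsable.
func attempt(ext extensions) int {
	n, err := strconv.Atoi(ext[keyAttempt])
	if err != nil || n < 1 {
		return 1
	}
	return n
}

// scheduleRetry bumps the attempt counter and stamps the next attempt time
// in RFC3339. It returns false when no further attempt is allowed, which is
// the point where an event would be dead-lettered instead.
func scheduleRetry(ext extensions, now time.Time, wait time.Duration) bool {
	limit, err := strconv.Atoi(ext[keyMaxAttempts])
	if err != nil {
		return false // no usable budget recorded: do not retry
	}
	cur := attempt(ext)
	if cur >= limit {
		return false
	}
	ext[keyAttempt] = strconv.Itoa(cur + 1)
	ext[keyNextAttemptAt] = now.Add(wait).UTC().Format(time.RFC3339)
	return true
}

func main() {
	ext := extensions{keyMaxAttempts: "3"}
	for scheduleRetry(ext, epoch, backoff) {
		fmt.Println("retry scheduled:", ext[keyAttempt], ext[keyNextAttemptAt])
	}
	fmt.Println("budget exhausted at attempt", attempt(ext))
}
```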

    const (
    	ErrorCategoryNone = runtimepkg.ErrorCategoryNone
    	ErrorCategoryValidation = runtimepkg.ErrorCategoryValidation
    	ErrorCategoryTransport = runtimepkg.ErrorCategoryTransport
    	ErrorCategoryDownstream = runtimepkg.ErrorCategoryDownstream
    	ErrorCategoryOther = runtimepkg.ErrorCategoryOther
    )

Error category constants for ErrorClassifier.
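A classifier along these lines could map errors onto the five categories. The sketch below is self-contained and does not use protoflow's ErrorClassifier type, whose signature is not shown here: the `category` type, its string values, the sentinel errors, and the `classify` and `withContext` functions are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// category is a hypothetical stand-in for an error category type.
type category string

const (
	categoryNone       category = "none"
	categoryValidation category = "validation"
	categoryTransport  category = "transport"
	categoryDownstream category = "downstream"
	categoryOther      category = "other"
)

// Hypothetical sentinel errors an application might define.
var (
	errInvalidPayload = errors.New("invalid payload")
	errBrokerDown     = errors.New("broker unavailable")
	errDownstream     = errors.New("downstream service failed")
	errUnknown        = errors.New("unexpected failure")
)

// classify maps an error to a category. errors.Is sees through wrapping,
// so errors annotated with context are still recognised.
func classify(err error) category {
	switch {
	case err == nil:
		return categoryNone
	case errors.Is(err, errInvalidPayload):
		return categoryValidation
	case errors.Is(err, errBrokerDown):
		return categoryTransport
	case errors.Is(err, errDownstream):
		return categoryDownstream
	default:
		return categoryOther
	}
}

// withContext wraps err with the name of the failing operation.
func withContext(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

func main() {
	errs := []error{nil, errInvalidPayload, withContext("publish", errBrokerDown), errUnknown}
	for _, err := range errs {
		fmt.Printf("%v -> %s\n", err, classify(err))
	}
}
```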
Variables ¶

    var (
    	NewService = runtimepkg.NewService
    	TryNewService = runtimepkg.TryNewService
    	ValidateConfig = configpkg.ValidateConfig
    	RegisterMessageHandler = runtimepkg.RegisterMessageHandler
    	WithPublishMessageTypes = handlerpkg.WithPublishMessageTypes
    	DefaultMiddlewares = runtimepkg.DefaultMiddlewares
    	CorrelationIDMiddleware = runtimepkg.CorrelationIDMiddleware
    	LogMessagesMiddleware = runtimepkg.LogMessagesMiddleware
    	ProtoValidateMiddleware = runtimepkg.ProtoValidateMiddleware
    	OutboxMiddleware = runtimepkg.OutboxMiddleware
    	TracerMiddleware = runtimepkg.TracerMiddleware
    	MetricsMiddleware = runtimepkg.MetricsMiddleware
    	RetryMiddleware = runtimepkg.RetryMiddleware
    	PoisonQueueMiddleware = runtimepkg.PoisonQueueMiddleware
    	RecovererMiddleware = runtimepkg.RecovererMiddleware

    	// Job lifecycle hooks
    	JobHooksMiddleware = runtimepkg.JobHooksMiddleware
    	LoggingHooks = runtimepkg.LoggingHooks
    	MetricsHooks = runtimepkg.MetricsHooks
    	AlertingHooks = runtimepkg.AlertingHooks

    	// DLQ metrics
    	NewDLQMetrics = runtimepkg.NewDLQMetrics

    	// CloudEvents constructors and helpers
    	NewCloudEvent = ce.New
    	NewCloudEventWithID = ce.NewWithID

    	// CloudEvents extension helpers
    	GetAttempt = ce.GetAttempt
    	SetAttempt = ce.SetAttempt
    	GetMaxAttempts = ce.GetMaxAttempts
    	SetMaxAttempts = ce.SetMaxAttempts
    	IncrementAttempt = ce.IncrementAttempt
    	ExceedsMaxAttempts = ce.ExceedsMaxAttempts
    	GetNextAttemptAt = ce.GetNextAttemptAt
    	SetNextAttemptAt = ce.SetNextAttemptAt
    	SetNextAttemptAfter = ce.SetNextAttemptAfter
    	IsDeadLetter = ce.IsDeadLetter
    	SetDeadLetter = ce.SetDeadLetter
    	GetOriginalTopic = ce.GetOriginalTopic
    	SetOriginalTopic = ce.SetOriginalTopic
    	GetErrorMessage = ce.GetErrorMessage
    	SetErrorMessage = ce.SetErrorMessage
    	GetTraceID = ce.GetTraceID
    	SetTraceID = ce.SetTraceID
    	GetParentID = ce.GetParentID
    	SetParentID = ce.SetParentID
    	GetCorrelationID = ce.GetCorrelationID
    	SetCorrelationID = ce.SetCorrelationID
    	GetDelayMs = ce.GetDelayMs
    	SetDelayMs = ce.SetDelayMs
    	GetDelay = ce.GetDelay
    	SetDelay = ce.SetDelay
    	GetEventVersion = ce.GetEventVersion
    	SetEventVersion = ce.SetEventVersion
    	PrepareForRetry = ce.PrepareForRetry
    	PrepareForDLQ = ce.PrepareForDLQ
    	DLQTopic = ce.DLQTopic
    	CopyTracingContext = ce.CopyTracingContext

    	// CloudEvents error types
    	ErrRetry = ce.ErrRetry
    	ErrDeadLetter = ce.ErrDeadLetter
    	ErrSkip = ce.ErrSkip
    	ErrUnprocessable = ce.ErrUnprocessable
    	ErrRetryAfter = ce.ErrRetryAfter
    	ErrDeadLetterWithReason = ce.ErrDeadLetterWithReason
    	ClassifyError = ce.ClassifyError
    	IsRetryable = ce.IsRetryable
    	ShouldDeadLetter = ce.ShouldDeadLetter

    	// CloudEvents API
    	RegisterCloudEventsHandler = runtimepkg.RegisterCloudEventsHandler

    	// Transport capabilities
    	GetCapabilities = transportpkg.GetCapabilities

    	// Modular transport registry (new package structure)
    	// Use RegisterTransport and BuildTransport to work with the modular transport packages.
    	// Import individual transports via: _ "github.com/drblury/protoflow/transport/kafka"
    	DefaultTransportRegistry = newtransport.DefaultRegistry
    	RegisterTransport = newtransport.Register
    	BuildTransport = newtransport.Build

    	// Publish options
    	WithSubject = runtimepkg.WithSubject
    	WithDataContentType = runtimepkg.WithDataContentType
    	WithDataSchema = runtimepkg.WithDataSchema
    	WithExtension = runtimepkg.WithExtension
    	WithMaxAttempts = runtimepkg.WithMaxAttempts
    	WithTracing = runtimepkg.WithTracing
    	WithCorrelationID = runtimepkg.WithCorrelationID

    	Marshal = jsoncodec.Marshal
    	MarshalIndent = jsoncodec.MarshalIndent
    	Unmarshal = jsoncodec.Unmarshal
    	Encode = jsoncodec.Encode
    	Decode = jsoncodec.Decode

    	ErrServiceRequired = errspkg.ErrServiceRequired
    	ErrHandlerRequired = errspkg.ErrHandlerRequired
    	ErrConsumeQueueRequired = errspkg.ErrConsumeQueueRequired
    	ErrHandlerNameRequired = errspkg.ErrHandlerNameRequired
    	ErrConsumeMessageTypeRequired = errspkg.ErrConsumeMessageTypeRequired
    	ErrConsumeMessagePointerNeeded = errspkg.ErrConsumeMessagePointerNeeded
    	ErrPublisherRequired = errspkg.ErrPublisherRequired
    	ErrTopicRequired = errspkg.ErrTopicRequired
    	ErrConfigRequired = errspkg.ErrConfigRequired
    	ErrLoggerRequired = errspkg.ErrLoggerRequired
    	ErrEventPayloadRequired = errspkg.ErrEventPayloadRequired

    	NewSlogServiceLogger = loggingpkg.NewSlogServiceLogger
    	NewMetadata = metadatapkg.New
    	CreateULID = idspkg.CreateULID

    	// NewEventID generates a unique event ID using ULID.
    	NewEventID = runtimepkg.NewEventID
    )

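The comment on the modular transport registry describes registration through blank imports, the same pattern `database/sql` uses for drivers. The sketch below shows the general shape of that pattern with hypothetical `registry`, `builder`, `register`, and `build` names. It does not reproduce the signatures of RegisterTransport or BuildTransport, which are not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// builder is a hypothetical constructor; here it just returns a description
// of the transport it would create.
type builder func(dsn string) (string, error)

// registry maps transport names to builders and is safe for concurrent use.
type registry struct {
	mu       sync.RWMutex
	builders map[string]builder
}

func newRegistry() *registry {
	return &registry{builders: make(map[string]builder)}
}

// register adds or replaces the builder for name.
func (r *registry) register(name string, b builder) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.builders[name] = b
}

// build looks up name and invokes its builder.
func (r *registry) build(name, dsn string) (string, error) {
	r.mu.RLock()
	b, ok := r.builders[name]
	r.mu.RUnlock()
	if !ok {
		return "", fmt.Errorf("transport %q is not registered (missing blank import?)", name)
	}
	return b(dsn)
}

var defaultRegistry = newRegistry()

// In a real transport package this init would live next to the transport
// code, so a blank import of that package is enough to register it.
func init() {
	defaultRegistry.register("channel", func(string) (string, error) {
		return "in-memory channel transport", nil
	})
}

func main() {
	fmt.Println(defaultRegistry.build("channel", ""))
	fmt.Println(defaultRegistry.build("kafka", "localhost:9092"))
}
```

The error on an unknown name is the useful part of the design: forgetting the blank import fails loudly at build time of the transport rather than silently falling back to a default.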
Functions ¶
func MustProtoMessage ¶ added in v0.2.3
func MustProtoMessage[T proto.Message]() T
func NewProtoMessage ¶ added in v0.2.3
func NewProtoMessage[T proto.Message]() (T, error)
func RegisterJSONHandler ¶
func RegisterJSONHandler[T any, O any](svc *Service, cfg JSONHandlerRegistration[T, O]) error
func RegisterProtoHandler ¶
func RegisterProtoHandler[T proto.Message](svc *Service, cfg ProtoHandlerRegistration[T]) error
Types ¶
type Capabilities ¶ added in v0.4.1
type Capabilities = transportpkg.Capabilities
Transport capabilities
type CloudEventsHandlerRegistration ¶ added in v0.4.1
type CloudEventsHandlerRegistration = runtimepkg.CloudEventsHandlerRegistration
type ConfigValidationError ¶ added in v0.4.1
type ConfigValidationError = errspkg.ConfigValidationError
type DLQMetricsSnapshot ¶ added in v0.4.1
type DLQMetricsSnapshot = runtimepkg.DLQMetricsSnapshot
type DLQTopicMetrics ¶ added in v0.4.1
type DLQTopicMetrics = runtimepkg.DLQTopicMetrics
type EntryLogger ¶ added in v0.2.1
type EntryLogger = loggingpkg.EntryLogger
type EntryLoggerAdapter ¶ added in v0.2.2
type EntryLoggerAdapter[T any] = loggingpkg.EntryLoggerAdapter[T]
type ErrorCategory ¶ added in v0.4.1
type ErrorCategory = runtimepkg.ErrorCategory
type ErrorClassifier ¶ added in v0.4.1
type ErrorClassifier = runtimepkg.ErrorClassifier
Error classification
type EventHandler ¶ added in v0.4.1
type EventHandler = runtimepkg.EventHandler
type HandlerInfo ¶ added in v0.3.3
type HandlerInfo = runtimepkg.HandlerInfo
type HandlerStats ¶ added in v0.3.3
type HandlerStats = runtimepkg.HandlerStats
type JSONHandlerRegistration ¶
type JSONHandlerRegistration[T any, O any] = handlerpkg.JSONHandlerRegistration[T, O]
type JSONMessageContext ¶
type JSONMessageContext[T any] = handlerpkg.JSONMessageContext[T]
type JSONMessageHandler ¶
type JSONMessageHandler[T any, O any] = handlerpkg.JSONMessageHandler[T, O]
type JSONMessageOutput ¶
type JSONMessageOutput[T any] = handlerpkg.JSONMessageOutput[T]
type JobHooks ¶ added in v0.4.1
type JobHooks = runtimepkg.JobHooks
type LogFields ¶ added in v0.2.0
type LogFields = loggingpkg.LogFields
type MessageContextBase ¶ added in v0.4.1
type MessageContextBase = handlerpkg.MessageContextBase
type MessageHandlerRegistration ¶ added in v0.2.0
type MessageHandlerRegistration = runtimepkg.MessageHandlerRegistration
type Metadata ¶
type Metadata = metadatapkg.Metadata
type MiddlewareBuilder ¶
type MiddlewareBuilder = runtimepkg.MiddlewareBuilder
type MiddlewareRegistration ¶
type MiddlewareRegistration = runtimepkg.MiddlewareRegistration
type OutboxStore ¶
type OutboxStore = runtimepkg.OutboxStore
type Producer ¶
type Producer = runtimepkg.Producer
type ProtoHandlerOption ¶
type ProtoHandlerOption = handlerpkg.ProtoHandlerOption
type ProtoHandlerRegistration ¶
type ProtoHandlerRegistration[T proto.Message] = handlerpkg.ProtoHandlerRegistration[T]
type ProtoMessageContext ¶
type ProtoMessageContext[T proto.Message] = handlerpkg.ProtoMessageContext[T]
type ProtoMessageHandler ¶
type ProtoMessageHandler[T proto.Message] = handlerpkg.ProtoMessageHandler[T]
type ProtoMessageOutput ¶
type ProtoMessageOutput = handlerpkg.ProtoMessageOutput
type ProtoValidator ¶
type ProtoValidator = runtimepkg.ProtoValidator
type PublishOption ¶ added in v0.4.1
type PublishOption = runtimepkg.PublishOption
type RetryMiddlewareConfig ¶
type RetryMiddlewareConfig = runtimepkg.RetryMiddlewareConfig
type Service ¶
type Service = runtimepkg.Service
type ServiceDependencies ¶
type ServiceDependencies = runtimepkg.ServiceDependencies
type ServiceLogger ¶ added in v0.2.0
type ServiceLogger = loggingpkg.ServiceLogger
func NewEntryServiceLogger ¶ added in v0.2.1
func NewEntryServiceLogger[T EntryLoggerAdapter[T]](entry T) ServiceLogger
type Transport ¶ added in v0.3.0
type Transport = transportpkg.Transport
type TransportBuilder ¶ added in v0.4.1
type TransportBuilder = newtransport.Builder
Modular transport types (new package structure)
type TransportCapabilities ¶ added in v0.4.1
type TransportCapabilities = newtransport.Capabilities
type TransportConfig ¶ added in v0.4.1
type TransportConfig = newtransport.Config
type TransportDLQManager ¶ added in v0.4.1
type TransportDLQManager = newtransport.DLQManager
type TransportDelayedPub ¶ added in v0.4.1
type TransportDelayedPub = newtransport.DelayedPublisher
type TransportFactory ¶ added in v0.3.0
type TransportFactory = transportpkg.Factory
type TransportQueueIntrospect ¶ added in v0.4.1
type TransportQueueIntrospect = newtransport.QueueIntrospector
type TransportRegistry ¶ added in v0.4.1
type TransportRegistry = newtransport.Registry
type UnprocessableEventError ¶
type UnprocessableEventError = runtimepkg.UnprocessableEventError
Directories ¶

| Path | Synopsis |
|---|---|
| examples | |
| dlq_metrics (command) | |
| full (command) | |
| hooks (command) | |
| json (command) | |
| nats-cloudevents-delayed (command) | |
| postgres (command) | Package main demonstrates using the PostgreSQL transport with protoflow. |
| proto (command) | |
| simple (command) | |
| sqlite (command) | |
| internal | |
| runtime | Package runtime provides the core event processing infrastructure for protoflow. |
| runtime/cloudevents | Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow. |
| runtime/transport | Package transport provides transport types and interfaces for the internal runtime. |
| transport | Package transport defines the core interfaces and types for protoflow transports. |
| aws | Package aws provides an AWS SNS/SQS transport for protoflow. |
| channel | Package channel provides an in-memory Go channel transport for protoflow. |
| http | Package http provides an HTTP transport for protoflow. |
| io | Package io provides a file-based I/O transport for protoflow. |
| jetstream | Package jetstream provides a NATS JetStream transport for protoflow. |
| kafka | Package kafka provides a Kafka transport for protoflow. |
| nats | Package nats provides a NATS Core transport for protoflow. |
| postgres | Package postgres provides a PostgreSQL-based transport for protoflow. |
| rabbitmq | Package rabbitmq provides a RabbitMQ/AMQP transport for protoflow. |
| sqlite | Package sqlite provides a SQLite-based transport for protoflow. |
| transports | Package transports imports all built-in transports for auto-registration. |