高性能 Go 事件总线 — 三预设架构,零分配,零 CAS,零外部依赖 高性能 Go 事件总线 + 消息框架 — 零外部依赖
- 零外部依赖: 纯标准库,
go.mod无任何第三方 require - 三预设架构: Sync(同步直调)/ Async(Per-P SPSC)/ Flow(Pipeline 流处理)
- 零分配 Emit: 全部三预设 0 B/op, 0 allocs/op
- 极致性能: Sync ~10 ns 单线程(96M ops/s),Async ~27 ns 高并发(37M ops/s)
- 零 CAS 热路径: Per-P SPSC ring,atomic Load/Store only(x86 ≈ 普通 MOV)
- 模式匹配: 通配符
*(单层)和**(多层)
- Publisher/Subscriber 接口: Context 感知的发布/订阅契约
- Router 路由器: 消息管道调度中心,三种处理模式
- 顺序处理:单 goroutine 保证有序(默认)
- 并发处理:
Workers(n)多 worker 并行,信号量背压 - 批量处理:
HandleBatch/Batch批量累积触发
- 动态路由:
Route按消息内容路由到不同 topic - 死信队列:
DLQ处理失败消息自动转发 - 路由级重试:
Retry路由器层面兆底重试 - 流控背压:
InFlight限制在途消息数 - 中间件链: 洋葱模型,全局 + Handler 级别叠加
- JSON 序列化: 内置 Codec 接口 + JSON 实现
- 本地/远程统一:
pubsub/local桥接 beat 引擎,适配器扩展到 Redis / Kafka / NATS / HTTP / SQL
cd _benchmarks && go test -bench="." -benchmem -benchtime=3s -count=3 -run="^$" ./...| 场景 | beat (Sync) | beat (Async) | EventBus | ×倍 | gookit/event | ×倍 |
|---|---|---|---|---|---|---|
| 单 handler | 11 ns 0 alloc | 38 ns 0 alloc | 190 ns 0 alloc | 17× | 609 ns 2 alloc | 55× |
| 10 handler | 26 ns 0 alloc | 34 ns 0 alloc | 1663 ns 1 alloc | 64× | 717 ns 2 alloc | 28× |
| 高并发 | 28 ns 0 alloc | 27 ns 0 alloc | 261 ns 0 alloc | 10× | 201 ns 2 alloc | 7× |
| 数据来源 benchmarks_windows_6c12t.txt。 Linux/BSD 多核性能更好 |
go get github.com/uniyakcom/beatimport (
"fmt"
"github.com/uniyakcom/beat"
)
func main() {
beat.On("user.created", func(e *beat.Event) error {
fmt.Printf("User: %s\n", string(e.Data))
return nil
})
beat.Emit(&beat.Event{
Type: "user.created",
Data: []byte("alice"),
})
}bus, _ := beat.ForAsync() // Per-P SPSC 高并发
defer bus.Close()
bus.On("order.**", func(e *beat.Event) error {
fmt.Printf("Order event: %s\n", e.Type)
return nil
})
bus.Emit(&beat.Event{Type: "order.created", Data: []byte(`{"id":123}`)})import (
"context"
"github.com/uniyakcom/beat"
"github.com/uniyakcom/beat/message"
"github.com/uniyakcom/beat/pubsub/local"
"github.com/uniyakcom/beat/router"
)
func main() {
bus, _ := beat.ForSync()
defer bus.Close()
pub := local.NewPublisher(bus)
sub := local.NewSubscriber(bus)
r := router.NewRouter()
// 纯消费
r.On("notify", "order.created", sub, func(msg *message.Message) error {
fmt.Printf("订单: %s\n", string(msg.Payload))
return nil
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go r.Run(ctx)
<-r.Running()
pub.Publish(context.Background(), "order.created",
message.New("", []byte(`{"id":123}`)))
}| 预设 | 适用场景 | 单线程延迟 | 高并发吞吐 | error 返回 | 生命周期 |
|---|---|---|---|---|---|
| Sync | RPC 钩子、权限校验 | 10 ns | ~36 ns | ✅ | 无需 Close |
| Async | 事件总线、日志聚合 | 41 ns | 27 ns | ❌ | 需 Close |
| Flow | ETL 流处理、批量加载 | 47 ns | — | ❌ | 需 Close |
// 包级 API(Sync 语义)
beat.On("event", handler)
beat.Emit(event)
// 三核心
bus, _ := beat.ForSync() // 同步直调
bus, _ := beat.ForAsync() // Per-P SPSC
bus, _ := beat.ForFlow() // Pipeline
// 自动检测
bus, _ := beat.New() // ≥4 核 → Async,<4 核 → Sync
// 字符串配置
bus, _ := beat.Scenario("async")
// 完全控制
bus, _ := beat.Option(&beat.Profile{Name: "async", Conc: 10000, TPS: 50000})适配器均是简单示例,尚未优化,请勿用于生产环境。
| 模块 | 传输层 | 特点 |
|---|---|---|
beat-redis |
Redis Pub/Sub + Streams | Pipeline 批量、DirectFields 零封装、Consumer Group |
beat-kafka |
Apache Kafka (sarama) | Partition Key、Header 传播、SyncProducer |
beat-nats |
NATS Core + JetStream | 持久化、消费者组、Header 传播 |
beat-http |
HTTP Webhook | 零依赖 net/http、Subscriber 内置服务端 |
beat-sql |
SQL Outbox | 事务原子写入(PublishInTx)、多数据库支持 |
type Message struct {
UUID string // 消息唯一标识(自动生成 UUID v4)
Key string // 分区/路由键(Kafka partition、Redis key)
Metadata Metadata // 元数据 map[string]string
Payload []byte // 消息体
Timestamp time.Time // 创建时间戳(自动设置)
}
// 构造
msg := message.New("", payload) // 通用(含 Ack/Nack channel)
msg := message.NewPub("", payload) // 发布专用(无 Ack,省分配)
msg.Key = "user-123" // 设置路由键// Publisher 发布者接口
type Publisher interface {
Publish(ctx context.Context, topic string, messages ...*Message) error
Close() error
}
// Subscriber 订阅者接口
type Subscriber interface {
Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
Close() error
}Router 是消息框架的调度中心:Subscriber → 中间件链 → Handler → Publisher。
r := router.NewRouter()
// 纯消费(不产出)
r.On("notify", "order.created", sub, func(msg *message.Message) error {
sendEmail(msg.Payload)
return nil
})
// 完整管道:subscribe → process → publish
r.Handle(
"transform",
"raw.data", inSub,
"processed.data", outPub,
func(msg *message.Message) ([]*message.Message, error) {
result := transform(msg.Payload)
return []*message.Message{message.New("", result)}, nil
},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r.Run(ctx)// 8 个 worker 并行消费,信号量自动背压
r.Handle("fast", "events", sub, "out", pub, handler).
Workers(8)// 累积 100 条或 1s 超时后触发批量处理
r.HandleBatch(
"bulk-insert",
"log.entries", sub,
"", nil, // 无产出
func(msgs []*message.Message) ([]*message.Message, error) {
rows := make([]Row, len(msgs))
for i, m := range msgs {
rows[i] = parseRow(m.Payload)
}
return nil, db.BulkInsert(rows)
},
100, // batchSize
time.Second, // batchTimeout
)
// 便捷写法
r.OnBatch("bulk", "log.entries", sub, func(msgs []*message.Message) error {
return db.BulkInsert(toRows(msgs))
}, 100, time.Second)// 根据消息内容路由到不同 topic
r.Handle("route", "events", sub, "default.out", pub, handler).
Route(func(msg *message.Message) string {
switch msg.Metadata.Get("type") {
case "urgent":
return "priority.queue"
case "bulk":
return "batch.queue"
default:
return "" // 使用默认 publishTopic
}
})dlqPub, _ := beatredis.NewPublisher(redisCfg)
r.Handle("process", "orders", sub, "done", pub, handler).
Retry(3).
DLQ(router.DLQConfig{
Topic: "orders.dlq",
Publisher: dlqPub,
})
// 处理失败 3 次后消息自动发送到 orders.dlq,附带 dlq_reason / dlq_handler / dlq_topic 元数据// 限制最大在途消息数(与并发配合使用)
r.Handle("controlled", "events", sub, "out", pub, handler).
Workers(16).
InFlight(8) // 最多 8 条未确认消息import (
"github.com/uniyakcom/beat/middleware/retry"
"github.com/uniyakcom/beat/middleware/timeout"
"github.com/uniyakcom/beat/middleware/recoverer"
"github.com/uniyakcom/beat/middleware/logging"
"github.com/uniyakcom/beat/middleware/correlation"
)
r := router.NewRouter()
// 全局中间件(洋葱模型:外层先执行)
r.Use(
recoverer.New(), // panic → error 恢复
correlation.New(), // correlation_id 传播
logging.New(slog.Default()), // slog 处理日志
timeout.New(5 * time.Second), // 消息处理超时
retry.New(retry.Config{MaxRetries: 3}), // 指数退避重试
)
// Handler 专属中间件(在全局之后执行)
handler.AddMiddleware(customMiddleware)注意: 中间件仅对单条处理器(HandlerFunc)生效。批量处理器(BatchFunc)不经过中间件链。
import "github.com/uniyakcom/beat/marshal"
m := marshal.JSON{}
data, _ := m.Marshal("topic", msg) // Message → []byte
restored, _ := m.Unmarshal("topic", data) // []byte → Message所有适配器共享相同的 message.Publisher / message.Subscriber 接口,通过 Router 统一调度。
import beatredis "github.com/uniyakcom/beat-redis"
pub, _ := beatredis.NewPublisher(beatredis.PublisherConfig{
Client: rdb,
UseStreams: true,
DirectFields: true, // 零封装直存字段
EnablePipeline: true, // 批量 Pipeline
})
sub, _ := beatredis.NewSubscriber(beatredis.SubscriberConfig{
Client: rdb,
ConsumerGroup: "my-service",
})import beatkafka "github.com/uniyakcom/beat-kafka"
pub, _ := beatkafka.NewPublisher(beatkafka.PublisherConfig{
Brokers: []string{"localhost:9092"},
})
// msg.Key 自动映射为 Kafka partition key
// msg.Metadata 传播为 Kafka headersimport beatnats "github.com/uniyakcom/beat-nats"
pub, _ := beatnats.NewPublisher(beatnats.PublisherConfig{
Conn: nc,
JetStream: js, // 可选:启用持久化
})import beathttp "github.com/uniyakcom/beat-http"
pub, _ := beathttp.NewPublisher(beathttp.PublisherConfig{
EndpointURL: "https://api.example.com/webhooks",
})
// POST {url}/{topic},Header 传播 metadataimport beatsql "github.com/uniyakcom/beat-sql"
pub, _ := beatsql.NewPublisher(beatsql.PublisherConfig{DB: db})
// 事务原子写入
tx, _ := db.Begin()
tx.Exec("INSERT INTO orders ...")
pub.PublishInTx(tx, "order.created", msg)
tx.Commit()| 实现 | 核心技术 | 适用场景 |
|---|---|---|
| Sync | 同步直调 + CoW atomic.Pointer | RPC 中间件、权限验证 |
| Async | Per-P SPSC ring + RCU | 事件总线、日志聚合 |
| Flow | Pipeline + 批处理窗口 | ETL、窗口聚合 |
Async 架构要点:
- Per-P SPSC Ring:procPin 保证单写者,零 CAS
- Worker 亲和性:worker[i] 静态拥有 rings {i, i+w, i+2w, ...}
- 三级自适应空转:PAUSE spin → Gosched → channel park
Subscriber ──────── Router ──────── Publisher
│ │ │
Subscribe(ctx) 中间件链(洋葱) Publish(ctx)
│ │ │
<-chan *Message HandlerFunc 动态路由(topicFunc)
│
┌──────┼──────┐
顺序 并发 批量
(1 worker) (N sem) (accumulate)
│
DLQ (失败)
handler := r.Handle(name, subTopic, sub, pubTopic, pub, fn)
handler.
Workers(8). // 8 worker 并行
InFlight(4). // 最多 4 条在途
Route(routeFn). // 动态路由
Retry(3). // 路由级重试
DLQ(dlqCfg). // 死信队列
AddMiddleware(myMiddleware) // Handler 专属中间件beat/
├── core/ # 核心接口(Bus / Event / Handler)
│ ├── interfaces.go
│ └── matcher.go # TrieMatcher 通配符匹配
├── message/ # 消息框架核心类型
│ ├── message.go # Message(UUID / Key / Timestamp / Ack / Nack)
│ ├── metadata.go # map[string]string 元数据
│ ├── publisher.go # Publisher 接口(ctx 感知)
│ ├── subscriber.go # Subscriber 接口
│ └── uuid.go # UUID v4 + FastUUID(零依赖)
├── router/ # 消息路由器
│ ├── router.go # 调度中心(顺序/并发/批量循环、DLQ、动态路由)
│ ├── handler.go # Handler 配置(并发/批量/DLQ/流控)
│ ├── middleware.go # Middleware 类型
│ └── plugin.go # Plugin 生命周期钩子
├── pubsub/local/ # beat Bus ↔ Publisher/Subscriber 桥接
├── middleware/ # 内置中间件
│ ├── retry/ # 指数退避重试
│ ├── timeout/ # 消息处理超时
│ ├── recoverer/ # panic → error 恢复
│ ├── logging/ # slog 日志
│ └── correlation/ # correlation_id 传播
├── marshal/ # 序列化(Codec 接口 + JSON)
├── optimize/ # Profile → Advisor → Factory
├── internal/impl/ # 三预设实现(sync / async / flow)
├── internal/support/ # SPSC ring、对象池等基础设施
├── util/ # PerCPUCounter 等工具
└── api.go # 统一 API 入口
- 需要 error 返回 → Sync
- 高并发 fire-and-forget → Async
- 批量数据处理 → Flow
- Async/Flow 必须
defer bus.Close() - handler 保持轻量,避免阻塞 I/O
- ctx 传播: Publisher.Publish 第一个参数是
context.Context,Router 自动使用msg.Context()传播超时 - Message.Key: Kafka partition、Redis routing 等分区键,适配器自动映射
- 并发 vs 有序: 默认顺序处理保证有序;
Workers(n)牺牲有序换吞吐 - 批量入库: 高频写入用
HandleBatch,减少数据库往返 - DLQ 兆底: 生产环境务必配置
DLQ,避免毒消息阻塞队列 - 中间件顺序: recoverer 放最外层(捕获所有 panic),retry 放最内层(仅重试业务逻辑)
| 文件 | 类型 | 说明 |
|---|---|---|
scenario_test.go |
功能 | ForSync/ForAsync/ForFlow/Scenario 全场景 |
feature_error_test.go |
功能 | 单/多 handler 错误、批量错误 |
feature_concurrent_test.go |
功能 | On/Off/Emit 竞态、嵌套订阅、并发 Close |
edge_cases_test.go |
边界 | 零 handler、大数据、特殊字符 |
router/router_test.go |
功能 | Router On/Handle/中间件/panic 恢复 |
pubsub/local/local_test.go |
功能 | 本地 Pub/Sub 收发、Context 取消 |
marshal/marshal_test.go |
功能 | JSON 序列化往返 |
middleware/middleware_test.go |
功能 | 中间件组合测试 |
impl_bench_test.go |
基准 | 核心性能回归(三预设、Arena、EventPool) |
test/stress_test.go |
压力 | 1000 goroutine · 10s 长运行 |
go build ./... # 编译
go vet ./... # 静态分析
go test ./... -count=1 # 功能测试
go test -race ./... -short # 竞态检测
gofmt -s -w ./ # 格式化整个项目# 关键回归指标
go test -bench="BenchmarkAllImpls" -benchtime=1s -count=1 -run=^$
# Redis 对比 Watermill
cd ../beat-redis/_benchmarks
go test -bench="." -benchmem -benchtime=3s -count=3 -run="^$" ./...关键指标:
| 指标 | Windows 11 (6C/12T) | Linux (1C/2T) | Linux (2C/4T) |
|---|---|---|---|
| Sync 单线程 | 10.4 ns/op | 9.4 ns/op | 12.4 ns/op |
| Async 高并发 | 27 ns/op | 30 ns/op | 69 ns/op |
| Flow 单线程 | 47 ns/op | 53 ns/op | 58 ns/op |
| 分配 | 0 allocs/op | 0 allocs/op | 0 allocs/op |
| 测试数据 | benchmarks_windows_6c12t.txt | benchmarks_linux_1c2t_2vc.txt | benchmarks_linux_2c4t_4vc.txt |
MIT License