Message Pipe 是一款基于 Redis 实现的分布式顺序消息管道框架。它利用 Redisson 的分布式锁特性确保了线程安全,使得在多线程环境下也能保证消息严格按照写入管道的顺序被消费。
该项目采用了经典的 Client-Server 架构设计:
- Server 端:负责消息的接收、存储、分发以及管道的管理。
- Client 端:负责注册到 Server,并接收 Server 分发的任务进行业务逻辑处理。
两者之间通过 gRPC(基于 Netty)建立长连接进行通信,保证了高效的数据传输。Server 端在分发消息时,会采用负载均衡策略从在线的 Client 列表中选择合适的目标进行顺序发送。
Message Pipe 的核心架构围绕着“管道(Pipe)”这一概念展开。每一个业务场景可以对应一个或多个管道,消息被写入特定的管道中。
- 存储层:使用
Redis的 List 数据结构作为底层消息队列,结合Redisson实现分布式锁,确保并发读写的安全性。 - 通信层:使用
gRPC定义服务接口(Protobuf),实现 Server 与 Client 之间的高性能通信,包括客户端注册、心跳维持、消息推送等。 - 调度层:Server 端维护着多个管道的调度线程,负责从 Redis 中拉取消息并推送到 Client。
Message Pipe 保证:✅ 消息严格有序处理 ✅ 自动重试和死信队列 ✅ 分布式高可用
┌─────────────────────────────────────────────────────────────┐
│ │
│ ┌──────────────┐ gRPC ┌──────────────┐ │
│ │ Client 1 │◄──────注册+心跳──────►│ │ │
│ │ (Port:5201) │ │ Server │ │
│ │ │◄────消息批处理────────┤ (Port:5200) │ │
│ └──────────────┘ │ │ │
│ │ │ │
│ ┌──────────────┐ gRPC │ 消息分配器 │ │
│ │ Client 2 │◄──────注册+心跳──────►│ │ │
│ │ (Port:5202) │ │ 服务发现 │ │
│ │ │◄────消息批处理────────┤ │ │
│ └──────────────┘ │ Redis │ │
│ │ (Redisson) │ │
│ ┌──────────────┐ gRPC │ │ │
│ │ Client N │◄──────注册+心跳──────►│ 分布式锁 │ │
│ │ (Port:520x) │ │ │ │
│ │ │◄────消息批处理────────│ 重试机制 │ │
│ └──────────────┘ │ 死信队列 │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
message-pipe/
├── message-pipe-core/ # 核心共享模块
│ ├── proto/ # gRPC 协议定义
│ │ ├── client-service.proto # 客户端注册服务
│ │ └── message-service.proto # 消息处理服务
│ ├── domain/ # 数据模型
│ └── util/ # 工具类
│
├── message-pipe-server/ # 服务端(消息分配)
│ ├── MessagePipe.java # 消息管道核心类
│ ├── MessagePipeManager.java # 管道工厂和管理
│ ├── MessagePipeScheduler.java # 消息调度器
│ ├── MessagePipeDistributor.java # 消息分配器
│ ├── ClientServiceDiscovery.java # 服务发现
│ ├── MessageDeadLetterQueue.java # 死信队列
│ └── MessageRetryRecord.java # 重试记录
│
├── message-pipe-client/ # 客户端(消息消费)
│ ├── MessagePipeClientRunner.java # 客户端启动器
│ ├── ReceiveMessageService.java # 消息接收服务
│ ├── MessageProcessor.java # 消息处理器接口
│ └── MessageProcessorManager.java# 处理器管理
│
├── message-pipe-spring-context/ # Spring 集成
│ ├── @EnableMessagePipeServer # 启用服务端
│ ├── @EnableMessagePipeClient # 启用客户端
│ └── 配置自动装配 # 自动配置
│
└── pom.xml # Maven 配置
特点:
- 消息严格按写入顺序处理,不允许并发
- 基于 Redis 分布式锁 + 单线程消费
- 支持批量消息处理(默认 200 条/批)
工作原理:
消息队列: [msg1] → [msg2] → [msg3] → [msg4] → [msg5]
↓
获取分布式锁
↓
取出批量消息(200条)
↓
顺序发送到客户端处理
↓
仅在全部处理成功后删除消息
代码示例:
// 生产者 - 添加消息
MessagePipe pipe = messagePipeManager.getMessagePipe("order-processing");
Message message = new Message("order-123".getBytes());
pipe.putLast(message); // 线程安全地添加消息
// 客户端 - 处理消息
@Component
public class OrderProcessor implements MessageProcessor {
@Override
public String bindingPipeName() {
return "order-processing"; // 绑定到指定管道
}
@Override
public boolean processing(String pipeName, String requestId, Message message) {
// 顺序处理订单
String orderId = new String(message.getBody());
processOrder(orderId);
return true; // 返回 true 表示成功
}
}特点:
- 指数退避重试:1s → 2s → 4s → 8s → 16s
- 默认最多重试 5 次
- 失败信息存储在 Redis,支持查询和手动处理
重试流程:
消息处理失败
↓
创建重试记录 (MessageRetryRecord)
↓
计算延迟: delay = 1000ms × 2^retryCount
↓
等待后重新尝试
↓
达到最大重试次数?
├─ 否 → 继续重试
└─ 是 → 移入死信队列
配置示例:
@Bean
public MessagePipeConfiguration messagePipeConfiguration() {
return MessagePipeConfiguration.defaultConfiguration()
// 重试记录在 Redis 中保留 30 天
.setDlqMessageExpireSeconds(30 * 24 * 60 * 60)
// 其他配置...
;
}特点:
- 重试失败的消息自动进入 DLQ
- 消息保留 30 天,便于追踪和恢复
- 包含完整的失败上下文:失败原因、重试次数、时间戳
数据模型:
class DeadLetterRecord {
String messageId; // 消息 ID
byte[] messageBody; // 原始消息内容
String failureReason; // 失败原因
int retryCount; // 重试次数
long expireTime; // 过期时间
LocalDateTime createTime; // 创建时间
}查询和恢复:
// 查询死信队列中的消息
List<DeadLetterRecord> records =
deadLetterQueue.listMessages("order-processing");
// 手动恢复消息到主队列
deadLetterQueue.recoverMessage("order-processing", messageId);
// 清空死信队列
deadLetterQueue.clear("order-processing");特点:
- 客户端自动心跳保活(10 秒间隔)
- 离线客户端自动检测和隔离
- 服务端故障自动转移
客户端生命周期:
启动
↓
注册到服务器 → 发送心跳信号
↓ ↓
处理消息 ← ─ ─ ─ ─
↓
掉线 → 服务器 10 秒无心跳 → 标记离线
↓
恢复 → 自动重新注册 → 继续处理
配置示例:
@Bean
public ClientConfiguration clientConfiguration() {
return new ClientConfiguration()
.setLocalPort(5201)
.setServerAddress("localhost")
.setServerPort(5200)
.setHeartBeatIntervalSeconds(10) // 心跳间隔
.setRetryRegisterTimes(3); // 注册重试次数
}特点:
- 加权随机分配算法
- 支持多客户端分散处理
- 可插拔的负载均衡策略
分配策略:
客户端 A (权重 1) → 分配 25% 的消息批
客户端 B (权重 2) → 分配 50% 的消息批
客户端 C (权重 1) → 分配 25% 的消息批
总权重 4
自定义策略示例:
@Component
public class CustomLoadBalanceStrategy implements ClientLoadBalanceStrategy {
@Override
public ClientInformation select(String pipeName, List<ClientInformation> clients) {
// 实现自己的负载均衡逻辑
// 例如:轮询、最少连接、一致性哈希等
return clients.get(new Random().nextInt(clients.size()));
}
}特点:
- 支持精确匹配和正则表达式匹配
- 一个管道可以绑定多个客户端
- 一个客户端可以处理多个管道
两种匹配模式:
模式 1:精确匹配
@Component
public class OrderProcessor implements MessageProcessor {
@Override
public String bindingPipeName() {
return "order-processing"; // 精确匹配这个管道名
}
@Override
public ProcessorType processorType() {
return ProcessorType.SPECIFIC;
}
@Override
public boolean processing(String pipeName, String requestId, Message message) {
// 处理 order-processing 管道的消息
return true;
}
}模式 2:正则表达式匹配
@Component
public class RegexProcessor implements MessageProcessor {
@Override
public String bindingPipeName() {
return "order-.*"; // 正则表达式
}
@Override
public ProcessorType processorType() {
return ProcessorType.REGEX;
}
@Override
public boolean processing(String pipeName, String requestId, Message message) {
// 处理所有匹配 order-.* 的管道
System.out.println("Processing pipe: " + pipeName);
return true;
}
}特点:
- 实时指标收集:输入数量、处理数量
- 每个管道独立计数
- 支持聚合指标报告
指标数据:
class PipeMetrics {
long totalInputCount; // 总输入消息数
long totalProcessCount; // 总处理消息数
int currentQueueSize; // 当前队列大小
List<ClientInformation> boundClients; // 绑定的客户端
}查询指标示例:
// 获取单个管道指标
MessagePipeMetrics metrics = pipe.getMetrics();
System.out.println("输入: " + metrics.getTotalInputCount());
System.out.println("处理: " + metrics.getTotalProcessCount());
// 获取所有管道的聚合指标
MessagePipeMetricsAggregator aggregator = new MessagePipeMetricsAggregator();
long totalInput = aggregator.aggregateInputCount();
long totalProcess = aggregator.aggregateProcessCount();特点:
- 支持读写分离的逻辑架构设计
- 为高吞吐量应用提高扩展空间
- 底层使用 Redis,支持读写分离的二次开发
设计思想: 虽然 Message Pipe 目前底层依赖单一的 Redis,但在逻辑上支持读写分离的扩展。这意味着:
- 消息写入(put)和消息读取(take)可以独立扩展
- 可以针对性地优化写入或读取性能
- 便于后续演进为 Redis Cluster 或其他存储方案
性能优化示例:
┌─────────────────────────────────────────────┐
│ 写入优化:多个 Producer 并发写入 │
│ (put lock 保护,防止并发冲突) │
└────────────┬────────────────────────────────┘
│
▼
┌──────────┐
│ Redis │
│ List │
└────────┬─┘
│
┌────────────────┴────────────────────────────┐
│ 读取优化:多个 Consumer 并发读取 │
│ (take lock 保护,严格顺序处理) │
└─────────────────────────────────────────────┘
特点:
- 开箱即用的 Spring Boot Starter 集成
- 自动配置和 Bean 注册
- 声明式启用/禁用
启用服务端:
@SpringBootApplication
@EnableMessagePipeServer // 启用 Server 端功能
public class ServerApplication {
public static void main(String[] args) {
SpringApplication.run(ServerApplication.class, args);
}
}启用客户端:
@SpringBootApplication
@EnableMessagePipeClient // 启用 Client 端功能
public class ClientApplication {
public static void main(String[] args) {
SpringApplication.run(ClientApplication.class, args);
}
}配置文件示例 (application.yml):
方式 1:标准配置前缀
message-pipe:
server:
enabled: true
port: 5200
max-pipe-count: 100
cleanup-threshold-seconds: 1800
client:
enabled: true
local-port: 5201
server-address: localhost
server-port: 5200
heartbeat-interval-seconds: 10方式 2:MinBox 前缀配置
minbox:
message:
pipe:
server:
server-port: 5200 # Server gRPC 监听端口
check-client-expired-interval-seconds: 5 # 检查 Client 过期的时间间隔
client:
server-address: 127.0.0.1 # Server 端 IP
server-port: 5200 # Server 端端口
local-port: 5201 # 本机 gRPC 监听端口模式 1:gRPC 直连模式
┌─────────────────┐
│ Client 1 │
│ Client 2 │
│ Client 3 │
└────────┬────────┘
│ gRPC
▼
┌─────────────┐
│ Server │
│ Redis │
└─────────────┘
模式 2:Nacos 服务发现模式
┌──────────────┐ ┌─────────────┐
│ Client 1 │────────►│ │
└──────────────┘ │ Nacos │
│ Registry │
┌──────────────┐ │ │
│ Client 2 │────────►│ │
└──────────────┘ └─────┬───────┘
│
┌──────────────┐ ┌─────▼──────┐
│ Client 3 │────────►│ Server │
└──────────────┘ │ Redis │
└────────────┘
服务端启动:
- Spring 容器初始化 → 识别
@EnableMessagePipeServer - 创建 Redis 连接(Redisson 客户端)
- 初始化
MessagePipeManager、ServiceDiscovery、MessagePipeScheduler - 启动 gRPC 服务器(默认端口 5200)
- 启动定时清理任务(清理过期管道和客户端)
客户端启动:
- Spring 容器初始化 → 识别
@EnableMessagePipeClient - 扫描所有
MessageProcessorBean - 启动 gRPC 服务器(默认端口 5201)
- 发送注册请求到服务端,告知绑定的管道名
- 启动心跳线程,定期发送心跳信号(间隔 10 秒)
// 生产者代码
MessagePipe pipe = messagePipeManager.getMessagePipe("order-process");
Message msg = new Message("order-123".getBytes());
pipe.putLast(msg); // 或 putLastBatch(List) 批量添加内部流程:
MessagePipe.putLast()→rBlockingQueue.add(message)- 消息添加到 Redis 队列:
{pipeName}.queue - 增加指标计数:
totalInputCount++ - 唤醒调度器线程:
scheduler.notifyAll()
MessagePipeScheduler 工作循环:
┌─────────────────────────────────────────┐
│ 1. 服务发现:查询健康的客户端 │
└────────────────┬────────────────────────┘
│
▼
┌────────────────────┐
│ 无可用客户端? │
└─┬──────────────┬───┘
│ 是 │ 否
▼ ▼
等待通知 ┌──────────────────────┐
│ 2. 获取批量消息 │
│ (默认 200 条) │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ 3. 获取分布式锁 │
│ (take lock) │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ 4. 选择负载均衡 │
│ 客户端 │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ 5. gRPC 发送消息批 │
│ 到客户端 │
└────────────┬─────────┘
│
▼
┌──────────────────────┐
│ 6. 接收处理结果 │
│ successCount │
└─┬──────────────┬─────┘
│ │
┌───────────┘ └───────────┐
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ 失败? │ │ 全部成功?│
│ 网络错误 │ │ (count= │
│ (count=-1) │ batch │
└─┬────────┘ │ size) │
│ └┬─────────┘
│ 是 │ 是
▼ ▼
break ┌─────────────────┐
重试下一轮 │ 7. 删除消息 │
│ (trim queue) │
└────────────────┘
│
▼
┌──────────────────┐
│ 8. 更新指标 │
│ totalProcessCount│
└────────────────┘
客户端处理流程:
┌───────────────────────────────────┐
│ 服务端 gRPC 发送消息批 │
│ MessageRequestBody { │
│ pipeName: "order-process" │
│ requestId: "req-123" │
│ messages: [msg1, msg2, msg3...] │
│ } │
└────────────────┬──────────────────┘
│
▼
┌────────────────────────┐
│ 客户端 gRPC 服务接收 │
│ ReceiveMessageService │
│ .messageProcessing() │
└────────────┬───────────┘
│
▼
┌────────────────────────┐
│ 查找对应的处理器 │
│ MessageProcessor │
│ (根据 pipeName 匹配) │
└────────────┬───────────┘
│
▼
┌─────────────────────────────┐
│ 顺序处理消息(关键!) │
│ for (Message msg : messages)│
│ { │
│ processor.processing() │
│ successCount++ │
│ } │
│ 一旦失败,停止处理 │
└────────────┬────────────────┘
│
▼
┌─────────────────────────────┐
│ 返回处理结果到服务端 │
│ MessageResponseBody { │
│ status: SUCCESS/ERROR │
│ successCount: N │
│ } │
└─────────────────────────────┘
部分失败处理(例如处理了 150/200 条):
成功处理 150 条消息
↓
返回 successCount = 150
↓
服务端检测:150 < 200(部分失败)
↓
删除成功的 150 条消息
↓
剩余 50 条消息重新在队列头部
↓
触发失败处理逻辑
失败消息处理:
消息处理失败(processor 返回 false)
↓
检查是否有健康的客户端
├─ 否 → 跳过,留在队列中等待
└─ 是 ↓
获取重试记录:MessageRetryRecord
↓
增加重试次数
↓
计算延迟时间
delay = 1000ms × 2^retryCount
├─ 重试 1:1 秒
├─ 重试 2:2 秒
├─ 重试 3:4 秒
├─ 重试 4:8 秒
└─ 重试 5:16 秒
↓
睡眠延迟时间
↓
重试次数 > 5?
├─ 否 → 继续重试(消息回到队列)
└─ 是 ↓
创建死信记录
DeadLetterRecord {
messageId,
failureReason,
retryCount: 5,
expireTime: now + 30days
}
↓
移入死信队列
{pipeName}_dead_letter
↓
从主队列删除消息
↓
清理重试记录
- Java 11 或更高版本
- Redis 服务器(推荐 5.0+)
- Spring Boot 2.7.0+
- Maven 3.6.0+
1. Maven 依赖:
<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>message-pipe-spring-context</artifactId>
<version>1.0.8</version>
</dependency>2. Spring Boot 应用类:
@SpringBootApplication
@EnableMessagePipeServer // 启用服务端
public class MessagePipeServerApplication {
public static void main(String[] args) {
SpringApplication.run(MessagePipeServerApplication.class, args);
}
}3. 配置文件 (application.yml):
server:
port: 8080
spring:
redis:
host: localhost
port: 6379
database: 0
message-pipe:
server:
port: 5200 # gRPC 服务器端口
max-pipe-count: 100 # 最大管道数
service-type: GRPC # 服务类型:GRPC 或 NACOS
cleanup-threshold-seconds: 1800 # 清理过期管道间隔(30分钟)4. 发送消息:
@RestController
@RequestMapping("/api/messages")
public class MessageController {
@Autowired
private DefaultMessagePipeManager messagePipeManager;
@PostMapping("/send")
public void sendMessage(@RequestParam String pipeName,
@RequestParam String messageBody) {
MessagePipe pipe = messagePipeManager.getMessagePipe(pipeName);
Message message = new Message(messageBody.getBytes());
pipe.putLast(message);
}
@PostMapping("/send-batch")
public void sendBatch(@RequestParam String pipeName,
@RequestBody List<String> messages) {
MessagePipe pipe = messagePipeManager.getMessagePipe(pipeName);
List<Message> msgList = messages.stream()
.map(body -> new Message(body.getBytes()))
.collect(Collectors.toList());
pipe.putLastBatch(msgList);
}
}1. Maven 依赖:
<dependency>
<groupId>org.minbox.framework</groupId>
<artifactId>message-pipe-spring-context</artifactId>
<version>1.0.8</version>
</dependency>2. Spring Boot 应用类:
@SpringBootApplication
@EnableMessagePipeClient // 启用客户端
public class MessagePipeClientApplication {
public static void main(String[] args) {
SpringApplication.run(MessagePipeClientApplication.class, args);
}
}3. 配置文件 (application.yml):
server:
port: 8081
message-pipe:
client:
local-port: 5201 # 本地 gRPC 服务器端口
server-address: localhost # 服务端地址
server-port: 5200 # 服务端 gRPC 端口
heartbeat-interval-seconds: 10 # 心跳间隔
retry-register-times: 3 # 注册重试次数4. 实现消息处理器:
@Component
public class OrderMessageProcessor implements MessageProcessor {
private static final Logger logger = LoggerFactory.getLogger(OrderMessageProcessor.class);
@Override
public String bindingPipeName() {
return "order-processing";
}
@Override
public ProcessorType processorType() {
return ProcessorType.SPECIFIC;
}
@Override
public boolean processing(String pipeName, String requestId, Message message) {
try {
String orderId = new String(message.getBody());
logger.info("Processing order: {}, requestId: {}", orderId, requestId);
// 您的业务逻辑
processOrder(orderId);
return true; // 返回 true 表示处理成功
} catch (Exception e) {
logger.error("Failed to process order", e);
return false; // 返回 false 表示处理失败,会触发重试
}
}
private void processOrder(String orderId) {
// 处理订单逻辑
// 例如:数据库更新、调用下游服务等
}
}5. 实现正则匹配处理器:
@Component
public class PaymentMessageProcessor implements MessageProcessor {
@Override
public String bindingPipeName() {
return "payment-.*"; // 匹配所有 payment-* 的管道
}
@Override
public ProcessorType processorType() {
return ProcessorType.REGEX;
}
@Override
public boolean processing(String pipeName, String requestId, Message message) {
String paymentId = new String(message.getBody());
System.out.println("Processing payment: " + paymentId +
" from pipe: " + pipeName);
// 处理支付逻辑
return true;
}
}场景:订单处理系统
发送消息:
curl -X POST "http://localhost:8080/api/messages/send" \
-H "Content-Type: application/json" \
-d '{
"pipeName": "order-processing",
"messageBody": "order-123"
}'查看指标:
@GetMapping("/metrics")
public ResponseEntity<?> getMetrics() {
MessagePipe pipe = messagePipeManager.getMessagePipe("order-processing");
return ResponseEntity.ok(Map.of(
"totalInput", pipe.getTotalInputCount(),
"totalProcess", pipe.getTotalProcessCount(),
"currentQueueSize", pipe.getCurrentQueueSize(),
"boundClients", pipe.getBoundClients()
));
}死信队列恢复:
@PostMapping("/dlq/recover")
public void recoverFromDLQ(@RequestParam String pipeName,
@RequestParam String messageId) {
MessageDeadLetterQueue dlq = deadLetterQueueManager.getQueue(pipeName);
dlq.recoverMessage(messageId);
}
@GetMapping("/dlq/list")
public List<DeadLetterRecord> listDLQ(@RequestParam String pipeName) {
MessageDeadLetterQueue dlq = deadLetterQueueManager.getQueue(pipeName);
return dlq.listMessages();
}| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
batchSize |
int | 200 | 每批处理的消息数 |
putLockTime.waitTime |
int | 5 | put 锁等待时间(秒) |
putLockTime.leaseTime |
int | 10 | put 锁租期(秒) |
takeLockTime.waitTime |
int | 10 | take 锁等待时间(秒) |
takeLockTime.leaseTime |
int | 300 | take 锁租期(秒) |
dlqMessageExpireSeconds |
long | 2592000 | 死信消息过期时间(30 天) |
retryRecordExpireSeconds |
long | 2592000 | 重试记录过期时间(30 天) |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
serverPort |
int | 5200 | gRPC 服务器端口 |
maxMessagePipeCount |
int | 100 | 最大管道数量 |
expiredExcludeThresholdSeconds |
int | 10 | 客户端离线判断阈值(秒) |
cleanupExpiredMessagePipeThresholdSeconds |
int | 1800 | 清理过期管道间隔(秒) |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
localPort |
int | 5201 | 本地 gRPC 服务器端口 |
serverAddress |
String | localhost | 服务端地址 |
serverPort |
int | 5200 | 服务端 gRPC 端口 |
heartBeatIntervalSeconds |
int | 10 | 心跳间隔(秒) |
retryRegisterTimes |
int | 3 | 注册重试次数 |
localNetworkInterface |
String | null | 本地网卡名(可选) |
A: 通过以下机制保证:
- 单线程调度:
MessagePipeScheduler每个管道只有一个工作线程 - 分布式锁:在处理前获取 take lock,确保全局只有一个处理者
- 批量删除:仅在全部消息处理成功后才删除,失败则保留
take lock ──→ fetch batch ──→ sequential process ──→ atomic delete
(单线程) (不允许并发) (全部成功)
A: 多层保护机制:
- Redis 持久化:消息存在 RDB/AOF
- 重试机制:处理失败自动重试最多 5 次
- 死信队列:重试失败后保存 30 天便于恢复
- 客户端确认:仅收到成功响应才删除消息
A:
- 消息保留在 Redis:由于是基于 Redis 存储,消息不丢失
- 客户端自动重连:客户端会重新注册(带重试机制)
- 消息重新分配:服务端恢复后继续分配消息
- 建议使用 Redis 集群/哨兵模式提高可用性
A: 实现 ClientLoadBalanceStrategy 接口:
@Component
public class ConsistentHashStrategy implements ClientLoadBalanceStrategy {
@Override
public ClientInformation select(String pipeName,
List<ClientInformation> clients) {
// 一致性哈希实现
// 确保同一消息始终发往同一客户端
return clients.get(hashFunction(pipeName) % clients.size());
}
}A: 建议方案:
- 消息本体存储在文件系统或对象存储(如 S3)
- 消息内容只包含引用 ID
- 客户端根据 ID 获取实际内容
// 推荐做法
byte[] messageBody = referenceId.getBytes(); // "oss://file-id-123"
Message message = new Message(messageBody);
pipe.putLast(message);
// 客户端处理
public boolean processing(..., Message message) {
String fileId = new String(message.getBody());
byte[] actualContent = fetchFromOss(fileId);
// 处理实际内容
return true;
}A: 目前不支持原生优先级。可以通过以下方案实现:
- 使用多个管道,高优先级消息放在单独管道
- 高优先级管道分配更多客户端资源
- 在应用层实现基于消息内容的路由
- 单机 Redis:内存模式,不开启持久化
- 网络延迟:< 1ms(同机房)
- 消息大小:1KB
- 批大小:200 条
| 指标 | 数值 | 说明 |
|---|---|---|
| 吞吐量(Producer) | 50,000 msg/s | 取决于 Redis 性能 |
| 吞吐量(Consumer) | 100,000 msg/s | 取决于处理逻辑 |
| 端到端延迟 | < 100ms (p99) | 包括网络延迟 |
| 内存占用 | ~1MB per 10K msg | 消息在 Redis 中 |
| 消息处理失败恢复 | < 5s | 包括重试和 DLQ 转移 |
✅ 推荐:
- 消息大小 < 10KB
- 仅包含必要数据或引用 ID
- 包含唯一标识符用于幂等性判断
❌ 不推荐:
- 超大消息(> 100KB)直接传输
- 敏感信息明文存储(应该加密)
- 没有唯一 ID 的消息
✅ 推荐:
@Component
public class OrderProcessor implements MessageProcessor {
@Override
public boolean processing(String pipeName, String requestId, Message message) {
String orderId = new String(message.getBody());
try {
// 业务逻辑(应该是幂等的)
boolean success = processOrderIdempotent(orderId);
return success;
} catch (Exception e) {
logger.error("Error processing order: {}", orderId, e);
return false; // 返回 false 触发重试
}
}
// 实现幂等性处理
private boolean processOrderIdempotent(String orderId) {
// 检查是否已处理过
Order order = orderRepository.findById(orderId);
if (order != null && order.isProcessed()) {
return true; // 已处理过,返回成功
}
// 处理订单
order.setProcessed(true);
orderRepository.save(order);
return true;
}
}❌ 不推荐:
// 阻塞过长
public boolean processing(..., Message message) {
Thread.sleep(60000); // 不要阻塞!
return true;
}
// 抛出异常
public boolean processing(..., Message message) {
throw new RuntimeException("Error"); // 应该返回 false
}
// 非幂等
public boolean processing(..., Message message) {
// 无法判断是否已处理,重试会重复处理
deductFromAccount(accountId, amount);
return true;
}✅ 推荐:
- 区分可恢复和不可恢复错误
- 可恢复错误返回 false 触发重试
- 不可恢复错误记录日志后返回 true
public boolean processing(..., Message message) {
try {
// 业务处理
processMessage(message);
return true;
} catch (DatabaseException e) {
logger.warn("Database error, will retry", e);
return false; // 数据库错误 → 重试
} catch (ValidationException e) {
logger.error("Invalid message, move to DLQ", e);
return true; // 验证失败 → 不重试(虽然会被删除)
}
}调整批大小:
@Bean
public MessagePipeConfiguration messagePipeConfiguration() {
return MessagePipeConfiguration.defaultConfiguration()
.setBatchSize(500); // 增加批大小提高吞吐,但增加延迟
}调整重试参数:
// 更激进的重试(更快恢复)
.setRetryRecordExpireSeconds(24 * 60 * 60) // 1 天
// 更保守的重试(减少告警)
.setRetryRecordExpireSeconds(7 * 24 * 60 * 60) // 7 天多客户端并行处理:
// 部署多个客户端实例,绑定同一管道
// 服务端会自动负载均衡分配消息- Redis 5.0 及以上
- Redis 集群模式
- Redis 哨兵模式
- Spring Boot 2.3.0+
- Spring Boot 3.0.0+(需要 Java 17)
- gRPC 1.40.0+
- Protocol Buffers 3.17.0+
- Nacos: 用于服务发现和配置管理
- Prometheus: 用于指标收集(可自定义实现)
- ELK: 用于日志收集和分析
| 依赖 | 版本 | 用途 |
|---|---|---|
| Redisson | 3.17.7 | Redis 客户端 |
| gRPC | 1.45.1 | RPC 框架 |
| Protobuf | 3.19.4 | 消息序列化 |
| Spring Framework | 5.3.31 | IoC 容器 |
| Spring Data Redis | 2.7.18 | Redis 集成 |
| Jackson | 2.15.3 | JSON 序列化 |
| Nacos Client | 1.4.3 | 服务发现(可选) |
| Lombok | 1.18.30 | 代码生成 |
message-pipe-core:
org.minbox.framework.message.pipe.core.Message- 消息类org.minbox.framework.message.pipe.core.domain.ClientInformation- 客户端信息org.minbox.framework.message.pipe.core.domain.ServerInformation- 服务端信息
message-pipe-server:
org.minbox.framework.message.pipe.server.MessagePipe- 消息管道核心类org.minbox.framework.message.pipe.server.manager.MessagePipeManager- 管道管理器org.minbox.framework.message.pipe.server.scheduler.MessagePipeScheduler- 消息调度器org.minbox.framework.message.pipe.server.distributor.MessagePipeDistributor- 消息分配器org.minbox.framework.message.pipe.server.ServiceDiscovery- 服务发现org.minbox.framework.message.pipe.server.manager.MessageDeadLetterQueue- 死信队列org.minbox.framework.message.pipe.server.domain.MessageRetryRecord- 重试记录
message-pipe-client:
org.minbox.framework.message.pipe.client.MessageProcessor- 消息处理器接口org.minbox.framework.message.pipe.client.MessagePipeClientRunner- 客户端启动器org.minbox.framework.message.pipe.client.ReceiveMessageService- 消息接收服务
message-pipe-spring-context:
org.minbox.framework.message.pipe.spring.context.annotation.EnableMessagePipeServer- 启用服务端注解org.minbox.framework.message.pipe.spring.context.annotation.EnableMessagePipeClient- 启用客户端注解
症状:
ERROR: Connection refused at localhost:5200
排查步骤:
- 检查服务端是否启动:
curl -v localhost:5200 - 检查防火墙:
sudo lsof -i :5200 - 检查配置文件中的地址和端口是否正确
- 检查 gRPC 服务是否正常启动(查看日志)
症状:
totalInputCount = 10000
totalProcessCount = 1000
currentQueueSize = 9000
排查步骤:
- 检查处理器是否持续返回 false(查看日志)
- 检查客户端是否在线:
ServiceDiscovery.listClients() - 检查处理逻辑性能:是否处理时间过长
- 增加客户端数量或提高处理性能
症状:
消息经过 5 次重试后仍失败,移入 DLQ
排查步骤:
- 查询死信队列:
MessageDeadLetterQueue.listMessages(pipeName) - 分析失败原因:
DeadLetterRecord.failureReason - 修复根本原因(如依赖服务恢复)
- 恢复消息到主队列:
deadLetterQueue.recoverMessage(messageId)
症状:
Redis 内存持续增长,客户端 JVM 内存也在增长
排查步骤:
- 检查是否有过期管道未清理:
MessagePipeManager.getMetrics() - 检查死信队列是否过大:
deadLetterQueue.size() - 检查 Redis 键过期设置:
redis-cli TTL {pipeName}_retry_records - 手动清理:
deadLetterQueue.clear()或messagePipeManager.clearExpiredPipes()
症状:
客户端离线 → 消息不处理 → 重新上线 → 再离线
排查步骤:
- 检查客户端和服务端网络连接
- 增加心跳间隔:
heartBeatIntervalSeconds = 30 - 检查是否有卡顿:业务线程阻塞或 GC 暂停
- 查看客户端日志中的异常堆栈
Message Pipe 非常适合以下业务场景和技术需求:
✅ 订单流程处理
创建订单 → 支付 → 发货 → 收货 → 完成
必须严格按顺序处理,不能乱序
✅ 库存扣减管理
检查库存 → 冻结库存 → 实际扣减
顺序错乱会导致超卖
✅ 账户资金变更
余额变更必须按顺序记录
保证账户余额的准确性
- 已有 Redis 基础设施的项目
- 不想额外部署 RabbitMQ、Kafka 等重型消息队列
- 对消息吞吐量有一定要求但不是极限追求
- Java 技术栈项目
- 需要灵活控制消息处理流程
- 处理逻辑复杂,需要通过代码精细控制
消息量规模:每秒 1K-10K 消息
节点规模:3-20 个服务节点
网络环境:局域网或同机房(推荐)
对 RT 要求:不超过 500ms (p99)
- 需要监控每个管道的处理指标
- 需要人工干预死信队列恢复失败消息
- 需要自定义负载均衡和重试策略
❌ 以下场景不建议使用 Message Pipe:
| 场景 | 原因 | 建议替代方案 |
|---|---|---|
| 超大消息量 | 单 Redis 吞吐有限 | Kafka、Pulsar |
| 极低延迟要求 | gRPC 网络延迟 | 本地消息总线、内存队列 |
| 完全无序消费 | 框架强制顺序 | RabbitMQ、SQS |
| 需要事务支持 | 不支持分布式事务 | 专业消息队列 + Saga |
| 复杂 Topic/分片 | 管道概念较简洁 | 专业消息队列 |
| 消息持久化要求极高 | 依赖 Redis 持久化 | 专业消息队列 |
Apache License 2.0
- 项目仓库:message-pipe
- 示例项目:message-pipe-example
- 作者:恒宇少年 (hengboy)
- 组织:MinBox Projects
最后更新: 2025-12-25
此文档为 Message Pipe 框架的完整介绍和使用指南,涵盖了从基础概念到高级应用的所有方面。如有问题,请参考示例项目或提交 Issue。