[Fix] refine message queue delay logic and state management #668
- Introduced state machine ('collecting', 'processing') for MessageBatch
- Added collectWaiters to handle messages during delay periods
- Improved cleanup and cancellation of waiters to prevent leaks and race conditions
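As a rough illustration of the two-state batch described above, here is a minimal sketch. The field names `state`, `collectWaiters`, and `resolveWaiters` come from this PR; `RunStatus`, `createBatch`, and `waiterListFor` are hypothetical stand-ins, and the real `MessageBatch` has more fields than shown here.

```typescript
// Simplified stand-in for ChainMiddlewareRunStatus (assumption, not the real enum).
type RunStatus = 'CONTINUE' | 'STOP'
type Waiter = (status: RunStatus) => void
type BatchState = 'collecting' | 'processing'

// Reduced shape: only the fields this PR discusses.
interface MessageBatch {
    messages: string[]
    state: BatchState
    collectWaiters: Waiter[] // callers parked during the delay window
    resolveWaiters: Waiter[] // callers parked while a batch is processing
}

// Hypothetical helper: a batch starts in 'collecting' only when a delay is configured.
function createBatch(first: string, delaySeconds: number): MessageBatch {
    return {
        messages: [first],
        state: delaySeconds > 0 ? 'collecting' : 'processing',
        collectWaiters: [],
        resolveWaiters: []
    }
}

// Hypothetical helper: pick the waiter list an incoming message should join.
function waiterListFor(batch: MessageBatch): Waiter[] {
    return batch.state === 'collecting'
        ? batch.collectWaiters
        : batch.resolveWaiters
}
```

Keeping the two waiter lists separate means a timeout flush only touches callers that arrived during the delay window, while completion of a running batch only touches callers that arrived during processing.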
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
Introduces collect waiters (collectWaiters) and a batch state ('collecting' | 'processing'), removes the former processorResolve, and restructures how message batches are collected and how the timeout triggers processing. A waiter model (awaitCollectingBatch / awaitBatchCompletion / interruptAndMerge, among others) now drives the switch from collecting to processing and the merge flow.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant Client as Client
participant Middleware as Middleware
participant Batch as MessageBatch
participant Waiters as CollectWaiters
Client->>Middleware: send/append message
Middleware->>Batch: createOrGetBatch() (state: collecting)
activate Batch
Batch->>Waiters: register waiter in collectWaiters
Note over Batch,Waiters: collect messages and waiters within the messageQueueDelay window
Client->>Middleware: append further messages
Middleware->>Batch: addMessage()
Batch->>Waiters: append waiter or message record
alt timeout or explicit trigger (flush/interruption)
Batch->>Waiters: collectWaiters(...) signal to continue
Waiters-->>Batch: return continue signal
Batch->>Batch: state = processing
Batch->>Batch: merge collected messages and prepare for processing
Batch->>Middleware: return the merged batch to continue processing
end
deactivate Batch
Middleware-->>Client: return processing result or status
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Summary of Changes
Hello @dingyi222666, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly refines the message queue delay logic and state management within the chat middleware. The primary goal is to enhance the system's robustness and eliminate potential race conditions by introducing a formal state machine for message batches. This change ensures more predictable behavior when handling message delays and concurrent inputs, leading to a more stable and reliable message processing pipeline.
Code Review
This pull request refactors the message queue delay logic by introducing a state machine and dedicated waiters for message collection, which improves clarity and robustness. The changes effectively address potential race conditions and make the state management more explicit. My review includes a high-priority fix for a potential race condition in awaitCollectingBatch and a couple of medium-priority suggestions to refactor duplicated code for better maintainability. Overall, this is a solid improvement to the message batching mechanism.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)
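1-298: Fix message array handling in the nested waiter
The nested waiter in waitForBatchCompletion hits a critical defect under a specific concurrency scenario. Once interruptAndMerge has emptied the message array for a new user message, the nested waiter's call to mergeMessages() reads an empty array, so messages[0] is undefined, which then causes a crash. The scenario:
- User B's message is caught by waitForBatchCompletion, which creates a new batch (batch2)
- User B keeps sending messages → awaitBatchCompletion adds them to batch2
- interruptAndMerge clears batch2.messages = []
- If batch2 completes before a new message arrives, the nested waiter tries to merge an empty array and crashes
Suggested fix:
Add a check in the nested waiter before the messages are cleared, or keep the original messages for a fallback merge. An empty-array guard in the mergeMessages function would also work.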
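One way the empty-array guard could look is sketched below. This is not the PR's actual `mergeMessages`; it assumes a reduced, hypothetical `Message` shape with only `content` and `additional_kwargs`, and it returns `undefined` for an empty batch so the caller can decide what to do.

```typescript
// Hypothetical, reduced Message shape; the real type lives in packages/core/src/types.ts.
interface Message {
    content: string
    additional_kwargs?: Record<string, unknown>
}

// Sketch of a merge with an explicit guard for an already-cleared batch.
function mergeMessages(messages: Message[]): Message | undefined {
    // Guard: an interrupted batch may have had its messages cleared already.
    if (messages.length === 0) return undefined
    // A single message is passed through unchanged.
    if (messages.length === 1) return messages[0]

    // Later messages win when additional_kwargs keys collide.
    const additional_kwargs: Record<string, unknown> = {}
    for (const message of messages) {
        Object.assign(additional_kwargs, message.additional_kwargs ?? {})
    }

    return {
        content: messages.map((m) => m.content).join('\n\n'),
        additional_kwargs
    }
}
```

With this shape the nested waiter would need to handle the `undefined` case, for example by keeping the existing `inputMessage` instead of overwriting it.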
🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)
61-76: Check for dead code and clarity of logic
When config.messageQueueDelay > 0, line 72 returns early, so line 75 (newBatch.messages = []) never runs in that case. When the delay is 0, line 75 clears the message array so that the first message continues immediately (not as part of a batch). This logic is not obvious, though. Suggestions:
- Add a comment for the messageQueueDelay === 0 case explaining why the messages are cleared
- Or consider refactoring for readability, for example with an explicit if-else branch
🔎 Suggested refactor
  if (config.messageQueueDelay > 0) {
      newBatch.timeout = ctx.setTimeout(() => {
          if (batches.get(conversationId) === newBatch) {
              logger.debug(
                  // eslint-disable-next-line max-len
                  `Delay timeout (${config.messageQueueDelay}s) for ${conversationId}, processing batch with ${newBatch.messages.length} messages`
              )
              newBatch.timeout = undefined
              flushCollectWaiters(newBatch)
          }
      }, config.messageQueueDelay * 1000)
      return await awaitCollectingBatch(newBatch, context)
+ } else {
+     // With no delay, clear the message array so the first message continues immediately
+     newBatch.messages = []
+     return ChainMiddlewareRunStatus.CONTINUE
  }
-
- newBatch.messages = []
- return ChainMiddlewareRunStatus.CONTINUE
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/core/src/middlewares/chat/message_delay.ts (3)
packages/core/src/index.ts (1)
logger (38-38)
packages/core/src/chains/chain.ts (1)
ChainMiddlewareContext (846-855)
packages/core/src/types.ts (1)
Message (53-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build
- GitHub Check: lint
🔇 Additional comments (9)
packages/core/src/middlewares/chat/message_delay.ts (9)
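19-21: Interface changes match the new state machine design
Adding the collectWaiters and state fields supports a two-phase state machine (collecting and processing), in line with the PR's goal. These changes lay the groundwork for the concurrent message handling that follows.
92-111: Collecting-phase logic is correct
In the collecting state, each new message resets the timeout timer, which is the expected behavior for delayed batching. Note that awaitCollectingBatch cancels the earlier waiters (see line 199), so only the last message goes on to process the whole batch.
113-123: Processing-phase wait logic is correct
When the batch is in the processing state, a new message waits for the current batch to finish, then interrupts and merges to form a new batch. This design allows messages to keep arriving during processing.
138-142: Waiter cleanup logic is correct
The waiter array is extracted first, the original array is cleared, and only then are the waiters called. This pattern prevents the array from being modified while it is being iterated.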
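The extract, clear, then call pattern praised here can be sketched in isolation. `drainWaiters` and the `holder` shape are hypothetical names for illustration, not the PR's code.

```typescript
type Waiter = (status: string) => void

// Snapshot the list, clear it, then call each waiter. A waiter that registers
// a new waiter while running lands in the fresh array and is not invoked in
// this pass.
function drainWaiters(holder: { waiters: Waiter[] }, status: string): number {
    const pending = holder.waiters
    holder.waiters = []
    for (const waiter of pending) {
        waiter(status)
    }
    return pending.length
}
```

Because the holder's array is swapped before any callback runs, re-entrant registration cannot extend the loop that is currently executing.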
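174-176: Cleanup logic is thorough
Clearing the chat history also stops collectWaiters and resolveWaiters, so every waiting message is cancelled properly and memory leaks are avoided.
182-193: Interrupt-and-merge logic is clear
The interruptAndMerge function correctly handles interrupting the batch and merging messages, and the transition to the processing state is reasonable.
195-212: Collect-wait logic matches the design
awaitCollectingBatch first cancels all earlier collect waiters (line 199), then adds a new waiter. Within the collection window, therefore, only the last message continues processing (carrying all collected messages), while earlier messages receive a STOP status. This is an intentional design that batches several messages into a single processing run.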
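The "last message wins" behavior can be demonstrated with a callback-based sketch. This is a simplification under stated assumptions: the PR wraps the waiter in a Promise and pushes the message elsewhere in the middleware, whereas the hypothetical `joinCollectingBatch` below does both in one synchronous step, and `RunStatus` stands in for `ChainMiddlewareRunStatus`.

```typescript
type RunStatus = 'CONTINUE' | 'STOP'

interface Batch {
    messages: string[]
    state: 'collecting' | 'processing'
    collectWaiters: Array<(status: RunStatus) => void>
}

// Resolve every parked collect waiter with the same status.
function resolveCollectWaiters(batch: Batch, status: RunStatus): void {
    const waiters = batch.collectWaiters
    batch.collectWaiters = []
    for (const waiter of waiters) waiter(status)
}

// Hypothetical callback-based stand-in for awaitCollectingBatch.
function joinCollectingBatch(
    batch: Batch,
    message: string,
    onDone: (status: RunStatus, merged?: string) => void
): void {
    batch.messages.push(message)
    // Earlier callers in the same window are told to stop.
    resolveCollectWaiters(batch, 'STOP')
    batch.collectWaiters.push((status) => {
        if (status === 'STOP') {
            onDone('STOP')
            return
        }
        // The surviving caller carries everything collected so far.
        const merged = batch.messages.join('\n\n')
        batch.messages = []
        batch.state = 'processing'
        onDone('CONTINUE', merged)
    })
}
```

Calling `resolveCollectWaiters(batch, 'CONTINUE')` then plays the role of the delay timeout firing: the single remaining waiter receives the merged text and the batch moves to `processing`.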
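214-224: Helper functions are implemented correctly
flushCollectWaiters and cancelCollectWaiters use the same safe pattern: extract the waiter array, clear the original array, then call the waiters. This prevents concurrent-modification problems.
226-240: Batch-completion wait logic is correct
Deciding whether to replace or append messages based on resolveWaiters.length is reasonable:
- With no waiters, a new batch starts (messages are replaced)
- With waiters present, the message joins the existing batch (messages are appended)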
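The replace-or-append rule can be sketched as follows. `queueDuringProcessing` and `ProcessingBatch` are hypothetical names, and the sketch assumes string messages for brevity; only the `resolveWaiters.length` check mirrors what the review describes.

```typescript
type Waiter = (status: 'CONTINUE' | 'STOP') => void

interface ProcessingBatch {
    messages: string[]
    resolveWaiters: Waiter[]
}

// Hypothetical stand-in for the bookkeeping inside awaitBatchCompletion.
function queueDuringProcessing(
    batch: ProcessingBatch,
    message: string,
    waiter: Waiter
): void {
    if (batch.resolveWaiters.length === 0) {
        // First follow-up message: start a fresh list, dropping stale entries.
        batch.messages = [message]
    } else {
        // Later follow-ups: accumulate into the pending list.
        batch.messages.push(message)
    }
    batch.resolveWaiters.push(waiter)
}
```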
…resolution
- Extracted `resetBatchTimeout` helper to handle batch timeout creation and cleanup
- Unified `resolveCollectWaiters` to handle both continuation and cancellation
- Refined `awaitCollectingBatch` to resolve previous waiters before starting a new one
Ensures that messages are properly cleared after they are merged into the input message, preventing potential duplicate message processing in the next batch.
Actionable comments posted: 1
♻️ Duplicate comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)
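265-284: Critical: message array not cleared, leading to potential duplicate processing
A previous review flagged this and it is still unresolved: after merging messages at lines 275-277, the nested waiter does not clear the newBatch.messages array.
Problem scenario:
- User A's batch is being processed
- User B sends message B1, which triggers waitForBatchCompletion and creates a new batch (containing the nested waiter)
- User B then sends message B2, which calls awaitBatchCompletion and is added to the same batch
- User A's processing finishes, triggering completeBatch, which resolves the nested waiter
- The nested waiter merges messages [B1, B2] and assigns them to B1's context
- B2's waiter is then resolved as well and runs interruptAndMerge
- Problem: at this point batch.messages still contains [B1, B2], so B2 processes these messages again
Impact: messages may be processed twice and the conversation state may end up incorrect.
🔎 Suggested fix
Clear the message array in the nested waiter:
  resolveWaiters: [
      (nextStatus) => {
          if (nextStatus === ChainMiddlewareRunStatus.STOP) {
              resolve(ChainMiddlewareRunStatus.STOP)
              return
          }
          const newBatch = batches.get(conversationId)!
          context.options.inputMessage = mergeMessages(
              newBatch.messages
          )
+         newBatch.messages = []
          batches.delete(conversationId)
          resolve(ChainMiddlewareRunStatus.CONTINUE)
      }
  ],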
🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)
174-213: Helper functions are well implemented
The new helpers interruptAndMerge, awaitCollectingBatch, and resolveCollectWaiters resolve the code duplication raised in the previous review and improve maintainability.
On the placement of the resolveCollectWaiters call at line 191: the previous review suggested moving it inside the Promise constructor to ensure atomicity. JavaScript's single-threaded nature makes the current implementation functionally correct, but adopting the earlier suggestion would make the code clearer.
🔎 Optional improvement (see the previous review's suggestion)
Move the cancellation inside the Promise constructor:
  async function awaitCollectingBatch(
      batch: MessageBatch,
      context: ChainMiddlewareContext
  ): Promise<ChainMiddlewareRunStatus> {
-     resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
      return await new Promise((resolve) => {
+         resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
          batch.collectWaiters.push((status) => {
              if (status === ChainMiddlewareRunStatus.STOP) {
                  resolve(ChainMiddlewareRunStatus.STOP)
                  return
              }
              context.options.inputMessage = mergeMessages(batch.messages)
              batch.messages = []
              batch.state = 'processing'
              resolve(ChainMiddlewareRunStatus.CONTINUE)
          })
      })
  }
📜 Review details
📒 Files selected for processing (1)
packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🪛 GitHub Check: CodeFactor
packages/core/src/middlewares/chat/message_delay.ts
[warning] 62-62: packages/core/src/middlewares/chat/message_delay.ts#L62
Replace ⏎························ctx,⏎························config,⏎························newBatch,⏎························conversationId⏎···················· with ctx,·config,·newBatch,·conversationId (prettier/prettier)
[warning] 95-95: packages/core/src/middlewares/chat/message_delay.ts#L95
Replace ⏎························ctx,⏎························config,⏎························batch,⏎························conversationId⏎···················· with ctx,·config,·batch,·conversationId (prettier/prettier)
🔇 Additional comments (3)
packages/core/src/middlewares/chat/message_delay.ts (3)
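15-22: Interface definition is sound
Adding the collectWaiters and state fields fits the PR's goal and gives message batches an explicit state machine model.
215-234: Timeout logic extracted successfully
The resetBatchTimeout function centralizes the creation and cleanup of batch timeouts and removes duplicated code, as the previous review suggested.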
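A debounce-style helper of this kind might look like the sketch below. It is not the PR's `resetBatchTimeout`: the `Schedule` type, the `cancelTimeout` field, and the parameter list are assumptions made so the example is self-contained, since the return type of `ctx.setTimeout` is not shown in this thread.

```typescript
// Hypothetical scheduler: runs `callback` after `ms` and returns a cancel function.
type Schedule = (callback: () => void, ms: number) => () => void

interface TimedBatch {
    cancelTimeout?: () => void
}

// Cancel any pending timer, then arm a new one for the full delay.
function resetBatchTimeout(
    schedule: Schedule,
    batch: TimedBatch,
    delaySeconds: number,
    onTimeout: () => void
): void {
    batch.cancelTimeout?.()
    batch.cancelTimeout = schedule(() => {
        // The timer has fired, so there is nothing left to cancel.
        batch.cancelTimeout = undefined
        onTimeout()
    }, delaySeconds * 1000)
}

// One possible scheduler backed by the global timer functions.
const timerSchedule: Schedule = (callback, ms) => {
    const handle = setTimeout(callback, ms)
    return () => clearTimeout(handle)
}
```

Passing the scheduler in as a parameter keeps the helper testable with a fake timer; in the middleware it would presumably be bound to the plugin context's timer instead.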
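154-171: Cleanup logic is thorough
Clearing the chat history now correctly stops collectWaiters as well, so the newly added collect waiters are cleaned up properly and potential memory leaks are avoided.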
Fixed the indentation of resetBatchTimeout calls for better code readability.
Actionable comments posted: 0
🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)
177-194: Collect-wait logic could be written more defensively
The current implementation is safe under JavaScript's single-threaded event loop model: the synchronous operations between resolveCollectWaiters and push cannot be interrupted by other code. To make the intent more explicit, though, consider moving resolveCollectWaiters inside the Promise constructor:
🔎 Optional defensive refactor
  async function awaitCollectingBatch(
      batch: MessageBatch,
      context: ChainMiddlewareContext
  ): Promise<ChainMiddlewareRunStatus> {
-     resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
      return await new Promise((resolve) => {
+         resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
          batch.collectWaiters.push((status) => {
📜 Review details
📒 Files selected for processing (1)
packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/core/src/middlewares/chat/message_delay.ts (4)
packages/core/src/index.ts (1)
logger (38-38)
packages/core/src/chains/chain.ts (1)
ChainMiddlewareContext (846-855)
packages/core/src/config.ts (2)
Config (3-54)
Config (56-167)
packages/core/src/types.ts (1)
Message (53-67)
🔇 Additional comments (9)
packages/core/src/middlewares/chat/message_delay.ts (9)
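15-22: State machine pattern is well designed
The interface definition clearly separates the collecting and processing states, with collectWaiters and resolveWaiters managing the waiters of the collecting phase and the processing phase respectively. This design avoids the race conditions that the single processorResolve Promise could previously cause.
45-68: Batch initialization logic is clear
The state is initialized correctly from the messageQueueDelay setting: with a delay > 0 the batch enters the collecting state and waits for more messages; with a delay of 0 it goes straight to the processing state. Clearing the messages array at line 66 is correct, because inputMessage is already retained in context.options at that point.
83-106: State-based branching is correct
In the collecting state the message is appended to the batch and waits for collection to finish; in the processing state it waits for the current processing to finish and then interrupts and merges. The state transitions are clear and avoid lost or duplicated messages.
120-124: Waiter resolution pattern is correct
The waiter array is copied first and then cleared, after which each waiter is called in turn. This pattern prevents problems caused by the array being modified during iteration.
156-158: Thorough cleanup logic
The added cleanup of collectWaiters ensures that clearing the chat history leaves no dangling Promises, which prevents memory leaks.
196-224: Helper functions are well extracted
resolveCollectWaiters unifies waiter resolution and uses the status parameter to distinguish the continue and stop cases. Extracting resetBatchTimeout removes the duplicated timeout setup. Both refactors improve maintainability.
226-240: Batch-completion wait logic is clear
When there are no existing waiters (the first follow-up message during the processing phase), the message array is reset; when there are waiters, the message is appended. This ensures that messages accumulate correctly during processing.
255-275: Duplicate-processing issue is fixed
newBatch.messages = [] at line 268 correctly resolves the potential duplicate processing flagged in the previous review. Clearing the array immediately after the nested waiter merges the messages prevents later waiters from processing the same messages again.
280-298: Message merging is implemented correctly
A single message is returned as is; multiple messages have their content joined with a double newline, and all additional_kwargs are merged. The logic is concise and effective.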
This PR refines the message queue delay logic and state management to improve robustness and prevent race conditions.
New Features
- Introduced a state machine (`collecting`, `processing`) for message batches to clearly define the lifecycle of a batch.
- Added `collectWaiters` to handle multiple concurrent messages during the collection phase.
Bug fixes
Other Changes
- … `interruptAndMerge` and `awaitCollectingBatch` for better readability and maintainability.
- … `ChainMiddlewareRunStatus` across the delay middleware.