Skip to content

Conversation

@dingyi222666
Copy link
Member

This pr refines the message queue delay logic and state management to improve robustness and prevent race conditions.

New Features

  • Introduced a state machine (collecting, processing) for message batches to clearly define the lifecycle of a batch.
  • Added collectWaiters to handle multiple concurrent messages during the collection phase.

Bug fixes

  • Improved synchronization between message collection and processing to avoid duplicate or lost messages during high-frequency input.
  • Enhanced cleanup logic for resolve waiters to ensure proper cancellation and prevent memory leaks.

Other Changes

  • Refactored interruptAndMerge and awaitCollectingBatch for better readability and maintainability.
  • Standardized the use of ChainMiddlewareRunStatus across the delay middleware.

- Introduced state machine ('collecting', 'processing') for MessageBatch
- Added collectWaiters to handle messages during delay periods
- Improved cleanup and cancellation of waiters to prevent leaks and race conditions
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 25, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

引入收集等待者(collectWaiters)与批次状态('collecting' | 'processing'),移除原有 processorResolve,重构消息批次的收集与超时触发逻辑,使用等待者模型(awaitCollectingBatch / awaitBatchCompletion / interruptAndMerge 等)驱动收集到处理的切换与合并流程。

Changes

Cohort / File(s) 变更摘要
消息延迟状态机重构
packages/core/src/middlewares/chat/message_delay.ts
• 在 MessageBatch 中新增 collectWaiters: ((status?: ChainMiddlewareRunStatus) => void)[] 与 `state: 'collecting'

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client as 客户端
    participant Middleware as 中间件
    participant Batch as MessageBatch
    participant Waiters as CollectWaiters

    Client->>Middleware: 发送/追加消息
    Middleware->>Batch: createOrGetBatch() (state: collecting)
    activate Batch
    Batch->>Waiters: 注册 waiter 到 collectWaiters
    Note over Batch,Waiters: 在 messageQueueDelay 窗口内收集消息与等待者
    Client->>Middleware: 继续追加消息
    Middleware->>Batch: addMessage()
    Batch->>Waiters: 追加等待者或消息记录

    alt 超时或显式触发(flush/interruption)
        Batch->>Waiters: collectWaiters(...) 通知继续
        Waiters-->>Batch: 返回继续信号
        Batch->>Batch: state = processing
        Batch->>Batch: 合并收集的消息并准备处理
        Batch->>Middleware: 返回合并后的批次以继续处理
    end
    deactivate Batch
    Middleware-->>Client: 返回处理结果或状态
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 分钟

Poem

🐰 新批次轻声起,
等待者列队记节拍,
时间一到合并去,
消息成行向前推,
兔耳摇曳笑开怀。

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed 标题清晰准确地总结了主要变更:改进消息队列延迟逻辑和状态管理,与变更集完全匹配。
Description check ✅ Passed 描述详细相关,涵盖新增功能、错误修复和其他改动,与变更集内容高度相关且信息充分。
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/message-queue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @dingyi222666, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly refines the message queue delay logic and state management within the chat middleware. The primary goal is to enhance the system's robustness and eliminate potential race conditions by introducing a formal state machine for message batches. This change ensures more predictable behavior when handling message delays and concurrent inputs, leading to a more stable and reliable message processing pipeline.

Highlights

  • State Machine for Message Batches: Introduced a clear state machine (collecting, processing) for message batches to define their lifecycle, improving robustness and preventing race conditions.
  • Concurrent Message Handling: Added collectWaiters to effectively manage multiple concurrent messages during the collection phase, ensuring proper synchronization and preventing message loss or duplication.
  • Enhanced Cleanup and Synchronization: Improved cleanup logic for resolveWaiters and collectWaiters to ensure proper cancellation and prevent memory leaks, alongside better synchronization between message collection and processing.
  • Code Refactoring: Refactored interruptAndMerge and introduced new helper functions like awaitCollectingBatch, flushCollectWaiters, cancelCollectWaiters, and awaitBatchCompletion for better readability and maintainability.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the message queue delay logic by introducing a state machine and dedicated waiters for message collection, which improves clarity and robustness. The changes effectively address potential race conditions and make the state management more explicit. My review includes a high-priority fix for a potential race condition in awaitCollectingBatch and a couple of medium-priority suggestions to refactor duplicated code for better maintainability. Overall, this is a solid improvement to the message batching mechanism.

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)

1-298: 修复嵌套等待者中的消息数组处理问题

waitForBatchCompletion 中的嵌套等待者在特定并发场景下会触发关键缺陷。当新用户消息经过 interruptAndMerge 清空消息数组后,嵌套等待者尝试调用 mergeMessages() 时会访问空数组,导致 messages[0] 返回 undefined,进而引发崩溃。

具体场景

  • 用户 B 消息被 waitForBatchCompletion 捕获,创建新批次(batch2)
  • 用户 B 继续发送消息 → awaitBatchCompletion 添加到 batch2
  • interruptAndMerge 清空 batch2.messages = []
  • 若 batch2 在收到新消息前完成,嵌套等待者尝试合并空数组导致崩溃

建议修复
在嵌套等待者中清空消息前增加检查,或存储原始消息用于备用合并。也可在 mergeMessages 函数添加空数组防护。

🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)

61-76: 检查死代码和逻辑清晰度

config.messageQueueDelay > 0 时,第 72 行会提前返回,导致第 75 行 newBatch.messages = [] 永远不会执行。

当延迟为 0 时,第 75 行清空消息数组的目的是让第一条消息立即继续处理(不作为批次的一部分)。但这个逻辑不够清晰,建议:

  1. messageQueueDelay === 0 的情况添加注释说明为何清空消息
  2. 或者考虑重构以提高可读性,例如使用明确的 if-else 分支
🔎 建议的重构方案
             if (config.messageQueueDelay > 0) {
                 newBatch.timeout = ctx.setTimeout(() => {
                     if (batches.get(conversationId) === newBatch) {
                         logger.debug(
                             // eslint-disable-next-line max-len
                             `Delay timeout (${config.messageQueueDelay}s) for ${conversationId}, processing batch with ${newBatch.messages.length} messages`
                         )
                         newBatch.timeout = undefined
                         flushCollectWaiters(newBatch)
                     }
                 }, config.messageQueueDelay * 1000)
                 return await awaitCollectingBatch(newBatch, context)
+            } else {
+                // 无延迟时,清空消息数组以便第一条消息立即继续处理
+                newBatch.messages = []
+                return ChainMiddlewareRunStatus.CONTINUE
             }
-
-            newBatch.messages = []
-            return ChainMiddlewareRunStatus.CONTINUE
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a0de752 and e6aa54a.

📒 Files selected for processing (1)
  • packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/core/src/middlewares/chat/message_delay.ts (3)
packages/core/src/index.ts (1)
  • logger (38-38)
packages/core/src/chains/chain.ts (1)
  • ChainMiddlewareContext (846-855)
packages/core/src/types.ts (1)
  • Message (53-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build
  • GitHub Check: lint
🔇 Additional comments (9)
packages/core/src/middlewares/chat/message_delay.ts (9)

19-21: 接口变更符合新的状态机设计

添加 collectWaitersstate 字段以支持收集和处理两阶段的状态机,这与 PR 目标一致。这些变更为后续的并发消息处理提供了基础。


92-111: 收集阶段逻辑正确

collecting 状态下,每条新消息会重置超时计时器,这符合延迟批处理的预期行为。注意 awaitCollectingBatch 会取消之前的等待者(见第 199 行),只有最后一条消息会继续处理整个批次。


113-123: 处理阶段的等待逻辑正确

当批次处于 processing 状态时,新消息会等待当前批次完成,然后中断并合并形成新批次。这种设计允许在处理期间继续接收消息。


138-142: 等待者清理逻辑正确

先提取等待者数组,清空原数组,再调用等待者。这种模式防止了在遍历过程中数组被修改的竞态条件。


174-176: 清理逻辑完善

在清除聊天历史时同时停止 collectWaitersresolveWaiters,确保所有等待的消息都被正确取消,防止内存泄漏。


182-193: 中断和合并逻辑清晰

interruptAndMerge 函数正确处理了批次的中断和消息合并,状态转换为 processing 也是合理的。


195-212: 收集等待逻辑符合设计

awaitCollectingBatch 首先取消所有之前的收集等待者(第 199 行),然后添加新的等待者。这意味着在收集窗口内,只有最后一条消息会继续处理(携带所有收集到的消息),而之前的消息会收到 STOP 状态。这是有意为之的设计,用于将多条消息批处理为一次处理。


214-224: 辅助函数实现正确

flushCollectWaiterscancelCollectWaiters 采用了相同的安全模式:先提取等待者数组,清空原数组,再调用等待者。这防止了并发修改问题。


226-240: 批次完成等待逻辑正确

根据 resolveWaiters.length 决定是替换还是追加消息,这是合理的:

  • 无等待者时,开始新批次(替换消息)
  • 有等待者时,加入现有批次(追加消息)

…resolution

- Extracted `resetBatchTimeout` helper to handle batch timeout creation and cleanup
- Unified `resolveCollectWaiters` to handle both continuation and cancellation
- Refined `awaitCollectingBatch` to resolve previous waiters before starting a new one
Ensures that messages are properly cleared after they are merged into the input message, preventing potential duplicate message processing in the next batch.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)

265-284: 严重问题:未清空消息数组导致潜在的重复处理

之前的评审已指出此问题但尚未解决:嵌套等待者在 Line 275-277 合并消息后没有清空 newBatch.messages 数组。

问题场景

  1. 用户A的批次正在处理中
  2. 用户B发送消息B1,触发 waitForBatchCompletion,创建新批次(包含嵌套等待者)
  3. 用户B再发送消息B2,调用 awaitBatchCompletion 添加到同一批次
  4. 用户A的处理完成,触发 completeBatch,解决嵌套等待者
  5. 嵌套等待者合并消息 [B1, B2] 并分配给B1的上下文
  6. B2的等待者随后也被解决,执行 interruptAndMerge
  7. 问题:此时 batch.messages 仍包含 [B1, B2],导致B2会重复处理这些消息

影响:可能导致消息重复处理和不正确的对话状态。

🔎 建议的修复方案

在嵌套等待者中清空消息数组:

                 resolveWaiters: [
                     (nextStatus) => {
                         if (nextStatus === ChainMiddlewareRunStatus.STOP) {
                             resolve(ChainMiddlewareRunStatus.STOP)
                             return
                         }
                         const newBatch = batches.get(conversationId)!
                         context.options.inputMessage = mergeMessages(
                             newBatch.messages
                         )
+                        newBatch.messages = []
                         batches.delete(conversationId)
                         resolve(ChainMiddlewareRunStatus.CONTINUE)
                     }
                 ],
🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)

174-213: 辅助函数实现良好

新增的辅助函数 interruptAndMergeawaitCollectingBatchresolveCollectWaiters 成功地解决了之前评审中提出的代码重复问题,提升了可维护性。

关于 Line 191 的 resolveCollectWaiters 调用位置:之前的评审建议将其移入 Promise 构造函数内部以确保原子性。虽然 JavaScript 单线程特性使当前实现功能正确,但建议采纳之前的意见以提高代码清晰度。

🔎 可选的改进方案(参考之前的评审建议)

将取消操作移入 Promise 构造函数内:

 async function awaitCollectingBatch(
     batch: MessageBatch,
     context: ChainMiddlewareContext
 ): Promise<ChainMiddlewareRunStatus> {
-    resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
     return await new Promise((resolve) => {
+        resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
         batch.collectWaiters.push((status) => {
             if (status === ChainMiddlewareRunStatus.STOP) {
                 resolve(ChainMiddlewareRunStatus.STOP)
                 return
             }
             context.options.inputMessage = mergeMessages(batch.messages)
             batch.messages = []
             batch.state = 'processing'
             resolve(ChainMiddlewareRunStatus.CONTINUE)
         })
     })
 }
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 198b034 and d38fdac.

📒 Files selected for processing (1)
  • packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🪛 GitHub Check: CodeFactor
packages/core/src/middlewares/chat/message_delay.ts

[warning] 62-62: packages/core/src/middlewares/chat/message_delay.ts#L62
Replace ⏎························ctx,⏎························config,⏎························newBatch,⏎························conversationId⏎···················· with ctx,·config,·newBatch,·conversationId (prettier/prettier)


[warning] 95-95: packages/core/src/middlewares/chat/message_delay.ts#L95
Replace ⏎························ctx,⏎························config,⏎························batch,⏎························conversationId⏎···················· with ctx,·config,·batch,·conversationId (prettier/prettier)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (3)
packages/core/src/middlewares/chat/message_delay.ts (3)

15-22: 接口定义合理

collectWaitersstate 字段的添加符合PR目标,为消息批次引入了明确的状态机模型。


215-234: 成功提取超时逻辑

resetBatchTimeout 函数成功地集中了批次超时的创建和清理逻辑,消除了代码重复,符合之前评审的建议。


154-171: 清理逻辑完善

在清除聊天历史时正确地停止了 collectWaiters,确保了新增的收集等待者也能被妥善清理,避免了潜在的内存泄漏。

Fixed the indentation of resetBatchTimeout calls for better code readability.
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
packages/core/src/middlewares/chat/message_delay.ts (1)

177-194: 收集等待逻辑可考虑更防御性的写法

当前实现在 JavaScript 单线程事件循环模型下是安全的——resolveCollectWaiterspush 之间的同步操作不会被其他代码打断。但为了代码意图更明确,可考虑将 resolveCollectWaiters 移入 Promise 构造函数内部:

🔎 可选的防御性重构
 async function awaitCollectingBatch(
     batch: MessageBatch,
     context: ChainMiddlewareContext
 ): Promise<ChainMiddlewareRunStatus> {
-    resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
     return await new Promise((resolve) => {
+        resolveCollectWaiters(batch, ChainMiddlewareRunStatus.STOP)
         batch.collectWaiters.push((status) => {
📜 Review details

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d38fdac and 05fa325.

📒 Files selected for processing (1)
  • packages/core/src/middlewares/chat/message_delay.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/core/src/middlewares/chat/message_delay.ts (4)
packages/core/src/index.ts (1)
  • logger (38-38)
packages/core/src/chains/chain.ts (1)
  • ChainMiddlewareContext (846-855)
packages/core/src/config.ts (2)
  • Config (3-54)
  • Config (56-167)
packages/core/src/types.ts (1)
  • Message (53-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: build
🔇 Additional comments (9)
packages/core/src/middlewares/chat/message_delay.ts (9)

15-22: 状态机模式设计良好

接口定义清晰地区分了 collectingprocessing 两个状态,配合 collectWaitersresolveWaiters 分别管理收集阶段和处理阶段的等待者。这种设计有效避免了之前 processorResolve 单一 Promise 可能导致的竞态问题。


45-68: 批次初始化逻辑清晰

根据 messageQueueDelay 配置正确初始化状态:延迟 > 0 时进入 collecting 状态等待更多消息,延迟 = 0 时直接进入 processing 状态。第 66 行清空 messages 数组的处理是正确的,因为此时 inputMessage 已在 context.options 中保留。


83-106: 基于状态的分支处理正确

collecting 状态时将消息追加到批次并等待收集完成,processing 状态时等待当前处理完成后再进行中断合并。状态切换逻辑清晰,避免了消息丢失或重复处理的问题。


120-124: 等待者解析模式正确

先复制等待者数组再清空,然后逐个调用。这种模式可以防止在迭代过程中数组被修改导致的问题。


156-158: 完善的清理逻辑

新增对 collectWaiters 的清理确保了清除聊天历史时不会遗留悬挂的 Promise,有效防止内存泄漏。


196-224: 辅助函数提取良好

resolveCollectWaiters 统一了等待者解析逻辑,通过 status 参数区分继续和停止场景。resetBatchTimeout 的提取消除了超时设置的重复代码。这两个重构都提高了代码的可维护性。


226-240: 批次完成等待逻辑清晰

当没有现有等待者时(处理阶段的第一个后续消息),重置消息数组;有等待者时追加消息。这确保了处理阶段的消息正确累积。


255-275: 重复处理问题已修复

第 268 行 newBatch.messages = [] 正确解决了之前评审中指出的潜在重复处理问题。在嵌套等待者合并消息后立即清空数组,防止后续等待者重复处理相同消息。


280-298: 消息合并实现正确

单消息时直接返回,多消息时用双换行符连接内容,并合并所有 additional_kwargs。逻辑简洁有效。

@dingyi222666 dingyi222666 merged commit 5b8dd02 into v1-dev Dec 25, 2025
5 checks passed
@dingyi222666 dingyi222666 deleted the fix/message-queue branch December 25, 2025 16:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants