Conversation

@nodece nodece (Member) commented Mar 12, 2025

Fixes #1332

Motivation

While the producer is reconnecting, batched and non-batched messages cannot be added to the pending queue, so the send timeout is never triggered for them. As a result, SendAsync callbacks are never invoked while the producer is in a reconnecting state, potentially causing resource leaks and unresponsive applications.

Modifications

  • Separated internal data writing and network requests from the event loop.
    • Data flow: SendAsync -> dataChan -> writeChan -> broker (a rough sketch of this flow follows this list).
  • Fixed payload size handling in the TestProducerSendWithContext test, which previously failed with:
    time="2025-03-16T09:05:13Z" level=error msg="Single message serialize failed %!s(<nil>)" error="encryptedPayload exceeds MaxMessageSize, size: 1048607, MaxMessageSize: 1048576" producerID=1 producer_name=standalone-0-358 topic="persistent://public/default/my-topic-857833034"
    
  • Added a comprehensive test case TestProducerKeepReconnectingAndThenCallSendAsync that verifies:
    • SendAsync callbacks are properly invoked with appropriate errors during reconnection.
    • The fix works correctly with both batching enabled and disabled configurations.
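
A rough sketch of the decoupled flow described above, for illustration only: the channel names dataChan and writeChan come from this PR's description, while sendRequest, pendingItem, and the two loop functions are simplified stand-ins rather than the actual partitionProducer code.

// Illustrative sketch only, not the real implementation.
type sendRequest struct {
    payload  []byte
    callback func(err error)
}

type pendingItem struct {
    buf []byte
}

type producerSketch struct {
    dataChan  chan *sendRequest // filled by SendAsync, off the event loop
    writeChan chan *pendingItem // drained only by the network writer
}

// runDataLoop serializes/batches requests and hands the resulting frames to the
// writer, so SendAsync callers never block on a stalled broker connection.
func (p *producerSketch) runDataLoop() {
    for req := range p.dataChan {
        p.writeChan <- &pendingItem{buf: req.payload}
        _ = req.callback // invoked later: on broker ack, send timeout, or producer close
    }
}

// runWriteLoop is the only goroutine that writes to the broker connection.
func (p *producerSketch) runWriteLoop(writeToBroker func([]byte) error) {
    for item := range p.writeChan {
        _ = writeToBroker(item.buf)
    }
}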

@nodece nodece marked this pull request as draft March 12, 2025 17:36
@nodece nodece force-pushed the fix-SendAsync-callback branch 4 times, most recently from f38a7ad to 61794b2 on March 16, 2025 07:53
@nodece nodece marked this pull request as ready for review March 16, 2025 15:04
@nodece nodece self-assigned this Mar 16, 2025
@nodece nodece (Member Author) commented Mar 16, 2025

@gunli Would you have a chance to review this?

@BewareMyPower BewareMyPower added this to the v0.15.0 milestone Mar 31, 2025
@RobertIndie RobertIndie (Member) left a comment

This PR introduces two separate message writing paths:

  1. Non-batched messages: SendAsync -> dataChan
  2. Batched messages: SendAsync -> batchChan -> dataChan

Both paths are asynchronous, which breaks the message order guaranteed by SendAsync.

For example, if we send two messages using SendAsync like this:

producer.SendAsync(context.Background(), &ProducerMessage{
    Payload: []byte("A"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
    errChan <- err
})

producer.SendAsync(context.Background(), &ProducerMessage{
    Payload: []byte("B"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
    errChan <- err
})

We need to ensure the message order is A, B.
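
As a rough way to check that expectation (not part of the snippet above): read the messages back and assert they arrive as A then B. The consumer and the testing.T value here are assumed to exist in the test.

// Hypothetical order check; consumer and t are assumed.
for _, want := range []string{"A", "B"} {
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        t.Fatal(err)
    }
    if got := string(msg.Payload()); got != want {
        t.Fatalf("message order broken: got %q, want %q", got, want)
    }
    consumer.Ack(msg)
}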

@nodece nodece marked this pull request as draft March 31, 2025 17:03
@nodece nodece force-pushed the fix-SendAsync-callback branch 5 times, most recently from 926c0f3 to 081e7a5 on April 1, 2025 13:26
@nodece nodece requested a review from RobertIndie April 1, 2025 13:27
@nodece nodece marked this pull request as ready for review April 1, 2025 13:28
@nodece nodece force-pushed the fix-SendAsync-callback branch from 081e7a5 to 687c80a on April 3, 2025 09:51
@nodece nodece (Member Author) commented Apr 7, 2025

ping @RobertIndie

Comment on lines 2672 to 2678

// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
    Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
    errChan <- err
})
Member

Suggested change
// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
    Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
    errChan <- err
})

for i := 0; i < 10; i++ {
    testProducer.SendAsync(context.Background(), &ProducerMessage{
        Payload: []byte("test"),
    }, func(id MessageID, producerMessage *ProducerMessage, err error) {
        fmt.Println("send async callback", id, producerMessage, err)
    })
}
// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
    Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
    errChan <- err
})

You can easily reproduce the issue by sending some messages before this step, and by setting the producer's BatchingMaxMessages to a smaller value such as 5 (a setup sketch follows).
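
For example, a sketch of the producer setup that reproduction could use; client and topicName are assumed to already exist in the test, and the values are illustrative:

// Hypothetical reproduction setup: a small batch size so the batch container fills quickly.
testProducer, err := client.CreateProducer(ProducerOptions{
    Topic:               topicName,
    BatchingMaxMessages: 5,
    SendTimeout:         2 * time.Second,
})
if err != nil {
    t.Fatal(err)
}
defer testProducer.Close()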

Member Author

Done.

@nodece nodece force-pushed the fix-SendAsync-callback branch from edfce2f to 933807b on April 18, 2025 07:18
@nodece nodece requested a review from RobertIndie April 18, 2025 07:18
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
writeChan: make(chan *pendingItem, maxPendingMessages),
Member

This doesn't address the root cause. If we send too many messages, the writeChan can still get stuck, and the issue persists. I can reproduce the problem by producing at least maxPendingMessages messages after the cluster is shut down.

Member Author

@RobertIndie Once the internal queue reaches its configured capacity (maxPendingMessages), the producer should either block or reject incoming messages based on the DisableBlockIfQueueFull setting:

  • If DisableBlockIfQueueFull is false, the producer should block until space becomes available.
  • If DisableBlockIfQueueFull is true, the producer should fail fast by rejecting new messages.

Proper backpressure handling is essential to avoid unbounded memory usage and ensure the system remains responsive under load or during broker unavailability.

By default, DisableBlockIfQueueFull is set to false, which means the producer will block when the number of pending messages exceeds maxPendingMessages. If it is explicitly set to true, the producer will fail fast with a "producer send queue is full" error (a minimal sketch follows at the end of this comment).

I think this topic is different from the current PR and might be better addressed separately. Let’s keep this PR focused on the specific issue it aims to resolve.
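
For context, a minimal sketch of the two backpressure modes described above, using the public client API; the broker URL and topic here are placeholders, not taken from this PR:

// Assumed setup: a local broker URL and a placeholder topic.
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:                   "persistent://public/default/my-topic",
    MaxPendingMessages:      1000,
    DisableBlockIfQueueFull: true, // fail fast instead of blocking when the queue is full
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

producer.SendAsync(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")},
    func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
        if err != nil {
            // With DisableBlockIfQueueFull=true, the callback can be invoked with a
            // "producer send queue is full" error instead of blocking the caller.
            log.Println("send failed:", err)
        }
    })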

@nodece nodece (Member Author) commented Sep 16, 2025

Closed by #1422

@nodece nodece closed this Sep 16, 2025

Successfully merging this pull request may close these issues.

[Bug][Producer] The callback was not invoked during reconnecting.
