Fix: SendAsync callback was not invoked when producer is in reconnecting #1345
Conversation
Force-pushed f38a7ad to 61794b2
@gunli Could you review this when you have a chance?
RobertIndie left a comment
This PR introduces two separate message writing paths:
- Non-batch message: SendAsync -> dataChan
- Batch messages: SendAsync -> batchChan -> dataChan
Both paths are asynchronous, which breaks the message order guaranteed by SendAsync.
For example, if we send two messages using SendAsync like this:
```go
producer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("A"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	errChan <- err
})
producer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("B"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	errChan <- err
})
```

We need to ensure the message order is A, B.
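As a minimal sketch of how that guarantee can be observed (not code from this PR; it assumes the same in-scope producer and imports as the snippet above, and collects payloads on a hypothetical order channel):

```go
// Each callback records its payload; if SendAsync preserves order, the
// callbacks must complete in the order the messages were submitted (A, then B).
order := make(chan string, 2)
for _, payload := range []string{"A", "B"} {
	p := payload // capture the loop variable for the callback closure
	producer.SendAsync(context.Background(), &ProducerMessage{
		Payload: []byte(p),
	}, func(_ MessageID, _ *ProducerMessage, err error) {
		if err != nil {
			fmt.Println("send failed:", err)
		}
		order <- p
	})
}
fmt.Println("callback order:", <-order, <-order) // expected: A B
```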
Force-pushed 926c0f3 to 081e7a5
Force-pushed 081e7a5 to 687c80a
ping @RobertIndie
```go
// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	errChan <- err
})
```
Suggested change:

```go
for i := 0; i < 10; i++ {
	testProducer.SendAsync(context.Background(), &ProducerMessage{
		Payload: []byte("test"),
	}, func(id MessageID, producerMessage *ProducerMessage, err error) {
		fmt.Println("send async callback", id, producerMessage, err)
	})
}
// send again
testProducer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	errChan <- err
})
```
You can easily reproduce the issue by sending some messages before this step. Also set the producer's BatchingMaxMessages to a smaller value, like 5.
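For context, here is a rough sketch of a producer configuration that makes the scenario easy to trigger (the topic name and exact values are illustrative, not taken from this PR):

```go
// Illustrative configuration: a small BatchingMaxMessages so batches fill
// quickly, plus a bounded pending queue. All values are placeholders.
testProducer, err := client.CreateProducer(ProducerOptions{
	Topic:               "reconnect-callback-repro",
	BatchingMaxMessages: 5,
	MaxPendingMessages:  10,
})
if err != nil {
	log.Fatal(err)
}
defer testProducer.Close()
```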
Done.
Force-pushed edfce2f to 933807b
```go
options:    options,
producerID: client.rpcClient.NewProducerID(),
dataChan:   make(chan *sendRequest, maxPendingMessages),
writeChan:  make(chan *pendingItem, maxPendingMessages),
```
This doesn't address the root cause. If we send too many messages, the writeChan can still get stuck, and the issue persists. I can reproduce the problem by producing at least maxPendingMessages after the cluster is shut down.
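A rough sketch of that reproduction path (the broker shutdown step, queue size, and payload are assumptions for illustration only):

```go
// With the broker stopped, submit more sends than the pending queue can hold.
// If writes funnel through a bounded writeChan, the extra SendAsync calls can
// block and their callbacks may never be invoked.
const maxPendingMessages = 10 // illustrative; matches the producer's configured queue size
for i := 0; i <= maxPendingMessages; i++ {
	testProducer.SendAsync(context.Background(), &ProducerMessage{
		Payload: []byte("pending"),
	}, func(id MessageID, _ *ProducerMessage, err error) {
		fmt.Println("callback:", id, err)
	})
}
```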
@RobertIndie Once the internal queue reaches its configured capacity (maxPendingMessages), the producer should either block or reject incoming messages based on the DisableBlockIfQueueFull setting:
- If DisableBlockIfQueueFull is false, the producer should block until space becomes available.
- If DisableBlockIfQueueFull is true, the producer should fail fast by rejecting new messages.
Proper backpressure handling is essential to avoid unbounded memory usage and ensure the system remains responsive under load or during broker unavailability.
By default, DisableBlockIfQueueFull is set to false, which means the producer will block when the number of pending messages exceeds maxPendingMessages. If it is explicitly set to true, the producer will fail fast with a "producer send queue is full" error.
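As a rough sketch of the fail-fast behavior described above (the topic name and queue size are illustrative; it assumes an existing client and the usual imports):

```go
// With DisableBlockIfQueueFull enabled, SendAsync should invoke the callback
// promptly with a queue-full error instead of blocking the caller.
failFastProducer, err := client.CreateProducer(ProducerOptions{
	Topic:                   "backpressure-demo",
	MaxPendingMessages:      1000,
	DisableBlockIfQueueFull: true,
})
if err != nil {
	log.Fatal(err)
}
failFastProducer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("x"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	if err != nil {
		// Expected once the pending queue is saturated: "producer send queue is full"
		log.Println("send rejected:", err)
	}
})
```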
I think this topic is different from the current PR and might be better addressed separately. Let’s keep this PR focused on the specific issue it aims to resolve.
Closed by #1422
Fixes #1332
Motivation
When the producer is reconnecting, batching/non-batching messages cannot be added to the pending queue, so the send-timeout error is never triggered. This leads to situations where SendAsync callbacks are never invoked while the producer is in a reconnecting state, potentially causing resource leaks and unresponsive applications.
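A short sketch of the symptom from the caller's point of view (the timeout value and logging are illustrative; it assumes an in-scope producer and the usual imports):

```go
// While the producer keeps reconnecting, the SendAsync callback should still
// eventually fire with an error (for example a send timeout). Before this fix
// it could be skipped entirely, so a caller waiting like this never unblocks.
errChan := make(chan error, 1)
producer.SendAsync(context.Background(), &ProducerMessage{
	Payload: []byte("test"),
}, func(_ MessageID, _ *ProducerMessage, err error) {
	errChan <- err
})
select {
case err := <-errChan:
	log.Println("callback invoked with:", err)
case <-time.After(30 * time.Second):
	log.Println("callback was never invoked while the producer was reconnecting")
}
```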
Modifications
- Updated the TestProducerSendWithContext test.
- Added a new test, TestProducerKeepReconnectingAndThenCallSendAsync, that verifies: