[fix][client] Avoid redelivering duplicated messages when batching is enabled#18486
Conversation
### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.
|
/pulsarbot run-failure-checks |
Codecov Report
@@ Coverage Diff @@
## master #18486 +/- ##
=============================================
+ Coverage 31.39% 48.26% +16.86%
- Complexity 6651 9447 +2796
=============================================
Files 697 618 -79
Lines 68015 58558 -9457
Branches 7285 6091 -1194
=============================================
+ Hits 21353 28262 +6909
+ Misses 43667 27278 -16389
- Partials 2995 3018 +23
Flags with carried forward coverage won't be shown. Click here to find out more.
|
| { 3, 5, CommandAck.AckType.Individual }, | ||
| { 5, 5, CommandAck.AckType.Individual }, | ||
| { 3, 5, CommandAck.AckType.Cumulative }, | ||
| { 5, 5, CommandAck.AckType.Cumulative } |
There was a problem hiding this comment.
5, 5 test case may not stable
There was a problem hiding this comment.
I see, I will take a look soon.
There was a problem hiding this comment.
It's fixed now. The root cause is that after b0fad40, a BatchedMessageIdImpl will be compared with a MessageIdImpl, which should be forbidden.
liangyepianzhou
left a comment
There was a problem hiding this comment.
LGTM, good work, and the root cause of the flaky test should be this.
@congbobo184 See the TODO in my PR description. I think it's a bug for batch index ACK and it needs some extra work to fix. |
...nt/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
Outdated
Show resolved
Hide resolved
sorry, I miss your description. doesn't the batch index ack fix should include in release/2.9.4? |
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
I think not. It's not a regression. It's a bug from the beginning when batch index ACK was introduced. |
## Motivation 1. fix flaky test #18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by #17836 and #18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
…apache#18486) apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e) (cherry picked from commit 870b060)
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8 (cherry picked from commit be1d07e)
…apache#18486) ### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
## Motivation 1. fix flaky test apache#18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by apache#17836 and apache#18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
…apache#18486) ### Motivation apache#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by apache#18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. ### Modifications Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in apache#18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. ### TODO The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. ### Documentation <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> ### Matching PR in forked repository PR in forked repository: BewareMyPower#8
## Motivation 1. fix flaky test apache#18466 caused by txn async send method 2. decrease run time by optimizing receive method ## Modification 1. fix flaky test * modify `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();` This also can be resolved by apache#17836 and apache#18486 later. 2. decrease run time by optimizing receive method * modify ` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); Assert.assertNull(message);` to ` Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS); Assert.assertNull(message);` * modify `message = consumer.receive();` to `message = consumer.receive(5, TimeUnit.SECONDS);` * keep other `consumer.receive(x, y)` no change.
…#18486) #18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the `testConsumerDedup` test modified by #18454. The root cause is that single messages will still be passed into the `isDuplicated` method in `receiveIndividualMessagesFromBatch`. However, in this case, the `MessageId` is a `BatchedMessageIdImpl`, while the `MessageId` in `lastCumulativeAck` or `pendingIndividualAcks` are `MessageIdImpl` implementations. Validate the class type in `isDuplicated` and convert a `BatchedMessageIdImpl` to `MessageIdImpl`. Then revert the unnecessary changes in #18454. `ConsumerRedeliveryTest#testAckNotSent` is added to verify it works. The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it. <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. --> - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. --> - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later --> - [x] `doc-not-needed` <!-- Your PR changes do not impact docs --> - [ ] `doc-complete` <!-- Docs have been already added --> PR in forked repository: BewareMyPower#8
Motivation
#18454 fixed the potential message loss when a batched message is redelivered and one single message of the batch is added to the ACK tracker. However, it also leads to a potential message duplication, see the
testConsumerDeduptest modified by #18454.The root cause is that single messages will still be passed into the
isDuplicatedmethod inreceiveIndividualMessagesFromBatch. However, in this case, theMessageIdis aBatchedMessageIdImpl, while theMessageIdinlastCumulativeAckorpendingIndividualAcksareMessageIdImplimplementations.Modifications
Validate the class type in
isDuplicatedand convert aBatchedMessageIdImpltoMessageIdImpl. Then revert the unnecessary changes in #18454.ConsumerRedeliveryTest#testAckNotSentis added to verify it works.TODO
The duplication could still happen when batch index ACK is enabled. Because even after the ACK tracker is flushed, if only parts of a batched message are not acknowledged, the whole batched message would still be redelivered. I will open another PR to fix it.
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: BewareMyPower#8