-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix] [broker] Fix negative subscription/consumer's unack-messages #24096
base: master
Are you sure you want to change the base?
Conversation
for (Consumer consumer : consumerList) { | ||
consumer.getPendingAcks() | ||
.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this removed in this PR?
There's a comment on line 361 "remove possible expired messages from redelivery tracker and pending acks".
I guess the original code would need more explanation, but the reason for having this is that in Pulsar is avoid issues with Key_Shared when messages get expired (or the mark delete position moves forward as a result of admin operations). I guess a test case would need to be added to ensure that this doesn't get accidentially removed. Another impact besides causing issues to Key_Shared would be that it causes a temporary memory leak when pending acks aren't removed when messages are expired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or the mark delete position moves forward as a result of admin operations
I think that rather than always removing consumers' pending ack when reading entries, we'd better delete consumers' pending acks when the md-position is moving.
Why is this removed in this PR?
readMoreEntries
runs in a different thread, which may cause a race condition below
readMoreEntries
removes consumers' pending acksafter delete messages
reduces un-ack messages by consumers' pending acks, but it gets a null value because the taskreadMoreEntries
already removed them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or the mark delete position moves forward as a result of admin operations
I think that rather than always removing consumers' pending ack when reading entries, we'd better delete consumers' pending acks when the md-position is moving.
Since this logic isn't covered in this PR, I think that removing the existing logic should be omitted from this PR. As mentioned in the previous comment there would be a temporarily memory leak and potential draining hashes being blocked infinitely in Key_Share if this logic would be removed. Most likely we don't have tests to cover this case at the moment, but it doesn't mean that we could just remove the logic in this PR
The mark delete position is updated frequently compared to readMoreEntries
calls. That's the reason why it's more efficient to handle the consumer side of mark delete position advancing.
Why is this removed in this PR?
readMoreEntries
runs in a different thread, which may cause a race condition below
readMoreEntries
removes consumers' pending acksafter delete messages
reduces un-ack messages by consumers' pending acks, but it gets a null value because the taskreadMoreEntries
already removed them.
Also for the negative unack message count, there would need to be a solution for handling message expiration, where the mark delete position moves forward without all messages being acknowledged.
It's not OK to remove calls to getPendingAcks().removeAllUpTo
in this PR.
I just wonder what the race conditions are since in PendingAcksMap
, there are locks to prevent race conditions. That's exactly the reason why the there's a read and write lock so that the acknowledgements happening in a different thread don't cause problems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After you run ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently
more than 200
times, you will get an issue, which was leads by these lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After you run ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently more than 200 times, you will get an issue, which was leads by these lines
@poorbarcode What issue exactly happened?
How are pending acks removed (in the case of message expiration) if this code is being removed? That's something that you should address in this PR if you prefer to remove this code in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are pending acks removed if this code is being removed?
I described here: #24096 (comment)
That's something that you should address in this PR if you prefer to remove this code in this PR.
I feel that pip-379 should solve this problem because these lines of code were added by PIP-379 in order avoid to solving the problems you described at #24096 (comment)
Another solution is to left these lines there, then the test ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently
will be a flaky test. And we will leave an issue that consumer.un-ack-messages
may be larger than expected
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
286e024
to
6c6ab23
Compare
Rebased master branch |
Motivation
Issue: negative unack-messages
unack-messages
may be reduced twice in above scenario, you can reproduce the issue by the new testtestAcknowledgeConcurrently
Modifications
unack-messages
who actually deleted messages successfully.unack-messages
is not accurateDocumentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x