[fix] [broker] Fix negative subscription/consumer's unack-messages #24096

Open
wants to merge 8 commits into
base: master

poorbarcode
Contributor

@poorbarcode poorbarcode commented Mar 19, 2025

Motivation

Issue: negative unack-messages

  • Consumer-1 receives messages.
  • The topic is unloaded.
  • A message may be sent to consumer-2 while consumer-1 is still processing it.
  • Consumer-1 and consumer-2 acknowledge the message concurrently.

In the scenario above, unack-messages may be decremented twice. You can reproduce the issue with the new test testAcknowledgeConcurrently.

Modifications

  • Only decrement unack-messages for acknowledgements that actually deleted messages.
  • This PR also fixes cases where the unack-messages count is inaccurate.
  • This PR does not fix the same issue when transactions (TXN) are enabled; a separate PR is needed for that.
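
The first modification can be illustrated with a minimal sketch. This is not the code from this PR: the class UnackCounterSketch, its methods, and the flat map of undeleted entries are hypothetical stand-ins, and the real broker tracks pending acks per consumer. The sketch only assumes that removing an entry from a concurrent map tells the caller whether that particular call deleted it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Pulsar.
class UnackCounterSketch {
    // Stand-in for the messages that were dispatched but not yet deleted.
    private final ConcurrentHashMap<Long, Boolean> undeleted = new ConcurrentHashMap<>();
    private final AtomicInteger unackedMessages = new AtomicInteger();

    public void dispatch(long entryId) {
        // Count a message once, when it is first tracked.
        if (undeleted.putIfAbsent(entryId, Boolean.TRUE) == null) {
            unackedMessages.incrementAndGet();
        }
    }

    // Naive variant: every acknowledgement decrements the counter,
    // even when another consumer already deleted the message.
    public void ackNaive(long entryId) {
        undeleted.remove(entryId);
        unackedMessages.decrementAndGet();
    }

    // Guarded variant: only the acknowledgement that actually removed
    // the entry is allowed to decrement the counter.
    public void ackGuarded(long entryId) {
        if (undeleted.remove(entryId) != null) {
            unackedMessages.decrementAndGet();
        }
    }

    public int unacked() {
        return unackedMessages.get();
    }
}
```

With one dispatched message acknowledged twice, the naive variant ends at -1 while the guarded variant ends at 0, because ConcurrentHashMap.remove returns a non-null value to exactly one caller.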

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode added the type/bug and release/4.0.4 labels Mar 19, 2025
@poorbarcode added this to the 4.1.0 milestone Mar 19, 2025
@poorbarcode self-assigned this Mar 19, 2025
@github-actions bot added the doc-not-needed label Mar 19, 2025
Comment on lines -365 to -368
for (Consumer consumer : consumerList) {
    consumer.getPendingAcks()
            .removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId());
}
Member

Why is this removed in this PR?
There's a comment on line 361: "remove possible expired messages from redelivery tracker and pending acks".
I guess the original code needs more explanation, but the reason for having this in Pulsar is to avoid issues with Key_Shared when messages get expired (or the mark delete position moves forward as a result of admin operations). I guess a test case needs to be added to ensure that this doesn't get accidentally removed. Besides causing issues for Key_Shared, removing it would also cause a temporary memory leak, because pending acks aren't removed when messages are expired.
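
To make the cleanup being discussed concrete, here is a rough sketch of what "remove all pending acks up to the mark delete position" could look like. It assumes pending acks are kept in a sorted map keyed by ledger id and then by entry id; PendingAcksSketch and its methods are hypothetical and are not Pulsar's PendingAcksMap, and locking is left out on purpose.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, single-threaded illustration; not Pulsar's implementation.
class PendingAcksSketch {
    // ledgerId -> (entryId -> batch size), both levels sorted.
    private final TreeMap<Long, TreeMap<Long, Integer>> pending = new TreeMap<>();

    public void add(long ledgerId, long entryId, int batchSize) {
        pending.computeIfAbsent(ledgerId, k -> new TreeMap<>()).put(entryId, batchSize);
    }

    // Drops every entry at or before (ledgerId, entryId) and returns how many were dropped.
    public int removeAllUpTo(long ledgerId, long entryId) {
        int removed = 0;
        Iterator<Map.Entry<Long, TreeMap<Long, Integer>>> it =
                pending.headMap(ledgerId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<Long, Integer>> ledger = it.next();
            TreeMap<Long, Integer> entries = ledger.getValue();
            if (ledger.getKey() < ledgerId) {
                // Older ledger: everything in it is before the position.
                removed += entries.size();
                it.remove();
            } else {
                // Same ledger: only entries up to and including entryId.
                NavigableMap<Long, Integer> upTo = entries.headMap(entryId, true);
                removed += upTo.size();
                upTo.clear();
                if (entries.isEmpty()) {
                    it.remove();
                }
            }
        }
        return removed;
    }

    public int size() {
        int total = 0;
        for (TreeMap<Long, Integer> entries : pending.values()) {
            total += entries.size();
        }
        return total;
    }
}
```

Without a cleanup of this kind, entries for expired messages would stay in the map until something else removes them, which is the temporary memory leak mentioned in the comment.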

Contributor Author

@poorbarcode poorbarcode Mar 21, 2025

> or the mark delete position moves forward as a result of admin operations

I think that rather than always removing consumers' pending acks when reading entries, we'd better remove them when the mark-delete position moves.

> Why is this removed in this PR?

readMoreEntries runs in a different thread, which may cause the following race condition:

  • readMoreEntries removes the consumers' pending acks.
  • After messages are deleted, the ack path tries to reduce unack-messages using the consumers' pending acks, but it gets a null value because the readMoreEntries task has already removed them.
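
The interleaving described in the two bullets can be replayed sequentially in a small sketch. This is one reading of the race, not broker code: PendingAckRaceSketch, its fields, and its methods are hypothetical, and the sketch assumes the ack path looks up the batch size in the pending acks before reducing the counter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Pulsar.
class PendingAckRaceSketch {
    // entryId -> batch size of the dispatched entry.
    private final ConcurrentHashMap<Long, Integer> pendingAcks = new ConcurrentHashMap<>();
    private final AtomicInteger unackedMessages = new AtomicInteger();

    public void dispatch(long entryId, int batchSize) {
        pendingAcks.put(entryId, batchSize);
        unackedMessages.addAndGet(batchSize);
    }

    // Stand-in for the cleanup done while reading more entries:
    // it drops pending acks but does not touch the counter.
    public void removeAllUpTo(long entryId) {
        pendingAcks.keySet().removeIf(id -> id <= entryId);
    }

    // Stand-in for the ack path: the counter is reduced by the batch size
    // found in the pending acks, so a missing entry means no reduction.
    public void acknowledge(long entryId) {
        Integer batchSize = pendingAcks.remove(entryId);
        if (batchSize != null) {
            unackedMessages.addAndGet(-batchSize);
        }
    }

    public int unacked() {
        return unackedMessages.get();
    }
}
```

If removeAllUpTo runs before acknowledge, the lookup returns null and the counter stays at the dispatched batch size; in the opposite order it returns to 0.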

Member

> or the mark delete position moves forward as a result of admin operations

> I think that rather than always removing consumers' pending acks when reading entries, we'd better remove them when the mark-delete position moves.

Since this logic isn't covered in this PR, I think that removing the existing logic should be left out of this PR. As mentioned in the previous comment, removing it would cause a temporary memory leak and could leave draining hashes blocked indefinitely in Key_Shared. Most likely we don't have tests to cover this case at the moment, but that doesn't mean we can just remove the logic in this PR.

The mark delete position is updated frequently compared to readMoreEntries calls. That's the reason why it's more efficient to handle the consumer side of mark delete position advancing.

> Why is this removed in this PR?

> readMoreEntries runs in a different thread, which may cause the following race condition:

>   • readMoreEntries removes the consumers' pending acks.
>   • After messages are deleted, the ack path tries to reduce unack-messages using the consumers' pending acks, but it gets a null value because the readMoreEntries task has already removed them.

Also, for the negative unack message count, there needs to be a solution for handling message expiration, where the mark delete position moves forward without all messages being acknowledged.
It's not OK to remove the calls to getPendingAcks().removeAllUpTo in this PR.

I just wonder what the race conditions are, since PendingAcksMap has locks to prevent race conditions. That's exactly why there's a read and write lock: so that acknowledgements happening in a different thread don't cause problems.

Contributor Author

If you run ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently more than 200 times, you will hit an issue that is caused by these lines.

Member

@lhotari lhotari Mar 24, 2025

> If you run ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently more than 200 times, you will hit an issue that is caused by these lines.

@poorbarcode What issue exactly happened?
How are pending acks removed (in the case of message expiration) if this code is being removed? That's something that you should address in this PR if you prefer to remove this code in this PR.

Contributor Author

@poorbarcode poorbarcode Mar 24, 2025

> How are pending acks removed if this code is being removed?

I described it here: #24096 (comment)

> That's something that you should address in this PR if you prefer to remove this code in this PR.

I feel that PIP-379 should solve this problem, because these lines of code were added by PIP-379 in order to address the problems you described at #24096 (comment)

Another solution is to leave these lines in place. In that case, the test ConsumerAckAndDisableBrokerBatchAckTest.testAcknowledgeConcurrently will be flaky, and we will be left with an issue where consumer.un-ack-messages may be larger than expected.

Contributor Author

@lhotari

Maybe we can solve the potential pending-acks issues that you mentioned here one by one, but it will be slow because I am busy with other work. Could you share more of what you know? The more detail, the better. ❤️

@poorbarcode
Contributor Author

Rebased onto the master branch.

Labels
doc-not-needed Your PR changes do not impact docs ready-to-test release/4.0.5 type/bug The PR fixed a bug or issue reported a bug