Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] PIP-379 Key_Shared implementation race condition causing out-of-order message delivery #23874

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
67605e7
Add solution to PulsarMockBookKeeper for intercepting reads
lhotari Oct 21, 2024
e40461f
Improve quiet time implementation in receiveMessages
lhotari Jan 17, 2025
7e69af5
Add debug when added to replay
lhotari Jan 17, 2025
6a354df
Enable test logging at debug level, add more logging
lhotari Jan 20, 2025
7c1d3db
Cancel pending read
lhotari Jan 20, 2025
47f7583
Add debug log to skipping pending replay read
lhotari Jan 21, 2025
4dc843b
Add test
lhotari Jan 16, 2025
0512f38
Notify also when the consumer isn't closing
lhotari Jan 21, 2025
3913e64
Postpone removals after critical sections to prevent race conditions
lhotari Jan 21, 2025
fa60a7a
Add test and more docs for OutsideCriticalSectionsExecutor
lhotari Jan 21, 2025
ec898d6
Adjust test logging - disable debug logging for key_shared related di…
lhotari Jan 21, 2025
7bc7b92
Fix failing test
lhotari Jan 21, 2025
7bca0ad
Fix race condition in test
lhotari Jan 21, 2025
c8d14f3
Revert "Add solution to PulsarMockBookKeeper for intercepting reads"
lhotari Jan 22, 2025
799fab9
Revert "Improve quiet time implementation in receiveMessages"
lhotari Jan 22, 2025
d8361bf
Fix test after reverting read handle interceptor changes
lhotari Jan 22, 2025
eef83e9
Revert "Add test and more docs for OutsideCriticalSectionsExecutor"
lhotari Jan 22, 2025
31db5fa
Revert "Postpone removals after critical sections to prevent race con…
lhotari Jan 22, 2025
231b300
Fix test after reverting previous changes
lhotari Jan 22, 2025
e734cdf
Add solution by Yubiao to prevent race condition in unblocking while …
lhotari Jan 22, 2025
a5d2029
Add "already blocked hashes" solution to dispatching phase
lhotari Jan 22, 2025
450069a
Run test with most classic and PIP-379 implementation
lhotari Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void reduceRefCount(Consumer consumer, int stickyHash, boolean closing) {
lock.writeLock().lock();
try {
removed = drainingHashes.remove(stickyHash);
if (!closing && removed.isBlocking()) {
if (removed.isBlocking()) {
if (batchLevel > 0) {
unblockedWhileBatching = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,13 @@ public synchronized void readMoreEntries() {
}
return;
}
if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to replay in-progress.", topic.getName(),
getSubscriptionName());
}
return;
}
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
Expand Down Expand Up @@ -379,13 +386,23 @@ public synchronized void readMoreEntries() {
long bytesToRead = calculateResult.getRight();

if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
// Skip read as topic/dispatcher has exceed the dispatch rate
return;
}

Set<Position> messagesToReplayNow =
canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
if (!messagesToReplayNow.isEmpty()) {
// before replaying, cancel possible pending read that is waiting for more entries
cancelPendingRead();
if (havePendingRead) {
// skip read since a pending read is already in progress which cannot be cancelled
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping replay read for the topic, Due to pending read in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayNow.size(), consumerList.size());
Expand Down Expand Up @@ -615,13 +632,6 @@ protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits)
}
}

if (havePendingReplayRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
messagesToRead = Math.max(messagesToRead, 1);
bytesToRead = Math.max(bytesToRead, 1);
Expand Down Expand Up @@ -717,6 +727,12 @@ public SubType getType() {
public final synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
ReadType readType = (ReadType) ctx;
if (readType == ReadType.Normal) {
if (!havePendingRead) {
log.debug("Discarding read entries as there is no pending read");
entries.forEach(Entry::release);
readMoreEntriesAsync();
return;
}
havePendingRead = false;
} else {
havePendingReplayRead = false;
Expand Down Expand Up @@ -1422,6 +1438,9 @@ public void cursorIsReset() {

protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Adding message to replay for {}:{} hash: {}", name, ledgerId, entryId, stickyKeyHash);
}
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
import com.google.common.annotations.VisibleForTesting;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -407,6 +409,8 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
Set<Consumer> blockedByHashConsumers = lookAheadAllowed && readType == ReadType.Normal ? new HashSet<>() : null;
// in replay read mode, keep track of consumers for entries, used for look-ahead check
Set<Consumer> consumersForEntriesForLookaheadCheck = lookAheadAllowed ? new HashSet<>() : null;
// track already blocked hashes to block any further messages with the same hash
IntSet alreadyBlockedHashes = new IntOpenHashSet();

for (Entry inputEntry : entries) {
EntryAndMetadata entry;
Expand All @@ -419,24 +423,29 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1));
}
int stickyKeyHash = getStickyKeyHash(entry);
Consumer consumer = selector.select(stickyKeyHash);
Consumer consumer = null;
MutableBoolean blockedByHash = null;
boolean dispatchEntry = false;
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(consumer)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
// check if the hash is already blocked
boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
if (!hashIsAlreadyBlocked) {
consumer = selector.select(stickyKeyHash);
if (consumer != null) {
if (lookAheadAllowed) {
consumersForEntriesForLookaheadCheck.add(consumer);
}
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
MutableInt permits =
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
}
}
}
if (dispatchEntry) {
Expand All @@ -445,6 +454,10 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>());
consumerEntries.add(entry);
} else {
if (!hashIsAlreadyBlocked) {
// the hash is blocked, add it to the set of blocked hashes
alreadyBlockedHashes.add(stickyKeyHash);
}
if (blockedByHash != null && blockedByHash.isTrue()) {
// the entry is blocked by hash, add the consumer to the blocked set
blockedByHashConsumers.add(consumer);
Expand Down Expand Up @@ -536,6 +549,9 @@ private class ReplayPositionFilter implements Predicate<Position> {
// tracks the available permits for each consumer for the duration of the filter usage
// the filter is stateful and shouldn't be shared or reused later
private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
// tracks the hashes that have been blocked during the filtering
// it is necessary to block all later messages after a hash gets blocked so that ordering is preserved
private final Set<Long> alreadyBlockedHashes = new HashSet<>();

@Override
public boolean test(Position position) {
Expand All @@ -553,25 +569,34 @@ public boolean test(Position position) {
}
return true;
}
// check if the hash is already blocked, if so, then replaying of the position should be skipped
// to preserve ordering
if (alreadyBlockedHashes.contains(stickyKeyHash)) {
return false;
}

// find the consumer for the sticky key hash
Consumer consumer = selector.select(stickyKeyHash.intValue());
// skip replaying the message position if there's no assigned consumer
if (consumer == null) {
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

// lookup the available permits for the consumer
MutableInt availablePermits =
availablePermitsMap.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(consumer)));
// skip replaying the message position if the consumer has no available permits
if (availablePermits.intValue() <= 0) {
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

if (drainingHashesRequired
&& drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash.intValue())) {
// the hash is draining and the consumer is not the draining consumer
alreadyBlockedHashes.add(stickyKeyHash);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,23 +191,4 @@ public void unblockingHandler_InvokesStickyKeyHashUnblocked() {
// then unblocking call should be done
verify(unblockingHandler).stickyKeyHashUnblocked(1);
}

@Test
public void unblockingHandler_DoesNotInvokeStickyKeyHashUnblockedWhenClosing() {
// given a tracker with an unblocking handler registered
UnblockingHandler unblockingHandler = mock(UnblockingHandler.class);
DrainingHashesTracker tracker = new DrainingHashesTracker("dispatcher1", unblockingHandler);

// when a hash is draining
Consumer consumer = createMockConsumer("consumer1");
tracker.addEntry(consumer, 1);
// and the hash gets blocked for another consumer
Consumer consumer2 = createMockConsumer("consumer2");
tracker.shouldBlockStickyKeyHash(consumer2, 1);
// and the ref count is reduced with closing=true (consumer is closing)
tracker.reduceRefCount(consumer, 1, true);

// then the unblocking callback should NOT be invoked, since the consumer was closing
verify(unblockingHandler, never()).stickyKeyHashUnblocked(anyInt());
}
}
Loading
Loading