Skip to content

Commit

Permalink
[improve] [broker] PIP-299-part-1: Stop dispatch messages if the indi…
Browse files Browse the repository at this point in the history
…vidual acks will be lost in the persistent storage (#21423)

The part 1 of [PIP-299](https://github.com/apache/pulsar/pull/21118/files?short_path=cf766b5#diff-cf766b5d463b6832017e482baad14832f6a4d41dc969da279b98b69e26ec6f6a): the implementation of "Stop dispatch messages if the individual acks will be lost in the persistent storage"
  • Loading branch information
poorbarcode authored Dec 27, 2023
1 parent 2d57624 commit 8ff32db
Show file tree
Hide file tree
Showing 12 changed files with 624 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

default boolean isCursorDataFullyPersistable() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public boolean isCursorDataFullyPersistable() {
return individualDeletedMessages.size() <= config.getMaxUnackedRangesToPersist();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate()));
topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced());
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters());
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled());
this.subscriptionPolicies = data.getSubscriptionPolicies();

updateEntryFilters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
Expand Down Expand Up @@ -141,6 +142,14 @@ default boolean checkAndUnblockIfStuck() {
return false;
}

/**
* A callback hook after acknowledge messages.
* @param exOfDeletion the ex of {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete},
* {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}.
* @param ctxOfDeletion the param ctx of calling {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete},
* {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}.
*/
default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}

default long getFilterProcessedMsgCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"totalUnackedMessages");
protected volatile int totalUnackedMessages = 0;
/**
* A signature that relate to the check of "Dispatching has paused on cursor data can fully persist".
* Note: It is a tool that helps determine whether it should trigger a new reading after acknowledgments to avoid
* too many CPU circles, see {@link #afterAckMessages(Throwable, Object)} for more details. Do not use this
* to confirm whether the delivery should be paused, please call {@link #shouldPauseOnAckStatePersist}.
*/
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnCursorDataCanNotFullyPersist");
private volatile int blockedDispatcherOnCursorDataCanNotFullyPersist = FALSE;
private volatile int blockedDispatcherOnUnackedMsgs = FALSE;
protected static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers>
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
Expand All @@ -123,6 +134,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;


protected enum ReadType {
Normal, Replay
}
Expand Down Expand Up @@ -271,9 +283,17 @@ public synchronized void readMoreEntries() {
if (isSendInProgress()) {
// we cannot read more entries while sending the previous batch
// otherwise we could re-read the same entries and send duplicates
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.",
topic.getName(), getSubscriptionName());
}
return;
}
if (shouldPauseDeliveryForDelayTracker()) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.",
topic.getName(), getSubscriptionName());
}
return;
}
if (topic.isTransferring()) {
Expand Down Expand Up @@ -322,6 +342,13 @@ public synchronized void readMoreEntries() {
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
}
} else if (!havePendingRead) {
if (shouldPauseOnAckStatePersist(ReadType.Normal)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.",
topic.getName(), getSubscriptionName());
}
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
consumerList.size());
Expand Down Expand Up @@ -359,6 +386,20 @@ public synchronized void readMoreEntries() {
}
}

private boolean shouldPauseOnAckStatePersist(ReadType readType) {
// Allows new consumers to consume redelivered messages caused by the just-closed consumer.
if (readType != ReadType.Normal) {
return false;
}
if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) {
return false;
}
if (cursor == null) {
return true;
}
return blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE;
}

@Override
protected void reScheduleRead() {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
Expand Down Expand Up @@ -996,6 +1037,29 @@ public void addUnAckedMessages(int numberOfMessages) {
topic.getBrokerService().addUnAckedMessages(this, numberOfMessages);
}

@Override
public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) {
if (blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE) {
if (cursor.isCursorDataFullyPersistable()) {
// If there was no previous pause due to cursor data is too large to persist, we don't need to manually
// trigger a new read. This can avoid too many CPU circles.
if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, TRUE, FALSE)) {
readMoreEntriesAsync();
} else {
// Retry due to conflict update.
afterAckMessages(exOfDeletion, ctxOfDeletion);
}
}
} else {
if (!cursor.isCursorDataFullyPersistable()) {
if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, FALSE, TRUE)) {
// Retry due to conflict update.
afterAckMessages(exOfDeletion, ctxOfDeletion);
}
}
}
}

public boolean isBlockedDispatcherOnUnackedMsgs() {
return blockedDispatcherOnUnackedMsgs == TRUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,15 @@ private void readMoreEntries(Consumer consumer) {
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to the current consumer is null", topic.getName(),
getSubscriptionName());
}
return;
}
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
log.debug("[{}] [{}] Skipping read for the topic, Due to we have pending read.", topic.getName(),
getSubscriptionName());
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,10 @@ public void markDeleteComplete(Object ctx) {
topicName, subName, newMD, oldMD);
}
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD);
}

Expand All @@ -451,22 +455,34 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to mark delete for position {}: {}", topicName, subName, ctx, exception);
}
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
}
};

private final DeleteCallback deleteCallback = new DeleteCallback() {
@Override
public void deleteComplete(Object position) {
public void deleteComplete(Object context) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
// The value of the param "context" is a position.
log.debug("[{}][{}] Deleted message at {}", topicName, subName, context);
}
// Signal the dispatchers to give chance to take extra actions
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, context);
}
notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) context);
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception);
// Signal the dispatchers to give chance to take extra actions
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
};

Expand Down Expand Up @@ -645,6 +661,7 @@ public void clearBacklogComplete(Object ctx) {
future.complete(null);
}
});
dispatcher.afterAckMessages(null, ctx);
} else {
future.complete(null);
}
Expand All @@ -654,6 +671,9 @@ public void clearBacklogComplete(Object ctx) {
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand All @@ -677,13 +697,19 @@ public void skipEntriesComplete(Object ctx) {
numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false));
}
future.complete(null);
if (dispatcher != null) {
dispatcher.afterAckMessages(null, ctx);
}
}

@Override
public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to skip {} messages", topicName, subName, numMessagesToSkip,
exception);
future.completeExceptionally(exception);
if (dispatcher != null) {
dispatcher.afterAckMessages(exception, ctx);
}
}
}, null);

Expand Down Expand Up @@ -808,6 +834,7 @@ public void resetComplete(Object ctx) {
}
if (dispatcher != null) {
dispatcher.cursorIsReset();
dispatcher.afterAckMessages(null, finalPosition);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3728,6 +3728,11 @@ public int getMaxUnackedMessagesOnSubscription() {
return topicPolicies.getMaxUnackedMessagesOnSubscription().get();
}

public boolean isDispatcherPauseOnAckStatePersistentEnabled() {
Boolean b = topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get();
return b == null ? false : b.booleanValue();
}

@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -104,12 +105,17 @@ public void setup() throws Exception {
doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();

pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

EntryFilterProvider mockEntryFilterProvider = mock(EntryFilterProvider.class);
when(mockEntryFilterProvider.getBrokerEntryFilters()).thenReturn(Collections.emptyList());

brokerMock = mock(BrokerService.class);
doReturn(pulsarMock).when(brokerMock).pulsar();
when(brokerMock.getEntryFilterProvider()).thenReturn(mockEntryFilterProvider);

HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
Expand Down Expand Up @@ -149,6 +155,7 @@ public void setup() throws Exception {
);

subscriptionMock = mock(PersistentSubscription.class);
when(subscriptionMock.getTopic()).thenReturn(topicMock);
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
Expand Down
Loading

0 comments on commit 8ff32db

Please sign in to comment.