From 8ff32dbf9eb63de3554ed546703030d4345b6d27 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 28 Dec 2023 03:22:44 +0800 Subject: [PATCH] [improve] [broker] PIP-299-part-1: Stop dispatch messages if the individual acks will be lost in the persistent storage (#21423) The part 1 of [PIP-299](https://github.com/apache/pulsar/pull/21118/files?short_path=cf766b5#diff-cf766b5d463b6832017e482baad14832f6a4d41dc969da279b98b69e26ec6f6a): the implementation of "Stop dispatch messages if the individual acks will be lost in the persistent storage" --- .../bookkeeper/mledger/ManagedCursor.java | 4 + .../mledger/impl/ManagedCursorImpl.java | 5 + .../pulsar/broker/service/AbstractTopic.java | 2 + .../pulsar/broker/service/Dispatcher.java | 9 + ...PersistentDispatcherMultipleConsumers.java | 64 +++ ...sistentDispatcherSingleActiveConsumer.java | 6 +- .../persistent/PersistentSubscription.java | 33 +- .../service/persistent/PersistentTopic.java | 5 + ...ckyKeyDispatcherMultipleConsumersTest.java | 7 + ...SubscriptionPauseOnAckStatPersistTest.java | 491 ++++++++++++++++++ .../policies/data/HierarchyTopicPolicies.java | 2 + .../common/policies/data/TopicPolicies.java | 1 + 12 files changed, 624 insertions(+), 5 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index d1ffdf6d2d763..68f1840afd8b2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -862,4 +862,8 @@ default void skipNonRecoverableLedger(long ledgerId){} * @return whether this cursor is closed. */ boolean isClosed(); + + default boolean isCursorDataFullyPersistable() { + return true; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 4b65d62f0eee8..fb77f1de624c3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -340,6 +340,11 @@ public Map getProperties() { return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap(); } + @Override + public boolean isCursorDataFullyPersistable() { + return individualDeletedMessages.size() <= config.getMaxUnackedRangesToPersist(); + } + @Override public Map getCursorProperties() { return cursorProperties; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 837f073b00dba..ba35c8a280e9e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -249,6 +249,8 @@ protected void updateTopicPolicy(TopicPolicies data) { topicPolicies.getDispatchRate().updateTopicValue(DispatchRateImpl.normalize(data.getDispatchRate())); topicPolicies.getSchemaValidationEnforced().updateTopicValue(data.getSchemaValidationEnforced()); topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters()); + topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled() + .updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled()); this.subscriptionPolicies = data.getSubscriptionPolicies(); updateEntryFilters(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 08e5caaa2ddd5..aebafacd584b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; @@ -141,6 +142,14 @@ default boolean checkAndUnblockIfStuck() { return false; } + /** + * A callback hook after acknowledge messages. + * @param exOfDeletion the ex of {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete}, + * {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}. + * @param ctxOfDeletion the param ctx of calling {@link org.apache.bookkeeper.mledger.ManagedCursor#asyncDelete}, + * {@link ManagedCursor#asyncClearBacklog} or {@link ManagedCursor#asyncSkipEntries)}. + */ + default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){} default long getFilterProcessedMsgCount() { return 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index cafc398a3c843..0d1f198a7ca7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -113,6 +113,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages"); protected volatile int totalUnackedMessages = 0; + /** + * A signature that relate to the check of "Dispatching has paused on cursor data can fully persist". + * Note: It is a tool that helps determine whether it should trigger a new reading after acknowledgments to avoid + * too many CPU circles, see {@link #afterAckMessages(Throwable, Object)} for more details. Do not use this + * to confirm whether the delivery should be paused, please call {@link #shouldPauseOnAckStatePersist}. + */ + protected static final AtomicIntegerFieldUpdater + BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, + "blockedDispatcherOnCursorDataCanNotFullyPersist"); + private volatile int blockedDispatcherOnCursorDataCanNotFullyPersist = FALSE; private volatile int blockedDispatcherOnUnackedMsgs = FALSE; protected static final AtomicIntegerFieldUpdater BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = @@ -123,6 +134,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; + protected enum ReadType { Normal, Replay } @@ -271,9 +283,17 @@ public synchronized void readMoreEntries() { if (isSendInProgress()) { // we cannot read more entries while sending the previous batch // otherwise we could re-read the same entries and send duplicates + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping read for the topic, Due to sending in-progress.", + topic.getName(), getSubscriptionName()); + } return; } if (shouldPauseDeliveryForDelayTracker()) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping read for the topic, Due to pause delivery for delay tracker.", + topic.getName(), getSubscriptionName()); + } return; } if (topic.isTransferring()) { @@ -322,6 +342,13 @@ public synchronized void readMoreEntries() { totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription()); } } else if (!havePendingRead) { + if (shouldPauseOnAckStatePersist(ReadType.Normal)) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.", + topic.getName(), getSubscriptionName()); + } + return; + } if (log.isDebugEnabled()) { log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead, consumerList.size()); @@ -359,6 +386,20 @@ public synchronized void readMoreEntries() { } } + private boolean shouldPauseOnAckStatePersist(ReadType readType) { + // Allows new consumers to consume redelivered messages caused by the just-closed consumer. + if (readType != ReadType.Normal) { + return false; + } + if (!((PersistentTopic) subscription.getTopic()).isDispatcherPauseOnAckStatePersistentEnabled()) { + return false; + } + if (cursor == null) { + return true; + } + return blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE; + } + @Override protected void reScheduleRead() { if (isRescheduleReadInProgress.compareAndSet(false, true)) { @@ -996,6 +1037,29 @@ public void addUnAckedMessages(int numberOfMessages) { topic.getBrokerService().addUnAckedMessages(this, numberOfMessages); } + @Override + public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) { + if (blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE) { + if (cursor.isCursorDataFullyPersistable()) { + // If there was no previous pause due to cursor data is too large to persist, we don't need to manually + // trigger a new read. This can avoid too many CPU circles. + if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, TRUE, FALSE)) { + readMoreEntriesAsync(); + } else { + // Retry due to conflict update. + afterAckMessages(exOfDeletion, ctxOfDeletion); + } + } + } else { + if (!cursor.isCursorDataFullyPersistable()) { + if (BLOCKED_DISPATCHER_ON_CURSOR_DATA_CAN_NOT_FULLY_PERSIST_UPDATER.compareAndSet(this, FALSE, TRUE)) { + // Retry due to conflict update. + afterAckMessages(exOfDeletion, ctxOfDeletion); + } + } + } + } + public boolean isBlockedDispatcherOnUnackedMsgs() { return blockedDispatcherOnUnackedMsgs == TRUE; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 806773af45189..3fac65a3ce189 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -319,13 +319,15 @@ private void readMoreEntries(Consumer consumer) { // so skip reading more entries if currently there is no active consumer. if (null == consumer) { if (log.isDebugEnabled()) { - log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName()); + log.debug("[{}] [{}] Skipping read for the topic, Due to the current consumer is null", topic.getName(), + getSubscriptionName()); } return; } if (havePendingRead) { if (log.isDebugEnabled()) { - log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName()); + log.debug("[{}] [{}] Skipping read for the topic, Due to we have pending read.", topic.getName(), + getSubscriptionName()); } return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dc79146110f00..812747860456c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -442,6 +442,10 @@ public void markDeleteComplete(Object ctx) { topicName, subName, newMD, oldMD); } // Signal the dispatchers to give chance to take extra actions + if (dispatcher != null) { + dispatcher.afterAckMessages(null, ctx); + } + // Signal the dispatchers to give chance to take extra actions notifyTheMarkDeletePositionMoveForwardIfNeeded(oldMD); } @@ -451,22 +455,34 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Failed to mark delete for position {}: {}", topicName, subName, ctx, exception); } + // Signal the dispatchers to give chance to take extra actions + if (dispatcher != null) { + dispatcher.afterAckMessages(null, ctx); + } } }; private final DeleteCallback deleteCallback = new DeleteCallback() { @Override - public void deleteComplete(Object position) { + public void deleteComplete(Object context) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Deleted message at {}", topicName, subName, position); + // The value of the param "context" is a position. + log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); } // Signal the dispatchers to give chance to take extra actions - notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) position); + if (dispatcher != null) { + dispatcher.afterAckMessages(null, context); + } + notifyTheMarkDeletePositionMoveForwardIfNeeded((PositionImpl) context); } @Override public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception); + // Signal the dispatchers to give chance to take extra actions + if (dispatcher != null) { + dispatcher.afterAckMessages(exception, ctx); + } } }; @@ -645,6 +661,7 @@ public void clearBacklogComplete(Object ctx) { future.complete(null); } }); + dispatcher.afterAckMessages(null, ctx); } else { future.complete(null); } @@ -654,6 +671,9 @@ public void clearBacklogComplete(Object ctx) { public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}][{}] Failed to clear backlog", topicName, subName, exception); future.completeExceptionally(exception); + if (dispatcher != null) { + dispatcher.afterAckMessages(exception, ctx); + } } }, null); @@ -677,6 +697,9 @@ public void skipEntriesComplete(Object ctx) { numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); + if (dispatcher != null) { + dispatcher.afterAckMessages(null, ctx); + } } @Override @@ -684,6 +707,9 @@ public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}][{}] Failed to skip {} messages", topicName, subName, numMessagesToSkip, exception); future.completeExceptionally(exception); + if (dispatcher != null) { + dispatcher.afterAckMessages(exception, ctx); + } } }, null); @@ -808,6 +834,7 @@ public void resetComplete(Object ctx) { } if (dispatcher != null) { dispatcher.cursorIsReset(); + dispatcher.afterAckMessages(null, finalPosition); } IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE); future.complete(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 48069cf555448..c199cd2c954f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3728,6 +3728,11 @@ public int getMaxUnackedMessagesOnSubscription() { return topicPolicies.getMaxUnackedMessagesOnSubscription().get(); } + public boolean isDispatcherPauseOnAckStatePersistentEnabled() { + Boolean b = topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled().get(); + return b == null ? false : b.booleanValue(); + } + @Override public void onUpdate(TopicPolicies policies) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 9082a9caafcb4..8a83735287e60 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -65,6 +65,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -104,12 +105,17 @@ public void setup() throws Exception { doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing(); doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints(); doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread(); + doReturn(false).when(configMock).isAllowOverrideEntryFilters(); pulsarMock = mock(PulsarService.class); doReturn(configMock).when(pulsarMock).getConfiguration(); + EntryFilterProvider mockEntryFilterProvider = mock(EntryFilterProvider.class); + when(mockEntryFilterProvider.getBrokerEntryFilters()).thenReturn(Collections.emptyList()); + brokerMock = mock(BrokerService.class); doReturn(pulsarMock).when(brokerMock).pulsar(); + when(brokerMock.getEntryFilterProvider()).thenReturn(mockEntryFilterProvider); HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies(); topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0); @@ -149,6 +155,7 @@ public void setup() throws Exception { ); subscriptionMock = mock(PersistentSubscription.class); + when(subscriptionMock.getTopic()).thenReturn(topicMock); persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers( topicMock, cursorMock, subscriptionMock, configMock, new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java new file mode 100644 index 0000000000000..0029e61df4c49 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.GetStatsOptions; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.awaitility.reflect.WhiteboxImpl; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class SubscriptionPauseOnAckStatPersistTest extends ProducerConsumerBase { + + private static final int MAX_UNACKED_RANGES_TO_PERSIST = 50; + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected void doInitConf() throws Exception { + conf.setManagedLedgerMaxUnackedRangesToPersist(MAX_UNACKED_RANGES_TO_PERSIST); + } + + private void enablePolicyDispatcherPauseOnAckStatePersistent(String tpName) { + TopicPolicies policies = new TopicPolicies(); + policies.setDispatcherPauseOnAckStatePersistentEnabled(true); + policies.setIsGlobal(false); + SystemTopicBasedTopicPoliciesService policiesService = + (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService(); + Map policiesCache = + WhiteboxImpl.getInternalState(policiesService, "policiesCache"); + policiesCache.put(TopicName.get(tpName), policies); + } + + private void cancelPendingRead(String tpName, String cursorName) throws Exception { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); + Dispatcher dispatcher = persistentTopic.getSubscription(cursorName).getDispatcher(); + if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { + Method cancelPendingRead = PersistentDispatcherMultipleConsumers.class + .getDeclaredMethod("cancelPendingRead", new Class[]{}); + cancelPendingRead.setAccessible(true); + cancelPendingRead.invoke(dispatcher, new Object[]{}); + } else if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) { + Method cancelPendingRead = PersistentDispatcherSingleActiveConsumer.class + .getDeclaredMethod("cancelPendingRead", new Class[]{}); + cancelPendingRead.setAccessible(true); + cancelPendingRead.invoke(dispatcher, new Object[]{}); + } + } + + private void triggerNewReadMoreEntries(String tpName, String cursorName) throws Exception { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); + Dispatcher dispatcher = persistentTopic.getSubscription(cursorName).getDispatcher(); + if (dispatcher instanceof PersistentDispatcherMultipleConsumers) { + ((PersistentDispatcherMultipleConsumers) dispatcher).readMoreEntries(); + } else if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) { + PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer = + ((PersistentDispatcherSingleActiveConsumer) dispatcher); + Method readMoreEntries = PersistentDispatcherSingleActiveConsumer.class.getDeclaredMethod( + "readMoreEntries", new Class[]{org.apache.pulsar.broker.service.Consumer.class}); + readMoreEntries.setAccessible(true); + readMoreEntries.invoke(dispatcher, + new Object[]{persistentDispatcherSingleActiveConsumer.getActiveConsumer()}); + } + } + + @DataProvider(name = "multiConsumerSubscriptionTypes") + private Object[][] multiConsumerSubscriptionTypes() { + return new Object[][]{ + {SubscriptionType.Key_Shared}, + {SubscriptionType.Shared} + }; + } + + @DataProvider(name = "singleConsumerSubscriptionTypes") + private Object[][] singleConsumerSubscriptionTypes() { + return new Object[][]{ + {SubscriptionType.Failover}, + {SubscriptionType.Exclusive} + }; + } + + @DataProvider(name = "skipTypes") + private Object[][] skipTypes() { + return new Object[][]{ + {SkipType.SKIP_ENTRIES}, + {SkipType.CLEAR_BACKLOG}, + {SkipType.SEEK}, + {SkipType.RESET_CURSOR} + }; + } + + private enum SkipType{ + SKIP_ENTRIES, + CLEAR_BACKLOG, + SEEK, + RESET_CURSOR; + } + + private ReceivedMessages receiveAndAckMessages(BiFunction ackPredicate, + Consumer...consumers) throws Exception { + ReceivedMessages receivedMessages = new ReceivedMessages(); + while (true) { + int receivedMsgCount = 0; + for (int i = 0; i < consumers.length; i++) { + Consumer consumer = consumers[i]; + while (true) { + Message msg = consumer.receive(2, TimeUnit.SECONDS); + if (msg != null) { + receivedMsgCount++; + String v = msg.getValue(); + MessageId messageId = msg.getMessageId(); + receivedMessages.messagesReceived.add(Pair.of(msg.getMessageId(), v)); + if (ackPredicate.apply(messageId, v)) { + consumer.acknowledge(msg); + receivedMessages.messagesAcked.add(Pair.of(msg.getMessageId(), v)); + } + } else { + break; + } + } + } + // Because of the possibility of consumers getting stuck with each other, only jump out of the loop if all + // consumers could not receive messages. + if (receivedMsgCount == 0) { + break; + } + } + return receivedMessages; + } + + private ReceivedMessages ackAllMessages(Consumer...consumers) throws Exception { + return receiveAndAckMessages((msgId, msgV) -> true, consumers); + } + + private ReceivedMessages ackOddMessagesOnly(Consumer...consumers) throws Exception { + return receiveAndAckMessages((msgId, msgV) -> Integer.valueOf(msgV) % 2 == 1, consumers); + } + + private static class ReceivedMessages { + + List> messagesReceived = new ArrayList<>(); + + List> messagesAcked = new ArrayList<>(); + + public boolean hasReceivedMessage(String v) { + for (Pair pair : messagesReceived) { + if (pair.getRight().equals(v)) { + return true; + } + } + return false; + } + + public boolean hasAckedMessage(String v) { + for (Pair pair : messagesAcked) { + if (pair.getRight().equals(v)) { + return true; + } + } + return false; + } + } + + @Test(dataProvider = "multiConsumerSubscriptionTypes") + public void testPauseOnAckStatPersist(SubscriptionType subscriptionType) throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4; + final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10; + + enablePolicyDispatcherPauseOnAckStatePersistent(tpName); + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscription, MessageId.earliest); + + // Send double MAX_UNACKED_RANGES_TO_PERSIST messages. + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + ArrayList messageIdsSent = new ArrayList<>(); + for (int i = 0; i < msgSendCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) p1.send(Integer.valueOf(i).toString()); + messageIdsSent.add(messageId); + } + // Make ack holes. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + ackOddMessagesOnly(c1); + + cancelPendingRead(tpName, subscription); + triggerNewReadMoreEntries(tpName, subscription); + + // Verify: the dispatcher has been paused. + final String specifiedMessage = "9876543210"; + p1.send(specifiedMessage); + Message msg1 = c1.receive(2, TimeUnit.SECONDS); + Assert.assertNull(msg1); + + // Verify: after ack messages, will unpause the dispatcher. + c1.acknowledge(messageIdsSent); + ReceivedMessages receivedMessagesAfterPause = ackAllMessages(c1); + Assert.assertTrue(receivedMessagesAfterPause.hasReceivedMessage(specifiedMessage)); + Assert.assertTrue(receivedMessagesAfterPause.hasAckedMessage(specifiedMessage)); + + // cleanup. + p1.close(); + c1.close(); + admin.topics().delete(tpName, false); + } + + @Test(dataProvider = "skipTypes") + public void testUnPauseOnSkipEntries(SkipType skipType) throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4; + final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10; + + enablePolicyDispatcherPauseOnAckStatePersistent(tpName); + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscription, MessageId.earliest); + + // Send double MAX_UNACKED_RANGES_TO_PERSIST messages. + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + ArrayList messageIdsSent = new ArrayList<>(); + for (int i = 0; i < msgSendCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) p1.send(Integer.valueOf(i).toString()); + messageIdsSent.add(messageId); + } + // Make ack holes. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared).subscribe(); + ackOddMessagesOnly(c1); + + cancelPendingRead(tpName, subscription); + triggerNewReadMoreEntries(tpName, subscription); + + // Verify: the dispatcher has been paused. + final String specifiedMessage1 = "9876543210"; + p1.send(specifiedMessage1); + Message msg1 = c1.receive(2, TimeUnit.SECONDS); + Assert.assertNull(msg1); + + // Verify: after enough messages have been skipped, will unpause the dispatcher. + skipMessages(tpName, subscription, skipType, c1); + // Since the message "specifiedMessage1" might be skipped, we send a new message to verify the result. + final String specifiedMessage2 = "9876543211"; + p1.send(specifiedMessage2); + + ReceivedMessages receivedMessagesAfterPause = ackAllMessages(c1); + Assert.assertTrue(receivedMessagesAfterPause.hasReceivedMessage(specifiedMessage2)); + Assert.assertTrue(receivedMessagesAfterPause.hasAckedMessage(specifiedMessage2)); + + // cleanup. + p1.close(); + c1.close(); + admin.topics().delete(tpName, false); + } + + private void skipMessages(String tpName, String subscription, SkipType skipType, Consumer c) throws Exception { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(tpName, false).join().get(); + Position LAC = persistentTopic.getManagedLedger().getLastConfirmedEntry(); + MessageIdImpl LACMessageId = new MessageIdImpl(LAC.getLedgerId(), LAC.getEntryId(), -1); + if (skipType == SkipType.SKIP_ENTRIES) { + while (true) { + GetStatsOptions getStatsOptions = new GetStatsOptions( + true, /* getPreciseBacklog */ + false, /* subscriptionBacklogSize */ + false, /* getEarliestTimeInBacklog */ + true, /* excludePublishers */ + true /* excludeConsumers */); + org.apache.pulsar.common.policies.data.SubscriptionStats subscriptionStats = + admin.topics().getStats(tpName, getStatsOptions).getSubscriptions().get(subscription); + if (subscriptionStats.getMsgBacklog() < MAX_UNACKED_RANGES_TO_PERSIST) { + break; + } + admin.topics().skipMessages(tpName, subscription, 100); + } + } else if (skipType == SkipType.CLEAR_BACKLOG){ + admin.topics().skipAllMessages(tpName, subscription); + } else if (skipType == SkipType.SEEK) { + c.seek(LACMessageId); + } else if (skipType == SkipType.RESET_CURSOR) { + admin.topics().resetCursor(tpName, subscription, LACMessageId, false); + } + } + + @Test(dataProvider = "singleConsumerSubscriptionTypes") + public void testSingleConsumerDispatcherWillNotPause(SubscriptionType subscriptionType) throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4; + final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10; + + enablePolicyDispatcherPauseOnAckStatePersistent(tpName); + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscription, MessageId.earliest); + + // Send double MAX_UNACKED_RANGES_TO_PERSIST messages. + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + ArrayList messageIdsSent = new ArrayList<>(); + for (int i = 0; i < msgSendCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) p1.send(Integer.valueOf(i).toString()); + messageIdsSent.add(messageId); + } + // Make ack holes. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true) + .subscriptionType(subscriptionType) + .subscribe(); + ackOddMessagesOnly(c1); + + cancelPendingRead(tpName, subscription); + triggerNewReadMoreEntries(tpName, subscription); + + // Verify: the dispatcher has been paused. + final String specifiedMessage = "9876543210"; + p1.send(specifiedMessage); + Message msg1 = c1.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(msg1); + Assert.assertEquals(msg1.getValue(), specifiedMessage); + + // cleanup. + p1.close(); + c1.close(); + admin.topics().delete(tpName, false); + } + + @Test(dataProvider = "multiConsumerSubscriptionTypes") + public void testPauseOnAckStatPersistNotAffectReplayRead(SubscriptionType subscriptionType) throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4; + final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10; + + enablePolicyDispatcherPauseOnAckStatePersistent(tpName); + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscription, MessageId.earliest); + + // Send double MAX_UNACKED_RANGES_TO_PERSIST messages. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + ArrayList messageIdsSent = new ArrayList<>(); + for (int i = 0; i < msgSendCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) p1.send(Integer.valueOf(i).toString()); + messageIdsSent.add(messageId); + } + // Make ack holes. + ReceivedMessages receivedMessagesC1 = ackOddMessagesOnly(c1); + + cancelPendingRead(tpName, subscription); + triggerNewReadMoreEntries(tpName, subscription); + + // Verify: the dispatcher has been paused. + Consumer c2 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + final String specifiedMessage = "9876543210"; + final int specifiedMessageCount = 1; + p1.send(specifiedMessage); + Message msg1 = c1.receive(2, TimeUnit.SECONDS); + Assert.assertNull(msg1); + Message msg2 = c2.receive(2, TimeUnit.SECONDS); + Assert.assertNull(msg2); + + // Verify: close the previous consumer, the new one could receive all messages. + c1.close(); + ReceivedMessages receivedMessagesC2 = ackAllMessages(c2); + int messageCountAckedByC1 = receivedMessagesC1.messagesAcked.size(); + int messageCountAckedByC2 = receivedMessagesC2.messagesAcked.size(); + Assert.assertEquals(messageCountAckedByC2, msgSendCount - messageCountAckedByC1 + specifiedMessageCount); + + // cleanup, c1 has been closed before. + p1.close(); + c2.close(); + admin.topics().delete(tpName, false); + } + + @Test(dataProvider = "multiConsumerSubscriptionTypes") + public void testMultiConsumersPauseOnAckStatPersistNotAffectReplayRead(SubscriptionType subscriptionType) + throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + final int msgSendCount = MAX_UNACKED_RANGES_TO_PERSIST * 4; + final int incomingQueueSize = MAX_UNACKED_RANGES_TO_PERSIST * 10; + + enablePolicyDispatcherPauseOnAckStatePersistent(tpName); + admin.topics().createNonPartitionedTopic(tpName); + admin.topics().createSubscription(tpName, subscription, MessageId.earliest); + + // Send double MAX_UNACKED_RANGES_TO_PERSIST messages. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + Consumer c2 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + ArrayList messageIdsSent = new ArrayList<>(); + for (int i = 0; i < msgSendCount; i++) { + MessageIdImpl messageId = (MessageIdImpl) p1.send(Integer.valueOf(i).toString()); + messageIdsSent.add(messageId); + } + // Make ack holes. + ReceivedMessages receivedMessagesC1AndC2 = ackOddMessagesOnly(c1, c2); + + cancelPendingRead(tpName, subscription); + triggerNewReadMoreEntries(tpName, subscription); + + // Verify: the dispatcher has been paused. + Consumer c3 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + Consumer c4 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .receiverQueueSize(incomingQueueSize).isAckReceiptEnabled(true).subscriptionType(subscriptionType) + .subscribe(); + final String specifiedMessage = "9876543210"; + final int specifiedMessageCount = 1; + p1.send(specifiedMessage); + for (Consumer c : Arrays.asList(c1, c2, c3, c4)) { + Message m = c.receive(2, TimeUnit.SECONDS); + Assert.assertNull(m); + } + + // Verify: close the previous consumer, the new one could receive all messages. + c1.close(); + c2.close(); + ReceivedMessages receivedMessagesC3AndC4 = ackAllMessages(c3, c4); + int messageCountAckedByC1AndC2 = receivedMessagesC1AndC2.messagesAcked.size(); + int messageCountAckedByC3AndC4 = receivedMessagesC3AndC4.messagesAcked.size(); + Assert.assertEquals(messageCountAckedByC3AndC4, + msgSendCount - messageCountAckedByC1AndC2 + specifiedMessageCount); + + // cleanup, c1 has been closed before. + p1.close(); + c3.close(); + c4.close(); + admin.topics().delete(tpName, false); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java index 0249272b72d84..7f841ec89758e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java @@ -49,6 +49,7 @@ public class HierarchyTopicPolicies { final PolicyHierarchyValue maxConsumerPerTopic; final PolicyHierarchyValue publishRate; final PolicyHierarchyValue delayedDeliveryEnabled; + final PolicyHierarchyValue dispatcherPauseOnAckStatePersistentEnabled; final PolicyHierarchyValue delayedDeliveryTickTimeMillis; final PolicyHierarchyValue replicatorDispatchRate; final PolicyHierarchyValue maxConsumersPerSubscription; @@ -81,6 +82,7 @@ public HierarchyTopicPolicies() { messageTTLInSeconds = new PolicyHierarchyValue<>(); publishRate = new PolicyHierarchyValue<>(); delayedDeliveryEnabled = new PolicyHierarchyValue<>(); + dispatcherPauseOnAckStatePersistentEnabled = new PolicyHierarchyValue<>(); delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>(); replicatorDispatchRate = new PolicyHierarchyValue<>(); compactionThreshold = new PolicyHierarchyValue<>(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index 4a76170d116a3..438e48511ae43 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -62,6 +62,7 @@ public class TopicPolicies { private Integer maxUnackedMessagesOnSubscription; private Long delayedDeliveryTickTimeMillis; private Boolean delayedDeliveryEnabled; + private Boolean dispatcherPauseOnAckStatePersistentEnabled;; private OffloadPoliciesImpl offloadPolicies; private InactiveTopicPolicies inactiveTopicPolicies; private DispatchRateImpl dispatchRate;