Skip to content

Commit e1a0ef8

Browse files
committed
Restore Delayed delivery feature
1 parent 8570ba4 commit e1a0ef8

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
6161
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
6262
import org.apache.pulsar.client.impl.Backoff;
63+
import org.apache.pulsar.common.api.proto.PulsarApi;
6364
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
6465
import org.apache.pulsar.common.naming.TopicName;
6566
import org.apache.pulsar.common.policies.data.DispatchRate;
@@ -119,7 +120,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
119120
public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription,
120121
boolean allowOutOfOrderDelivery) {
121122
super(subscription);
122-
this.serviceConfig = topic.getBrokerService().getPulsar().getConfig();
123+
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
123124
this.cursor = cursor;
124125
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
125126
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
@@ -844,6 +845,24 @@ public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
844845
}
845846
}
846847

848+
@Override
849+
public boolean trackDelayedDelivery(long ledgerId, long entryId, PulsarApi.MessageMetadata msgMetadata) {
850+
if (!topic.isDelayedDeliveryEnabled()) {
851+
// If broker has the feature disabled, always deliver messages immediately
852+
return false;
853+
}
854+
synchronized (this) {
855+
if (!delayedDeliveryTracker.isPresent()) {
856+
// Initialize the tracker the first time we need to use it
857+
delayedDeliveryTracker = Optional
858+
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
859+
}
860+
861+
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
862+
return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime());
863+
}
864+
}
865+
847866
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
848867
if (!redeliveryMessages.isEmpty()) {
849868
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);

0 commit comments

Comments
 (0)