Skip to content

Commit 734e3da

Browse files
dao-jun authored and nikhil-ctds committed
[fix][broker] Handle BucketDelayedDeliveryTracker recover failed (apache#22735)
(cherry picked from commit 1c53841) (cherry picked from commit 257ca7f)
1 parent ead7629 commit 734e3da

9 files changed

+420
-23
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.delayed;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import io.netty.util.HashedWheelTimer;
2223
import io.netty.util.Timer;
2324
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -33,10 +34,15 @@
3334
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
3435
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
3536
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
37+
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
38+
import org.apache.pulsar.broker.service.BrokerService;
3639
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
3740
import org.apache.pulsar.common.util.FutureUtil;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
3843

3944
public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
45+
private static final Logger log = LoggerFactory.getLogger(BucketDelayedDeliveryTrackerFactory.class);
4046

4147
BucketSnapshotStorage bucketSnapshotStorage;
4248

@@ -73,8 +79,28 @@ public void initialize(PulsarService pulsarService) throws Exception {
7379

7480
@Override
7581
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
76-
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
77-
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
82+
String topicName = dispatcher.getTopic().getName();
83+
String subscriptionName = dispatcher.getSubscription().getName();
84+
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
85+
DelayedDeliveryTracker tracker;
86+
87+
try {
88+
tracker = newTracker0(dispatcher);
89+
} catch (RecoverDelayedDeliveryTrackerException ex) {
90+
log.warn("Failed to recover BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker."
91+
+ " topic {}, subscription {}", topicName, subscriptionName, ex);
92+
// If failed to create BucketDelayedDeliveryTracker, fallback to InMemoryDelayedDeliveryTracker
93+
brokerService.initializeFallbackDelayedDeliveryTrackerFactory();
94+
tracker = brokerService.getFallbackDelayedDeliveryTrackerFactory().newTracker(dispatcher);
95+
}
96+
return tracker;
97+
}
98+
99+
@VisibleForTesting
100+
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
101+
throws RecoverDelayedDeliveryTrackerException {
102+
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
103+
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
78104
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
79105
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
80106
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryTracker.java

+47
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,51 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
8585
* Close the subscription tracker and release all resources.
8686
*/
8787
void close();
88+
89+
DelayedDeliveryTracker DISABLE = new DelayedDeliveryTracker() {
90+
@Override
91+
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
92+
return false;
93+
}
94+
95+
@Override
96+
public boolean hasMessageAvailable() {
97+
return false;
98+
}
99+
100+
@Override
101+
public long getNumberOfDelayedMessages() {
102+
return 0;
103+
}
104+
105+
@Override
106+
public long getBufferMemoryUsage() {
107+
return 0;
108+
}
109+
110+
@Override
111+
public NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
112+
return null;
113+
}
114+
115+
@Override
116+
public boolean shouldPauseAllDeliveries() {
117+
return false;
118+
}
119+
120+
@Override
121+
public void resetTickTime(long tickTime) {
122+
123+
}
124+
125+
@Override
126+
public CompletableFuture<Void> clear() {
127+
return null;
128+
}
129+
130+
@Override
131+
public void close() {
132+
133+
}
134+
};
88135
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java

+19
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.apache.pulsar.broker.delayed;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import io.netty.util.HashedWheelTimer;
2223
import io.netty.util.Timer;
2324
import io.netty.util.concurrent.DefaultThreadFactory;
2425
import java.util.concurrent.TimeUnit;
2526
import org.apache.pulsar.broker.PulsarService;
2627
import org.apache.pulsar.broker.ServiceConfiguration;
2728
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTrackerFactory {
33+
private static final Logger log = LoggerFactory.getLogger(InMemoryDelayedDeliveryTrackerFactory.class);
3034

3135
private Timer timer;
3236

@@ -48,6 +52,21 @@ public void initialize(PulsarService pulsarService) {
4852

4953
@Override
5054
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
55+
String topicName = dispatcher.getTopic().getName();
56+
String subscriptionName = dispatcher.getSubscription().getName();
57+
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
58+
try {
59+
tracker = newTracker0(dispatcher);
60+
} catch (Exception e) {
61+
// it should never go here
62+
log.warn("Failed to create InMemoryDelayedDeliveryTracker, topic {}, subscription {}",
63+
topicName, subscriptionName, e);
64+
}
65+
return tracker;
66+
}
67+
68+
@VisibleForTesting
69+
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
5170
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
5271
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
5372
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java

+22-13
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,24 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
105105
private CompletableFuture<Void> pendingLoad = null;
106106

107107
/**
 * Creates a tracker that uses the system UTC clock.
 *
 * <p>Delegates to the constructor that takes an explicit {@link Clock}, passing
 * {@link Clock#systemUTC()} and forwarding every other argument unchanged.
 *
 * @throws RecoverDelayedDeliveryTrackerException if the delegated constructor throws it
 */
public BucketDelayedDeliveryTracker(
        PersistentDispatcherMultipleConsumers dispatcher,
        Timer timer,
        long tickTimeMillis,
        boolean isDelayedDeliveryDeliverAtTimeStrict,
        BucketSnapshotStorage bucketSnapshotStorage,
        long minIndexCountPerBucket,
        long timeStepPerBucketSnapshotSegmentInMillis,
        int maxIndexesPerBucketSnapshotSegment,
        int maxNumBuckets) throws RecoverDelayedDeliveryTrackerException {
    this(
            dispatcher,
            timer,
            tickTimeMillis,
            Clock.systemUTC(),
            isDelayedDeliveryDeliverAtTimeStrict,
            bucketSnapshotStorage,
            minIndexCountPerBucket,
            timeStepPerBucketSnapshotSegmentInMillis,
            maxIndexesPerBucketSnapshotSegment,
            maxNumBuckets);
}
117118

118119
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
119-
Timer timer, long tickTimeMillis, Clock clock,
120-
boolean isDelayedDeliveryDeliverAtTimeStrict,
121-
BucketSnapshotStorage bucketSnapshotStorage,
122-
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
123-
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
120+
Timer timer, long tickTimeMillis, Clock clock,
121+
boolean isDelayedDeliveryDeliverAtTimeStrict,
122+
BucketSnapshotStorage bucketSnapshotStorage,
123+
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
124+
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
125+
throws RecoverDelayedDeliveryTrackerException {
124126
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
125127
this.minIndexCountPerBucket = minIndexCountPerBucket;
126128
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
@@ -133,10 +135,17 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
133135
new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
134136
bucketSnapshotStorage);
135137
this.stats = new BucketDelayedMessageIndexStats();
136-
this.numberDelayedMessages = recoverBucketSnapshot();
138+
139+
// Close the tracker if failed to recover.
140+
try {
141+
this.numberDelayedMessages = recoverBucketSnapshot();
142+
} catch (RecoverDelayedDeliveryTrackerException e) {
143+
close();
144+
throw e;
145+
}
137146
}
138147

139-
private synchronized long recoverBucketSnapshot() throws RuntimeException {
148+
private synchronized long recoverBucketSnapshot() throws RecoverDelayedDeliveryTrackerException {
140149
ManagedCursor cursor = this.lastMutableBucket.getCursor();
141150
Map<String, String> cursorProperties = cursor.getCursorProperties();
142151
if (MapUtils.isEmpty(cursorProperties)) {
@@ -181,7 +190,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
181190
if (e instanceof InterruptedException) {
182191
Thread.currentThread().interrupt();
183192
}
184-
throw new RuntimeException(e);
193+
throw new RecoverDelayedDeliveryTrackerException(e);
185194
}
186195

187196
for (Map.Entry<Range<Long>, CompletableFuture<List<DelayedIndex>>> entry : futures.entrySet()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.delayed.bucket;
20+
21+
/**
 * Checked exception that wraps the underlying failure when a delayed-delivery tracker
 * cannot be recovered.
 */
public class RecoverDelayedDeliveryTrackerException extends Exception {

    /**
     * Wraps the failure that prevented recovery.
     *
     * @param cause the underlying failure
     */
    public RecoverDelayedDeliveryTrackerException(final Throwable cause) {
        super(cause);
    }
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.pulsar.broker.cache.BundlesQuotas;
105105
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
106106
import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
107+
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory;
107108
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
108109
import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
109110
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -272,10 +273,11 @@ public class BrokerService implements Closeable {
272273
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
273274
private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers;
274275
private final ReadWriteLock lock = new ReentrantReadWriteLock();
275-
276-
@Getter
277276
@VisibleForTesting
278277
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
278+
// InMemoryDelayedDeliveryTrackerFactory used as a fallback
279+
// when recovering a BucketDelayedDeliveryTracker fails.
280+
private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory;
279281
private final ServerBootstrap defaultServerBootstrap;
280282
private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>();
281283

@@ -884,6 +886,9 @@ public CompletableFuture<Void> closeAsync() {
884886
pulsarStats.close();
885887
try {
886888
delayedDeliveryTrackerFactory.close();
889+
if (fallbackDelayedDeliveryTrackerFactory != null) {
890+
fallbackDelayedDeliveryTrackerFactory.close();
891+
}
887892
} catch (Exception e) {
888893
log.warn("Error in closing delayedDeliveryTrackerFactory", e);
889894
}
@@ -3442,6 +3447,25 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
34423447
}
34433448
}
34443449

3450+
/**
3451+
* Initializes the in-memory delayed delivery tracker factory when
3452+
* BucketDelayedDeliveryTrackerFactory.newTracker failed.
3453+
*/
3454+
public synchronized void initializeFallbackDelayedDeliveryTrackerFactory() {
3455+
if (fallbackDelayedDeliveryTrackerFactory != null) {
3456+
return;
3457+
}
3458+
3459+
DelayedDeliveryTrackerFactory factory = new InMemoryDelayedDeliveryTrackerFactory();
3460+
try {
3461+
factory.initialize(pulsar);
3462+
this.fallbackDelayedDeliveryTrackerFactory = factory;
3463+
} catch (Exception e) {
3464+
// it should never go here
3465+
log.error("Failed to initialize InMemoryDelayedDeliveryTrackerFactory", e);
3466+
}
3467+
}
3468+
34453469
private static class ConfigField {
34463470
final Field field;
34473471

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -1083,15 +1083,15 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
10831083
}
10841084

10851085
synchronized (this) {
1086-
if (!delayedDeliveryTracker.isPresent()) {
1086+
if (delayedDeliveryTracker.isEmpty()) {
10871087
if (!msgMetadata.hasDeliverAtTime()) {
10881088
// No need to initialize the tracker here
10891089
return false;
10901090
}
10911091

10921092
// Initialize the tracker the first time we need to use it
1093-
delayedDeliveryTracker = Optional
1094-
.of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
1093+
delayedDeliveryTracker = Optional.of(
1094+
topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this));
10951095
}
10961096

10971097
delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis());
@@ -1245,5 +1245,10 @@ protected int getStickyKeyHash(Entry entry) {
12451245
return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer()));
12461246
}
12471247

1248+
1249+
public Subscription getSubscription() {
1250+
return subscription;
1251+
}
1252+
12481253
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
12491254
}

0 commit comments

Comments
 (0)