Skip to content

Commit 00e6f6f

Browse files
shibdnikhil-ctds
authored andcommitted
[fix][broker][branch-3.0] The topic might reference a closed ledger (apache#22860) (apache#22900)
(cherry picked from commit 8be3e8a)
1 parent 71b7561 commit 00e6f6f

File tree

4 files changed

+150
-87
lines changed

4 files changed

+150
-87
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

+5
Original file line numberDiff line numberDiff line change
@@ -1940,6 +1940,11 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
19401940
return new BrokerService(pulsar, ioEventLoopGroup);
19411941
}
19421942

1943+
@VisibleForTesting
1944+
public void setTransactionExecutorProvider(TransactionBufferProvider transactionBufferProvider) {
1945+
this.transactionBufferProvider = transactionBufferProvider;
1946+
}
1947+
19431948
public void initConfigMetadataSynchronizerIfNeeded() {
19441949
mutex.lock();
19451950
try {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+76-78
Original file line numberDiff line numberDiff line change
@@ -1044,38 +1044,38 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
10441044
return getTopic(TopicName.get(topic), createIfMissing, properties);
10451045
}
10461046

1047+
/**
1048+
* Retrieves or creates a topic based on the specified parameters.
1049+
* 0. If disable PersistentTopics or NonPersistentTopics, it will return a failed future with NotAllowedException.
1050+
* 1. If topic future exists in the cache returned directly regardless of whether it fails or timeout.
1051+
* 2. If the topic metadata exists, the topic is created regardless of {@code createIfMissing}.
1052+
* 3. If the topic metadata not exists, and {@code createIfMissing} is false,
1053+
* returns an empty Optional in a CompletableFuture. And this empty future not be added to the map.
1054+
* 4. Otherwise, use computeIfAbsent. It returns the existing topic or creates and adds a new topicFuture.
1055+
* Any exceptions will remove the topicFuture from the map.
1056+
*
1057+
* @param topicName The name of the topic, potentially including partition information.
1058+
* @param createIfMissing If true, creates the topic if it does not exist.
1059+
* @param properties Topic configuration properties used during creation.
1060+
* @return CompletableFuture with an Optional of the topic if found or created, otherwise empty.
1061+
*/
10471062
public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, boolean createIfMissing,
10481063
Map<String, String> properties) {
10491064
try {
1050-
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
1051-
if (topicFuture != null) {
1052-
if (topicFuture.isCompletedExceptionally()
1053-
|| (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) {
1054-
// Exceptional topics should be recreated.
1055-
topics.remove(topicName.toString(), topicFuture);
1056-
} else {
1057-
// a non-existing topic in the cache shouldn't prevent creating a topic
1058-
if (createIfMissing) {
1059-
if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) {
1060-
return topicFuture;
1061-
} else {
1062-
return topicFuture.thenCompose(value -> {
1063-
if (!value.isPresent()) {
1064-
// retry and create topic
1065-
return getTopic(topicName, createIfMissing, properties);
1066-
} else {
1067-
// in-progress future completed successfully
1068-
return CompletableFuture.completedFuture(value);
1069-
}
1070-
});
1071-
}
1072-
} else {
1073-
return topicFuture;
1074-
}
1075-
}
1065+
// If topic future exists in the cache returned directly regardless of whether it fails or timeout.
1066+
CompletableFuture<Optional<Topic>> tp = topics.get(topicName.toString());
1067+
if (tp != null) {
1068+
return tp;
10761069
}
10771070
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
10781071
if (isPersistentTopic) {
1072+
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
1073+
if (log.isDebugEnabled()) {
1074+
log.debug("Broker is unable to load persistent topic {}", topicName);
1075+
}
1076+
return FutureUtil.failedFuture(new NotAllowedException(
1077+
"Broker is unable to load persistent topic"));
1078+
}
10791079
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topicName)
10801080
.thenCompose(exists -> {
10811081
if (!exists && !createIfMissing) {
@@ -1090,44 +1090,48 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
10901090
throw FutureUtil.wrapToCompletionException(new ServiceUnitNotReadyException(errorInfo));
10911091
}).thenCompose(optionalTopicPolicies -> {
10921092
final TopicPolicies topicPolicies = optionalTopicPolicies.orElse(null);
1093-
return topics.computeIfAbsent(topicName.toString(), (tpName) -> {
1094-
if (topicName.isPartitioned()) {
1095-
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1096-
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
1097-
.thenCompose((metadata) -> {
1098-
// Allow crate non-partitioned persistent topic that name includes
1099-
// `partition`
1100-
if (metadata.partitions == 0
1101-
|| topicName.getPartitionIndex() < metadata.partitions) {
1102-
return loadOrCreatePersistentTopic(tpName, createIfMissing,
1103-
properties, topicPolicies);
1104-
}
1093+
if (topicName.isPartitioned()) {
1094+
final TopicName topicNameEntity = TopicName.get(topicName.getPartitionedTopicName());
1095+
return fetchPartitionedTopicMetadataAsync(topicNameEntity)
1096+
.thenCompose((metadata) -> {
1097+
// Allow crate non-partitioned persistent topic that name includes
1098+
// `partition`
1099+
if (metadata.partitions == 0
1100+
|| topicName.getPartitionIndex() < metadata.partitions) {
1101+
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1102+
loadOrCreatePersistentTopic(tpName,
1103+
createIfMissing, properties, topicPolicies));
1104+
} else {
11051105
final String errorMsg =
11061106
String.format("Illegal topic partition name %s with max allowed "
11071107
+ "%d partitions", topicName, metadata.partitions);
11081108
log.warn(errorMsg);
11091109
return FutureUtil.failedFuture(
11101110
new BrokerServiceException.NotAllowedException(errorMsg));
1111-
});
1112-
}
1113-
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
1114-
}).thenCompose(optionalTopic -> {
1115-
if (!optionalTopic.isPresent() && createIfMissing) {
1116-
log.warn("[{}] Try to recreate the topic with createIfMissing=true "
1117-
+ "but the returned topic is empty", topicName);
1118-
return getTopic(topicName, createIfMissing, properties);
1119-
}
1120-
return CompletableFuture.completedFuture(optionalTopic);
1121-
});
1111+
}
1112+
});
1113+
} else {
1114+
return topics.computeIfAbsent(topicName.toString(), (tpName) ->
1115+
loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies));
1116+
}
11221117
});
11231118
});
11241119
} else {
1125-
return topics.computeIfAbsent(topicName.toString(), (name) -> {
1120+
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
1121+
if (log.isDebugEnabled()) {
1122+
log.debug("Broker is unable to load non-persistent topic {}", topicName);
1123+
}
1124+
return FutureUtil.failedFuture(new NotAllowedException(
1125+
"Broker is unable to load persistent topic"));
1126+
}
1127+
if (!topics.containsKey(topicName.toString())) {
11261128
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.BEFORE);
1127-
if (topicName.isPartitioned()) {
1128-
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
1129-
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
1130-
if (topicName.getPartitionIndex() < metadata.partitions) {
1129+
}
1130+
if (topicName.isPartitioned()) {
1131+
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
1132+
return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
1133+
if (topicName.getPartitionIndex() < metadata.partitions) {
1134+
return topics.computeIfAbsent(topicName.toString(), (name) -> {
11311135
topicEventsDispatcher
11321136
.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
11331137

@@ -1138,11 +1142,13 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11381142
topicEventsDispatcher
11391143
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
11401144
return res;
1141-
}
1142-
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
1143-
return CompletableFuture.completedFuture(Optional.empty());
1144-
});
1145-
} else if (createIfMissing) {
1145+
});
1146+
}
1147+
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
1148+
return CompletableFuture.completedFuture(Optional.empty());
1149+
});
1150+
} else if (createIfMissing) {
1151+
return topics.computeIfAbsent(topicName.toString(), (name) -> {
11461152
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.CREATE, EventStage.BEFORE);
11471153

11481154
CompletableFuture<Optional<Topic>> res = createNonPersistentTopic(name);
@@ -1152,11 +1158,15 @@ public CompletableFuture<Optional<Topic>> getTopic(final TopicName topicName, bo
11521158
topicEventsDispatcher
11531159
.notifyOnCompletion(eventFuture, topicName.toString(), TopicEvent.LOAD);
11541160
return res;
1155-
} else {
1161+
});
1162+
} else {
1163+
CompletableFuture<Optional<Topic>> topicFuture = topics.get(topicName.toString());
1164+
if (topicFuture == null) {
11561165
topicEventsDispatcher.notify(topicName.toString(), TopicEvent.LOAD, EventStage.FAILURE);
1157-
return CompletableFuture.completedFuture(Optional.empty());
1166+
topicFuture = CompletableFuture.completedFuture(Optional.empty());
11581167
}
1159-
});
1168+
return topicFuture;
1169+
}
11601170
}
11611171
} catch (IllegalArgumentException e) {
11621172
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
@@ -1295,15 +1305,9 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
12951305
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
12961306
topicFuture.exceptionally(t -> {
12971307
pulsarStats.recordTopicLoadFailed();
1308+
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
12981309
return null;
12991310
});
1300-
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
1301-
if (log.isDebugEnabled()) {
1302-
log.debug("Broker is unable to load non-persistent topic {}", topic);
1303-
}
1304-
return FutureUtil.failedFuture(
1305-
new NotAllowedException("Broker is not unable to load non-persistent topic"));
1306-
}
13071311
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
13081312
NonPersistentTopic nonPersistentTopic;
13091313
try {
@@ -1326,7 +1330,6 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
13261330
}).exceptionally(ex -> {
13271331
log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
13281332
nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
1329-
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
13301333
topicFuture.completeExceptionally(ex);
13311334
});
13321335
return null;
@@ -1579,14 +1582,6 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
15791582
final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
15801583
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
15811584
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
1582-
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
1583-
if (log.isDebugEnabled()) {
1584-
log.debug("Broker is unable to load persistent topic {}", topic);
1585-
}
1586-
topicFuture.completeExceptionally(new NotAllowedException(
1587-
"Broker is unable to load persistent topic"));
1588-
return topicFuture;
1589-
}
15901585

15911586
checkTopicNsOwnership(topic)
15921587
.thenRun(() -> {
@@ -1609,6 +1604,7 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
16091604
}
16101605
}
16111606
}).exceptionally(ex -> {
1607+
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
16121608
topicFuture.completeExceptionally(ex.getCause());
16131609
return null;
16141610
});
@@ -1779,6 +1775,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
17791775
+ " topic", topic, FutureUtil.getException(topicFuture));
17801776
executor().submit(() -> {
17811777
persistentTopic.close().whenComplete((ignore, ex) -> {
1778+
topics.remove(topic, topicFuture);
17821779
if (ex != null) {
17831780
log.warn("[{}] Get an error when closing topic.",
17841781
topic, ex);
@@ -1795,6 +1792,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
17951792
+ " Removing topic from topics list {}, {}", topic, ex);
17961793
executor().submit(() -> {
17971794
persistentTopic.close().whenComplete((ignore, closeEx) -> {
1795+
topics.remove(topic, topicFuture);
17981796
if (closeEx != null) {
17991797
log.warn("[{}] Get an error when closing topic.",
18001798
topic, closeEx);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
22-
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
2322
import static org.mockito.ArgumentMatchers.eq;
2423
import static org.mockito.Mockito.doAnswer;
2524
import static org.mockito.Mockito.spy;
2625
import static org.testng.Assert.assertEquals;
27-
import static org.testng.Assert.assertNotEquals;
2826
import static org.testng.Assert.assertNotNull;
2927
import static org.testng.Assert.assertNull;
3028
import static org.testng.Assert.assertTrue;
@@ -1404,13 +1402,6 @@ public void testCleanupTopic() throws Exception {
14041402
// Ok
14051403
}
14061404

1407-
final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
1408-
// timeout topic future should be removed from cache
1409-
retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
1410-
1000);
1411-
1412-
assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));
1413-
14141405
try {
14151406
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
14161407
.subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
@@ -1422,6 +1413,7 @@ public void testCleanupTopic() throws Exception {
14221413
ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
14231414
mlFuture.complete(ml);
14241415

1416+
// Re-create topic will success.
14251417
Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
14261418
.subscriptionType(SubscriptionType.Shared).subscribeAsync()
14271419
.get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);

0 commit comments

Comments
 (0)