Skip to content

Commit 40a797c

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[improve] [broker] high CPU usage caused by list topics under namespace (apache#23049)
(cherry picked from commit 3e4f338) (cherry picked from commit 3f7206c)
1 parent d63205c commit 40a797c

File tree

3 files changed

+27
-2
lines changed

3 files changed

+27
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

+23
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Optional;
3838
import java.util.Set;
3939
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.ConcurrentHashMap;
4041
import java.util.concurrent.CopyOnWriteArrayList;
4142
import java.util.concurrent.ExecutionException;
4243
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,7 @@
5253
import org.apache.commons.collections4.CollectionUtils;
5354
import org.apache.commons.collections4.ListUtils;
5455
import org.apache.commons.lang3.StringUtils;
56+
import org.apache.commons.lang3.mutable.MutableBoolean;
5557
import org.apache.commons.lang3.tuple.Pair;
5658
import org.apache.pulsar.broker.PulsarServerException;
5759
import org.apache.pulsar.broker.PulsarService;
@@ -100,6 +102,7 @@
100102
import org.apache.pulsar.common.policies.data.Policies;
101103
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
102104
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
105+
import org.apache.pulsar.common.topics.TopicList;
103106
import org.apache.pulsar.common.util.FutureUtil;
104107
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
105108
import org.apache.pulsar.metadata.api.MetadataCache;
@@ -159,6 +162,9 @@ public class NamespaceService implements AutoCloseable {
159162
.register();
160163

161164

165+
private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics =
166+
new ConcurrentHashMap<>();
167+
162168
/**
163169
* Default constructor.
164170
*/
@@ -1452,6 +1458,23 @@ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceNa
14521458
}
14531459
}
14541460

1461+
public CompletableFuture<List<String>> getListOfUserTopics(NamespaceName namespaceName, Mode mode) {
1462+
String key = String.format("%s://%s", mode, namespaceName);
1463+
final MutableBoolean initializedByCurrentThread = new MutableBoolean();
1464+
CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> {
1465+
initializedByCurrentThread.setTrue();
1466+
return getListOfTopics(namespaceName, mode).thenApplyAsync(list -> {
1467+
return TopicList.filterSystemTopic(list);
1468+
}, pulsar.getExecutor());
1469+
});
1470+
if (initializedByCurrentThread.getValue()) {
1471+
queryRes.whenComplete((ignore, ex) -> {
1472+
inProgressQueryUserTopics.remove(key, queryRes);
1473+
});
1474+
}
1475+
return queryRes;
1476+
}
1477+
14551478
public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
14561479
return getPartitions(namespaceName, TopicDomain.persistent)
14571480
.thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent),

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -2348,11 +2348,11 @@ protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGet
23482348
if (lookupSemaphore.tryAcquire()) {
23492349
isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
23502350
if (isAuthorized) {
2351-
getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
2351+
getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
23522352
.thenAccept(topics -> {
23532353
boolean filterTopics = false;
23542354
// filter system topic
2355-
List<String> filteredTopics = TopicList.filterSystemTopic(topics);
2355+
List<String> filteredTopics = topics;
23562356

23572357
if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
23582358
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ public void setup() throws Exception {
229229
doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any());
230230
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics(
231231
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
232+
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfUserTopics(
233+
NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL);
232234
doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics(
233235
NamespaceName.get("use", "ns-abc"));
234236

0 commit comments

Comments
 (0)