diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 1ba353dccaa1c..975b23192f949 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -24,6 +24,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; @@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources { private static final String NAMESPACE_BASE_PATH = "/namespace"; public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) { + this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool()); + } + + public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) { super(configurationStore, Policies.class, operationTimeoutSec); this.configurationStore = configurationStore; isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec); - partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec); + partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor); } public CompletableFuture> listNamespacesAsync(String tenant) { @@ -234,9 +240,11 @@ public void setIsolationDataWithCreate(String cluster, public static class PartitionedTopicResources extends BaseResources { private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics"; + private final Executor executor; - public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) { + public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) { super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec); + this.executor = executor; } public CompletableFuture updatePartitionedTopicAsync(TopicName tn, Function runWithMarkDeleteAsync(TopicName topic, future.complete(deleteResult); } }); - }); + }, executor); return future; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index fe7ffe0bc7b43..cc64eeb52f6eb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.resources; import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import lombok.Getter; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -57,13 +59,19 @@ public class PulsarResources { public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) { this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC); } + + public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore, + int operationTimeoutSec) { + this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool()); + } + public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore, - int operationTimeoutSec) { + int operationTimeoutSec, Executor executor) { if (configurationMetadataStore != null) { tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec); clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore, operationTimeoutSec); - namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec); + namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor); resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec); } else { tenantResources = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 51dffc20d076e..96f3653ea9966 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1000,7 +1000,7 @@ protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception @VisibleForTesting protected PulsarResources newPulsarResources() { PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, - config.getMetadataStoreOperationTimeoutSeconds()); + config.getMetadataStoreOperationTimeoutSeconds(), getExecutor()); pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster); return pulsarResources;