Skip to content

Commit

Permalink
[improve][broker] Don't use forkjoin pool by default for deleting par…
Browse files Browse the repository at this point in the history
…titioned topics (#22598)
  • Loading branch information
lhotari authored Apr 26, 2024
1 parent 7a44c80 commit 8323a3c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -54,10 +56,14 @@ public class NamespaceResources extends BaseResources<Policies> {
private static final String NAMESPACE_BASE_PATH = "/namespace";

public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
this(configurationStore, operationTimeoutSec, ForkJoinPool.commonPool());
}

public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec, executor);
}

public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
Expand Down Expand Up @@ -234,9 +240,11 @@ public void setIsolationDataWithCreate(String cluster,

public static class PartitionedTopicResources extends BaseResources<PartitionedTopicMetadata> {
private static final String PARTITIONED_TOPIC_PATH = "/admin/partitioned-topics";
private final Executor executor;

public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec) {
public PartitionedTopicResources(MetadataStore configurationStore, int operationTimeoutSec, Executor executor) {
super(configurationStore, PartitionedTopicMetadata.class, operationTimeoutSec);
this.executor = executor;
}

public CompletableFuture<Void> updatePartitionedTopicAsync(TopicName tn, Function<PartitionedTopicMetadata,
Expand Down Expand Up @@ -371,7 +379,7 @@ public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
future.complete(deleteResult);
}
});
});
}, executor);

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resources;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import lombok.Getter;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -57,13 +59,19 @@ public class PulsarResources {
public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore) {
this(localMetadataStore, configurationMetadataStore, DEFAULT_OPERATION_TIMEOUT_SEC);
}

public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
int operationTimeoutSec) {
this(localMetadataStore, configurationMetadataStore, operationTimeoutSec, ForkJoinPool.commonPool());
}

public PulsarResources(MetadataStore localMetadataStore, MetadataStore configurationMetadataStore,
int operationTimeoutSec) {
int operationTimeoutSec, Executor executor) {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore,
operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec, executor);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,7 @@ protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception
@VisibleForTesting
protected PulsarResources newPulsarResources() {
PulsarResources pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore,
config.getMetadataStoreOperationTimeoutSeconds());
config.getMetadataStoreOperationTimeoutSeconds(), getExecutor());

pulsarResources.getClusterResources().getStore().registerListener(this::handleDeleteCluster);
return pulsarResources;
Expand Down

0 comments on commit 8323a3c

Please sign in to comment.