diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c21c7dc771eae..51dffc20d076e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -565,6 +565,11 @@ public CompletableFuture closeAsync() { transactionBufferClient.close(); } + if (topicPoliciesService != null) { + topicPoliciesService.close(); + topicPoliciesService = null; + } + if (client != null) { client.close(); client = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 0449e5c885cd3..6d18d6d61b08e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.commons.lang3.concurrent.ConcurrentInitializer; @@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private final PulsarService pulsarService; private final HashSet localCluster; private final String clusterName; + private final AtomicBoolean closed = new AtomicBoolean(false); private final ConcurrentInitializer namespaceEventsSystemTopicFactoryLazyInitializer = new LazyInitializer<>() { @@ -110,12 +112,18 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) { this.writerCaches = Caffeine.newBuilder() .expireAfterAccess(5, TimeUnit.MINUTES) .removalListener((namespaceName, writer, cause) -> { - ((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> { - log.error("[{}] Close writer error.", namespaceName, ex); - return null; - }); + try { + ((SystemTopicClient.Writer) writer).close(); + } catch (Exception e) { + log.error("[{}] Close writer error.", namespaceName, e); + } }) + .executor(pulsarService.getExecutor()) .buildAsync((namespaceName, executor) -> { + if (closed.get()) { + return CompletableFuture.failedFuture( + new BrokerServiceException(getClass().getName() + " is closed.")); + } SystemTopicClient systemTopicClient = getNamespaceEventsSystemTopicFactory() .createTopicPoliciesSystemTopicClient(namespaceName); return systemTopicClient.newWriterAsync(); @@ -382,6 +390,10 @@ public CompletableFuture addOwnedNamespaceBundleAsync(NamespaceBundle name protected CompletableFuture> createSystemTopicClient( NamespaceName namespace) { + if (closed.get()) { + return CompletableFuture.failedFuture( + new BrokerServiceException(getClass().getName() + " is closed.")); + } try { createSystemTopicFactoryIfNeeded(); } catch (PulsarServerException ex) { @@ -430,6 +442,11 @@ public boolean test(NamespaceBundle namespaceBundle) { } private void initPolicesCache(SystemTopicClient.Reader reader, CompletableFuture future) { + if (closed.get()) { + future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed.")); + cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + return; + } reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { if (ex != null) { log.error("[{}] Failed to check the move events for the system topic", @@ -511,6 +528,10 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean * #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic. */ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) { + if (closed.get()) { + cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false); + return; + } reader.readNextAsync() .thenAccept(msg -> { refreshTopicPoliciesCache(msg); @@ -628,11 +649,20 @@ private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory() private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader reader, TopicName topicName, TopicPolicies policies, CompletableFuture future) { + if (closed.get()) { + future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed.")); + reader.closeAsync().whenComplete((v, e) -> { + if (e != null) { + log.error("[{}] Close reader error.", topicName, e); + } + }); + return; + } reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { if (ex != null) { future.completeExceptionally(ex); } - if (hasMore) { + if (hasMore != null && hasMore) { reader.readNextAsync().whenComplete((msg, e) -> { if (e != null) { future.completeExceptionally(e); @@ -656,7 +686,9 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader { if (e != null) { log.error("[{}] Close reader error.", topicName, e); @@ -740,4 +772,23 @@ protected AsyncLoadingCache } private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class); + + @Override + public void close() throws Exception { + if (closed.compareAndSet(false, true)) { + writerCaches.synchronous().invalidateAll(); + readerCaches.values().forEach(future -> { + if (future != null && !future.isCompletedExceptionally()) { + future.thenAccept(reader -> { + try { + reader.close(); + } catch (Exception e) { + log.error("Failed to close reader.", e); + } + }); + } + }); + readerCaches.clear(); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java index 332d754cf97d2..bd1b90981695e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java @@ -142,8 +142,26 @@ private SystemTopicClient getTransactionBufferSystemTopicClient(NamespaceName public void close() throws Exception { for (Map.Entry> entry : clients.entrySet()) { - entry.getValue().close(); + try { + entry.getValue().close(); + } catch (Exception e) { + log.error("Failed to close system topic client for namespace {}", entry.getKey(), e); + } + } + clients.clear(); + for (Map.Entry> entry : refCountedWriterMap.entrySet()) { + CompletableFuture> future = entry.getValue().getFuture(); + if (!future.isCompletedExceptionally()) { + future.thenAccept(writer -> { + try { + writer.close(); + } catch (Exception e) { + log.error("Failed to close writer for namespace {}", entry.getKey(), e); + } + }); + } } + refCountedWriterMap.clear(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index aa3a6aaeff29f..41fecb3b87ed4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -38,7 +38,7 @@ * Topic policies service. */ @InterfaceStability.Evolving -public interface TopicPoliciesService { +public interface TopicPoliciesService extends AutoCloseable { TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled(); long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000; @@ -239,5 +239,10 @@ public void registerListener(TopicName topicName, TopicPolicyListener listener) { //No-op } + + @Override + public void close() { + //No-op + } } }