Skip to content

Commit

Permalink
[improve][broker] Close TopicPoliciesService to allow Pulsar broker g…
Browse files Browse the repository at this point in the history
…raceful shutdown (#22589)
  • Loading branch information
lhotari authored Apr 26, 2024
1 parent f8f256c commit 7a44c80
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (topicPoliciesService != null) {
topicPoliciesService.close();
topicPoliciesService = null;
}

if (client != null) {
client.close();
client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
private final PulsarService pulsarService;
private final HashSet localCluster;
private final String clusterName;
private final AtomicBoolean closed = new AtomicBoolean(false);

private final ConcurrentInitializer<NamespaceEventsSystemTopicFactory>
namespaceEventsSystemTopicFactoryLazyInitializer = new LazyInitializer<>() {
Expand Down Expand Up @@ -110,12 +112,18 @@ public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
this.writerCaches = Caffeine.newBuilder()
.expireAfterAccess(5, TimeUnit.MINUTES)
.removalListener((namespaceName, writer, cause) -> {
((SystemTopicClient.Writer) writer).closeAsync().exceptionally(ex -> {
log.error("[{}] Close writer error.", namespaceName, ex);
return null;
});
try {
((SystemTopicClient.Writer) writer).close();
} catch (Exception e) {
log.error("[{}] Close writer error.", namespaceName, e);
}
})
.executor(pulsarService.getExecutor())
.buildAsync((namespaceName, executor) -> {
if (closed.get()) {
return CompletableFuture.failedFuture(
new BrokerServiceException(getClass().getName() + " is closed."));
}
SystemTopicClient<PulsarEvent> systemTopicClient = getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(namespaceName);
return systemTopicClient.newWriterAsync();
Expand Down Expand Up @@ -382,6 +390,10 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name

protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClient(
NamespaceName namespace) {
if (closed.get()) {
return CompletableFuture.failedFuture(
new BrokerServiceException(getClass().getName() + " is closed."));
}
try {
createSystemTopicFactoryIfNeeded();
} catch (PulsarServerException ex) {
Expand Down Expand Up @@ -430,6 +442,11 @@ public boolean test(NamespaceBundle namespaceBundle) {
}

private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
if (closed.get()) {
future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed."));
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
log.error("[{}] Failed to check the move events for the system topic",
Expand Down Expand Up @@ -511,6 +528,10 @@ private void cleanCacheAndCloseReader(@Nonnull NamespaceName namespace, boolean
* #{@link SystemTopicBasedTopicPoliciesService#getTopicPoliciesAsync(TopicName)} method to block loading topic.
*/
private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader) {
if (closed.get()) {
cleanCacheAndCloseReader(reader.getSystemTopic().getTopicName().getNamespaceObject(), false);
return;
}
reader.readNextAsync()
.thenAccept(msg -> {
refreshTopicPoliciesCache(msg);
Expand Down Expand Up @@ -628,11 +649,20 @@ private NamespaceEventsSystemTopicFactory getNamespaceEventsSystemTopicFactory()
private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<PulsarEvent> reader,
TopicName topicName, TopicPolicies policies,
CompletableFuture<TopicPolicies> future) {
if (closed.get()) {
future.completeExceptionally(new BrokerServiceException(getClass().getName() + " is closed."));
reader.closeAsync().whenComplete((v, e) -> {
if (e != null) {
log.error("[{}] Close reader error.", topicName, e);
}
});
return;
}
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
if (ex != null) {
future.completeExceptionally(ex);
}
if (hasMore) {
if (hasMore != null && hasMore) {
reader.readNextAsync().whenComplete((msg, e) -> {
if (e != null) {
future.completeExceptionally(e);
Expand All @@ -656,7 +686,9 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<Puls
}
});
} else {
future.complete(policies);
if (!future.isDone()) {
future.complete(policies);
}
reader.closeAsync().whenComplete((v, e) -> {
if (e != null) {
log.error("[{}] Close reader error.", topicName, e);
Expand Down Expand Up @@ -740,4 +772,23 @@ protected AsyncLoadingCache<NamespaceName, SystemTopicClient.Writer<PulsarEvent>
}

private static final Logger log = LoggerFactory.getLogger(SystemTopicBasedTopicPoliciesService.class);

@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
writerCaches.synchronous().invalidateAll();
readerCaches.values().forEach(future -> {
if (future != null && !future.isCompletedExceptionally()) {
future.thenAccept(reader -> {
try {
reader.close();
} catch (Exception e) {
log.error("Failed to close reader.", e);
}
});
}
});
readerCaches.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,26 @@ private SystemTopicClient<T> getTransactionBufferSystemTopicClient(NamespaceName

public void close() throws Exception {
for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : clients.entrySet()) {
entry.getValue().close();
try {
entry.getValue().close();
} catch (Exception e) {
log.error("Failed to close system topic client for namespace {}", entry.getKey(), e);
}
}
clients.clear();
for (Map.Entry<NamespaceName, ReferenceCountedWriter<T>> entry : refCountedWriterMap.entrySet()) {
CompletableFuture<SystemTopicClient.Writer<T>> future = entry.getValue().getFuture();
if (!future.isCompletedExceptionally()) {
future.thenAccept(writer -> {
try {
writer.close();
} catch (Exception e) {
log.error("Failed to close writer for namespace {}", entry.getKey(), e);
}
});
}
}
refCountedWriterMap.clear();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* Topic policies service.
*/
@InterfaceStability.Evolving
public interface TopicPoliciesService {
public interface TopicPoliciesService extends AutoCloseable {

TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
long DEFAULT_GET_TOPIC_POLICY_TIMEOUT = 30_000;
Expand Down Expand Up @@ -239,5 +239,10 @@ public void registerListener(TopicName topicName, TopicPolicyListener<TopicPolic
public void unregisterListener(TopicName topicName, TopicPolicyListener<TopicPolicies> listener) {
//No-op
}

@Override
public void close() {
//No-op
}
}
}

0 comments on commit 7a44c80

Please sign in to comment.