From 40e9720bae60a7427db864d17f78dde521fe304e Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 6 May 2024 13:20:52 +0200 Subject: [PATCH] [JMSPublishFilters] Do not filter messages during broker shutdown (#140) --- .../jms/selectors/JMSPublishFilters.java | 78 +++++++++++-------- 1 file changed, 47 insertions(+), 31 deletions(-) diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 96028afe..dc4f92af 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; @@ -69,6 +70,7 @@ public class JMSPublishFilters implements BrokerInterceptor { private final JMSFilter filter = new JMSFilter(false); private boolean enabled = false; + private final AtomicBoolean closed = new AtomicBoolean(); private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers; private static final Field dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer; @@ -178,39 +180,51 @@ public void messageProduced( scheduleOnDispatchThread( subscription, () -> { - long now = System.nanoTime(); - try { - FilterContext filterContext = new FilterContext(); - filterContext.setSubscription(subscription); - filterContext.setMsgMetadata(messageMetadata); - filterContext.setConsumer(null); - Entry entry = null; // we would need the Entry only in case of batch messages - EntryFilter.FilterResult filterResult = - filter.filterEntry(entry, filterContext, true); - if (filterResult == EntryFilter.FilterResult.REJECT) { - if (log.isDebugEnabled()) { - log.debug( - "Reject message {}:{} for subscription {}", - ledgerId, - entryId, - subscription.getName()); - } - // ir is possible that calling this method in this thread may affect - // performance - // let's keep it simple for now, we can optimize it later - subscription.acknowledgeMessage( - Collections.singletonList(new PositionImpl(ledgerId, entryId)), - CommandAck.AckType.Individual, - null); - } - } finally { - filterProcessingTimeOnProduce - .labels(producer.getTopic().getName(), subscription.getName()) - .observe(System.nanoTime() - now); - } + filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata); }); } - ; + } + + private void filterAndAckMessage( + Producer producer, + long ledgerId, + long entryId, + Subscription subscription, + MessageMetadata messageMetadata) { + if (closed.get()) { + // the broker is shutting down, we cannot process the entries + // this operation has been enqueued before the broker shutdown + return; + } + long now = System.nanoTime(); + try { + FilterContext filterContext = new FilterContext(); + filterContext.setSubscription(subscription); + filterContext.setMsgMetadata(messageMetadata); + filterContext.setConsumer(null); + Entry entry = null; // we would need the Entry only in case of batch messages + EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext, true); + if (filterResult == EntryFilter.FilterResult.REJECT) { + if (log.isDebugEnabled()) { + log.debug( + "Reject message {}:{} for subscription {}", + ledgerId, + entryId, + subscription.getName()); + } + // ir is possible that calling this method in this thread may affect + // performance + // let's keep it simple for now, we can optimize it later + subscription.acknowledgeMessage( + Collections.singletonList(new PositionImpl(ledgerId, entryId)), + CommandAck.AckType.Individual, + null); + } + } finally { + filterProcessingTimeOnProduce + .labels(producer.getTopic().getName(), subscription.getName()) + .observe(System.nanoTime() - now); + } } private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) { @@ -247,6 +261,8 @@ private static void scheduleOnDispatchThread(Subscription subscription, Runnable @Override public void close() { + log.info("Broker is shutting down. Disabling JMSPublishFilters interceptor"); + closed.set(true); filter.close(); }