Skip to content

Commit

Permalink
[JMSPublishFilters] Do not filter messages during broker shutdown (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored May 6, 2024
1 parent dacf5a8 commit 40e9720
Showing 1 changed file with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class JMSPublishFilters implements BrokerInterceptor {

private final JMSFilter filter = new JMSFilter(false);
private boolean enabled = false;
private final AtomicBoolean closed = new AtomicBoolean();

private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
private static final Field dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer;
Expand Down Expand Up @@ -178,39 +180,51 @@ public void messageProduced(
scheduleOnDispatchThread(
subscription,
() -> {
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult =
filter.filterEntry(entry, filterContext, true);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug(
"Reject message {}:{} for subscription {}",
ledgerId,
entryId,
subscription.getName());
}
// ir is possible that calling this method in this thread may affect
// performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
} finally {
filterProcessingTimeOnProduce
.labels(producer.getTopic().getName(), subscription.getName())
.observe(System.nanoTime() - now);
}
filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata);
});
}
;
}

private void filterAndAckMessage(
Producer producer,
long ledgerId,
long entryId,
Subscription subscription,
MessageMetadata messageMetadata) {
if (closed.get()) {
// the broker is shutting down, we cannot process the entries
// this operation has been enqueued before the broker shutdown
return;
}
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext, true);
if (filterResult == EntryFilter.FilterResult.REJECT) {
if (log.isDebugEnabled()) {
log.debug(
"Reject message {}:{} for subscription {}",
ledgerId,
entryId,
subscription.getName());
}
// ir is possible that calling this method in this thread may affect
// performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
} finally {
filterProcessingTimeOnProduce
.labels(producer.getTopic().getName(), subscription.getName())
.observe(System.nanoTime() - now);
}
}

private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
Expand Down Expand Up @@ -247,6 +261,8 @@ private static void scheduleOnDispatchThread(Subscription subscription, Runnable

@Override
public void close() {
log.info("Broker is shutting down. Disabling JMSPublishFilters interceptor");
closed.set(true);
filter.close();
}

Expand Down

0 comments on commit 40e9720

Please sign in to comment.