diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java
index e287acdb..cc9dc1f9 100644
--- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java
+++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java
@@ -127,7 +127,7 @@ public JMSFilter() {
   public FilterResult filterEntry(Entry entry, FilterContext context) {
     long start = System.nanoTime();
     try {
-      return filterEntry(entry, context, false);
+      return filterEntry(entry, context, false, null);
     } finally {
       filterProcessingTime
           .labels(context.getSubscription().getTopicName(), context.getSubscription().getName())
@@ -206,7 +206,11 @@ private boolean isHandleOnlySelectors(FilterContext context) {
     return handleOnlySelectors;
   }

-  public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMessagePublish) {
+  public FilterResult filterEntry(
+      Entry entry,
+      FilterContext context,
+      boolean onMessagePublish,
+      MessageMetadataCache messageMetadataCache) {
     Consumer consumer = context.getConsumer();
     Map<String, String> consumerMetadata =
         consumer != null ? consumer.getMetadata() : Collections.emptyMap();
@@ -279,7 +283,8 @@ public FilterResult filterEntry(Entry entry, FilterContext context, boolean onMessagePublish) {
             selectorOnSubscription,
             selector,
             subscription,
-            consumerMetadata);
+            consumerMetadata,
+            messageMetadataCache);
       }
     } catch (Throwable err) {
       log.error("Error while processing entry " + err, err);
@@ -325,13 +330,19 @@ private FilterResult processSingleMessageEntry(
       SelectorSupport selectorOnSubscription,
       SelectorSupport selector,
       Subscription subscription,
-      Map<String, String> consumerMetadata)
+      Map<String, String> consumerMetadata,
+      MessageMetadataCache messageMetadataCache)
       throws JMSException {
     // here we are dealing with a single message,
     // so we can reject the message more easily
     PropertyEvaluator typedProperties =
         new PropertyEvaluator(
-            metadata.getPropertiesCount(), metadata.getPropertiesList(), null, metadata, context);
+            metadata.getPropertiesCount(),
+            metadata.getPropertiesList(),
+            null,
+            metadata,
+            context,
+            messageMetadataCache);

     if (selectorOnSubscription != null) {
       boolean matchesSubscriptionFilter = matches(typedProperties, selectorOnSubscription);
@@ -414,7 +425,8 @@ private FilterResult processBatchEntry(
                 singleMessageMetadata.getPropertiesList(),
                 singleMessageMetadata,
                 null,
-                context);
+                context,
+                null);

         // noLocal filter
         // all the messages in the batch come from the Producer/Connection
@@ -545,8 +557,13 @@ private static class PropertyEvaluator implements Function<String, Object> {
     private SingleMessageMetadata singleMessageMetadata;
     private MessageMetadata metadata;
     private FilterContext context;
+    private MessageMetadataCache messageMetadataCache;

     private Object getProperty(String name) {
+      if (messageMetadataCache != null) {
+        return messageMetadataCache.getProperty(
+            name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n));
+      }
       return JMSFilter.getProperty(propertiesCount, propertiesList, name);
     }
diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java
index f1d5182b..1e171098 100644
--- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java
+++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java
@@ -21,35 +21,49 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.Histogram;
 import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.concurrent.Executor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Producer;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.FilterContext;
 import org.apache.pulsar.common.api.proto.BaseCommand;
@@ -60,6 +74,8 @@
 @Slf4j
 public class JMSPublishFilters implements BrokerInterceptor {
   private static final String JMS_FILTER_METADATA = "jms-msg-metadata";
+  private static final ByteBuf COULDNOT_ACQUIRE_MEMORY_PLACEHOLDER = Unpooled.EMPTY_BUFFER;
+  private static final int TIMEOUT_READ_ENTRY = 10000; // 10 seconds to read

   private static final Histogram filterProcessingTimeOnPublish =
       Histogram.build()
@@ -78,6 +94,23 @@ public class JMSPublishFilters implements BrokerInterceptor {
           .buckets(BUCKETS)
           .create();

+  private static final Histogram filterAckTimeOnProduce =
+      Histogram.build()
+          .name("pulsar_jmsfilter_ack_time_onpublish")
+          .help("Time taken to persist the ack on the broker after applying filters")
+          .labelNames("topic", "subscription")
+          .buckets(BUCKETS)
+          .create();
+
+  private static final Histogram filterOverallProcessingTimeOnPublish =
+      Histogram.build()
+          .name("pulsar_jmsfilter_overall_processing_time_onpublish")
+          .help(
+              "Time taken to process the message on the broker from publishers and applying filters")
+          .labelNames("topic")
+          .buckets(BUCKETS)
+          .create();
+
   private static final Gauge memoryUsed =
       Gauge.build()
           .name("pulsar_jmsfilter_processing_memory")
@@ -90,34 +123,25 @@ public class JMSPublishFilters implements BrokerInterceptor {
           .help("Number of pending operations in the JMSPublishFilters interceptor")
           .create();

+  private static final Gauge pendingAcks =
+      Gauge.build()
+          .name("pulsar_jmsfilter_processing_pending_acks")
+          .help("Number of pending acks in the JMSPublishFilters interceptor")
+          .create();
+
+  private static final Counter readFromLedger =
+      Counter.build()
+          .name("pulsar_jmsfilter_entries_read_from_ledger")
+          .help("Number of entries read from ledgers by JMSPublishFilters interceptor")
+          .create();
+
   private final JMSFilter filter = new JMSFilter(false);
   private boolean enabled = false;
   private Semaphore memoryLimit;
   private final AtomicBoolean closed = new AtomicBoolean();
-
-  private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
-  private static final Field dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer;
-
-  static {
-    Field fieldPersistentDispatcherMultipleConsumers = null;
-    Field fieldPersistentDispatcherSingleActiveConsumer = null;
-    try {
-      fieldPersistentDispatcherMultipleConsumers =
-          PersistentDispatcherMultipleConsumers.class.getDeclaredField("dispatchMessagesThread");
-      fieldPersistentDispatcherMultipleConsumers.setAccessible(true);
-
-      fieldPersistentDispatcherSingleActiveConsumer =
-          PersistentDispatcherSingleActiveConsumer.class.getDeclaredField("executor");
-      fieldPersistentDispatcherSingleActiveConsumer.setAccessible(true);
-
-    } catch (NoSuchFieldException e) {
-      log.error("Cannot access thread field: " + e);
-    }
-    dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers =
-        fieldPersistentDispatcherMultipleConsumers;
-    dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer =
-        fieldPersistentDispatcherSingleActiveConsumer;
-  }
+  private ExecutorService executor;
+  private final BlockingQueue<AckFuture> ackQueue = new BatchedArrayBlockingQueue<>(100000);
+  private final Runnable drainAckQueueTask = SafeRunnable.safeRun(this::drainAckQueue);

   @Override
   public void initialize(PulsarService pulsarService) {
@@ -129,12 +153,28 @@ public void initialize(PulsarService pulsarService) {
                 .getProperty("jmsApplyFiltersOnPublish", "true"));
     log.info("jmsApplyFiltersOnPublish={}", enabled);

+    // the number of threads is bigger than the number of cores because
+    // sometimes the operation has to perform a blocking read to get the entry from the bookies
+    int numThreads =
+        Integer.parseInt(
+            pulsarService
+                .getConfiguration()
+                .getProperties()
+                .getProperty(
+                    "jmsFiltersOnPublishThreads",
+                    (Runtime.getRuntime().availableProcessors() * 4) + ""));
+    log.info("jmsFiltersOnPublishThreads={}", numThreads);
+    executor = Executors.newFixedThreadPool(numThreads, new WorkersThreadFactory());
     try {
       log.info("Registering JMSFilter metrics");
       CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish);
       CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce);
+      CollectorRegistry.defaultRegistry.register(filterAckTimeOnProduce);
       CollectorRegistry.defaultRegistry.register(memoryUsed);
       CollectorRegistry.defaultRegistry.register(pendingOperations);
+      CollectorRegistry.defaultRegistry.register(pendingAcks);
+      CollectorRegistry.defaultRegistry.register(filterOverallProcessingTimeOnPublish);
+      CollectorRegistry.defaultRegistry.register(readFromLedger);
     } catch (IllegalArgumentException alreadyRegistered) {
       // ignore
       log.info("Filter metrics already registered", alreadyRegistered);
@@ -143,16 +183,27 @@ public void initialize(PulsarService pulsarService) {
             pulsarService
                 .getConfiguration()
                 .getProperties()
-                .getProperty("jmsFiltersOnPublishMaxMemoryMB", "128");
+                .getProperty("jmsFiltersOnPublishMaxMemoryMB", "256");
     try {
       int memoryLimitBytes = Integer.parseInt(memoryLimitString) * 1024 * 1024;
-      memoryLimit = new Semaphore(memoryLimitBytes);
-      log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes);
+      if (memoryLimitBytes > 0) {
+        memoryLimit = new Semaphore(memoryLimitBytes);
+        log.info(
+            "jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes);
+      } else {
+        memoryLimit = null;
+        log.info(
+            "jmsFiltersOnPublishMaxMemoryMB={} (no cache for JMSPublishFilters)",
+            memoryLimitString);
+      }
     } catch (NumberFormatException e) {
       throw new RuntimeException(
           "Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e);
     }
+
+    // start the ack queue draining
+    executor.submit(drainAckQueueTask);
   }

   @Override
@@ -174,7 +225,11 @@ public void onMessagePublish(
       }
       // we must make a copy because the ByteBuf will be released
       ByteBuf messageMetadata = copyMessageMetadataAndAcquireMemory(headersAndPayload);
-      publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
+      if (messageMetadata != null) {
+        publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
+      } else {
+        publishContext.setProperty(JMS_FILTER_METADATA, COULDNOT_ACQUIRE_MEMORY_PLACEHOLDER);
+      }
       // as soon as we find a good reason to apply the filters in messageProduced
       // we can exit
       return;
@@ -187,14 +242,21 @@ public void onMessagePublish(
   }

   public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) {
+    if (memoryLimit == null) {
+      return null;
+    }
     int readerIndex = buffer.readerIndex();
     skipBrokerEntryMetadataIfExist(buffer);
     skipChecksumIfPresent(buffer);
     int metadataSize = (int) buffer.readUnsignedInt();
-    // this is going to throttle the producer if the memory limit is reached
-    // please note that this is a blocking operation on the Netty eventpool
-    // currently we cannot do better than this, as the interceptor API is blocking
-    memoryLimit.acquireUninterruptibly(metadataSize);
+    // we cannot block the producer thread
+    // if there is no memory available we must return null and the entry will be read before
+    // applying the filters
+    boolean acquired = memoryLimit.tryAcquire(metadataSize);
+    if (!acquired) {
+      buffer.readerIndex(readerIndex);
+      return null;
+    }
     // please note that Netty would probably retain more memory than this buffer
     // but this is the best approximation we can do
     memoryUsed.inc(metadataSize);
@@ -219,40 +281,40 @@ public void messageProduced(
     if (!enabled) {
       return;
     }
+    long startNanos = System.nanoTime();
     int memorySize = messageMetadataUnparsed.readableBytes();
-    AtomicInteger pending = new AtomicInteger(1);
-    Consumer<Boolean> onComplete =
-        (mainThread) -> {
-          if (!mainThread) {
-            // the main thread doesn't count as a pending operation
-            pendingOperations.dec();
-          }
-          if (pending.decrementAndGet() == 0) {
-            messageMetadataUnparsed.release();
+    Runnable onComplete =
+        () -> {
+          pendingOperations.dec();
+          messageMetadataUnparsed.release();
+          if (memoryLimit != null) {
             memoryLimit.release(memorySize);
-            memoryUsed.dec(memorySize);
           }
+          memoryUsed.dec(memorySize);
         };
-    try {
-      producer
-          .getTopic()
-          .getSubscriptions()
-          .forEach(
-              (___, subscription) -> {
-                if (!(isPersistentSubscriptionWithSelector(subscription))) {
-                  return;
-                }
-                pending.incrementAndGet();
-                pendingOperations.inc();
-                ByteBuf duplicate = messageMetadataUnparsed.duplicate();
-                FilterAndAckMessageOperation filterAndAckMessageOperation =
-                    new FilterAndAckMessageOperation(
-                        ledgerId, entryId, subscription, duplicate, onComplete);
-                scheduleOnDispatchThread(subscription, filterAndAckMessageOperation);
-              });
-    } finally {
-      onComplete.accept(true);
+    Topic topic = producer.getTopic();
+    List<Subscription> subscriptions =
+        topic
+            .getSubscriptions()
+            .values()
+            .stream()
+            .filter(JMSPublishFilters::isPersistentSubscriptionWithSelector)
+            .collect(Collectors.toList());
+    pendingOperations.inc();
+    if (subscriptions.isEmpty()) { // this is very unlikely
+      onComplete.run();
+      return;
     }
+    FilterAndAckMessageOperation filterAndAckMessageOperation =
+        new FilterAndAckMessageOperation(
+            ledgerId,
+            entryId,
+            startNanos,
+            (PersistentTopic) topic,
+            subscriptions,
+            messageMetadataUnparsed,
+            onComplete);
+    scheduleOnWorkerThreads(filterAndAckMessageOperation, onComplete);
   }

   private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) {
@@ -261,98 +323,250 @@ private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) {
         && "true".equals(subscription.getSubscriptionProperties().get("jms.filtering"));
   }

+  private static class WorkersThreadFactory implements ThreadFactory {
+    private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable r) {
+      return new Thread(r, "jms-filters-workers-" + THREAD_COUNT.getAndIncrement());
+    }
+  }
+
   @AllArgsConstructor
   private class FilterAndAckMessageOperation implements Runnable {
     final long ledgerId;
     final long entryId;
-    final Subscription subscription;
+    final long startNanos;
+    final PersistentTopic topic;
+    final List<Subscription> subscriptions;
     final ByteBuf messageMetadataUnparsed;
-    final Consumer<Boolean> onComplete;
+    final Runnable onComplete;

     @Override
     public void run() {
       try {
-        filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed);
+        filterAndAckMessage(ledgerId, entryId, topic, subscriptions, messageMetadataUnparsed);
+      } catch (Throwable error) {
+        log.error(
+            "Error while filtering message {}:{}, topic {}",
+            ledgerId,
+            entryId,
+            topic.getName(),
+            error);
       } finally {
-        onComplete.accept(false);
+        onComplete.run();
+        filterOverallProcessingTimeOnPublish
+            .labels(topic.getName())
+            .observe(System.nanoTime() - startNanos);
       }
     }
   }

   private void filterAndAckMessage(
-      long ledgerId, long entryId, Subscription subscription, ByteBuf messageMetadataUnparsed) {
+      long ledgerId,
+      long entryId,
+      PersistentTopic topic,
+      List<Subscription> subscriptions,
+      ByteBuf messageMetadataUnparsed) {
     if (closed.get()) {
       // the broker is shutting down, we cannot process the entries
       // this operation has been enqueued before the broker shutdown
       return;
     }
-    MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed);
-    long now = System.nanoTime();
+
+    if (!isTopicOwned(topic)) {
+      return;
+    }
+
+    ByteBuf entryReadFromBookie = null;
     try {
-      FilterContext filterContext = new FilterContext();
-      filterContext.setSubscription(subscription);
-      filterContext.setMsgMetadata(messageMetadata);
-      filterContext.setConsumer(null);
-      Entry entry = null; // we would need the Entry only in case of batch messages
-      EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext, true);
-      if (filterResult == EntryFilter.FilterResult.REJECT) {
-        if (log.isDebugEnabled()) {
-          log.debug(
-              "Reject message {}:{} for subscription {}",
-              ledgerId,
-              entryId,
-              subscription.getName());
+      final MessageMetadata messageMetadata;
+      if (messageMetadataUnparsed == COULDNOT_ACQUIRE_MEMORY_PLACEHOLDER) {
+        // note that this is a blocking IO operation, for this reason
+        // it makes sense to have more threads than the number of cores
+        try {
+          entryReadFromBookie =
+              readSingleEntry(ledgerId, entryId, topic)
+                  .get(TIMEOUT_READ_ENTRY, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException interruptedException) {
+          Thread.currentThread().interrupt();
+        } catch (TimeoutException timeoutException) {
+          // timeout
+        } catch (ExecutionException err) {
+          throw err.getCause();
+        }
+        if (entryReadFromBookie == null) {
+          log.error("Could not read entry {}:{} from topic {}", ledgerId, entryId, topic);
+          return;
         }
-        // it is possible that calling this method in this thread may affect
-        // performance
-        // let's keep it simple for now, we can optimize it later
-        subscription.acknowledgeMessage(
-            Collections.singletonList(new PositionImpl(ledgerId, entryId)),
-            CommandAck.AckType.Individual,
-            null);
+        messageMetadata = new MessageMetadata();
+        skipBrokerEntryMetadataIfExist(entryReadFromBookie);
+        skipChecksumIfPresent(entryReadFromBookie);
+        int metadataSize = (int) entryReadFromBookie.readUnsignedInt();
+        messageMetadata.parseFrom(entryReadFromBookie, metadataSize);
+      } else {
+        messageMetadata =
+            getMessageMetadata(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
       }
+
+      // if we have more than one subscription we can save a lot of resources by caching the
+      // properties
+      MessageMetadataCache messageMetadataCache =
+          subscriptions.size() > 1 ? new MessageMetadataCache() : null;
+      for (Subscription subscription : subscriptions) {
+        if (closed.get()) {
+          // the broker is shutting down, we cannot process the entries
+          // this operation has been enqueued before the broker shutdown
+          return;
+        }
+        if (!isTopicOwned(topic)) {
+          return;
+        }
+
+        EntryFilter.FilterResult filterResult;
+        long now = System.nanoTime();
+        try {
+          FilterContext filterContext = new FilterContext();
+          filterContext.setSubscription(subscription);
+          filterContext.setMsgMetadata(messageMetadata);
+          filterContext.setConsumer(null);
+          Entry entry = null; // we would need the Entry only in case of batch messages
+          filterResult = filter.filterEntry(entry, filterContext, true, messageMetadataCache);
+        } finally {
+          filterProcessingTimeOnProduce
+              .labels(topic.getName(), subscription.getName())
+              .observe(System.nanoTime() - now);
+        }
+
+        if (filterResult == EntryFilter.FilterResult.REJECT) {
+          if (log.isDebugEnabled()) {
+            log.debug(
+                "Reject message {}:{} for subscription {}",
+                ledgerId,
+                entryId,
+                subscription.getName());
+          }
+
+          pendingAcks.inc();
+          ackQueue.put(
+              new AckFuture(
+                  (PersistentSubscription) subscription, new PositionImpl(ledgerId, entryId)));
+        }
+      }
+    } catch (Throwable error) {
+      log.error("Error while filtering message", error);
     } finally {
-      filterProcessingTimeOnProduce
-          .labels(subscription.getTopic().getName(), subscription.getName())
-          .observe(System.nanoTime() - now);
+      if (entryReadFromBookie != null) {
+        entryReadFromBookie.release();
+      }
     }
   }

-  private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) {
-    MessageMetadata messageMetadata = new MessageMetadata();
-    messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
-    return messageMetadata;
+  @AllArgsConstructor
+  private static final class AckFuture {
+    private final PersistentSubscription subscription;
+    private final PositionImpl position;
   }

-  private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
+  private void drainAckQueue() {
     try {
-      Dispatcher dispatcher = subscription.getDispatcher();
-      if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
-        ExecutorService singleThreadExecutor =
-            (ExecutorService)
-                dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers.get(dispatcher);
-        if (singleThreadExecutor != null) {
-          singleThreadExecutor.submit(runnable);
-          return;
+      Map<PersistentSubscription, List<PositionImpl>> acksBySubscription = new HashMap<>();
+      try {
+        int maxItems = 50000;
+        while (maxItems-- > 0) {
+          AckFuture ackFuture = ackQueue.poll(100, TimeUnit.MILLISECONDS);
+          if (ackFuture == null) {
+            break;
+          }
+          pendingAcks.dec();
+          List<PositionImpl> acks =
+              acksBySubscription.computeIfAbsent(ackFuture.subscription, k -> new ArrayList<>());
+          acks.add(ackFuture.position);
+        }
+      } finally {
+        // continue draining the queue
+        if (!closed.get()) {
+          executor.submit(drainAckQueueTask);
         }
       }
-      if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
-        Executor singleThreadExecutor =
-            (Executor)
-                dispatchMessagesThreadFieldPersistentDispatcherSingleActiveConsumer.get(dispatcher);
-        if (singleThreadExecutor != null) {
-          singleThreadExecutor.execute(runnable);
+      for (Map.Entry<PersistentSubscription, List<PositionImpl>> entry :
+          acksBySubscription.entrySet()) {
+        long now = System.nanoTime();
+        Subscription subscription = entry.getKey();
+        PersistentTopic topic = (PersistentTopic) subscription.getTopic();
+        if (!isTopicOwned(topic)) {
           return;
         }
+        try {
+          List<PositionImpl> acks = entry.getValue();
+          subscription.acknowledgeMessage(acks, CommandAck.AckType.Individual, null);
+        } finally {
+          filterAckTimeOnProduce
+              .labels(topic.getName(), subscription.getName())
+              .observe(System.nanoTime() - now);
+        }
       }
-      // this case also happens when there is no dispatcher (no consumer has connected since the
-      // last
-      // topic load)
-      // this thread is on the same threadpool that is used by PersistentDispatcherMultipleConsumers
-      // and PersistentDispatcherSingleActiveConsumer
-      subscription.getTopic().getBrokerService().getTopicOrderedExecutor().execute(runnable);
+    } catch (InterruptedException exit) {
+      Thread.currentThread().interrupt();
+      log.info("JMSPublishFilter Ack queue draining interrupted");
+    } catch (Throwable error) {
+      log.error("Error while draining ack queue", error);
+    }
+  }
+
+  private static boolean isTopicOwned(PersistentTopic topic) {
+    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+    switch (managedLedger.getState()) {
+      case Closed:
+      case Terminated:
+      case Fenced:
+      case FencedForDeletion:
+        return false;
+      default:
+        return true;
+    }
+  }
+
+  private static CompletableFuture<ByteBuf> readSingleEntry(
+      long ledgerId, long entryId, PersistentTopic topic) {
+    readFromLedger.inc();
+    CompletableFuture<ByteBuf> entryFuture = new CompletableFuture<>();
+
+    PositionImpl position = new PositionImpl(ledgerId, entryId);
+    ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger();
+    // asyncReadEntry reads from the Broker cache, and falls back to the Bookie;
+    // it also leverages bookie read deduplication
+    managedLedger.asyncReadEntry(
+        position,
+        new AsyncCallbacks.ReadEntryCallback() {
+          @Override
+          public void readEntryComplete(Entry entry, Object ctx) {
+            ByteBuf data = entry.getDataBuffer();
+            entryFuture.complete(data);
+          }
+
+          @Override
+          public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+            log.error("Failed to read entry", exception);
+            entryFuture.completeExceptionally(exception);
+          }
+        },
+        null);
+    return entryFuture;
+  }
+
+  private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed, int size) {
+    MessageMetadata messageMetadata = new MessageMetadata();
+    messageMetadata.parseFrom(messageMetadataUnparsed, size);
+    return messageMetadata;
+  }
+
+  private void scheduleOnWorkerThreads(Runnable runnable, Runnable onError) {
+    // we let the thread pool pick any thread,
+    // this way a broker owning few partitions can still use all the cores
+    try {
+      executor.submit(runnable);
     } catch (Throwable error) {
-      log.error("Error while scheduling on dispatch thread", error);
+      log.error("Error while scheduling on worker threads", error);
+      onError.run();
     }
   }
@@ -361,6 +575,7 @@ public void close() {
     log.info("Broker is shutting down. Disabling JMSPublishFilters interceptor");
     closed.set(true);
     filter.close();
+    executor.shutdown();
   }

   @Override
diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java
new file mode 100644
index 00000000..c0b2f685
--- /dev/null
+++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.pulsar.jms.selectors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+
+class MessageMetadataCache {
+  final Map<String, Object> properties = new HashMap<>();
+
+  Object getProperty(String key, Function<String, Object> compute) {
+    return properties.computeIfAbsent(key, compute);
+  }
+}
diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml
index abdc94cd..64e0ae1c 100644
--- a/pulsar-jms/pom.xml
+++ b/pulsar-jms/pom.xml
@@ -128,10 +128,10 @@
               copy filters
-
-
-
-
+
+
+
+
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java
similarity index 78%
rename from pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java
rename to pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java
index b08f12af..7264456c 100644
--- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBase.java
@@ -37,23 +37,15 @@
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.extension.RegisterExtension;

 @Slf4j
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class JMSPublishFiltersTest {
+public abstract class JMSPublishFiltersBase {

-  @RegisterExtension
-  static PulsarContainerExtension pulsarContainer =
-      new PulsarContainerExtension()
-          .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
-          .withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors")
-          .withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-publish-filters")
-          .withEnv("PULSAR_PREFIX_jmsApplyFiltersOnPublish", "true")
-          .withLogContainerOutput(true);
+  abstract PulsarContainerExtension getPulsarContainer();

   private Map<String, Object> buildProperties() {
-    Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
+    Map<String, Object> properties = getPulsarContainer().buildJMSConnectionProperties();
     properties.put("jms.useServerSideFiltering", true);
     properties.put("jms.enableClientSideEmulation", false);
@@ -94,7 +86,7 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
       subscriptionProperties.put("jms.selector", newSelector);
       subscriptionProperties.put("jms.filtering", "true");

-      pulsarContainer
+      getPulsarContainer()
           .getAdmin()
           .topics()
           .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);
@@ -112,6 +104,9 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
         }
       }

+      // wait for the filters to be processed in background
+      Thread.sleep(5000);
+
       TextMessage textMessage = (TextMessage) consumer1.receive();
       assertEquals("foo-9", textMessage.getText());

@@ -121,23 +116,30 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
       // no more messages
       assertNull(consumer1.receiveNoWait());

-      // ensure that the filter didn't reject any message while dispatching to the consumer
-      // because the filter has been already applied on the write path
-      TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName);
-      SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
-      if (transacted) {
-        // when we enable transactions the stats are not updated correctly
-        // it seems that the transaction marker is counted as "processed by filters"
-        // but actually it is not processed by the JMSFilter at all
-        assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2);
-        assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
-        assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
-        session.commit();
-      } else {
-        assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
-        assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
-        assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
-      }
+      Awaitility.await()
+          .untilAsserted(
+              () -> {
+                // ensure that the filter didn't reject any message while dispatching to the
+                // consumer
+                // because the filter has been already applied on the write path
+                TopicStats stats =
+                    getPulsarContainer().getAdmin().topics().getStats(topicName);
+                SubscriptionStats subscriptionStats =
+                    stats.getSubscriptions().get("jms-queue");
+                if (transacted) {
+                  // when we enable transactions the stats are not updated correctly
+                  // it seems that the transaction marker is counted as "processed by filters"
+                  // but actually it is not processed by the JMSFilter at all
+                  assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 2);
+                  assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
+                  assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
+                  session.commit();
+                } else {
+                  assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
+                  assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
+                  assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
+                }
+              });
     }

     // create a message that doesn't match the filter
@@ -147,9 +149,15 @@ private void sendMessageReceiveFromQueue(boolean transacted) throws Exception {
       TextMessage textMessage = session.createTextMessage("backlog");
       producer.send(textMessage);

-      TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName);
-      SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
-      assertEquals(0, subscriptionStats.getMsgBacklog());
+      Awaitility.await()
+          .untilAsserted(
+              () -> {
+                TopicStats stats =
+                    getPulsarContainer().getAdmin().topics().getStats(topicName);
+                SubscriptionStats subscriptionStats =
+                    stats.getSubscriptions().get("jms-queue");
+                assertEquals(0, subscriptionStats.getMsgBacklog());
+              });

       if (transacted) {
         session.commit();
@@ -181,7 +189,7 @@ void testManyMessagesWithPartitions() throws Exception {
       subscriptionProperties.put("jms.selector", newSelector);
       subscriptionProperties.put("jms.filtering", "true");

-      pulsarContainer
+      getPulsarContainer()
           .getAdmin()
           .topics()
           .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryCacheTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryCacheTest.java
new file mode 100644
index 00000000..4b2c913e
--- /dev/null
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryCacheTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.pulsar.jms;
+
+import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class JMSPublishFiltersBaseMemoryCacheTest extends JMSPublishFiltersBase {
+
+  @RegisterExtension
+  static PulsarContainerExtension pulsarContainer =
+      new PulsarContainerExtension()
+          .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
+          .withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors")
+          .withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-publish-filters")
+          .withEnv("PULSAR_PREFIX_jmsApplyFiltersOnPublish", "true")
+          .withEnv("PULSAR_PREFIX_jmsFiltersOnPublishMaxMemoryMB", "110")
+          .withEnv("PULSAR_PREFIX_jmsFiltersOnPublishThreads", "10")
+          .withLogContainerOutput(true);
+
+  @Override
+  PulsarContainerExtension getPulsarContainer() {
+    return pulsarContainer;
+  }
+}
diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryNoCacheTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryNoCacheTest.java
new file mode 100644
index 00000000..72b7ce8d
--- /dev/null
+++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersBaseMemoryNoCacheTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.pulsar.jms;
+
+import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class JMSPublishFiltersBaseMemoryNoCacheTest extends JMSPublishFiltersBase {
+
+  @RegisterExtension
+  static PulsarContainerExtension pulsarContainer =
+      new PulsarContainerExtension()
+          .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true")
+          .withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors")
+          .withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-publish-filters")
+          .withEnv("PULSAR_PREFIX_jmsApplyFiltersOnPublish", "true")
+          .withEnv("PULSAR_PREFIX_jmsFiltersOnPublishMaxMemoryMB", "0") // cache disabled
+          .withEnv("PULSAR_PREFIX_jmsFiltersOnPublishThreads", "10")
+          .withLogContainerOutput(true);
+
+  @Override
+  PulsarContainerExtension getPulsarContainer() {
+    return pulsarContainer;
+  }
+}
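Below is a minimal, self-contained sketch (not part of the patch) of the memoization pattern that the new MessageMetadataCache enables: when one published entry is evaluated against several subscription selectors, each message property is decoded from the protobuf property list at most once and then served from the per-entry cache. The inner Cache class mirrors the MessageMetadataCache added above; `decodeProperty` is a hypothetical stand-in for `JMSFilter.getProperty(propertiesCount, propertiesList, name)`.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class MessageMetadataCacheSketch {

  // same shape as the MessageMetadataCache introduced by this patch
  static final class Cache {
    final Map<String, Object> properties = new HashMap<>();

    Object getProperty(String key, Function<String, Object> compute) {
      // computeIfAbsent runs the decoder only on the first lookup of a key
      return properties.computeIfAbsent(key, compute);
    }
  }

  public static void main(String[] args) {
    // hypothetical stand-in for the real protobuf property decoding
    Function<String, Object> decodeProperty =
        name -> {
          System.out.println("decoding " + name); // printed once per property, not per subscription
          return "value-of-" + name;
        };

    Cache cache = new Cache(); // one cache per entry, shared across all matching subscriptions
    for (int subscription = 0; subscription < 3; subscription++) {
      // each selector evaluation asks for the same property;
      // only the first lookup pays the decoding cost
      Object value = cache.getProperty("JMSPriority", decodeProperty);
      System.out.println("subscription " + subscription + " -> " + value);
    }
  }
}

This mirrors why the patch only allocates a MessageMetadataCache when `subscriptions.size() > 1`: with a single subscription there is no repeated lookup to amortize, so the HashMap would be pure overhead.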