diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java index c4945a60..607c2acd 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java @@ -736,7 +736,7 @@ static Object getProperty(Map cacheProperties, String name) { // we pre-compute the type in order to avoid to scan the list to fine the type String type = SYSTEM_PROPERTIES_TYPES.get(name); if (type == null) { - type = propertyType(name); + type = cacheProperties.get(propertyType(name)); } String value = cacheProperties.get(name); return getObjectProperty(value, type); diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 499fe303..6a65b2f1 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -48,7 +49,6 @@ import javax.servlet.ServletResponse; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -140,8 +140,9 @@ public class JMSPublishFilters implements BrokerInterceptor { private Semaphore memoryLimit; private final AtomicBoolean closed = new AtomicBoolean(); private ExecutorService executor; - private final BlockingQueue ackQueue = new BatchedArrayBlockingQueue<>(100000); + private final BlockingQueue ackQueue = new ArrayBlockingQueue<>(100000); private final Runnable drainAckQueueTask = SafeRunnable.safeRun(this::drainAckQueue); + private ExecutorService drainAckQueueExecutor; @Override public void initialize(PulsarService pulsarService) { @@ -164,7 +165,19 @@ public void initialize(PulsarService pulsarService) { "jmsFiltersOnPublishThreads", (Runtime.getRuntime().availableProcessors() * 4) + "")); log.info("jmsFiltersOnPublishThreads={}", numThreads); - executor = Executors.newFixedThreadPool(numThreads, new WorkersThreadFactory()); + executor = + Executors.newFixedThreadPool(numThreads, new WorkersThreadFactory("jms-filters-workers-")); + int numThreadsAcks = + Integer.parseInt( + pulsarService + .getConfiguration() + .getProperties() + .getProperty( + "jmsFiltersOnPublishAckThreads", + (Math.max(2, Runtime.getRuntime().availableProcessors() / 2)) + "")); + log.info("jmsFiltersOnPublishAckThreads={}", numThreadsAcks); + drainAckQueueExecutor = + Executors.newFixedThreadPool(numThreadsAcks, new WorkersThreadFactory("jms-filters-acks-")); try { log.info("Registering JMSFilter metrics"); CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish); @@ -203,7 +216,7 @@ public void initialize(PulsarService pulsarService) { } // start the ack queue draining - executor.submit(drainAckQueueTask); + drainAckQueueExecutor.submit(drainAckQueueTask); } @Override @@ -323,12 +336,14 @@ private static boolean isPersistentSubscriptionWithSelector(Subscription subscri && "true".equals(subscription.getSubscriptionProperties().get("jms.filtering")); } + @AllArgsConstructor private static class WorkersThreadFactory implements ThreadFactory { private static final AtomicInteger THREAD_COUNT = new AtomicInteger(); + private final String name; @Override public Thread newThread(Runnable r) { - return new Thread(r, "jms-filters-workers-" + THREAD_COUNT.getAndIncrement()); + return new Thread(r, name + THREAD_COUNT.getAndIncrement()); } } @@ -482,10 +497,15 @@ private void drainAckQueue() { acksBySubscription.computeIfAbsent(ackFuture.subscription, k -> new ArrayList<>()); acks.add(ackFuture.position); } + } catch (InterruptedException exit) { + Thread.currentThread().interrupt(); + log.info("JMSPublishFilter Ack queue draining interrupted"); + } catch (Throwable error) { + log.error("Error while draining ack queue", error); } finally { // continue draining the queue if (!closed.get()) { - executor.submit(drainAckQueueTask); + drainAckQueueExecutor.submit(drainAckQueueTask); } } for (Map.Entry> entry : acksBySubscription.entrySet()) { @@ -493,7 +513,7 @@ private void drainAckQueue() { Subscription subscription = entry.getKey(); PersistentTopic topic = (PersistentTopic) subscription.getTopic(); if (!isTopicOwned(topic)) { - return; + continue; } try { List acks = entry.getValue(); @@ -504,9 +524,6 @@ private void drainAckQueue() { .observe(System.nanoTime() - now); } } - } catch (InterruptedException exit) { - Thread.currentThread().interrupt(); - log.info("JMSPublishFilter Ack queue draining interrupted"); } catch (Throwable error) { log.error("Error while draining ack queue", error); } @@ -575,7 +592,12 @@ public void close() { log.info("Broker is shutting down. Disabling JMSPublishFilters interceptor"); closed.set(true); filter.close(); - executor.shutdown(); + if (executor != null) { + executor.shutdown(); + } + if (drainAckQueueExecutor != null) { + drainAckQueueExecutor.shutdown(); + } } @Override diff --git a/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCacheTest.java b/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCacheTest.java new file mode 100644 index 00000000..42e3491c --- /dev/null +++ b/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCacheTest.java @@ -0,0 +1,35 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.selectors; + +import static org.junit.jupiter.api.Assertions.*; + +import org.apache.pulsar.common.api.proto.MessageMetadata; + +class MessageMetadataCacheTest { + + @org.junit.jupiter.api.Test + void testGetProperty() { + MessageMetadata metadata = new MessageMetadata(); + metadata.addProperty().setKey("foo").setValue("bar"); + metadata.addProperty().setKey("i_jsmtype").setValue("int"); + metadata.addProperty().setKey("i").setValue("5"); + MessageMetadataCache cache = new MessageMetadataCache(metadata); + assertNull(cache.getProperty("key")); + assertEquals("bar", cache.getProperty("foo")); + assertEquals(5, cache.getProperty("i")); + } +}