Skip to content

Commit

Permalink
JMSPublishFilters: fix metadata cache and introduce new threadpool fo…
Browse files Browse the repository at this point in the history
…r acks
  • Loading branch information
eolivelli committed May 14, 2024
1 parent 23072de commit 534343b
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ static Object getProperty(Map<String, String> cacheProperties, String name) {
// we pre-compute the type in order to avoid to scan the list to fine the type
String type = SYSTEM_PROPERTIES_TYPES.get(name);
if (type == null) {
type = propertyType(name);
type = cacheProperties.get(propertyType(name));
}
String value = cacheProperties.get(name);
return getObjectProperty(value, type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -48,7 +49,6 @@
import javax.servlet.ServletResponse;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.collections.BatchedArrayBlockingQueue;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -140,8 +140,9 @@ public class JMSPublishFilters implements BrokerInterceptor {
private Semaphore memoryLimit;
private final AtomicBoolean closed = new AtomicBoolean();
private ExecutorService executor;
private final BlockingQueue<AckFuture> ackQueue = new BatchedArrayBlockingQueue<>(100000);
private final BlockingQueue<AckFuture> ackQueue = new ArrayBlockingQueue<>(100000);
private final Runnable drainAckQueueTask = SafeRunnable.safeRun(this::drainAckQueue);
private ExecutorService drainAckQueueExecutor;

@Override
public void initialize(PulsarService pulsarService) {
Expand All @@ -164,7 +165,19 @@ public void initialize(PulsarService pulsarService) {
"jmsFiltersOnPublishThreads",
(Runtime.getRuntime().availableProcessors() * 4) + ""));
log.info("jmsFiltersOnPublishThreads={}", numThreads);
executor = Executors.newFixedThreadPool(numThreads, new WorkersThreadFactory());
executor =
Executors.newFixedThreadPool(numThreads, new WorkersThreadFactory("jms-filters-workers-"));
int numThreadsAcks =
Integer.parseInt(
pulsarService
.getConfiguration()
.getProperties()
.getProperty(
"jmsFiltersOnPublishAckThreads",
(Math.max(2, Runtime.getRuntime().availableProcessors() / 2)) + ""));
log.info("jmsFiltersOnPublishAckThreads={}", numThreadsAcks);
drainAckQueueExecutor =
Executors.newFixedThreadPool(numThreadsAcks, new WorkersThreadFactory("jms-filters-acks-"));
try {
log.info("Registering JMSFilter metrics");
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish);
Expand Down Expand Up @@ -203,7 +216,7 @@ public void initialize(PulsarService pulsarService) {
}

// start the ack queue draining
executor.submit(drainAckQueueTask);
drainAckQueueExecutor.submit(drainAckQueueTask);
}

@Override
Expand Down Expand Up @@ -323,12 +336,14 @@ private static boolean isPersistentSubscriptionWithSelector(Subscription subscri
&& "true".equals(subscription.getSubscriptionProperties().get("jms.filtering"));
}

@AllArgsConstructor
private static class WorkersThreadFactory implements ThreadFactory {
private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
private final String name;

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "jms-filters-workers-" + THREAD_COUNT.getAndIncrement());
return new Thread(r, name + THREAD_COUNT.getAndIncrement());
}
}

Expand Down Expand Up @@ -482,18 +497,23 @@ private void drainAckQueue() {
acksBySubscription.computeIfAbsent(ackFuture.subscription, k -> new ArrayList<>());
acks.add(ackFuture.position);
}
} catch (InterruptedException exit) {
Thread.currentThread().interrupt();
log.info("JMSPublishFilter Ack queue draining interrupted");
} catch (Throwable error) {
log.error("Error while draining ack queue", error);
} finally {
// continue draining the queue
if (!closed.get()) {
executor.submit(drainAckQueueTask);
drainAckQueueExecutor.submit(drainAckQueueTask);
}
}
for (Map.Entry<Subscription, List<Position>> entry : acksBySubscription.entrySet()) {
long now = System.nanoTime();
Subscription subscription = entry.getKey();
PersistentTopic topic = (PersistentTopic) subscription.getTopic();
if (!isTopicOwned(topic)) {
return;
continue;
}
try {
List<Position> acks = entry.getValue();
Expand All @@ -504,9 +524,6 @@ private void drainAckQueue() {
.observe(System.nanoTime() - now);
}
}
} catch (InterruptedException exit) {
Thread.currentThread().interrupt();
log.info("JMSPublishFilter Ack queue draining interrupted");
} catch (Throwable error) {
log.error("Error while draining ack queue", error);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms.selectors;

import static org.junit.jupiter.api.Assertions.*;

import org.apache.pulsar.common.api.proto.MessageMetadata;

class MessageMetadataCacheTest {

@org.junit.jupiter.api.Test
void testGetProperty() {
MessageMetadata metadata = new MessageMetadata();
metadata.addProperty().setKey("foo").setValue("bar");
metadata.addProperty().setKey("i_jsmtype").setValue("int");
metadata.addProperty().setKey("i").setValue("5");
MessageMetadataCache cache = new MessageMetadataCache(metadata);
assertNull(cache.getProperty("key"));
assertEquals("bar", cache.getProperty("foo"));
assertEquals(5, cache.getProperty("i"));
}
}

0 comments on commit 534343b

Please sign in to comment.