Skip to content

Commit

Permalink
JMSPublishFilter: improve memory footprint and impose a limit (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored May 7, 2024
1 parent 9a48d2b commit 05a0021
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
*/
package com.datastax.oss.pulsar.jms.selectors;

import static org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist;
import static org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand All @@ -47,7 +55,6 @@
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;

@Slf4j
public class JMSPublishFilters implements BrokerInterceptor {
Expand All @@ -68,8 +75,21 @@ public class JMSPublishFilters implements BrokerInterceptor {
.labelNames("topic", "subscription")
.create();

private static final Gauge memoryUsed =
Gauge.build()
.name("pulsar_jmsfilter_processing_memory")
.help("Current memory held by the JMSPublishFilters interceptor")
.create();

private static final Gauge pendingOperations =
Gauge.build()
.name("pulsar_jmsfilter_processing_pending_operations")
.help("Number of pending operations in the JMSPublishFilters interceptor")
.create();

private final JMSFilter filter = new JMSFilter(false);
private boolean enabled = false;
private Semaphore memoryLimit;
private final AtomicBoolean closed = new AtomicBoolean();

private static final Field dispatchMessagesThreadFieldPersistentDispatcherMultipleConsumers;
Expand Down Expand Up @@ -110,10 +130,26 @@ public void initialize(PulsarService pulsarService) {
log.info("Registering JMSFilter metrics");
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnPublish);
CollectorRegistry.defaultRegistry.register(filterProcessingTimeOnProduce);
CollectorRegistry.defaultRegistry.register(memoryUsed);
CollectorRegistry.defaultRegistry.register(pendingOperations);
} catch (IllegalArgumentException alreadyRegistered) {
// ignore
log.info("Filter metrics already registered", alreadyRegistered);
}
String memoryLimitString =
pulsarService
.getConfiguration()
.getProperties()
.getProperty("jmsFiltersOnPublishMaxMemoryMB", "128");

try {
int memoryLimitBytes = Integer.parseInt(memoryLimitString) * 1024 * 1024;
memoryLimit = new Semaphore(memoryLimitBytes);
log.info("jmsFiltersOnPublishMaxMemoryMB={} ({} bytes)", memoryLimitString, memoryLimitBytes);
} catch (NumberFormatException e) {
throw new RuntimeException(
"Invalid memory limit jmsFiltersOnPublishMaxMemoryMB=" + memoryLimitString, e);
}
}

@Override
Expand All @@ -130,20 +166,11 @@ public void onMessagePublish(
long now = System.nanoTime();
try {
for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
if (!(subscription instanceof PersistentSubscription)) {
if (!(isPersistentSubscriptionWithSelector(subscription))) {
continue;
}
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
if (!subscriptionProperties.containsKey("jms.selector")) {
continue;
}

// we must make a copy because the ByteBuf will be released
MessageMetadata messageMetadata =
new MessageMetadata()
.copyFrom(
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1));

ByteBuf messageMetadata = copyMessageMetadataAndAcquireMemory(headersAndPayload);
publishContext.setProperty(JMS_FILTER_METADATA, messageMetadata);
// as soon as we find a good reason to apply the filters in messageProduced
// we can exit
Expand All @@ -156,6 +183,24 @@ public void onMessagePublish(
}
}

public ByteBuf copyMessageMetadataAndAcquireMemory(ByteBuf buffer) {
int readerIndex = buffer.readerIndex();
skipBrokerEntryMetadataIfExist(buffer);
skipChecksumIfPresent(buffer);
int metadataSize = (int) buffer.readUnsignedInt();
// this is going to throttle the producer if the memory limit is reached
// please note that this is a blocking operation on the Netty eventpool
// currently we cannot do better than this, as the interceptor API is blocking
memoryLimit.acquireUninterruptibly(metadataSize);
// please note that Netty would probably retain more memory than this buffer
// but this is the best approximation we can do
memoryUsed.inc(metadataSize);
ByteBuf copy = PooledByteBufAllocator.DEFAULT.buffer(metadataSize);
buffer.readBytes(copy);
buffer.readerIndex(readerIndex);
return copy;
}

@Override
public void messageProduced(
ServerCnx cnx,
Expand All @@ -164,38 +209,80 @@ public void messageProduced(
long ledgerId,
long entryId,
Topic.PublishContext publishContext) {
if (!enabled) {
ByteBuf messageMetadataUnparsed = (ByteBuf) publishContext.getProperty(JMS_FILTER_METADATA);
if (messageMetadataUnparsed == null) {
return;
}
MessageMetadata messageMetadata =
(MessageMetadata) publishContext.getProperty(JMS_FILTER_METADATA);
if (messageMetadata == null) {
if (!enabled) {
return;
}
if (messageMetadata.hasNumMessagesInBatch()) {
return;
int memorySize = messageMetadataUnparsed.readableBytes();
AtomicInteger pending = new AtomicInteger(1);
Consumer<Boolean> onComplete =
(mainThread) -> {
if (!mainThread) {
// the main thread doesn't count as a pending operation
pendingOperations.dec();
}
if (pending.decrementAndGet() == 0) {
messageMetadataUnparsed.release();
memoryLimit.release(memorySize);
memoryUsed.dec(memorySize);
}
};
try {
producer
.getTopic()
.getSubscriptions()
.forEach(
(___, subscription) -> {
if (!(isPersistentSubscriptionWithSelector(subscription))) {
return;
}
pending.incrementAndGet();
pendingOperations.inc();
ByteBuf duplicate = messageMetadataUnparsed.duplicate();
FilterAndAckMessageOperation filterAndAckMessageOperation =
new FilterAndAckMessageOperation(
ledgerId, entryId, subscription, duplicate, onComplete);
scheduleOnDispatchThread(subscription, filterAndAckMessageOperation);
});
} finally {
onComplete.accept(true);
}
}

private static boolean isPersistentSubscriptionWithSelector(Subscription subscription) {
return subscription instanceof PersistentSubscription
&& subscription.getSubscriptionProperties().containsKey("jms.selector");
}

for (Subscription subscription : producer.getTopic().getSubscriptions().values()) {
scheduleOnDispatchThread(
subscription,
() -> {
filterAndAckMessage(producer, ledgerId, entryId, subscription, messageMetadata);
});
@AllArgsConstructor
private class FilterAndAckMessageOperation implements Runnable {
final long ledgerId;
final long entryId;
final Subscription subscription;
final ByteBuf messageMetadataUnparsed;
final Consumer<Boolean> onComplete;

@Override
public void run() {
try {
filterAndAckMessage(ledgerId, entryId, subscription, messageMetadataUnparsed);
} finally {
onComplete.accept(false);
}
}
}

private void filterAndAckMessage(
Producer producer,
long ledgerId,
long entryId,
Subscription subscription,
MessageMetadata messageMetadata) {
long ledgerId, long entryId, Subscription subscription, ByteBuf messageMetadataUnparsed) {
if (closed.get()) {
// the broker is shutting down, we cannot process the entries
// this operation has been enqueued before the broker shutdown
return;
}
MessageMetadata messageMetadata = getMessageMetadata(messageMetadataUnparsed);
long now = System.nanoTime();
try {
FilterContext filterContext = new FilterContext();
Expand All @@ -222,11 +309,17 @@ private void filterAndAckMessage(
}
} finally {
filterProcessingTimeOnProduce
.labels(producer.getTopic().getName(), subscription.getName())
.labels(subscription.getTopic().getName(), subscription.getName())
.observe(System.nanoTime() - now);
}
}

private static MessageMetadata getMessageMetadata(ByteBuf messageMetadataUnparsed) {
MessageMetadata messageMetadata = new MessageMetadata();
messageMetadata.parseFrom(messageMetadataUnparsed, messageMetadataUnparsed.readableBytes());
return messageMetadata;
}

private static void scheduleOnDispatchThread(Subscription subscription, Runnable runnable) {
try {
Dispatcher dispatcher = subscription.getDispatcher();
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
8 changes: 4 additions & 4 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<mkdir dir="${project.build.outputDirectory}/interceptors" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import java.util.concurrent.CompletableFuture;
import javax.jms.CompletionListener;
import javax.jms.Message;

/** Utility class to convert a CompletionListener into a CompletableFuture. */
public class CompletableFutureCompletionListener extends CompletableFuture<Message>
implements CompletionListener {

@Override
public void onCompletion(Message message) {
complete(message);
}

@Override
public void onException(Message message, Exception exception) {
completeExceptionally(exception);
}
}
Loading

0 comments on commit 05a0021

Please sign in to comment.