Skip to content

Commit

Permalink
JMS Selectors: apply selectors on the write path
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Apr 11, 2024
1 parent 5041029 commit 872ac5e
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms.selectors;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MarkerType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.protocol.Commands;

@Slf4j
public class WritePathSelectors implements BrokerInterceptor {

private final JMSFilter filter = new JMSFilter();

@Override
public void initialize(PulsarService pulsarService) throws Exception {
log.info("initialize");
}

@Override
public void onMessagePublish(
Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) {
log.info("onMessagePublish {} {}", producer.getProducerName(), producer.getTopic().getName());
MessageMetadata messageMetadata =
Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1);
if (messageMetadata.hasNumMessagesInBatch() && messageMetadata.getNumMessagesInBatch() > 0) {
log.info("Skip batch message");
return;
}
if (messageMetadata.hasMarkerType()
&& messageMetadata.getMarkerType() != MarkerType.UNKNOWN_MARKER_VALUE) {
log.info("Skip marker message");
return;
}

producer
.getTopic()
.getSubscriptions()
.forEach(
(name, subscription) -> {
Map<String, String> subscriptionProperties = subscription.getSubscriptionProperties();
FilterContext filterContext = new FilterContext();
filterContext.setSubscription(subscription);
filterContext.setMsgMetadata(messageMetadata);
filterContext.setConsumer(null);
Entry entry = null; // we would need the Entry only in case of batch messages
EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext);
log.info("Filter result for subscription {} is {}", name, filterResult);
String property = "filter-result-" + name + "@" + subscription.getTopicName();
publishContext.setProperty(property, filterResult);
});
}

@Override
public void messageProduced(
ServerCnx cnx,
Producer producer,
long startTimeNs,
long ledgerId,
long entryId,
Topic.PublishContext publishContext) {
log.info(
"messageProduced {} {}: {}:{}",
producer.getProducerName(),
producer.getTopic().getName(),
ledgerId,
entryId);
producer
.getTopic()
.getSubscriptions()
.forEach(
(name, subscription) -> {
String property = "filter-result-" + name + "@" + subscription.getTopicName();
EntryFilter.FilterResult filterResult =
(EntryFilter.FilterResult) publishContext.getProperty(property);
log.info(
"Filter result for subscription {} is {} for entry {}:{}",
name,
filterResult,
ledgerId,
entryId);
if (filterResult == EntryFilter.FilterResult.REJECT) {
log.info("Reject message {}:{} for subscription {}", ledgerId, entryId, name);
subscription.acknowledgeMessage(
Arrays.asList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
});
}

@Override
public void close() {
filter.close();
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {}

@Override
public void onConnectionClosed(ServerCnx cnx) {}

@Override
public void onWebserviceRequest(ServletRequest request)
throws IOException, ServletException, InterceptException {}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response)
throws IOException, ServletException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
interceptorClass: com.datastax.oss.pulsar.jms.selectors.WritePathSelectors
name: jms-write-path-selectors
description: Starlight for JMS - support for server side filters on the write path
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
6 changes: 4 additions & 2 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,10 @@
<configuration>
<target>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<mkdir dir="${project.build.outputDirectory}/interceptors"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/interceptors/jms-filter.nar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,8 @@ public void testConsumerPriorityQueue(String mapping) throws Exception {
try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) {
Queue destination = session.createQueue("test-" + UUID.randomUUID());

pulsarContainer.getAdmin()
pulsarContainer
.getAdmin()
.topics()
.createPartitionedTopic(factory.getPulsarTopicName(destination), 10);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.pulsar.jms;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;

@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class WritePathSelectorsTest {

@RegisterExtension
static PulsarContainerExtension pulsarContainer =
new PulsarContainerExtension()
.withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false")
.withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors")
.withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-write-path-selectors")
.withEnv("PULSAR_LOG_LEVEL", "info");

private Map<String, Object> buildProperties() {
Map<String, Object> properties = pulsarContainer.buildJMSConnectionProperties();
properties.put("jms.useServerSideFiltering", true);
properties.put("jms.enableClientSideEmulation", false);

Map<String, Object> producerConfig = new HashMap<>();
producerConfig.put("batchingEnabled", false);
properties.put("producerConfig", producerConfig);
return properties;
}

@Test
public void sendMessageReceiveFromQueue() throws Exception {
Map<String, Object> properties = buildProperties();

String topicName = "persistent://public/default/test-" + UUID.randomUUID();
try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) {
try (PulsarConnection connection = factory.createConnection()) {
connection.start();
try (PulsarSession session = connection.createSession(); ) {
Queue destination = session.createQueue(topicName);

try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) {
assertEquals(
SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType());

String newSelector = "lastMessage=TRUE";
Map<String, String> subscriptionProperties = new HashMap<>();
subscriptionProperties.put("jms.selector", newSelector);
subscriptionProperties.put("jms.filtering", "true");

pulsarContainer
.getAdmin()
.topics()
.updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties);

try (MessageProducer producer = session.createProducer(destination); ) {
for (int i = 0; i < 10; i++) {
TextMessage textMessage = session.createTextMessage("foo-" + i);
if (i == 9) {
textMessage.setBooleanProperty("lastMessage", true);
}
producer.send(textMessage);
}
}

TextMessage textMessage = (TextMessage) consumer1.receive();
assertEquals("foo-9", textMessage.getText());

assertEquals(1, consumer1.getReceivedMessages());
assertEquals(0, consumer1.getSkippedMessages());

// no more messages
assertNull(consumer1.receiveNoWait());

// ensure that the filter didn't reject any message while dispatching to the consumer
// because the filter has been already applied on the write path
TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName);
SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue");
assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1);
assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0);
assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

@Slf4j
public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback {
public static final String PULSAR_IMAGE = "apachepulsar/pulsar:3.0.0";
public static final String PULSAR_IMAGE = "datastax/lunastreaming:3.1_3.1";
private PulsarContainer pulsarContainer;
private Consumer<PulsarContainerExtension> onContainerReady;
private Map<String, String> env = new HashMap<>();
Expand Down Expand Up @@ -72,8 +72,11 @@ public void afterAll(ExtensionContext extensionContext) {
public void beforeAll(ExtensionContext extensionContext) {
network = Network.newNetwork();
CountDownLatch pulsarReady = new CountDownLatch(1);
log.info("ENV: {}", env);
pulsarContainer =
new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE))
new PulsarContainer(
DockerImageName.parse(PULSAR_IMAGE)
.asCompatibleSubstituteFor("apachepulsar/pulsar"))
.withNetwork(network)
.withEnv(env)
.withLogConsumer(
Expand All @@ -82,10 +85,12 @@ public void beforeAll(ExtensionContext extensionContext) {
if (text.contains("messaging service is ready")) {
pulsarReady.countDown();
}
log.debug(text);
log.info(text);
})
.withCopyFileToContainer(
MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters");
MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters")
.withCopyFileToContainer(
MountableFile.forHostPath("target/classes/interceptors"), "/pulsar/interceptors");
// start Pulsar and wait for it to be ready to accept requests
pulsarContainer.start();
assertTrue(pulsarReady.await(1, TimeUnit.MINUTES));
Expand Down

0 comments on commit 872ac5e

Please sign in to comment.