diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/WritePathSelectors.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/WritePathSelectors.java new file mode 100644 index 00000000..319cce30 --- /dev/null +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/WritePathSelectors.java @@ -0,0 +1,142 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.selectors; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.FilterContext; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MarkerType; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.protocol.Commands; + +@Slf4j +public class WritePathSelectors implements BrokerInterceptor { + + private final JMSFilter filter = new JMSFilter(); + + @Override + public void initialize(PulsarService pulsarService) throws Exception { + log.info("initialize"); + } + + @Override + public void onMessagePublish( + Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { + log.info("onMessagePublish {} {}", producer.getProducerName(), producer.getTopic().getName()); + MessageMetadata messageMetadata = + Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1); + if (messageMetadata.hasNumMessagesInBatch() && messageMetadata.getNumMessagesInBatch() > 0) { + log.info("Skip batch message"); + return; + } + if (messageMetadata.hasMarkerType() + && messageMetadata.getMarkerType() != MarkerType.UNKNOWN_MARKER_VALUE) { + log.info("Skip marker message"); + return; + } + + producer + .getTopic() + .getSubscriptions() + .forEach( + (name, subscription) -> { + Map subscriptionProperties = subscription.getSubscriptionProperties(); + FilterContext filterContext = new FilterContext(); + filterContext.setSubscription(subscription); + filterContext.setMsgMetadata(messageMetadata); + filterContext.setConsumer(null); + Entry entry = null; // we would need the Entry only in case of batch messages + EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext); + log.info("Filter result for subscription {} is {}", name, filterResult); + String property = "filter-result-" + name + "@" + subscription.getTopicName(); + publishContext.setProperty(property, filterResult); + }); + } + + @Override + public void messageProduced( + ServerCnx cnx, + Producer producer, + long startTimeNs, + long ledgerId, + long entryId, + Topic.PublishContext publishContext) { + log.info( + "messageProduced {} {}: {}:{}", + producer.getProducerName(), + producer.getTopic().getName(), + ledgerId, + entryId); + producer + .getTopic() + .getSubscriptions() + .forEach( + (name, subscription) -> { + String property = "filter-result-" + name + "@" + subscription.getTopicName(); + EntryFilter.FilterResult filterResult = + (EntryFilter.FilterResult) publishContext.getProperty(property); + log.info( + "Filter result for subscription {} is {} for entry {}:{}", + name, + filterResult, + ledgerId, + entryId); + if (filterResult == EntryFilter.FilterResult.REJECT) { + log.info("Reject message {}:{} for subscription {}", ledgerId, entryId, name); + subscription.acknowledgeMessage( + Arrays.asList(new PositionImpl(ledgerId, entryId)), + CommandAck.AckType.Individual, + null); + } + }); + } + + @Override + public void close() { + filter.close(); + } + + @Override + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {} + + @Override + public void onConnectionClosed(ServerCnx cnx) {} + + @Override + public void onWebserviceRequest(ServletRequest request) + throws IOException, ServletException, InterceptException {} + + @Override + public void onWebserviceResponse(ServletRequest request, ServletResponse response) + throws IOException, ServletException {} +} diff --git a/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml b/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml new file mode 100644 index 00000000..2e68d955 --- /dev/null +++ b/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml @@ -0,0 +1,3 @@ +interceptorClass: com.datastax.oss.pulsar.jms.selectors.WritePathSelectors +name: jms-write-path-selectors +description: Starlight for JMS - support for server side filters on the write path \ No newline at end of file diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 6ac0391a..b7a535ab 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 9d14ad38..9f58373a 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,8 +128,10 @@ copy filters - - + + + + diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index 2d339d60..fc287996 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -506,7 +506,8 @@ public void testConsumerPriorityQueue(String mapping) throws Exception { try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { Queue destination = session.createQueue("test-" + UUID.randomUUID()); - pulsarContainer.getAdmin() + pulsarContainer + .getAdmin() .topics() .createPartitionedTopic(factory.getPulsarTopicName(destination), 10); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/WritePathSelectorsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/WritePathSelectorsTest.java new file mode 100644 index 00000000..b4f38000 --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/WritePathSelectorsTest.java @@ -0,0 +1,115 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +@Slf4j +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class WritePathSelectorsTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors") + .withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-write-path-selectors") + .withEnv("PULSAR_LOG_LEVEL", "info"); + + private Map buildProperties() { + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.useServerSideFiltering", true); + properties.put("jms.enableClientSideEmulation", false); + + Map producerConfig = new HashMap<>(); + producerConfig.put("batchingEnabled", false); + properties.put("producerConfig", producerConfig); + return properties; + } + + @Test + public void sendMessageReceiveFromQueue() throws Exception { + Map properties = buildProperties(); + + String topicName = "persistent://public/default/test-" + UUID.randomUUID(); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (PulsarConnection connection = factory.createConnection()) { + connection.start(); + try (PulsarSession session = connection.createSession(); ) { + Queue destination = session.createQueue(topicName); + + try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) { + assertEquals( + SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType()); + + String newSelector = "lastMessage=TRUE"; + Map subscriptionProperties = new HashMap<>(); + subscriptionProperties.put("jms.selector", newSelector); + subscriptionProperties.put("jms.filtering", "true"); + + pulsarContainer + .getAdmin() + .topics() + .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); + + try (MessageProducer producer = session.createProducer(destination); ) { + for (int i = 0; i < 10; i++) { + TextMessage textMessage = session.createTextMessage("foo-" + i); + if (i == 9) { + textMessage.setBooleanProperty("lastMessage", true); + } + producer.send(textMessage); + } + } + + TextMessage textMessage = (TextMessage) consumer1.receive(); + assertEquals("foo-9", textMessage.getText()); + + assertEquals(1, consumer1.getReceivedMessages()); + assertEquals(0, consumer1.getSkippedMessages()); + + // no more messages + assertNull(consumer1.receiveNoWait()); + + // ensure that the filter didn't reject any message while dispatching to the consumer + // because the filter has been already applied on the write path + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); + SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); + assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1); + assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); + assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); + } + } + } + } + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java index 3baadb25..52fd1d7a 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java @@ -35,7 +35,7 @@ @Slf4j public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback { - public static final String PULSAR_IMAGE = "apachepulsar/pulsar:3.0.0"; + public static final String PULSAR_IMAGE = "datastax/lunastreaming:3.1_3.1"; private PulsarContainer pulsarContainer; private Consumer onContainerReady; private Map env = new HashMap<>(); @@ -72,8 +72,11 @@ public void afterAll(ExtensionContext extensionContext) { public void beforeAll(ExtensionContext extensionContext) { network = Network.newNetwork(); CountDownLatch pulsarReady = new CountDownLatch(1); + log.info("ENV: {}", env); pulsarContainer = - new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE)) + new PulsarContainer( + DockerImageName.parse(PULSAR_IMAGE) + .asCompatibleSubstituteFor("apachepulsar/pulsar")) .withNetwork(network) .withEnv(env) .withLogConsumer( @@ -82,10 +85,12 @@ public void beforeAll(ExtensionContext extensionContext) { if (text.contains("messaging service is ready")) { pulsarReady.countDown(); } - log.debug(text); + log.info(text); }) .withCopyFileToContainer( - MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters"); + MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters") + .withCopyFileToContainer( + MountableFile.forHostPath("target/classes/interceptors"), "/pulsar/interceptors"); // start Pulsar and wait for it to be ready to accept requests pulsarContainer.start(); assertTrue(pulsarReady.await(1, TimeUnit.MINUTES));