diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java new file mode 100644 index 00000000..a2f5975c --- /dev/null +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -0,0 +1,153 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms.selectors; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.service.plugin.FilterContext; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.intercept.InterceptException; +import org.apache.pulsar.common.protocol.Commands; + +@Slf4j +public class JMSPublishFilters implements BrokerInterceptor { + private static final String JMS_FILTERED_PROPERTY = "jms-filtered"; + private final JMSFilter filter = new JMSFilter(); + private boolean enabled = false; + + @Override + public void initialize(PulsarService pulsarService) { + enabled = + Boolean.parseBoolean( + pulsarService + .getConfiguration() + .getProperties() + .getProperty("jmsApplyFiltersOnPublish", "true")); + log.info("jmsApplyFiltersOnPublish={}", enabled); + } + + @Override + public void onMessagePublish( + Producer producer, ByteBuf headersAndPayload, Topic.PublishContext publishContext) { + if (!enabled) { + return; + } + if (publishContext.isMarkerMessage() + || publishContext.isChunked() + || publishContext.getNumberOfMessages() > 1) { + return; + } + + MessageMetadata messageMetadata = + Commands.peekMessageMetadata(headersAndPayload, "jms-filter-on-publish", -1); + if (messageMetadata.hasNumMessagesInBatch()) { + return; + } + producer + .getTopic() + .getSubscriptions() + .forEach( + (name, subscription) -> { + if (!(subscription instanceof PersistentSubscription)) { + return; + } + Map subscriptionProperties = subscription.getSubscriptionProperties(); + if (!subscriptionProperties.containsKey("jms.selector")) { + return; + } + FilterContext filterContext = new FilterContext(); + filterContext.setSubscription(subscription); + filterContext.setMsgMetadata(messageMetadata); + filterContext.setConsumer(null); + Entry entry = null; // we would need the Entry only in case of batch messages + EntryFilter.FilterResult filterResult = filter.filterEntry(entry, filterContext); + if (filterResult == EntryFilter.FilterResult.REJECT) { + String property = "filter-result-" + name + "@" + subscription.getTopicName(); + publishContext.setProperty(property, filterResult); + publishContext.setProperty(JMS_FILTERED_PROPERTY, true); + } + }); + } + + @Override + public void messageProduced( + ServerCnx cnx, + Producer producer, + long startTimeNs, + long ledgerId, + long entryId, + Topic.PublishContext publishContext) { + if (!enabled || publishContext.getProperty(JMS_FILTERED_PROPERTY) == null) { + return; + } + producer + .getTopic() + .getSubscriptions() + .forEach( + (name, subscription) -> { + String property = "filter-result-" + name + "@" + subscription.getTopicName(); + EntryFilter.FilterResult filterResult = + (EntryFilter.FilterResult) publishContext.getProperty(property); + if (filterResult == EntryFilter.FilterResult.REJECT) { + if (log.isDebugEnabled()) { + log.debug("Reject message {}:{} for subscription {}", ledgerId, entryId, name); + } + // ir is possible that calling this method in this thread may affect performance + // let's keep it simple for now, we can optimize it later + subscription.acknowledgeMessage( + Collections.singletonList(new PositionImpl(ledgerId, entryId)), + CommandAck.AckType.Individual, + null); + } + }); + } + + @Override + public void close() { + filter.close(); + } + + @Override + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {} + + @Override + public void onConnectionClosed(ServerCnx cnx) {} + + @Override + public void onWebserviceRequest(ServletRequest request) + throws IOException, ServletException, InterceptException {} + + @Override + public void onWebserviceResponse(ServletRequest request, ServletResponse response) + throws IOException, ServletException {} +} diff --git a/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml b/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml new file mode 100644 index 00000000..fc7620ac --- /dev/null +++ b/pulsar-jms-filters/src/main/resources/META-INF/services/broker_interceptor.yml @@ -0,0 +1,3 @@ +interceptorClass: com.datastax.oss.pulsar.jms.selectors.JMSPublishFilters +name: jms-publish-filters +description: Starlight for JMS - support for server side filters on the publish path \ No newline at end of file diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 6ac0391a..b7a535ab 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -103,8 +103,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index 9d14ad38..9f58373a 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -128,8 +128,10 @@ copy filters - - + + + + diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java new file mode 100644 index 00000000..117b6e84 --- /dev/null +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/JMSPublishFiltersTest.java @@ -0,0 +1,128 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.pulsar.jms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.datastax.oss.pulsar.jms.utils.PulsarContainerExtension; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.TextMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +@Slf4j +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class JMSPublishFiltersTest { + + @RegisterExtension + static PulsarContainerExtension pulsarContainer = + new PulsarContainerExtension() + .withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "false") + .withEnv("PULSAR_PREFIX_brokerInterceptorsDirectory", "/pulsar/interceptors") + .withEnv("PULSAR_PREFIX_brokerInterceptors", "jms-publish-filters") + .withEnv("PULSAR_PREFIX_jmsApplyFiltersOnPublish", "true") + .withLogContainerOutput(true); + + private Map buildProperties() { + Map properties = pulsarContainer.buildJMSConnectionProperties(); + properties.put("jms.useServerSideFiltering", true); + properties.put("jms.enableClientSideEmulation", false); + + Map producerConfig = new HashMap<>(); + producerConfig.put("batchingEnabled", false); + properties.put("producerConfig", producerConfig); + return properties; + } + + @Test + public void sendMessageReceiveFromQueue() throws Exception { + Map properties = buildProperties(); + + String topicName = "persistent://public/default/test-" + UUID.randomUUID(); + try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { + try (PulsarConnection connection = factory.createConnection()) { + connection.start(); + try (PulsarSession session = connection.createSession(); ) { + Queue destination = session.createQueue(topicName); + + try (PulsarMessageConsumer consumer1 = session.createConsumer(destination); ) { + assertEquals( + SubscriptionType.Shared, ((PulsarMessageConsumer) consumer1).getSubscriptionType()); + + String newSelector = "lastMessage=TRUE"; + Map subscriptionProperties = new HashMap<>(); + subscriptionProperties.put("jms.selector", newSelector); + subscriptionProperties.put("jms.filtering", "true"); + + pulsarContainer + .getAdmin() + .topics() + .updateSubscriptionProperties(topicName, "jms-queue", subscriptionProperties); + + try (MessageProducer producer = session.createProducer(destination); ) { + for (int i = 0; i < 10; i++) { + TextMessage textMessage = session.createTextMessage("foo-" + i); + if (i == 9) { + textMessage.setBooleanProperty("lastMessage", true); + } + producer.send(textMessage); + } + } + + TextMessage textMessage = (TextMessage) consumer1.receive(); + assertEquals("foo-9", textMessage.getText()); + + assertEquals(1, consumer1.getReceivedMessages()); + assertEquals(0, consumer1.getSkippedMessages()); + + // no more messages + assertNull(consumer1.receiveNoWait()); + + // ensure that the filter didn't reject any message while dispatching to the consumer + // because the filter has been already applied on the write path + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); + SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); + assertEquals(subscriptionStats.getFilterProcessedMsgCount(), 1); + assertEquals(subscriptionStats.getFilterRejectedMsgCount(), 0); + assertEquals(subscriptionStats.getFilterAcceptedMsgCount(), 1); + } + + // create a message that doesn't match the filter + // verify that the back log is accurate (0) + + try (MessageProducer producer = session.createProducer(destination); ) { + TextMessage textMessage = session.createTextMessage("backlog"); + producer.send(textMessage); + + TopicStats stats = pulsarContainer.getAdmin().topics().getStats(topicName); + SubscriptionStats subscriptionStats = stats.getSubscriptions().get("jms-queue"); + assertEquals(0, subscriptionStats.getMsgBacklog()); + } + } + } + } + } +} diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java index 2d339d60..fc287996 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/PriorityTest.java @@ -506,7 +506,8 @@ public void testConsumerPriorityQueue(String mapping) throws Exception { try (Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); ) { Queue destination = session.createQueue("test-" + UUID.randomUUID()); - pulsarContainer.getAdmin() + pulsarContainer + .getAdmin() .topics() .createPartitionedTopic(factory.getPulsarTopicName(destination), 10); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java index 3baadb25..02d37bd0 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/utils/PulsarContainerExtension.java @@ -35,7 +35,7 @@ @Slf4j public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback { - public static final String PULSAR_IMAGE = "apachepulsar/pulsar:3.0.0"; + public static final String PULSAR_IMAGE = "datastax/lunastreaming:3.1_3.1"; private PulsarContainer pulsarContainer; private Consumer onContainerReady; private Map env = new HashMap<>(); @@ -43,6 +43,7 @@ public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCall private Network network; private PulsarAdmin admin; + private boolean logContainerOutput = false; public PulsarContainerExtension() { env.put("PULSAR_PREFIX_acknowledgmentAtBatchIndexLevelEnabled", "true"); @@ -72,8 +73,11 @@ public void afterAll(ExtensionContext extensionContext) { public void beforeAll(ExtensionContext extensionContext) { network = Network.newNetwork(); CountDownLatch pulsarReady = new CountDownLatch(1); + log.info("ENV: {}", env); pulsarContainer = - new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE)) + new PulsarContainer( + DockerImageName.parse(PULSAR_IMAGE) + .asCompatibleSubstituteFor("apachepulsar/pulsar")) .withNetwork(network) .withEnv(env) .withLogConsumer( @@ -82,10 +86,16 @@ public void beforeAll(ExtensionContext extensionContext) { if (text.contains("messaging service is ready")) { pulsarReady.countDown(); } - log.debug(text); + if (logContainerOutput) { + log.info(text); + } else { + log.debug(text); + } }) .withCopyFileToContainer( - MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters"); + MountableFile.forHostPath("target/classes/filters"), "/pulsar/filters") + .withCopyFileToContainer( + MountableFile.forHostPath("target/classes/interceptors"), "/pulsar/interceptors"); // start Pulsar and wait for it to be ready to accept requests pulsarContainer.start(); assertTrue(pulsarReady.await(1, TimeUnit.MINUTES)); @@ -104,6 +114,11 @@ public PulsarContainerExtension withOnContainerReady( return this; } + public PulsarContainerExtension withLogContainerOutput(boolean logContainerOutput) { + this.logContainerOutput = logContainerOutput; + return this; + } + public PulsarContainerExtension withEnv(String key, String value) { this.env.put(key, value); return this;