diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java index 35930053..354bd765 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java @@ -18,13 +18,14 @@ import io.netty.buffer.ByteBuf; import java.nio.charset.StandardCharsets; import java.util.Base64; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -160,16 +161,19 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { Commands.deSerializeSingleMessageInBatch( uncompressedPayload, singleMessageMetadata, i, numMessages); try { - Map typedProperties = - buildProperties( + PropertyEvaluator typedProperties = + new PropertyEvaluator( singleMessageMetadata.getPropertiesCount(), - singleMessageMetadata.getPropertiesList()); - + singleMessageMetadata.getPropertiesList(), + destinationTypeForTheClient, + topicName, + singleMessageMetadata, + null); // noLocal filter // all the messages in the batch come from the Producer/Connection // so we can reject the whole batch immediately at the first entry if (!filterJMSConnectionID.isEmpty() - && filterJMSConnectionID.equals(typedProperties.get("JMSConnectionID"))) { + && filterJMSConnectionID.equals(typedProperties.apply("JMSConnectionID"))) { if (isExclusive || forceDropRejected) { return FilterResult.REJECT; } else { @@ -190,27 +194,12 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { boolean matches = true; if (selector != null) { - matches = - matches( - typedProperties, - singleMessageMetadata, - null, - topicName, - selector, - destinationTypeForTheClient, - jmsExpiration); + matches = matches(typedProperties, selector); } if (!jmsSelectorOnSubscription.isEmpty()) { boolean matchesSubscriptionFilter = - matches( - typedProperties, - null, - metadata, - topicName, - selectorOnSubscription, - destinationTypeForTheClient, - jmsExpiration); + matches(typedProperties, selectorOnSubscription); matches = matches && matchesSubscriptionFilter; if (matchesSubscriptionFilter) { allFilteredBySubscriptionFilter = false; @@ -239,9 +228,14 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { // here we are dealing with a single message, // so we can reject the message more easily - - Map typedProperties = - buildProperties(metadata.getPropertiesCount(), metadata.getPropertiesList()); + PropertyEvaluator typedProperties = + new PropertyEvaluator( + metadata.getPropertiesCount(), + metadata.getPropertiesList(), + destinationTypeForTheClient, + topicName, + null, + metadata); // timetoLive filter long jmsExpiration = getJMSExpiration(typedProperties); @@ -252,15 +246,7 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { } if (!jmsSelectorOnSubscription.isEmpty()) { - boolean matchesSubscriptionFilter = - matches( - typedProperties, - null, - metadata, - topicName, - selectorOnSubscription, - destinationTypeForTheClient, - jmsExpiration); + boolean matchesSubscriptionFilter = matches(typedProperties, selectorOnSubscription); // the subscription filter always deletes the messages if (!matchesSubscriptionFilter) { return FilterResult.REJECT; @@ -269,19 +255,11 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { boolean matches = true; if (selector != null) { - matches = - matches( - typedProperties, - null, - metadata, - topicName, - selector, - destinationTypeForTheClient, - jmsExpiration); + matches = matches(typedProperties, selector); } if (!filterJMSConnectionID.isEmpty() - && filterJMSConnectionID.equals(typedProperties.get("JMSConnectionID"))) { + && filterJMSConnectionID.equals(typedProperties.apply("JMSConnectionID"))) { if (isExclusive || forceDropRejected) { return FilterResult.REJECT; } else { @@ -301,11 +279,133 @@ public FilterResult filterEntry(Entry entry, FilterContext context) { } } - private static long getJMSExpiration(Map typedProperties) { + @AllArgsConstructor + private static class PropertyEvaluator implements Function { + private int propertiesCount; + private List propertiesList; + private String destinationTypeForTheClient; + private String topicName; + private SingleMessageMetadata singleMessageMetadata; + private MessageMetadata metadata; + + private Object getProperty(String name) { + return JMSFilter.getProperty(propertiesCount, propertiesList, name); + } + + @Override + public Object apply(String name) { + switch (name) { + case "JMSReplyTo": + { + String _jmsReplyTo = safeString(getProperty("JMSReplyTo")); + Destination jmsReplyTo = null; + if (_jmsReplyTo != null) { + String jmsReplyToType = getProperty("JMSReplyToType") + ""; + switch (jmsReplyToType) { + case "topic": + return new ActiveMQTopic(_jmsReplyTo); + default: + return new ActiveMQQueue(_jmsReplyTo); + } + } else { + return null; + } + } + case "JMSDestination": + { + return "queue".equalsIgnoreCase(destinationTypeForTheClient) + ? new ActiveMQQueue(topicName) + : new ActiveMQTopic(topicName); + } + case "JMSType": + case "JMSMessageId": + return getProperty(name); + case "JMSCorrelationID": + { + String _correlationId = safeString(getProperty("JMSCorrelationID")); + if (_correlationId != null) { + return new String(Base64.getDecoder().decode(_correlationId), StandardCharsets.UTF_8); + } else { + return null; + } + } + case "JMSPriority": + { + Object jmsPriorityString = getProperty("JMSPriority"); + if (jmsPriorityString != null) { + try { + return Integer.parseInt(jmsPriorityString + ""); + } catch (NumberFormatException err) { + // cannot decode priority, not a big deal as it is not supported in Pulsar + return Message.DEFAULT_PRIORITY; + } + } + } + case "JMSDeliveryMode": + { + Object deliveryModeString = getProperty("JMSDeliveryMode"); + if (deliveryModeString != null) { + try { + return Integer.parseInt(deliveryModeString + ""); + } catch (NumberFormatException err) { + // cannot decode deliveryMode, not a big deal as it is not supported in Pulsar + } + } + return Message.DEFAULT_DELIVERY_MODE; + } + case "JMSTimestamp": + { + if (singleMessageMetadata != null) { + if (singleMessageMetadata.hasEventTime()) { + return singleMessageMetadata.getEventTime(); + } + } + if (metadata != null) { + if (metadata.hasEventTime()) { + return metadata.getEventTime(); + } + } + return 0; + } + case "JMSXDeliveryCount": + // this is not supported on the broker + return 0; + case "JMSXGroupID": + { + if (singleMessageMetadata != null && singleMessageMetadata.hasPartitionKey()) { + return singleMessageMetadata.getPartitionKey(); + } + if (metadata != null && metadata.hasPartitionKey()) { + return metadata.getPartitionKey(); + } + return ""; + } + case "JMSXGroupSeq": + { + Object rawJMSXGroupSeq = getProperty("JMSXGroupSeq"); + if (rawJMSXGroupSeq != null) { + return rawJMSXGroupSeq; + } + if (singleMessageMetadata != null && singleMessageMetadata.hasSequenceId()) { + return singleMessageMetadata.getSequenceId() + ""; + } + if (metadata != null && metadata.hasSequenceId()) { + return metadata.getSequenceId() + ""; + } + return "0"; + } + default: + return getProperty(name); + } + } + } + + private static long getJMSExpiration(Function typedProperties) { long jmsExpiration = 0; - if (typedProperties.containsKey("JMSExpiration")) { + Object value = typedProperties.apply("JMSExpiration"); + if (value != null) { try { - jmsExpiration = Long.parseLong(typedProperties.get("JMSExpiration") + ""); + jmsExpiration = Long.parseLong(value + ""); } catch (NumberFormatException err) { // cannot decode JMSExpiration } @@ -313,22 +413,26 @@ private static long getJMSExpiration(Map typedProperties) { return jmsExpiration; } - private Map buildProperties(int propertiesCount, List propertiesList) { - Map properties = new HashMap<>(); - if (propertiesCount > 0) { - propertiesList.forEach( - kv -> { - properties.put(kv.getKey(), kv.getValue()); - }); + private static Object getProperty( + int propertiesCount, List propertiesList, String name) { + if (propertiesCount <= 0) { + return null; } - Map typedProperties = new HashMap<>(); - properties.forEach( - (k, v) -> { - if (!k.equals("_jsmtype")) { - typedProperties.put(k, getObjectProperty(k, properties)); - } - }); - return typedProperties; + String type = null; + String value = null; + String typeProperty = propertyType(name); + for (KeyValue keyValue : propertiesList) { + String key = keyValue.getKey(); + if (key.equals(typeProperty)) { + type = keyValue.getValue(); + } else if (key.equals(name)) { + value = keyValue.getValue(); + } + if (type != null && value != null) { + break; + } + } + return getObjectProperty(value, type); } @Override @@ -340,13 +444,14 @@ private static String propertyType(String name) { return name + "_jsmtype"; } - public static Object getObjectProperty(String name, Map properties) { - - String value = properties.getOrDefault(name, null); + private static Object getObjectProperty(String value, String type) { if (value == null) { return null; } - String type = properties.getOrDefault(propertyType(name), "string"); + if (type == null) { + // strings + return value; + } switch (type) { case "string": return value; @@ -374,117 +479,9 @@ private static String safeString(Object value) { return value == null ? null : value.toString(); } - private static boolean matches( - Map typedProperties, - SingleMessageMetadata singleMessageMetadata, - MessageMetadata metadata, - String topicName, - SelectorSupport selector, - String destinationTypeForTheClient, - long jmsExpiration) + private static boolean matches(Function typedProperties, SelectorSupport selector) throws JMSException { - String _jmsReplyTo = safeString(typedProperties.get("JMSReplyTo")); - Destination jmsReplyTo = null; - if (_jmsReplyTo != null) { - String jmsReplyToType = typedProperties.get("JMSReplyToType") + ""; - switch (jmsReplyToType) { - case "topic": - jmsReplyTo = new ActiveMQTopic(_jmsReplyTo); - break; - default: - jmsReplyTo = new ActiveMQQueue(_jmsReplyTo); - } - } - - Destination destination = - "queue".equalsIgnoreCase(destinationTypeForTheClient) - ? new ActiveMQQueue(topicName) - : new ActiveMQTopic(topicName); - - String jmsType = safeString(typedProperties.get("JMSType")); - String messageId = safeString(typedProperties.get("JMSMessageId")); - String correlationId = null; - String _correlationId = safeString(typedProperties.get("JMSCorrelationID")); - if (_correlationId != null) { - correlationId = new String(Base64.getDecoder().decode(correlationId), StandardCharsets.UTF_8); - } - int jmsPriority = Message.DEFAULT_PRIORITY; - if (typedProperties.containsKey("JMSPriority")) { - try { - jmsPriority = Integer.parseInt(typedProperties.get("JMSPriority") + ""); - } catch (NumberFormatException err) { - // cannot decode priority, not a big deal as it is not supported in Pulsar - } - } - int deliveryMode = Message.DEFAULT_DELIVERY_MODE; - if (typedProperties.containsKey("JMSDeliveryMode")) { - try { - deliveryMode = Integer.parseInt(typedProperties.get("JMSDeliveryMode") + ""); - } catch (NumberFormatException err) { - // cannot decode deliveryMode, not a big deal as it is not supported in Pulsar - } - } - - // this is optional - long jmsTimestamp = 0; - - if (singleMessageMetadata != null) { - - if (singleMessageMetadata.hasEventTime()) { - jmsTimestamp = singleMessageMetadata.getEventTime(); - } - - // JMSXDeliveryCount not supported here - // typedProperties.put("JMSXDeliveryCount", (singleMessageMetadata.getRedeliveryCount() + 1) + - // ""); - - if (singleMessageMetadata.hasPartitionKey()) { - typedProperties.put("JMSXGroupID", singleMessageMetadata.getPartitionKey()); - } else { - typedProperties.put("JMSXGroupID", ""); - } - - if (!typedProperties.containsKey("JMSXGroupSeq")) { - if (singleMessageMetadata.hasSequenceId()) { - typedProperties.put("JMSXGroupSeq", singleMessageMetadata.getSequenceId() + ""); - } else { - typedProperties.put("JMSXGroupSeq", "0"); - } - } - - } else { - if (metadata.hasEventTime()) { - jmsTimestamp = metadata.getEventTime(); - } - - // JMSXDeliveryCount not supported here - // typedProperties.put("JMSXDeliveryCount", (metadata.getRedeliveryCount() + 1) + ""); - - if (metadata.hasPartitionKey()) { - typedProperties.put("JMSXGroupID", metadata.getPartitionKey()); - } else { - typedProperties.put("JMSXGroupID", ""); - } - - if (!typedProperties.containsKey("JMSXGroupSeq")) { - if (metadata.hasSequenceId()) { - typedProperties.put("JMSXGroupSeq", metadata.getSequenceId() + ""); - } else { - typedProperties.put("JMSXGroupSeq", "0"); - } - } - } - return selector.matches( - typedProperties, - messageId, - correlationId, - jmsReplyTo, - destination, - deliveryMode, - jmsType, - jmsExpiration, - jmsPriority, - jmsTimestamp); + return selector.matches(typedProperties); } } diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java index fc8d5cf0..e55d5120 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java @@ -18,6 +18,8 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; @@ -26,6 +28,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.MessageId; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; @@ -52,18 +55,14 @@ public static SelectorSupport build(String selector, boolean enabled) throws JMS return new SelectorSupport(parse, selector); } - public boolean matches( - Map messageProperties, - String jmsMessageId, - String jmsCorrelationId, - Destination jmsReplyTo, - Destination jmsDestination, - int jmsDeliveryMode, - String jmsType, - long jmsExpiration, - int jmsPriority, - long jmsTimestamp) - throws JMSException { + public boolean matches(Function messagePropertiesAccessor) throws JMSException { + Map cache = new HashMap<>(); + + // this cache is important in order to be able to not parse Message Metadata more than once + // for complex selectors that refer more times to the same property + Function messageProperties = + (name) -> cache.computeIfAbsent(name, messagePropertiesAccessor::apply); + // convert anything that can be used by the selector // https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L35 @@ -108,11 +107,117 @@ public int getReferenceCount() { return 0; } + @Override + public String getJMSMessageID() { + return (String) messageProperties.apply("JMSMessageID"); + } + + @Override + public MessageId getMessageId() { + return new MessageId(getJMSMessageID()); + } + + @Override + public Destination getJMSReplyTo() { + return (Destination) messageProperties.apply("JMSReplyTo"); + } + + @Override + public ActiveMQDestination getReplyTo() { + return (ActiveMQDestination) messageProperties.apply("JMSReplyTo"); + } + + @Override + public ActiveMQDestination getOriginalDestination() { + return (ActiveMQDestination) messageProperties.apply("JMSDestination"); + } + + @Override + public String getJMSCorrelationID() { + return (String) messageProperties.apply("JMSCorrelationID"); + } + + @Override + public String getCorrelationId() { + return (String) messageProperties.apply("JMSCorrelationID"); + } + + @Override + public long getJMSTimestamp() { + return (long) messageProperties.apply("JMSTimestamp"); + } + + @Override + public long getTimestamp() { + return (long) messageProperties.apply("JMSTimestamp"); + } + + @Override + public String getGroupID() { + return (String) messageProperties.apply("JMSXGroupID"); + } + + @Override + public int getGroupSequence() { + return (int) messageProperties.apply("JMSXGroupSeq"); + } + + @Override + public boolean isRedelivered() { + // not supported + return false; + } + + @Override + public long getJMSExpiration() { + return (long) messageProperties.apply("JMSExpiration"); + } + + @Override + public long getExpiration() { + return (long) messageProperties.apply("JMSExpiration"); + } + + @Override + public int getJMSPriority() { + return (int) messageProperties.apply("JMSPriority"); + } + + @Override + public byte getPriority() { + return (byte) ((int) messageProperties.apply("JMSPriority")); + } + + @Override + public int getJMSDeliveryMode() { + return (int) messageProperties.apply("JMSDeliveryMode"); + } + + @Override + public boolean isPersistent() { + return ((int) messageProperties.apply("JMSDeliveryMode")) == DeliveryMode.PERSISTENT; + } + + @Override + public String getJMSType() { + return (String) messageProperties.apply("JMSType"); + } + + @Override + public String getType() { + return (String) messageProperties.apply("JMSType"); + } + + @Override + public Destination getJMSDestination() { + return (Destination) messageProperties.apply("JMSDestination"); + } + // saving CPU cycles here, PropertyExpression calls this method for non-system properties // https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L226 @Override public Object getProperty(String name) { - return messageProperties.get(name); + return messageProperties.apply(name); } @Override @@ -120,16 +225,6 @@ public Map getProperties() { throw new UnsupportedOperationException("not supported - getProperties"); }; }; - // the is no need to call toMessage.setProperties() - toMessage.setJMSMessageID(jmsMessageId); - toMessage.setJMSCorrelationID(jmsCorrelationId); - toMessage.setJMSReplyTo(ActiveMQDestination.transform(jmsReplyTo)); - toMessage.setJMSDestination(ActiveMQDestination.transform(jmsDestination)); - toMessage.setJMSDeliveryMode(jmsDeliveryMode); - toMessage.setJMSType(jmsType); - toMessage.setJMSExpiration(jmsExpiration); - toMessage.setJMSPriority(jmsPriority); - toMessage.setJMSTimestamp(jmsTimestamp); context.setMessageReference(toMessage); return expression.matches(context); } @@ -145,17 +240,36 @@ public boolean matches(Message fromMessage) throws JMSException { Object obj = fromMessage.getObjectProperty(name); properties.put(name, obj); } - return matches( - properties, - fromMessage.getJMSMessageID(), - fromMessage.getJMSCorrelationID(), - fromMessage.getJMSReplyTo(), - fromMessage.getJMSDestination(), - fromMessage.getJMSDeliveryMode(), - fromMessage.getJMSType(), - fromMessage.getJMSExpiration(), - fromMessage.getJMSPriority(), - fromMessage.getJMSTimestamp()); + Function getProperty = + (name) -> { + try { + switch (name) { + case "JMSMessageID": + return fromMessage.getJMSMessageID(); + case "JMSCorrelationID": + return fromMessage.getJMSCorrelationID(); + case "JMSReplyTo": + return fromMessage.getJMSReplyTo(); + case "JMSDestination": + return fromMessage.getJMSDestination(); + case "JMSDeliveryMode": + return fromMessage.getJMSDeliveryMode(); + case "JMSType": + return fromMessage.getJMSType(); + case "JMSExpiration": + return fromMessage.getJMSExpiration(); + case "JMSPriority": + return fromMessage.getJMSPriority(); + case "JMSTimestamp": + return fromMessage.getJMSTimestamp(); + default: + return properties.get(name); + } + } catch (JMSException err) { + throw new RuntimeException(err); + } + }; + return matches(getProperty); } @Override diff --git a/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupportTest.java b/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupportTest.java index 3ccc5b3b..65937254 100644 --- a/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupportTest.java +++ b/pulsar-jms-filters/src/test/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupportTest.java @@ -19,6 +19,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import javax.jms.DeliveryMode; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.junit.jupiter.api.Test; class SelectorSupportTest { @@ -42,10 +47,48 @@ public void test() throws Exception { match(false, "undefinedProperty"); } + @Test + public void testSpecialKeywords() throws Exception { + match(true, "JMSMessageID = '0:1:9:-1'"); + match(true, "JMSMessageID is not null"); + match(false, "JMSMessageID is null"); + match(true, "JMSReplyTo = 'queue://persistent://public/default/testReply'"); + match(true, "JMSDestination = 'topic://persistent://public/default/test'"); + match(true, "JMSCorrelationID = '0:1:2:3'"); + match(true, "JMSDeliveryMode = 'PERSISTENT'"); + match(true, "JMSType = 'my-type'"); + match(true, "JMSExpiration = 1234"); + match(true, "JMSPriority = 5"); + match(true, "JMSTimestamp = 5234234"); + } + private static void match(boolean expected, String selector) throws Exception { SelectorSupport build = SelectorSupport.build(selector, true); + Map propertyAccessCount = new HashMap<>(); Map properties = new HashMap<>(); properties.put("foo", "bar"); - assertEquals(expected, build.matches(properties, "", "", null, null, 0, null, 0, 0, 0)); + properties.put("JMSMessageID", "0:1:9:-1"); + properties.put("JMSReplyTo", new ActiveMQQueue("persistent://public/default/testReply")); + properties.put("JMSDestination", new ActiveMQTopic("persistent://public/default/test")); + + properties.put("JMSCorrelationID", "0:1:2:3"); + properties.put("JMSDeliveryMode", DeliveryMode.PERSISTENT); + properties.put("JMSType", "my-type"); + properties.put("JMSExpiration", 1234L); + properties.put("JMSPriority", 5); + properties.put("JMSTimestamp", 5234234L); + + Function spy = + (k) -> { + propertyAccessCount.computeIfAbsent(k, v -> new AtomicInteger()).incrementAndGet(); + return properties.get(k); + }; + assertEquals(expected, build.matches(spy)); + + // SelectorSupport has a cache to prevent multiple requests for the same key + propertyAccessCount.forEach( + (property, count) -> { + assertEquals(1, count.get()); + }); } } diff --git a/pulsar-jms-integration-tests/pom.xml b/pulsar-jms-integration-tests/pom.xml index 693e74cf..8f0dea7d 100644 --- a/pulsar-jms-integration-tests/pom.xml +++ b/pulsar-jms-integration-tests/pom.xml @@ -97,8 +97,8 @@ copy filters - - + + diff --git a/pulsar-jms/pom.xml b/pulsar-jms/pom.xml index b9e032d3..a7afc777 100644 --- a/pulsar-jms/pom.xml +++ b/pulsar-jms/pom.xml @@ -108,8 +108,8 @@ copy filters - - + + diff --git a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java index d7fc7b33..f663d934 100644 --- a/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java +++ b/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageConsumer.java @@ -522,15 +522,15 @@ public synchronized void close() throws JMSException { } if (!session.isTransactionStarted()) { session.executeCriticalOperation( - () -> { - try { - consumer.close(); - session.removeConsumer(this); - return null; - } catch (Exception err) { - throw Utils.handleException(err); - } - }); + () -> { + try { + consumer.close(); + session.removeConsumer(this); + return null; + } catch (Exception err) { + throw Utils.handleException(err); + } + }); } else if (session.getTransacted()) { closedWhileActiveTransaction = true; } diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java index 235ee16b..66578ae3 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/SelectorsTestsBase.java @@ -16,6 +16,7 @@ package com.datastax.oss.pulsar.jms; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -93,6 +94,9 @@ private Map buildProperties() { Map consumerConfig = new HashMap<>(); properties.put("consumerConfig", consumerConfig); + // batchIndexAckEnabled is required in order for the client to be able to + // negatively/positively acknowledge single messages inside a batch + consumerConfig.put("batchIndexAckEnabled", true); return properties; } @@ -407,11 +411,6 @@ public void sendBatchWithCompetingConsumersOnQueue() throws Exception { producerConfig.put("batchingMaxMessages", "5"); } - // batchIndexAckEnabled is required in order for the client to be able to - // negatively/positively acknowledge single messages inside a batch - Map consumerConfig = (Map) properties.get("consumerConfig"); - consumerConfig.put("batchIndexAckEnabled", true); - try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties); ) { try (PulsarConnection connection = factory.createConnection()) { connection.start(); @@ -897,16 +896,30 @@ public void onException(Message message, Exception e) { for (int i = 0; i < 20; i++) { if ((i % 2 == 0) && (i % 3 == 0)) { TextMessage textMessage = (TextMessage) consumer1.receive(); - log.info("received {}", textMessage.getText()); + log.info( + "received {} {}", + textMessage.getText(), + ((PulsarMessage) textMessage).getReceivedPulsarMessage().getMessageId()); assertEquals("foo-" + i, textMessage.getText()); } } // no more messages - assertNull(consumer1.receive(1000)); + TextMessage receive = (TextMessage) consumer1.receive(1000); + boolean failed = false; + if (receive != null) { + failed = true; + log.info( + "FAILED ! received {} {}", + receive.getText(), + ((PulsarMessage) receive).getReceivedPulsarMessage().getMessageId()); + receive = (TextMessage) consumer1.receive(1000); + } + assertFalse(failed); if (enableBatching) { - assertEquals(20, consumer1.getReceivedMessages()); - assertEquals(16, consumer1.getSkippedMessages()); + assertTrue( + consumer1.getReceivedMessages() == 20 || consumer1.getReceivedMessages() == 21); + assertEquals(consumer1.getReceivedMessages() - 4, consumer1.getSkippedMessages()); } else { assertEquals(4, consumer1.getReceivedMessages()); assertEquals(0, consumer1.getSkippedMessages()); diff --git a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java index 2d6be59a..2f0800ea 100644 --- a/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java +++ b/pulsar-jms/src/test/java/com/datastax/oss/pulsar/jms/TransactionsTest.java @@ -284,7 +284,6 @@ public void consumeRollbackTransactionTest() throws Exception { Message receive = consumer.receive(); assertEquals("foo", receive.getBody(String.class)); - } // rollback transaction AFTER closing the Consumer @@ -301,7 +300,6 @@ public void consumeRollbackTransactionTest() throws Exception { } } - @Test public void consumeRollbackTransaction2Test() throws Exception { @@ -317,7 +315,7 @@ public void consumeRollbackTransaction2Test() throws Exception { try (Session producerSession = connection.createSession(); ) { Destination destination = - producerSession.createQueue("persistent://public/default/test-" + UUID.randomUUID()); + producerSession.createQueue("persistent://public/default/test-" + UUID.randomUUID()); try (Session transaction = connection.createSession(Session.SESSION_TRANSACTED); ) {