Skip to content

Commit

Permalink
Lazily evaluate Message Properties (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Jun 6, 2022
1 parent 030092d commit 73ee0a0
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 237 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
Expand All @@ -26,6 +28,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.selector.SelectorParser;
Expand All @@ -52,18 +55,14 @@ public static SelectorSupport build(String selector, boolean enabled) throws JMS
return new SelectorSupport(parse, selector);
}

public boolean matches(
Map<String, Object> messageProperties,
String jmsMessageId,
String jmsCorrelationId,
Destination jmsReplyTo,
Destination jmsDestination,
int jmsDeliveryMode,
String jmsType,
long jmsExpiration,
int jmsPriority,
long jmsTimestamp)
throws JMSException {
public boolean matches(Function<String, Object> messagePropertiesAccessor) throws JMSException {
Map<String, Object> cache = new HashMap<>();

// this cache is important in order to be able to not parse Message Metadata more than once
// for complex selectors that refer more times to the same property
Function<String, Object> messageProperties =
(name) -> cache.computeIfAbsent(name, messagePropertiesAccessor::apply);

// convert anything that can be used by the selector
// https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L35

Expand Down Expand Up @@ -108,28 +107,124 @@ public int getReferenceCount() {
return 0;
}

@Override
public String getJMSMessageID() {
return (String) messageProperties.apply("JMSMessageID");
}

@Override
public MessageId getMessageId() {
return new MessageId(getJMSMessageID());
}

@Override
public Destination getJMSReplyTo() {
return (Destination) messageProperties.apply("JMSReplyTo");
}

@Override
public ActiveMQDestination getReplyTo() {
return (ActiveMQDestination) messageProperties.apply("JMSReplyTo");
}

@Override
public ActiveMQDestination getOriginalDestination() {
return (ActiveMQDestination) messageProperties.apply("JMSDestination");
}

@Override
public String getJMSCorrelationID() {
return (String) messageProperties.apply("JMSCorrelationID");
}

@Override
public String getCorrelationId() {
return (String) messageProperties.apply("JMSCorrelationID");
}

@Override
public long getJMSTimestamp() {
return (long) messageProperties.apply("JMSTimestamp");
}

@Override
public long getTimestamp() {
return (long) messageProperties.apply("JMSTimestamp");
}

@Override
public String getGroupID() {
return (String) messageProperties.apply("JMSXGroupID");
}

@Override
public int getGroupSequence() {
return (int) messageProperties.apply("JMSXGroupSeq");
}

@Override
public boolean isRedelivered() {
// not supported
return false;
}

@Override
public long getJMSExpiration() {
return (long) messageProperties.apply("JMSExpiration");
}

@Override
public long getExpiration() {
return (long) messageProperties.apply("JMSExpiration");
}

@Override
public int getJMSPriority() {
return (int) messageProperties.apply("JMSPriority");
}

@Override
public byte getPriority() {
return (byte) ((int) messageProperties.apply("JMSPriority"));
}

@Override
public int getJMSDeliveryMode() {
return (int) messageProperties.apply("JMSDeliveryMode");
}

@Override
public boolean isPersistent() {
return ((int) messageProperties.apply("JMSDeliveryMode")) == DeliveryMode.PERSISTENT;
}

@Override
public String getJMSType() {
return (String) messageProperties.apply("JMSType");
}

@Override
public String getType() {
return (String) messageProperties.apply("JMSType");
}

@Override
public Destination getJMSDestination() {
return (Destination) messageProperties.apply("JMSDestination");
}

// saving CPU cycles here, PropertyExpression calls this method for non-system properties
// https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L226
@Override
public Object getProperty(String name) {
return messageProperties.get(name);
return messageProperties.apply(name);
}

@Override
public Map<String, Object> getProperties() {
throw new UnsupportedOperationException("not supported - getProperties");
};
};
// the is no need to call toMessage.setProperties()
toMessage.setJMSMessageID(jmsMessageId);
toMessage.setJMSCorrelationID(jmsCorrelationId);
toMessage.setJMSReplyTo(ActiveMQDestination.transform(jmsReplyTo));
toMessage.setJMSDestination(ActiveMQDestination.transform(jmsDestination));
toMessage.setJMSDeliveryMode(jmsDeliveryMode);
toMessage.setJMSType(jmsType);
toMessage.setJMSExpiration(jmsExpiration);
toMessage.setJMSPriority(jmsPriority);
toMessage.setJMSTimestamp(jmsTimestamp);
context.setMessageReference(toMessage);
return expression.matches(context);
}
Expand All @@ -145,17 +240,36 @@ public boolean matches(Message fromMessage) throws JMSException {
Object obj = fromMessage.getObjectProperty(name);
properties.put(name, obj);
}
return matches(
properties,
fromMessage.getJMSMessageID(),
fromMessage.getJMSCorrelationID(),
fromMessage.getJMSReplyTo(),
fromMessage.getJMSDestination(),
fromMessage.getJMSDeliveryMode(),
fromMessage.getJMSType(),
fromMessage.getJMSExpiration(),
fromMessage.getJMSPriority(),
fromMessage.getJMSTimestamp());
Function<String, Object> getProperty =
(name) -> {
try {
switch (name) {
case "JMSMessageID":
return fromMessage.getJMSMessageID();
case "JMSCorrelationID":
return fromMessage.getJMSCorrelationID();
case "JMSReplyTo":
return fromMessage.getJMSReplyTo();
case "JMSDestination":
return fromMessage.getJMSDestination();
case "JMSDeliveryMode":
return fromMessage.getJMSDeliveryMode();
case "JMSType":
return fromMessage.getJMSType();
case "JMSExpiration":
return fromMessage.getJMSExpiration();
case "JMSPriority":
return fromMessage.getJMSPriority();
case "JMSTimestamp":
return fromMessage.getJMSTimestamp();
default:
return properties.get(name);
}
} catch (JMSException err) {
throw new RuntimeException(err);
}
};
return matches(getProperty);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import javax.jms.DeliveryMode;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.jupiter.api.Test;

class SelectorSupportTest {
Expand All @@ -42,10 +47,48 @@ public void test() throws Exception {
match(false, "undefinedProperty");
}

@Test
public void testSpecialKeywords() throws Exception {
match(true, "JMSMessageID = '0:1:9:-1'");
match(true, "JMSMessageID is not null");
match(false, "JMSMessageID is null");
match(true, "JMSReplyTo = 'queue://persistent://public/default/testReply'");
match(true, "JMSDestination = 'topic://persistent://public/default/test'");
match(true, "JMSCorrelationID = '0:1:2:3'");
match(true, "JMSDeliveryMode = 'PERSISTENT'");
match(true, "JMSType = 'my-type'");
match(true, "JMSExpiration = 1234");
match(true, "JMSPriority = 5");
match(true, "JMSTimestamp = 5234234");
}

private static void match(boolean expected, String selector) throws Exception {
SelectorSupport build = SelectorSupport.build(selector, true);
Map<String, AtomicInteger> propertyAccessCount = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
properties.put("foo", "bar");
assertEquals(expected, build.matches(properties, "", "", null, null, 0, null, 0, 0, 0));
properties.put("JMSMessageID", "0:1:9:-1");
properties.put("JMSReplyTo", new ActiveMQQueue("persistent://public/default/testReply"));
properties.put("JMSDestination", new ActiveMQTopic("persistent://public/default/test"));

properties.put("JMSCorrelationID", "0:1:2:3");
properties.put("JMSDeliveryMode", DeliveryMode.PERSISTENT);
properties.put("JMSType", "my-type");
properties.put("JMSExpiration", 1234L);
properties.put("JMSPriority", 5);
properties.put("JMSTimestamp", 5234234L);

Function<String, Object> spy =
(k) -> {
propertyAccessCount.computeIfAbsent(k, v -> new AtomicInteger()).incrementAndGet();
return properties.get(k);
};
assertEquals(expected, build.matches(spy));

// SelectorSupport has a cache to prevent multiple requests for the same key
propertyAccessCount.forEach(
(property, count) -> {
assertEquals(1, count.get());
});
}
}
4 changes: 2 additions & 2 deletions pulsar-jms-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@
<configuration>
<tasks>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-${project.version}-nar.nar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</tasks>
</configuration>
</execution>
Expand Down
4 changes: 2 additions & 2 deletions pulsar-jms/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@
<configuration>
<tasks>
<echo>copy filters</echo>
<mkdir dir="${project.build.outputDirectory}/filters" />
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-filters-${project.version}.jar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar" />
<mkdir dir="${project.build.outputDirectory}/filters"/>
<copy verbose="true" file="${basedir}/../pulsar-jms-filters/target/pulsar-jms-filters-${project.version}.jar" tofile="${project.build.outputDirectory}/filters/jms-filter.nar"/>
</tasks>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,15 +522,15 @@ public synchronized void close() throws JMSException {
}
if (!session.isTransactionStarted()) {
session.executeCriticalOperation(
() -> {
try {
consumer.close();
session.removeConsumer(this);
return null;
} catch (Exception err) {
throw Utils.handleException(err);
}
});
() -> {
try {
consumer.close();
session.removeConsumer(this);
return null;
} catch (Exception err) {
throw Utils.handleException(err);
}
});
} else if (session.getTransacted()) {
closedWhileActiveTransaction = true;
}
Expand Down
Loading

0 comments on commit 73ee0a0

Please sign in to comment.