diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java index 64408315..c4945a60 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java @@ -566,8 +566,7 @@ private static class PropertyEvaluator implements Function { private Object getProperty(String name) { if (messageMetadataCache != null) { - return messageMetadataCache.getProperty( - name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n)); + return messageMetadataCache.getProperty(name); } return JMSFilter.getProperty(propertiesCount, propertiesList, name); } @@ -729,6 +728,20 @@ private static Object getProperty( return getObjectProperty(value, type); } + static Object getProperty(Map cacheProperties, String name) { + if (cacheProperties.isEmpty()) { + return null; + } + // we don't write the type for system fields + // we pre-compute the type in order to avoid to scan the list to fine the type + String type = SYSTEM_PROPERTIES_TYPES.get(name); + if (type == null) { + type = propertyType(name); + } + String value = cacheProperties.get(name); + return getObjectProperty(value, type); + } + @Override public void close() { selectors.clear(); diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java index 1e171098..499fe303 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSPublishFilters.java @@ -411,7 +411,7 @@ private void filterAndAckMessage( // if we have more than one subscription we can save a lot of resources by caching the // properties MessageMetadataCache messageMetadataCache = - subscriptions.size() > 1 ? new MessageMetadataCache() : null; + subscriptions.size() > 1 ? new MessageMetadataCache(messageMetadata) : null; for (Subscription subscription : subscriptions) { if (closed.get()) { // the broker is shutting down, we cannot process the entries diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java index c0b2f685..311acd5e 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/MessageMetadataCache.java @@ -17,12 +17,36 @@ import java.util.HashMap; import java.util.Map; -import java.util.function.Function; +import org.apache.pulsar.common.api.proto.MessageMetadata; -class MessageMetadataCache { +final class MessageMetadataCache { + final Map rawProperties = new HashMap<>(); final Map properties = new HashMap<>(); + private static final Object CACHED_NULL = new Object(); - Object getProperty(String key, Function compute) { - return properties.computeIfAbsent(key, compute); + MessageMetadataCache(MessageMetadata metadata) { + // computing this is expensive because it involves a lot of string manipulation + // protobuf has to parse the bytes and then convert them to Strings + // so we want to do it only once + // please note that when a selector references a property that is not + // in the message we would end up in scanning the whole list + metadata.getPropertiesList().forEach(p -> rawProperties.put(p.getKey(), p.getValue())); + } + + Object getProperty(String key) { + Object result = properties.get(key); + if (result == null) { + result = JMSFilter.getProperty(rawProperties, key); + if (result == null) { + properties.put(key, CACHED_NULL); + } else { + properties.put(key, result); + } + return result; + } + if (result == CACHED_NULL) { + return null; + } + return result; } } diff --git a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java index e55d5120..97486ac2 100644 --- a/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java +++ b/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java @@ -55,13 +55,40 @@ public static SelectorSupport build(String selector, boolean enabled) throws JMS return new SelectorSupport(parse, selector); } + private static class PropertiesCache implements Function { + final Function messagePropertiesAccessor; + private static final Object CACHED_NULL = new Object(); + + final Map cache = new HashMap<>(); + + public PropertiesCache(Function messagePropertiesAccessor) { + this.messagePropertiesAccessor = messagePropertiesAccessor; + } + + @Override + public Object apply(String s) { + Object result = cache.get(s); + if (result == null) { + result = messagePropertiesAccessor.apply(s); + if (result == null) { + cache.put(s, CACHED_NULL); + } else { + cache.put(s, result); + } + return result; + } + if (result == CACHED_NULL) { + return null; + } + return result; + } + } + public boolean matches(Function messagePropertiesAccessor) throws JMSException { - Map cache = new HashMap<>(); // this cache is important in order to be able to not parse Message Metadata more than once // for complex selectors that refer more times to the same property - Function messageProperties = - (name) -> cache.computeIfAbsent(name, messagePropertiesAccessor::apply); + PropertiesCache messageProperties = new PropertiesCache(messagePropertiesAccessor); // convert anything that can be used by the selector // https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L35