Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMSPublishFilters save memory allocations and CPU cycles by caching null values for non-existant properties #147

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,7 @@ private static class PropertyEvaluator implements Function<String, Object> {

private Object getProperty(String name) {
if (messageMetadataCache != null) {
return messageMetadataCache.getProperty(
name, n -> JMSFilter.getProperty(propertiesCount, propertiesList, n));
return messageMetadataCache.getProperty(name);
}
return JMSFilter.getProperty(propertiesCount, propertiesList, name);
}
Expand Down Expand Up @@ -729,6 +728,20 @@ private static Object getProperty(
return getObjectProperty(value, type);
}

static Object getProperty(Map<String, String> cacheProperties, String name) {
if (cacheProperties.isEmpty()) {
return null;
}
// we don't write the type for system fields
// we pre-compute the type in order to avoid to scan the list to fine the type
String type = SYSTEM_PROPERTIES_TYPES.get(name);
if (type == null) {
type = propertyType(name);
}
String value = cacheProperties.get(name);
return getObjectProperty(value, type);
}

@Override
public void close() {
selectors.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ private void filterAndAckMessage(
// if we have more than one subscription we can save a lot of resources by caching the
// properties
MessageMetadataCache messageMetadataCache =
subscriptions.size() > 1 ? new MessageMetadataCache() : null;
subscriptions.size() > 1 ? new MessageMetadataCache(messageMetadata) : null;
for (Subscription subscription : subscriptions) {
if (closed.get()) {
// the broker is shutting down, we cannot process the entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,36 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.pulsar.common.api.proto.MessageMetadata;

class MessageMetadataCache {
final class MessageMetadataCache {
final Map<String, String> rawProperties = new HashMap<>();
final Map<String, Object> properties = new HashMap<>();
private static final Object CACHED_NULL = new Object();

Object getProperty(String key, Function<String, Object> compute) {
return properties.computeIfAbsent(key, compute);
MessageMetadataCache(MessageMetadata metadata) {
// computing this is expensive because it involves a lot of string manipulation
// protobuf has to parse the bytes and then convert them to Strings
// so we want to do it only once
// please note that when a selector references a property that is not
// in the message we would end up in scanning the whole list
metadata.getPropertiesList().forEach(p -> rawProperties.put(p.getKey(), p.getValue()));
}

Object getProperty(String key) {
Object result = properties.get(key);
if (result == null) {
result = JMSFilter.getProperty(rawProperties, key);
if (result == null) {
properties.put(key, CACHED_NULL);
} else {
properties.put(key, result);
}
return result;
}
if (result == CACHED_NULL) {
return null;
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,40 @@ public static SelectorSupport build(String selector, boolean enabled) throws JMS
return new SelectorSupport(parse, selector);
}

private static class PropertiesCache implements Function<String, Object> {
final Function<String, Object> messagePropertiesAccessor;
private static final Object CACHED_NULL = new Object();

final Map<String, Object> cache = new HashMap<>();

public PropertiesCache(Function<String, Object> messagePropertiesAccessor) {
this.messagePropertiesAccessor = messagePropertiesAccessor;
}

@Override
public Object apply(String s) {
Object result = cache.get(s);
if (result == null) {
result = messagePropertiesAccessor.apply(s);
if (result == null) {
cache.put(s, CACHED_NULL);
} else {
cache.put(s, result);
}
return result;
}
if (result == CACHED_NULL) {
return null;
}
return result;
}
}

public boolean matches(Function<String, Object> messagePropertiesAccessor) throws JMSException {
Map<String, Object> cache = new HashMap<>();

// this cache is important in order to be able to not parse Message Metadata more than once
// for complex selectors that refer more times to the same property
Function<String, Object> messageProperties =
(name) -> cache.computeIfAbsent(name, messagePropertiesAccessor::apply);
PropertiesCache messageProperties = new PropertiesCache(messagePropertiesAccessor);

// convert anything that can be used by the selector
// https://github.com/apache/activemq/blob/d54d046b8a8f2e9e5c0a28e1f8c7634b3c8b18e4/activemq-client/src/main/java/org/apache/activemq/filter/PropertyExpression.java#L35
Expand Down
Loading