Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMS Selectors: apply selectors on the write path (initial prototype) #123

Merged
merged 3 commits into from
Apr 11, 2024

Conversation

eolivelli
Copy link
Collaborator

@eolivelli eolivelli commented Apr 11, 2024

This change adds a new BrokerInterceptor that applies the Filters on the Write Path and acknowledges the message that don't match the filter.

With this change the backlog metric is more accurate and there is less work to do while dispatching the messages.
In the other hand, we are adding more load on the write path.

Please note that we still have to apply the filter while dispatching the messages because:

  • subscriptions may be created after the message as been written
  • in Pulsar you can "seek" and go back in time
  • message ack is always "best effort" (because Pulsar groups acks and they are not written synchronously in order to save resources)

How to enable this feature:

brokerInterceptorsDirectory=./interceptors
brokerInterceptors=jms-publish-filters
entryFiltersDirectory=./filters
entryFilterNames=jms

You also have to copy the jms filters .nar file into the 'interceptors' directory (the same file you put in the 'filters' directory)

@eolivelli eolivelli changed the title JMS Selectors: apply selectors on the write path JMS Selectors: apply selectors on the write path (initial prototype) Apr 11, 2024
@eolivelli eolivelli marked this pull request as ready for review April 11, 2024 09:41
@eolivelli eolivelli merged commit 2bd0fd9 into master Apr 11, 2024
2 of 3 checks passed
@eolivelli eolivelli deleted the preack-selectors branch April 11, 2024 12:30
Comment on lines +94 to +98
if (filterResult == EntryFilter.FilterResult.REJECT) {
String property = "filter-result-" + name + "@" + subscription.getTopicName();
publishContext.setProperty(property, filterResult);
publishContext.setProperty(JMS_FILTERED_PROPERTY, true);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A follow up optimization can use these fields to skip the post filtering step.

Comment on lines +125 to +131
// ir is possible that calling this method in this thread may affect performance
// let's keep it simple for now, we can optimize it later
subscription.acknowledgeMessage(
Collections.singletonList(new PositionImpl(ledgerId, entryId)),
CommandAck.AckType.Individual,
null);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure acknowledging the message should be fast. It might get slowed down by synchronization, but I would expect the pre-filtering to be the thing that is most disruptive.

@michaeljmarshall
Copy link
Member

Nice work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants