Consumer supplied filters when producing, along with a producer filter #36

Merged
huntc merged 19 commits into main from consumer-filter on Oct 10, 2023

Conversation

huntc
Collaborator

@huntc huntc commented Oct 5, 2023

When producing from the edge, the consumer may also supply filters. The changes here implement that by providing a watch channel for the upstream gRPC producer. Please start with the changes to the sample temperature_production.rs file. Conveying a criteria watch channel is consistent with what we do when the edge is consuming, although in this instance the channel cannot be optional: we do not know whether the remote consumer will supply criteria, so the channel is mandatory.

A producer filter is also catered for.

Filters are applied prior to any event transformation and intentionally result in seq_nr gaps when events go unmatched. The consuming side has previously been enhanced to expect these gaps.
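To illustrate the ordering described above, here is a minimal sketch of filtering before transforming. The `Envelope` type and the `filter_then_transform` function are hypothetical simplifications for this example and are not the akka-edge-rs API; the point is only that unmatched envelopes are dropped before transformation, so the surviving seq_nr values can have gaps.

```rust
/// Hypothetical, simplified envelope; the real akka-edge-rs envelope types differ.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub entity_id: String,
    pub seq_nr: u64,
    pub temperature: i32,
}

/// Applies the filter first and transforms only the envelopes that match.
/// Unmatched envelopes are dropped, so the returned seq_nrs may skip values.
pub fn filter_then_transform<F, T, O>(
    envelopes: Vec<Envelope>,
    filter: F,
    transform: T,
) -> Vec<(u64, O)>
where
    F: Fn(&Envelope) -> bool,
    T: Fn(&Envelope) -> O,
{
    envelopes
        .iter()
        .filter(|e| filter(*e))
        .map(|e| (e.seq_nr, transform(e)))
        .collect()
}
```

With three envelopes numbered 1 to 3 where only the first and last match, the result carries seq_nr 1 and 3, and the consumer is expected to tolerate the missing 2.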

TODO:

  • Implement the consumer_filter matches function
  • Tests for the consumer_filter matches function

Questions/thoughts/issues:

  1. We can add a filter parameter to the grpc_producer.handler(consumer_filters_receiver, transformer) method if we want, or just have the transformer double-up as a producer-side filter.
  2. It appears that a consumer can only send a filter upon starting the protocol, and not subsequently. Is this correct?
  3. We don't presently support tags in akka-edge-rs, and so we can't filter against them. Should we support tags? (I recall an early conversation that we wouldn't, but I may have misheard that).
  4. I've just noticed that the JVM code holds the notion of merging and diffing filters. I'll need more of an explanation on that. Right now, we simply merge filter commands with the current filter state, and remember this filter state across connections.
  5. I haven't been able to find tests that test out the filter matching.
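As a rough sketch of what "merge filter commands with the current filter state" in point 4 could look like, the following uses hypothetical, cut-down `Criteria` and `FilterState` types restricted to entity ids. The names and shapes are assumptions made for illustration; the real criteria set also covers tags, topics and regexes, and the JVM merge/diff behaviour is not modelled here.

```rust
use std::collections::BTreeSet;

/// Hypothetical, reduced set of filter commands (entity ids only).
#[derive(Debug, Clone)]
pub enum Criteria {
    IncludeEntityIds(Vec<String>),
    RemoveIncludeEntityIds(Vec<String>),
    ExcludeEntityIds(Vec<String>),
    RemoveExcludeEntityIds(Vec<String>),
}

/// Hypothetical filter state that is kept across connections.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FilterState {
    pub include_entity_ids: BTreeSet<String>,
    pub exclude_entity_ids: BTreeSet<String>,
}

impl FilterState {
    /// Folds each incoming command into the existing state, in order.
    pub fn merge(&mut self, criteria: Vec<Criteria>) {
        for c in criteria {
            match c {
                Criteria::IncludeEntityIds(ids) => self.include_entity_ids.extend(ids),
                Criteria::ExcludeEntityIds(ids) => self.exclude_entity_ids.extend(ids),
                Criteria::RemoveIncludeEntityIds(ids) => {
                    for id in &ids {
                        self.include_entity_ids.remove(id);
                    }
                }
                Criteria::RemoveExcludeEntityIds(ids) => {
                    for id in &ids {
                        self.exclude_entity_ids.remove(id);
                    }
                }
            }
        }
    }
}
```

Because the state is a plain value, it can be retained when a connection drops and merged with whatever criteria arrive on the next one.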

Fixes #25

    @@ -28,6 +28,7 @@ pub async fn task(
        // Establish a sink of envelopes that will be forwarded
        // on to a consumer via gRPC event producer.

        let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]);
        let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10);

        let grpc_producer =
Member

unrelated, isn't it confusing to reuse the same name grpc_producer for a new thing, or is that a rust thing?

Collaborator Author

I’d say that it is a more general personal style thing. Rust linting via clippy has no problem with it. Clippy is king. Perhaps I could use a different name here though.
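For readers less familiar with Rust, reusing a name this way is called shadowing and is an ordinary language feature. A minimal sketch, using a hypothetical `Wrapped` type rather than anything from this PR:

```rust
/// Hypothetical wrapper type, standing in for a richer value built from an earlier binding.
pub struct Wrapped(pub u32);

pub fn shadowing_demo() -> u32 {
    // First binding: a plain number.
    let producer = 10u32;
    // Second binding shadows the first and may even have a different type.
    // The original value is moved into the new one and the old name is no longer reachable.
    let producer = Wrapped(producer);
    producer.0 + 1
}
```

Whether to shadow or to pick a fresh name remains a readability choice; the compiler accepts both.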

Collaborator Author

Commit 8730696 deals with this.

@patriknw
Member

patriknw commented Oct 6, 2023

> 2. It appears that a consumer can only send a filter upon starting the protocol, and not subsequently. Is this correct?

That is correct. We have not implemented dynamic consumer filter on jvm side for this case. We might do that later, but currently it can only be set at start.

@patriknw
Member

patriknw commented Oct 6, 2023

> 3. We don't presently support tags in akka-edge-rs, and so we can't filter against them. Should we support tags? (I recall an early conversation that we wouldn't, but I may have misheard that).

We have found that tags are a pretty good way to filter events (consumer defined filters, evaluated on producer side). Without tags those filters can only be on entity ids (exact or regex). I think we have been and still are uncertain about how useful consumer defined filters are for the case when edge is producer. I guess the consumer would be interested in all events from the edge in most cases, and the edge itself decides what to publish (producer defined filter). I would say that tag filters can be added when we have more important things in place. It also depends on effort: if it's a small thing to add tagging support we can just do it.

Also note that topic filters are actually a specific type of tag filter: https://doc.akka.io/docs/akka-projection/snapshot/grpc.html#topics
Topic is not the same concept as a streambed topic.

@patriknw
Member

patriknw commented Oct 6, 2023

> 1. We can add a filter parameter to the grpc_producer.handler(consumer_filters_receiver, transformer) method if we want, or just have the transformer double-up as a producer-side filter.

What's your opinion on this, @johanandren?
For producer push I can agree that transformation and producerFilter look very similar and we will handle them in the same way (for -rs we don't have to send the FilteredEvent since gaps will be filled on the other side).

@johanandren
Member

I'll just copy-paste what I said in Slack:

I think it would be good if the API aligns with the Akka side and has that (producer filter) as well. Having to figure out that something called transformer can also be used for filtering is also a form of complexity pushed onto the developer, IMO. The producer filter is optional, so it shouldn't need to be defined (or thought about) unless the dev wants it.

@huntc
Collaborator Author

huntc commented Oct 6, 2023

> I'll just copy-paste what I said in Slack:
>
> I think it would be good if the API aligns with the Akka side and has that (producer filter) as well. Having to figure out that something called transformer can also be used for filtering is also a form of complexity pushed onto the developer, IMO. The producer filter is optional, so it shouldn't need to be defined (or thought about) unless the dev wants it.

Do you feel the same now looking at the code?

@johanandren
Member

johanandren commented Oct 6, 2023

Yes, but I wonder if we could make all of the optional handler parameters a single setup kind of thing. For the simplest case it would be grpc_flow.handler(None), or something like a factory with an empty default, grpc_flow.handler(options_empty). You could then build from that for the case where you want filtering: grpc_flow.handler(options_empty.with_producer_filter(...).with_event_transform(transform)).

That would somewhat align with the EventProducerSource we use for setup in the Akka JVM side of things.

The general design idea is easy discoverability of opt-ins, but with no mental overhead needing to know all possible details when not used.

@huntc
Collaborator Author

huntc commented Oct 6, 2023

> Yes, but I wonder if we could make all of the optional handler parameters a single setup kind of thing. For the simplest case it would be grpc_flow.handler(None), or something like a factory with an empty default, grpc_flow.handler(options_empty). You could then build from that for the case where you want filtering: grpc_flow.handler(options_empty.with_producer_filter(...).with_event_transform(transform)).
>
> That would somewhat align with the EventProducerSource we use for setup in the Akka JVM side of things.
>
> The general design idea is easy discoverability of opt-ins, but with no mental overhead needing to know all possible details when not used.

I'll have a think on that a bit more... you can't have default params or overloaded functions in Rust, but you can have constructors that return Self and then have a with_ style thing.
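A minimal sketch of the "constructor that returns Self plus with_ methods" idea, assuming a hypothetical `HandlerOptions` type. Nothing here is the akka-edge-rs API; the type, its fields and the `accepts`/`transform` helpers are invented purely to show how opt-ins can stay discoverable without being required.

```rust
/// Hypothetical options holder; every setting is optional.
pub struct HandlerOptions<E, O> {
    producer_filter: Option<Box<dyn Fn(&E) -> bool>>,
    event_transform: Option<Box<dyn Fn(&E) -> O>>,
}

impl<E, O> HandlerOptions<E, O> {
    /// The simplest case: nothing configured.
    pub fn empty() -> Self {
        Self {
            producer_filter: None,
            event_transform: None,
        }
    }

    /// Opt in to a producer-side filter.
    pub fn with_producer_filter(mut self, filter: impl Fn(&E) -> bool + 'static) -> Self {
        self.producer_filter = Some(Box::new(filter));
        self
    }

    /// Opt in to an event transformation.
    pub fn with_event_transform(mut self, transform: impl Fn(&E) -> O + 'static) -> Self {
        self.event_transform = Some(Box::new(transform));
        self
    }

    /// An event is accepted when no filter is set or the filter returns true.
    pub fn accepts(&self, event: &E) -> bool {
        self.producer_filter.as_ref().map_or(true, |f| f(event))
    }

    /// Returns the transformed event, or None when no transform is configured.
    pub fn transform(&self, event: &E) -> Option<O> {
        self.event_transform.as_ref().map(|t| t(event))
    }
}
```

A caller who wants no options writes `HandlerOptions::empty()` and never sees the rest; one who wants filtering chains `with_producer_filter(...)` onto it.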

@johanandren
Member

If it's not too far from what Rust devs would expect, of course. My suggestion mostly comes from being used to Scala/Java. It may be possible to do this more nicely in some very different way in Rust, but the aim of not having to know all options up front, while easily being able to discover them, is more general, I think.

huntc added 3 commits October 7, 2023 13:06
By conveying a PersistenceId instead of an EntityId for envelopes, we can reduce the places where we need to declare the event type.
@huntc
Collaborator Author

huntc commented Oct 7, 2023

> > 2. It appears that a consumer can only send a filter upon starting the protocol, and not subsequently. Is this correct?
>
> That is correct. We have not implemented dynamic consumer filter on jvm side for this case. We might do that later, but currently it can only be set at start.

Great. FYI, the code here will permit the filter to be updated in the future (it was easy and natural to cater for this).

@huntc
Collaborator Author

huntc commented Oct 7, 2023

> > 3. We don't presently support tags in akka-edge-rs, and so we can't filter against them. Should we support tags? (I recall an early conversation that we wouldn't, but I may have misheard that).
>
> We have found that tags are a pretty good way to filter events (consumer defined filters, evaluated on producer side). Without tags those filters can only be on entity ids (exact or regex). I think we have been and still are uncertain about how useful consumer defined filters are for the case when edge is producer. I guess the consumer would be interested in all events from the edge in most cases, and the edge itself decides what to publish (producer defined filter). I would say that tag filters can be added when we have more important things in place. It also depends on effort: if it's a small thing to add tagging support we can just do it.

We now have a WithTags trait for envelopes. At present, it'll always return an empty vector. I'll create a separate issue to circle back on considering the utility of tags at the edge. UPDATE: Issue: #38

> Also note that topic filters are actually a specific type of tag filter: https://doc.akka.io/docs/akka-projection/snapshot/grpc.html#topics Topic is not the same concept as a streambed topic.

Understood.

@huntc
Collaborator Author

huntc commented Oct 7, 2023

> Yes, but I wonder if we could make all of the optional handler parameters a single setup-kinda thing, ...

I've now included an explicit producer_filter parameter via commit 715c7b4.

I've separated out the declarations so that you can see how the grpc_flow handler is constructed:

    let grpc_flow = GrpcEventFlow::new(entity_type, grpc_producer);
    let producer_filter = |_: &CommitLogEventEnvelope<temperature::Event>| true;
    let consumer_filter = Filter::default();
    let event_handler = grpc_flow.handler(
        producer_filter,
        consumer_filters_receiver,
        consumer_filter,
        transformer,
    );

We have a separate issue to see if we can improve the DX on some of these calls. My present focus is to stand up the full API.

@huntc huntc marked this pull request as ready for review October 7, 2023 06:05
@huntc huntc self-assigned this Oct 7, 2023
@huntc huntc added the enhancement New feature or request label Oct 7, 2023
@huntc huntc changed the title from "Consumer supplied filters when producing" to "Consumer supplied filters when producing, along with a producer filter" Oct 7, 2023
huntc added 5 commits October 8, 2023 16:43
Was previously running the stream only once
We should always endeavour to avoid running out of heap
    -    IncludeEntityIds {
    -        entity_id_offsets: Vec<EntityIdOffset>,
    +    IncludePersistenceIds {
    +        persistence_id_offsets: Vec<PersistenceIdIdOffset>,
         },
Member

What happened with EntityId vs PersistenceId?
The filters should be defined in terms of entity id on the consumer side.

Then there can be a mapping between stream id and entity type. This mapping is on the producer side.

Collaborator Author

@huntc huntc Oct 10, 2023

Member

I might be lost in the implementation details, but this is my understanding of how it should work.

edge is consumer

The user API to define a filter at the edge should be IncludeEntityIds, ExcludeEntityIds and corresponding Remove.
That is transformed pretty much 1:1 to the corresponding EntityId protobuf messages.

Those filters are evaluated on the jvm consumer side, so you don't have to care about mapping between streamId and entityType for this case.

edge is producer

The incoming protobuf filters are IncludeEntityIds and ExcludeEntityIds. When filtering the events from the journal you have to filter on the EntityType and EntityId. You know the EntityType on the edge producer side, because that was defined when setting up the producer push stream. That producer push stream should only emit events with the given EntityType to begin with.
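A minimal sketch of the producer-side check described above: first restrict to the stream's entity type, then evaluate the entity id criteria. The `EntityIdFilter` type and `entity_matches` function are hypothetical. The evaluation order (exclude first, with an include able to override an exclude) is an assumption about the intended semantics rather than something stated in this thread, and the prefix list is only a stand-in for regex exclusion because the standard library has no regex support.

```rust
use std::collections::BTreeSet;

/// Hypothetical consumer filter state, reduced to entity ids only.
#[derive(Debug, Default)]
pub struct EntityIdFilter {
    pub exclude_entity_ids: BTreeSet<String>,
    /// Stand-in for regex exclusion (assumption for this sketch).
    pub exclude_entity_id_prefixes: Vec<String>,
    pub include_entity_ids: BTreeSet<String>,
}

/// Returns true when an event should be emitted to the consumer.
pub fn entity_matches(
    filter: &EntityIdFilter,
    stream_entity_type: &str,
    entity_type: &str,
    entity_id: &str,
) -> bool {
    // The push stream should only ever emit events of its own entity type.
    if entity_type != stream_entity_type {
        return false;
    }
    // Assumed order: check excludes first, then let an include override.
    let excluded = filter.exclude_entity_ids.contains(entity_id)
        || filter
            .exclude_entity_id_prefixes
            .iter()
            .any(|prefix| entity_id.starts_with(prefix.as_str()));
    !excluded || filter.include_entity_ids.contains(entity_id)
}
```

The entity type never needs to travel in the filter itself, since the producer already knows it from setting up the stream.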

Collaborator Author

Thanks for the offline discussions. Commit d2e1550 restores things back to filtering by entity ids.

        EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
    };
    use akka_projection_rs::consumer_filter::{ComparableRegex, FilterCriteria, PersistenceIdIdOffset};
    use mqtt::TopicFilter;
Member

Given that we don't have tags (yet) I'm not sure we should add the mqtt dependency.

Collaborator Author

If there's the potential of supporting tags in the future i.e. if we're still unsure, I'd prefer to retain the code. I've now hidden the MQTT types from the developer though as per commit 0fc0f50.

         /// Remove a previously added `IncludeTopics`.
    -    RemoveIncludeTopics { expressions: Vec<TopicMatcher> },
    +    RemoveIncludeTopics { expressions: Vec<TopicFilter> },
Member

I'm not sure we should tie this to the mqtt dependency. I picked mqtt for the semantics of the topic filter wildcards but we don't aim for speaking mqtt protocol in any way.

I can understand that you added the library instead of implementing the evaluation of the topic filter, but I think we should keep that as an implementation detail (not exposed to user).

Note that these filters are also used for the case where edge is consumer, and filter is evaluated by (jvm) producer.

Collaborator Author

The MQTT dependency is now isolated to one place and the TopicFilter type has been wrapped with our own TopicMatcher. We are therefore free to change its implementation if required.

Commit 0fc0f50 resolves this.
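To show the wrapping idea in isolation, here is a sketch of a public `TopicMatcher` whose inner representation is private. This is not the code from the commit: the real type wraps the MQTT library's TopicFilter, whereas this version substitutes a small hand-written matcher for the `+` (single level) and `#` (all remaining levels) wildcards so that the example needs no external crate. It ignores MQTT details such as `$`-prefixed topics.

```rust
/// Public wrapper; callers never see how expressions are stored or evaluated.
#[derive(Debug, Clone, PartialEq)]
pub struct TopicMatcher {
    // Private stand-in for the wrapped third-party filter type.
    levels: Vec<String>,
}

impl TopicMatcher {
    /// Parses an expression, returning None when it is empty or misuses a wildcard.
    pub fn new(expression: &str) -> Option<Self> {
        if expression.is_empty() {
            return None;
        }
        let levels: Vec<String> = expression.split('/').map(str::to_owned).collect();
        let last = levels.len() - 1;
        let valid = levels.iter().enumerate().all(|(i, level)| match level.as_str() {
            "#" => i == last, // '#' may only appear as the final level
            "+" => true,
            other => !other.contains('#') && !other.contains('+'),
        });
        if valid {
            Some(Self { levels })
        } else {
            None
        }
    }

    /// Tests a concrete topic against the expression.
    pub fn matches(&self, topic: &str) -> bool {
        let mut topic_levels = topic.split('/');
        for level in &self.levels {
            match level.as_str() {
                "#" => return true, // matches everything that remains, including nothing
                "+" => {
                    if topic_levels.next().is_none() {
                        return false;
                    }
                }
                literal => {
                    if topic_levels.next() != Some(literal) {
                        return false;
                    }
                }
            }
        }
        // All expression levels consumed; the topic must be fully consumed too.
        topic_levels.next().is_none()
    }
}
```

Because `levels` is private, swapping this matcher for a library-backed one later would not change the public signature.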

huntc added 2 commits October 10, 2023 10:21
The MQTT TopicFilter is hidden to the outside so that we can freely change it, as we're not necessarily supporting all of MQTT's capabilities.
Member

@patriknw patriknw left a comment

LGTM

@huntc huntc merged commit f12f526 into main Oct 10, 2023
1 check passed
@huntc huntc deleted the consumer-filter branch October 10, 2023 21:56
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Need to consider filters
3 participants