Consumer supplied filters when producing, along with a producer filter #36
Conversation
When producing from the edge, the consumer may also supply filters. The changes here implement that by providing a watch channel for the upstream gRPC producer.
@@ -28,6 +28,7 @@ pub async fn task(
    // Establish a sink of envelopes that will be forwarded
    // on to a consumer via gRPC event producer.

    let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]);
    let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10);

    let grpc_producer =
Unrelated, but isn't it confusing to reuse the same name grpc_producer for a new thing, or is that a Rust thing?
I’d say that it is a more general personal style thing. Rust linting via clippy has no problem with it. Clippy is king. Perhaps I could use a different name here though.
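For anyone following along, here is a minimal, self-contained illustration of the shadowing being discussed; the values are made up and only the name reuse matters:

```rust
fn main() {
    // First binding: stand-in for the raw channel sender.
    let grpc_producer = "raw mpsc sender";

    // Shadowing: the same name is rebound to a derived value. Rust treats
    // this as a brand new binding and clippy raises no lint for it.
    let grpc_producer = format!("sink wrapping {grpc_producer}");

    println!("{grpc_producer}");
}
```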
Commit 8730696 deals with this.
That is correct. We have not implemented dynamic consumer filters on the JVM side for this case. We might do that later, but currently the filter can only be set at start.
We have found that tags are a pretty good way to filter events (consumer-defined filters, evaluated on the producer side). Without tags those filters can only be on entity ids (exact or regex). I think we have been, and still are, uncertain about how useful consumer-defined filters are for the case when the edge is the producer. I guess the consumer would be interested in all events from the edge in most cases, and the edge itself decides what to publish (producer-defined filter). I would say that tag filters can be added when we have more important things in place. It also depends on effort; if it's a small thing to add tagging support we can just do it. Also note that topic filters are actually a specific type of tag filter: https://doc.akka.io/docs/akka-projection/snapshot/grpc.html#topics
What's your opinion on this, @johanandren?
I'll just copy-paste what I said in Slack:

Do you feel the same now looking at the code?
Yes, but I wonder if we could make all of the optional handler parameters a single setup-kinda thing, so that for the simplest case it would be …. That would somewhat align with the …. The general design idea is easy discoverability of opt-ins, but with no mental overhead of needing to know all possible details when not used.
I'll have a think on that a bit more... you can't have default params or overloaded functions in Rust, but you can have constructors that return …
Not quite working yet.
If it's not too far from what Rust devs would expect, of course; my suggestion was mostly coming from being used to Scala/Java. It could be possible to do this more nicely in some very different way in Rust, but the aim of not having to know all options up front, while easily being able to discover them, is I think more general.
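To make the idea concrete, here is a rough sketch of how such a single setup value could look in Rust using a builder-style constructor, which is the usual substitute for default parameters and overloading. All of the names here (HandlerSetup, with_producer_filter, with_consumer_filters) are hypothetical and not part of the crate:

```rust
/// Illustrative setup type gathering the optional handler parameters.
struct HandlerSetup {
    producer_filter: Option<fn(&str) -> bool>,
    consumer_filter_enabled: bool,
}

impl HandlerSetup {
    /// The simplest case: every optional knob left at its default.
    fn new() -> Self {
        Self {
            producer_filter: None,
            consumer_filter_enabled: false,
        }
    }

    /// Opt in to a producer-side filter.
    fn with_producer_filter(mut self, filter: fn(&str) -> bool) -> Self {
        self.producer_filter = Some(filter);
        self
    }

    /// Opt in to consumer-supplied filters.
    fn with_consumer_filters(mut self) -> Self {
        self.consumer_filter_enabled = true;
        self
    }
}

fn main() {
    // Simplest case: nothing to know about up front.
    let simple = HandlerSetup::new();
    assert!(simple.producer_filter.is_none());

    // Opt-ins are discoverable as methods on the setup value.
    let full = HandlerSetup::new()
        .with_producer_filter(|event| !event.is_empty())
        .with_consumer_filters();
    assert!(full.producer_filter.is_some() && full.consumer_filter_enabled);
}
```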
By conveying a PersistenceId instead of an EntityId for envelopes, we can reduce the places where we need to declare the event type.
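For readers unfamiliar with the term used in the commit above: in Akka terms a persistence id conventionally combines the entity type with the entity id. A sketch only, under that assumption, and not the crate's actual types:

```rust
/// Illustrative persistence id: entity type plus entity id.
struct PersistenceId {
    entity_type: String,
    entity_id: String,
}

impl PersistenceId {
    fn new(entity_type: &str, entity_id: &str) -> Self {
        Self {
            entity_type: entity_type.to_string(),
            entity_id: entity_id.to_string(),
        }
    }

    /// Canonical string form, e.g. "Temperature|sensor-1".
    fn to_canonical(&self) -> String {
        format!("{}|{}", self.entity_type, self.entity_id)
    }
}

fn main() {
    let pid = PersistenceId::new("Temperature", "sensor-1");
    assert_eq!(pid.to_canonical(), "Temperature|sensor-1");
}
```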
Great. FYI, the code here will permit the filter to be updated in the future (it was easy and natural to cater for this).
We now have a
Understood.
I've now included an explicit …. I've separated out the declarations so that you can see how the … fit together:

    let grpc_flow = GrpcEventFlow::new(entity_type, grpc_producer);
    let producer_filter = |_: &CommitLogEventEnvelope<temperature::Event>| true;
    let consumer_filter = Filter::default();
    let event_handler = grpc_flow.handler(
        producer_filter,
        consumer_filters_receiver,
        consumer_filter,
        transformer,
    );

We have a separate issue to see if we can improve the DX on some of these calls. My present focus is to stand up the full API.
Was previously running the stream only once
We should always endeavour to avoid running out of heap
IncludeEntityIds {
    entity_id_offsets: Vec<EntityIdOffset>,
IncludePersistenceIds {
    persistence_id_offsets: Vec<PersistenceIdIdOffset>,
},
What happened with EntityId vs PersistenceId?
The filters should be defined in terms of entity id on the consumer side.
Then there can be a mapping between stream id and entity type. This mapping is on the producer side.
There's obviously something I'm not understanding here. I took inspiration from the protobuf to filter conversions on the JVM: https://github.com/akka/akka-projection/blob/main/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ProtobufProtocolConversions.scala#L242-L285
And also more specifically here: https://github.com/akka/akka-projection/blob/6f13d3c378e4cbd6f2e7d214f25ce63dd902f85f/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala#L77
I might be lost in the implementation details, but this is my understanding of how it should work.
edge is consumer
The user API to define a filter at the edge should be IncludeEntityIds, ExcludeEntityIds and corresponding Remove.
That is transformed pretty much 1:1 to the corresponding EntityId protobuf messages.
Those filters are evaluated on the JVM consumer side, so you don't have to care about mapping between streamId and entityType for this case.
edge is producer
The incoming protobuf filters are IncludeEntityIds and ExcludeEntityIds. When filtering the events from the journal you have to filter on the EntityType and EntityId. You know the EntityType on the edge producer side, because that was defined when setting up the producer push stream. That producer push stream should only emit events with the given EntityType to begin with.
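A rough sketch of the producer-side evaluation described above: incoming include/exclude entity-id criteria are checked against an envelope's entity id, while the entity type is already fixed by the producer push stream. The FilterCriteria variants and the precedence between includes and excludes are simplifications for illustration, not the actual implementation:

```rust
/// Illustrative subset of the consumer-defined filter criteria.
enum FilterCriteria {
    IncludeEntityIds { entity_ids: Vec<String> },
    ExcludeEntityIds { entity_ids: Vec<String> },
}

/// Sketch: an event passes unless some exclude criterion names its entity id,
/// and an explicit include wins over an exclude. (Real precedence may differ.)
fn matches(criteria: &[FilterCriteria], entity_id: &str) -> bool {
    let mut excluded = false;
    let mut included = false;
    for criterion in criteria {
        match criterion {
            FilterCriteria::ExcludeEntityIds { entity_ids } => {
                excluded |= entity_ids.iter().any(|id| id == entity_id);
            }
            FilterCriteria::IncludeEntityIds { entity_ids } => {
                included |= entity_ids.iter().any(|id| id == entity_id);
            }
        }
    }
    !excluded || included
}

fn main() {
    let criteria = vec![
        FilterCriteria::ExcludeEntityIds { entity_ids: vec!["sensor-1".into()] },
        FilterCriteria::IncludeEntityIds { entity_ids: vec!["sensor-2".into()] },
    ];
    assert!(!matches(&criteria, "sensor-1")); // explicitly excluded
    assert!(matches(&criteria, "sensor-2"));  // explicitly included
    assert!(matches(&criteria, "sensor-3"));  // untouched ids pass by default
}
```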
Thanks for the offline discussions. Commit d2e1550 restores things back to filtering by entity ids.
akka-projection-rs-grpc/src/lib.rs
    EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
};
use akka_projection_rs::consumer_filter::{ComparableRegex, FilterCriteria, PersistenceIdIdOffset};
use mqtt::TopicFilter;
Given that we don't have tags (yet) I'm not sure we should add the mqtt dependency.
If there's the potential of supporting tags in the future, i.e. if we're still unsure, I'd prefer to retain the code. I've now hidden the MQTT types from the developer though, as per commit 0fc0f50.
/// Remove a previously added `IncludeTopics`.
RemoveIncludeTopics { expressions: Vec<TopicMatcher> },
RemoveIncludeTopics { expressions: Vec<TopicFilter> },
I'm not sure we should tie this to the mqtt dependency. I picked mqtt for the semantics of the topic filter wildcards but we don't aim for speaking mqtt protocol in any way.
I can understand that you added the library instead of implementing the evaluation of the topic filter, but I think we should keep that as an implementation detail (not exposed to user).
Note that these filters are also used for the case where edge is consumer, and filter is evaluated by (jvm) producer.
The MQTT dependency is now isolated to one place and the TopicFilter type has been wrapped with our own TopicMatcher. We are therefore free to change its implementation if required.
Commit 0fc0f50 resolves this.
The MQTT TopicFilter is hidden to the outside so that we can freely change it, as we're not necessarily supporting all of MQTT's capabilities.
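A sketch of the wrapping approach described: the public type is our own TopicMatcher and the matching logic stays an internal detail. The body below is hand-rolled MQTT-style wildcard matching purely for illustration; in the crate the wrapper delegates to mqtt::TopicFilter, and the method names are assumptions:

```rust
/// Public wrapper: callers see `TopicMatcher`, never the underlying
/// implementation, so the matching strategy can be swapped later.
pub struct TopicMatcher {
    filter: String,
}

impl TopicMatcher {
    pub fn new(filter: &str) -> Self {
        Self { filter: filter.to_string() }
    }

    /// MQTT-style matching: `+` matches one level, `#` matches the rest.
    pub fn matches(&self, topic: &str) -> bool {
        let mut filter_levels = self.filter.split('/');
        let mut topic_levels = topic.split('/');
        loop {
            match (filter_levels.next(), topic_levels.next()) {
                (Some("#"), _) => return true,
                (Some("+"), Some(_)) => continue,
                (Some(f), Some(t)) if f == t => continue,
                (None, None) => return true,
                _ => return false,
            }
        }
    }
}

fn main() {
    let matcher = TopicMatcher::new("sensors/+/temperature");
    assert!(matcher.matches("sensors/kitchen/temperature"));
    assert!(!matcher.matches("sensors/kitchen/humidity"));
}
```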
LGTM
When producing from the edge, the consumer may also supply filters. The changes here implement that by providing a watch channel for the upstream gRPC producer. Please start with the sample temperature_production.rs file changes; the addition of conveying a criteria watch channel is consistent with when the edge is consuming, although in this instance it cannot be optional. We do not know if the remote consumer will be supplying criteria or not, hence the channel being mandatory. A producer filter is also catered for.
Filters are applied prior to any event transformation and intentionally result in seq_nr gaps when events go unmatched. The consuming side has previously been enhanced to expect these gaps.

TODO:
- consumer_filter matches function
- consumer_filter matches function

Questions/thoughts/issues:
- We can add a filter parameter to the grpc_producer.handler(consumer_filters_receiver, transformer) method if we want, or just have the transformer double up as a producer-side filter.
- It appears that a consumer can only send a filter upon starting the protocol, and not subsequently. Is this correct?
- We don't presently support tags in akka-edge-rs, and so we can't filter against them. Should we support tags? (I recall an early conversation that we wouldn't, but I may have misheard that.)
- I've just noticed that the JVM code holds the notion of merging and diffing filters. I'll need more of an explanation on that. Right now, we simply merge filter commands with the current filter state, and remember this filter state across connections.
- I haven't been able to find tests that test out the filter matching.

Fixes #25
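For completeness, a hedged sketch of the mandatory criteria watch channel described above, using tokio's watch and mpsc channels. The channel names mirror the diff earlier in this conversation, but the element types are simplified placeholders rather than the crate's actual envelope and filter types:

```rust
use tokio::sync::{mpsc, watch};

#[tokio::main]
async fn main() {
    // Consumer-supplied filter criteria arrive over a watch channel. The
    // channel itself is mandatory even if the remote consumer never sends
    // criteria; it simply starts out empty.
    let (consumer_filters, mut consumer_filters_receiver) =
        watch::channel::<Vec<String>>(vec![]);

    // Envelope sink toward the gRPC event producer (type simplified here).
    let (grpc_producer, mut grpc_producer_receiver) = mpsc::channel::<String>(10);

    // Simulate the remote consumer pushing a criterion later on.
    consumer_filters
        .send(vec!["include entity-id sensor-1".to_string()])
        .unwrap();
    consumer_filters_receiver.changed().await.unwrap();
    println!("criteria now: {:?}", *consumer_filters_receiver.borrow());

    // Producer-side filtering happens before transformation; unmatched
    // events are dropped, which is what yields seq_nr gaps downstream.
    grpc_producer
        .send("some transformed event".to_string())
        .await
        .unwrap();
    println!("forwarded: {:?}", grpc_producer_receiver.recv().await);
}
```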