From 10e97e54c63281f0436855685699446be0488a2f Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 5 Oct 2023 20:29:51 +1100 Subject: [PATCH 01/19] Consumer supplied filters when producing When producing from the edge, the consumer may also supply filters. The changes here implement that by providing a watch channel for the upstream gRPC producer. --- akka-projection-rs-grpc/src/lib.rs | 111 +++++++++++++++++- akka-projection-rs-grpc/src/producer.rs | 27 ++++- akka-projection-rs/src/consumer_filter.rs | 11 +- .../backend/src/temperature_production.rs | 6 +- 4 files changed, 147 insertions(+), 8 deletions(-) diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 63e845d..65822e7 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -1,7 +1,11 @@ #![doc = include_str!("../README.md")] -use akka_persistence_rs::{Offset, PersistenceId, TimestampOffset, WithOffset}; -use akka_projection_rs::consumer_filter::{EntityIdOffset, FilterCriteria}; +use akka_persistence_rs::{ + EntityId, Offset, PersistenceId, TimestampOffset, WithEntityId, WithOffset, +}; +use akka_projection_rs::consumer_filter::{ + EntityIdMatcher, EntityIdOffset, FilterCriteria, Tag, TopicMatcher, +}; use smol_str::SmolStr; pub mod consumer; @@ -17,6 +21,12 @@ pub struct EventEnvelope { pub offset: TimestampOffset, } +impl WithEntityId for EventEnvelope { + fn entity_id(&self) -> EntityId { + self.persistence_id.entity_id.clone() + } +} + impl WithOffset for EventEnvelope { fn offset(&self) -> Offset { Offset::Timestamp(self.offset.clone()) @@ -133,3 +143,100 @@ impl From for proto::FilterCriteria { } } } + +/// Declares that a protobuf criteria is unable to be converted +/// due to there being no message. 
+pub struct NoMessage; + +impl TryFrom for FilterCriteria { + type Error = NoMessage; + + fn try_from(value: proto::FilterCriteria) -> Result { + match value.message { + Some(message) => { + let criteria = match message { + proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => { + FilterCriteria::ExcludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + } + } + proto::filter_criteria::Message::RemoveExcludeTags( + proto::RemoveExcludeTags { tags }, + ) => FilterCriteria::RemoveExcludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => { + FilterCriteria::IncludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + } + } + proto::filter_criteria::Message::RemoveIncludeTags( + proto::RemoveIncludeTags { tags }, + ) => FilterCriteria::RemoveIncludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::ExcludeMatchingEntityIds( + proto::ExcludeRegexEntityIds { matching }, + ) => FilterCriteria::ExcludeRegexEntityIds { + matching: matching.into_iter().map(EntityIdMatcher::from).collect(), + }, + proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( + proto::RemoveExcludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveExcludeRegexEntityIds { + matching: matching.into_iter().map(EntityIdMatcher::from).collect(), + }, + proto::filter_criteria::Message::IncludeMatchingEntityIds( + proto::IncludeRegexEntityIds { matching }, + ) => FilterCriteria::IncludeRegexEntityIds { + matching: matching.into_iter().map(EntityIdMatcher::from).collect(), + }, + proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( + proto::RemoveIncludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveIncludeRegexEntityIds { + matching: matching.into_iter().map(EntityIdMatcher::from).collect(), + }, + proto::filter_criteria::Message::ExcludeEntityIds( + proto::ExcludeEntityIds { entity_ids }, + ) => 
FilterCriteria::ExcludeEntityIds { + entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::RemoveExcludeEntityIds( + proto::RemoveExcludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveExcludeEntityIds { + entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::IncludeEntityIds( + proto::IncludeEntityIds { entity_id_offset }, + ) => FilterCriteria::IncludeEntityIds { + entity_id_offsets: entity_id_offset + .into_iter() + .map( + |proto::EntityIdOffset { entity_id, seq_nr }| EntityIdOffset { + entity_id: EntityId::from(entity_id), + seq_nr: seq_nr as u64, + }, + ) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeEntityIds( + proto::RemoveIncludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveIncludeEntityIds { + entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics { + expression, + }) => FilterCriteria::IncludeTopics { + expressions: expression.into_iter().map(TopicMatcher::from).collect(), + }, + proto::filter_criteria::Message::RemoveIncludeTopics( + proto::RemoveIncludeTopics { expression }, + ) => FilterCriteria::RemoveIncludeTopics { + expressions: expression.into_iter().map(TopicMatcher::from).collect(), + }, + }; + Ok(criteria) + } + None => Err(NoMessage), + } + } +} diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index fe84afc..f29de04 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -5,10 +5,13 @@ use akka_persistence_rs::TimestampOffset; use akka_persistence_rs::WithEntityId; use akka_persistence_rs::WithSeqNr; use akka_persistence_rs::WithTimestampOffset; +use akka_projection_rs::consumer_filter; +use akka_projection_rs::consumer_filter::FilterCriteria; use akka_projection_rs::HandlerError; use akka_projection_rs::PendingHandler; use 
async_stream::stream; use async_trait::async_trait; +use futures::future; use log::debug; use log::warn; use prost::Name; @@ -20,6 +23,7 @@ use std::marker::PhantomData; use std::pin::Pin; use tokio::sync::mpsc; use tokio::sync::oneshot; +use tokio::sync::watch; use tokio_stream::StreamExt; use tonic::transport::Channel; use tonic::Request; @@ -45,6 +49,7 @@ where F: Fn(&EI) -> Option, { producer: GrpcEventProducer, + consumer_filters_receiver: watch::Receiver>, transformer: F, phantom: PhantomData, } @@ -64,7 +69,14 @@ where &mut self, envelope: Self::Envelope, ) -> Result> + Send>>, HandlerError> { - let event = (self.transformer)(&envelope); + let event = if consumer_filter::matches(&envelope, &self.consumer_filters_receiver.borrow()) + { + (self.transformer)(&envelope) + } else { + // Gaps are ok given a filter situation. + return Ok(Box::pin(future::ready(Ok(())))); + }; + let transformation = Transformation { entity_id: envelope.entity_id(), seq_nr: envelope.seq_nr(), @@ -99,12 +111,17 @@ impl GrpcEventProducer { } } - pub fn handler(self, transformer: F) -> GrpcEventProcessor + pub fn handler( + self, + consumer_filters_receiver: watch::Receiver>, + transformer: F, + ) -> GrpcEventProcessor where F: Fn(&EI) -> Option, { GrpcEventProcessor { producer: self, + consumer_filters_receiver, transformer, phantom: PhantomData, } @@ -142,6 +159,7 @@ pub async fn run( event_consumer_channel: EC, origin_id: StreamId, stream_id: StreamId, + consumer_filters: watch::Sender>, mut envelopes: mpsc::Receiver<(EventEnvelope, oneshot::Sender<()>)>, mut kill_switch: oneshot::Receiver<()>, ) where @@ -241,8 +259,9 @@ pub async fn run( Some(Ok(proto::ConsumeEventOut { message: Some(message), })) = stream_outs.next() => match message { - proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. 
}) => { + proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => { debug!("Starting the protocol"); + consumer_filters.send(filter.into_iter().flat_map(|f| f.try_into()).collect()).unwrap(); break; } _ => { @@ -408,6 +427,7 @@ mod tests { .unwrap(); }); + let (consumer_filters, _) = watch::channel(vec![]); let (sender, receiver) = mpsc::channel(10); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); tokio::spawn(async move { @@ -416,6 +436,7 @@ mod tests { || channel.connect(), OriginId::from("some-origin-id"), StreamId::from("some-stream-id"), + consumer_filters, receiver, task_kill_switch_receiver, ) diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index c6214bc..078b6bc 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -25,7 +25,7 @@ //! * IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions //! * IncludeEntityIds - include events for entities with the given entity ids -use akka_persistence_rs::EntityId; +use akka_persistence_rs::{EntityId, WithEntityId}; use smol_str::SmolStr; #[derive(Clone)] @@ -100,3 +100,12 @@ pub fn exclude_all() -> FilterCriteria { matching: vec![EntityIdMatcher::from(".*")], } } + +/// A function that matches an envelope with criteria and passes it through if matched. 
+pub fn matches(_envelope: &E, _consumer_filters: &[FilterCriteria]) -> bool +where + E: WithEntityId, +{ + // TODO + todo!() +} diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 01d77de..6ef1418 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -12,7 +12,7 @@ use std::{path::PathBuf, time::Duration}; use streambed::commit_log::Topic; use streambed_confidant::FileSecretStore; use streambed_logged::FileLog; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tonic::transport::{Channel, Uri}; // Apply sensor observations to a remote consumer. @@ -28,6 +28,7 @@ pub async fn task( // Establish a sink of envelopes that will be forwarded // on to a consumer via gRPC event producer. + let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); let grpc_producer = @@ -40,6 +41,7 @@ pub async fn task( || channel.connect(), OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), + consumer_filters, grpc_producer_receiver, task_kill_switch_receiver, ) @@ -89,7 +91,7 @@ pub async fn task( &state_storage_path, kill_switch, source_provider, - grpc_producer.handler(transformer), + grpc_producer.handler(consumer_filters_receiver, transformer), Duration::from_millis(100), ) .await From 87306966cf8e5c53492d149cc11accdd677a68b8 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 6 Oct 2023 11:13:56 +1100 Subject: [PATCH 02/19] Possibly better name for flowing gRPC events --- akka-projection-rs-grpc/src/producer.rs | 12 ++++++------ .../backend/src/temperature_production.rs | 9 ++++----- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index f29de04..b66436c 100644 --- 
a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -48,7 +48,7 @@ pub struct GrpcEventProcessor where F: Fn(&EI) -> Option, { - producer: GrpcEventProducer, + flow: GrpcEventFlow, consumer_filters_receiver: watch::Receiver>, transformer: F, phantom: PhantomData, @@ -83,7 +83,7 @@ where offset: envelope.timestamp_offset(), event, }; - let result = self.producer.process(transformation).await; + let result = self.flow.process(transformation).await; if let Ok(receiver_reply) = result { Ok(Box::pin(async { receiver_reply.await.map_err(|_| HandlerError) @@ -94,13 +94,13 @@ where } } -/// Produce gRPC events given a user-supplied transformation function. -pub struct GrpcEventProducer { +/// Transform and forward gRPC events given a user-supplied transformation function. +pub struct GrpcEventFlow { entity_type: EntityType, grpc_producer: mpsc::Sender<(EventEnvelope, oneshot::Sender<()>)>, } -impl GrpcEventProducer { +impl GrpcEventFlow { pub fn new( entity_type: EntityType, grpc_producer: mpsc::Sender<(EventEnvelope, oneshot::Sender<()>)>, @@ -120,7 +120,7 @@ impl GrpcEventProducer { F: Fn(&EI) -> Option, { GrpcEventProcessor { - producer: self, + flow: self, consumer_filters_receiver, transformer, phantom: PhantomData, diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 6ef1418..f1f1b72 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -5,7 +5,7 @@ use crate::temperature::{self, EventEnvelopeMarshaler}; use akka_persistence_rs::EntityType; use akka_persistence_rs_commitlog::EventEnvelope as CommitLogEventEnvelope; use akka_projection_rs_commitlog::CommitLogSourceProvider; -use akka_projection_rs_grpc::producer::GrpcEventProducer; +use akka_projection_rs_grpc::producer::GrpcEventFlow; use akka_projection_rs_grpc::{OriginId, StreamId}; use std::sync::Arc; use 
std::{path::PathBuf, time::Duration}; @@ -31,8 +31,7 @@ pub async fn task( let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); - let grpc_producer = - GrpcEventProducer::new(EntityType::from(temperature::ENTITY_TYPE), grpc_producer); + let grpc_flow = GrpcEventFlow::new(EntityType::from(temperature::ENTITY_TYPE), grpc_producer); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); tokio::spawn(async { @@ -80,7 +79,7 @@ pub async fn task( // to remember the offset consumed from the commit log. This then // permits us to restart from a specific point in the source given // restarts. - // A handler is formed from the gRPC producer. This handler will + // A handler is formed from the gRPC flow. This handler will // call upon the transformer function to, in turn, produce the // gRPC events to a remote consumer. The handler is a "flowing" one // where an upper limit of the number of envelopes in-flight is set. @@ -91,7 +90,7 @@ pub async fn task( &state_storage_path, kill_switch, source_provider, - grpc_producer.handler(consumer_filters_receiver, transformer), + grpc_flow.handler(consumer_filters_receiver, transformer), Duration::from_millis(100), ) .await From 3384e04a32f453e68d12833b5a2cd78e740d1ec6 Mon Sep 17 00:00:00 2001 From: huntc Date: Fri, 6 Oct 2023 20:33:04 +1100 Subject: [PATCH 03/19] Beginnings of fleshing out the filtering further Not quite working yet. 
--- Cargo.toml | 2 + akka-persistence-rs-commitlog/src/lib.rs | 10 +- akka-persistence-rs/src/lib.rs | 18 +- akka-projection-rs-grpc/Cargo.toml | 2 + akka-projection-rs-grpc/src/consumer.rs | 8 +- akka-projection-rs-grpc/src/lib.rs | 261 ++++++++++-------- akka-projection-rs-grpc/src/producer.rs | 34 ++- akka-projection-rs/Cargo.toml | 2 + akka-projection-rs/src/consumer_filter.rs | 193 ++++++++++--- .../backend/src/temperature_production.rs | 9 +- 10 files changed, 378 insertions(+), 161 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8add91..9b3bf59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,11 +32,13 @@ itoa = "1.0" js-sys = "0.3.60" log = "0.4.17" lru = "0.11.0" +mqtt-protocol = "0.11.2" postcard = { version = "1.0.6", default-features = false } prost = { version = "0.12.0" } prost-build = { version = "0.12.0" } prost-types = { version = "0.12.0" } rand = "0.8" +regex = "1.9.6" scopeguard = "1.1" serde = "1.0.151" serde_json = "1.0.107" diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 86236d4..7f6421e 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -2,7 +2,8 @@ use akka_persistence_rs::{ entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider}, - EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTimestampOffset, + EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTags, + WithTimestampOffset, }; use async_stream::stream; use async_trait::async_trait; @@ -67,6 +68,13 @@ impl WithSeqNr for EventEnvelope { } } +impl WithTags for EventEnvelope { + fn tags(&self) -> Vec { + // TODO We need to support tags + vec![] + } +} + impl WithTimestampOffset for EventEnvelope { fn timestamp_offset(&self) -> TimestampOffset { TimestampOffset { diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 5017c48..db6d341 100644 --- 
a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -9,22 +9,36 @@ use std::{ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use smol_str::SmolStr; pub mod effect; pub mod entity; pub mod entity_manager; /// Uniquely identifies the type of an Entity. -pub type EntityType = smol_str::SmolStr; +pub type EntityType = SmolStr; /// Uniquely identifies an entity, or entity instance. -pub type EntityId = smol_str::SmolStr; +pub type EntityId = SmolStr; + +/// Tags annotate an entity's events +pub type Tag = SmolStr; /// Implemented by structures that can return an entity id. pub trait WithEntityId { fn entity_id(&self) -> EntityId; } +/// Implemented by structures that can return a persistence id. +pub trait WithPersistenceId { + fn persistence_id(&self) -> PersistenceId; +} + +/// Implemented by structures that can return tags. +pub trait WithTags { + fn tags(&self) -> Vec; +} + /// A slice is deterministically defined based on the persistence id. /// `NUMBER_OF_SLICES` is not configurable because changing the value would result in /// different slice for a persistence id than what was used before, which would diff --git a/akka-projection-rs-grpc/Cargo.toml b/akka-projection-rs-grpc/Cargo.toml index a5e5a49..04b236f 100644 --- a/akka-projection-rs-grpc/Cargo.toml +++ b/akka-projection-rs-grpc/Cargo.toml @@ -11,8 +11,10 @@ chrono = { workspace = true } exponential-backoff = { workspace = true } futures = { workspace = true } log = { workspace = true } +mqtt-protocol = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +regex = { workspace = true } smol_str = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index 66015eb..085c9fd 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -297,7 +297,7 @@ mod tests { use 
super::*; use akka_persistence_rs::{EntityId, EntityType, PersistenceId}; - use akka_projection_rs::consumer_filter::{self, EntityIdOffset}; + use akka_projection_rs::consumer_filter::{self, PersistenceIdIdOffset}; use async_stream::stream; use chrono::{DateTime, Utc}; use prost_types::Any; @@ -491,9 +491,9 @@ mod tests { }); let (consumer_filters, consumer_filters_receiver) = - watch::channel(vec![FilterCriteria::IncludeEntityIds { - entity_id_offsets: vec![EntityIdOffset { - entity_id: entity_id.clone(), + watch::channel(vec![FilterCriteria::IncludePersistenceIds { + persistence_id_offsets: vec![PersistenceIdIdOffset { + persistence_id: persistence_id.clone(), seq_nr: 0, }], }]); diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 65822e7..9eaa20f 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -1,11 +1,11 @@ #![doc = include_str!("../README.md")] use akka_persistence_rs::{ - EntityId, Offset, PersistenceId, TimestampOffset, WithEntityId, WithOffset, -}; -use akka_projection_rs::consumer_filter::{ - EntityIdMatcher, EntityIdOffset, FilterCriteria, Tag, TopicMatcher, + EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithEntityId, WithOffset, }; +use akka_projection_rs::consumer_filter::{FilterCriteria, PersistenceIdIdOffset}; +use mqtt::TopicFilter; +use regex::Regex; use smol_str::SmolStr; pub mod consumer; @@ -95,35 +95,47 @@ impl From for proto::FilterCriteria { }, ) } - FilterCriteria::ExcludeEntityIds { entity_ids } => { + FilterCriteria::ExcludePersistenceIds { persistence_ids } => { proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { - entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), + entity_ids: persistence_ids + .into_iter() + .map(|v| v.entity_id.to_string()) + .collect(), }) } - FilterCriteria::RemoveExcludeEntityIds { entity_ids } => { + FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { 
proto::filter_criteria::Message::RemoveExcludeEntityIds( proto::RemoveExcludeEntityIds { - entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), + entity_ids: persistence_ids + .into_iter() + .map(|v| v.entity_id.to_string()) + .collect(), }, ) } - FilterCriteria::IncludeEntityIds { entity_id_offsets } => { - proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { - entity_id_offset: entity_id_offsets - .into_iter() - .map( - |EntityIdOffset { entity_id, seq_nr }| proto::EntityIdOffset { - entity_id: entity_id.to_string(), - seq_nr: seq_nr as i64, - }, - ) - .collect(), - }) - } - FilterCriteria::RemoveIncludeEntityIds { entity_ids } => { + FilterCriteria::IncludePersistenceIds { + persistence_id_offsets, + } => proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { + entity_id_offset: persistence_id_offsets + .into_iter() + .map( + |PersistenceIdIdOffset { + persistence_id, + seq_nr, + }| proto::EntityIdOffset { + entity_id: persistence_id.entity_id.to_string(), + seq_nr: seq_nr as i64, + }, + ) + .collect(), + }), + FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => { proto::filter_criteria::Message::RemoveIncludeEntityIds( proto::RemoveIncludeEntityIds { - entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), + entity_ids: persistence_ids + .into_iter() + .map(|v| v.entity_id.to_string()) + .collect(), }, ) } @@ -148,95 +160,126 @@ impl From for proto::FilterCriteria { /// due to there being no message. 
pub struct NoMessage; -impl TryFrom for FilterCriteria { - type Error = NoMessage; - - fn try_from(value: proto::FilterCriteria) -> Result { - match value.message { - Some(message) => { - let criteria = match message { - proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => { - FilterCriteria::ExcludeTags { - tags: tags.into_iter().map(Tag::from).collect(), - } - } - proto::filter_criteria::Message::RemoveExcludeTags( - proto::RemoveExcludeTags { tags }, - ) => FilterCriteria::RemoveExcludeTags { +/// Attempt to convert from a protobuf filter criteria to a model +/// representation given an entity type. +pub fn to_filter_criteria( + entity_type: EntityType, + value: proto::FilterCriteria, +) -> Result { + match value.message { + Some(message) => { + let criteria = match message { + proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => { + FilterCriteria::ExcludeTags { tags: tags.into_iter().map(Tag::from).collect(), - }, - proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => { - FilterCriteria::IncludeTags { - tags: tags.into_iter().map(Tag::from).collect(), - } } - proto::filter_criteria::Message::RemoveIncludeTags( - proto::RemoveIncludeTags { tags }, - ) => FilterCriteria::RemoveIncludeTags { + } + proto::filter_criteria::Message::RemoveExcludeTags(proto::RemoveExcludeTags { + tags, + }) => FilterCriteria::RemoveExcludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => { + FilterCriteria::IncludeTags { tags: tags.into_iter().map(Tag::from).collect(), - }, - proto::filter_criteria::Message::ExcludeMatchingEntityIds( - proto::ExcludeRegexEntityIds { matching }, - ) => FilterCriteria::ExcludeRegexEntityIds { - matching: matching.into_iter().map(EntityIdMatcher::from).collect(), - }, - proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( - proto::RemoveExcludeRegexEntityIds { matching }, - ) => 
FilterCriteria::RemoveExcludeRegexEntityIds { - matching: matching.into_iter().map(EntityIdMatcher::from).collect(), - }, - proto::filter_criteria::Message::IncludeMatchingEntityIds( - proto::IncludeRegexEntityIds { matching }, - ) => FilterCriteria::IncludeRegexEntityIds { - matching: matching.into_iter().map(EntityIdMatcher::from).collect(), - }, - proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( - proto::RemoveIncludeRegexEntityIds { matching }, - ) => FilterCriteria::RemoveIncludeRegexEntityIds { - matching: matching.into_iter().map(EntityIdMatcher::from).collect(), - }, - proto::filter_criteria::Message::ExcludeEntityIds( - proto::ExcludeEntityIds { entity_ids }, - ) => FilterCriteria::ExcludeEntityIds { - entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), - }, - proto::filter_criteria::Message::RemoveExcludeEntityIds( - proto::RemoveExcludeEntityIds { entity_ids }, - ) => FilterCriteria::RemoveExcludeEntityIds { - entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), - }, - proto::filter_criteria::Message::IncludeEntityIds( - proto::IncludeEntityIds { entity_id_offset }, - ) => FilterCriteria::IncludeEntityIds { - entity_id_offsets: entity_id_offset - .into_iter() - .map( - |proto::EntityIdOffset { entity_id, seq_nr }| EntityIdOffset { - entity_id: EntityId::from(entity_id), - seq_nr: seq_nr as u64, - }, - ) - .collect(), - }, - proto::filter_criteria::Message::RemoveIncludeEntityIds( - proto::RemoveIncludeEntityIds { entity_ids }, - ) => FilterCriteria::RemoveIncludeEntityIds { - entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), - }, - proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics { - expression, - }) => FilterCriteria::IncludeTopics { - expressions: expression.into_iter().map(TopicMatcher::from).collect(), - }, - proto::filter_criteria::Message::RemoveIncludeTopics( - proto::RemoveIncludeTopics { expression }, - ) => FilterCriteria::RemoveIncludeTopics { - expressions: 
expression.into_iter().map(TopicMatcher::from).collect(), - }, - }; - Ok(criteria) - } - None => Err(NoMessage), + } + } + proto::filter_criteria::Message::RemoveIncludeTags(proto::RemoveIncludeTags { + tags, + }) => FilterCriteria::RemoveIncludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::ExcludeMatchingEntityIds( + proto::ExcludeRegexEntityIds { matching }, + ) => FilterCriteria::ExcludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok()) + .collect(), + }, + proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( + proto::RemoveExcludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveExcludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok()) + .collect(), + }, + proto::filter_criteria::Message::IncludeMatchingEntityIds( + proto::IncludeRegexEntityIds { matching }, + ) => FilterCriteria::IncludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok()) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( + proto::RemoveIncludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveIncludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok()) + .collect(), + }, + proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { + entity_ids, + }) => FilterCriteria::ExcludePersistenceIds { + persistence_ids: entity_ids + .into_iter() + .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) + .collect(), + }, + proto::filter_criteria::Message::RemoveExcludeEntityIds( + proto::RemoveExcludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveExcludePersistenceIds { + persistence_ids: entity_ids + .into_iter() + .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) + .collect(), + }, + proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { + entity_id_offset, + }) 
=> FilterCriteria::IncludePersistenceIds { + persistence_id_offsets: entity_id_offset + .into_iter() + .map( + |proto::EntityIdOffset { entity_id, seq_nr }| PersistenceIdIdOffset { + persistence_id: PersistenceId::new( + entity_type.clone(), + EntityId::from(entity_id), + ), + seq_nr: seq_nr as u64, + }, + ) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeEntityIds( + proto::RemoveIncludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveIncludePersistenceIds { + persistence_ids: entity_ids + .into_iter() + .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) + .collect(), + }, + proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics { + expression, + }) => FilterCriteria::IncludeTopics { + expressions: expression + .into_iter() + .flat_map(|e| TopicFilter::new(e).ok()) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeTopics( + proto::RemoveIncludeTopics { expression }, + ) => FilterCriteria::RemoveIncludeTopics { + expressions: expression + .into_iter() + .flat_map(|e| TopicFilter::new(e).ok()) + .collect(), + }, + }; + Ok(criteria) } + None => Err(NoMessage), } } diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index b66436c..e80dbff 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -1,11 +1,13 @@ use akka_persistence_rs::EntityId; use akka_persistence_rs::EntityType; use akka_persistence_rs::PersistenceId; +use akka_persistence_rs::Tag; use akka_persistence_rs::TimestampOffset; -use akka_persistence_rs::WithEntityId; +use akka_persistence_rs::WithPersistenceId; use akka_persistence_rs::WithSeqNr; +use akka_persistence_rs::WithTags; use akka_persistence_rs::WithTimestampOffset; -use akka_projection_rs::consumer_filter; +use akka_projection_rs::consumer_filter::Filter; use akka_projection_rs::consumer_filter::FilterCriteria; use akka_projection_rs::HandlerError; use 
akka_projection_rs::PendingHandler; @@ -30,6 +32,7 @@ use tonic::Request; use crate::delayer::Delayer; use crate::proto; +use crate::to_filter_criteria; use crate::EventEnvelope; use crate::StreamId; @@ -50,6 +53,8 @@ where { flow: GrpcEventFlow, consumer_filters_receiver: watch::Receiver>, + filter: Filter, + topic_tag_prefix: Tag, transformer: F, phantom: PhantomData, } @@ -57,7 +62,7 @@ where #[async_trait] impl PendingHandler for GrpcEventProcessor where - EI: WithEntityId + WithSeqNr + WithTimestampOffset + Send, + EI: WithPersistenceId + WithSeqNr + WithTags + WithTimestampOffset + Send, E: Send, F: Fn(&EI) -> Option + Send, { @@ -69,8 +74,15 @@ where &mut self, envelope: Self::Envelope, ) -> Result> + Send>>, HandlerError> { - let event = if consumer_filter::matches(&envelope, &self.consumer_filters_receiver.borrow()) - { + if self.consumer_filters_receiver.has_changed().unwrap_or(true) { + self.filter = ( + self.topic_tag_prefix.clone(), + self.consumer_filters_receiver.borrow().clone(), + ) + .into(); + }; + + let event = if self.filter.matches(&envelope) { (self.transformer)(&envelope) } else { // Gaps are ok given a filter situation. @@ -78,7 +90,7 @@ where }; let transformation = Transformation { - entity_id: envelope.entity_id(), + entity_id: envelope.persistence_id().entity_id, seq_nr: envelope.seq_nr(), offset: envelope.timestamp_offset(), event, @@ -111,9 +123,13 @@ impl GrpcEventFlow { } } + /// Produces a handler for this flow. The handler will receive events, + /// apply filters and, if the filters match, transform and forward events + /// on to a gRPC producer. 
pub fn handler( self, consumer_filters_receiver: watch::Receiver>, + topic_tag_prefix: Tag, transformer: F, ) -> GrpcEventProcessor where @@ -122,6 +138,8 @@ impl GrpcEventFlow { GrpcEventProcessor { flow: self, consumer_filters_receiver, + filter: Filter::default(), + topic_tag_prefix, transformer, phantom: PhantomData, } @@ -160,6 +178,7 @@ pub async fn run( origin_id: StreamId, stream_id: StreamId, consumer_filters: watch::Sender>, + entity_type: EntityType, mut envelopes: mpsc::Receiver<(EventEnvelope, oneshot::Sender<()>)>, mut kill_switch: oneshot::Receiver<()>, ) where @@ -261,7 +280,7 @@ pub async fn run( })) = stream_outs.next() => match message { proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => { debug!("Starting the protocol"); - consumer_filters.send(filter.into_iter().flat_map(|f| f.try_into()).collect()).unwrap(); + consumer_filters.send(filter.into_iter().flat_map(|f| to_filter_criteria(entity_type.clone(), f)).collect()).unwrap(); break; } _ => { @@ -437,6 +456,7 @@ mod tests { OriginId::from("some-origin-id"), StreamId::from("some-stream-id"), consumer_filters, + EntityType::from("some-entity-type"), receiver, task_kill_switch_receiver, ) diff --git a/akka-projection-rs/Cargo.toml b/akka-projection-rs/Cargo.toml index ec05cca..0076276 100644 --- a/akka-projection-rs/Cargo.toml +++ b/akka-projection-rs/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" [dependencies] async-trait = { workspace = true } chrono = { workspace = true } +mqtt-protocol = { workspace = true } +regex = { workspace = true } serde = { workspace = true } smol_str = { workspace = true } tokio = { workspace = true } diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 078b6bc..a97ea5d 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -25,25 +25,20 @@ //! 
* IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions //! * IncludeEntityIds - include events for entities with the given entity ids -use akka_persistence_rs::{EntityId, WithEntityId}; -use smol_str::SmolStr; +use akka_persistence_rs::{ + EntityId, PersistenceId, Tag, WithEntityId, WithPersistenceId, WithTags, +}; +use mqtt::{TopicFilter, TopicNameRef}; +use regex::Regex; #[derive(Clone)] -pub struct EntityIdOffset { - pub entity_id: EntityId, +pub struct PersistenceIdIdOffset { + pub persistence_id: PersistenceId, // If this is defined (> 0) events are replayed from the given // sequence number (inclusive). pub seq_nr: u64, } -/// A regex expression for matching entity ids. -pub type EntityIdMatcher = SmolStr; - -pub type Tag = SmolStr; - -/// A topic match expression according to MQTT specification, including wildcards -pub type TopicMatcher = SmolStr; - /// Exclude criteria are evaluated first. /// If no matching exclude criteria the event is emitted. /// If an exclude criteria is matching the include criteria are evaluated. @@ -63,49 +58,177 @@ pub enum FilterCriteria { RemoveIncludeTags { tags: Vec }, /// Exclude events for entities with entity ids matching the given regular expressions, /// unless there is a matching include filter that overrides the exclude. - ExcludeRegexEntityIds { matching: Vec }, + ExcludeRegexEntityIds { matching: Vec }, /// Remove a previously added `ExcludeRegexEntityIds`. - RemoveExcludeRegexEntityIds { matching: Vec }, + RemoveExcludeRegexEntityIds { matching: Vec }, /// Include events for entities with entity ids matching the given regular expressions. /// A matching include overrides a matching exclude. - IncludeRegexEntityIds { matching: Vec }, + IncludeRegexEntityIds { matching: Vec }, /// Remove a previously added `IncludeRegexEntityIds`. 
- RemoveIncludeRegexEntityIds { matching: Vec }, - /// Exclude events for entities with the given entity ids, + RemoveIncludeRegexEntityIds { matching: Vec }, + /// Exclude events for entities with the given persistence ids, /// unless there is a matching include filter that overrides the exclude. - ExcludeEntityIds { entity_ids: Vec }, - /// Remove a previously added `ExcludeEntityIds`. - RemoveExcludeEntityIds { entity_ids: Vec }, - /// Include events for entities with the given entity ids. A matching include overrides + ExcludePersistenceIds { persistence_ids: Vec }, + /// Remove a previously added `ExcludePersistenceIds`. + RemoveExcludePersistenceIds { persistence_ids: Vec }, + /// Include events for entities with the given persistence ids. A matching include overrides /// a matching exclude. /// /// For the given entity ids a `seq_nr` can be defined to replay all events for the entity /// from the sequence number (inclusive). If `seq_nr` is 0 events will not be replayed. - IncludeEntityIds { - entity_id_offsets: Vec, + IncludePersistenceIds { + persistence_id_offsets: Vec, }, - /// Remove a previously added `IncludeEntityIds`. - RemoveIncludeEntityIds { entity_ids: Vec }, + /// Remove a previously added `IncludePersistenceIds`. + RemoveIncludePersistenceIds { persistence_ids: Vec }, /// Include events with any of the given matching topics. A matching include overrides /// a matching exclude. - IncludeTopics { expressions: Vec }, + IncludeTopics { expressions: Vec }, /// Remove a previously added `IncludeTopics`. - RemoveIncludeTopics { expressions: Vec }, + RemoveIncludeTopics { expressions: Vec }, } /// Exclude events from all entity ids, convenience for combining with for example a topic filter /// to include only events matching the topic filter. 
pub fn exclude_all() -> FilterCriteria { FilterCriteria::ExcludeRegexEntityIds { - matching: vec![EntityIdMatcher::from(".*")], + matching: vec![Regex::new(".*").unwrap()], } } -/// A function that matches an envelope with criteria and passes it through if matched. -pub fn matches(_envelope: &E, _consumer_filters: &[FilterCriteria]) -> bool -where - E: WithEntityId, -{ - // TODO - todo!() +/// A collection of criteria +#[derive(Default)] +pub struct Filter { + topic_tag_prefix: Tag, + + exclude_tags: Vec, + remove_exclude_tags: Vec, + include_tags: Vec, + remove_include_tags: Vec, + exclude_regex_entity_ids: Vec, + remove_exclude_regex_entity_ids: Vec, + include_regex_entity_ids: Vec, + remove_include_regex_entity_ids: Vec, + exclude_persistence_ids: Vec, + remove_exclude_persistence_ids: Vec, + include_persistence_ids: Vec, + remove_include_persistence_ids: Vec, + include_topics: Vec, + remove_include_topics: Vec, +} + +impl Filter { + /// A function that matches an envelope with criteria and passes it through if matched. 
+ pub fn matches(&self, envelope: &E) -> bool + where + E: WithPersistenceId + WithTags, + { + let persistence_id = envelope.persistence_id(); + let entity_id = persistence_id.entity_id.clone(); + let tags = envelope.tags(); + + if self.matches_exclude_tags(&tags) || self.matches_exclude_persistence_ids(&persistence_id) + { + true + } else { + true + } + + // if (env.tags.intersect(excludeTags).nonEmpty || + // excludePersistenceIds.contains(pid) || + // matchesExcludeRegexEntityIds) { + // env.tags.intersect(includeTags).nonEmpty || + // matchesTopics || + // includePersistenceIds.contains(pid) || + // matchesIncludeRegexEntityIds + // } else { + // true + // } + } + + fn matches_exclude_regex_entity_ids(&self, entity_id: &EntityId) -> bool { + Self::matches_regex_entity_ids(&self.exclude_regex_entity_ids, entity_id) + } + + fn matches_include_regex_entity_ids(&self, entity_id: &EntityId) -> bool { + Self::matches_regex_entity_ids(&self.include_regex_entity_ids, entity_id) + } + + fn matches_exclude_persistence_ids(&self, persistence_id: &PersistenceId) -> bool { + Self::matches_persistence_ids(&self.exclude_persistence_ids, persistence_id) + } + + // fn matches_include_persistence_ids(&self, persistence_id: &PersistenceId) -> bool { + // Self::matches_persistence_ids(&self.include_persistence_ids, persistence_id) + // } + + fn matches_exclude_tags(&self, tags: &[Tag]) -> bool { + Self::matches_tags(&self.exclude_tags, tags) + } + + fn matches_include_tags(&self, tags: &[Tag]) -> bool { + Self::matches_tags(&self.include_tags, tags) + } + + fn matches_regex_entity_ids(matching: &[Regex], entity_id: &EntityId) -> bool { + matching.iter().any(|r| r.is_match(&entity_id)) + } + + fn matches_persistence_ids( + persistence_ids: &[PersistenceId], + persistence_id: &PersistenceId, + ) -> bool { + persistence_ids.iter().any(|pi| pi == persistence_id) + } + + fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool { + match_tags.iter().any(|mt| tags.iter().any(|t| t == 
mt)) + } + fn matches_topics( + expressions: &[TopicFilter], + topic_tag_prefix: &Tag, + tags: &Vec, + ) -> bool { + let topic_tag_prefix_len = topic_tag_prefix.len(); + expressions.iter().any(|r| { + let matcher = r.get_matcher(); + tags.iter() + .filter(|t| t.starts_with(topic_tag_prefix.as_str())) + .any(|t| { + let topic_name = TopicNameRef::new(&t[topic_tag_prefix_len..]); + if let Ok(topic_name) = topic_name { + matcher.is_match(topic_name) + } else { + false + } + }) + }) + } +} + +impl From<(Tag, Vec)> for Filter { + fn from(value: (Tag, Vec)) -> Self { + let (topic_tag_prefix, value) = value; + + value.into_iter().fold(Filter {topic_tag_prefix, ..Filter::default()}, |mut f, fc| { + #[rustfmt::skip] + match fc { + FilterCriteria::ExcludeTags { mut tags } => f.exclude_tags.append(&mut tags), + FilterCriteria::RemoveExcludeTags { mut tags } => f.remove_exclude_tags.append(&mut tags), + FilterCriteria::IncludeTags { mut tags } => f.include_tags.append(&mut tags), + FilterCriteria::RemoveIncludeTags { mut tags } => f.remove_include_tags.append(&mut tags), + FilterCriteria::ExcludeRegexEntityIds { mut matching } => f.exclude_regex_entity_ids.append(&mut matching), + FilterCriteria::RemoveExcludeRegexEntityIds { mut matching } => f.remove_exclude_regex_entity_ids.append(&mut matching), + FilterCriteria::IncludeRegexEntityIds { mut matching } => f.include_regex_entity_ids.append(&mut matching), + FilterCriteria::RemoveIncludeRegexEntityIds { mut matching } => f.remove_include_regex_entity_ids.append(&mut matching), + FilterCriteria::ExcludePersistenceIds { mut persistence_ids } => f.exclude_persistence_ids.append(&mut persistence_ids), + FilterCriteria::RemoveExcludePersistenceIds { mut persistence_ids } => f.remove_exclude_persistence_ids.append(&mut persistence_ids), + FilterCriteria::IncludePersistenceIds { mut persistence_id_offsets } => f.include_persistence_ids.append(&mut persistence_id_offsets), + FilterCriteria::RemoveIncludePersistenceIds { mut 
persistence_ids } => f.remove_include_persistence_ids.append(&mut persistence_ids), + FilterCriteria::IncludeTopics { mut expressions } => f.include_topics.append(&mut expressions), + FilterCriteria::RemoveIncludeTopics { mut expressions } => f.remove_include_topics.append(&mut expressions), + }; + f + }) + } } diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index f1f1b72..b48b71c 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -2,7 +2,7 @@ use crate::proto; use crate::temperature::{self, EventEnvelopeMarshaler}; -use akka_persistence_rs::EntityType; +use akka_persistence_rs::{EntityType, Tag}; use akka_persistence_rs_commitlog::EventEnvelope as CommitLogEventEnvelope; use akka_projection_rs_commitlog::CommitLogSourceProvider; use akka_projection_rs_grpc::producer::GrpcEventFlow; @@ -25,13 +25,15 @@ pub async fn task( kill_switch: oneshot::Receiver<()>, state_storage_path: PathBuf, ) { + let entity_type = EntityType::from(temperature::ENTITY_TYPE); + // Establish a sink of envelopes that will be forwarded // on to a consumer via gRPC event producer. 
let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); - let grpc_flow = GrpcEventFlow::new(EntityType::from(temperature::ENTITY_TYPE), grpc_producer); + let grpc_flow = GrpcEventFlow::new(entity_type.clone(), grpc_producer); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); tokio::spawn(async { @@ -41,6 +43,7 @@ pub async fn task( OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), consumer_filters, + entity_type, grpc_producer_receiver, task_kill_switch_receiver, ) @@ -90,7 +93,7 @@ pub async fn task( &state_storage_path, kill_switch, source_provider, - grpc_flow.handler(consumer_filters_receiver, transformer), + grpc_flow.handler(consumer_filters_receiver, Tag::from("t:"), transformer), Duration::from_millis(100), ) .await From a07ea02587f2f3b3901cbb6ab2a645147982262e Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 13:06:30 +1100 Subject: [PATCH 04/19] Replace WithEntityId and streamlined EventType usage By conveying a PersistenceId instead of an EntityId for envelopes, we can reduce the places where we need to declare the event type. 
--- akka-persistence-rs-commitlog/src/lib.rs | 54 ++++++++----------- akka-persistence-rs/src/lib.rs | 5 -- akka-projection-rs-commitlog/src/lib.rs | 28 ++++------ .../src/offset_store.rs | 15 ++++-- akka-projection-rs-grpc/src/lib.rs | 8 +-- akka-projection-rs-grpc/src/producer.rs | 9 +++- akka-projection-rs-storage/src/lib.rs | 43 ++++++++++----- akka-projection-rs/src/consumer_filter.rs | 4 +- .../iot-service/backend/src/temperature.rs | 7 +++ .../backend/src/temperature_production.rs | 5 +- 10 files changed, 93 insertions(+), 85 deletions(-) diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 7f6421e..67d2932 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -2,8 +2,8 @@ use akka_persistence_rs::{ entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider}, - EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTags, - WithTimestampOffset, + EntityId, EntityType, Offset, PersistenceId, TimestampOffset, WithOffset, WithPersistenceId, + WithSeqNr, WithTags, WithTimestampOffset, }; use async_stream::stream; use async_trait::async_trait; @@ -22,37 +22,16 @@ use tokio_stream::{Stream, StreamExt}; /// An envelope wraps a commit log event associated with a specific entity. 
#[derive(Clone, Debug, PartialEq)] pub struct EventEnvelope { - pub entity_id: EntityId, + pub persistence_id: PersistenceId, pub seq_nr: u64, pub timestamp: DateTime, pub event: E, pub offset: CommitLogOffset, } -impl EventEnvelope { - pub fn new( - entity_id: EI, - seq_nr: u64, - timestamp: DateTime, - event: E, - offset: CommitLogOffset, - ) -> Self - where - EI: Into, - { - Self { - entity_id: entity_id.into(), - seq_nr, - timestamp, - event, - offset, - } - } -} - -impl WithEntityId for EventEnvelope { - fn entity_id(&self) -> EntityId { - self.entity_id.clone() +impl WithPersistenceId for EventEnvelope { + fn persistence_id(&self) -> PersistenceId { + self.persistence_id.clone() } } @@ -92,6 +71,9 @@ pub trait CommitLogMarshaler where for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { + /// Declares the entity type to the marshaler. + fn entity_type(&self) -> EntityType; + /// Provide a key we can use for the purposes of log compaction. /// A key would generally comprise and event type value held in /// the high bits, and the entity id in the lower bits. 
@@ -167,8 +149,12 @@ where } }); seq_nr.and_then(|seq_nr| { - record.timestamp.map(|timestamp| { - EventEnvelope::new(entity_id, seq_nr, timestamp, event, record.offset) + record.timestamp.map(|timestamp| EventEnvelope { + persistence_id: PersistenceId::new(self.entity_type(), entity_id), + seq_nr, + timestamp, + event, + offset: record.offset, }) }) }) @@ -299,7 +285,7 @@ where marshaler.envelope(record_entity_id, consumer_record).await { yield EntityManagerEventEnvelope::new( - envelope.entity_id, + envelope.persistence_id.entity_id, envelope.seq_nr, envelope.timestamp, envelope.event, @@ -336,7 +322,7 @@ where marshaler.envelope(record_entity_id, consumer_record).await { yield EntityManagerEventEnvelope::new( - envelope.entity_id, + envelope.persistence_id.entity_id, envelope.seq_nr, envelope.timestamp, envelope.event, @@ -484,6 +470,10 @@ mod tests { #[async_trait] impl CommitLogMarshaler for MyEventMarshaler { + fn entity_type(&self) -> EntityType { + EntityType::from("some-entity-type") + } + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { panic!("should not be called") } @@ -504,7 +494,7 @@ mod tests { let value = String::from_utf8(record.value).ok()?; let event = MyEvent { value }; record.timestamp.map(|timestamp| EventEnvelope { - entity_id, + persistence_id: PersistenceId::new(self.entity_type(), entity_id), seq_nr: 1, timestamp, event, diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index db6d341..9aa07d0 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -24,11 +24,6 @@ pub type EntityId = SmolStr; /// Tags annotate an entity's events pub type Tag = SmolStr; -/// Implemented by structures that can return an entity id. -pub trait WithEntityId { - fn entity_id(&self) -> EntityId; -} - /// Implemented by structures that can return a persistence id. 
pub trait WithPersistenceId { fn persistence_id(&self) -> PersistenceId; diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 7b5dd33..913a062 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -4,7 +4,7 @@ pub mod offset_store; use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin}; -use akka_persistence_rs::{EntityType, Offset, PersistenceId}; +use akka_persistence_rs::{Offset, PersistenceId}; use akka_persistence_rs_commitlog::{CommitLogMarshaler, EventEnvelope}; use akka_projection_rs::SourceProvider; use async_stream::stream; @@ -17,7 +17,6 @@ use tokio_stream::{Stream, StreamExt}; pub struct CommitLogSourceProvider { commit_log: CL, consumer_group_name: String, - entity_type: EntityType, marshaler: M, slice_range: Range, topic: Topic, @@ -30,13 +29,7 @@ where M: CommitLogMarshaler + Sync, for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait, { - pub fn new( - commit_log: CL, - marshaler: M, - consumer_group_name: &str, - topic: Topic, - entity_type: EntityType, - ) -> Self { + pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self { // When it comes to having a projection sourced from a local // commit log, there's little benefit if having many of them. // We therefore manage all slices from just one projection. 
@@ -47,7 +40,6 @@ where marshaler, consumer_group_name, topic, - entity_type, slice_range.get(0).cloned().unwrap(), ) } @@ -57,7 +49,6 @@ where marshaler: M, consumer_group_name: &str, topic: Topic, - entity_type: EntityType, slice_range: Range, ) -> Self { Self { @@ -66,7 +57,6 @@ where marshaler, slice_range, topic, - entity_type, phantom: PhantomData, } } @@ -102,7 +92,7 @@ where while let Some(consumer_record) = records.next().await { if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) { let persistence_id = - PersistenceId::new(self.entity_type.clone(), record_entity_id); + PersistenceId::new(marshaler.entity_type(), record_entity_id); if self.slice_range.contains(&persistence_id.slice()) { if let Some(envelope) = marshaler .envelope(persistence_id.entity_id, consumer_record) @@ -144,7 +134,7 @@ mod tests { use std::{env, fs}; use super::*; - use akka_persistence_rs::EntityId; + use akka_persistence_rs::{EntityId, EntityType}; use chrono::{DateTime, Utc}; use serde::Deserialize; use streambed::commit_log::{ConsumerRecord, Header, Key, ProducerRecord}; @@ -173,6 +163,10 @@ mod tests { #[async_trait] impl CommitLogMarshaler for MyEventMarshaler { + fn entity_type(&self) -> EntityType { + EntityType::from("some-topic") + } + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { panic!("should not be called") } @@ -193,7 +187,7 @@ mod tests { let value = String::from_utf8(record.value).ok()?; let event = MyEvent { value }; record.timestamp.map(|timestamp| EventEnvelope { - entity_id, + persistence_id: PersistenceId::new(self.entity_type(), entity_id), seq_nr: 1, timestamp, event, @@ -239,6 +233,7 @@ mod tests { let entity_type = EntityType::from("some-topic"); let entity_id = EntityId::from("some-entity"); + let persistence_id = PersistenceId::new(entity_type.clone(), entity_id.clone()); let topic = Topic::from("some-topic"); let event_value = "some value".to_string(); @@ -263,13 +258,12 @@ mod tests { 
MyEventMarshaler, "some-consumer", topic, - entity_type, ); let mut envelopes = source_provider.source(|| async { None }).await; let envelope = envelopes.next().await.unwrap(); - assert_eq!(envelope.entity_id, entity_id,); + assert_eq!(envelope.persistence_id, persistence_id); assert_eq!(envelope.event, MyEvent { value: event_value },); } } diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index 60769e7..b2457fc 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -2,7 +2,7 @@ use std::num::NonZeroUsize; -use akka_persistence_rs::{entity_manager, EntityId, Message}; +use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId}; use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope}; use akka_projection_rs::offset_store; use async_trait::async_trait; @@ -13,6 +13,7 @@ use streambed_logged::{compaction::KeyBasedRetention, FileLog}; use tokio::sync::mpsc; struct OffsetStoreEventMarshaler { + entity_type: EntityType, to_compaction_key: F, } @@ -21,6 +22,10 @@ impl CommitLogMarshaler for OffsetStoreEventMarshaler where F: Fn(&EntityId, &offset_store::Event) -> Option + Send + Sync, { + fn entity_type(&self) -> EntityType { + self.entity_type.clone() + } + fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option { (self.to_compaction_key)(entity_id, event) } @@ -41,7 +46,7 @@ where let value = u64::from_be_bytes(record.value.try_into().ok()?); let event = offset_store::Event::Saved { seq_nr: value }; record.timestamp.map(|timestamp| EventEnvelope { - entity_id, + persistence_id: PersistenceId::new(self.entity_type(), entity_id), seq_nr: 0, // We don't care about sequence numbers with the offset store as they won't be projected anywhere timestamp, event, @@ -85,6 +90,7 @@ pub async fn run( offset_store_receiver: mpsc::Receiver>, to_compaction_key: 
impl Fn(&EntityId, &offset_store::Event) -> Option + Send + Sync + 'static, ) { + let events_entity_type = EntityType::from(offset_store_id.clone()); let events_topic = Topic::from(offset_store_id.clone()); commit_log @@ -94,7 +100,10 @@ pub async fn run( let file_log_topic_adapter = CommitLogTopicAdapter::new( commit_log, - OffsetStoreEventMarshaler { to_compaction_key }, + OffsetStoreEventMarshaler { + entity_type: events_entity_type, + to_compaction_key, + }, &offset_store_id, events_topic, ); diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 9eaa20f..1e7d42f 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -1,7 +1,7 @@ #![doc = include_str!("../README.md")] use akka_persistence_rs::{ - EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithEntityId, WithOffset, + EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset, }; use akka_projection_rs::consumer_filter::{FilterCriteria, PersistenceIdIdOffset}; use mqtt::TopicFilter; @@ -21,12 +21,6 @@ pub struct EventEnvelope { pub offset: TimestampOffset, } -impl WithEntityId for EventEnvelope { - fn entity_id(&self) -> EntityId { - self.persistence_id.entity_id.clone() - } -} - impl WithOffset for EventEnvelope { fn offset(&self) -> Offset { Offset::Timestamp(self.offset.clone()) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index e80dbff..c205f7e 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -280,7 +280,12 @@ pub async fn run( })) = stream_outs.next() => match message { proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => { debug!("Starting the protocol"); - consumer_filters.send(filter.into_iter().flat_map(|f| to_filter_criteria(entity_type.clone(), f)).collect()).unwrap(); + let _ = consumer_filters.send( + filter + .into_iter() + .flat_map(|f| 
to_filter_criteria(entity_type.clone(), f)) + .collect(), + ); break; } _ => { @@ -446,7 +451,7 @@ mod tests { .unwrap(); }); - let (consumer_filters, _) = watch::channel(vec![]); + let (consumer_filters, _consumer_filters_receiver) = watch::channel(vec![]); let (sender, receiver) = mpsc::channel(10); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); tokio::spawn(async move { diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 7e3791e..5b4d603 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -159,7 +159,7 @@ mod tests { use std::{collections::HashMap, env, fs, future::Future, pin::Pin}; use super::*; - use akka_persistence_rs::EntityId; + use akka_persistence_rs::{EntityId, EntityType, PersistenceId}; use akka_persistence_rs_commitlog::EventEnvelope; use akka_projection_rs::HandlerError; use async_stream::stream; @@ -248,7 +248,7 @@ mod tests { const MIN_SAVE_OFFSET_INTERVAL: Duration = Duration::from_millis(100); struct MySourceProvider { - entity_id: EntityId, + persistence_id: PersistenceId, event_value: String, } @@ -264,16 +264,27 @@ mod tests { F: Fn() -> FR + Send + Sync, FR: Future> + Send, { - Box::pin(stream! { - if offset().await.is_none() { - yield EventEnvelope::new(self.entity_id.clone(), 1, Utc::now(), MyEvent { value:self.event_value.clone() }, 0); + Box::pin( + stream! { + if offset().await.is_none() { + yield EventEnvelope { + persistence_id: self.persistence_id.clone(), + seq_nr: 1, + timestamp: Utc::now(), + event: MyEvent { + value: self.event_value.clone(), + }, + offset: 0, + } + } } - }.chain(stream::pending())) + .chain(stream::pending()), + ) } } struct MyHandler { - entity_id: EntityId, + persistence_id: PersistenceId, event_value: String, } @@ -283,7 +294,7 @@ mod tests { /// Process an envelope. 
async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> { - assert_eq!(envelope.entity_id, self.entity_id); + assert_eq!(envelope.persistence_id, self.persistence_id); assert_eq!( envelope.event, MyEvent { @@ -295,7 +306,7 @@ mod tests { } struct MyHandlerPending { - entity_id: EntityId, + persistence_id: PersistenceId, event_value: String, } @@ -311,7 +322,7 @@ mod tests { envelope: Self::Envelope, ) -> Result> + Send>>, HandlerError> { - assert_eq!(envelope.entity_id, self.entity_id); + assert_eq!(envelope.persistence_id, self.persistence_id); assert_eq!( envelope.event, MyEvent { @@ -332,7 +343,9 @@ mod tests { // Scaffolding + let entity_type = EntityType::from("some-entity-type"); let entity_id = EntityId::from("some-entity"); + let persistence_id = PersistenceId::new(entity_type, entity_id); let event_value = "some value".to_string(); // Process an event. @@ -348,11 +361,11 @@ mod tests { &task_storage_path, registration_projection_command_receiver, MySourceProvider { - entity_id: entity_id.clone(), + persistence_id: persistence_id.clone(), event_value: event_value.clone(), }, MyHandler { - entity_id: entity_id.clone(), + persistence_id: persistence_id.clone(), event_value: event_value.clone(), }, MIN_SAVE_OFFSET_INTERVAL, @@ -383,7 +396,9 @@ mod tests { // Scaffolding + let entity_type = EntityType::from("some-entity-type"); let entity_id = EntityId::from("some-entity"); + let persistence_id = PersistenceId::new(entity_type, entity_id); let event_value = "some value".to_string(); // Process an event. 
@@ -398,11 +413,11 @@ mod tests { &task_storage_path, registration_projection_command_receiver, MySourceProvider { - entity_id: entity_id.clone(), + persistence_id: persistence_id.clone(), event_value: event_value.clone(), }, MyHandlerPending { - entity_id: entity_id.clone(), + persistence_id: persistence_id.clone(), event_value: event_value.clone(), }, MIN_SAVE_OFFSET_INTERVAL, diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index a97ea5d..131e4bc 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -25,9 +25,7 @@ //! * IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions //! * IncludeEntityIds - include events for entities with the given entity ids -use akka_persistence_rs::{ - EntityId, PersistenceId, Tag, WithEntityId, WithPersistenceId, WithTags, -}; +use akka_persistence_rs::{EntityId, PersistenceId, Tag, WithPersistenceId, WithTags}; use mqtt::{TopicFilter, TopicNameRef}; use regex::Regex; diff --git a/examples/iot-service/backend/src/temperature.rs b/examples/iot-service/backend/src/temperature.rs index 974d20c..e9193b4 100644 --- a/examples/iot-service/backend/src/temperature.rs +++ b/examples/iot-service/backend/src/temperature.rs @@ -5,6 +5,7 @@ use std::{io, num::NonZeroUsize, sync::Arc}; use akka_persistence_rs::{ effect::{self, emit_event, reply, then, unhandled, Effect, EffectExt}, entity::{Context, EventSourcedBehavior}, + EntityType, }; use akka_persistence_rs::{ entity_manager::{self}, @@ -117,6 +118,7 @@ impl EventSourcedBehavior for Behavior { // keys in any way required. 
pub struct EventEnvelopeMarshaler { + pub entity_type: EntityType, pub events_key_secret_path: Arc, pub secret_store: FileSecretStore, } @@ -134,6 +136,10 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; #[async_trait] impl CommitLogMarshaler for EventEnvelopeMarshaler { + fn entity_type(&self) -> EntityType { + self.entity_type.clone() + } + fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Option { let record_type = match event { Event::TemperatureRead { .. } => Some(0), @@ -227,6 +233,7 @@ pub async fn task( let file_log_topic_adapter = CommitLogTopicAdapter::new( commit_log, EventEnvelopeMarshaler { + entity_type: EntityType::from(ENTITY_TYPE), events_key_secret_path: Arc::from(events_key_secret_path), secret_store, }, diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index b48b71c..49cfbb5 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -36,6 +36,7 @@ pub async fn task( let grpc_flow = GrpcEventFlow::new(entity_type.clone(), grpc_producer); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); + let task_entity_type = entity_type.clone(); tokio::spawn(async { let channel = Channel::builder(event_consumer_addr); akka_projection_rs_grpc::producer::run( @@ -43,7 +44,7 @@ pub async fn task( OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), consumer_filters, - entity_type, + task_entity_type, grpc_producer_receiver, task_kill_switch_receiver, ) @@ -55,12 +56,12 @@ pub async fn task( let source_provider = CommitLogSourceProvider::new( commit_log, EventEnvelopeMarshaler { + entity_type, events_key_secret_path: Arc::from(events_key_secret_path), secret_store: secret_store.clone(), }, "iot-service-projection", Topic::from(temperature::EVENTS_TOPIC), - EntityType::from(temperature::ENTITY_TYPE), ); // Optionally transform events from the 
commit log to an event the From e8b7b35e530bc4656ba5ca91b8f325f5aede146e Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 13:32:57 +1100 Subject: [PATCH 05/19] Includes a test for the consumer filter --- akka-projection-rs-grpc/src/producer.rs | 9 +++++++-- .../iot-service/backend/src/temperature_production.rs | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index c205f7e..496c09c 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -395,7 +395,9 @@ mod tests { yield Ok(proto::ConsumeEventOut { message: Some(proto::consume_event_out::Message::Start( proto::ConsumerEventStart { - filter: vec![] + filter: vec![proto::FilterCriteria { + message: Some(proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { entity_ids: vec![] })), + }] }, )), }); @@ -451,7 +453,7 @@ mod tests { .unwrap(); }); - let (consumer_filters, _consumer_filters_receiver) = watch::channel(vec![]); + let (consumer_filters, mut consumer_filters_receiver) = watch::channel(vec![]); let (sender, receiver) = mpsc::channel(10); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); tokio::spawn(async move { @@ -488,6 +490,9 @@ mod tests { .await .is_ok()); + assert!(consumer_filters_receiver.changed().await.is_ok()); + assert!(!consumer_filters_receiver.borrow().is_empty()); + assert!(reply_receiver.await.is_ok()); server_kill_switch.notified(); diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 49cfbb5..95a5145 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -37,6 +37,7 @@ pub async fn task( let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); let task_entity_type = entity_type.clone(); + tokio::spawn(async { let 
channel = Channel::builder(event_consumer_addr); akka_projection_rs_grpc::producer::run( From 6cba9bb67750e8ffd275860605086ec3fe558fdc Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 15:42:29 +1100 Subject: [PATCH 06/19] Makes the filter update-able as per the JVM --- akka-projection-rs-grpc/src/producer.rs | 11 +-- akka-projection-rs/src/consumer_filter.rs | 105 +++++++++++----------- 2 files changed, 53 insertions(+), 63 deletions(-) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index 496c09c..ae0fcbc 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -54,7 +54,6 @@ where flow: GrpcEventFlow, consumer_filters_receiver: watch::Receiver>, filter: Filter, - topic_tag_prefix: Tag, transformer: F, phantom: PhantomData, } @@ -75,11 +74,8 @@ where envelope: Self::Envelope, ) -> Result> + Send>>, HandlerError> { if self.consumer_filters_receiver.has_changed().unwrap_or(true) { - self.filter = ( - self.topic_tag_prefix.clone(), - self.consumer_filters_receiver.borrow().clone(), - ) - .into(); + self.filter + .update(self.consumer_filters_receiver.borrow().clone()); }; let event = if self.filter.matches(&envelope) { @@ -138,8 +134,7 @@ impl GrpcEventFlow { GrpcEventProcessor { flow: self, consumer_filters_receiver, - filter: Filter::default(), - topic_tag_prefix, + filter: Filter::new(topic_tag_prefix), transformer, phantom: PhantomData, } diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 131e4bc..1ffa38c 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -95,53 +95,52 @@ pub fn exclude_all() -> FilterCriteria { } /// A collection of criteria -#[derive(Default)] pub struct Filter { topic_tag_prefix: Tag, exclude_tags: Vec, - remove_exclude_tags: Vec, include_tags: Vec, - remove_include_tags: Vec, exclude_regex_entity_ids: Vec, - 
remove_exclude_regex_entity_ids: Vec, include_regex_entity_ids: Vec, - remove_include_regex_entity_ids: Vec, exclude_persistence_ids: Vec, - remove_exclude_persistence_ids: Vec, - include_persistence_ids: Vec, - remove_include_persistence_ids: Vec, + include_persistence_ids: Vec, include_topics: Vec, - remove_include_topics: Vec, } impl Filter { + pub fn new(topic_tag_prefix: Tag) -> Self { + Self { + topic_tag_prefix, + exclude_tags: vec![], + include_tags: vec![], + exclude_regex_entity_ids: vec![], + include_regex_entity_ids: vec![], + exclude_persistence_ids: vec![], + include_persistence_ids: vec![], + include_topics: vec![], + } + } + /// A function that matches an envelope with criteria and passes it through if matched. pub fn matches(&self, envelope: &E) -> bool where E: WithPersistenceId + WithTags, { + let tags = envelope.tags(); let persistence_id = envelope.persistence_id(); let entity_id = persistence_id.entity_id.clone(); - let tags = envelope.tags(); - if self.matches_exclude_tags(&tags) || self.matches_exclude_persistence_ids(&persistence_id) + if self.matches_exclude_tags(&tags) + || self.matches_exclude_persistence_ids(&persistence_id) + || self.matches_exclude_regex_entity_ids(&entity_id) { - true + self.matches_include_tags(&tags) + || self.matches_include_topics(&tags) + || self.matches_include_persistence_ids(&persistence_id) + || self.matches_include_regex_entity_ids(&entity_id) } else { true } - - // if (env.tags.intersect(excludeTags).nonEmpty || - // excludePersistenceIds.contains(pid) || - // matchesExcludeRegexEntityIds) { - // env.tags.intersect(includeTags).nonEmpty || - // matchesTopics || - // includePersistenceIds.contains(pid) || - // matchesIncludeRegexEntityIds - // } else { - // true - // } } fn matches_exclude_regex_entity_ids(&self, entity_id: &EntityId) -> bool { @@ -156,9 +155,9 @@ impl Filter { Self::matches_persistence_ids(&self.exclude_persistence_ids, persistence_id) } - // fn matches_include_persistence_ids(&self, 
persistence_id: &PersistenceId) -> bool { - // Self::matches_persistence_ids(&self.include_persistence_ids, persistence_id) - // } + fn matches_include_persistence_ids(&self, persistence_id: &PersistenceId) -> bool { + Self::matches_persistence_ids(&self.include_persistence_ids, persistence_id) + } fn matches_exclude_tags(&self, tags: &[Tag]) -> bool { Self::matches_tags(&self.exclude_tags, tags) @@ -168,8 +167,12 @@ impl Filter { Self::matches_tags(&self.include_tags, tags) } + fn matches_include_topics(&self, tags: &[Tag]) -> bool { + Self::matches_topics(&self.include_topics, &self.topic_tag_prefix, tags) + } + fn matches_regex_entity_ids(matching: &[Regex], entity_id: &EntityId) -> bool { - matching.iter().any(|r| r.is_match(&entity_id)) + matching.iter().any(|r| r.is_match(entity_id)) } fn matches_persistence_ids( @@ -182,11 +185,7 @@ impl Filter { fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool { match_tags.iter().any(|mt| tags.iter().any(|t| t == mt)) } - fn matches_topics( - expressions: &[TopicFilter], - topic_tag_prefix: &Tag, - tags: &Vec, - ) -> bool { + fn matches_topics(expressions: &[TopicFilter], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool { let topic_tag_prefix_len = topic_tag_prefix.len(); expressions.iter().any(|r| { let matcher = r.get_matcher(); @@ -202,31 +201,27 @@ impl Filter { }) }) } -} -impl From<(Tag, Vec)> for Filter { - fn from(value: (Tag, Vec)) -> Self { - let (topic_tag_prefix, value) = value; - - value.into_iter().fold(Filter {topic_tag_prefix, ..Filter::default()}, |mut f, fc| { + /// Updates the filter given commands to add or remove new criteria. 
+ pub fn update(&mut self, criteria: Vec) { + for criterion in criteria { #[rustfmt::skip] - match fc { - FilterCriteria::ExcludeTags { mut tags } => f.exclude_tags.append(&mut tags), - FilterCriteria::RemoveExcludeTags { mut tags } => f.remove_exclude_tags.append(&mut tags), - FilterCriteria::IncludeTags { mut tags } => f.include_tags.append(&mut tags), - FilterCriteria::RemoveIncludeTags { mut tags } => f.remove_include_tags.append(&mut tags), - FilterCriteria::ExcludeRegexEntityIds { mut matching } => f.exclude_regex_entity_ids.append(&mut matching), - FilterCriteria::RemoveExcludeRegexEntityIds { mut matching } => f.remove_exclude_regex_entity_ids.append(&mut matching), - FilterCriteria::IncludeRegexEntityIds { mut matching } => f.include_regex_entity_ids.append(&mut matching), - FilterCriteria::RemoveIncludeRegexEntityIds { mut matching } => f.remove_include_regex_entity_ids.append(&mut matching), - FilterCriteria::ExcludePersistenceIds { mut persistence_ids } => f.exclude_persistence_ids.append(&mut persistence_ids), - FilterCriteria::RemoveExcludePersistenceIds { mut persistence_ids } => f.remove_exclude_persistence_ids.append(&mut persistence_ids), - FilterCriteria::IncludePersistenceIds { mut persistence_id_offsets } => f.include_persistence_ids.append(&mut persistence_id_offsets), - FilterCriteria::RemoveIncludePersistenceIds { mut persistence_ids } => f.remove_include_persistence_ids.append(&mut persistence_ids), - FilterCriteria::IncludeTopics { mut expressions } => f.include_topics.append(&mut expressions), - FilterCriteria::RemoveIncludeTopics { mut expressions } => f.remove_include_topics.append(&mut expressions), + match criterion { + FilterCriteria::ExcludeTags { mut tags } => self.exclude_tags.append(&mut tags), + FilterCriteria::RemoveExcludeTags { tags } => self.exclude_tags.retain(|existing| !tags.contains(existing)), + FilterCriteria::IncludeTags { mut tags } => self.include_tags.append(&mut tags), + FilterCriteria::RemoveIncludeTags { tags } 
=> self.include_tags.retain(|existing| !tags.contains(existing)), + FilterCriteria::ExcludeRegexEntityIds { mut matching } => self.exclude_regex_entity_ids.append(&mut matching), + FilterCriteria::RemoveExcludeRegexEntityIds { matching } => self.exclude_regex_entity_ids.retain(|existing| !matching.iter().map(|m| m.as_str()).collect::>().contains(&existing.as_str())), + FilterCriteria::IncludeRegexEntityIds { mut matching } => self.include_regex_entity_ids.append(&mut matching), + FilterCriteria::RemoveIncludeRegexEntityIds { matching } => self.include_regex_entity_ids.retain(|existing| !matching.iter().map(|m| m.as_str()).collect::>().contains(&existing.as_str())), + FilterCriteria::ExcludePersistenceIds { mut persistence_ids } => self.exclude_persistence_ids.append(&mut persistence_ids), + FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => self.exclude_persistence_ids.retain(|existing| !persistence_ids.contains(existing)), + FilterCriteria::IncludePersistenceIds { persistence_id_offsets } => self.include_persistence_ids.append(&mut persistence_id_offsets.into_iter().map(|PersistenceIdIdOffset { persistence_id, .. 
} | persistence_id).collect()), + FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => self.include_persistence_ids.retain(|existing| !persistence_ids.contains(existing)), + FilterCriteria::IncludeTopics { mut expressions } => self.include_topics.append(&mut expressions), + FilterCriteria::RemoveIncludeTopics { expressions } => self.include_topics.retain(|existing| !expressions.contains(existing)), }; - f - }) + } } } From 8b1231e268b65aa86108fd18bf3fe4ee56979909 Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 16:52:36 +1100 Subject: [PATCH 07/19] Doc --- akka-persistence-rs-commitlog/src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 67d2932..8a52bce 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -49,7 +49,9 @@ impl WithSeqNr for EventEnvelope { impl WithTags for EventEnvelope { fn tags(&self) -> Vec { - // TODO We need to support tags + // Tags are not presently considered useful at the edge. A remote consumer would be interested + // in all events from the edge in most cases, and the edge itself decides what to publish + // (producer defined filter). 
vec![] } } From 715c7b458d6852cda783617ca7fe7c6f58cf779f Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 16:59:02 +1100 Subject: [PATCH 08/19] Provides a producer filter --- akka-projection-rs-grpc/src/producer.rs | 26 ++++++++++--------- .../backend/src/temperature_production.rs | 14 ++++++++-- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index ae0fcbc..43ab265 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -47,23 +47,22 @@ pub struct Transformation { /// Processes events transformed from some unknown event envelope (EI) /// to then pass on to a gRPC event producer. -pub struct GrpcEventProcessor -where - F: Fn(&EI) -> Option, -{ +pub struct GrpcEventProcessor { flow: GrpcEventFlow, + producer_filter: PF, consumer_filters_receiver: watch::Receiver>, filter: Filter, - transformer: F, + transformer: T, phantom: PhantomData, } #[async_trait] -impl PendingHandler for GrpcEventProcessor +impl PendingHandler for GrpcEventProcessor where EI: WithPersistenceId + WithSeqNr + WithTags + WithTimestampOffset + Send, E: Send, - F: Fn(&EI) -> Option + Send, + PF: Fn(&EI) -> bool + Send, + T: Fn(&EI) -> Option + Send, { type Envelope = EI; @@ -78,7 +77,7 @@ where .update(self.consumer_filters_receiver.borrow().clone()); }; - let event = if self.filter.matches(&envelope) { + let event = if (self.producer_filter)(&envelope) && self.filter.matches(&envelope) { (self.transformer)(&envelope) } else { // Gaps are ok given a filter situation. @@ -122,17 +121,20 @@ impl GrpcEventFlow { /// Produces a handler for this flow. The handler will receive events, /// apply filters and, if the filters match, transform and forward events /// on to a gRPC producer. 
- pub fn handler( + pub fn handler( self, + producer_filter: PF, consumer_filters_receiver: watch::Receiver>, topic_tag_prefix: Tag, - transformer: F, - ) -> GrpcEventProcessor + transformer: T, + ) -> GrpcEventProcessor where - F: Fn(&EI) -> Option, + PF: Fn(&EI) -> bool, + T: Fn(&EI) -> Option, { GrpcEventProcessor { flow: self, + producer_filter, consumer_filters_receiver, filter: Filter::new(topic_tag_prefix), transformer, diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 95a5145..5576236 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -40,8 +40,10 @@ pub async fn task( tokio::spawn(async { let channel = Channel::builder(event_consumer_addr); + let consumer_channel = || channel.connect(); + akka_projection_rs_grpc::producer::run( - || channel.connect(), + consumer_channel, OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), consumer_filters, @@ -89,13 +91,21 @@ pub async fn task( // gRPC events to a remote consumer. The handler is a "flowing" one // where an upper limit of the number of envelopes in-flight is set. 
+ let producer_filter = |_: &CommitLogEventEnvelope| true; + let topic_tag_prefix = Tag::from("t:"); + let event_handler = grpc_flow.handler( + producer_filter, + consumer_filters_receiver, + topic_tag_prefix, + transformer, + ); akka_projection_rs_storage::run( &secret_store, &offsets_key_secret_path, &state_storage_path, kill_switch, source_provider, - grpc_flow.handler(consumer_filters_receiver, Tag::from("t:"), transformer), + event_handler, Duration::from_millis(100), ) .await From e60ac7df309deafe6983507bb6d8d4758309f538 Mon Sep 17 00:00:00 2001 From: huntc Date: Sat, 7 Oct 2023 17:05:11 +1100 Subject: [PATCH 09/19] Formatting --- akka-projection-rs/src/consumer_filter.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 1ffa38c..3a3bb99 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -185,6 +185,7 @@ impl Filter { fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool { match_tags.iter().any(|mt| tags.iter().any(|t| t == mt)) } + fn matches_topics(expressions: &[TopicFilter], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool { let topic_tag_prefix_len = topic_tag_prefix.len(); expressions.iter().any(|r| { From 9c6911f878601f4810228d4d932879a26bc8a8e9 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 8 Oct 2023 14:57:06 +1100 Subject: [PATCH 10/19] Correctly merge criteria into filters --- akka-persistence-rs/src/lib.rs | 2 +- akka-projection-rs-grpc/src/lib.rs | 18 +-- akka-projection-rs/src/consumer_filter.rs | 130 +++++++++++++++++----- 3 files changed, 115 insertions(+), 35 deletions(-) diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 9aa07d0..2d7073b 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -72,7 +72,7 @@ fn jdk_string_hashcode(s: &str) -> i32 { } /// A namespaced entity id given an entity type. 
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)] +#[derive(Clone, Debug, Deserialize, PartialOrd, Ord, Serialize, PartialEq, Eq, Hash)] pub struct PersistenceId { pub entity_type: EntityType, pub entity_id: EntityId, diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 1e7d42f..8afebef 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -3,7 +3,7 @@ use akka_persistence_rs::{ EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset, }; -use akka_projection_rs::consumer_filter::{FilterCriteria, PersistenceIdIdOffset}; +use akka_projection_rs::consumer_filter::{ComparableRegex, FilterCriteria, PersistenceIdIdOffset}; use mqtt::TopicFilter; use regex::Regex; use smol_str::SmolStr; @@ -64,28 +64,28 @@ impl From for proto::FilterCriteria { FilterCriteria::ExcludeRegexEntityIds { matching } => { proto::filter_criteria::Message::ExcludeMatchingEntityIds( proto::ExcludeRegexEntityIds { - matching: matching.into_iter().map(|v| v.to_string()).collect(), + matching: matching.into_iter().map(|v| v.0.to_string()).collect(), }, ) } FilterCriteria::RemoveExcludeRegexEntityIds { matching } => { proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( proto::RemoveExcludeRegexEntityIds { - matching: matching.into_iter().map(|v| v.to_string()).collect(), + matching: matching.into_iter().map(|v| v.0.to_string()).collect(), }, ) } FilterCriteria::IncludeRegexEntityIds { matching } => { proto::filter_criteria::Message::IncludeMatchingEntityIds( proto::IncludeRegexEntityIds { - matching: matching.into_iter().map(|v| v.to_string()).collect(), + matching: matching.into_iter().map(|v| v.0.to_string()).collect(), }, ) } FilterCriteria::RemoveIncludeRegexEntityIds { matching } => { proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( proto::RemoveIncludeRegexEntityIds { - matching: matching.into_iter().map(|v| v.to_string()).collect(), + matching: 
matching.into_iter().map(|v| v.0.to_string()).collect(), }, ) } @@ -188,7 +188,7 @@ pub fn to_filter_criteria( ) => FilterCriteria::ExcludeRegexEntityIds { matching: matching .into_iter() - .flat_map(|m| Regex::new(&m).ok()) + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) .collect(), }, proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( @@ -196,7 +196,7 @@ pub fn to_filter_criteria( ) => FilterCriteria::RemoveExcludeRegexEntityIds { matching: matching .into_iter() - .flat_map(|m| Regex::new(&m).ok()) + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) .collect(), }, proto::filter_criteria::Message::IncludeMatchingEntityIds( @@ -204,7 +204,7 @@ pub fn to_filter_criteria( ) => FilterCriteria::IncludeRegexEntityIds { matching: matching .into_iter() - .flat_map(|m| Regex::new(&m).ok()) + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) .collect(), }, proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( @@ -212,7 +212,7 @@ pub fn to_filter_criteria( ) => FilterCriteria::RemoveIncludeRegexEntityIds { matching: matching .into_iter() - .flat_map(|m| Regex::new(&m).ok()) + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) .collect(), }, proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 3a3bb99..2ce50c6 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -37,6 +37,29 @@ pub struct PersistenceIdIdOffset { pub seq_nr: u64, } +#[derive(Clone)] +pub struct ComparableRegex(pub Regex); + +impl PartialEq for ComparableRegex { + fn eq(&self, other: &Self) -> bool { + self.0.as_str() == other.0.as_str() + } +} + +impl Eq for ComparableRegex {} + +impl PartialOrd for ComparableRegex { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ComparableRegex { + fn cmp(&self, other: &Self) -> std::cmp::Ordering 
{ + self.0.as_str().cmp(other.0.as_str()) + } +} + /// Exclude criteria are evaluated first. /// If no matching exclude criteria the event is emitted. /// If an exclude criteria is matching the include criteria are evaluated. @@ -56,14 +79,14 @@ pub enum FilterCriteria { RemoveIncludeTags { tags: Vec }, /// Exclude events for entities with entity ids matching the given regular expressions, /// unless there is a matching include filter that overrides the exclude. - ExcludeRegexEntityIds { matching: Vec }, + ExcludeRegexEntityIds { matching: Vec }, /// Remove a previously added `ExcludeRegexEntityIds`. - RemoveExcludeRegexEntityIds { matching: Vec }, + RemoveExcludeRegexEntityIds { matching: Vec }, /// Include events for entities with entity ids matching the given regular expressions. /// A matching include overrides a matching exclude. - IncludeRegexEntityIds { matching: Vec }, + IncludeRegexEntityIds { matching: Vec }, /// Remove a previously added `IncludeRegexEntityIds`. - RemoveIncludeRegexEntityIds { matching: Vec }, + RemoveIncludeRegexEntityIds { matching: Vec }, /// Exclude events for entities with the given persistence ids, /// unless there is a matching include filter that overrides the exclude. ExcludePersistenceIds { persistence_ids: Vec }, @@ -90,18 +113,17 @@ pub enum FilterCriteria { /// to include only events matching the topic filter. 
pub fn exclude_all() -> FilterCriteria { FilterCriteria::ExcludeRegexEntityIds { - matching: vec![Regex::new(".*").unwrap()], + matching: vec![ComparableRegex(Regex::new(".*").unwrap())], } } /// A collection of criteria pub struct Filter { topic_tag_prefix: Tag, - exclude_tags: Vec, include_tags: Vec, - exclude_regex_entity_ids: Vec, - include_regex_entity_ids: Vec, + exclude_regex_entity_ids: Vec, + include_regex_entity_ids: Vec, exclude_persistence_ids: Vec, include_persistence_ids: Vec, include_topics: Vec, @@ -171,8 +193,8 @@ impl Filter { Self::matches_topics(&self.include_topics, &self.topic_tag_prefix, tags) } - fn matches_regex_entity_ids(matching: &[Regex], entity_id: &EntityId) -> bool { - matching.iter().any(|r| r.is_match(entity_id)) + fn matches_regex_entity_ids(matching: &[ComparableRegex], entity_id: &EntityId) -> bool { + matching.iter().any(|r| r.0.is_match(entity_id)) } fn matches_persistence_ids( @@ -206,23 +228,81 @@ impl Filter { /// Updates the filter given commands to add or remove new criteria. 
pub fn update(&mut self, criteria: Vec) { for criterion in criteria { - #[rustfmt::skip] match criterion { - FilterCriteria::ExcludeTags { mut tags } => self.exclude_tags.append(&mut tags), - FilterCriteria::RemoveExcludeTags { tags } => self.exclude_tags.retain(|existing| !tags.contains(existing)), - FilterCriteria::IncludeTags { mut tags } => self.include_tags.append(&mut tags), - FilterCriteria::RemoveIncludeTags { tags } => self.include_tags.retain(|existing| !tags.contains(existing)), - FilterCriteria::ExcludeRegexEntityIds { mut matching } => self.exclude_regex_entity_ids.append(&mut matching), - FilterCriteria::RemoveExcludeRegexEntityIds { matching } => self.exclude_regex_entity_ids.retain(|existing| !matching.iter().map(|m| m.as_str()).collect::>().contains(&existing.as_str())), - FilterCriteria::IncludeRegexEntityIds { mut matching } => self.include_regex_entity_ids.append(&mut matching), - FilterCriteria::RemoveIncludeRegexEntityIds { matching } => self.include_regex_entity_ids.retain(|existing| !matching.iter().map(|m| m.as_str()).collect::>().contains(&existing.as_str())), - FilterCriteria::ExcludePersistenceIds { mut persistence_ids } => self.exclude_persistence_ids.append(&mut persistence_ids), - FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => self.exclude_persistence_ids.retain(|existing| !persistence_ids.contains(existing)), - FilterCriteria::IncludePersistenceIds { persistence_id_offsets } => self.include_persistence_ids.append(&mut persistence_id_offsets.into_iter().map(|PersistenceIdIdOffset { persistence_id, .. 
} | persistence_id).collect()), - FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => self.include_persistence_ids.retain(|existing| !persistence_ids.contains(existing)), - FilterCriteria::IncludeTopics { mut expressions } => self.include_topics.append(&mut expressions), - FilterCriteria::RemoveIncludeTopics { expressions } => self.include_topics.retain(|existing| !expressions.contains(existing)), + FilterCriteria::ExcludeTags { mut tags } => { + merge(&mut self.exclude_tags, &mut tags) + } + + FilterCriteria::RemoveExcludeTags { tags } => remove(&mut self.exclude_tags, &tags), + + FilterCriteria::IncludeTags { mut tags } => { + merge(&mut self.include_tags, &mut tags) + } + + FilterCriteria::RemoveIncludeTags { tags } => remove(&mut self.include_tags, &tags), + + FilterCriteria::ExcludeRegexEntityIds { mut matching } => { + merge(&mut self.exclude_regex_entity_ids, &mut matching) + } + + FilterCriteria::RemoveExcludeRegexEntityIds { matching } => { + remove(&mut self.exclude_regex_entity_ids, &matching) + } + + FilterCriteria::IncludeRegexEntityIds { mut matching } => { + merge(&mut self.include_regex_entity_ids, &mut matching) + } + + FilterCriteria::RemoveIncludeRegexEntityIds { matching } => { + remove(&mut self.include_regex_entity_ids, &matching) + } + + FilterCriteria::ExcludePersistenceIds { + mut persistence_ids, + } => merge(&mut self.exclude_persistence_ids, &mut persistence_ids), + + FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { + remove(&mut self.exclude_persistence_ids, &persistence_ids) + } + + FilterCriteria::IncludePersistenceIds { + persistence_id_offsets, + } => merge( + &mut self.include_persistence_ids, + &mut persistence_id_offsets + .into_iter() + .map(|PersistenceIdIdOffset { persistence_id, .. 
}| persistence_id) + .collect(), + ), + + FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => { + remove(&mut self.include_persistence_ids, &persistence_ids) + } + + FilterCriteria::IncludeTopics { mut expressions } => { + merge(&mut self.include_topics, &mut expressions) + } + + FilterCriteria::RemoveIncludeTopics { expressions } => { + remove(&mut self.include_topics, &expressions) + } }; } } } + +fn merge(l: &mut Vec, r: &mut Vec) +where + T: Ord, +{ + l.append(r); + l.sort(); + l.dedup(); +} + +fn remove(l: &mut Vec, r: &[T]) +where + T: PartialEq, +{ + l.retain(|existing| !r.contains(existing)); +} From 4028a947746b677fb9df0aae0a735bb814fdc5c8 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 8 Oct 2023 15:49:12 +1100 Subject: [PATCH 11/19] Filter tests --- akka-projection-rs/src/consumer_filter.rs | 160 ++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 2ce50c6..8f7863a 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -306,3 +306,163 @@ where { l.retain(|existing| !r.contains(existing)); } + +#[cfg(test)] +mod tests { + + use super::*; + + struct TestEnvelope { + persistence_id: PersistenceId, + tags: Vec, + } + + impl WithPersistenceId for TestEnvelope { + fn persistence_id(&self) -> PersistenceId { + self.persistence_id.clone() + } + } + + impl WithTags for TestEnvelope { + fn tags(&self) -> Vec { + self.tags.clone() + } + } + + #[test] + fn exclude_include_and_remove_include_tag_and_remove_exclude_tag() { + let persistence_id = "a|1".parse::().unwrap(); + let tag = Tag::from("a"); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![tag.clone()], + }; + + let mut filter = Filter::new(Tag::from("")); + + let criteria = vec![ + FilterCriteria::ExcludeTags { + tags: vec![tag.clone()], + }, + FilterCriteria::IncludeTags { + tags: 
vec![tag.clone()], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeTags { + tags: vec![tag.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveExcludeTags { tags: vec![tag] }]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + } + + #[test] + fn exclude_include_and_remove_include_persistence_id_and_remove_exclude_persistence_id() { + let persistence_id = "a|1".parse::().unwrap(); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![], + }; + + let mut filter = Filter::new(Tag::from("")); + + let criteria = vec![ + FilterCriteria::ExcludePersistenceIds { + persistence_ids: vec![persistence_id.clone()], + }, + FilterCriteria::IncludePersistenceIds { + persistence_id_offsets: vec![PersistenceIdIdOffset { + persistence_id: persistence_id.clone(), + seq_nr: 0, + }], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludePersistenceIds { + persistence_ids: vec![persistence_id.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveExcludePersistenceIds { + persistence_ids: vec![persistence_id.clone()], + }]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + } + + #[test] + fn exclude_include_and_remove_include_regex_entity_id_and_remove_exclude_regex_entity_id() { + let persistence_id = "a|1".parse::().unwrap(); + let matching = ComparableRegex(Regex::new("1").unwrap()); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![], + }; + + let mut filter = Filter::new(Tag::from("")); + + let criteria = vec![ + FilterCriteria::ExcludeRegexEntityIds { + matching: vec![matching.clone()], + }, + FilterCriteria::IncludeRegexEntityIds { + matching: vec![matching.clone()], + }, + ]; 
+ filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeRegexEntityIds { + matching: vec![matching.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveExcludeRegexEntityIds { + matching: vec![matching.clone()], + }]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + } + + #[test] + fn include_and_remove_include_topic() { + let persistence_id = "a|1".parse::().unwrap(); + let tag = Tag::from("t:sport/abc/player1"); + let expression = TopicFilter::new("sport/+/player1").unwrap(); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![tag.clone()], + }; + + let mut filter = Filter::new(Tag::from("t:")); + + let criteria = vec![ + exclude_all(), + FilterCriteria::IncludeTopics { + expressions: vec![expression.clone()], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeTopics { + expressions: vec![expression.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + } +} From 7a51787b966c8d5afb0edcd38be8f4cc3dd2166d Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 8 Oct 2023 15:58:17 +1100 Subject: [PATCH 12/19] Use more refs - reduces the clones --- akka-persistence-rs-commitlog/src/lib.rs | 21 +++++++++------- akka-persistence-rs/src/lib.rs | 4 ++-- akka-projection-rs-commitlog/src/lib.rs | 1 + .../src/offset_store.rs | 1 + akka-projection-rs-grpc/src/producer.rs | 2 +- akka-projection-rs-storage/src/lib.rs | 1 + akka-projection-rs/src/consumer_filter.rs | 24 +++++++++---------- 7 files changed, 30 insertions(+), 24 deletions(-) diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 8a52bce..f4ff507 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -2,8 +2,8 @@ use 
akka_persistence_rs::{ entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider}, - EntityId, EntityType, Offset, PersistenceId, TimestampOffset, WithOffset, WithPersistenceId, - WithSeqNr, WithTags, WithTimestampOffset, + EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset, + WithPersistenceId, WithSeqNr, WithTags, WithTimestampOffset, }; use async_stream::stream; use async_trait::async_trait; @@ -20,6 +20,9 @@ use streambed::{ use tokio_stream::{Stream, StreamExt}; /// An envelope wraps a commit log event associated with a specific entity. +/// Tags are not presently considered useful at the edge. A remote consumer would be interested +/// in all events from the edge in most cases, and the edge itself decides what to publish +/// (producer defined filter). #[derive(Clone, Debug, PartialEq)] pub struct EventEnvelope { pub persistence_id: PersistenceId, @@ -27,11 +30,12 @@ pub struct EventEnvelope { pub timestamp: DateTime, pub event: E, pub offset: CommitLogOffset, + pub tags: Vec, } impl WithPersistenceId for EventEnvelope { - fn persistence_id(&self) -> PersistenceId { - self.persistence_id.clone() + fn persistence_id(&self) -> &PersistenceId { + &self.persistence_id } } @@ -48,11 +52,8 @@ impl WithSeqNr for EventEnvelope { } impl WithTags for EventEnvelope { - fn tags(&self) -> Vec { - // Tags are not presently considered useful at the edge. A remote consumer would be interested - // in all events from the edge in most cases, and the edge itself decides what to publish - // (producer defined filter). 
- vec![] + fn tags(&self) -> &[akka_persistence_rs::Tag] { + &self.tags } } @@ -157,6 +158,7 @@ where timestamp, event, offset: record.offset, + tags: vec![], }) }) }) @@ -501,6 +503,7 @@ mod tests { timestamp, event, offset: 0, + tags: vec![], }) } diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs index 2d7073b..ef750ec 100644 --- a/akka-persistence-rs/src/lib.rs +++ b/akka-persistence-rs/src/lib.rs @@ -26,12 +26,12 @@ pub type Tag = SmolStr; /// Implemented by structures that can return a persistence id. pub trait WithPersistenceId { - fn persistence_id(&self) -> PersistenceId; + fn persistence_id(&self) -> &PersistenceId; } /// Implemented by structures that can return tags. pub trait WithTags { - fn tags(&self) -> Vec; + fn tags(&self) -> &[Tag]; } /// A slice is deterministically defined based on the persistence id. diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 913a062..b1c871d 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -192,6 +192,7 @@ mod tests { timestamp, event, offset: 0, + tags: vec![], }) } diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index b2457fc..487ee94 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -51,6 +51,7 @@ where timestamp, event, offset: record.offset, + tags: vec![], }) } diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index 43ab265..c3fcdc2 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -85,7 +85,7 @@ where }; let transformation = Transformation { - entity_id: envelope.persistence_id().entity_id, + entity_id: envelope.persistence_id().entity_id.clone(), seq_nr: envelope.seq_nr(), offset: envelope.timestamp_offset(), event, diff --git 
a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs index 5b4d603..0398933 100644 --- a/akka-projection-rs-storage/src/lib.rs +++ b/akka-projection-rs-storage/src/lib.rs @@ -275,6 +275,7 @@ mod tests { value: self.event_value.clone(), }, offset: 0, + tags: vec![] } } } diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 8f7863a..70293d9 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -150,16 +150,16 @@ impl Filter { { let tags = envelope.tags(); let persistence_id = envelope.persistence_id(); - let entity_id = persistence_id.entity_id.clone(); + let entity_id = &persistence_id.entity_id; - if self.matches_exclude_tags(&tags) - || self.matches_exclude_persistence_ids(&persistence_id) - || self.matches_exclude_regex_entity_ids(&entity_id) + if self.matches_exclude_tags(tags) + || self.matches_exclude_persistence_ids(persistence_id) + || self.matches_exclude_regex_entity_ids(entity_id) { - self.matches_include_tags(&tags) - || self.matches_include_topics(&tags) - || self.matches_include_persistence_ids(&persistence_id) - || self.matches_include_regex_entity_ids(&entity_id) + self.matches_include_tags(tags) + || self.matches_include_topics(tags) + || self.matches_include_persistence_ids(persistence_id) + || self.matches_include_regex_entity_ids(entity_id) } else { true } @@ -318,14 +318,14 @@ mod tests { } impl WithPersistenceId for TestEnvelope { - fn persistence_id(&self) -> PersistenceId { - self.persistence_id.clone() + fn persistence_id(&self) -> &PersistenceId { + &self.persistence_id } } impl WithTags for TestEnvelope { - fn tags(&self) -> Vec { - self.tags.clone() + fn tags(&self) -> &[Tag] { + &self.tags } } From 820dae8be971460ee7ef905784719426b94ca169 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 8 Oct 2023 16:43:18 +1100 Subject: [PATCH 13/19] Improved bench Was previously running the stream only once --- 
akka-projection-rs-storage/benches/benches.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/akka-projection-rs-storage/benches/benches.rs b/akka-projection-rs-storage/benches/benches.rs index e9791f0..d4aa102 100644 --- a/akka-projection-rs-storage/benches/benches.rs +++ b/akka-projection-rs-storage/benches/benches.rs @@ -40,8 +40,10 @@ impl SourceProvider for TestSourceProvider { FR: Future> + Send, { let _ = offset().await; - Box::pin(stream!(for offset in 0..NUM_EVENTS as u64 { - yield TestEnvelope { offset }; + Box::pin(stream!(loop { + for offset in 0..NUM_EVENTS as u64 { + yield TestEnvelope { offset }; + } })) } } @@ -58,6 +60,7 @@ impl Handler for TestHandler { const LAST_OFFSET: u64 = NUM_EVENTS as u64 - 1; if envelope.offset == LAST_OFFSET { self.events_processed.notify_one(); + return Err(HandlerError); } Ok(()) } From ba3fae7463ae74ffc309924244c7841c9cfcac93 Mon Sep 17 00:00:00 2001 From: huntc Date: Sun, 8 Oct 2023 16:49:53 +1100 Subject: [PATCH 14/19] Improved some names --- examples/iot-service/backend/src/temperature_production.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 5576236..ca25d9f 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -39,11 +39,11 @@ pub async fn task( let task_entity_type = entity_type.clone(); tokio::spawn(async { - let channel = Channel::builder(event_consumer_addr); - let consumer_channel = || channel.connect(); + let consumer_endpoint = Channel::builder(event_consumer_addr); + let consumer_connector = || consumer_endpoint.connect(); akka_projection_rs_grpc::producer::run( - consumer_channel, + consumer_connector, OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), consumer_filters, From cc3e556f5aa14268b40d59311fe4ab7c2e0af205 Mon Sep 
17 00:00:00 2001 From: huntc Date: Mon, 9 Oct 2023 10:18:04 +1100 Subject: [PATCH 15/19] Restrict the max size of a filter's fields We should always endeavour to avoid running out of heap --- akka-projection-rs/src/consumer_filter.rs | 38 ++++++++++++++++------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 70293d9..873836a 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -129,6 +129,11 @@ pub struct Filter { include_topics: Vec, } +const MAX_TAGS: usize = 100; +const MAX_REGEX_ENTITY_IDS: usize = 100; +const MAX_PERSISTENCE_IDS: usize = 100; +const MAX_TOPICS: usize = 100; + impl Filter { pub fn new(topic_tag_prefix: Tag) -> Self { Self { @@ -230,19 +235,22 @@ impl Filter { for criterion in criteria { match criterion { FilterCriteria::ExcludeTags { mut tags } => { - merge(&mut self.exclude_tags, &mut tags) + merge::<_, MAX_TAGS>(&mut self.exclude_tags, &mut tags) } FilterCriteria::RemoveExcludeTags { tags } => remove(&mut self.exclude_tags, &tags), FilterCriteria::IncludeTags { mut tags } => { - merge(&mut self.include_tags, &mut tags) + merge::<_, MAX_TAGS>(&mut self.include_tags, &mut tags) } FilterCriteria::RemoveIncludeTags { tags } => remove(&mut self.include_tags, &tags), FilterCriteria::ExcludeRegexEntityIds { mut matching } => { - merge(&mut self.exclude_regex_entity_ids, &mut matching) + merge::<_, MAX_REGEX_ENTITY_IDS>( + &mut self.exclude_regex_entity_ids, + &mut matching, + ) } FilterCriteria::RemoveExcludeRegexEntityIds { matching } => { @@ -250,7 +258,10 @@ impl Filter { } FilterCriteria::IncludeRegexEntityIds { mut matching } => { - merge(&mut self.include_regex_entity_ids, &mut matching) + merge::<_, MAX_REGEX_ENTITY_IDS>( + &mut self.include_regex_entity_ids, + &mut matching, + ) } FilterCriteria::RemoveIncludeRegexEntityIds { matching } => { @@ -259,7 +270,10 @@ impl Filter { 
FilterCriteria::ExcludePersistenceIds { mut persistence_ids, - } => merge(&mut self.exclude_persistence_ids, &mut persistence_ids), + } => merge::<_, MAX_PERSISTENCE_IDS>( + &mut self.exclude_persistence_ids, + &mut persistence_ids, + ), FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { remove(&mut self.exclude_persistence_ids, &persistence_ids) @@ -267,7 +281,7 @@ impl Filter { FilterCriteria::IncludePersistenceIds { persistence_id_offsets, - } => merge( + } => merge::<_, MAX_PERSISTENCE_IDS>( &mut self.include_persistence_ids, &mut persistence_id_offsets .into_iter() @@ -280,7 +294,7 @@ impl Filter { } FilterCriteria::IncludeTopics { mut expressions } => { - merge(&mut self.include_topics, &mut expressions) + merge::<_, MAX_TOPICS>(&mut self.include_topics, &mut expressions) } FilterCriteria::RemoveIncludeTopics { expressions } => { @@ -291,13 +305,15 @@ impl Filter { } } -fn merge(l: &mut Vec, r: &mut Vec) +fn merge(l: &mut Vec, r: &mut Vec) where T: Ord, { - l.append(r); - l.sort(); - l.dedup(); + if l.len() + r.len() < MAX_LEN * 2 { + l.append(r); + l.sort(); + l.dedup(); + } } fn remove(l: &mut Vec, r: &[T]) From 928b5d8b6a0f853fcc918212e9b11bff409db0a9 Mon Sep 17 00:00:00 2001 From: huntc Date: Mon, 9 Oct 2023 11:10:30 +1100 Subject: [PATCH 16/19] Improve the filter dx --- akka-projection-rs-grpc/src/producer.rs | 5 +- akka-projection-rs/src/consumer_filter.rs | 84 ++++++++++++------- .../backend/src/temperature_production.rs | 7 +- 3 files changed, 62 insertions(+), 34 deletions(-) diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index c3fcdc2..1846103 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -1,7 +1,6 @@ use akka_persistence_rs::EntityId; use akka_persistence_rs::EntityType; use akka_persistence_rs::PersistenceId; -use akka_persistence_rs::Tag; use akka_persistence_rs::TimestampOffset; use akka_persistence_rs::WithPersistenceId; use 
akka_persistence_rs::WithSeqNr; @@ -125,7 +124,7 @@ impl GrpcEventFlow { self, producer_filter: PF, consumer_filters_receiver: watch::Receiver>, - topic_tag_prefix: Tag, + filter: Filter, transformer: T, ) -> GrpcEventProcessor where @@ -136,7 +135,7 @@ impl GrpcEventFlow { flow: self, producer_filter, consumer_filters_receiver, - filter: Filter::new(topic_tag_prefix), + filter, transformer, phantom: PhantomData, } diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 873836a..472b48f 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -120,30 +120,58 @@ pub fn exclude_all() -> FilterCriteria { /// A collection of criteria pub struct Filter { topic_tag_prefix: Tag, + max_tags: usize, exclude_tags: Vec, include_tags: Vec, + max_regex_entity_ids: usize, exclude_regex_entity_ids: Vec, include_regex_entity_ids: Vec, + max_persistence_ids: usize, exclude_persistence_ids: Vec, include_persistence_ids: Vec, + max_topics: usize, include_topics: Vec, } -const MAX_TAGS: usize = 100; -const MAX_REGEX_ENTITY_IDS: usize = 100; -const MAX_PERSISTENCE_IDS: usize = 100; -const MAX_TOPICS: usize = 100; +impl Default for Filter { + fn default() -> Self { + Self { + topic_tag_prefix: Tag::from("t:"), + max_tags: 10, + exclude_tags: vec![], + include_tags: vec![], + max_regex_entity_ids: 10, + exclude_regex_entity_ids: vec![], + include_regex_entity_ids: vec![], + max_persistence_ids: 10, + exclude_persistence_ids: vec![], + include_persistence_ids: vec![], + max_topics: 10, + include_topics: vec![], + } + } +} impl Filter { - pub fn new(topic_tag_prefix: Tag) -> Self { + pub fn new( + topic_tag_prefix: Tag, + max_tags: usize, + max_regex_entity_ids: usize, + max_persistence_ids: usize, + max_topics: usize, + ) -> Self { Self { topic_tag_prefix, + max_tags, exclude_tags: vec![], include_tags: vec![], + max_regex_entity_ids, exclude_regex_entity_ids: vec![], 
include_regex_entity_ids: vec![], + max_persistence_ids, exclude_persistence_ids: vec![], include_persistence_ids: vec![], + max_topics, include_topics: vec![], } } @@ -235,34 +263,32 @@ impl Filter { for criterion in criteria { match criterion { FilterCriteria::ExcludeTags { mut tags } => { - merge::<_, MAX_TAGS>(&mut self.exclude_tags, &mut tags) + merge(&mut self.exclude_tags, &mut tags, self.max_tags) } FilterCriteria::RemoveExcludeTags { tags } => remove(&mut self.exclude_tags, &tags), FilterCriteria::IncludeTags { mut tags } => { - merge::<_, MAX_TAGS>(&mut self.include_tags, &mut tags) + merge(&mut self.include_tags, &mut tags, self.max_tags) } FilterCriteria::RemoveIncludeTags { tags } => remove(&mut self.include_tags, &tags), - FilterCriteria::ExcludeRegexEntityIds { mut matching } => { - merge::<_, MAX_REGEX_ENTITY_IDS>( - &mut self.exclude_regex_entity_ids, - &mut matching, - ) - } + FilterCriteria::ExcludeRegexEntityIds { mut matching } => merge( + &mut self.exclude_regex_entity_ids, + &mut matching, + self.max_regex_entity_ids, + ), FilterCriteria::RemoveExcludeRegexEntityIds { matching } => { remove(&mut self.exclude_regex_entity_ids, &matching) } - FilterCriteria::IncludeRegexEntityIds { mut matching } => { - merge::<_, MAX_REGEX_ENTITY_IDS>( - &mut self.include_regex_entity_ids, - &mut matching, - ) - } + FilterCriteria::IncludeRegexEntityIds { mut matching } => merge( + &mut self.include_regex_entity_ids, + &mut matching, + self.max_regex_entity_ids, + ), FilterCriteria::RemoveIncludeRegexEntityIds { matching } => { remove(&mut self.include_regex_entity_ids, &matching) @@ -270,9 +296,10 @@ impl Filter { FilterCriteria::ExcludePersistenceIds { mut persistence_ids, - } => merge::<_, MAX_PERSISTENCE_IDS>( + } => merge( &mut self.exclude_persistence_ids, &mut persistence_ids, + self.max_persistence_ids, ), FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { @@ -281,12 +308,13 @@ impl Filter { FilterCriteria::IncludePersistenceIds { 
persistence_id_offsets, - } => merge::<_, MAX_PERSISTENCE_IDS>( + } => merge( &mut self.include_persistence_ids, &mut persistence_id_offsets .into_iter() .map(|PersistenceIdIdOffset { persistence_id, .. }| persistence_id) .collect(), + self.max_persistence_ids, ), FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => { @@ -294,7 +322,7 @@ impl Filter { } FilterCriteria::IncludeTopics { mut expressions } => { - merge::<_, MAX_TOPICS>(&mut self.include_topics, &mut expressions) + merge(&mut self.include_topics, &mut expressions, self.max_topics) } FilterCriteria::RemoveIncludeTopics { expressions } => { @@ -305,11 +333,11 @@ impl Filter { } } -fn merge(l: &mut Vec, r: &mut Vec) +fn merge(l: &mut Vec, r: &mut Vec, max_len: usize) where T: Ord, { - if l.len() + r.len() < MAX_LEN * 2 { + if l.len() < max_len && r.len() < max_len { l.append(r); l.sort(); l.dedup(); @@ -355,7 +383,7 @@ mod tests { tags: vec![tag.clone()], }; - let mut filter = Filter::new(Tag::from("")); + let mut filter = Filter::default(); let criteria = vec![ FilterCriteria::ExcludeTags { @@ -388,7 +416,7 @@ mod tests { tags: vec![], }; - let mut filter = Filter::new(Tag::from("")); + let mut filter = Filter::default(); let criteria = vec![ FilterCriteria::ExcludePersistenceIds { @@ -427,7 +455,7 @@ mod tests { tags: vec![], }; - let mut filter = Filter::new(Tag::from("")); + let mut filter = Filter::default(); let criteria = vec![ FilterCriteria::ExcludeRegexEntityIds { @@ -464,7 +492,7 @@ mod tests { tags: vec![tag.clone()], }; - let mut filter = Filter::new(Tag::from("t:")); + let mut filter = Filter::default(); let criteria = vec![ exclude_all(), diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index ca25d9f..043f1d4 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -2,8 +2,9 @@ use crate::proto; use 
crate::temperature::{self, EventEnvelopeMarshaler}; -use akka_persistence_rs::{EntityType, Tag}; +use akka_persistence_rs::EntityType; use akka_persistence_rs_commitlog::EventEnvelope as CommitLogEventEnvelope; +use akka_projection_rs::consumer_filter::Filter; use akka_projection_rs_commitlog::CommitLogSourceProvider; use akka_projection_rs_grpc::producer::GrpcEventFlow; use akka_projection_rs_grpc::{OriginId, StreamId}; @@ -92,11 +93,11 @@ pub async fn task( // where an upper limit of the number of envelopes in-flight is set. let producer_filter = |_: &CommitLogEventEnvelope| true; - let topic_tag_prefix = Tag::from("t:"); + let consumer_filter = Filter::default(); let event_handler = grpc_flow.handler( producer_filter, consumer_filters_receiver, - topic_tag_prefix, + consumer_filter, transformer, ); akka_projection_rs_storage::run( From f1162a4bd149e8f3b9f8f8402cdc08a0deeb31d3 Mon Sep 17 00:00:00 2001 From: huntc Date: Mon, 9 Oct 2023 11:16:01 +1100 Subject: [PATCH 17/19] More DX tidy-up --- examples/iot-service/backend/src/temperature_production.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 043f1d4..d400deb 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -34,8 +34,6 @@ pub async fn task( let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); - let grpc_flow = GrpcEventFlow::new(entity_type.clone(), grpc_producer); - let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); let task_entity_type = entity_type.clone(); @@ -60,7 +58,7 @@ pub async fn task( let source_provider = CommitLogSourceProvider::new( commit_log, EventEnvelopeMarshaler { - entity_type, + entity_type: entity_type.clone(), events_key_secret_path: 
Arc::from(events_key_secret_path), secret_store: secret_store.clone(), }, @@ -92,6 +90,7 @@ pub async fn task( // gRPC events to a remote consumer. The handler is a "flowing" one // where an upper limit of the number of envelopes in-flight is set. + let grpc_flow = GrpcEventFlow::new(entity_type, grpc_producer); let producer_filter = |_: &CommitLogEventEnvelope| true; let consumer_filter = Filter::default(); let event_handler = grpc_flow.handler( @@ -100,6 +99,7 @@ pub async fn task( consumer_filter, transformer, ); + akka_projection_rs_storage::run( &secret_store, &offsets_key_secret_path, From 0fc0f50c4ce9ae8f49be79f866d4e5361119479c Mon Sep 17 00:00:00 2001 From: huntc Date: Tue, 10 Oct 2023 10:21:53 +1100 Subject: [PATCH 18/19] MQTT protocol is now hidden The MQTT TopicFilter is hidden to the outside so that we can freely change it, as we're not necessarily supporting all of MQTT's capabilities. --- akka-projection-rs-grpc/Cargo.toml | 1 - akka-projection-rs-grpc/src/lib.rs | 9 +++--- akka-projection-rs/src/consumer_filter.rs | 34 +++++++++++++++++++---- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/akka-projection-rs-grpc/Cargo.toml b/akka-projection-rs-grpc/Cargo.toml index 04b236f..6a1a92e 100644 --- a/akka-projection-rs-grpc/Cargo.toml +++ b/akka-projection-rs-grpc/Cargo.toml @@ -11,7 +11,6 @@ chrono = { workspace = true } exponential-backoff = { workspace = true } futures = { workspace = true } log = { workspace = true } -mqtt-protocol = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } regex = { workspace = true } diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 8afebef..9dafd2a 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -3,8 +3,9 @@ use akka_persistence_rs::{ EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset, }; -use akka_projection_rs::consumer_filter::{ComparableRegex, 
FilterCriteria, PersistenceIdIdOffset}; -use mqtt::TopicFilter; +use akka_projection_rs::consumer_filter::{ + ComparableRegex, FilterCriteria, PersistenceIdIdOffset, TopicMatcher, +}; use regex::Regex; use smol_str::SmolStr; @@ -260,7 +261,7 @@ pub fn to_filter_criteria( }) => FilterCriteria::IncludeTopics { expressions: expression .into_iter() - .flat_map(|e| TopicFilter::new(e).ok()) + .flat_map(|e| TopicMatcher::new(e).ok()) .collect(), }, proto::filter_criteria::Message::RemoveIncludeTopics( @@ -268,7 +269,7 @@ pub fn to_filter_criteria( ) => FilterCriteria::RemoveIncludeTopics { expressions: expression .into_iter() - .flat_map(|e| TopicFilter::new(e).ok()) + .flat_map(|e| TopicMatcher::new(e).ok()) .collect(), }, }; diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 472b48f..99fcd02 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -25,6 +25,8 @@ //! * IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions //! * IncludeEntityIds - include events for entities with the given entity ids +use std::fmt::Display; + use akka_persistence_rs::{EntityId, PersistenceId, Tag, WithPersistenceId, WithTags}; use mqtt::{TopicFilter, TopicNameRef}; use regex::Regex; @@ -60,6 +62,26 @@ impl Ord for ComparableRegex { } } +#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)] +pub struct TopicMatcher(TopicFilter); + +#[derive(Debug)] +pub struct BadTopicMatcher; + +impl TopicMatcher { + pub fn new>(matcher: S) -> Result { + Ok(Self( + TopicFilter::new(matcher).map_err(|_| BadTopicMatcher)?, + )) + } +} + +impl Display for TopicMatcher { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + /// Exclude criteria are evaluated first. /// If no matching exclude criteria the event is emitted. /// If an exclude criteria is matching the include criteria are evaluated. 
@@ -104,9 +126,9 @@ pub enum FilterCriteria { RemoveIncludePersistenceIds { persistence_ids: Vec }, /// Include events with any of the given matching topics. A matching include overrides /// a matching exclude. - IncludeTopics { expressions: Vec }, + IncludeTopics { expressions: Vec }, /// Remove a previously added `IncludeTopics`. - RemoveIncludeTopics { expressions: Vec }, + RemoveIncludeTopics { expressions: Vec }, } /// Exclude events from all entity ids, convenience for combining with for example a topic filter @@ -130,7 +152,7 @@ pub struct Filter { exclude_persistence_ids: Vec, include_persistence_ids: Vec, max_topics: usize, - include_topics: Vec, + include_topics: Vec, } impl Default for Filter { @@ -241,10 +263,10 @@ impl Filter { match_tags.iter().any(|mt| tags.iter().any(|t| t == mt)) } - fn matches_topics(expressions: &[TopicFilter], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool { + fn matches_topics(expressions: &[TopicMatcher], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool { let topic_tag_prefix_len = topic_tag_prefix.len(); expressions.iter().any(|r| { - let matcher = r.get_matcher(); + let matcher = r.0.get_matcher(); tags.iter() .filter(|t| t.starts_with(topic_tag_prefix.as_str())) .any(|t| { @@ -485,7 +507,7 @@ mod tests { fn include_and_remove_include_topic() { let persistence_id = "a|1".parse::().unwrap(); let tag = Tag::from("t:sport/abc/player1"); - let expression = TopicFilter::new("sport/+/player1").unwrap(); + let expression = TopicMatcher::new("sport/+/player1").unwrap(); let envelope = TestEnvelope { persistence_id: persistence_id.clone(), From d2e1550e7d584afbe0c85e0f783c6b27265a08f9 Mon Sep 17 00:00:00 2001 From: huntc Date: Tue, 10 Oct 2023 21:11:24 +1100 Subject: [PATCH 19/19] Filter by entity ids for edge based consumers --- akka-projection-rs-grpc/src/consumer.rs | 8 +- akka-projection-rs-grpc/src/lib.rs | 275 ++++++++---------- akka-projection-rs-grpc/src/producer.rs | 5 +- akka-projection-rs/src/consumer_filter.rs | 116 ++++---- 
.../backend/src/temperature_production.rs | 2 - 5 files changed, 186 insertions(+), 220 deletions(-) diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs index 085c9fd..dead6c2 100644 --- a/akka-projection-rs-grpc/src/consumer.rs +++ b/akka-projection-rs-grpc/src/consumer.rs @@ -297,7 +297,7 @@ mod tests { use super::*; use akka_persistence_rs::{EntityId, EntityType, PersistenceId}; - use akka_projection_rs::consumer_filter::{self, PersistenceIdIdOffset}; + use akka_projection_rs::consumer_filter::{self, EntityIdOffset}; use async_stream::stream; use chrono::{DateTime, Utc}; use prost_types::Any; @@ -491,9 +491,9 @@ mod tests { }); let (consumer_filters, consumer_filters_receiver) = - watch::channel(vec![FilterCriteria::IncludePersistenceIds { - persistence_id_offsets: vec![PersistenceIdIdOffset { - persistence_id: persistence_id.clone(), + watch::channel(vec![FilterCriteria::IncludeEntityIds { + entity_id_offsets: vec![EntityIdOffset { + entity_id: persistence_id.entity_id.clone(), seq_nr: 0, }], }]); diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs index 9dafd2a..c53ac74 100644 --- a/akka-projection-rs-grpc/src/lib.rs +++ b/akka-projection-rs-grpc/src/lib.rs @@ -1,10 +1,8 @@ #![doc = include_str!("../README.md")] -use akka_persistence_rs::{ - EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset, -}; +use akka_persistence_rs::{EntityId, Offset, PersistenceId, Tag, TimestampOffset, WithOffset}; use akka_projection_rs::consumer_filter::{ - ComparableRegex, FilterCriteria, PersistenceIdIdOffset, TopicMatcher, + ComparableRegex, EntityIdOffset, FilterCriteria, TopicMatcher, }; use regex::Regex; use smol_str::SmolStr; @@ -90,47 +88,35 @@ impl From for proto::FilterCriteria { }, ) } - FilterCriteria::ExcludePersistenceIds { persistence_ids } => { + FilterCriteria::ExcludeEntityIds { entity_ids } => { 
proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { - entity_ids: persistence_ids - .into_iter() - .map(|v| v.entity_id.to_string()) - .collect(), + entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), }) } - FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { + FilterCriteria::RemoveExcludeEntityIds { entity_ids } => { proto::filter_criteria::Message::RemoveExcludeEntityIds( proto::RemoveExcludeEntityIds { - entity_ids: persistence_ids - .into_iter() - .map(|v| v.entity_id.to_string()) - .collect(), + entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), }, ) } - FilterCriteria::IncludePersistenceIds { - persistence_id_offsets, - } => proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { - entity_id_offset: persistence_id_offsets - .into_iter() - .map( - |PersistenceIdIdOffset { - persistence_id, - seq_nr, - }| proto::EntityIdOffset { - entity_id: persistence_id.entity_id.to_string(), - seq_nr: seq_nr as i64, - }, - ) - .collect(), - }), - FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => { + FilterCriteria::IncludeEntityIds { entity_id_offsets } => { + proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { + entity_id_offset: entity_id_offsets + .into_iter() + .map( + |EntityIdOffset { entity_id, seq_nr }| proto::EntityIdOffset { + entity_id: entity_id.to_string(), + seq_nr: seq_nr as i64, + }, + ) + .collect(), + }) + } + FilterCriteria::RemoveIncludeEntityIds { entity_ids } => { proto::filter_criteria::Message::RemoveIncludeEntityIds( proto::RemoveIncludeEntityIds { - entity_ids: persistence_ids - .into_iter() - .map(|v| v.entity_id.to_string()) - .collect(), + entity_ids: entity_ids.into_iter().map(|v| v.to_string()).collect(), }, ) } @@ -157,124 +143,113 @@ pub struct NoMessage; /// Attempt to convert from a protobuf filter criteria to a model /// representation given an entity type. 
-pub fn to_filter_criteria( - entity_type: EntityType, - value: proto::FilterCriteria, -) -> Result { - match value.message { - Some(message) => { - let criteria = match message { - proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => { - FilterCriteria::ExcludeTags { - tags: tags.into_iter().map(Tag::from).collect(), +impl TryFrom for FilterCriteria { + type Error = NoMessage; + + fn try_from(value: proto::FilterCriteria) -> Result { + match value.message { + Some(message) => { + let criteria = match message { + proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => { + FilterCriteria::ExcludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + } } - } - proto::filter_criteria::Message::RemoveExcludeTags(proto::RemoveExcludeTags { - tags, - }) => FilterCriteria::RemoveExcludeTags { - tags: tags.into_iter().map(Tag::from).collect(), - }, - proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => { - FilterCriteria::IncludeTags { + proto::filter_criteria::Message::RemoveExcludeTags( + proto::RemoveExcludeTags { tags }, + ) => FilterCriteria::RemoveExcludeTags { tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => { + FilterCriteria::IncludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + } } - } - proto::filter_criteria::Message::RemoveIncludeTags(proto::RemoveIncludeTags { - tags, - }) => FilterCriteria::RemoveIncludeTags { - tags: tags.into_iter().map(Tag::from).collect(), - }, - proto::filter_criteria::Message::ExcludeMatchingEntityIds( - proto::ExcludeRegexEntityIds { matching }, - ) => FilterCriteria::ExcludeRegexEntityIds { - matching: matching - .into_iter() - .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) - .collect(), - }, - proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( - proto::RemoveExcludeRegexEntityIds { matching }, - ) => 
FilterCriteria::RemoveExcludeRegexEntityIds { - matching: matching - .into_iter() - .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) - .collect(), - }, - proto::filter_criteria::Message::IncludeMatchingEntityIds( - proto::IncludeRegexEntityIds { matching }, - ) => FilterCriteria::IncludeRegexEntityIds { - matching: matching - .into_iter() - .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) - .collect(), - }, - proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( - proto::RemoveIncludeRegexEntityIds { matching }, - ) => FilterCriteria::RemoveIncludeRegexEntityIds { - matching: matching - .into_iter() - .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) - .collect(), - }, - proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { - entity_ids, - }) => FilterCriteria::ExcludePersistenceIds { - persistence_ids: entity_ids - .into_iter() - .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) - .collect(), - }, - proto::filter_criteria::Message::RemoveExcludeEntityIds( - proto::RemoveExcludeEntityIds { entity_ids }, - ) => FilterCriteria::RemoveExcludePersistenceIds { - persistence_ids: entity_ids - .into_iter() - .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) - .collect(), - }, - proto::filter_criteria::Message::IncludeEntityIds(proto::IncludeEntityIds { - entity_id_offset, - }) => FilterCriteria::IncludePersistenceIds { - persistence_id_offsets: entity_id_offset - .into_iter() - .map( - |proto::EntityIdOffset { entity_id, seq_nr }| PersistenceIdIdOffset { - persistence_id: PersistenceId::new( - entity_type.clone(), - EntityId::from(entity_id), - ), - seq_nr: seq_nr as u64, - }, - ) - .collect(), - }, - proto::filter_criteria::Message::RemoveIncludeEntityIds( - proto::RemoveIncludeEntityIds { entity_ids }, - ) => FilterCriteria::RemoveIncludePersistenceIds { - persistence_ids: entity_ids - .into_iter() - .map(|e| PersistenceId::new(entity_type.clone(), EntityId::from(e))) - 
.collect(), - }, - proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics { - expression, - }) => FilterCriteria::IncludeTopics { - expressions: expression - .into_iter() - .flat_map(|e| TopicMatcher::new(e).ok()) - .collect(), - }, - proto::filter_criteria::Message::RemoveIncludeTopics( - proto::RemoveIncludeTopics { expression }, - ) => FilterCriteria::RemoveIncludeTopics { - expressions: expression - .into_iter() - .flat_map(|e| TopicMatcher::new(e).ok()) - .collect(), - }, - }; - Ok(criteria) + proto::filter_criteria::Message::RemoveIncludeTags( + proto::RemoveIncludeTags { tags }, + ) => FilterCriteria::RemoveIncludeTags { + tags: tags.into_iter().map(Tag::from).collect(), + }, + proto::filter_criteria::Message::ExcludeMatchingEntityIds( + proto::ExcludeRegexEntityIds { matching }, + ) => FilterCriteria::ExcludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) + .collect(), + }, + proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds( + proto::RemoveExcludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveExcludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) + .collect(), + }, + proto::filter_criteria::Message::IncludeMatchingEntityIds( + proto::IncludeRegexEntityIds { matching }, + ) => FilterCriteria::IncludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds( + proto::RemoveIncludeRegexEntityIds { matching }, + ) => FilterCriteria::RemoveIncludeRegexEntityIds { + matching: matching + .into_iter() + .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex)) + .collect(), + }, + proto::filter_criteria::Message::ExcludeEntityIds( + proto::ExcludeEntityIds { entity_ids }, + ) => FilterCriteria::ExcludeEntityIds { + entity_ids: 
entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::RemoveExcludeEntityIds( + proto::RemoveExcludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveExcludeEntityIds { + entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::IncludeEntityIds( + proto::IncludeEntityIds { entity_id_offset }, + ) => FilterCriteria::IncludeEntityIds { + entity_id_offsets: entity_id_offset + .into_iter() + .map( + |proto::EntityIdOffset { entity_id, seq_nr }| EntityIdOffset { + entity_id: EntityId::from(entity_id), + seq_nr: seq_nr as u64, + }, + ) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeEntityIds( + proto::RemoveIncludeEntityIds { entity_ids }, + ) => FilterCriteria::RemoveIncludeEntityIds { + entity_ids: entity_ids.into_iter().map(EntityId::from).collect(), + }, + proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics { + expression, + }) => FilterCriteria::IncludeTopics { + expressions: expression + .into_iter() + .flat_map(|e| TopicMatcher::new(e).ok()) + .collect(), + }, + proto::filter_criteria::Message::RemoveIncludeTopics( + proto::RemoveIncludeTopics { expression }, + ) => FilterCriteria::RemoveIncludeTopics { + expressions: expression + .into_iter() + .flat_map(|e| TopicMatcher::new(e).ok()) + .collect(), + }, + }; + Ok(criteria) + } + None => Err(NoMessage), } - None => Err(NoMessage), } } diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs index 1846103..fbabb78 100644 --- a/akka-projection-rs-grpc/src/producer.rs +++ b/akka-projection-rs-grpc/src/producer.rs @@ -31,7 +31,6 @@ use tonic::Request; use crate::delayer::Delayer; use crate::proto; -use crate::to_filter_criteria; use crate::EventEnvelope; use crate::StreamId; @@ -174,7 +173,6 @@ pub async fn run( origin_id: StreamId, stream_id: StreamId, consumer_filters: watch::Sender>, - entity_type: EntityType, mut envelopes: 
mpsc::Receiver<(EventEnvelope, oneshot::Sender<()>)>, mut kill_switch: oneshot::Receiver<()>, ) where @@ -279,7 +277,7 @@ pub async fn run( let _ = consumer_filters.send( filter .into_iter() - .flat_map(|f| to_filter_criteria(entity_type.clone(), f)) + .flat_map(|f| f.try_into()) .collect(), ); break; @@ -459,7 +457,6 @@ mod tests { OriginId::from("some-origin-id"), StreamId::from("some-stream-id"), consumer_filters, - EntityType::from("some-entity-type"), receiver, task_kill_switch_receiver, ) diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs index 99fcd02..53bfdf1 100644 --- a/akka-projection-rs/src/consumer_filter.rs +++ b/akka-projection-rs/src/consumer_filter.rs @@ -27,13 +27,13 @@ use std::fmt::Display; -use akka_persistence_rs::{EntityId, PersistenceId, Tag, WithPersistenceId, WithTags}; +use akka_persistence_rs::{EntityId, Tag, WithPersistenceId, WithTags}; use mqtt::{TopicFilter, TopicNameRef}; use regex::Regex; #[derive(Clone)] -pub struct PersistenceIdIdOffset { - pub persistence_id: PersistenceId, +pub struct EntityIdOffset { + pub entity_id: EntityId, // If this is defined (> 0) events are replayed from the given // sequence number (inclusive). pub seq_nr: u64, @@ -109,21 +109,21 @@ pub enum FilterCriteria { IncludeRegexEntityIds { matching: Vec }, /// Remove a previously added `IncludeRegexEntityIds`. RemoveIncludeRegexEntityIds { matching: Vec }, - /// Exclude events for entities with the given persistence ids, + /// Exclude events for entities with the given entity ids, /// unless there is a matching include filter that overrides the exclude. - ExcludePersistenceIds { persistence_ids: Vec }, - /// Remove a previously added `ExcludePersistenceIds`. - RemoveExcludePersistenceIds { persistence_ids: Vec }, - /// Include events for entities with the given persistence ids. A matching include overrides + ExcludeEntityIds { entity_ids: Vec }, + /// Remove a previously added `ExcludeEntityIds`. 
+ RemoveExcludeEntityIds { entity_ids: Vec }, + /// Include events for entities with the given entity ids. A matching include overrides /// a matching exclude. /// /// For the given entity ids a `seq_nr` can be defined to replay all events for the entity /// from the sequence number (inclusive). If `seq_nr` is 0 events will not be replayed. - IncludePersistenceIds { - persistence_id_offsets: Vec, + IncludeEntityIds { + entity_id_offsets: Vec, }, - /// Remove a previously added `IncludePersistenceIds`. - RemoveIncludePersistenceIds { persistence_ids: Vec }, + /// Remove a previously added `IncludeEntityIds`. + RemoveIncludeEntityIds { entity_ids: Vec }, /// Include events with any of the given matching topics. A matching include overrides /// a matching exclude. IncludeTopics { expressions: Vec }, @@ -148,9 +148,9 @@ pub struct Filter { max_regex_entity_ids: usize, exclude_regex_entity_ids: Vec, include_regex_entity_ids: Vec, - max_persistence_ids: usize, - exclude_persistence_ids: Vec, - include_persistence_ids: Vec, + max_entity_ids: usize, + exclude_entity_ids: Vec, + include_entity_ids: Vec, max_topics: usize, include_topics: Vec, } @@ -165,9 +165,9 @@ impl Default for Filter { max_regex_entity_ids: 10, exclude_regex_entity_ids: vec![], include_regex_entity_ids: vec![], - max_persistence_ids: 10, - exclude_persistence_ids: vec![], - include_persistence_ids: vec![], + max_entity_ids: 10, + exclude_entity_ids: vec![], + include_entity_ids: vec![], max_topics: 10, include_topics: vec![], } @@ -179,7 +179,7 @@ impl Filter { topic_tag_prefix: Tag, max_tags: usize, max_regex_entity_ids: usize, - max_persistence_ids: usize, + max_entity_ids: usize, max_topics: usize, ) -> Self { Self { @@ -190,9 +190,9 @@ impl Filter { max_regex_entity_ids, exclude_regex_entity_ids: vec![], include_regex_entity_ids: vec![], - max_persistence_ids, - exclude_persistence_ids: vec![], - include_persistence_ids: vec![], + max_entity_ids, + exclude_entity_ids: vec![], + include_entity_ids: 
vec![], max_topics, include_topics: vec![], } @@ -208,12 +208,12 @@ impl Filter { let entity_id = &persistence_id.entity_id; if self.matches_exclude_tags(tags) - || self.matches_exclude_persistence_ids(persistence_id) + || self.matches_exclude_entity_ids(entity_id) || self.matches_exclude_regex_entity_ids(entity_id) { self.matches_include_tags(tags) || self.matches_include_topics(tags) - || self.matches_include_persistence_ids(persistence_id) + || self.matches_include_entity_ids(entity_id) || self.matches_include_regex_entity_ids(entity_id) } else { true @@ -228,12 +228,12 @@ impl Filter { Self::matches_regex_entity_ids(&self.include_regex_entity_ids, entity_id) } - fn matches_exclude_persistence_ids(&self, persistence_id: &PersistenceId) -> bool { - Self::matches_persistence_ids(&self.exclude_persistence_ids, persistence_id) + fn matches_exclude_entity_ids(&self, entity_id: &EntityId) -> bool { + Self::matches_entity_ids(&self.exclude_entity_ids, entity_id) } - fn matches_include_persistence_ids(&self, persistence_id: &PersistenceId) -> bool { - Self::matches_persistence_ids(&self.include_persistence_ids, persistence_id) + fn matches_include_entity_ids(&self, entity_id: &EntityId) -> bool { + Self::matches_entity_ids(&self.include_entity_ids, entity_id) } fn matches_exclude_tags(&self, tags: &[Tag]) -> bool { @@ -252,11 +252,8 @@ impl Filter { matching.iter().any(|r| r.0.is_match(entity_id)) } - fn matches_persistence_ids( - persistence_ids: &[PersistenceId], - persistence_id: &PersistenceId, - ) -> bool { - persistence_ids.iter().any(|pi| pi == persistence_id) + fn matches_entity_ids(entity_ids: &[EntityId], entity_id: &EntityId) -> bool { + entity_ids.iter().any(|pi| pi == entity_id) } fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool { @@ -316,31 +313,27 @@ impl Filter { remove(&mut self.include_regex_entity_ids, &matching) } - FilterCriteria::ExcludePersistenceIds { - mut persistence_ids, - } => merge( - &mut self.exclude_persistence_ids, - &mut 
persistence_ids, - self.max_persistence_ids, + FilterCriteria::ExcludeEntityIds { mut entity_ids } => merge( + &mut self.exclude_entity_ids, + &mut entity_ids, + self.max_entity_ids, ), - FilterCriteria::RemoveExcludePersistenceIds { persistence_ids } => { - remove(&mut self.exclude_persistence_ids, &persistence_ids) + FilterCriteria::RemoveExcludeEntityIds { entity_ids } => { + remove(&mut self.exclude_entity_ids, &entity_ids) } - FilterCriteria::IncludePersistenceIds { - persistence_id_offsets, - } => merge( - &mut self.include_persistence_ids, - &mut persistence_id_offsets + FilterCriteria::IncludeEntityIds { entity_id_offsets } => merge( + &mut self.include_entity_ids, + &mut entity_id_offsets .into_iter() - .map(|PersistenceIdIdOffset { persistence_id, .. }| persistence_id) + .map(|EntityIdOffset { entity_id, .. }| entity_id) .collect(), - self.max_persistence_ids, + self.max_entity_ids, ), - FilterCriteria::RemoveIncludePersistenceIds { persistence_ids } => { - remove(&mut self.include_persistence_ids, &persistence_ids) + FilterCriteria::RemoveIncludeEntityIds { entity_ids } => { + remove(&mut self.include_entity_ids, &entity_ids) } FilterCriteria::IncludeTopics { mut expressions } => { @@ -376,6 +369,8 @@ where #[cfg(test)] mod tests { + use akka_persistence_rs::PersistenceId; + use super::*; struct TestEnvelope { @@ -430,8 +425,9 @@ mod tests { } #[test] - fn exclude_include_and_remove_include_persistence_id_and_remove_exclude_persistence_id() { + fn exclude_include_and_remove_include_entity_id_and_remove_exclude_entity_id() { let persistence_id = "a|1".parse::().unwrap(); + let entity_id = persistence_id.entity_id.clone(); let envelope = TestEnvelope { persistence_id: persistence_id.clone(), @@ -441,12 +437,12 @@ mod tests { let mut filter = Filter::default(); let criteria = vec![ - FilterCriteria::ExcludePersistenceIds { - persistence_ids: vec![persistence_id.clone()], + FilterCriteria::ExcludeEntityIds { + entity_ids: vec![entity_id.clone()], }, - 
FilterCriteria::IncludePersistenceIds { - persistence_id_offsets: vec![PersistenceIdIdOffset { - persistence_id: persistence_id.clone(), + FilterCriteria::IncludeEntityIds { + entity_id_offsets: vec![EntityIdOffset { + entity_id: entity_id.clone(), seq_nr: 0, }], }, @@ -454,14 +450,14 @@ mod tests { filter.update(criteria); assert!(filter.matches(&envelope)); - let criteria = vec![FilterCriteria::RemoveIncludePersistenceIds { - persistence_ids: vec![persistence_id.clone()], + let criteria = vec![FilterCriteria::RemoveIncludeEntityIds { + entity_ids: vec![entity_id.clone()], }]; filter.update(criteria); assert!(!filter.matches(&envelope)); - let criteria = vec![FilterCriteria::RemoveExcludePersistenceIds { - persistence_ids: vec![persistence_id.clone()], + let criteria = vec![FilterCriteria::RemoveExcludeEntityIds { + entity_ids: vec![entity_id.clone()], }]; filter.update(criteria); assert!(filter.matches(&envelope)); diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index d400deb..19bb63d 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -35,7 +35,6 @@ pub async fn task( let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); - let task_entity_type = entity_type.clone(); tokio::spawn(async { let consumer_endpoint = Channel::builder(event_consumer_addr); @@ -46,7 +45,6 @@ pub async fn task( OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), consumer_filters, - task_entity_type, grpc_producer_receiver, task_kill_switch_receiver, )