diff --git a/Cargo.toml b/Cargo.toml
index b8add91..9b3bf59 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,11 +32,13 @@ itoa = "1.0"
 js-sys = "0.3.60"
 log = "0.4.17"
 lru = "0.11.0"
+mqtt-protocol = "0.11.2"
 postcard = { version = "1.0.6", default-features = false }
 prost = { version = "0.12.0" }
 prost-build = { version = "0.12.0" }
 prost-types = { version = "0.12.0" }
 rand = "0.8"
+regex = "1.9.6"
 scopeguard = "1.1"
 serde = "1.0.151"
 serde_json = "1.0.107"
diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs
index 86236d4..f4ff507 100644
--- a/akka-persistence-rs-commitlog/src/lib.rs
+++ b/akka-persistence-rs-commitlog/src/lib.rs
@@ -2,7 +2,8 @@
 use akka_persistence_rs::{
     entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
-    EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTimestampOffset,
+    EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
+    WithPersistenceId, WithSeqNr, WithTags, WithTimestampOffset,
 };
 use async_stream::stream;
 use async_trait::async_trait;
@@ -19,39 +20,22 @@ use streambed::{
 use tokio_stream::{Stream, StreamExt};
 
 /// An envelope wraps a commit log event associated with a specific entity.
+/// Tags are not presently considered useful at the edge. A remote consumer would be interested
+/// in all events from the edge in most cases, and the edge itself decides what to publish
+/// (producer-defined filter).
 #[derive(Clone, Debug, PartialEq)]
 pub struct EventEnvelope<E> {
-    pub entity_id: EntityId,
+    pub persistence_id: PersistenceId,
     pub seq_nr: u64,
     pub timestamp: DateTime<Utc>,
     pub event: E,
     pub offset: CommitLogOffset,
+    pub tags: Vec<Tag>,
 }
 
-impl<E> EventEnvelope<E> {
-    pub fn new<EI>(
-        entity_id: EI,
-        seq_nr: u64,
-        timestamp: DateTime<Utc>,
-        event: E,
-        offset: CommitLogOffset,
-    ) -> Self
-    where
-        EI: Into<EntityId>,
-    {
-        Self {
-            entity_id: entity_id.into(),
-            seq_nr,
-            timestamp,
-            event,
-            offset,
-        }
-    }
-}
-
-impl<E> WithEntityId for EventEnvelope<E> {
-    fn entity_id(&self) -> EntityId {
-        self.entity_id.clone()
+impl<E> WithPersistenceId for EventEnvelope<E> {
+    fn persistence_id(&self) -> &PersistenceId {
+        &self.persistence_id
     }
 }
 
@@ -67,6 +51,12 @@ impl<E> WithSeqNr for EventEnvelope<E> {
     }
 }
 
+impl<E> WithTags for EventEnvelope<E> {
+    fn tags(&self) -> &[akka_persistence_rs::Tag] {
+        &self.tags
+    }
+}
+
 impl<E> WithTimestampOffset for EventEnvelope<E> {
     fn timestamp_offset(&self) -> TimestampOffset {
         TimestampOffset {
@@ -84,6 +74,9 @@ pub trait CommitLogMarshaler<E>
 where
     for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
 {
+    /// Declares the entity type to the marshaler.
+    fn entity_type(&self) -> EntityType;
+
     /// Provide a key we can use for the purposes of log compaction.
     /// A key would generally comprise an event type value held in
     /// the high bits, and the entity id in the lower bits.
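A note on the compaction key layout this trait documents: an event-type discriminant in the high 32 bits, an entity id hash in the low 32 bits. The sketch below is illustrative only and not part of the patch; the hash shown is a stand-in for whatever stable 32-bit hash a marshaler actually uses (the iot-service example later in this diff masks with `EVENT_ID_BIT_MASK` for the same purpose).

```rust
/// Hypothetical sketch: build a compaction key with the record type in the
/// high 32 bits and a stable hash of the entity id in the low 32 bits.
fn compaction_key(record_type: u32, entity_id: &str) -> u64 {
    // Any stable 32-bit hash works here; this one mirrors the JDK-style
    // 31-multiplier string hash used elsewhere in akka-persistence-rs.
    let entity_id_hash = entity_id
        .bytes()
        .fold(0u32, |h, b| h.wrapping_mul(31).wrapping_add(b as u32));
    ((record_type as u64) << 32) | entity_id_hash as u64
}
```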
@@ -159,8 +152,13 @@ where
             }
         });
         seq_nr.and_then(|seq_nr| {
-            record.timestamp.map(|timestamp| {
-                EventEnvelope::new(entity_id, seq_nr, timestamp, event, record.offset)
+            record.timestamp.map(|timestamp| EventEnvelope {
+                persistence_id: PersistenceId::new(self.entity_type(), entity_id),
+                seq_nr,
+                timestamp,
+                event,
+                offset: record.offset,
+                tags: vec![],
             })
         })
     })
@@ -291,7 +289,7 @@ where
                     marshaler.envelope(record_entity_id, consumer_record).await
                 {
                     yield EntityManagerEventEnvelope::new(
-                        envelope.entity_id,
+                        envelope.persistence_id.entity_id,
                         envelope.seq_nr,
                         envelope.timestamp,
                         envelope.event,
@@ -328,7 +326,7 @@ where
                     marshaler.envelope(record_entity_id, consumer_record).await
                 {
                     yield EntityManagerEventEnvelope::new(
-                        envelope.entity_id,
+                        envelope.persistence_id.entity_id,
                        envelope.seq_nr,
                        envelope.timestamp,
                        envelope.event,
@@ -476,6 +474,10 @@ mod tests {
     #[async_trait]
     impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
+        fn entity_type(&self) -> EntityType {
+            EntityType::from("some-entity-type")
+        }
+
         fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
             panic!("should not be called")
         }
@@ -496,11 +498,12 @@ mod tests {
             let value = String::from_utf8(record.value).ok()?;
             let event = MyEvent { value };
             record.timestamp.map(|timestamp| EventEnvelope {
-                entity_id,
+                persistence_id: PersistenceId::new(self.entity_type(), entity_id),
                 seq_nr: 1,
                 timestamp,
                 event,
                 offset: 0,
+                tags: vec![],
             })
         }
diff --git a/akka-persistence-rs/src/lib.rs b/akka-persistence-rs/src/lib.rs
index 5017c48..ef750ec 100644
--- a/akka-persistence-rs/src/lib.rs
+++ b/akka-persistence-rs/src/lib.rs
@@ -9,20 +9,29 @@ use std::{
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
+use smol_str::SmolStr;
 
 pub mod effect;
 pub mod entity;
 pub mod entity_manager;
 
 /// Uniquely identifies the type of an Entity.
-pub type EntityType = smol_str::SmolStr;
+pub type EntityType = SmolStr;
 
 /// Uniquely identifies an entity, or entity instance.
-pub type EntityId = smol_str::SmolStr;
+pub type EntityId = SmolStr;
 
-/// Implemented by structures that can return an entity id.
-pub trait WithEntityId {
-    fn entity_id(&self) -> EntityId;
+/// Tags annotate an entity's events.
+pub type Tag = SmolStr;
+
+/// Implemented by structures that can return a persistence id.
+pub trait WithPersistenceId {
+    fn persistence_id(&self) -> &PersistenceId;
+}
+
+/// Implemented by structures that can return tags.
+pub trait WithTags {
+    fn tags(&self) -> &[Tag];
 }
 
 /// A slice is deterministically defined based on the persistence id.
@@ -63,7 +72,7 @@ fn jdk_string_hashcode(s: &str) -> i32 {
 }
 
 /// A namespaced entity id given an entity type.
-#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, Deserialize, PartialOrd, Ord, Serialize, PartialEq, Eq, Hash)]
 pub struct PersistenceId {
     pub entity_type: EntityType,
     pub entity_id: EntityId,
diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs
index 7b5dd33..b1c871d 100644
--- a/akka-projection-rs-commitlog/src/lib.rs
+++ b/akka-projection-rs-commitlog/src/lib.rs
@@ -4,7 +4,7 @@ pub mod offset_store;
 
 use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};
 
-use akka_persistence_rs::{EntityType, Offset, PersistenceId};
+use akka_persistence_rs::{Offset, PersistenceId};
 use akka_persistence_rs_commitlog::{CommitLogMarshaler, EventEnvelope};
 use akka_projection_rs::SourceProvider;
 use async_stream::stream;
@@ -17,7 +17,6 @@ use tokio_stream::{Stream, StreamExt};
 pub struct CommitLogSourceProvider<CL, E, M> {
     commit_log: CL,
     consumer_group_name: String,
-    entity_type: EntityType,
     marshaler: M,
     slice_range: Range<u32>,
     topic: Topic,
@@ -30,13 +29,7 @@ where
     M: CommitLogMarshaler<E> + Sync,
     for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
 {
-    pub fn new(
-        commit_log: CL,
-        marshaler: M,
-        consumer_group_name: &str,
-        topic: Topic,
-        entity_type: EntityType,
-    ) -> Self {
+    pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self {
         // When it comes to having a projection sourced from a local
         // commit log, there's little benefit in having many of them.
         // We therefore manage all slices from just one projection.
@@ -47,7 +40,6 @@ where
             marshaler,
             consumer_group_name,
             topic,
-            entity_type,
             slice_range.get(0).cloned().unwrap(),
         )
     }
@@ -57,7 +49,6 @@ where
         marshaler: M,
         consumer_group_name: &str,
         topic: Topic,
-        entity_type: EntityType,
         slice_range: Range<u32>,
     ) -> Self {
         Self {
@@ -66,7 +57,6 @@ where
             marshaler,
             slice_range,
             topic,
-            entity_type,
             phantom: PhantomData,
         }
     }
@@ -102,7 +92,7 @@ where
                 while let Some(consumer_record) = records.next().await {
                     if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
                         let persistence_id =
-                            PersistenceId::new(self.entity_type.clone(), record_entity_id);
+                            PersistenceId::new(marshaler.entity_type(), record_entity_id);
                         if self.slice_range.contains(&persistence_id.slice()) {
                             if let Some(envelope) = marshaler
                                 .envelope(persistence_id.entity_id, consumer_record)
                                 .await
@@ -144,7 +134,7 @@ mod tests {
     use std::{env, fs};
 
     use super::*;
-    use akka_persistence_rs::EntityId;
+    use akka_persistence_rs::{EntityId, EntityType};
     use chrono::{DateTime, Utc};
     use serde::Deserialize;
     use streambed::commit_log::{ConsumerRecord, Header, Key, ProducerRecord};
@@ -173,6 +163,10 @@ mod tests {
     #[async_trait]
     impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
+        fn entity_type(&self) -> EntityType {
+            EntityType::from("some-topic")
+        }
+
         fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
             panic!("should not be called")
         }
@@ -193,11 +187,12 @@ mod tests {
             let value = String::from_utf8(record.value).ok()?;
             let event = MyEvent { value };
             record.timestamp.map(|timestamp| EventEnvelope {
-                entity_id,
+                persistence_id: PersistenceId::new(self.entity_type(), entity_id),
                 seq_nr: 1,
                 timestamp,
                 event,
                 offset: 0,
+                tags: vec![],
             })
         }
@@ -239,6 +234,7 @@ mod tests {
         let entity_type = EntityType::from("some-topic");
         let entity_id = EntityId::from("some-entity");
+        let persistence_id = PersistenceId::new(entity_type.clone(), entity_id.clone());
         let topic = Topic::from("some-topic");
         let event_value = "some value".to_string();
@@ -263,13 +259,12 @@ mod tests {
             MyEventMarshaler,
             "some-consumer",
             topic,
-            entity_type,
         );
 
         let mut envelopes = source_provider.source(|| async { None }).await;
         let envelope = envelopes.next().await.unwrap();
-        assert_eq!(envelope.entity_id, entity_id,);
+        assert_eq!(envelope.persistence_id, persistence_id);
         assert_eq!(envelope.event, MyEvent { value: event_value },);
     }
 }
diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs
index 60769e7..487ee94 100644
--- a/akka-projection-rs-commitlog/src/offset_store.rs
+++ b/akka-projection-rs-commitlog/src/offset_store.rs
@@ -2,7 +2,7 @@
 
 use std::num::NonZeroUsize;
 
-use akka_persistence_rs::{entity_manager, EntityId, Message};
+use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
 use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
 use akka_projection_rs::offset_store;
 use async_trait::async_trait;
@@ -13,6 +13,7 @@ use streambed_logged::{compaction::KeyBasedRetention, FileLog};
 use tokio::sync::mpsc;
 
 struct OffsetStoreEventMarshaler<F> {
+    entity_type: EntityType,
     to_compaction_key: F,
 }
 
@@ -21,6 +22,10 @@ impl<F> CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler<F>
 where
     F: Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync,
 {
+    fn entity_type(&self) -> EntityType {
+        self.entity_type.clone()
+    }
+
     fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option<Key> {
         (self.to_compaction_key)(entity_id, event)
     }
@@ -41,11 +46,12 @@ where
         let value = u64::from_be_bytes(record.value.try_into().ok()?);
         let event = offset_store::Event::Saved { seq_nr: value };
         record.timestamp.map(|timestamp| EventEnvelope {
-            entity_id,
+            persistence_id: PersistenceId::new(self.entity_type(), entity_id),
             seq_nr: 0, // We don't care about sequence numbers with the offset store as they won't be projected anywhere
             timestamp,
             event,
             offset: record.offset,
+            tags: vec![],
         })
     }
@@ -85,6 +91,7 @@ pub async fn run(
     offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
     to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
 ) {
+    let events_entity_type = EntityType::from(offset_store_id.clone());
     let events_topic = Topic::from(offset_store_id.clone());
 
     commit_log
@@ -94,7 +101,10 @@ pub async fn run(
     let file_log_topic_adapter = CommitLogTopicAdapter::new(
         commit_log,
-        OffsetStoreEventMarshaler { to_compaction_key },
+        OffsetStoreEventMarshaler {
+            entity_type: events_entity_type,
+            to_compaction_key,
+        },
         &offset_store_id,
         events_topic,
     );
diff --git a/akka-projection-rs-grpc/Cargo.toml b/akka-projection-rs-grpc/Cargo.toml
index a5e5a49..6a1a92e 100644
--- a/akka-projection-rs-grpc/Cargo.toml
+++ b/akka-projection-rs-grpc/Cargo.toml
@@ -13,6 +13,7 @@ futures = { workspace = true }
 log = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
+regex = { workspace = true }
 smol_str = { workspace = true }
 tokio = { workspace = true }
 tokio-stream = { workspace = true }
diff --git a/akka-projection-rs-grpc/src/consumer.rs b/akka-projection-rs-grpc/src/consumer.rs
index 66015eb..dead6c2 100644
--- a/akka-projection-rs-grpc/src/consumer.rs
+++ b/akka-projection-rs-grpc/src/consumer.rs
@@ -493,7 +493,7 @@ mod tests {
         let (consumer_filters, consumer_filters_receiver) =
             watch::channel(vec![FilterCriteria::IncludeEntityIds {
                 entity_id_offsets: vec![EntityIdOffset {
-                    entity_id: entity_id.clone(),
+                    entity_id: persistence_id.entity_id.clone(),
                     seq_nr: 0,
                 }],
             }]);
diff --git a/akka-projection-rs-grpc/src/lib.rs b/akka-projection-rs-grpc/src/lib.rs
index 63e845d..c53ac74 100644
--- a/akka-projection-rs-grpc/src/lib.rs
+++ b/akka-projection-rs-grpc/src/lib.rs
@@ -1,7 +1,10 @@
 #![doc = include_str!("../README.md")]
 
-use akka_persistence_rs::{Offset, PersistenceId, TimestampOffset, WithOffset};
-use akka_projection_rs::consumer_filter::{EntityIdOffset, FilterCriteria};
+use akka_persistence_rs::{EntityId, Offset, PersistenceId, Tag, TimestampOffset, WithOffset};
+use akka_projection_rs::consumer_filter::{
+    ComparableRegex, EntityIdOffset, FilterCriteria, TopicMatcher,
+};
+use regex::Regex;
 use smol_str::SmolStr;
 
 pub mod consumer;
@@ -60,28 +63,28 @@ impl From<FilterCriteria> for proto::FilterCriteria {
             FilterCriteria::ExcludeRegexEntityIds { matching } => {
                 proto::filter_criteria::Message::ExcludeMatchingEntityIds(
                     proto::ExcludeRegexEntityIds {
-                        matching: matching.into_iter().map(|v| v.to_string()).collect(),
+                        matching: matching.into_iter().map(|v| v.0.to_string()).collect(),
                     },
                 )
             }
             FilterCriteria::RemoveExcludeRegexEntityIds { matching } => {
                 proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds(
                     proto::RemoveExcludeRegexEntityIds {
-                        matching: matching.into_iter().map(|v| v.to_string()).collect(),
+                        matching: matching.into_iter().map(|v| v.0.to_string()).collect(),
                     },
                 )
             }
             FilterCriteria::IncludeRegexEntityIds { matching } => {
                 proto::filter_criteria::Message::IncludeMatchingEntityIds(
                     proto::IncludeRegexEntityIds {
-                        matching: matching.into_iter().map(|v| v.to_string()).collect(),
+                        matching: matching.into_iter().map(|v| v.0.to_string()).collect(),
                     },
                 )
             }
             FilterCriteria::RemoveIncludeRegexEntityIds { matching } => {
                 proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds(
                     proto::RemoveIncludeRegexEntityIds {
-                        matching: matching.into_iter().map(|v| v.to_string()).collect(),
+                        matching: matching.into_iter().map(|v| v.0.to_string()).collect(),
                     },
                 )
             }
@@ -133,3 +136,120 @@ impl From<FilterCriteria> for proto::FilterCriteria {
         }
     }
 }
+
+/// Declares that a protobuf criteria is unable to be converted
+/// due to there being no message.
+pub struct NoMessage;
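The `TryFrom` conversion that follows drops unparseable regexes and topic expressions rather than failing the whole criteria set: `flat_map` over a `Result` (or the `Option` produced by `.ok()`) keeps `Ok` values and discards errors, because both types implement `IntoIterator`. A tiny self-contained illustration of that pattern:

```rust
// Illustrative only: the same flat_map-over-Result trick used by the
// conversion below (and by `run` in producer.rs via `f.try_into()`).
fn main() {
    let inputs = vec!["1", "two", "3"];
    let parsed: Vec<i32> = inputs
        .into_iter()
        .flat_map(|s| s.parse::<i32>()) // Err values are silently dropped
        .collect();
    assert_eq!(parsed, vec![1, 3]);
}
```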
+/// Attempt to convert from a protobuf filter criteria to a model
+/// representation.
+impl TryFrom<proto::FilterCriteria> for FilterCriteria {
+    type Error = NoMessage;
+
+    fn try_from(value: proto::FilterCriteria) -> Result<Self, Self::Error> {
+        match value.message {
+            Some(message) => {
+                let criteria = match message {
+                    proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => {
+                        FilterCriteria::ExcludeTags {
+                            tags: tags.into_iter().map(Tag::from).collect(),
+                        }
+                    }
+                    proto::filter_criteria::Message::RemoveExcludeTags(
+                        proto::RemoveExcludeTags { tags },
+                    ) => FilterCriteria::RemoveExcludeTags {
+                        tags: tags.into_iter().map(Tag::from).collect(),
+                    },
+                    proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => {
+                        FilterCriteria::IncludeTags {
+                            tags: tags.into_iter().map(Tag::from).collect(),
+                        }
+                    }
+                    proto::filter_criteria::Message::RemoveIncludeTags(
+                        proto::RemoveIncludeTags { tags },
+                    ) => FilterCriteria::RemoveIncludeTags {
+                        tags: tags.into_iter().map(Tag::from).collect(),
+                    },
+                    proto::filter_criteria::Message::ExcludeMatchingEntityIds(
+                        proto::ExcludeRegexEntityIds { matching },
+                    ) => FilterCriteria::ExcludeRegexEntityIds {
+                        matching: matching
+                            .into_iter()
+                            .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex))
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds(
+                        proto::RemoveExcludeRegexEntityIds { matching },
+                    ) => FilterCriteria::RemoveExcludeRegexEntityIds {
+                        matching: matching
+                            .into_iter()
+                            .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex))
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::IncludeMatchingEntityIds(
+                        proto::IncludeRegexEntityIds { matching },
+                    ) => FilterCriteria::IncludeRegexEntityIds {
+                        matching: matching
+                            .into_iter()
+                            .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex))
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds(
+                        proto::RemoveIncludeRegexEntityIds { matching },
+                    ) => FilterCriteria::RemoveIncludeRegexEntityIds {
+                        matching: matching
+                            .into_iter()
+                            .flat_map(|m| Regex::new(&m).ok().map(ComparableRegex))
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::ExcludeEntityIds(
+                        proto::ExcludeEntityIds { entity_ids },
+                    ) => FilterCriteria::ExcludeEntityIds {
+                        entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
+                    },
+                    proto::filter_criteria::Message::RemoveExcludeEntityIds(
+                        proto::RemoveExcludeEntityIds { entity_ids },
+                    ) => FilterCriteria::RemoveExcludeEntityIds {
+                        entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
+                    },
+                    proto::filter_criteria::Message::IncludeEntityIds(
+                        proto::IncludeEntityIds { entity_id_offset },
+                    ) => FilterCriteria::IncludeEntityIds {
+                        entity_id_offsets: entity_id_offset
+                            .into_iter()
+                            .map(
+                                |proto::EntityIdOffset { entity_id, seq_nr }| EntityIdOffset {
+                                    entity_id: EntityId::from(entity_id),
+                                    seq_nr: seq_nr as u64,
+                                },
+                            )
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::RemoveIncludeEntityIds(
+                        proto::RemoveIncludeEntityIds { entity_ids },
+                    ) => FilterCriteria::RemoveIncludeEntityIds {
+                        entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
+                    },
+                    proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics {
+                        expression,
+                    }) => FilterCriteria::IncludeTopics {
+                        expressions: expression
+                            .into_iter()
+                            .flat_map(|e| TopicMatcher::new(e).ok())
+                            .collect(),
+                    },
+                    proto::filter_criteria::Message::RemoveIncludeTopics(
+                        proto::RemoveIncludeTopics { expression },
+                    ) => FilterCriteria::RemoveIncludeTopics {
+                        expressions: expression
+                            .into_iter()
+                            .flat_map(|e| TopicMatcher::new(e).ok())
+                            .collect(),
+                    },
+                };
+                Ok(criteria)
+            }
+            None => Err(NoMessage),
+        }
+    }
+}
diff --git a/akka-projection-rs-grpc/src/producer.rs b/akka-projection-rs-grpc/src/producer.rs
index fe84afc..fbabb78 100644
--- a/akka-projection-rs-grpc/src/producer.rs
+++ b/akka-projection-rs-grpc/src/producer.rs
@@ -2,13 +2,17 @@ use akka_persistence_rs::EntityId;
 use akka_persistence_rs::EntityType;
 use akka_persistence_rs::PersistenceId;
 use akka_persistence_rs::TimestampOffset;
-use akka_persistence_rs::WithEntityId;
+use akka_persistence_rs::WithPersistenceId;
 use akka_persistence_rs::WithSeqNr;
+use akka_persistence_rs::WithTags;
 use akka_persistence_rs::WithTimestampOffset;
+use akka_projection_rs::consumer_filter::Filter;
+use akka_projection_rs::consumer_filter::FilterCriteria;
 use akka_projection_rs::HandlerError;
 use akka_projection_rs::PendingHandler;
 use async_stream::stream;
 use async_trait::async_trait;
+use futures::future;
 use log::debug;
 use log::warn;
 use prost::Name;
@@ -20,6 +24,7 @@ use std::marker::PhantomData;
 use std::pin::Pin;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
+use tokio::sync::watch;
 use tokio_stream::StreamExt;
 use tonic::transport::Channel;
 use tonic::Request;
@@ -40,21 +45,22 @@ pub struct Transformation {
 
 /// Processes events transformed from some unknown event envelope (EI)
 /// to then pass on to a gRPC event producer.
-pub struct GrpcEventProcessor<E, EI, F>
-where
-    F: Fn(&EI) -> Option<E>,
-{
-    producer: GrpcEventProducer<E>,
-    transformer: F,
+pub struct GrpcEventProcessor<E, EI, PF, T> {
+    flow: GrpcEventFlow<E>,
+    producer_filter: PF,
+    consumer_filters_receiver: watch::Receiver<Vec<FilterCriteria>>,
+    filter: Filter,
+    transformer: T,
     phantom: PhantomData<EI>,
 }
 
 #[async_trait]
-impl<E, EI, F> PendingHandler for GrpcEventProcessor<E, EI, F>
+impl<E, EI, PF, T> PendingHandler for GrpcEventProcessor<E, EI, PF, T>
 where
-    EI: WithEntityId + WithSeqNr + WithTimestampOffset + Send,
+    EI: WithPersistenceId + WithSeqNr + WithTags + WithTimestampOffset + Send,
     E: Send,
-    F: Fn(&EI) -> Option<E> + Send,
+    PF: Fn(&EI) -> bool + Send,
+    T: Fn(&EI) -> Option<E> + Send,
 {
     type Envelope = EI;
 
@@ -64,14 +70,25 @@ where
         &mut self,
         envelope: Self::Envelope,
     ) -> Result<Pin<Box<dyn Future<Output = Result<(), HandlerError>> + Send>>, HandlerError> {
-        let event = (self.transformer)(&envelope);
+        if self.consumer_filters_receiver.has_changed().unwrap_or(true) {
+            self.filter
+                .update(self.consumer_filters_receiver.borrow().clone());
+        };
+
+        let event = if (self.producer_filter)(&envelope) && self.filter.matches(&envelope) {
+            (self.transformer)(&envelope)
+        } else {
+            // Gaps are ok given a filter situation.
+            return Ok(Box::pin(future::ready(Ok(()))));
+        };
+
         let transformation = Transformation {
-            entity_id: envelope.entity_id(),
+            entity_id: envelope.persistence_id().entity_id.clone(),
             seq_nr: envelope.seq_nr(),
             offset: envelope.timestamp_offset(),
             event,
         };
-        let result = self.producer.process(transformation).await;
+        let result = self.flow.process(transformation).await;
         if let Ok(receiver_reply) = result {
             Ok(Box::pin(async {
                 receiver_reply.await.map_err(|_| HandlerError)
@@ -82,13 +99,13 @@ where
     }
 }
 
-/// Produce gRPC events given a user-supplied transformation function.
-pub struct GrpcEventProducer<E> {
+/// Transform and forward gRPC events given a user-supplied transformation function.
+pub struct GrpcEventFlow<E> {
     entity_type: EntityType,
     grpc_producer: mpsc::Sender<(EventEnvelope<E>, oneshot::Sender<()>)>,
 }
 
-impl<E> GrpcEventProducer<E> {
+impl<E> GrpcEventFlow<E> {
     pub fn new(
         entity_type: EntityType,
         grpc_producer: mpsc::Sender<(EventEnvelope<E>, oneshot::Sender<()>)>,
@@ -99,12 +116,25 @@ impl<E> GrpcEventProducer<E> {
         }
     }
 
-    pub fn handler<EI, F>(self, transformer: F) -> GrpcEventProcessor<E, EI, F>
+    /// Produces a handler for this flow. The handler will receive events,
+    /// apply filters and, if the filters match, transform and forward events
+    /// on to a gRPC producer.
+    pub fn handler<EI, PF, T>(
+        self,
+        producer_filter: PF,
+        consumer_filters_receiver: watch::Receiver<Vec<FilterCriteria>>,
+        filter: Filter,
+        transformer: T,
+    ) -> GrpcEventProcessor<E, EI, PF, T>
     where
-        F: Fn(&EI) -> Option<E>,
+        PF: Fn(&EI) -> bool,
+        T: Fn(&EI) -> Option<E>,
     {
         GrpcEventProcessor {
-            producer: self,
+            flow: self,
+            producer_filter,
+            consumer_filters_receiver,
+            filter,
             transformer,
             phantom: PhantomData,
         }
@@ -142,6 +172,7 @@ pub async fn run(
     event_consumer_channel: EC,
     origin_id: StreamId,
     stream_id: StreamId,
+    consumer_filters: watch::Sender<Vec<FilterCriteria>>,
     mut envelopes: mpsc::Receiver<(EventEnvelope<E>, oneshot::Sender<()>)>,
     mut kill_switch: oneshot::Receiver<()>,
 ) where
@@ -241,8 +272,14 @@ pub async fn run(
                 Some(Ok(proto::ConsumeEventOut {
                     message: Some(message),
                 })) = stream_outs.next() => match message {
-                    proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. }) => {
+                    proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => {
                         debug!("Starting the protocol");
+                        let _ = consumer_filters.send(
+                            filter
+                                .into_iter()
+                                .flat_map(|f| f.try_into())
+                                .collect(),
+                        );
                         break;
                     }
                     _ => {
@@ -352,7 +389,9 @@ mod tests {
             yield Ok(proto::ConsumeEventOut {
                 message: Some(proto::consume_event_out::Message::Start(
                     proto::ConsumerEventStart {
-                        filter: vec![]
+                        filter: vec![proto::FilterCriteria {
+                            message: Some(proto::filter_criteria::Message::ExcludeEntityIds(proto::ExcludeEntityIds { entity_ids: vec![] })),
+                        }]
                     },
                 )),
             });
@@ -408,6 +447,7 @@ mod tests {
                 .unwrap();
         });
 
+        let (consumer_filters, mut consumer_filters_receiver) = watch::channel(vec![]);
         let (sender, receiver) = mpsc::channel(10);
         let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel();
         tokio::spawn(async move {
@@ -416,6 +456,7 @@ mod tests {
                 || channel.connect(),
                 OriginId::from("some-origin-id"),
                 StreamId::from("some-stream-id"),
+                consumer_filters,
                 receiver,
                 task_kill_switch_receiver,
             )
@@ -442,6 +483,9 @@ mod tests {
             .await
             .is_ok());
 
+        assert!(consumer_filters_receiver.changed().await.is_ok());
+        assert!(!consumer_filters_receiver.borrow().is_empty());
+
         assert!(reply_receiver.await.is_ok());
 
         server_kill_switch.notified();
diff --git a/akka-projection-rs-storage/benches/benches.rs b/akka-projection-rs-storage/benches/benches.rs
index e9791f0..d4aa102 100644
--- a/akka-projection-rs-storage/benches/benches.rs
+++ b/akka-projection-rs-storage/benches/benches.rs
@@ -40,8 +40,10 @@ impl SourceProvider for TestSourceProvider {
         FR: Future<Output = Option<Offset>> + Send,
     {
         let _ = offset().await;
-        Box::pin(stream!(for offset in 0..NUM_EVENTS as u64 {
-            yield TestEnvelope { offset };
+        Box::pin(stream!(loop {
+            for offset in 0..NUM_EVENTS as u64 {
+                yield TestEnvelope { offset };
+            }
         }))
     }
 }
@@ -58,6 +60,7 @@ impl Handler for TestHandler {
         const LAST_OFFSET: u64 = NUM_EVENTS as u64 - 1;
         if envelope.offset == LAST_OFFSET {
             self.events_processed.notify_one();
+            return Err(HandlerError);
         }
         Ok(())
     }
diff --git a/akka-projection-rs-storage/src/lib.rs b/akka-projection-rs-storage/src/lib.rs
index 7e3791e..0398933 100644
--- a/akka-projection-rs-storage/src/lib.rs
+++ b/akka-projection-rs-storage/src/lib.rs
@@ -159,7 +159,7 @@ mod tests {
     use std::{collections::HashMap, env, fs, future::Future, pin::Pin};
 
     use super::*;
-    use akka_persistence_rs::EntityId;
+    use akka_persistence_rs::{EntityId, EntityType, PersistenceId};
     use akka_persistence_rs_commitlog::EventEnvelope;
     use akka_projection_rs::HandlerError;
     use async_stream::stream;
@@ -248,7 +248,7 @@ mod tests {
     const MIN_SAVE_OFFSET_INTERVAL: Duration = Duration::from_millis(100);
 
     struct MySourceProvider {
-        entity_id: EntityId,
+        persistence_id: PersistenceId,
         event_value: String,
     }
 
@@ -264,16 +264,28 @@ mod tests {
             F: Fn() -> FR + Send + Sync,
             FR: Future<Output = Option<Offset>> + Send,
         {
-            Box::pin(stream! {
-                if offset().await.is_none() {
-                    yield EventEnvelope::new(self.entity_id.clone(), 1, Utc::now(), MyEvent { value: self.event_value.clone() }, 0);
+            Box::pin(
+                stream! {
+                    if offset().await.is_none() {
+                        yield EventEnvelope {
+                            persistence_id: self.persistence_id.clone(),
+                            seq_nr: 1,
+                            timestamp: Utc::now(),
+                            event: MyEvent {
+                                value: self.event_value.clone(),
+                            },
+                            offset: 0,
+                            tags: vec![]
+                        }
+                    }
                 }
-            }.chain(stream::pending()))
+                .chain(stream::pending()),
+            )
         }
     }
 
     struct MyHandler {
-        entity_id: EntityId,
+        persistence_id: PersistenceId,
         event_value: String,
     }
 
@@ -283,7 +295,7 @@ mod tests {
 
         /// Process an envelope.
         async fn process(&mut self, envelope: Self::Envelope) -> Result<(), HandlerError> {
-            assert_eq!(envelope.entity_id, self.entity_id);
+            assert_eq!(envelope.persistence_id, self.persistence_id);
             assert_eq!(
                 envelope.event,
                 MyEvent {
@@ -295,7 +307,7 @@ mod tests {
     }
 
     struct MyHandlerPending {
-        entity_id: EntityId,
+        persistence_id: PersistenceId,
         event_value: String,
     }
 
@@ -311,7 +323,7 @@ mod tests {
             envelope: Self::Envelope,
         ) -> Result<Pin<Box<dyn Future<Output = Result<(), HandlerError>> + Send>>, HandlerError>
         {
-            assert_eq!(envelope.entity_id, self.entity_id);
+            assert_eq!(envelope.persistence_id, self.persistence_id);
             assert_eq!(
                 envelope.event,
                 MyEvent {
@@ -332,7 +344,9 @@ mod tests {
 
         // Scaffolding
 
+        let entity_type = EntityType::from("some-entity-type");
         let entity_id = EntityId::from("some-entity");
+        let persistence_id = PersistenceId::new(entity_type, entity_id);
         let event_value = "some value".to_string();
 
         // Process an event.
@@ -348,11 +362,11 @@ mod tests {
             &task_storage_path,
             registration_projection_command_receiver,
             MySourceProvider {
-                entity_id: entity_id.clone(),
+                persistence_id: persistence_id.clone(),
                 event_value: event_value.clone(),
             },
             MyHandler {
-                entity_id: entity_id.clone(),
+                persistence_id: persistence_id.clone(),
                 event_value: event_value.clone(),
             },
             MIN_SAVE_OFFSET_INTERVAL,
@@ -383,7 +397,9 @@ mod tests {
 
         // Scaffolding
 
+        let entity_type = EntityType::from("some-entity-type");
         let entity_id = EntityId::from("some-entity");
+        let persistence_id = PersistenceId::new(entity_type, entity_id);
         let event_value = "some value".to_string();
 
         // Process an event.
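In producer.rs above, the gRPC task learns the consumer's initial filter from `ConsumerEventStart` and publishes it on a `tokio::sync::watch` channel; `GrpcEventProcessor::process` then polls `has_changed()`/`borrow()` before each envelope. A minimal standalone sketch of that handshake, with a `String` standing in for `Vec<FilterCriteria>`:

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    // The gRPC task holds the sender; the event processor holds the receiver.
    let (tx, mut rx) = watch::channel(String::new());

    // gRPC task side: publish whatever ConsumerEventStart carried.
    tx.send("criteria-from-consumer".to_string()).unwrap();

    // Processor side: a cheap check before handling each envelope.
    if rx.has_changed().unwrap_or(true) {
        let latest = rx.borrow_and_update().clone();
        println!("refreshing filter with {latest}");
    }
}
```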
@@ -398,11 +414,11 @@ mod tests {
             &task_storage_path,
             registration_projection_command_receiver,
             MySourceProvider {
-                entity_id: entity_id.clone(),
+                persistence_id: persistence_id.clone(),
                 event_value: event_value.clone(),
             },
             MyHandlerPending {
-                entity_id: entity_id.clone(),
+                persistence_id: persistence_id.clone(),
                 event_value: event_value.clone(),
             },
             MIN_SAVE_OFFSET_INTERVAL,
diff --git a/akka-projection-rs/Cargo.toml b/akka-projection-rs/Cargo.toml
index ec05cca..0076276 100644
--- a/akka-projection-rs/Cargo.toml
+++ b/akka-projection-rs/Cargo.toml
@@ -6,6 +6,8 @@ edition = "2021"
 [dependencies]
 async-trait = { workspace = true }
 chrono = { workspace = true }
+mqtt-protocol = { workspace = true }
+regex = { workspace = true }
 serde = { workspace = true }
 smol_str = { workspace = true }
 tokio = { workspace = true }
diff --git a/akka-projection-rs/src/consumer_filter.rs b/akka-projection-rs/src/consumer_filter.rs
index c6214bc..53bfdf1 100644
--- a/akka-projection-rs/src/consumer_filter.rs
+++ b/akka-projection-rs/src/consumer_filter.rs
@@ -25,8 +25,11 @@
 //! * IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions
 //! * IncludeEntityIds - include events for entities with the given entity ids
 
-use akka_persistence_rs::EntityId;
-use smol_str::SmolStr;
+use std::fmt::Display;
+
+use akka_persistence_rs::{EntityId, Tag, WithPersistenceId, WithTags};
+use mqtt::{TopicFilter, TopicNameRef};
+use regex::Regex;
 
 #[derive(Clone)]
 pub struct EntityIdOffset {
@@ -36,13 +39,48 @@ pub struct EntityIdOffset {
     pub seq_nr: u64,
 }
 
-/// A regex expression for matching entity ids.
-pub type EntityIdMatcher = SmolStr;
+#[derive(Clone)]
+pub struct ComparableRegex(pub Regex);
+
+impl PartialEq for ComparableRegex {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.as_str() == other.0.as_str()
+    }
+}
+
+impl Eq for ComparableRegex {}
+
+impl PartialOrd for ComparableRegex {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for ComparableRegex {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.0.as_str().cmp(other.0.as_str())
+    }
+}
 
-pub type Tag = SmolStr;
+#[derive(Debug, Clone, Ord, Eq, PartialEq, PartialOrd)]
+pub struct TopicMatcher(TopicFilter);
 
-/// A topic match expression according to MQTT specification, including wildcards
-pub type TopicMatcher = SmolStr;
+#[derive(Debug)]
+pub struct BadTopicMatcher;
+
+impl TopicMatcher {
+    pub fn new<S: Into<String>>(matcher: S) -> Result<Self, BadTopicMatcher> {
+        Ok(Self(
+            TopicFilter::new(matcher).map_err(|_| BadTopicMatcher)?,
+        ))
+    }
+}
+
+impl Display for TopicMatcher {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
 
 /// Exclude criteria are evaluated first.
 /// If no matching exclude criteria the event is emitted.
@@ -63,14 +101,14 @@ pub enum FilterCriteria {
     RemoveIncludeTags { tags: Vec<Tag> },
     /// Exclude events for entities with entity ids matching the given regular expressions,
     /// unless there is a matching include filter that overrides the exclude.
-    ExcludeRegexEntityIds { matching: Vec<EntityIdMatcher> },
+    ExcludeRegexEntityIds { matching: Vec<ComparableRegex> },
     /// Remove a previously added `ExcludeRegexEntityIds`.
-    RemoveExcludeRegexEntityIds { matching: Vec<EntityIdMatcher> },
+    RemoveExcludeRegexEntityIds { matching: Vec<ComparableRegex> },
     /// Include events for entities with entity ids matching the given regular expressions.
     /// A matching include overrides a matching exclude.
-    IncludeRegexEntityIds { matching: Vec<EntityIdMatcher> },
+    IncludeRegexEntityIds { matching: Vec<ComparableRegex> },
     /// Remove a previously added `IncludeRegexEntityIds`.
-    RemoveIncludeRegexEntityIds { matching: Vec<EntityIdMatcher> },
+    RemoveIncludeRegexEntityIds { matching: Vec<ComparableRegex> },
     /// Exclude events for entities with the given entity ids,
     /// unless there is a matching include filter that overrides the exclude.
     ExcludeEntityIds { entity_ids: Vec<EntityId> },
@@ -97,6 +135,396 @@ pub enum FilterCriteria {
 /// to include only events matching the topic filter.
 pub fn exclude_all() -> FilterCriteria {
     FilterCriteria::ExcludeRegexEntityIds {
-        matching: vec![EntityIdMatcher::from(".*")],
+        matching: vec![ComparableRegex(Regex::new(".*").unwrap())],
     }
 }
+
+/// A collection of criteria.
+pub struct Filter {
+    topic_tag_prefix: Tag,
+    max_tags: usize,
+    exclude_tags: Vec<Tag>,
+    include_tags: Vec<Tag>,
+    max_regex_entity_ids: usize,
+    exclude_regex_entity_ids: Vec<ComparableRegex>,
+    include_regex_entity_ids: Vec<ComparableRegex>,
+    max_entity_ids: usize,
+    exclude_entity_ids: Vec<EntityId>,
+    include_entity_ids: Vec<EntityId>,
+    max_topics: usize,
+    include_topics: Vec<TopicMatcher>,
+}
+
+impl Default for Filter {
+    fn default() -> Self {
+        Self {
+            topic_tag_prefix: Tag::from("t:"),
+            max_tags: 10,
+            exclude_tags: vec![],
+            include_tags: vec![],
+            max_regex_entity_ids: 10,
+            exclude_regex_entity_ids: vec![],
+            include_regex_entity_ids: vec![],
+            max_entity_ids: 10,
+            exclude_entity_ids: vec![],
+            include_entity_ids: vec![],
+            max_topics: 10,
+            include_topics: vec![],
+        }
+    }
+}
+
+impl Filter {
+    pub fn new(
+        topic_tag_prefix: Tag,
+        max_tags: usize,
+        max_regex_entity_ids: usize,
+        max_entity_ids: usize,
+        max_topics: usize,
+    ) -> Self {
+        Self {
+            topic_tag_prefix,
+            max_tags,
+            exclude_tags: vec![],
+            include_tags: vec![],
+            max_regex_entity_ids,
+            exclude_regex_entity_ids: vec![],
+            include_regex_entity_ids: vec![],
+            max_entity_ids,
+            exclude_entity_ids: vec![],
+            include_entity_ids: vec![],
+            max_topics,
+            include_topics: vec![],
+        }
+    }
+
+    /// Returns whether the given envelope matches the filter's current criteria.
+    pub fn matches<E>(&self, envelope: &E) -> bool
+    where
+        E: WithPersistenceId + WithTags,
+    {
+        let tags = envelope.tags();
+        let persistence_id = envelope.persistence_id();
+        let entity_id = &persistence_id.entity_id;
+
+        if self.matches_exclude_tags(tags)
+            || self.matches_exclude_entity_ids(entity_id)
+            || self.matches_exclude_regex_entity_ids(entity_id)
+        {
+            self.matches_include_tags(tags)
+                || self.matches_include_topics(tags)
+                || self.matches_include_entity_ids(entity_id)
+                || self.matches_include_regex_entity_ids(entity_id)
+        } else {
+            true
+        }
+    }
+
+    fn matches_exclude_regex_entity_ids(&self, entity_id: &EntityId) -> bool {
+        Self::matches_regex_entity_ids(&self.exclude_regex_entity_ids, entity_id)
+    }
+
+    fn matches_include_regex_entity_ids(&self, entity_id: &EntityId) -> bool {
+        Self::matches_regex_entity_ids(&self.include_regex_entity_ids, entity_id)
+    }
+
+    fn matches_exclude_entity_ids(&self, entity_id: &EntityId) -> bool {
+        Self::matches_entity_ids(&self.exclude_entity_ids, entity_id)
+    }
+
+    fn matches_include_entity_ids(&self, entity_id: &EntityId) -> bool {
+        Self::matches_entity_ids(&self.include_entity_ids, entity_id)
+    }
+
+    fn matches_exclude_tags(&self, tags: &[Tag]) -> bool {
+        Self::matches_tags(&self.exclude_tags, tags)
+    }
+
+    fn matches_include_tags(&self, tags: &[Tag]) -> bool {
+        Self::matches_tags(&self.include_tags, tags)
+    }
+
+    fn matches_include_topics(&self, tags: &[Tag]) -> bool {
+        Self::matches_topics(&self.include_topics, &self.topic_tag_prefix, tags)
+    }
+
+    fn matches_regex_entity_ids(matching: &[ComparableRegex], entity_id: &EntityId) -> bool {
+        matching.iter().any(|r| r.0.is_match(entity_id))
+    }
+
+    fn matches_entity_ids(entity_ids: &[EntityId], entity_id: &EntityId) -> bool {
+        entity_ids.iter().any(|pi| pi == entity_id)
+    }
+
+    fn matches_tags(match_tags: &[Tag], tags: &[Tag]) -> bool {
+        match_tags.iter().any(|mt| tags.iter().any(|t| t == mt))
+    }
+
+    fn matches_topics(expressions: &[TopicMatcher], topic_tag_prefix: &Tag, tags: &[Tag]) -> bool {
+        let topic_tag_prefix_len = topic_tag_prefix.len();
+        expressions.iter().any(|r| {
+            let matcher = r.0.get_matcher();
+            tags.iter()
+                .filter(|t| t.starts_with(topic_tag_prefix.as_str()))
+                .any(|t| {
+                    let topic_name = TopicNameRef::new(&t[topic_tag_prefix_len..]);
+                    if let Ok(topic_name) = topic_name {
+                        matcher.is_match(topic_name)
+                    } else {
+                        false
+                    }
+                })
+        })
+    }
+
+    /// Updates the filter given commands to add or remove new criteria.
+    pub fn update(&mut self, criteria: Vec<FilterCriteria>) {
+        for criterion in criteria {
+            match criterion {
+                FilterCriteria::ExcludeTags { mut tags } => {
+                    merge(&mut self.exclude_tags, &mut tags, self.max_tags)
+                }
+
+                FilterCriteria::RemoveExcludeTags { tags } => remove(&mut self.exclude_tags, &tags),
+
+                FilterCriteria::IncludeTags { mut tags } => {
+                    merge(&mut self.include_tags, &mut tags, self.max_tags)
+                }
+
+                FilterCriteria::RemoveIncludeTags { tags } => remove(&mut self.include_tags, &tags),
+
+                FilterCriteria::ExcludeRegexEntityIds { mut matching } => merge(
+                    &mut self.exclude_regex_entity_ids,
+                    &mut matching,
+                    self.max_regex_entity_ids,
+                ),
+
+                FilterCriteria::RemoveExcludeRegexEntityIds { matching } => {
+                    remove(&mut self.exclude_regex_entity_ids, &matching)
+                }
+
+                FilterCriteria::IncludeRegexEntityIds { mut matching } => merge(
+                    &mut self.include_regex_entity_ids,
+                    &mut matching,
+                    self.max_regex_entity_ids,
+                ),
+
+                FilterCriteria::RemoveIncludeRegexEntityIds { matching } => {
+                    remove(&mut self.include_regex_entity_ids, &matching)
+                }
+
+                FilterCriteria::ExcludeEntityIds { mut entity_ids } => merge(
+                    &mut self.exclude_entity_ids,
+                    &mut entity_ids,
+                    self.max_entity_ids,
+                ),
+
+                FilterCriteria::RemoveExcludeEntityIds { entity_ids } => {
+                    remove(&mut self.exclude_entity_ids, &entity_ids)
+                }
+
+                FilterCriteria::IncludeEntityIds { entity_id_offsets } => merge(
+                    &mut self.include_entity_ids,
+                    &mut entity_id_offsets
+                        .into_iter()
+                        .map(|EntityIdOffset { entity_id, .. }| entity_id)
+                        .collect(),
+                    self.max_entity_ids,
+                ),
+
+                FilterCriteria::RemoveIncludeEntityIds { entity_ids } => {
+                    remove(&mut self.include_entity_ids, &entity_ids)
+                }
+
+                FilterCriteria::IncludeTopics { mut expressions } => {
+                    merge(&mut self.include_topics, &mut expressions, self.max_topics)
+                }
+
+                FilterCriteria::RemoveIncludeTopics { expressions } => {
+                    remove(&mut self.include_topics, &expressions)
+                }
+            };
+        }
+    }
+}
+
+fn merge<T>(l: &mut Vec<T>, r: &mut Vec<T>, max_len: usize)
+where
+    T: Ord,
+{
+    if l.len() < max_len && r.len() < max_len {
+        l.append(r);
+        l.sort();
+        l.dedup();
+    }
+}
+
+fn remove<T>(l: &mut Vec<T>, r: &[T])
+where
+    T: PartialEq,
+{
+    l.retain(|existing| !r.contains(existing));
+}
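Note that the cap check in `merge` happens before the append, so once either side reaches `max_len` further criteria are silently ignored, and duplicates collapse via `sort`/`dedup`. A self-contained sketch of those semantics using plain strings:

```rust
// Illustrative copy of the merge semantics above, usable standalone.
fn merge<T: Ord>(l: &mut Vec<T>, r: &mut Vec<T>, max_len: usize) {
    if l.len() < max_len && r.len() < max_len {
        l.append(r);
        l.sort();
        l.dedup();
    }
}

fn main() {
    let mut current = vec!["a".to_string(), "b".to_string()];
    let mut incoming = vec!["b".to_string(), "c".to_string()];
    merge(&mut current, &mut incoming, 10);
    assert_eq!(current, vec!["a", "b", "c"]); // the duplicate "b" collapsed
}
```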
"a|1".parse::().unwrap(); + let entity_id = persistence_id.entity_id.clone(); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![], + }; + + let mut filter = Filter::default(); + + let criteria = vec![ + FilterCriteria::ExcludeEntityIds { + entity_ids: vec![entity_id.clone()], + }, + FilterCriteria::IncludeEntityIds { + entity_id_offsets: vec![EntityIdOffset { + entity_id: entity_id.clone(), + seq_nr: 0, + }], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeEntityIds { + entity_ids: vec![entity_id.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveExcludeEntityIds { + entity_ids: vec![entity_id.clone()], + }]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + } + + #[test] + fn exclude_include_and_remove_include_regex_entity_id_and_remove_exclude_regex_entity_id() { + let persistence_id = "a|1".parse::().unwrap(); + let matching = ComparableRegex(Regex::new("1").unwrap()); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![], + }; + + let mut filter = Filter::default(); + + let criteria = vec![ + FilterCriteria::ExcludeRegexEntityIds { + matching: vec![matching.clone()], + }, + FilterCriteria::IncludeRegexEntityIds { + matching: vec![matching.clone()], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeRegexEntityIds { + matching: vec![matching.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveExcludeRegexEntityIds { + matching: vec![matching.clone()], + }]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + } + + #[test] + fn include_and_remove_include_topic() { + let persistence_id = "a|1".parse::().unwrap(); + let tag = Tag::from("t:sport/abc/player1"); + let expression = TopicMatcher::new("sport/+/player1").unwrap(); + + let envelope = TestEnvelope { + persistence_id: persistence_id.clone(), + tags: vec![tag.clone()], + }; + + let mut filter = Filter::default(); + + let criteria = vec![ + exclude_all(), + FilterCriteria::IncludeTopics { + expressions: vec![expression.clone()], + }, + ]; + filter.update(criteria); + assert!(filter.matches(&envelope)); + + let criteria = vec![FilterCriteria::RemoveIncludeTopics { + expressions: vec![expression.clone()], + }]; + filter.update(criteria); + assert!(!filter.matches(&envelope)); } } diff --git a/examples/iot-service/backend/src/temperature.rs b/examples/iot-service/backend/src/temperature.rs index 974d20c..e9193b4 100644 --- a/examples/iot-service/backend/src/temperature.rs +++ b/examples/iot-service/backend/src/temperature.rs @@ -5,6 +5,7 @@ use std::{io, num::NonZeroUsize, sync::Arc}; use akka_persistence_rs::{ effect::{self, emit_event, reply, then, unhandled, Effect, EffectExt}, entity::{Context, EventSourcedBehavior}, + EntityType, }; use akka_persistence_rs::{ entity_manager::{self}, @@ -117,6 +118,7 @@ impl EventSourcedBehavior for Behavior { // keys in any way required. 
pub struct EventEnvelopeMarshaler { + pub entity_type: EntityType, pub events_key_secret_path: Arc, pub secret_store: FileSecretStore, } @@ -134,6 +136,10 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; #[async_trait] impl CommitLogMarshaler for EventEnvelopeMarshaler { + fn entity_type(&self) -> EntityType { + self.entity_type.clone() + } + fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Option { let record_type = match event { Event::TemperatureRead { .. } => Some(0), @@ -227,6 +233,7 @@ pub async fn task( let file_log_topic_adapter = CommitLogTopicAdapter::new( commit_log, EventEnvelopeMarshaler { + entity_type: EntityType::from(ENTITY_TYPE), events_key_secret_path: Arc::from(events_key_secret_path), secret_store, }, diff --git a/examples/iot-service/backend/src/temperature_production.rs b/examples/iot-service/backend/src/temperature_production.rs index 01d77de..19bb63d 100644 --- a/examples/iot-service/backend/src/temperature_production.rs +++ b/examples/iot-service/backend/src/temperature_production.rs @@ -4,15 +4,16 @@ use crate::proto; use crate::temperature::{self, EventEnvelopeMarshaler}; use akka_persistence_rs::EntityType; use akka_persistence_rs_commitlog::EventEnvelope as CommitLogEventEnvelope; +use akka_projection_rs::consumer_filter::Filter; use akka_projection_rs_commitlog::CommitLogSourceProvider; -use akka_projection_rs_grpc::producer::GrpcEventProducer; +use akka_projection_rs_grpc::producer::GrpcEventFlow; use akka_projection_rs_grpc::{OriginId, StreamId}; use std::sync::Arc; use std::{path::PathBuf, time::Duration}; use streambed::commit_log::Topic; use streambed_confidant::FileSecretStore; use streambed_logged::FileLog; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tonic::transport::{Channel, Uri}; // Apply sensor observations to a remote consumer. @@ -25,21 +26,25 @@ pub async fn task( kill_switch: oneshot::Receiver<()>, state_storage_path: PathBuf, ) { + let entity_type = EntityType::from(temperature::ENTITY_TYPE); + // Establish a sink of envelopes that will be forwarded // on to a consumer via gRPC event producer. + let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]); let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10); - let grpc_producer = - GrpcEventProducer::new(EntityType::from(temperature::ENTITY_TYPE), grpc_producer); - let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel(); + tokio::spawn(async { - let channel = Channel::builder(event_consumer_addr); + let consumer_endpoint = Channel::builder(event_consumer_addr); + let consumer_connector = || consumer_endpoint.connect(); + akka_projection_rs_grpc::producer::run( - || channel.connect(), + consumer_connector, OriginId::from("edge-iot-service"), StreamId::from("temperature-events"), + consumer_filters, grpc_producer_receiver, task_kill_switch_receiver, ) @@ -51,12 +56,12 @@ pub async fn task( let source_provider = CommitLogSourceProvider::new( commit_log, EventEnvelopeMarshaler { + entity_type: entity_type.clone(), events_key_secret_path: Arc::from(events_key_secret_path), secret_store: secret_store.clone(), }, "iot-service-projection", Topic::from(temperature::EVENTS_TOPIC), - EntityType::from(temperature::ENTITY_TYPE), ); // Optionally transform events from the commit log to an event the @@ -78,18 +83,28 @@ pub async fn task( // to remember the offset consumed from the commit log. This then // permits us to restart from a specific point in the source given // restarts. 
- // A handler is formed from the gRPC producer. This handler will + // A handler is formed from the gRPC flow. This handler will // call upon the transformer function to, in turn, produce the // gRPC events to a remote consumer. The handler is a "flowing" one // where an upper limit of the number of envelopes in-flight is set. + let grpc_flow = GrpcEventFlow::new(entity_type, grpc_producer); + let producer_filter = |_: &CommitLogEventEnvelope| true; + let consumer_filter = Filter::default(); + let event_handler = grpc_flow.handler( + producer_filter, + consumer_filters_receiver, + consumer_filter, + transformer, + ); + akka_projection_rs_storage::run( &secret_store, &offsets_key_secret_path, &state_storage_path, kill_switch, source_provider, - grpc_producer.handler(transformer), + event_handler, Duration::from_millis(100), ) .await
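Putting the consumer-filter pieces together: the edge producer now composes a local producer filter with consumer-declared criteria. A short sketch of the new `Filter` API, mirroring the `include_and_remove_include_topic` test added in this patch (crate paths assumed from this patch's layout):

```rust
use akka_projection_rs::consumer_filter::{exclude_all, Filter, FilterCriteria, TopicMatcher};

fn main() {
    // Exclude everything, then include one MQTT-style topic expression; an
    // envelope tagged "t:sport/abc/player1" now passes the filter.
    let mut filter = Filter::default();
    filter.update(vec![
        exclude_all(),
        FilterCriteria::IncludeTopics {
            expressions: vec![TopicMatcher::new("sport/+/player1").unwrap()],
        },
    ]);
}
```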