Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer supplied filters when producing, along with a producer filter #36

Merged
merged 19 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 109 additions & 2 deletions akka-projection-rs-grpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#![doc = include_str!("../README.md")]

use akka_persistence_rs::{Offset, PersistenceId, TimestampOffset, WithOffset};
use akka_projection_rs::consumer_filter::{EntityIdOffset, FilterCriteria};
use akka_persistence_rs::{
EntityId, Offset, PersistenceId, TimestampOffset, WithEntityId, WithOffset,
};
use akka_projection_rs::consumer_filter::{
EntityIdMatcher, EntityIdOffset, FilterCriteria, Tag, TopicMatcher,
};
use smol_str::SmolStr;

pub mod consumer;
Expand All @@ -17,6 +21,12 @@ pub struct EventEnvelope<E> {
pub offset: TimestampOffset,
}

impl<E> WithEntityId for EventEnvelope<E> {
fn entity_id(&self) -> EntityId {
self.persistence_id.entity_id.clone()
}
}

impl<E> WithOffset for EventEnvelope<E> {
fn offset(&self) -> Offset {
Offset::Timestamp(self.offset.clone())
Expand Down Expand Up @@ -133,3 +143,100 @@ impl From<FilterCriteria> for proto::FilterCriteria {
}
}
}

/// Declares that a protobuf criteria is unable to be converted
/// due to there being no message.
pub struct NoMessage;

impl TryFrom<proto::FilterCriteria> for FilterCriteria {
type Error = NoMessage;

fn try_from(value: proto::FilterCriteria) -> Result<Self, Self::Error> {
match value.message {
Some(message) => {
let criteria = match message {
proto::filter_criteria::Message::ExcludeTags(proto::ExcludeTags { tags }) => {
FilterCriteria::ExcludeTags {
tags: tags.into_iter().map(Tag::from).collect(),
}
}
proto::filter_criteria::Message::RemoveExcludeTags(
proto::RemoveExcludeTags { tags },
) => FilterCriteria::RemoveExcludeTags {
tags: tags.into_iter().map(Tag::from).collect(),
},
proto::filter_criteria::Message::IncludeTags(proto::IncludeTags { tags }) => {
FilterCriteria::IncludeTags {
tags: tags.into_iter().map(Tag::from).collect(),
}
}
proto::filter_criteria::Message::RemoveIncludeTags(
proto::RemoveIncludeTags { tags },
) => FilterCriteria::RemoveIncludeTags {
tags: tags.into_iter().map(Tag::from).collect(),
},
proto::filter_criteria::Message::ExcludeMatchingEntityIds(
proto::ExcludeRegexEntityIds { matching },
) => FilterCriteria::ExcludeRegexEntityIds {
matching: matching.into_iter().map(EntityIdMatcher::from).collect(),
},
proto::filter_criteria::Message::RemoveExcludeMatchingEntityIds(
proto::RemoveExcludeRegexEntityIds { matching },
) => FilterCriteria::RemoveExcludeRegexEntityIds {
matching: matching.into_iter().map(EntityIdMatcher::from).collect(),
},
proto::filter_criteria::Message::IncludeMatchingEntityIds(
proto::IncludeRegexEntityIds { matching },
) => FilterCriteria::IncludeRegexEntityIds {
matching: matching.into_iter().map(EntityIdMatcher::from).collect(),
},
proto::filter_criteria::Message::RemoveIncludeMatchingEntityIds(
proto::RemoveIncludeRegexEntityIds { matching },
) => FilterCriteria::RemoveIncludeRegexEntityIds {
matching: matching.into_iter().map(EntityIdMatcher::from).collect(),
},
proto::filter_criteria::Message::ExcludeEntityIds(
proto::ExcludeEntityIds { entity_ids },
) => FilterCriteria::ExcludeEntityIds {
entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
},
proto::filter_criteria::Message::RemoveExcludeEntityIds(
proto::RemoveExcludeEntityIds { entity_ids },
) => FilterCriteria::RemoveExcludeEntityIds {
entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
},
proto::filter_criteria::Message::IncludeEntityIds(
proto::IncludeEntityIds { entity_id_offset },
) => FilterCriteria::IncludeEntityIds {
entity_id_offsets: entity_id_offset
.into_iter()
.map(
|proto::EntityIdOffset { entity_id, seq_nr }| EntityIdOffset {
entity_id: EntityId::from(entity_id),
seq_nr: seq_nr as u64,
},
)
.collect(),
},
proto::filter_criteria::Message::RemoveIncludeEntityIds(
proto::RemoveIncludeEntityIds { entity_ids },
) => FilterCriteria::RemoveIncludeEntityIds {
entity_ids: entity_ids.into_iter().map(EntityId::from).collect(),
},
proto::filter_criteria::Message::IncludeTopics(proto::IncludeTopics {
expression,
}) => FilterCriteria::IncludeTopics {
expressions: expression.into_iter().map(TopicMatcher::from).collect(),
},
proto::filter_criteria::Message::RemoveIncludeTopics(
proto::RemoveIncludeTopics { expression },
) => FilterCriteria::RemoveIncludeTopics {
expressions: expression.into_iter().map(TopicMatcher::from).collect(),
},
};
Ok(criteria)
}
None => Err(NoMessage),
}
}
}
27 changes: 24 additions & 3 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use akka_persistence_rs::TimestampOffset;
use akka_persistence_rs::WithEntityId;
use akka_persistence_rs::WithSeqNr;
use akka_persistence_rs::WithTimestampOffset;
use akka_projection_rs::consumer_filter;
use akka_projection_rs::consumer_filter::FilterCriteria;
use akka_projection_rs::HandlerError;
use akka_projection_rs::PendingHandler;
use async_stream::stream;
use async_trait::async_trait;
use futures::future;
use log::debug;
use log::warn;
use prost::Name;
Expand All @@ -20,6 +23,7 @@ use std::marker::PhantomData;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tonic::Request;
Expand All @@ -45,6 +49,7 @@ where
F: Fn(&EI) -> Option<E>,
{
producer: GrpcEventProducer<E>,
consumer_filters_receiver: watch::Receiver<Vec<FilterCriteria>>,
transformer: F,
phantom: PhantomData<EI>,
}
Expand All @@ -64,7 +69,14 @@ where
&mut self,
envelope: Self::Envelope,
) -> Result<Pin<Box<dyn Future<Output = Result<(), HandlerError>> + Send>>, HandlerError> {
let event = (self.transformer)(&envelope);
let event = if consumer_filter::matches(&envelope, &self.consumer_filters_receiver.borrow())
{
(self.transformer)(&envelope)
} else {
// Gaps are ok given a filter situation.
return Ok(Box::pin(future::ready(Ok(()))));
};

let transformation = Transformation {
entity_id: envelope.entity_id(),
seq_nr: envelope.seq_nr(),
Expand Down Expand Up @@ -99,12 +111,17 @@ impl<E> GrpcEventProducer<E> {
}
}

pub fn handler<EI, F>(self, transformer: F) -> GrpcEventProcessor<E, EI, F>
pub fn handler<EI, F>(
self,
consumer_filters_receiver: watch::Receiver<Vec<FilterCriteria>>,
transformer: F,
) -> GrpcEventProcessor<E, EI, F>
where
F: Fn(&EI) -> Option<E>,
{
GrpcEventProcessor {
producer: self,
consumer_filters_receiver,
transformer,
phantom: PhantomData,
}
Expand Down Expand Up @@ -142,6 +159,7 @@ pub async fn run<E, EC, ECR>(
event_consumer_channel: EC,
origin_id: StreamId,
stream_id: StreamId,
consumer_filters: watch::Sender<Vec<FilterCriteria>>,
mut envelopes: mpsc::Receiver<(EventEnvelope<E>, oneshot::Sender<()>)>,
mut kill_switch: oneshot::Receiver<()>,
) where
Expand Down Expand Up @@ -241,8 +259,9 @@ pub async fn run<E, EC, ECR>(
Some(Ok(proto::ConsumeEventOut {
message: Some(message),
})) = stream_outs.next() => match message {
proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. }) => {
proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => {
debug!("Starting the protocol");
consumer_filters.send(filter.into_iter().flat_map(|f| f.try_into()).collect()).unwrap();
break;
}
_ => {
Expand Down Expand Up @@ -408,6 +427,7 @@ mod tests {
.unwrap();
});

let (consumer_filters, _) = watch::channel(vec![]);
let (sender, receiver) = mpsc::channel(10);
let (_task_kill_switch, task_kill_switch_receiver) = oneshot::channel();
tokio::spawn(async move {
Expand All @@ -416,6 +436,7 @@ mod tests {
|| channel.connect(),
OriginId::from("some-origin-id"),
StreamId::from("some-stream-id"),
consumer_filters,
receiver,
task_kill_switch_receiver,
)
Expand Down
11 changes: 10 additions & 1 deletion akka-projection-rs/src/consumer_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! * IncludeRegexEntityIds - include events for entities with entity ids matching the given regular expressions
//! * IncludeEntityIds - include events for entities with the given entity ids

use akka_persistence_rs::EntityId;
use akka_persistence_rs::{EntityId, WithEntityId};
use smol_str::SmolStr;

#[derive(Clone)]
Expand Down Expand Up @@ -100,3 +100,12 @@ pub fn exclude_all() -> FilterCriteria {
matching: vec![EntityIdMatcher::from(".*")],
}
}

/// A function that matches an envelope with criteria and passes it through if matched.
pub fn matches<E>(_envelope: &E, _consumer_filters: &[FilterCriteria]) -> bool
where
E: WithEntityId,
{
// TODO
todo!()
}
6 changes: 4 additions & 2 deletions examples/iot-service/backend/src/temperature_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{path::PathBuf, time::Duration};
use streambed::commit_log::Topic;
use streambed_confidant::FileSecretStore;
use streambed_logged::FileLog;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tonic::transport::{Channel, Uri};

// Apply sensor observations to a remote consumer.
Expand All @@ -28,6 +28,7 @@ pub async fn task(
// Establish a sink of envelopes that will be forwarded
// on to a consumer via gRPC event producer.

let (consumer_filters, consumer_filters_receiver) = watch::channel(vec![]);
huntc marked this conversation as resolved.
Show resolved Hide resolved
let (grpc_producer, grpc_producer_receiver) = mpsc::channel(10);

let grpc_producer =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated, isn't it confusing to reuse the same name grpc_producer for a new thing, or is that a rust thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’d say that it is a more general personal style thing. Rust linting via clippy has no problem with it. Clippy is king. Perhaps I could use a different name here though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit 8730696 deals with this.

Expand All @@ -40,6 +41,7 @@ pub async fn task(
|| channel.connect(),
OriginId::from("edge-iot-service"),
StreamId::from("temperature-events"),
consumer_filters,
grpc_producer_receiver,
task_kill_switch_receiver,
)
Expand Down Expand Up @@ -89,7 +91,7 @@ pub async fn task(
&state_storage_path,
kill_switch,
source_provider,
grpc_producer.handler(transformer),
grpc_producer.handler(consumer_filters_receiver, transformer),
Duration::from_millis(100),
)
.await
Expand Down
Loading