Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer supplied filters when producing, along with a producer filter #36

Merged
merged 19 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ itoa = "1.0"
js-sys = "0.3.60"
log = "0.4.17"
lru = "0.11.0"
mqtt-protocol = "0.11.2"
postcard = { version = "1.0.6", default-features = false }
prost = { version = "0.12.0" }
prost-build = { version = "0.12.0" }
prost-types = { version = "0.12.0" }
rand = "0.8"
regex = "1.9.6"
scopeguard = "1.1"
serde = "1.0.151"
serde_json = "1.0.107"
Expand Down
65 changes: 34 additions & 31 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use akka_persistence_rs::{
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTimestampOffset,
EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
WithPersistenceId, WithSeqNr, WithTags, WithTimestampOffset,
};
use async_stream::stream;
use async_trait::async_trait;
Expand All @@ -19,39 +20,22 @@ use streambed::{
use tokio_stream::{Stream, StreamExt};

/// An envelope wraps a commit log event associated with a specific entity.
/// Tags are not presently considered useful at the edge. A remote consumer would be interested
/// in all events from the edge in most cases, and the edge itself decides what to publish
/// (producer defined filter).
#[derive(Clone, Debug, PartialEq)]
pub struct EventEnvelope<E> {
pub entity_id: EntityId,
pub persistence_id: PersistenceId,
pub seq_nr: u64,
pub timestamp: DateTime<Utc>,
pub event: E,
pub offset: CommitLogOffset,
pub tags: Vec<Tag>,
}

impl<E> EventEnvelope<E> {
pub fn new<EI>(
entity_id: EI,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: E,
offset: CommitLogOffset,
) -> Self
where
EI: Into<EntityId>,
{
Self {
entity_id: entity_id.into(),
seq_nr,
timestamp,
event,
offset,
}
}
}

impl<E> WithEntityId for EventEnvelope<E> {
fn entity_id(&self) -> EntityId {
self.entity_id.clone()
impl<E> WithPersistenceId for EventEnvelope<E> {
fn persistence_id(&self) -> &PersistenceId {
&self.persistence_id
}
}

Expand All @@ -67,6 +51,12 @@ impl<E> WithSeqNr for EventEnvelope<E> {
}
}

impl<E> WithTags for EventEnvelope<E> {
fn tags(&self) -> &[akka_persistence_rs::Tag] {
&self.tags
}
}

impl<E> WithTimestampOffset for EventEnvelope<E> {
fn timestamp_offset(&self) -> TimestampOffset {
TimestampOffset {
Expand All @@ -84,6 +74,9 @@ pub trait CommitLogMarshaler<E>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
/// Declares the entity type to the marshaler.
fn entity_type(&self) -> EntityType;

/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise and event type value held in
/// the high bits, and the entity id in the lower bits.
Expand Down Expand Up @@ -159,8 +152,13 @@ where
}
});
seq_nr.and_then(|seq_nr| {
record.timestamp.map(|timestamp| {
EventEnvelope::new(entity_id, seq_nr, timestamp, event, record.offset)
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
})
})
})
Expand Down Expand Up @@ -291,7 +289,7 @@ where
marshaler.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
Expand Down Expand Up @@ -328,7 +326,7 @@ where
marshaler.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
Expand Down Expand Up @@ -476,6 +474,10 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn entity_type(&self) -> EntityType {
EntityType::from("some-entity-type")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}
Expand All @@ -496,11 +498,12 @@ mod tests {
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
}

Expand Down
21 changes: 15 additions & 6 deletions akka-persistence-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,29 @@ use std::{

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;

pub mod effect;
pub mod entity;
pub mod entity_manager;

/// Uniquely identifies the type of an Entity.
pub type EntityType = smol_str::SmolStr;
pub type EntityType = SmolStr;

/// Uniquely identifies an entity, or entity instance.
pub type EntityId = smol_str::SmolStr;
pub type EntityId = SmolStr;

/// Implemented by structures that can return an entity id.
pub trait WithEntityId {
fn entity_id(&self) -> EntityId;
/// Tags annotate an entity's events
pub type Tag = SmolStr;

/// Implemented by structures that can return a persistence id.
pub trait WithPersistenceId {
fn persistence_id(&self) -> &PersistenceId;
}

/// Implemented by structures that can return tags.
pub trait WithTags {
fn tags(&self) -> &[Tag];
}

/// A slice is deterministically defined based on the persistence id.
Expand Down Expand Up @@ -63,7 +72,7 @@ fn jdk_string_hashcode(s: &str) -> i32 {
}

/// A namespaced entity id given an entity type.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, Deserialize, PartialOrd, Ord, Serialize, PartialEq, Eq, Hash)]
pub struct PersistenceId {
pub entity_type: EntityType,
pub entity_id: EntityId,
Expand Down
29 changes: 12 additions & 17 deletions akka-projection-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod offset_store;

use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};

use akka_persistence_rs::{EntityType, Offset, PersistenceId};
use akka_persistence_rs::{Offset, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, EventEnvelope};
use akka_projection_rs::SourceProvider;
use async_stream::stream;
Expand All @@ -17,7 +17,6 @@ use tokio_stream::{Stream, StreamExt};
pub struct CommitLogSourceProvider<CL, E, M> {
commit_log: CL,
consumer_group_name: String,
entity_type: EntityType,
marshaler: M,
slice_range: Range<u32>,
topic: Topic,
Expand All @@ -30,13 +29,7 @@ where
M: CommitLogMarshaler<E> + Sync,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
pub fn new(
commit_log: CL,
marshaler: M,
consumer_group_name: &str,
topic: Topic,
entity_type: EntityType,
) -> Self {
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self {
// When it comes to having a projection sourced from a local
// commit log, there's little benefit if having many of them.
// We therefore manage all slices from just one projection.
Expand All @@ -47,7 +40,6 @@ where
marshaler,
consumer_group_name,
topic,
entity_type,
slice_range.get(0).cloned().unwrap(),
)
}
Expand All @@ -57,7 +49,6 @@ where
marshaler: M,
consumer_group_name: &str,
topic: Topic,
entity_type: EntityType,
slice_range: Range<u32>,
) -> Self {
Self {
Expand All @@ -66,7 +57,6 @@ where
marshaler,
slice_range,
topic,
entity_type,
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -102,7 +92,7 @@ where
while let Some(consumer_record) = records.next().await {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
let persistence_id =
PersistenceId::new(self.entity_type.clone(), record_entity_id);
PersistenceId::new(marshaler.entity_type(), record_entity_id);
if self.slice_range.contains(&persistence_id.slice()) {
if let Some(envelope) = marshaler
.envelope(persistence_id.entity_id, consumer_record)
Expand Down Expand Up @@ -144,7 +134,7 @@ mod tests {
use std::{env, fs};

use super::*;
use akka_persistence_rs::EntityId;
use akka_persistence_rs::{EntityId, EntityType};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use streambed::commit_log::{ConsumerRecord, Header, Key, ProducerRecord};
Expand Down Expand Up @@ -173,6 +163,10 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn entity_type(&self) -> EntityType {
EntityType::from("some-topic")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}
Expand All @@ -193,11 +187,12 @@ mod tests {
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
}

Expand Down Expand Up @@ -239,6 +234,7 @@ mod tests {

let entity_type = EntityType::from("some-topic");
let entity_id = EntityId::from("some-entity");
let persistence_id = PersistenceId::new(entity_type.clone(), entity_id.clone());
let topic = Topic::from("some-topic");
let event_value = "some value".to_string();

Expand All @@ -263,13 +259,12 @@ mod tests {
MyEventMarshaler,
"some-consumer",
topic,
entity_type,
);

let mut envelopes = source_provider.source(|| async { None }).await;
let envelope = envelopes.next().await.unwrap();

assert_eq!(envelope.entity_id, entity_id,);
assert_eq!(envelope.persistence_id, persistence_id);
assert_eq!(envelope.event, MyEvent { value: event_value },);
}
}
16 changes: 13 additions & 3 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::num::NonZeroUsize;

use akka_persistence_rs::{entity_manager, EntityId, Message};
use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
use akka_projection_rs::offset_store;
use async_trait::async_trait;
Expand All @@ -13,6 +13,7 @@ use streambed_logged::{compaction::KeyBasedRetention, FileLog};
use tokio::sync::mpsc;

struct OffsetStoreEventMarshaler<F> {
entity_type: EntityType,
to_compaction_key: F,
}

Expand All @@ -21,6 +22,10 @@ impl<F> CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler<F>
where
F: Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync,
{
fn entity_type(&self) -> EntityType {
self.entity_type.clone()
}

fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option<Key> {
(self.to_compaction_key)(entity_id, event)
}
Expand All @@ -41,11 +46,12 @@ where
let value = u64::from_be_bytes(record.value.try_into().ok()?);
let event = offset_store::Event::Saved { seq_nr: value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 0, // We don't care about sequence numbers with the offset store as they won't be projected anywhere
timestamp,
event,
offset: record.offset,
tags: vec![],
})
}

Expand Down Expand Up @@ -85,6 +91,7 @@ pub async fn run(
offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
) {
let events_entity_type = EntityType::from(offset_store_id.clone());
let events_topic = Topic::from(offset_store_id.clone());

commit_log
Expand All @@ -94,7 +101,10 @@ pub async fn run(

let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
OffsetStoreEventMarshaler { to_compaction_key },
OffsetStoreEventMarshaler {
entity_type: events_entity_type,
to_compaction_key,
},
&offset_store_id,
events_topic,
);
Expand Down
2 changes: 2 additions & 0 deletions akka-projection-rs-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ chrono = { workspace = true }
exponential-backoff = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
mqtt-protocol = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
regex = { workspace = true }
smol_str = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
Loading