chore(event_ingestion): Try to adapt to new design (#159)
* chore(event_ingestion): Try to adapt to new design

* add `sync` flag for `tokio` dependency (#160)

* add sync feature to tokio crate

* add changeset

* chore: bump versions before release (#161)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore: fix typo in publish.yml update_ruby_lockfile step (#162)

* chore(ruby): update lockfile (#163)

* chore: hide event ingestion under a feature flag (#164)

* chore: bump versions before release (#165)

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* chore(ruby): update lockfile (#166)

* enable feature, use tokio timer advance

* more code review comments

* fix failing tests

* DRY up

---------

Co-authored-by: Leo Romanovsky <[email protected]>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Oleksii Shmalko <[email protected]>
5 people authored Jan 17, 2025
1 parent f3e1006 commit 903eae4
Showing 29 changed files with 752 additions and 370 deletions.
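
One of the commit bullets above, "enable feature, use tokio timer advance", refers to driving the new tests with Tokio's paused clock instead of real waits. A minimal sketch of that pattern, relying only on the `macros`, `rt`, and `test-util` features that the Cargo.toml change below enables (the test name and body are illustrative, not taken from this commit):

```rust
use tokio::time::{advance, sleep, Duration, Instant};

// With `start_paused = true` the runtime starts with a frozen clock, so timers
// only fire when the test advances time explicitly.
#[tokio::test(start_paused = true)]
async fn timer_fires_without_real_waiting() {
    let started = Instant::now();
    let timer = tokio::spawn(sleep(Duration::from_secs(60)));

    // Jump the clock forward; no real minute elapses while the test runs.
    advance(Duration::from_secs(60)).await;
    timer.await.unwrap();

    assert!(started.elapsed() >= Duration::from_secs(60));
}
```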
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -45,7 +45,7 @@ jobs:

- run: npm ci
- run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
- run: cargo build --verbose --all-targets --workspace
- run: cargo build --verbose --all-targets --workspace --features "event_ingestion"

- run: cargo test --verbose --workspace
- run: cargo test --verbose --workspace --features "event_ingestion"
- run: cargo doc --verbose
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
@@ -57,7 +57,7 @@ jobs:

update_ruby_lockfile:
name: Update Ruby SDK lockfile
run-on: ubuntu-latest
runs-on: ubuntu-latest
if: ${{ startsWith(github.ref_name, 'eppo_core@') }}
needs: publish
steps:
12 changes: 12 additions & 0 deletions eppo_core/CHANGELOG.md
@@ -1,5 +1,17 @@
# eppo_core

## 7.0.2

### Patch Changes

- [#164](https://github.com/Eppo-exp/eppo-multiplatform/pull/164) [`aa0ca89`](https://github.com/Eppo-exp/eppo-multiplatform/commit/aa0ca8912bab269613d3da25c06f81b1f19ffb36) Thanks [@rasendubi](https://github.com/rasendubi)! - Hide event ingestion under a feature flag.

## 7.0.1

### Patch Changes

- [#160](https://github.com/Eppo-exp/eppo-multiplatform/pull/160) [`82d05ae`](https://github.com/Eppo-exp/eppo-multiplatform/commit/82d05aea0263639be56ba5667500f6940b4832ab) Thanks [@leoromanovsky](https://github.com/leoromanovsky)! - add sync feature to tokio crate

## 7.0.0

### Major Changes
16 changes: 10 additions & 6 deletions eppo_core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "eppo_core"
version = "7.0.0"
version = "7.0.2"
edition = "2021"
description = "Eppo SDK core library"
repository = "https://github.com/Eppo-exp/rust-sdk"
@@ -10,6 +10,10 @@ categories = ["config"]
rust-version = "1.75.0"

[features]
# TODO: Remove this line before merging
default = ["event_ingestion"]
# Unstable feature flag for an upcoming feature.
event_ingestion = ["dep:uuid", "dep:linked_hash_set"]
# Add implementation of `FromPyObject`/`ToPyObject` for some types.
pyo3 = ["dep:pyo3", "dep:serde-pyobject"]
# Add implementation of `TryConvert`/`IntoValue` for some types.
@@ -23,7 +27,7 @@ vendored = ["reqwest/native-tls-vendored"]
[dependencies]
base64 = "0.22.1"
chrono = { version = "0.4.38", features = ["serde"] }
derive_more = { version = "1.0.0", features = ["from", "into"] }
derive_more = { version = "1.0.0", default-features = false, features = ["from", "into"] }
faststr = { version = "0.2.23", features = ["serde"] }
log = { version = "0.4.21", features = ["kv", "kv_serde"] }
md5 = "0.7.0"
@@ -36,14 +40,14 @@ serde-bool = "0.1.3"
serde_json = "1.0.116"
serde_with = { version = "3.11.0", default-features = false, features = ["base64", "hex", "macros"] }
thiserror = "2.0.3"
linked_hash_set = "0.1.5"
tokio = { version = "1.34.0", features = ["rt", "time", "macros"] }
linked_hash_set = { version = "0.1.5", optional = true }
tokio = { version = "1.34.0", default-features = false, features = ["macros", "sync", "rt", "time", "test-util"] }
url = "2.5.0"
uuid = { version = "1.11.0", features = ["v4", "serde"] }
uuid = { version = "1.11.0", features = ["v4", "serde"], optional = true }

# pyo3 dependencies
pyo3 = { version = "0.22.0", optional = true, default-features = false }
serde-pyobject = { version = "0.4.0", optional = true}
serde-pyobject = { version = "0.4.0", optional = true }

# magnus dependencies
magnus = { version = "0.6.4", default-features = false, optional = true }
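
The new `event_ingestion` feature turns `uuid` and `linked_hash_set` into optional dependencies. The diff does not show how the library gates the code itself, but the module path `crate::event_ingestion` appears in the new tests, so the wiring presumably looks roughly like the sketch below (the gate placement and the helper function are assumptions, not taken from this commit):

```rust
// Hypothetical excerpt from eppo_core's lib.rs.

// The module (and, via `dep:uuid`/`dep:linked_hash_set`, its optional
// dependencies) is only compiled when the feature is enabled, e.g. with
// `cargo build --features "event_ingestion"`.
#[cfg(feature = "event_ingestion")]
pub(crate) mod event_ingestion;

// Any helper that needs the optional crates gets the same gate.
#[cfg(feature = "event_ingestion")]
#[allow(dead_code)]
fn new_event_id() -> uuid::Uuid {
    uuid::Uuid::new_v4() // the `v4` feature is enabled in Cargo.toml above
}
```

With `default = ["event_ingestion"]` still present (the TODO in the diff says it is meant to be removed before merging), the feature is on by default; otherwise downstream crates would opt in via `--features` or their own Cargo.toml.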
2 changes: 1 addition & 1 deletion eppo_core/package.json
@@ -1,7 +1,7 @@
{
"private": true,
"name": "eppo_core",
"version": "7.0.0",
"version": "7.0.2",
"dependencies": {},
"devDependencies": {},
"scripts": {
142 changes: 142 additions & 0 deletions eppo_core/src/event_ingestion/auto_flusher.rs
@@ -0,0 +1,142 @@
use std::time::Duration;

use tokio::{sync::mpsc, time::Instant};

use super::BatchedMessage;

/// Auto-flusher forwards all messages from `uplink` to `downlink` unchanged and inserts extra flush
/// requests if it hasn't seen one within the given `period`. In other words, it makes sure that the
/// channel is flushed at least every `period`.
pub(super) async fn auto_flusher<T>(
mut uplink: mpsc::Receiver<BatchedMessage<T>>,
downlink: mpsc::Sender<BatchedMessage<T>>,
period: Duration,
) -> Option<()> {
'flushed: loop {
// Process first message.
let msg = uplink.recv().await?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;

// No need to start the flush timer if we just forwarded a flush.
if flushed {
continue;
}

let flush_at = Instant::now() + period;
// Loop until we reach flush_at or see a flush request pass through.
loop {
tokio::select! {
_ = tokio::time::sleep_until(flush_at) => {
downlink.send(BatchedMessage { batch: Vec::new(), flush: Some(()) }).await.ok()?;
continue 'flushed;
},
msg = uplink.recv() => {
let msg = msg?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;
if flushed {
continue 'flushed;
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use crate::event_ingestion::auto_flusher;
use crate::event_ingestion::batched_message::BatchedMessage;
use tokio::sync::mpsc;
use tokio::time::Duration;

#[tokio::test(start_paused = true)]
async fn test_auto_flusher() {
let (uplink_tx, uplink_rx) = mpsc::channel(10);
let (downlink_tx, mut downlink_rx) = mpsc::channel(10);
let flush_period = Duration::from_millis(100);
tokio::spawn(auto_flusher::auto_flusher(
uplink_rx,
downlink_tx,
flush_period,
));

uplink_tx
.send(BatchedMessage {
batch: vec![1, 2, 3],
flush: None,
})
.await
.unwrap();
uplink_tx
.send(BatchedMessage {
batch: vec![4, 5, 6],
flush: None,
})
.await
.unwrap();

// Verify that the messages are forwarded to downlink
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![1, 2, 3],
flush: None
})
);
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![4, 5, 6],
flush: None
})
);

// Wait for the flush period to trigger an auto-flush
tokio::time::advance(flush_period * 2).await;

// Verify the auto-flush behavior
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: Vec::new(),
flush: Some(())
})
);

// Send a flushed message explicitly
uplink_tx
.send(BatchedMessage {
batch: vec![],
flush: Some(()),
})
.await
.unwrap();

// Verify that the flushed message is forwarded immediately
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![],
flush: Some(())
})
);

// Ensure the loop continues and processes further messages
uplink_tx
.send(BatchedMessage {
batch: vec![7, 8, 9],
flush: None,
})
.await
.unwrap();
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![7, 8, 9],
flush: None
})
);
}
}
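
The flusher's `downlink` feeds a later pipeline stage that is not part of this diff. A hedged sketch of what such a consumer might look like, assuming it lives in the same `event_ingestion` module (so the `pub(super)` items above are visible); the delivery function is hypothetical:

```rust
use tokio::sync::mpsc;

// Hypothetical downstream stage: accumulate events and hand them off whenever a
// flush request arrives, whether sent by a producer or injected by auto_flusher.
async fn deliver_on_flush<T>(mut downlink: mpsc::Receiver<BatchedMessage<T>>) {
    let mut pending: Vec<T> = Vec::new();
    while let Some(msg) = downlink.recv().await {
        let wants_flush = msg.flush.is_some();
        pending.extend(msg.batch);
        if wants_flush && !pending.is_empty() {
            // Placeholder for the real delivery step (e.g. an HTTP request).
            deliver(std::mem::take(&mut pending)).await;
        }
    }
}

async fn deliver<T>(_batch: Vec<T>) {
    // Hypothetical: serialize and send the batch to the ingestion endpoint.
}
```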
39 changes: 39 additions & 0 deletions eppo_core/src/event_ingestion/batched_message.rs
@@ -0,0 +1,39 @@
/// A batched message contains a batch of data and may optionally require processors to flush any pending work.
#[derive(Debug, PartialEq)]
pub(super) struct BatchedMessage<T> {
pub batch: Vec<T>,
/// `None` means the message does not require a flush.
/// `Some(())` means the message requires a flush.
pub flush: Option<()>,
}

impl<T> BatchedMessage<T> {
/// Create a new empty message.
pub fn empty() -> BatchedMessage<T> {
BatchedMessage {
batch: Vec::new(),
flush: None,
}
}

/// Create a new message with a batch of data and an optional flush request.
pub fn new(batch: Vec<T>, flush: Option<()>) -> BatchedMessage<T> {
BatchedMessage {
batch,
flush,
}
}

pub fn requires_flush(&self) -> bool {
self.flush.is_some()
}

// Mark the message as successfully flushed, consuming it and notifying any interested parties.
// pub fn flushed(self) {
// if let Some(flush) = self.flush {
// for f in flush {
// f.send(());
// }
// }
// }
}
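
For reference, the two message shapes that flow through the pipeline can be built with the helpers above; a small illustrative check, assuming it sits in the same module as `BatchedMessage` (the `u32` payload type is arbitrary):

```rust
#[test]
fn message_shapes() {
    // An ordinary batch that does not ask for a flush...
    let data: BatchedMessage<u32> = BatchedMessage::new(vec![1, 2, 3], None);
    // ...and an empty message whose only purpose is to request a flush,
    // exactly what auto_flusher injects when the period elapses.
    let flush_request: BatchedMessage<u32> = BatchedMessage::new(Vec::new(), Some(()));

    assert!(!data.requires_flush());
    assert!(flush_request.requires_flush());
}
```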
(Diffs for the remaining 27 changed files are not shown.)
