chore(event_ingestion): Try to adapt to new design #159

Merged: 13 commits, Jan 17, 2025
142 changes: 142 additions & 0 deletions eppo_core/src/event_ingestion/auto_flusher.rs
@@ -0,0 +1,142 @@
use std::time::Duration;

use tokio::{sync::mpsc, time::Instant};

use super::BatchedMessage;

/// Auto-flusher forwards all messages from `uplink` to `downlink` unchanged and inserts extra flush
/// requests if it hasn't seen one within the given `period`. In other words, it makes sure that the
/// channel is flushed at least every `period`.
pub(super) async fn auto_flusher<T>(
mut uplink: mpsc::Receiver<BatchedMessage<T>>,
downlink: mpsc::Sender<BatchedMessage<T>>,
period: Duration,
) -> Option<()> {
'flushed: loop {
// Process first message.
let msg = uplink.recv().await?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;

// No need to start the flush timer if we just flushed.
if flushed {
continue;
}

let flush_at = Instant::now() + period;
// Loop until we reach flush_at or see a flush message.
loop {
tokio::select! {
_ = tokio::time::sleep_until(flush_at) => {
downlink.send(BatchedMessage { batch: Vec::new(), flush: Some(()) }).await.ok()?;
continue 'flushed;
},
msg = uplink.recv() => {
let msg = msg?;
let flushed = msg.flush.is_some();
downlink.send(msg).await.ok()?;
if flushed {
continue 'flushed;
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use crate::event_ingestion::auto_flusher;
use crate::event_ingestion::batched_message::BatchedMessage;
use tokio::sync::mpsc;
use tokio::time::{self, Duration};

#[tokio::test]
async fn test_auto_flusher() {
let (uplink_tx, uplink_rx) = mpsc::channel(10);
let (downlink_tx, mut downlink_rx) = mpsc::channel(10);
let flush_period = Duration::from_millis(100);
tokio::spawn(auto_flusher::auto_flusher(
uplink_rx,
downlink_tx,
flush_period,
));

uplink_tx
.send(BatchedMessage {
batch: vec![1, 2, 3],
flush: None,
})
.await
.unwrap();
uplink_tx
.send(BatchedMessage {
batch: vec![4, 5, 6],
flush: None,
})
.await
.unwrap();

// Verify that the messages are forwarded to downlink
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![1, 2, 3],
flush: None
})
);
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![4, 5, 6],
flush: None
})
);

// Wait for the flush period to trigger an auto-flush
time::sleep(flush_period * 2).await;

// Verify the auto-flush behavior
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: Vec::new(),
flush: Some(())
})
);

// Send a flushed message explicitly
uplink_tx
.send(BatchedMessage {
batch: vec![],
flush: Some(()),
})
.await
.unwrap();

// Verify that the flushed message is forwarded immediately
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![],
flush: Some(())
})
);

// Ensure the loop continues and processes further messages
uplink_tx
.send(BatchedMessage {
batch: vec![7, 8, 9],
flush: None,
})
.await
.unwrap();
assert_eq!(
downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![7, 8, 9],
flush: None
})
);
}
}
31 changes: 31 additions & 0 deletions eppo_core/src/event_ingestion/batched_message.rs
@@ -0,0 +1,31 @@
/// A batched message contains a batch of data and may optionally require processors to flush.
#[derive(Debug, PartialEq)]
pub(super) struct BatchedMessage<T> {
pub batch: Vec<T>,
/// `None` means the message does not require a flush.
/// `Some(())` marks a flush request.
pub flush: Option<()>,

Collaborator: Is there any reason you changed this to Option<()>? (That's basically a bool.) It used to contain a oneshot sender, so it can notify the other end when the flush is complete. We would wait for it to know when to return from flush().

Member Author: Honestly, it was just because I couldn't figure out what it was for, and this seemed to make things simpler for now. No real reason beyond that.


}

impl<T> BatchedMessage<T> {
/// Create a new empty message.
pub fn empty() -> BatchedMessage<T> {
BatchedMessage {
batch: Vec::new(),
flush: None,
}
}

pub fn requires_flush(&self) -> bool {
self.flush.is_some()
}

// Mark the message as successfully flushed, consuming it and notifying any interested parties.
// pub fn flushed(self) {
// if let Some(flush) = self.flush {
// for f in flush {
// f.send(());
// }
// }
// }
}
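
For context, here is a minimal sketch of the watcher-carrying variant described in the thread above, where `flush` holds oneshot senders so a `flush()` caller can await completion. The shape is illustrative, reconstructed from the discussion rather than from the previous code:

use tokio::sync::oneshot;

pub(super) struct BatchedMessage<T> {
    pub batch: Vec<T>,
    /// `Some` carries one sender per pending `flush()` caller.
    pub flush: Option<Vec<oneshot::Sender<()>>>,
}

impl<T> BatchedMessage<T> {
    /// Consume the message, notifying every watcher that the flush completed.
    pub fn flushed(self) {
        for watcher in self.flush.into_iter().flatten() {
            // The receiver may have been dropped; ignore send errors.
            let _ = watcher.send(());
        }
    }
}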
145 changes: 145 additions & 0 deletions eppo_core/src/event_ingestion/batcher.rs
@@ -0,0 +1,145 @@
use tokio::sync::mpsc;

use super::BatchedMessage;

/// Batch messages so that each batch contains at least `min_batch_size` items. Push an
/// incomplete batch down if a flush is received.
///
/// If uplink is closed, send all buffered data downstream and exit.
///
/// If downlink is closed, just exit.
pub(super) async fn batcher<T>(
mut uplink: mpsc::Receiver<BatchedMessage<T>>,
downlink: mpsc::Sender<BatchedMessage<T>>,
min_batch_size: usize,
) -> Option<()> {
let mut uplink_alive = true;
while uplink_alive {
let mut batch = BatchedMessage::empty();

while uplink_alive && batch.batch.len() < min_batch_size && batch.flush.is_none() {
match uplink.recv().await {
None => {
uplink_alive = false;
}
Some(BatchedMessage {
batch: events,
flush,
}) => {
batch.batch.extend(events);
batch.flush = flush;
}
}
}

downlink.send(batch).await.ok()?;
}
None
}

#[cfg(test)]
mod tests {
use crate::event_ingestion::batched_message::BatchedMessage;
use crate::event_ingestion::{auto_flusher, batcher};

#[tokio::test]
async fn test_auto_flusher_and_batcher_pipeline() {
use tokio::sync::mpsc;
use tokio::time::{self, Duration};

// Define test parameters
let flush_period = Duration::from_millis(100);
let min_batch_size = 5;

// Channels for the pipeline
let (flusher_uplink_tx, flusher_uplink_rx) = mpsc::channel(10);
let (flusher_downlink_tx, flusher_downlink_rx) = mpsc::channel(10);
let (batcher_downlink_tx, mut batcher_downlink_rx) = mpsc::channel(10);

// Spawn the auto_flusher and batcher
tokio::spawn(auto_flusher::auto_flusher(
flusher_uplink_rx,
flusher_downlink_tx,
flush_period,
));
tokio::spawn(batcher::batcher(
flusher_downlink_rx,
batcher_downlink_tx,
min_batch_size,
));

// Send some messages to the flusher uplink
flusher_uplink_tx
.send(BatchedMessage {
batch: vec![1, 2, 3],
flush: None,
})
.await
.unwrap();
flusher_uplink_tx
.send(BatchedMessage {
batch: vec![4],
flush: None,
})
.await
.unwrap();

// Verify that the batcher does not output anything until the minimum batch size is met
assert_eq!(
batcher_downlink_rx.try_recv(),
Err(mpsc::error::TryRecvError::Empty)
);

// Wait for the auto_flusher to send a flush message
time::sleep(flush_period * 2).await;

// Verify that the batcher outputs a batch due to the flush
assert_eq!(
batcher_downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![1, 2, 3, 4],
flush: Some(()),
})
);

// Send additional messages to the flusher
flusher_uplink_tx
.send(BatchedMessage {
batch: vec![5, 6, 7, 8, 9],
flush: None,
})
.await
.unwrap();

// Verify that the batcher outputs a batch once the minimum batch size is met
assert_eq!(
batcher_downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![5, 6, 7, 8, 9],
flush: None,
})
);

// Simulate uplink closure
flusher_uplink_tx
.send(BatchedMessage {
batch: vec![10],
flush: None,
})
.await
.unwrap();
drop(flusher_uplink_tx);

// Verify that the batcher flushes the remaining data on uplink closure
assert_eq!(
batcher_downlink_rx.recv().await,
Some(BatchedMessage {
batch: vec![10],
flush: None,
})
);

// Verify that the batcher exits cleanly
assert_eq!(batcher_downlink_rx.recv().await, None);
}
}
70 changes: 70 additions & 0 deletions eppo_core/src/event_ingestion/delivery.rs
@@ -0,0 +1,70 @@
use super::{BatchedMessage, Event};
use crate::event_ingestion::event_delivery::{EventDelivery, EventDeliveryError};
use log::warn;
use tokio::sync::mpsc;

pub(super) struct DeliveryStatus {
success: Vec<Event>,
failure: Vec<Event>,
}

pub(super) async fn delivery(
mut uplink: mpsc::Receiver<BatchedMessage<Event>>,
delivery_status: mpsc::Sender<DeliveryStatus>,
event_delivery: EventDelivery,
) -> Option<()> {
loop {
let event_delivery = event_delivery.clone();
match uplink.recv().await {
None => {
return None;
}
Some(BatchedMessage {
batch,
flush: _flush,
}) => {
let result = event_delivery.deliver(batch.clone()).await;

Collaborator: deliver() should take the batch by reference, so we don't have to copy the whole batch of events.

Member Author: I asked ChatGPT, and apparently handling deserialization with references is trickier than I thought. I'll punt this to a follow-up PR as well; please feel free to send a PR demonstrating how to do this cleanly.

Collaborator: Yes, deserialization is trickier, but delivery() only needs serialization (which doesn't get complicated with references).

Member Author: Cool, I'll give it another shot.
match result {
Ok(response) => {
let failed_event_uuids = response.failed_events;
if !failed_event_uuids.is_empty() {
warn!("Failed to deliver {} events", failed_event_uuids.len());
let mut success = Vec::new();
let mut failure = Vec::new();
batch.into_iter().for_each(|queued_event| {
if failed_event_uuids.contains(&queued_event.uuid) {
failure.push(queued_event);
} else {
success.push(queued_event);
}
});
delivery_status
.send(DeliveryStatus { success, failure })
.await
.ok()?;
}
}
Err(err) => {
match err {
EventDeliveryError::RetriableError(_) => {
// Retry later
delivery_status
.send(DeliveryStatus {
failure: batch,
success: Vec::new(),
})
.await
.ok()?;

Collaborator: Retrying inline is the simplest thing we can do for v1. (In that case, we can ignore delivery statuses for now.) You may take a look at exponential-backoff for inspiration, or feel free to use the library.

Member Author: Thanks for the recommendation, will address that next.
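
// A hand-rolled sketch of the inline retry suggested above (illustrative only:
// MAX_RETRIES is a stand-in constant, the batch is assumed to still be
// available at this point, and the exponential-backoff crate's actual API is
// not shown here):
//
// let mut delay = std::time::Duration::from_millis(100);
// for _attempt in 0..MAX_RETRIES {
//     tokio::time::sleep(delay).await;
//     if event_delivery.deliver(batch.clone()).await.is_ok() {
//         break;
//     }
//     delay *= 2; // double the wait between attempts
// }
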

}
EventDeliveryError::NonRetriableError(_) => {
warn!("Failed to deliver events: {}", err);
// In this case there is no point in retrying delivery since the error is
// non-retriable.
}
}
}
}
}
}
}
}
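
No consumer of the delivery_status channel appears in this diff. A hypothetical sketch of one that re-enqueues failures for a later attempt (the retry_queue wiring is an assumption, and the function must live in this module for the private DeliveryStatus fields to be visible):

pub(super) async fn retry_failures(
    mut status: mpsc::Receiver<DeliveryStatus>,
    retry_queue: mpsc::Sender<BatchedMessage<Event>>,
) -> Option<()> {
    while let Some(DeliveryStatus { failure, .. }) = status.recv().await {
        if !failure.is_empty() {
            // Feed failed events back upstream as an ordinary batch.
            retry_queue
                .send(BatchedMessage {
                    batch: failure,
                    flush: None,
                })
                .await
                .ok()?;
        }
    }
    None
}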