diff --git a/Cargo.lock b/Cargo.lock index 43b0f899f7..f4422331ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2670,6 +2670,7 @@ name = "gazette" version = "0.0.0" dependencies = [ "async-compression", + "async-trait", "bytes", "coroutines", "doc", diff --git a/crates/gazette/Cargo.toml b/crates/gazette/Cargo.toml index df1ad2da8e..ec59486477 100644 --- a/crates/gazette/Cargo.toml +++ b/crates/gazette/Cargo.toml @@ -18,6 +18,7 @@ proto-grpc = { path = "../proto-grpc", features = [ ] } async-compression = { workspace = true } +async-trait = { workspace = true } bytes = { workspace = true } futures = { workspace = true } futures-core = { workspace = true } diff --git a/crates/gazette/src/journal/append_stream.rs b/crates/gazette/src/journal/append_stream.rs new file mode 100644 index 0000000000..9ada96b1d0 --- /dev/null +++ b/crates/gazette/src/journal/append_stream.rs @@ -0,0 +1,104 @@ +use super::Client; +use async_trait::async_trait; +use bytes::BytesMut; +use futures::{Future, Stream, StreamExt}; +use proto_gazette::broker::{self, AppendRequest}; + +const APPEND_BUFFER_LIMIT: usize = 2 ^ 22; + +impl Client { + /// Appends a stream of [`FramedMessage`]s in-order via a sequence of batched append RPCs. + /// Returns a Stream of [`RetryResult`]s containing one [`broker::AppendResponse`] per + /// successful append, and any number of [`RetryError`]s. Just like [`journal::Client::append()`], + /// after getting an `Err` you can continue to poll the stream to retry. + /// + /// While `Client::append()` is suitable for one-off appends of a single buffer, `append_stream` + /// is for continuously appending an ordered stream of messages. Messages are buffered up to + /// 4MB (APPEND_BUFFER_LIMIT) if an append is already in-flight. If the buffer is full, + /// backpressure is applied by pausing consumption of the input stream. + pub fn append_stream<'a, S>( + &'a self, + // req is a template request used for the Append RPCs + req: AppendRequest, + mut messages: S, + ) -> impl Stream> + 'a + where + S: Stream> + Unpin + 'a, + { + let mut buf = BytesMut::new(); + + let resp = coroutines::coroutine(move |mut co| async move { + let mut attempt = 0; + + loop { + tokio::select! { + // Always start a new append request as soon as possible: either + // the previous one finished and there's buffered data, or we + // got our first message to send. + biased; + + // Append requests run one at a time, and we always try to start the + // next one as soon as we can. "Poll to retry" behavior of `[Client::append()]` + // is retained, except now there can be multiple `Ok` responses since + // we're chaining together more than 1 append request. + _ = async { + let append_buf = buf.split().freeze(); + let append_stream = self.append(req.clone(), || { + futures::stream::once({ + let append_buf = append_buf.clone(); + async move { Ok(append_buf) } + }) + }); + tokio::pin!(append_stream); + loop { + match append_stream.next().await { + Some(Ok(response)) => { + () = co.yield_(Ok(response)).await; + } + Some(Err(e)) => { + () = co.yield_(Err(e)).await; + } + None => break + } + } + }, if buf.len() > 0 => {} + + // So long as we have room in our buffer, eagerly read messages from + // the input stream and buffer them until they can be sent out with + // the next append. If we hit the buffer cap, apply backpressure by + // not consuming any more messages. + Some(msg) = messages.next(), if buf.len() < APPEND_BUFFER_LIMIT => { + match msg.serialize(buf.clone()).await { + Ok(new_buf) =>{ + attempt = 0; + buf = new_buf; + }, + Err(e) => { + () = co.yield_(Err(crate::RetryError { attempt, inner: crate::Error::AppendRead(e) })).await; + attempt += 1; + } + } + }, + } + } + }); + + resp + } +} + +#[async_trait] +pub trait FramedMessage: Send + Sync { + async fn serialize(self: Box, buf: BytesMut) -> std::io::Result; +} + +#[async_trait] +impl FramedMessage for T +where + Fut: Future> + Send, + T: FnOnce(BytesMut) -> Fut + Send + Sync + 'static, +{ + async fn serialize(self: Box, buf: BytesMut) -> std::io::Result { + (self)(buf).await + } +} diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index 727ca96012..8e45748365 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -3,9 +3,12 @@ use proto_gazette::broker; use tonic::transport::Channel; mod append; +mod append_stream; mod list; mod read; +pub use append_stream::FramedMessage; + mod read_json_lines; pub use read_json_lines::{ReadJsonLine, ReadJsonLines};