diff --git a/Cargo.lock b/Cargo.lock index 25de0124d4..f499b05b71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2685,6 +2685,7 @@ dependencies = [ "simd-doc", "thiserror", "tokio", + "tokio-stream", "tokio-util", "tonic", "tower 0.5.0", @@ -6656,9 +6657,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index c6385b8f5a..f1a7b45858 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -186,6 +186,7 @@ tokio = { version = "1", features = [ "time", ] } tokio-util = { version = "0.7", features = ["io", "compat", "rt"] } +tokio-stream = { version = "0.1.17" } tonic = { version = "0.12", features = ["tls", "tls-roots"] } hyper-util = "0.1" tower = { version = "0.5", features = ["util"] } diff --git a/crates/gazette/Cargo.toml b/crates/gazette/Cargo.toml index 1045a9976e..a38010e772 100644 --- a/crates/gazette/Cargo.toml +++ b/crates/gazette/Cargo.toml @@ -12,7 +12,10 @@ license.workspace = true coroutines = { path = "../coroutines" } ops = { path = "../ops" } proto-gazette = { path = "../proto-gazette" } -proto-grpc = { path = "../proto-grpc", features = ["broker_client", "consumer_client"] } +proto-grpc = { path = "../proto-grpc", features = [ + "broker_client", + "consumer_client", +] } async-compression = { workspace = true } bytes = { workspace = true } @@ -25,6 +28,7 @@ reqwest = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } +tokio-stream = { workspace = true } tonic = { workspace = true } tower = { workspace = true } tracing = { workspace = true } @@ -44,4 +48,4 @@ hexdump = { workspace = true } # anyhow = { workspace = true } # serde_json = { workspace = true } # memchr = { workspace = true } -# pin-project-lite = { workspace = true } \ No newline at end of file +# pin-project-lite = { workspace = true } diff --git a/crates/gazette/src/journal/append.rs b/crates/gazette/src/journal/append.rs new file mode 100644 index 0000000000..3793b99b5b --- /dev/null +++ b/crates/gazette/src/journal/append.rs @@ -0,0 +1,186 @@ +use super::Client; +use crate::{journal::check_ok, Error}; +use futures::{Stream, StreamExt}; +use proto_gazette::broker; +use tokio::{ + io::{AsyncBufReadExt, AsyncRead, AsyncSeek, AsyncSeekExt, BufReader}, + pin, +}; + +// TODO: Tune this? +const CHUNK_SIZE: usize = 1 << 14; + +impl Client { + /// Helper function to appends the contents of `source` to the specified journal. + /// This drives a single append RPC cycle and propagates any errors. If you need + /// streaming input or retries, use `[append_stream]` instead. + pub async fn append_once( + &self, + journal: String, + source: Vec, + ) -> crate::Result { + let mapped_source = std::io::Cursor::new(source); + + let appender = self.append_stream(journal, mapped_source); + tokio::pin!(appender); + + match appender.next().await { + Some(Ok(resp)) => { + if let None = appender.next().await { + Ok(resp) + } else { + Err(Error::Append("Didn't get EOF after Ok".to_string())) + } + } + Some(err) => err, + None => Err(Error::UnexpectedEof), + } + } + + /// Append the contents of an `AsyncRead + AsyncSeek` to the specified journal. + /// Returns a Stream of results which will yield either: + /// - An AppendResponse after all data is successfully appended + /// - Errors for any failures encountered. + /// If polled after an error, restarts the request from the beginning. + pub fn append_stream( + &self, + journal: String, + source: R, + ) -> impl Stream> + '_ + where + R: AsyncRead + AsyncSeek + Send + Unpin + 'static, + { + coroutines::coroutine(move |mut co| async move { + let mut reader = BufReader::with_capacity(CHUNK_SIZE, source); + loop { + match self.append_all(&journal, &mut reader).await { + Ok(resp) => { + () = co.yield_(Ok(resp)).await; + return; + } + Err(err) => { + () = co.yield_(Err(err)).await; + // Polling after an error indicates the caller would like to retry, + // so seek back to the beginning and restart. + // Seeking shouldn't error unless there's a bug + reader.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + } + } + } + }) + } + + async fn append_all( + &self, + journal: &str, + source: &mut R, + ) -> crate::Result + where + R: AsyncBufReadExt + Send + Unpin, + { + // Transforms `source` into a stream of `Result`. This deals with + // the append RPC's semantics that require an initial "metadata" request, followed by a stream of + // "chunk" requests, followed by an empty request to indicate we're done. Potential errors ultimately + // originate from reading the input AsyncRead. + let request_generator = coroutines::coroutine(move |mut co| async move { + // Send initial request + () = co + .yield_(Ok(broker::AppendRequest { + journal: journal.to_string(), + ..Default::default() + })) + .await; + + loop { + // Process chunks until EOF + let bytes_read = match source.fill_buf().await { + // An empty buffer indicates EOF, as otherwise fill_buf() will wait until data is available + Ok(chunk) if chunk.len() == 0 => break, + Ok(chunk) => { + () = co + .yield_(Ok(broker::AppendRequest { + content: chunk.to_vec(), + ..Default::default() + })) + .await; + chunk.len() + } + Err(e) => { + () = co.yield_(Err(Error::Append(e.to_string()))).await; + return; + } + }; + + source.consume(bytes_read); + } + // Send final empty chunk + () = co + .yield_(Ok(broker::AppendRequest { + ..Default::default() + })) + .await; + }); + + // Since reading from `source` can error, we need this whole song and dance to + // handle those errors. We could just `.collect()` all of the requests and catch + // any errors there, but since this is supposed to handle significant volumes of data + // over an undefined period of time, that won't work. So instead we need to pass + // `JournalClient::append()` a stream of _just_ the `AppendRequest`s that come out + // of the above `request_generator`, while also promptly returning any errors if they + // crop up, and cancelling the append request. + + let (req_tx, req_rx) = tokio::sync::mpsc::channel(100); + + let mut client = self.into_sub(self.router.route(None, false, &self.default)?); + + // Run `JournalClient::append` in a separate Tokio task, and feed it a steady diet of `AppendRequest`s + // while also giving us a convenient handle to `.abort()` if we encounter an error. + let mut append_handle = tokio::spawn(async move { + let resp = client + .append(tokio_stream::wrappers::ReceiverStream::new(req_rx)) + .await + .map_err(crate::Error::Grpc)? + .into_inner(); + + check_ok(resp.status(), resp) + }); + + pin!(request_generator); + + loop { + tokio::select! { + maybe_item = request_generator.next() => { + match maybe_item { + Some(Ok(req)) => { + req_tx.send(req).await.map_err(|e|Error::Append(e.to_string()))?; + }, + Some(Err(e)) => { + // If `request_generator` errors, i.e we failed to read incoming data, + // cancel the `append` RPC and propagate the error + drop(req_tx); + append_handle.abort(); + return Err(e); + }, + None => { + // We hit EOF, drop the request channel sender which will close the + // `ReceiverStream` and signal `JournalClient::append` to finish up. + drop(req_tx); + break; + }, + } + }, + res = &mut append_handle => { + // Handle `JournalClient::append` finishing first. This will probably only happen + // if there's an error, as EOF breaks out and relies on the final `.await` to + // get the `AppendResponse` out. + return res.map_err(|e|Error::Append(e.to_string()))?; + }, + } + } + + // We hit EOF and now have to wait for `JournalClient::append` to finish + append_handle + .await + .map_err(|e| Error::Append(e.to_string()))? + } +} diff --git a/crates/gazette/src/journal/mod.rs b/crates/gazette/src/journal/mod.rs index 1f800ef7b2..cc3ba45b35 100644 --- a/crates/gazette/src/journal/mod.rs +++ b/crates/gazette/src/journal/mod.rs @@ -1,6 +1,7 @@ use proto_gazette::broker; use tonic::transport::Channel; +mod append; mod list; mod read; diff --git a/crates/gazette/src/lib.rs b/crates/gazette/src/lib.rs index 4a484e1fef..af3b2bdf08 100644 --- a/crates/gazette/src/lib.rs +++ b/crates/gazette/src/lib.rs @@ -35,6 +35,8 @@ pub enum Error { }, #[error("{0}")] Protocol(&'static str), + #[error("{0}")] + Append(String), #[error(transparent)] UUID(#[from] uuid::Error), #[error("unexpected server EOF")] @@ -71,6 +73,7 @@ impl Error { Error::Protocol(_) => false, Error::UUID(_) => false, Error::JWT(_) => false, + Error::Append(_) => false, } } }