diff --git a/crates/dekaf/src/log_appender.rs b/crates/dekaf/src/log_appender.rs index 5af906db28..5d22cbfa00 100644 --- a/crates/dekaf/src/log_appender.rs +++ b/crates/dekaf/src/log_appender.rs @@ -3,7 +3,7 @@ use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; use flow_client::fetch_task_authorization; -use futures::{StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use gazette::{ journal, uuid::{self, Producer}, @@ -14,6 +14,7 @@ use rand::Rng; use std::{ collections::{BTreeMap, VecDeque}, marker::PhantomData, + mem, sync::Arc, time::{Duration, SystemTime}, }; @@ -68,8 +69,18 @@ impl StatsAggregator { // This abstraction exists mostly in order to make testing easier. #[async_trait] pub trait TaskWriter: Send + Sync { - async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()>; - async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()>; + async fn append_logs( + &mut self, + log_data: impl Fn() -> S + Send + Sync, + ) -> anyhow::Result<()> + where + S: Stream> + Send + 'static; + async fn append_stats( + &mut self, + stat_data: impl Fn() -> S + Send + Sync, + ) -> anyhow::Result<()> + where + S: Stream> + Send + 'static; async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>; } @@ -93,19 +104,28 @@ impl TaskWriter for GazetteWriter { Ok(()) } - async fn append_logs(&mut self, data: Bytes) -> anyhow::Result<()> { + async fn append_logs(&mut self, log_data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()> + where + S: Stream> + Send + 'static, + { self.logs_appender .as_mut() .context("not initialized")? - .append(data) + .append(log_data) .await } - async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> { + async fn append_stats( + &mut self, + stat_data: impl Fn() -> S + Send + Sync, + ) -> anyhow::Result<()> + where + S: Stream> + Send + 'static, + { self.stats_appender .as_mut() .context("not initialized")? - .append(data) + .append(stat_data) .await } } @@ -164,7 +184,10 @@ impl GazetteAppender { }) } - async fn append(&mut self, data: Bytes) -> anyhow::Result<()> { + async fn append(&mut self, data: impl Fn() -> S + Send + Sync) -> anyhow::Result<()> + where + S: Stream> + Send + 'static, + { if (self.exp - SystemTime::now()).whole_seconds() < 60 { self.refresh().await?; } @@ -174,52 +197,21 @@ impl GazetteAppender { journal: self.journal_name.clone(), ..Default::default() }, - || { - futures::stream::once({ - let value = data.clone(); - async move { Ok(value) } - }) - }, + data, ); tokio::pin!(resp); loop { match resp.try_next().await { Ok(_) => return Ok(()), - Err(RetryError { inner: err, .. }) - if matches!( - &err, - gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded - ) => - { - tracing::warn!( - ?err, - "DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen" - ); - - return Err(err.into()); - } - Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => { - let wait_ms = rand::thread_rng().gen_range(400..5_000); - + Err(RetryError { attempt, ref inner }) if inner.is_transient() => { tracing::warn!( ?attempt, ?inner, - ?wait_ms, "Got recoverable error trying to write logs, retrying" ); - - tokio::time::sleep(Duration::from_millis(wait_ms)).await; continue; } - Err(err) if err.inner.is_transient() => { - tracing::warn!( - attempt=err.attempt, - inner=?err.inner, - "Got recoverable error multiple times while trying to write logs" - ); - return Err(err.inner.into()); - } Err(err) => { tracing::warn!(?err, "Got fatal error while trying to write logs"); return Err(err.inner.into()); @@ -349,9 +341,40 @@ impl TaskForwarder { // TODO(jshearer): Do we want to make this configurable? let mut stats_interval = tokio::time::interval(std::time::Duration::from_secs(30)); + let mut pending_logs = Vec::new(); loop { tokio::select! { + // We always want to start a new append before accumulating more log messages because in + // the extreme case where we're getting messages faster than we can store them, we don't want + // to end up with an infinitely growing buffer of `pending_logs`. + biased; + + Err(append_error) = Self::append_logs_to_writer( + &mut writer, + &mut pending_logs, + task_name.clone(), + uuid_producer.clone(), + ), if pending_logs.len() > 0 => { + tracing::error!(?append_error, "Error appending logs"); + } + + _ = stats_interval.tick() => { + // Take current stats and write if non-zero + if let Some(current_stats) = stats.take(){ + if let Err(append_error) = writer.append_stats(||{ + let serialized = Self::serialize_stats( + uuid_producer, + current_stats.clone(), + task_name.to_owned(), + ); + futures::stream::once(async move { Ok(serialized.clone().into()) }) + }).await { + tracing::error!(?append_error, "Error appending stats") + } + } + } + msg = event_stream.next() => { match msg { Some(TaskWriterMessage::SetTaskName(new_name)) => { @@ -369,9 +392,7 @@ impl TaskForwarder { } } - writer - .append_logs(Self::serialize_log(uuid_producer, log, task_name.to_owned()).into()) - .await?; + pending_logs.push(log); } Some(TaskWriterMessage::Stats((collection_name, new_stats))) => { stats.add(collection_name, new_stats); @@ -380,20 +401,24 @@ impl TaskForwarder { None => break, } }, - _ = stats_interval.tick() => { - // Take current stats and write if non-zero - if let Some(current_stats) = stats.take(){ - let data = Self::serialize_stats(uuid_producer, current_stats, task_name.to_owned()); - writer.append_stats(data.into()).await?; - } - } } } // Flush any remaining stats after stream ends if let Some(remaining_stats) = stats.take() { - let data = Self::serialize_stats(uuid_producer, remaining_stats, task_name); - writer.append_stats(data.into()).await?; + if let Err(append_error) = writer + .append_stats(|| { + let serialized = Self::serialize_stats( + uuid_producer, + remaining_stats.clone(), + task_name.to_owned(), + ); + futures::stream::once(async move { Ok(serialized.clone().into()) }) + }) + .await + { + tracing::error!(?append_error, "Error appending stats") + }; } Ok(()) @@ -403,7 +428,7 @@ impl TaskForwarder { producer: Producer, stats: BTreeMap, task_name: String, - ) -> Vec { + ) -> bytes::Bytes { let uuid = gazette::uuid::build( producer, gazette::uuid::Clock::from_time(std::time::SystemTime::now()), @@ -427,10 +452,10 @@ impl TaskForwarder { let mut buf = serde_json::to_vec(&stats_output).expect("Value always serializes"); buf.push(b'\n'); - buf + bytes::Bytes::from(buf) } - fn serialize_log(producer: Producer, mut log: ops::Log, task_name: String) -> Vec { + fn serialize_log(producer: Producer, mut log: ops::Log, task_name: String) -> bytes::Bytes { let uuid = gazette::uuid::build( producer, gazette::uuid::Clock::from_time(std::time::SystemTime::now()), @@ -445,7 +470,7 @@ impl TaskForwarder { let mut buf = serde_json::to_vec(&log).expect("Value always serializes"); buf.push(b'\n'); - buf + bytes::Bytes::from(buf) } pub fn set_task_name(&self, name: String) { @@ -499,6 +524,31 @@ impl TaskForwarder { } } } + + async fn append_logs_to_writer( + writer: &mut W, + pending_logs: &mut Vec, + task_name: String, + uuid_producer: Producer, + ) -> anyhow::Result<()> { + let logs_to_append = mem::take(pending_logs); + + writer + .append_logs(move || { + futures::stream::iter(logs_to_append.clone().into_iter().map({ + let value = task_name.clone(); + move |log| { + let serialized = TaskForwarder::::serialize_log( + uuid_producer.clone(), + log, + value.to_owned(), + ); + Ok(serialized) + } + })) + }) + .await + } } fn dekaf_shard_ref(task_name: String) -> ops::ShardRef { @@ -536,12 +586,34 @@ mod tests { Ok(()) } - async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()> { - self.logs.lock().await.push_back(log_data); + async fn append_logs( + &mut self, + log_data: impl Fn() -> S + Send + Sync, + ) -> anyhow::Result<()> + where + S: Stream> + Send + 'static, + { + let mut logs = self.logs.lock().await; + let mut stream = Box::pin(log_data()); + + while let Some(Ok(data)) = stream.next().await { + logs.push_back(data); + } Ok(()) } - async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()> { - self.stats.lock().await.push_back(log_data); + async fn append_stats( + &mut self, + stat_data: impl Fn() -> S + Send + Sync, + ) -> anyhow::Result<()> + where + S: Stream> + Send + 'static, + { + let mut stats = self.stats.lock().await; + let mut stream = Box::pin(stat_data()); + + while let Some(Ok(data)) = stream.next().await { + stats.push_back(data); + } Ok(()) } }