diff --git a/Cargo.lock b/Cargo.lock index 0d77f8eb4b..66e1905a20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2030,6 +2030,7 @@ dependencies = [ "tokio-rustls 0.26.0", "tokio-stream", "tokio-util", + "tonic", "tower-http", "tracing", "tracing-record-hierarchical", diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index 324bc9656a..8a820d3df4 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -68,6 +68,7 @@ tokio = { workspace = true } tokio-rustls = { workspace = true } tokio-util = { workspace = true } tokio-stream = { workspace = true } +tonic = { workspace = true } tracing-record-hierarchical = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } diff --git a/crates/dekaf/src/log_appender.rs b/crates/dekaf/src/log_appender.rs index bc317dd3f8..b0c5e1ca12 100644 --- a/crates/dekaf/src/log_appender.rs +++ b/crates/dekaf/src/log_appender.rs @@ -67,8 +67,8 @@ impl StatsAggregator { // This abstraction exists mostly in order to make testing easier. #[async_trait] pub trait TaskWriter: Send + Sync { - async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()>; - async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()>; + async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()>; + async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()>; async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>; } @@ -76,46 +76,36 @@ pub trait TaskWriter: Send + Sync { #[derive(Clone)] pub struct GazetteWriter { app: Arc, - logs_client: Option, - stats_client: Option, - logs_journal_name: Option, - stats_journal_name: Option, + logs_appender: Option, + stats_appender: Option, + task_name: Option, } #[async_trait] impl TaskWriter for GazetteWriter { async fn set_task_name(&mut self, task_name: String) -> anyhow::Result<()> { - let (logs_client, stats_client, logs_journal, stats_journal) = - self.get_journal_client(task_name).await?; - self.logs_client.replace(logs_client); - self.stats_client.replace(stats_client); - self.logs_journal_name.replace(logs_journal); - self.stats_journal_name.replace(stats_journal); + let (logs_appender, stats_appender) = self.get_appenders(task_name.as_str()).await?; + self.logs_appender.replace(logs_appender); + self.stats_appender.replace(stats_appender); + self.task_name.replace(task_name); + Ok(()) } - async fn append_logs(&self, data: Bytes) -> anyhow::Result<()> { - Self::append( - self.logs_client.as_ref().context("not initialized")?, - data, - self.logs_journal_name - .as_ref() - .context("Writer is not initialized")? - .clone(), - ) - .await + async fn append_logs(&mut self, data: Bytes) -> anyhow::Result<()> { + self.logs_appender + .as_mut() + .context("not initialized")? + .append(data) + .await } - async fn append_stats(&self, data: Bytes) -> anyhow::Result<()> { - Self::append( - self.stats_client.as_ref().context("not initialized")?, - data, - self.stats_journal_name - .as_ref() - .context("Writer is not initialized")? - .clone(), - ) - .await + async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> { + self.stats_appender + .as_mut() + .context("not initialized")? + .append(data) + .await } } @@ -123,59 +113,64 @@ impl GazetteWriter { pub fn new(app: Arc) -> Self { Self { app: app, - logs_client: None, - stats_client: None, - logs_journal_name: None, - stats_journal_name: None, + task_name: None, + logs_appender: None, + stats_appender: None, } } - async fn get_journal_client( + async fn get_appenders( &self, - task_name: String, - ) -> anyhow::Result<(journal::Client, journal::Client, String, String)> { - let (client, _claims, ops_logs, ops_stats, _task_spec) = fetch_dekaf_task_auth( + task_name: &str, + ) -> anyhow::Result<(GazetteAppender, GazetteAppender)> { + let (_, _, ops_logs, ops_stats, _) = fetch_dekaf_task_auth( self.app.client_base.clone(), &task_name, &self.app.data_plane_fqdn, &self.app.data_plane_signer, ) .await?; + Ok(( + GazetteAppender::try_create(ops_logs, task_name.to_string(), self.app.clone()).await?, + GazetteAppender::try_create(ops_stats, task_name.to_string(), self.app.clone()).await?, + )) + } +} - let template_id = dekaf_shard_template_id(task_name.as_str()); - - let (logs_client, stats_client) = tokio::try_join!( - fetch_task_authorization( - &client, - &template_id, - &self.app.data_plane_fqdn, - &self.app.data_plane_signer, - proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND, - gazette::broker::LabelSelector { - include: Some(labels::build_set([("name", ops_logs.as_str()),])), - exclude: None, - }, - ), - fetch_task_authorization( - &client, - &template_id, - &self.app.data_plane_fqdn, - &self.app.data_plane_signer, - proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND, - gazette::broker::LabelSelector { - include: Some(labels::build_set([("name", ops_stats.as_str()),])), - exclude: None, - }, - ) - )?; +#[derive(Clone)] +struct GazetteAppender { + client: journal::Client, + journal_name: String, + exp: time::OffsetDateTime, + app: Arc, + task_name: String, +} - Ok((logs_client, stats_client, ops_logs, ops_stats)) +impl GazetteAppender { + pub async fn try_create( + journal_name: String, + task_name: String, + app: Arc, + ) -> anyhow::Result { + let (client, exp) = Self::refresh_client(&task_name, &journal_name, app.clone()).await?; + + Ok(Self { + client, + exp, + task_name, + journal_name, + app, + }) } - async fn append(client: &journal::Client, data: Bytes, journal: String) -> anyhow::Result<()> { - let resp = client.append( + async fn append(&mut self, data: Bytes) -> anyhow::Result<()> { + if (self.exp - SystemTime::now()).whole_seconds() < 60 { + self.refresh().await?; + } + + let resp = self.client.append( gazette::broker::AppendRequest { - journal, + journal: self.journal_name.clone(), ..Default::default() }, || { @@ -187,26 +182,92 @@ impl GazetteWriter { ); tokio::pin!(resp); - loop { match resp.try_next().await { - Err(RetryError { attempt, inner }) if inner.is_transient() && attempt < 3 => { + Ok(_) => return Ok(()), + Err(RetryError { inner: err, .. }) + if matches!( + &err, + gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded + ) => + { + tracing::warn!( + ?err, + "DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen" + ); + + return Err(err.into()); + } + Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => { let wait_ms = rand::thread_rng().gen_range(400..5_000); + tracing::warn!( + ?attempt, + ?inner, + ?wait_ms, + "Got recoverable error trying to write logs, retrying" + ); + tokio::time::sleep(Duration::from_millis(wait_ms)).await; continue; } - Err(err) => { + Err(err) if err.inner.is_transient() => { tracing::warn!( - ?err, + attempt=err.attempt, + inner=?err.inner, "Got recoverable error multiple times while trying to write logs" ); return Err(err.inner.into()); } - Ok(_) => return Ok(()), + Err(err) => { + tracing::warn!(?err, "Got fatal error while trying to write logs"); + return Err(err.inner.into()); + } } } } + + async fn refresh(&mut self) -> anyhow::Result<()> { + let (client, exp) = + Self::refresh_client(&self.task_name, &self.journal_name, self.app.clone()).await?; + self.client = client; + self.exp = exp; + Ok(()) + } + + async fn refresh_client( + task_name: &str, + journal_name: &str, + app: Arc, + ) -> anyhow::Result<(journal::Client, time::OffsetDateTime)> { + let base_client = app.client_base.clone(); + let data_plane_fqdn = &app.data_plane_fqdn; + let signer = &app.data_plane_signer; + + let template_id = dekaf_shard_template_id(task_name); + + let (auth_client, _, _, _, _) = + fetch_dekaf_task_auth(base_client, template_id.as_str(), data_plane_fqdn, signer) + .await?; + + let (new_client, new_claims) = fetch_task_authorization( + &auth_client, + &template_id, + data_plane_fqdn, + signer, + proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND, + gazette::broker::LabelSelector { + include: Some(labels::build_set([("name", journal_name)])), + exclude: None, + }, + ) + .await?; + + Ok(( + new_client, + time::OffsetDateTime::UNIX_EPOCH + Duration::from_secs(new_claims.exp), + )) + } } #[derive(Clone)] @@ -453,11 +514,11 @@ mod tests { Ok(()) } - async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()> { + async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()> { self.logs.lock().await.push_back(log_data); Ok(()) } - async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()> { + async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()> { self.stats.lock().await.push_back(log_data); Ok(()) } diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index 6e2720b388..23c83b6a77 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -2,12 +2,10 @@ use crate::{ connector, dekaf_shard_template_id, utils, App, SessionAuthentication, TaskAuth, UserAuth, }; use anyhow::{anyhow, bail, Context}; -use flow_client::fetch_task_authorization; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use gazette::{broker, journal, uuid}; use models::RawValue; use proto_flow::flow; -use std::time::Duration; impl UserAuth { /// Fetch the names of all collections which the current user may read. @@ -381,29 +379,23 @@ impl Collection { Ok(journal_client) } SessionAuthentication::Task(task_auth) => { - let journal_client = tokio::time::timeout( - Duration::from_secs(30), - fetch_task_authorization( - &app.client_base, - &dekaf_shard_template_id(&task_auth.task_name), - &app.data_plane_fqdn, - &app.data_plane_signer, - proto_flow::capability::AUTHORIZE - | proto_gazette::capability::LIST - | proto_gazette::capability::READ, - gazette::broker::LabelSelector { - include: Some(labels::build_set([( - "name:prefix", - format!("{partition_template_name}/").as_str(), - )])), - exclude: None, - }, - ), + let (journal_client, _claims) = flow_client::fetch_task_authorization( + &app.client_base, + &dekaf_shard_template_id(&task_auth.task_name), + &app.data_plane_fqdn, + &app.data_plane_signer, + proto_flow::capability::AUTHORIZE + | proto_gazette::capability::LIST + | proto_gazette::capability::READ, + gazette::broker::LabelSelector { + include: Some(labels::build_set([( + "name:prefix", + format!("{partition_template_name}/").as_str(), + )])), + exclude: None, + }, ) - .map_err(|e| { - anyhow::anyhow!("timed out building journal client for {collection_name}: {e}") - }) - .await??; + .await?; Ok(journal_client) } diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index 04ac2634aa..85f179436c 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -183,7 +183,7 @@ pub async fn fetch_task_authorization( data_plane_signer: &jsonwebtoken::EncodingKey, capability: u32, selector: gazette::broker::LabelSelector, -) -> anyhow::Result { +) -> anyhow::Result<(gazette::journal::Client, proto_gazette::Claims)> { let request_token = build_task_authorization_request_token( shard_template_id, data_plane_fqdn, @@ -219,6 +219,8 @@ pub async fn fetch_task_authorization( tracing::debug!(broker_address, "resolved task data-plane and authorization"); + let parsed_claims: proto_gazette::Claims = parse_jwt_claims(&token)?; + let mut md = gazette::Metadata::default(); md.bearer_token(&token)?; @@ -226,7 +228,7 @@ pub async fn fetch_task_authorization( .journal_client .with_endpoint_and_metadata(broker_address, md); - Ok(journal_client) + Ok((journal_client, parsed_claims)) } pub fn build_task_authorization_request_token(