From 88e456fe0c966faabf1fb9f26ffde515d52efee4 Mon Sep 17 00:00:00 2001
From: Joseph Shearer
Date: Fri, 7 Feb 2025 12:30:20 -0500
Subject: [PATCH] dekaf: Implement automatic token refreshing

I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and
stats. I tracked that down to an error appending logs, specifically:

```
Grpc(
    Status {
        code: DeadlineExceeded,
        message: "context deadline exceeded"
    }
)
```

It turns out that this is the error Gazette returns when the auth token you
pass it is expired, and the appending machinery in Dekaf wasn't taking token
expiry into account. So this commit refactors `GazetteWriter` to be composed
of two `GazetteAppender`s, one for logs and one for stats. Each
`GazetteAppender` is capable of refreshing its internal client when necessary.
---
 Cargo.lock                       |   1 +
 crates/dekaf/Cargo.toml          |   1 +
 crates/dekaf/src/log_appender.rs | 215 ++++++++++++++++++++-----------
 crates/dekaf/src/topology.rs     |  42 +++---
 crates/flow-client/src/client.rs |   6 +-
 5 files changed, 161 insertions(+), 104 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0d77f8eb4b..66e1905a20 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2030,6 +2030,7 @@ dependencies = [
  "tokio-rustls 0.26.0",
  "tokio-stream",
  "tokio-util",
+ "tonic",
  "tower-http",
  "tracing",
  "tracing-record-hierarchical",
diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml
index 324bc9656a..8a820d3df4 100644
--- a/crates/dekaf/Cargo.toml
+++ b/crates/dekaf/Cargo.toml
@@ -68,6 +68,7 @@ tokio = { workspace = true }
 tokio-rustls = { workspace = true }
 tokio-util = { workspace = true }
 tokio-stream = { workspace = true }
+tonic = { workspace = true }
 tracing-record-hierarchical = { workspace = true }
 tower-http = { workspace = true }
 tracing = { workspace = true }
diff --git a/crates/dekaf/src/log_appender.rs b/crates/dekaf/src/log_appender.rs
index bc317dd3f8..b0c5e1ca12 100644
--- a/crates/dekaf/src/log_appender.rs
+++ b/crates/dekaf/src/log_appender.rs
@@ -67,8 +67,8 @@ impl StatsAggregator {
 // This abstraction exists mostly in order to make testing easier.
 #[async_trait]
 pub trait TaskWriter: Send + Sync {
-    async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()>;
-    async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()>;
+    async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()>;
+    async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()>;
     async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>;
 }
 
@@ -76,46 +76,36 @@ pub trait TaskWriter: Send + Sync {
 #[derive(Clone)]
 pub struct GazetteWriter {
     app: Arc<App>,
-    logs_client: Option<journal::Client>,
-    stats_client: Option<journal::Client>,
-    logs_journal_name: Option<String>,
-    stats_journal_name: Option<String>,
+    logs_appender: Option<GazetteAppender>,
+    stats_appender: Option<GazetteAppender>,
+    task_name: Option<String>,
 }
 
 #[async_trait]
 impl TaskWriter for GazetteWriter {
     async fn set_task_name(&mut self, task_name: String) -> anyhow::Result<()> {
-        let (logs_client, stats_client, logs_journal, stats_journal) =
-            self.get_journal_client(task_name).await?;
-        self.logs_client.replace(logs_client);
-        self.stats_client.replace(stats_client);
-        self.logs_journal_name.replace(logs_journal);
-        self.stats_journal_name.replace(stats_journal);
+        let (logs_appender, stats_appender) = self.get_appenders(task_name.as_str()).await?;
+        self.logs_appender.replace(logs_appender);
+        self.stats_appender.replace(stats_appender);
+        self.task_name.replace(task_name);
+
         Ok(())
     }
 
-    async fn append_logs(&self, data: Bytes) -> anyhow::Result<()> {
-        Self::append(
-            self.logs_client.as_ref().context("not initialized")?,
-            data,
-            self.logs_journal_name
-                .as_ref()
-                .context("Writer is not initialized")?
-                .clone(),
-        )
-        .await
+    async fn append_logs(&mut self, data: Bytes) -> anyhow::Result<()> {
+        self.logs_appender
+            .as_mut()
+            .context("not initialized")?
+            .append(data)
+            .await
     }
 
-    async fn append_stats(&self, data: Bytes) -> anyhow::Result<()> {
-        Self::append(
-            self.stats_client.as_ref().context("not initialized")?,
-            data,
-            self.stats_journal_name
-                .as_ref()
-                .context("Writer is not initialized")?
-                .clone(),
-        )
-        .await
+    async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> {
+        self.stats_appender
+            .as_mut()
+            .context("not initialized")?
+            .append(data)
+            .await
     }
 }
 
@@ -123,59 +113,64 @@ impl GazetteWriter {
     pub fn new(app: Arc<App>) -> Self {
         Self {
             app: app,
-            logs_client: None,
-            stats_client: None,
-            logs_journal_name: None,
-            stats_journal_name: None,
+            task_name: None,
+            logs_appender: None,
+            stats_appender: None,
         }
     }
 
-    async fn get_journal_client(
+    async fn get_appenders(
         &self,
-        task_name: String,
-    ) -> anyhow::Result<(journal::Client, journal::Client, String, String)> {
-        let (client, _claims, ops_logs, ops_stats, _task_spec) = fetch_dekaf_task_auth(
+        task_name: &str,
+    ) -> anyhow::Result<(GazetteAppender, GazetteAppender)> {
+        let (_, _, ops_logs, ops_stats, _) = fetch_dekaf_task_auth(
             self.app.client_base.clone(),
             &task_name,
             &self.app.data_plane_fqdn,
             &self.app.data_plane_signer,
         )
        .await?;
+        Ok((
+            GazetteAppender::try_create(ops_logs, task_name.to_string(), self.app.clone()).await?,
+            GazetteAppender::try_create(ops_stats, task_name.to_string(), self.app.clone()).await?,
+        ))
+    }
+}
 
-        let template_id = dekaf_shard_template_id(task_name.as_str());
-
-        let (logs_client, stats_client) = tokio::try_join!(
-            fetch_task_authorization(
-                &client,
-                &template_id,
-                &self.app.data_plane_fqdn,
-                &self.app.data_plane_signer,
-                proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
-                gazette::broker::LabelSelector {
-                    include: Some(labels::build_set([("name", ops_logs.as_str()),])),
-                    exclude: None,
-                },
-            ),
-            fetch_task_authorization(
-                &client,
-                &template_id,
-                &self.app.data_plane_fqdn,
-                &self.app.data_plane_signer,
-                proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
-                gazette::broker::LabelSelector {
-                    include: Some(labels::build_set([("name", ops_stats.as_str()),])),
-                    exclude: None,
-                },
-            )
-        )?;
+#[derive(Clone)]
+struct GazetteAppender {
+    client: journal::Client,
+    journal_name: String,
+    exp: time::OffsetDateTime,
+    app: Arc<App>,
+    task_name: String,
+}
 
-        Ok((logs_client, stats_client, ops_logs, ops_stats))
+impl GazetteAppender {
+    pub async fn try_create(
+        journal_name: String,
+        task_name: String,
+        app: Arc<App>,
+    ) -> anyhow::Result<Self> {
+        let (client, exp) = Self::refresh_client(&task_name, &journal_name, app.clone()).await?;
+
+        Ok(Self {
+            client,
+            exp,
+            task_name,
+            journal_name,
+            app,
+        })
     }
 
-    async fn append(client: &journal::Client, data: Bytes, journal: String) -> anyhow::Result<()> {
-        let resp = client.append(
+    async fn append(&mut self, data: Bytes) -> anyhow::Result<()> {
+        if (self.exp - SystemTime::now()).whole_seconds() < 60 {
+            self.refresh().await?;
+        }
+
+        let resp = self.client.append(
             gazette::broker::AppendRequest {
-                journal,
+                journal: self.journal_name.clone(),
                 ..Default::default()
             },
             || {
@@ -187,26 +182,92 @@ impl GazetteWriter {
         );
         tokio::pin!(resp);
-
         loop {
             match resp.try_next().await {
-                Err(RetryError { attempt, inner }) if inner.is_transient() && attempt < 3 => {
+                Ok(_) => return Ok(()),
+                Err(RetryError { inner: err, .. })
+                    if matches!(
+                        &err,
+                        gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded
+                    ) =>
+                {
+                    tracing::warn!(
+                        ?err,
+                        "DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen"
+                    );
+
+                    return Err(err.into());
+                }
+                Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => {
                     let wait_ms = rand::thread_rng().gen_range(400..5_000);
+                    tracing::warn!(
+                        ?attempt,
+                        ?inner,
+                        ?wait_ms,
+                        "Got recoverable error trying to write logs, retrying"
+                    );
+
                     tokio::time::sleep(Duration::from_millis(wait_ms)).await;
                     continue;
                 }
-                Err(err) => {
+                Err(err) if err.inner.is_transient() => {
                     tracing::warn!(
-                        ?err,
+                        attempt=err.attempt,
+                        inner=?err.inner,
                         "Got recoverable error multiple times while trying to write logs"
                     );
                     return Err(err.inner.into());
                 }
-                Ok(_) => return Ok(()),
+                Err(err) => {
+                    tracing::warn!(?err, "Got fatal error while trying to write logs");
+                    return Err(err.inner.into());
+                }
             }
         }
     }
+
+    async fn refresh(&mut self) -> anyhow::Result<()> {
+        let (client, exp) =
+            Self::refresh_client(&self.task_name, &self.journal_name, self.app.clone()).await?;
+        self.client = client;
+        self.exp = exp;
+        Ok(())
+    }
+
+    async fn refresh_client(
+        task_name: &str,
+        journal_name: &str,
+        app: Arc<App>,
+    ) -> anyhow::Result<(journal::Client, time::OffsetDateTime)> {
+        let base_client = app.client_base.clone();
+        let data_plane_fqdn = &app.data_plane_fqdn;
+        let signer = &app.data_plane_signer;
+
+        let template_id = dekaf_shard_template_id(task_name);
+
+        let (auth_client, _, _, _, _) =
+            fetch_dekaf_task_auth(base_client, template_id.as_str(), data_plane_fqdn, signer)
+                .await?;
+
+        let (new_client, new_claims) = fetch_task_authorization(
+            &auth_client,
+            &template_id,
+            data_plane_fqdn,
+            signer,
+            proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
+            gazette::broker::LabelSelector {
+                include: Some(labels::build_set([("name", journal_name)])),
+                exclude: None,
+            },
+        )
+        .await?;
+
+        Ok((
+            new_client,
+            time::OffsetDateTime::UNIX_EPOCH + Duration::from_secs(new_claims.exp),
+        ))
+    }
 }
 
 #[derive(Clone)]
@@ -453,11 +514,11 @@ mod tests {
             Ok(())
         }
 
-        async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()> {
+        async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()> {
             self.logs.lock().await.push_back(log_data);
             Ok(())
         }
-        async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()> {
+        async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()> {
             self.stats.lock().await.push_back(log_data);
             Ok(())
         }
diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs
index 6e2720b388..23c83b6a77 100644
--- a/crates/dekaf/src/topology.rs
+++ b/crates/dekaf/src/topology.rs
@@ -2,12 +2,10 @@ use crate::{
     connector, dekaf_shard_template_id, utils, App, SessionAuthentication, TaskAuth, UserAuth,
 };
 use anyhow::{anyhow, bail, Context};
-use flow_client::fetch_task_authorization;
-use futures::{StreamExt, TryFutureExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use gazette::{broker, journal, uuid};
 use models::RawValue;
 use proto_flow::flow;
-use std::time::Duration;
 
 impl UserAuth {
     /// Fetch the names of all collections which the current user may read.
@@ -381,29 +379,23 @@ impl Collection {
                 Ok(journal_client)
             }
             SessionAuthentication::Task(task_auth) => {
-                let journal_client = tokio::time::timeout(
-                    Duration::from_secs(30),
-                    fetch_task_authorization(
-                        &app.client_base,
-                        &dekaf_shard_template_id(&task_auth.task_name),
-                        &app.data_plane_fqdn,
-                        &app.data_plane_signer,
-                        proto_flow::capability::AUTHORIZE
-                            | proto_gazette::capability::LIST
-                            | proto_gazette::capability::READ,
-                        gazette::broker::LabelSelector {
-                            include: Some(labels::build_set([(
-                                "name:prefix",
-                                format!("{partition_template_name}/").as_str(),
-                            )])),
-                            exclude: None,
-                        },
-                    ),
+                let (journal_client, _claims) = flow_client::fetch_task_authorization(
+                    &app.client_base,
+                    &dekaf_shard_template_id(&task_auth.task_name),
+                    &app.data_plane_fqdn,
+                    &app.data_plane_signer,
+                    proto_flow::capability::AUTHORIZE
+                        | proto_gazette::capability::LIST
+                        | proto_gazette::capability::READ,
+                    gazette::broker::LabelSelector {
+                        include: Some(labels::build_set([(
+                            "name:prefix",
+                            format!("{partition_template_name}/").as_str(),
+                        )])),
+                        exclude: None,
+                    },
                 )
-                .map_err(|e| {
-                    anyhow::anyhow!("timed out building journal client for {collection_name}: {e}")
-                })
-                .await??;
+                .await?;
 
                 Ok(journal_client)
             }
diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs
index 04ac2634aa..85f179436c 100644
--- a/crates/flow-client/src/client.rs
+++ b/crates/flow-client/src/client.rs
@@ -183,7 +183,7 @@ pub async fn fetch_task_authorization(
     data_plane_signer: &jsonwebtoken::EncodingKey,
     capability: u32,
     selector: gazette::broker::LabelSelector,
-) -> anyhow::Result<gazette::journal::Client> {
+) -> anyhow::Result<(gazette::journal::Client, proto_gazette::Claims)> {
     let request_token = build_task_authorization_request_token(
         shard_template_id,
        data_plane_fqdn,
@@ -219,6 +219,8 @@ pub async fn fetch_task_authorization(
     tracing::debug!(broker_address, "resolved task data-plane and authorization");
 
+    let parsed_claims: proto_gazette::Claims = parse_jwt_claims(&token)?;
+
     let mut md = gazette::Metadata::default();
     md.bearer_token(&token)?;
 
@@ -226,7 +228,7 @@ pub async fn fetch_task_authorization(
         .journal_client
         .with_endpoint_and_metadata(broker_address, md);
 
-    Ok(journal_client)
+    Ok((journal_client, parsed_claims))
 }
 
 pub fn build_task_authorization_request_token(
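
For reviewers skimming the diff: the heart of the change is the expiry check at the top of `GazetteAppender::append`, with the expiry derived from the `exp` claim that `fetch_task_authorization` now returns. The sketch below illustrates that policy in isolation; it assumes only the `time` crate the patch already uses, and the `Appender` type and `exp_from_claims` helper are hypothetical stand-ins, not the actual Dekaf API.

```rust
use std::time::Duration;
use time::OffsetDateTime;

/// Illustrative stand-in for a token-holding appender: it caches the token
/// expiry and decides when the underlying client should be re-created.
struct Appender {
    exp: OffsetDateTime,
}

impl Appender {
    /// Convert a JWT `exp` claim (seconds since the Unix epoch) into an
    /// OffsetDateTime, mirroring what refresh_client() does in the patch.
    fn exp_from_claims(exp_secs: u64) -> OffsetDateTime {
        OffsetDateTime::UNIX_EPOCH + Duration::from_secs(exp_secs)
    }

    /// True when the cached token is within 60 seconds of expiring and the
    /// client should be refreshed before the next append.
    fn needs_refresh(&self) -> bool {
        (self.exp - OffsetDateTime::now_utc()).whole_seconds() < 60
    }
}

fn main() {
    // Pretend the current token expires 30 seconds from now.
    let appender = Appender {
        exp: Appender::exp_from_claims((OffsetDateTime::now_utc().unix_timestamp() + 30) as u64),
    };
    // Inside the 60-second window, so a real appender would re-mint its client here.
    assert!(appender.needs_refresh());
}
```

Refreshing a minute ahead of `exp` gives the append path room to obtain a new authorization before Gazette starts returning the DeadlineExceeded error described in the commit message.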