Skip to content

Commit

Permalink
dekaf: Implement automatic token refreshing
Browse files Browse the repository at this point in the history
I noticed that after roughly 1-2 hours, Dekaf would stop writing logs and stats. I tracked that down to an error appending logs, specifically:

```
Grpc(
  Status {
    code: DeadlineExceeded,
    message: "context deadline exceeded"
  }
)
```

It turns out that this is the error Gazette returns when the auth token you pass it is expired, and the appending machinery in Dekaf wasn't taking into account token expiry. So this commit refactors `GazetteWriter` to be composed of two `GazetteAppender`s, one for logs and one for stats. Each `GazetteAppender` is capable of refreshing its internal client when necessary.
  • Loading branch information
jshearer committed Feb 7, 2025
1 parent 08e3263 commit 88e456f
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 104 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dekaf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tokio = { workspace = true }
tokio-rustls = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tracing-record-hierarchical = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
Expand Down
215 changes: 138 additions & 77 deletions crates/dekaf/src/log_appender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,115 +67,110 @@ impl StatsAggregator {
// This abstraction exists mostly in order to make testing easier.
#[async_trait]
pub trait TaskWriter: Send + Sync {
async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()>;
async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()>;
async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()>;
async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()>;

async fn set_task_name(&mut self, name: String) -> anyhow::Result<()>;
}

#[derive(Clone)]
pub struct GazetteWriter {
app: Arc<App>,
logs_client: Option<journal::Client>,
stats_client: Option<journal::Client>,
logs_journal_name: Option<String>,
stats_journal_name: Option<String>,
logs_appender: Option<GazetteAppender>,
stats_appender: Option<GazetteAppender>,
task_name: Option<String>,
}

#[async_trait]
impl TaskWriter for GazetteWriter {
async fn set_task_name(&mut self, task_name: String) -> anyhow::Result<()> {
let (logs_client, stats_client, logs_journal, stats_journal) =
self.get_journal_client(task_name).await?;
self.logs_client.replace(logs_client);
self.stats_client.replace(stats_client);
self.logs_journal_name.replace(logs_journal);
self.stats_journal_name.replace(stats_journal);
let (logs_appender, stats_appender) = self.get_appenders(task_name.as_str()).await?;
self.logs_appender.replace(logs_appender);
self.stats_appender.replace(stats_appender);
self.task_name.replace(task_name);

Ok(())
}

async fn append_logs(&self, data: Bytes) -> anyhow::Result<()> {
Self::append(
self.logs_client.as_ref().context("not initialized")?,
data,
self.logs_journal_name
.as_ref()
.context("Writer is not initialized")?
.clone(),
)
.await
async fn append_logs(&mut self, data: Bytes) -> anyhow::Result<()> {
self.logs_appender
.as_mut()
.context("not initialized")?
.append(data)
.await
}

async fn append_stats(&self, data: Bytes) -> anyhow::Result<()> {
Self::append(
self.stats_client.as_ref().context("not initialized")?,
data,
self.stats_journal_name
.as_ref()
.context("Writer is not initialized")?
.clone(),
)
.await
async fn append_stats(&mut self, data: Bytes) -> anyhow::Result<()> {
self.stats_appender
.as_mut()
.context("not initialized")?
.append(data)
.await
}
}

impl GazetteWriter {
pub fn new(app: Arc<App>) -> Self {
Self {
app: app,
logs_client: None,
stats_client: None,
logs_journal_name: None,
stats_journal_name: None,
task_name: None,
logs_appender: None,
stats_appender: None,
}
}

async fn get_journal_client(
async fn get_appenders(
&self,
task_name: String,
) -> anyhow::Result<(journal::Client, journal::Client, String, String)> {
let (client, _claims, ops_logs, ops_stats, _task_spec) = fetch_dekaf_task_auth(
task_name: &str,
) -> anyhow::Result<(GazetteAppender, GazetteAppender)> {
let (_, _, ops_logs, ops_stats, _) = fetch_dekaf_task_auth(
self.app.client_base.clone(),
&task_name,
&self.app.data_plane_fqdn,
&self.app.data_plane_signer,
)
.await?;
Ok((
GazetteAppender::try_create(ops_logs, task_name.to_string(), self.app.clone()).await?,
GazetteAppender::try_create(ops_stats, task_name.to_string(), self.app.clone()).await?,
))
}
}

let template_id = dekaf_shard_template_id(task_name.as_str());

let (logs_client, stats_client) = tokio::try_join!(
fetch_task_authorization(
&client,
&template_id,
&self.app.data_plane_fqdn,
&self.app.data_plane_signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
gazette::broker::LabelSelector {
include: Some(labels::build_set([("name", ops_logs.as_str()),])),
exclude: None,
},
),
fetch_task_authorization(
&client,
&template_id,
&self.app.data_plane_fqdn,
&self.app.data_plane_signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
gazette::broker::LabelSelector {
include: Some(labels::build_set([("name", ops_stats.as_str()),])),
exclude: None,
},
)
)?;
#[derive(Clone)]
struct GazetteAppender {
client: journal::Client,
journal_name: String,
exp: time::OffsetDateTime,
app: Arc<App>,
task_name: String,
}

Ok((logs_client, stats_client, ops_logs, ops_stats))
impl GazetteAppender {
pub async fn try_create(
journal_name: String,
task_name: String,
app: Arc<App>,
) -> anyhow::Result<Self> {
let (client, exp) = Self::refresh_client(&task_name, &journal_name, app.clone()).await?;

Ok(Self {
client,
exp,
task_name,
journal_name,
app,
})
}

async fn append(client: &journal::Client, data: Bytes, journal: String) -> anyhow::Result<()> {
let resp = client.append(
async fn append(&mut self, data: Bytes) -> anyhow::Result<()> {
if (self.exp - SystemTime::now()).whole_seconds() < 60 {
self.refresh().await?;
}

let resp = self.client.append(
gazette::broker::AppendRequest {
journal,
journal: self.journal_name.clone(),
..Default::default()
},
|| {
Expand All @@ -187,26 +182,92 @@ impl GazetteWriter {
);

tokio::pin!(resp);

loop {
match resp.try_next().await {
Err(RetryError { attempt, inner }) if inner.is_transient() && attempt < 3 => {
Ok(_) => return Ok(()),
Err(RetryError { inner: err, .. })
if matches!(
&err,
gazette::Error::Grpc(status) if status.code() == tonic::Code::DeadlineExceeded
) =>
{
tracing::warn!(
?err,
"DeadlineExceeded error likely means that the data-plane access token has expired, but tokens get refreshed so this should never happen"
);

return Err(err.into());
}
Err(RetryError { attempt, ref inner }) if inner.is_transient() && attempt < 3 => {
let wait_ms = rand::thread_rng().gen_range(400..5_000);

tracing::warn!(
?attempt,
?inner,
?wait_ms,
"Got recoverable error trying to write logs, retrying"
);

tokio::time::sleep(Duration::from_millis(wait_ms)).await;
continue;
}
Err(err) => {
Err(err) if err.inner.is_transient() => {
tracing::warn!(
?err,
attempt=err.attempt,
inner=?err.inner,
"Got recoverable error multiple times while trying to write logs"
);
return Err(err.inner.into());
}
Ok(_) => return Ok(()),
Err(err) => {
tracing::warn!(?err, "Got fatal error while trying to write logs");
return Err(err.inner.into());
}
}
}
}

async fn refresh(&mut self) -> anyhow::Result<()> {
let (client, exp) =
Self::refresh_client(&self.task_name, &self.journal_name, self.app.clone()).await?;
self.client = client;
self.exp = exp;
Ok(())
}

async fn refresh_client(
task_name: &str,
journal_name: &str,
app: Arc<App>,
) -> anyhow::Result<(journal::Client, time::OffsetDateTime)> {
let base_client = app.client_base.clone();
let data_plane_fqdn = &app.data_plane_fqdn;
let signer = &app.data_plane_signer;

let template_id = dekaf_shard_template_id(task_name);

let (auth_client, _, _, _, _) =
fetch_dekaf_task_auth(base_client, template_id.as_str(), data_plane_fqdn, signer)
.await?;

let (new_client, new_claims) = fetch_task_authorization(
&auth_client,
&template_id,
data_plane_fqdn,
signer,
proto_flow::capability::AUTHORIZE | proto_gazette::capability::APPEND,
gazette::broker::LabelSelector {
include: Some(labels::build_set([("name", journal_name)])),
exclude: None,
},
)
.await?;

Ok((
new_client,
time::OffsetDateTime::UNIX_EPOCH + Duration::from_secs(new_claims.exp),
))
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -453,11 +514,11 @@ mod tests {
Ok(())
}

async fn append_logs(&self, log_data: Bytes) -> anyhow::Result<()> {
async fn append_logs(&mut self, log_data: Bytes) -> anyhow::Result<()> {
self.logs.lock().await.push_back(log_data);
Ok(())
}
async fn append_stats(&self, log_data: Bytes) -> anyhow::Result<()> {
async fn append_stats(&mut self, log_data: Bytes) -> anyhow::Result<()> {
self.stats.lock().await.push_back(log_data);
Ok(())
}
Expand Down
42 changes: 17 additions & 25 deletions crates/dekaf/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ use crate::{
connector, dekaf_shard_template_id, utils, App, SessionAuthentication, TaskAuth, UserAuth,
};
use anyhow::{anyhow, bail, Context};
use flow_client::fetch_task_authorization;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use futures::{StreamExt, TryStreamExt};
use gazette::{broker, journal, uuid};
use models::RawValue;
use proto_flow::flow;
use std::time::Duration;

impl UserAuth {
/// Fetch the names of all collections which the current user may read.
Expand Down Expand Up @@ -381,29 +379,23 @@ impl Collection {
Ok(journal_client)
}
SessionAuthentication::Task(task_auth) => {
let journal_client = tokio::time::timeout(
Duration::from_secs(30),
fetch_task_authorization(
&app.client_base,
&dekaf_shard_template_id(&task_auth.task_name),
&app.data_plane_fqdn,
&app.data_plane_signer,
proto_flow::capability::AUTHORIZE
| proto_gazette::capability::LIST
| proto_gazette::capability::READ,
gazette::broker::LabelSelector {
include: Some(labels::build_set([(
"name:prefix",
format!("{partition_template_name}/").as_str(),
)])),
exclude: None,
},
),
let (journal_client, _claims) = flow_client::fetch_task_authorization(
&app.client_base,
&dekaf_shard_template_id(&task_auth.task_name),
&app.data_plane_fqdn,
&app.data_plane_signer,
proto_flow::capability::AUTHORIZE
| proto_gazette::capability::LIST
| proto_gazette::capability::READ,
gazette::broker::LabelSelector {
include: Some(labels::build_set([(
"name:prefix",
format!("{partition_template_name}/").as_str(),
)])),
exclude: None,
},
)
.map_err(|e| {
anyhow::anyhow!("timed out building journal client for {collection_name}: {e}")
})
.await??;
.await?;

Ok(journal_client)
}
Expand Down
6 changes: 4 additions & 2 deletions crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub async fn fetch_task_authorization(
data_plane_signer: &jsonwebtoken::EncodingKey,
capability: u32,
selector: gazette::broker::LabelSelector,
) -> anyhow::Result<gazette::journal::Client> {
) -> anyhow::Result<(gazette::journal::Client, proto_gazette::Claims)> {
let request_token = build_task_authorization_request_token(
shard_template_id,
data_plane_fqdn,
Expand Down Expand Up @@ -219,14 +219,16 @@ pub async fn fetch_task_authorization(

tracing::debug!(broker_address, "resolved task data-plane and authorization");

let parsed_claims: proto_gazette::Claims = parse_jwt_claims(&token)?;

let mut md = gazette::Metadata::default();
md.bearer_token(&token)?;

let journal_client = client
.journal_client
.with_endpoint_and_metadata(broker_address, md);

Ok(journal_client)
Ok((journal_client, parsed_claims))
}

pub fn build_task_authorization_request_token(
Expand Down

0 comments on commit 88e456f

Please sign in to comment.