dekaf: Add back Kafka healthcheck
jshearer committed Feb 19, 2025
1 parent 5d2b26f commit b575a43
Showing 2 changed files with 94 additions and 45 deletions.
2 changes: 1 addition & 1 deletion crates/dekaf/src/lib.rs
@@ -25,7 +25,7 @@ pub mod metrics_server;
pub mod registry;

mod api_client;
pub use api_client::KafkaApiClient;
pub use api_client::{KafkaApiClient, KafkaClientAuth};

use aes_siv::{aead::Aead, Aes256SivAead, KeyInit, KeySizeUser};
use flow_client::client::{refresh_authorizations, RefreshToken};
137 changes: 93 additions & 44 deletions crates/dekaf/src/main.rs
@@ -4,7 +4,7 @@ extern crate allocator;
use anyhow::{bail, Context};
use axum_server::tls_rustls::RustlsConfig;
use clap::{Args, Parser};
use dekaf::{log_appender::GazetteWriter, logging, Session};
use dekaf::{log_appender::GazetteWriter, logging, KafkaApiClient, KafkaClientAuth, Session};
use flow_client::{
DEFAULT_AGENT_URL, DEFAULT_DATA_PLANE_FQDN, DEFAULT_PG_PUBLIC_TOKEN, DEFAULT_PG_URL,
LOCAL_AGENT_URL, LOCAL_DATA_PLANE_FQDN, LOCAL_DATA_PLANE_HMAC, LOCAL_PG_PUBLIC_TOKEN,
@@ -136,9 +136,64 @@ struct TlsArgs {
    certificate_key_file: Option<PathBuf>,
}

impl Cli {
    fn build_broker_urls(&self) -> anyhow::Result<Vec<String>> {
        self.default_broker_urls
            .clone()
            .into_iter()
            .map(|url| {
                {
                    let parsed = Url::parse(&url).expect("invalid broker URL {url}");
                    Ok::<_, anyhow::Error>(format!(
                        "tcp://{}:{}",
                        parsed.host().context(format!("invalid broker URL {url}"))?,
                        parsed.port().unwrap_or(9092)
                    ))
                }
                .context(url)
            })
            .collect::<anyhow::Result<Vec<_>>>()
    }

    fn build_LEGACY_broker_urls(&self) -> anyhow::Result<Vec<String>> {
        self.legacy_mode_broker_urls
            .clone()
            .into_iter()
            .map(|url| {
                {
                    let parsed = Url::parse(&url).expect("invalid broker URL {url}");
                    Ok::<_, anyhow::Error>(format!(
                        "tcp://{}:{}",
                        parsed.host().context(format!("invalid broker URL {url}"))?,
                        parsed.port().unwrap_or(9092)
                    ))
                }
                .context(url)
            })
            .collect::<anyhow::Result<Vec<_>>>()
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    logging::install();

    let cli = Cli::parse();

    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .unwrap();

    tracing::info!("Starting dekaf");

    let upstream_kafka_urls = cli.build_broker_urls()?;

    // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------
    let legacy_mode_kafka_urls = cli.build_LEGACY_broker_urls()?;
    // ------------------------------------------------------------------------------------------------

    test_kafka(&cli).await?;

    let (api_endpoint, api_key) = if cli.local {
        (LOCAL_PG_URL.to_owned(), LOCAL_PG_PUBLIC_TOKEN.to_string())
    } else {
@@ -156,49 +211,6 @@ async fn main() -> anyhow::Result<()> {
        client_base: flow_client::Client::new(cli.agent_endpoint, api_key, api_endpoint, None),
    });

    logging::install();

    tracing::info!("Starting dekaf");

    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .unwrap();

    let upstream_kafka_urls = cli
        .default_broker_urls
        .clone()
        .into_iter()
        .map(|url| {
            {
                let parsed = Url::parse(&url).expect("invalid broker URL {url}");
                Ok::<_, anyhow::Error>(format!(
                    "tcp://{}:{}",
                    parsed.host().context(format!("invalid broker URL {url}"))?,
                    parsed.port().unwrap_or(9092)
                ))
            }
            .context(url)
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------
    let legacy_mode_kafka_urls = cli
        .default_broker_urls
        .into_iter()
        .map(|url| {
            {
                let parsed = Url::parse(&url).expect("invalid broker URL {url}");
                Ok::<_, anyhow::Error>(format!(
                    "tcp://{}:{}",
                    parsed.host().context(format!("invalid broker URL {url}"))?,
                    parsed.port().unwrap_or(9092)
                ))
            }
            .context(url)
        })
        .collect::<anyhow::Result<Vec<_>>>()?;
    // ------------------------------------------------------------------------------------------------

    let mut stop = async {
        tokio::signal::ctrl_c()
            .await
@@ -426,3 +438,40 @@ fn validate_certificate_name(
    }
    return Ok(false);
}

#[tracing::instrument(skip(cli))]
async fn test_kafka(cli: &Cli) -> anyhow::Result<()> {
    let iam_creds = KafkaClientAuth::MSK {
        aws_region: cli.default_broker_msk_region.clone(),
        provider: aws_config::from_env()
            .region(aws_types::region::Region::new(
                cli.default_broker_msk_region.clone(),
            ))
            .load()
            .await
            .credentials_provider()
            .unwrap(),
        cached: None,
    };
    let user_pass_creds =
        KafkaClientAuth::NonRefreshing(rsasl::config::SASLConfig::with_credentials(
            None,
            cli.legacy_mode_broker_username.clone(),
            cli.legacy_mode_broker_password.clone(),
        )?);

    let broker_urls = cli.build_broker_urls()?;
    let legacy_broker_urls = cli.build_LEGACY_broker_urls()?;

    let (iam_client, legacy_client) = tokio::join!(
        KafkaApiClient::connect(broker_urls.as_slice(), iam_creds),
        KafkaApiClient::connect(legacy_broker_urls.as_slice(), user_pass_creds)
    );

    iam_client?;
    legacy_client?;

    tracing::info!("Successfully connected to upstream kafka");

    Ok(())
}
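
The healthcheck restored here runs once at startup: test_kafka builds tcp://host:port broker addresses for both the MSK/IAM path and the legacy SASL username/password path, connects a KafkaApiClient against each concurrently, and aborts startup if either upstream is unreachable. Below is a minimal sketch of just the URL-normalization step that the new build_broker_urls helpers perform; normalize_broker_url is a hypothetical standalone function written for illustration (assuming the url and anyhow crates), not code from this commit.

use anyhow::Context;
use url::Url;

/// Normalize a configured broker URL (e.g. "kafka://broker-1.example.com:9096")
/// into the "tcp://host:port" form the Kafka client expects, defaulting to
/// port 9092 when the URL carries no explicit port.
fn normalize_broker_url(url: &str) -> anyhow::Result<String> {
    let parsed = Url::parse(url).with_context(|| format!("invalid broker URL {url}"))?;
    let host = parsed
        .host()
        .with_context(|| format!("broker URL {url} has no host"))?;
    Ok(format!("tcp://{}:{}", host, parsed.port().unwrap_or(9092)))
}

fn main() -> anyhow::Result<()> {
    // Prints "tcp://broker-1.example.com:9096".
    println!("{}", normalize_broker_url("kafka://broker-1.example.com:9096")?);
    // No explicit port falls back to 9092: "tcp://broker-2.example.com:9092".
    println!("{}", normalize_broker_url("kafka://broker-2.example.com")?);
    Ok(())
}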
