From 092a21a89eca27e8e83ab3aa6132ba28c6eb04fa Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Thu, 30 Jan 2025 19:03:09 -0500 Subject: [PATCH] dekaf: Upstream kafka fallback and per-connection-type routing Part of dekaf: Improvements to handle higher scale #1876, we want to implement broker fallback so Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch the metadata from at least one of the responding brokers and update this list of addresses so that future sessions can know about/use any newly created members of the cluster. I don't anticipate changing the topology of our cluster that frequently, and if we do then updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this, we'll see. In addition, we want to move people over to the new MSK cluster, so this implements routing new-style connections to a separate cluster with separate credentials. --- crates/dekaf/src/api_client.rs | 69 +++++++++++++++++++++--------- crates/dekaf/src/main.rs | 76 +++++++++++++++++++++++++++------- crates/dekaf/src/session.rs | 39 +++++++++++++---- 3 files changed, 143 insertions(+), 41 deletions(-) diff --git a/crates/dekaf/src/api_client.rs b/crates/dekaf/src/api_client.rs index 03d5f385f4..4b4dd5cae5 100644 --- a/crates/dekaf/src/api_client.rs +++ b/crates/dekaf/src/api_client.rs @@ -288,10 +288,57 @@ pub struct KafkaApiClient { } impl KafkaApiClient { + #[instrument(name = "api_client_connect", skip(sasl_config))] + pub async fn connect( + broker_urls: &[String], + sasl_config: Arc, + ) -> anyhow::Result { + tracing::debug!("Attempting to establish new connection"); + + for url in broker_urls { + match Self::try_connect(url, sasl_config.clone()).await { + Ok(client) => return Ok(client), + Err(e) => { + let error = e.context(format!("Failed to connect to {}", url)); + tracing::warn!(?error, "Connection attempt failed"); + } + } + } + + anyhow::bail!( + "Failed to connect to 
any Kafka brokers. Attempted {} brokers", + broker_urls.len() + ) + } + + /// Attempt to open a connection to a specific broker address + async fn try_connect(url: &str, sasl_config: Arc) -> anyhow::Result { + let mut conn = async_connect(url) + .await + .context("Failed to establish TCP connection")?; + + tracing::debug!("Authenticating connection"); + sasl_auth(&mut conn, url, sasl_config.clone()) + .await + .context("SASL authentication failed")?; + + let versions = get_versions(&mut conn) + .await + .context("Failed to negotiate protocol versions")?; + + Ok(Self { + conn, + url: url.to_string(), + sasl_config, + versions, + clients: HashMap::new(), + }) + } + /// Returns a [`KafkaApiClient`] for the given broker URL. /// If a client for that broker already exists, return it /// rather than creating a new one. - pub async fn connect_to(&mut self, broker_url: &str) -> anyhow::Result<&mut Self> { + async fn connect_to(&mut self, broker_url: &str) -> anyhow::Result<&mut Self> { if broker_url.eq(self.url.as_str()) { return Ok(self); } @@ -299,7 +346,7 @@ impl KafkaApiClient { if let std::collections::hash_map::Entry::Vacant(entry) = self.clients.entry(broker_url.to_string()) { - let new_client = Self::connect(broker_url, self.sasl_config.clone()).await?; + let new_client = Self::try_connect(broker_url, self.sasl_config.clone()).await?; entry.insert(new_client); } @@ -310,24 +357,6 @@ impl KafkaApiClient { .expect("guarinteed to be present")) } - #[instrument(name = "api_client_connect", skip(sasl_config))] - pub async fn connect(broker_url: &str, sasl_config: Arc) -> anyhow::Result { - tracing::debug!("Attempting to establish a new connection!"); - let mut conn = async_connect(broker_url).await?; - tracing::debug!("Authenticating opened connection"); - sasl_auth(&mut conn, broker_url, sasl_config.clone()).await?; - - let versions = get_versions(&mut conn).await?; - - Ok(Self { - conn, - url: broker_url.to_string(), - sasl_config, - versions, - clients: 
HashMap::new(), - }) - } - /// Send a request and wait for the response. Per Kafka wire protocol docs: /// The server guarantees that on a single TCP connection, requests will be processed in the order /// they are sent and responses will return in that order as well. The broker's request processing diff --git a/crates/dekaf/src/main.rs b/crates/dekaf/src/main.rs index 9c9742358b..7b4c1b48ed 100644 --- a/crates/dekaf/src/main.rs +++ b/crates/dekaf/src/main.rs @@ -68,12 +68,9 @@ pub struct Cli { #[arg(long, default_value = "9094", env = "METRICS_PORT")] metrics_port: u16, - /// The hostname of the default Kafka broker to use for serving group management APIs - #[arg(long, env = "DEFAULT_BROKER_HOSTNAME")] - default_broker_hostname: String, - /// The port of the default Kafka broker to use for serving group management APIs - #[arg(long, default_value = "9092", env = "DEFAULT_BROKER_PORT")] - default_broker_port: u16, + /// List of Kafka broker URLs to try connecting to for group management APIs + #[arg(long, env = "DEFAULT_BROKER_URLS", value_delimiter = ',')] + default_broker_urls: Vec, /// The username for the default Kafka broker to use for serving group management APIs. /// Currently only supports SASL PLAIN username/password auth. 
#[arg(long, env = "DEFAULT_BROKER_USERNAME")] @@ -82,6 +79,19 @@ pub struct Cli { #[arg(long, env = "DEFAULT_BROKER_PASSWORD")] default_broker_password: String, + // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------ + /// Brokers to use for connections using the legacy refresh-token based connection mode + #[arg(long, env = "LEGACY_MODE_BROKER_URLS", value_delimiter = ',')] + legacy_mode_broker_urls: Vec, + /// The username for the Kafka broker to use for serving group management APIs for connections + /// using the legacy refresh-token based connection mode + #[arg(long, env = "LEGACY_MODE_BROKER_USERNAME")] + legacy_mode_broker_username: String, + /// The password for the Kafka broker to use for serving group management APIs for connections + /// using the legacy refresh-token based connection mode + #[arg(long, env = "LEGACY_MODE_BROKER_PASSWORD")] + legacy_mode_broker_password: String, + // ------------------------------------------------------------------------------------------------ /// The secret used to encrypt/decrypt potentially sensitive strings when sending them /// to the upstream Kafka broker, e.g topic names in group management metadata. 
#[arg(long, env = "ENCRYPTION_SECRET")] @@ -158,10 +168,40 @@ async fn main() -> anyhow::Result<()> { .install_default() .unwrap(); - let upstream_kafka_host = format!( - "tcp://{}:{}", - cli.default_broker_hostname, cli.default_broker_port - ); + let upstream_kafka_urls = cli + .default_broker_urls + .clone() + .into_iter() + .map(|url| { + { + let parsed = Url::parse(&url).context("invalid broker URL")?; + Ok::<_, anyhow::Error>(format!( + "tcp://{}:{}", + parsed.host().context("invalid broker URL")?, + parsed.port().unwrap_or(9092) + )) + } + .context(url) + }) + .collect::>>()?; + + // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------ + let legacy_mode_kafka_urls = cli + .legacy_mode_broker_urls + .into_iter() + .map(|url| { + { + let parsed = Url::parse(&url).context("invalid broker URL")?; + Ok::<_, anyhow::Error>(format!( + "tcp://{}:{}", + parsed.host().context("invalid broker URL")?, + parsed.port().unwrap_or(9092) + )) + } + .context(url) + }) + .collect::>>()?; + // ------------------------------------------------------------------------------------------------ let mut stop = async { tokio::signal::ctrl_c() @@ -186,6 +226,8 @@ async fn main() -> anyhow::Result<()> { let broker_username = cli.default_broker_username.as_str(); let broker_password = cli.default_broker_password.as_str(); + let legacy_broker_username = cli.legacy_mode_broker_username.as_str(); + let legacy_broker_password = cli.legacy_mode_broker_password.as_str(); if let Some(tls_cfg) = cli.tls { let axum_rustls_config = RustlsConfig::from_pem_file( tls_cfg.certificate_file.clone().unwrap(), @@ -240,9 +282,12 @@ async fn main() -> anyhow::Result<()> { Session::new( app.clone(), cli.encryption_secret.to_owned(), - upstream_kafka_host.to_string(), + upstream_kafka_urls.clone(), broker_username.to_string(), - broker_password.to_string() + broker_password.to_string(), + legacy_mode_kafka_urls.clone(), + legacy_broker_username.to_string(), + 
legacy_broker_password.to_string() ), socket, addr, @@ -278,9 +323,12 @@ async fn main() -> anyhow::Result<()> { Session::new( app.clone(), cli.encryption_secret.to_owned(), - upstream_kafka_host.to_string(), + upstream_kafka_urls.clone(), broker_username.to_string(), - broker_password.to_string() + broker_password.to_string(), + legacy_mode_kafka_urls.clone(), + legacy_broker_username.to_string(), + legacy_broker_password.to_string() ), socket, addr, diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 3f8dce8469..09ea403f84 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -45,25 +45,37 @@ pub struct Session { secret: String, auth: Option, data_preview_state: SessionDataPreviewState, - broker_url: String, + broker_urls: Vec, broker_username: String, broker_password: String, + + // ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------ + legacy_mode_broker_urls: Vec, + legacy_mode_broker_username: String, + legacy_mode_broker_password: String, + // ------------------------------------------------------------------------------------------------ } impl Session { pub fn new( app: Arc, secret: String, - broker_url: String, + broker_urls: Vec, broker_username: String, broker_password: String, + legacy_mode_broker_urls: Vec, + legacy_mode_broker_username: String, + legacy_mode_broker_password: String, ) -> Self { Self { app, client: None, - broker_url, + broker_urls, broker_username, broker_password, + legacy_mode_broker_urls, + legacy_mode_broker_username, + legacy_mode_broker_password, reads: HashMap::new(), auth: None, secret, @@ -75,19 +87,32 @@ impl Session { if let Some(ref mut client) = self.client { Ok(client) } else { + let (urls, username, password) = match self.auth { + Some(SessionAuthentication::Task(_)) => ( + self.broker_urls.as_slice(), + self.broker_username.clone(), + self.broker_password.clone(), + ), + Some(SessionAuthentication::User(_)) => ( + 
self.legacy_mode_broker_urls.as_slice(), + self.legacy_mode_broker_username.clone(), + self.legacy_mode_broker_password.clone(), + ), + None => anyhow::bail!("Must be authenticated"), + }; self.client.replace( KafkaApiClient::connect( - &self.broker_url, + urls, rsasl::config::SASLConfig::with_credentials( None, - self.broker_username.clone(), - self.broker_password.clone(), + username, + password, )?, ).await.context( "failed to connect or authenticate to upstream Kafka broker used for serving group management APIs", )? ); - Ok(self.client.as_mut().expect("guarinteed to exist")) + Ok(self.client.as_mut().expect("guaranteed to exist")) } }