Skip to content

Commit

Permalink
dekaf: Upstream kafka fallback and per-connection-type routing
Browse files Browse the repository at this point in the history
Part of dekaf: Improvements to handle higher scale #1876, we want to implement broker fallback so Dekaf can connect to any of the brokers in the cluster if one doesn't respond. An improvement here would be to periodically fetch the metadata from at least one of the responding brokers and update this list of addresses so that future sessions can know about/use any newly created members of the cluster. I don't anticipate changing the topology of our cluster that frequently, and if we do then updating Dekaf's deployment configs isn't that big of a deal. I may eat my hat on this, we'll see.

In addition, we want to move people over to the new MSK cluster, so this implements routing new-style connections to a separate cluster with separate credentials.
  • Loading branch information
jshearer committed Feb 6, 2025
1 parent 3462798 commit dc394c4
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 46 deletions.
78 changes: 53 additions & 25 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,64 @@ pub struct KafkaApiClient {
}

impl KafkaApiClient {
/// Returns a [`KafkaApiClient`] for the given broker URL.
/// If a client for that broker already exists, return it
/// rather than creating a new one.
pub async fn connect_to(&mut self, broker_url: &str) -> anyhow::Result<&mut Self> {
/// Walk `broker_urls` in order and return a client for the first broker
/// that we can successfully connect to, authenticate with, and negotiate
/// protocol versions against. Errors only if every broker fails.
#[instrument(name = "api_client_connect", skip(sasl_config))]
pub async fn connect(
    broker_urls: &[String],
    sasl_config: Arc<SASLConfig>,
) -> anyhow::Result<Self> {
    tracing::debug!("Attempting to establish new connection");

    for url in broker_urls {
        match Self::try_connect(url, sasl_config.clone()).await {
            Err(err) => {
                // Log the failure and fall through to the next candidate broker.
                let error = err.context(format!("Failed to connect to {}", url));
                tracing::warn!(?error, "Connection attempt failed");
            }
            Ok(client) => return Ok(client),
        }
    }

    anyhow::bail!(
        "Failed to connect to any Kafka brokers. Attempted {} brokers",
        broker_urls.len()
    )
}

/// Open, authenticate, and version-negotiate a connection to one specific
/// broker address, returning a fully initialized client on success.
async fn try_connect(url: &str, sasl_config: Arc<SASLConfig>) -> anyhow::Result<Self> {
    // Step 1: raw TCP connection to the broker.
    let mut connection = async_connect(url)
        .await
        .context("Failed to establish TCP connection")?;

    // Step 2: SASL handshake on the freshly opened stream.
    tracing::debug!("Authenticating connection");
    sasl_auth(&mut connection, url, sasl_config.clone())
        .await
        .context("SASL authentication failed")?;

    // Step 3: discover which API versions this broker supports.
    let versions = get_versions(&mut connection)
        .await
        .context("Failed to negotiate protocol versions")?;

    Ok(Self {
        url: url.to_string(),
        sasl_config,
        versions,
        conn: connection,
        clients: HashMap::new(),
    })
}

/// Returns a [`KafkaApiClient`] for the given broker URL. If a client
/// for that broker already exists, return it rather than creating a new one.
async fn client_for_broker(&mut self, broker_url: &str) -> anyhow::Result<&mut Self> {
if broker_url.eq(self.url.as_str()) {
return Ok(self);
}

if let std::collections::hash_map::Entry::Vacant(entry) =
self.clients.entry(broker_url.to_string())
{
let new_client = Self::connect(broker_url, self.sasl_config.clone()).await?;
let new_client = Self::try_connect(broker_url, self.sasl_config.clone()).await?;

entry.insert(new_client);
}
Expand All @@ -310,24 +356,6 @@ impl KafkaApiClient {
.expect("guarinteed to be present"))
}

// NOTE(review): pre-refactor single-broker variant; superseded by the
// `connect(broker_urls: &[String], ...)` form that adds multi-broker fallback.
/// Connect to a single broker URL, authenticate via SASL, and negotiate
/// the set of mutually supported protocol versions.
#[instrument(name = "api_client_connect", skip(sasl_config))]
pub async fn connect(broker_url: &str, sasl_config: Arc<SASLConfig>) -> anyhow::Result<Self> {
    tracing::debug!("Attempting to establish a new connection!");
    // Raw TCP connect, then SASL auth over the opened stream.
    let mut conn = async_connect(broker_url).await?;
    tracing::debug!("Authenticating opened connection");
    sasl_auth(&mut conn, broker_url, sasl_config.clone()).await?;

    // Discover which API versions this broker supports.
    let versions = get_versions(&mut conn).await?;

    Ok(Self {
        conn,
        url: broker_url.to_string(),
        sasl_config,
        versions,
        clients: HashMap::new(),
    })
}

/// Send a request and wait for the response. Per Kafka wire protocol docs:
/// The server guarantees that on a single TCP connection, requests will be processed in the order
/// they are sent and responses will return in that order as well. The broker's request processing
Expand Down Expand Up @@ -382,7 +410,7 @@ impl KafkaApiClient {
Ok(if coord_host.len() == 0 && coord_port == -1 {
self
} else {
self.connect_to(&coord_url).await?
self.client_for_broker(&coord_url).await?
})
}

Expand All @@ -408,7 +436,7 @@ impl KafkaApiClient {

let controller_url = format!("tcp://{}:{}", controller.host.to_string(), controller.port);

self.connect_to(&controller_url).await
self.client_for_broker(&controller_url).await
}

pub fn supported_versions<R: Request>(
Expand Down
76 changes: 62 additions & 14 deletions crates/dekaf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,9 @@ pub struct Cli {
#[arg(long, default_value = "9094", env = "METRICS_PORT")]
metrics_port: u16,

/// The hostname of the default Kafka broker to use for serving group management APIs
#[arg(long, env = "DEFAULT_BROKER_HOSTNAME")]
default_broker_hostname: String,
/// The port of the default Kafka broker to use for serving group management APIs
#[arg(long, default_value = "9092", env = "DEFAULT_BROKER_PORT")]
default_broker_port: u16,
/// List of Kafka broker URLs to try connecting to for group management APIs
#[arg(long, env = "DEFAULT_BROKER_URLS")]
default_broker_urls: Vec<String>,
/// The username for the default Kafka broker to use for serving group management APIs.
/// Currently only supports SASL PLAIN username/password auth.
#[arg(long, env = "DEFAULT_BROKER_USERNAME")]
Expand All @@ -82,6 +79,19 @@ pub struct Cli {
#[arg(long, env = "DEFAULT_BROKER_PASSWORD")]
default_broker_password: String,

// ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------
/// Brokers to use for connections using the legacy refresh-token based connection mode
#[arg(long, env = "LEGACY_MODE_BROKER_URLS")]
legacy_mode_broker_urls: Vec<String>,
/// The username for the Kafka broker to use for serving group management APIs for connections
/// using the legacy refresh-token based connection mode
#[arg(long, env = "LEGACY_MODE_BROKER_USERNAME")]
legacy_mode_broker_username: String,
/// The password for the Kafka broker to use for serving group management APIs for connections
/// using the legacy refresh-token based connection mode
#[arg(long, env = "LEGACY_MODE_BROKER_PASSWORD")]
legacy_mode_broker_password: String,
// ------------------------------------------------------------------------------------------------
/// The secret used to encrypt/decrypt potentially sensitive strings when sending them
/// to the upstream Kafka broker, e.g topic names in group management metadata.
#[arg(long, env = "ENCRYPTION_SECRET")]
Expand Down Expand Up @@ -158,10 +168,40 @@ async fn main() -> anyhow::Result<()> {
.install_default()
.unwrap();

let upstream_kafka_host = format!(
"tcp://{}:{}",
cli.default_broker_hostname, cli.default_broker_port
);
// Normalizes one configured broker URL into the `tcp://host:port` form the
// Kafka client expects, defaulting to the standard Kafka port 9092 when the
// URL does not specify one. Returns an error (rather than panicking, as the
// previous `.expect(...)` did) so a bad URL surfaces as a startup failure
// with context.
let normalize_broker_url = |url: &str| -> anyhow::Result<String> {
    let parsed = Url::parse(url).context("invalid broker URL")?;
    Ok(format!(
        "tcp://{}:{}",
        parsed.host().context("invalid broker URL")?,
        parsed.port().unwrap_or(9092)
    ))
};

let upstream_kafka_urls = cli
    .default_broker_urls
    .iter()
    .map(|url| normalize_broker_url(url).context(url.clone()))
    .collect::<anyhow::Result<Vec<_>>>()?;

// ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------
// BUG FIX: this previously re-read `cli.default_broker_urls`, so the
// dedicated `--legacy-mode-broker-urls` flag was silently ignored and legacy
// sessions were routed to the default cluster.
let legacy_mode_kafka_urls = cli
    .legacy_mode_broker_urls
    .iter()
    .map(|url| normalize_broker_url(url).context(url.clone()))
    .collect::<anyhow::Result<Vec<_>>>()?;
// ------------------------------------------------------------------------------------------------

let mut stop = async {
tokio::signal::ctrl_c()
Expand All @@ -186,6 +226,8 @@ async fn main() -> anyhow::Result<()> {

let broker_username = cli.default_broker_username.as_str();
let broker_password = cli.default_broker_password.as_str();
let legacy_broker_username = cli.legacy_mode_broker_username.as_str();
let legacy_broker_password = cli.legacy_mode_broker_password.as_str();
if let Some(tls_cfg) = cli.tls {
let axum_rustls_config = RustlsConfig::from_pem_file(
tls_cfg.certificate_file.clone().unwrap(),
Expand Down Expand Up @@ -240,9 +282,12 @@ async fn main() -> anyhow::Result<()> {
Session::new(
app.clone(),
cli.encryption_secret.to_owned(),
upstream_kafka_host.to_string(),
upstream_kafka_urls.clone(),
broker_username.to_string(),
broker_password.to_string()
broker_password.to_string(),
legacy_mode_kafka_urls.clone(),
legacy_broker_username.to_string(),
legacy_broker_password.to_string()
),
socket,
addr,
Expand Down Expand Up @@ -278,9 +323,12 @@ async fn main() -> anyhow::Result<()> {
Session::new(
app.clone(),
cli.encryption_secret.to_owned(),
upstream_kafka_host.to_string(),
upstream_kafka_urls.clone(),
broker_username.to_string(),
broker_password.to_string()
broker_password.to_string(),
legacy_mode_kafka_urls.clone(),
legacy_broker_username.to_string(),
legacy_broker_password.to_string()
),
socket,
addr,
Expand Down
39 changes: 32 additions & 7 deletions crates/dekaf/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,37 @@ pub struct Session {
secret: String,
auth: Option<SessionAuthentication>,
data_preview_state: SessionDataPreviewState,
broker_url: String,
broker_urls: Vec<String>,
broker_username: String,
broker_password: String,

// ------ This can be cleaned up once everyone is migrated off of the legacy connection mode ------
legacy_mode_broker_urls: Vec<String>,
legacy_mode_broker_username: String,
legacy_mode_broker_password: String,
// ------------------------------------------------------------------------------------------------
}

impl Session {
pub fn new(
app: Arc<App>,
secret: String,
broker_url: String,
broker_urls: Vec<String>,
broker_username: String,
broker_password: String,
legacy_mode_broker_urls: Vec<String>,
legacy_mode_broker_username: String,
legacy_mode_broker_password: String,
) -> Self {
Self {
app,
client: None,
broker_url,
broker_urls,
broker_username,
broker_password,
legacy_mode_broker_urls,
legacy_mode_broker_username,
legacy_mode_broker_password,
reads: HashMap::new(),
auth: None,
secret,
Expand All @@ -75,19 +87,32 @@ impl Session {
if let Some(ref mut client) = self.client {
Ok(client)
} else {
let (urls, username, password) = match self.auth {
Some(SessionAuthentication::Task(_)) => (
self.broker_urls.as_slice(),
self.broker_username.clone(),
self.broker_password.clone(),
),
Some(SessionAuthentication::User(_)) => (
self.legacy_mode_broker_urls.as_slice(),
self.legacy_mode_broker_username.clone(),
self.legacy_mode_broker_password.clone(),
),
None => anyhow::bail!("Must be authenticated"),
};
self.client.replace(
KafkaApiClient::connect(
&self.broker_url,
urls,
rsasl::config::SASLConfig::with_credentials(
None,
self.broker_username.clone(),
self.broker_password.clone(),
username,
password,
)?,
).await.context(
"failed to connect or authenticate to upstream Kafka broker used for serving group management APIs",
)?
);
Ok(self.client.as_mut().expect("guarinteed to exist"))
Ok(self.client.as_mut().expect("guaranteed to exist"))
}
}

Expand Down

0 comments on commit dc394c4

Please sign in to comment.