Skip to content

Commit

Permalink
feat: support SASL SCRAM-SHA-256 and SCRAM-SHA-512
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 11, 2024
1 parent c1fc78e commit 04c39cc
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 20 deletions.
48 changes: 38 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "329431ac4edf630d6bf44976596f89e23429f825", features = [
# SCRAM-SHA-512 requires https://github.com/dequbed/rsasl/pull/48, https://github.com/influxdata/rskafka/pull/247
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "940c6030012c5b746fad819fb72e3325b26e39de", features = [
"transport-tls",
] }
rstest = "0.21"
Expand Down
23 changes: 15 additions & 8 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use rskafka::client::SaslConfig;
use rskafka::client::{Credentials, SaslConfig};
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -58,9 +58,11 @@ pub struct KafkaClientSasl {
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
pub enum KafkaClientSaslConfig {
Plain { username: String, password: String },
ScramSha256 { username: String, password: String },
ScramSha512 { username: String, password: String },
}

#[cfg(test)]
Expand All @@ -72,12 +74,17 @@ impl KafkaClientSaslConfig {

impl KafkaClientSaslConfig {
/// Converts to [`SaslConfig`].
pub fn to_sasl_config(&self) -> SaslConfig {
match &self {
KafkaClientSaslConfig::Plain { username, password } => SaslConfig::Plain {
username: username.to_string(),
password: password.to_string(),
},
pub fn into_sasl_config(self) -> SaslConfig {
match self {
KafkaClientSaslConfig::Plain { username, password } => {
SaslConfig::Plain(Credentials::new(username, password))
}
KafkaClientSaslConfig::ScramSha256 { username, password } => {
SaslConfig::ScramSha256(Credentials::new(username, password))
}
KafkaClientSaslConfig::ScramSha512 { username, password } => {
SaslConfig::ScramSha512(Credentials::new(username, password))
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/log-store/src/kafka/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ClientManager {
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &config.sasl {
builder = builder.sasl_config(sasl.config.to_sasl_config());
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &config.tls {
builder = builder.tls_config(tls.to_tsl_config().context(TlsConfigSnafu)?)
Expand Down

0 comments on commit 04c39cc

Please sign in to comment.