diff --git a/Cargo.lock b/Cargo.lock index 9a6f601cad5b..d3f98114e217 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2446,6 +2446,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpp_demangle" version = "0.4.3" @@ -8254,7 +8263,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap", @@ -8275,7 +8284,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.13.0", "log", "multimap", @@ -9134,10 +9143,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rsasl" +version = "2.0.2" +source = "git+https://github.com/wenyxu/rsasl.git?rev=06ebb683d5539c3410de4ce9fa37ff9b97e790a4#06ebb683d5539c3410de4ce9fa37ff9b97e790a4" +dependencies = [ + "base64 0.22.1", + "core2", + "digest", + "hmac", + "pbkdf2", + "rand", + "serde_json", + "sha2", + "stringprep", + "thiserror", +] + [[package]] name = "rskafka" version = "0.5.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=329431ac4edf630d6bf44976596f89e23429f825#329431ac4edf630d6bf44976596f89e23429f825" +source = "git+https://github.com/WenyXu/rskafka.git?rev=940c6030012c5b746fad819fb72e3325b26e39de#940c6030012c5b746fad819fb72e3325b26e39de" dependencies = [ "async-trait", "bytes", @@ -9150,6 +9176,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "rand", + "rsasl", "rustls 0.23.10", "snap", "thiserror", @@ -10123,12 +10150,13 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "indexmap 2.2.6", "itoa", + "memchr", "ryu", "serde", ] @@ -10561,7 +10589,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.66", @@ -11690,18 +11718,18 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9" [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 632feb0c1f56..b2bbbb4c48f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "329431ac4edf630d6bf44976596f89e23429f825", features = [ +# SCRAM-SHA-512 requires https://github.com/dequbed/rsasl/pull/48, https://github.com/influxdata/rskafka/pull/247 +rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "940c6030012c5b746fad819fb72e3325b26e39de", features = [ "transport-tls", ] } rstest = "0.21" diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 1ae8fc0d2501..b5a16c65e889 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_base::readable_size::ReadableSize; -use rskafka::client::SaslConfig; +use rskafka::client::{Credentials, SaslConfig}; use rustls::{ClientConfig, RootCertStore}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -58,9 +58,11 @@ pub struct KafkaClientSasl { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(tag = "type", rename_all = "snake_case")] +#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")] pub enum KafkaClientSaslConfig { Plain { username: String, password: String }, + ScramSha256 { username: String, password: String }, + ScramSha512 { username: String, password: String }, } #[cfg(test)] @@ -72,12 +74,17 @@ impl KafkaClientSaslConfig { impl KafkaClientSaslConfig { /// Converts to [`SaslConfig`]. - pub fn to_sasl_config(&self) -> SaslConfig { - match &self { - KafkaClientSaslConfig::Plain { username, password } => SaslConfig::Plain { - username: username.to_string(), - password: password.to_string(), - }, + pub fn into_sasl_config(self) -> SaslConfig { + match self { + KafkaClientSaslConfig::Plain { username, password } => { + SaslConfig::Plain(Credentials::new(username, password)) + } + KafkaClientSaslConfig::ScramSha256 { username, password } => { + SaslConfig::ScramSha256(Credentials::new(username, password)) + } + KafkaClientSaslConfig::ScramSha512 { username, password } => { + SaslConfig::ScramSha512(Credentials::new(username, password)) + } } } } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index b670612fbe95..3dc03ff297ec 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -85,7 +85,7 @@ impl ClientManager { .context(ResolveKafkaEndpointSnafu)?; let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); if let Some(sasl) = &config.sasl { - builder = builder.sasl_config(sasl.config.to_sasl_config()); + builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; if let Some(tls) = &config.tls { builder = builder.tls_config(tls.to_tsl_config().context(TlsConfigSnafu)?)