From 50415f1be1fd694d31e86f28ff1a91444acebfc7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 9 Aug 2024 06:58:18 +0000 Subject: [PATCH] feat: add SASL/PLAIN and TLS config for Kafka client --- Cargo.lock | 59 ++++++++++++++- Cargo.toml | 4 +- src/common/wal/Cargo.toml | 2 + src/common/wal/src/config.rs | 15 ++++ src/common/wal/src/config/kafka/datanode.rs | 81 +++++++++++++++++++-- src/common/wal/src/error.rs | 60 ++++++++++++++- src/log-store/src/error.rs | 7 ++ src/log-store/src/kafka/client_manager.rs | 20 +++-- 8 files changed, 229 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2f714e587bef..927fc00a81ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,33 @@ dependencies = [ "cc", ] +[[package]] +name = "aws-lc-rs" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae74d9bd0a7530e8afd1770739ad34b36838829d6ad61818f9230f683f5ad77" +dependencies = [ + "aws-lc-sys", + "mirai-annotations", + "paste", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f0e249228c6ad2d240c2dc94b714d711629d52bad946075d8e9b2f5391f0703" +dependencies = [ + "bindgen", + "cc", + "cmake", + "dunce", + "fs_extra", + "libc", + "paste", +] + [[package]] name = "axum" version = "0.6.20" @@ -980,12 +1007,15 @@ dependencies = [ "itertools 0.12.1", "lazy_static", "lazycell", + "log", + "prettyplease", "proc-macro2", "quote", "regex", "rustc-hash", "shlex", "syn 2.0.66", + "which", ] [[package]] @@ -2279,6 +2309,8 @@ dependencies = [ "futures-util", "humantime-serde", "rskafka", + "rustls 0.23.10", + "rustls-pemfile 2.1.2", "serde", "serde_json", "serde_with", @@ -3472,6 +3504,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "duration-str" version = "0.11.2" @@ -4015,6 +4053,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -6271,6 +6315,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mirai-annotations" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1" + [[package]] name = "mito2" version = "0.9.1" @@ -9135,8 +9185,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "132ecfa3cd9c3825208524a80881f115337762904ad3f0174e87975b2d79162c" +source = "git+https://github.com/influxdata/rskafka.git?rev=329431ac4edf630d6bf44976596f89e23429f825#329431ac4edf630d6bf44976596f89e23429f825" dependencies = [ "async-trait", "bytes", @@ -9149,11 +9198,13 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "rand", + "rustls 0.23.10", "snap", "thiserror", "tokio", + "tokio-rustls 0.26.0", "tracing", - "zstd 0.12.4", + "zstd 0.13.1", ] [[package]] @@ -9405,6 +9456,7 @@ version = "0.23.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05cff451f60db80f490f3c182b77c35260baace73209e9cdbbe526bfe3a4d402" dependencies = [ + "aws-lc-rs", "log", "once_cell", "ring 0.17.8", @@ -9468,6 +9520,7 @@ version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ + "aws-lc-rs", "ring 0.17.8", "rustls-pki-types", "untrusted 0.9.0", diff --git a/Cargo.toml b/Cargo.toml index 5d473de72226..632feb0c1f56 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,9 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = "0.5" +rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "329431ac4edf630d6bf44976596f89e23429f825", features = [ + "transport-tls", +] } rstest = "0.21" rstest_reuse = "0.7" rust_decimal = "1.33" diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index a39baf438f19..25862bcd90d0 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -18,6 +18,8 @@ common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true rskafka.workspace = true +rustls = "0.23" +rustls-pemfile = "2.1" serde.workspace = true serde_with.workspace = true snafu.workspace = true diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 6edee1703c81..1f83376fb732 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -75,6 +75,7 @@ mod tests { use std::time::Duration; use common_base::readable_size::ReadableSize; + use kafka::datanode::{KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls}; use tests::kafka::common::KafkaTopicConfig; use super::*; @@ -144,6 +145,12 @@ mod tests { replication_factor = 1 create_topic_timeout = "30s" topic_name_prefix = "greptimedb_wal_topic" + [sasl] + type = "plain" + username = "hi" + password = "test" + [tls] + server_ca_cert_path = "/path/to/server.pem" "#; // Deserialized to MetasrvWalConfig. @@ -187,6 +194,14 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, + sasl: Some(KafkaClientSasl { + config: KafkaClientSaslConfig::new_plain("hi".to_string(), "test".to_string()), + }), + tls: Some(KafkaClientTls { + server_ca_cert_path: "/path/to/server.pem".to_string(), + client_cert_path: None, + client_key_path: None, + }), }; assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected)); } diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 4146bc29d0ff..1ae8fc0d2501 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -12,12 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io::Cursor; +use std::sync::Arc; use std::time::Duration; use common_base::readable_size::ReadableSize; +use rskafka::client::SaslConfig; +use rustls::{ClientConfig, RootCertStore}; use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; +use crate::error::{self, Result}; use crate::BROKER_ENDPOINT; /// Kafka wal configurations for datanode. @@ -48,20 +54,85 @@ pub struct DatanodeKafkaConfig { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct KafkaClientSasl { #[serde(flatten)] - pub config: Option, + pub config: KafkaClientSaslConfig, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(tag = "type")] +#[serde(tag = "type", rename_all = "snake_case")] pub enum KafkaClientSaslConfig { Plain { username: String, password: String }, } +#[cfg(test)] +impl KafkaClientSaslConfig { + pub fn new_plain(username: String, password: String) -> Self { + Self::Plain { username, password } + } +} + +impl KafkaClientSaslConfig { + /// Converts to [`SaslConfig`]. + pub fn to_sasl_config(&self) -> SaslConfig { + match &self { + KafkaClientSaslConfig::Plain { username, password } => SaslConfig::Plain { + username: username.to_string(), + password: password.to_string(), + }, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct KafkaClientTls { - server_ca_cert_path: String, - client_cert_path: String, - client_key_path: String, + pub server_ca_cert_path: String, + pub client_cert_path: Option, + pub client_key_path: Option, +} + +impl KafkaClientTls { + /// Builds the [`ClientConfig`]. + pub fn to_tsl_config(&self) -> Result> { + let builder = ClientConfig::builder(); + let mut roots = RootCertStore::empty(); + + let root_cert_bytes = + std::fs::read(&self.server_ca_cert_path).context(error::ReadFileSnafu { + path: &self.server_ca_cert_path, + })?; + let mut cursor = Cursor::new(root_cert_bytes); + for cert in rustls_pemfile::certs(&mut cursor) + .collect::, _>>() + .context(error::ReadCertsSnafu { + path: &self.server_ca_cert_path, + })? + { + roots.add(cert).context(error::AddCertSnafu)?; + } + let builder = builder.with_root_certificates(roots); + + let config = if let (Some(cert_path), Some(key_path)) = + (&self.client_cert_path, &self.client_key_path) + { + let cert_bytes = + std::fs::read(cert_path).context(error::ReadFileSnafu { path: cert_path })?; + let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes)) + .collect::, _>>() + .context(error::ReadCertsSnafu { path: cert_path })?; + + let key_bytes = std::fs::read(key_path).unwrap(); + let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes)) + .context(error::ReadKeySnafu { path: key_path })? + .context(error::KeyNotFoundSnafu { path: key_path })?; + + builder + .with_client_auth_cert(client_certs, client_key) + .context(error::SetClientAuthCertSnafu)? + } else { + builder.with_no_client_auth() + }; + + Ok(Arc::new(config)) + } } impl Default for DatanodeKafkaConfig { diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs index 147eeb293da1..24953d400733 100644 --- a/src/common/wal/src/error.rs +++ b/src/common/wal/src/error.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_macro::stack_trace_debug; -use snafu::Snafu; +use snafu::{Location, Snafu}; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -24,10 +24,66 @@ pub enum Error { broker_endpoint: String, #[snafu(source)] error: std::io::Error, + #[snafu(implicit)] + location: Location, }, #[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))] - EndpointIPV4NotFound { broker_endpoint: String }, + EndpointIPV4NotFound { + broker_endpoint: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to read file, path: {}", path))] + ReadFile { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to add root cert"))] + AddCert { + #[snafu(source)] + error: rustls::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to read cert, path: {}", path))] + ReadCerts { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to read key, path: {}", path))] + ReadKey { + path: String, + #[snafu(source)] + error: std::io::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to parse key, path: {}", path))] + KeyNotFound { + path: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to set client auth cert"))] + SetClientAuthCert { + #[snafu(source)] + error: rustls::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 4918bdf3567b..222725d06ac7 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -27,6 +27,13 @@ use crate::kafka::producer::ProduceRequest; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to create TLS Config"))] + TlsConfig { + #[snafu(implicit)] + location: Location, + source: common_wal::error::Error, + }, + #[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))] InvalidProvider { #[snafu(implicit)] diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 089f05f008c4..b670612fbe95 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -25,7 +25,7 @@ use tokio::sync::{Mutex, RwLock}; use super::producer::OrderedBatchProducer; use crate::error::{ - BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, + BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu, }; use crate::kafka::producer::OrderedBatchProducerRef; @@ -83,13 +83,17 @@ impl ClientManager { let broker_endpoints = common_wal::resolve_to_ipv4(&config.broker_endpoints) .await .context(ResolveKafkaEndpointSnafu)?; - let client = ClientBuilder::new(broker_endpoints) - .backoff_config(backoff_config) - .build() - .await - .with_context(|_| BuildClientSnafu { - broker_endpoints: config.broker_endpoints.clone(), - })?; + let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config); + if let Some(sasl) = &config.sasl { + builder = builder.sasl_config(sasl.config.to_sasl_config()); + }; + if let Some(tls) = &config.tls { + builder = builder.tls_config(tls.to_tsl_config().context(TlsConfigSnafu)?) + }; + + let client = builder.build().await.with_context(|_| BuildClientSnafu { + broker_endpoints: config.broker_endpoints.clone(), + })?; Ok(Self { client,