Skip to content

Commit

Permalink
feat: add SASL/PLAIN and TLS config for Kafka client
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Aug 9, 2024
1 parent 0e7701b commit 50415f1
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 19 deletions.
59 changes: 56 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = "0.5"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "329431ac4edf630d6bf44976596f89e23429f825", features = [
"transport-tls",
] }
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
Expand Down
2 changes: 2 additions & 0 deletions src/common/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
rustls = "0.23"
rustls-pemfile = "2.1"
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
Expand Down
15 changes: 15 additions & 0 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ mod tests {
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use kafka::datanode::{KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls};
use tests::kafka::common::KafkaTopicConfig;

use super::*;
Expand Down Expand Up @@ -144,6 +145,12 @@ mod tests {
replication_factor = 1
create_topic_timeout = "30s"
topic_name_prefix = "greptimedb_wal_topic"
[sasl]
type = "plain"
username = "hi"
password = "test"
[tls]
server_ca_cert_path = "/path/to/server.pem"
"#;

// Deserialized to MetasrvWalConfig.
Expand Down Expand Up @@ -187,6 +194,14 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
sasl: Some(KafkaClientSasl {
config: KafkaClientSaslConfig::new_plain("hi".to_string(), "test".to_string()),
}),
tls: Some(KafkaClientTls {
server_ca_cert_path: "/path/to/server.pem".to_string(),
client_cert_path: None,
client_key_path: None,
}),
};
assert_eq!(datanode_wal_config, DatanodeWalConfig::Kafka(expected));
}
Expand Down
81 changes: 76 additions & 5 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use rskafka::client::SaslConfig;
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};

use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
use crate::error::{self, Result};
use crate::BROKER_ENDPOINT;

/// Kafka wal configurations for datanode.
Expand Down Expand Up @@ -48,20 +54,85 @@ pub struct DatanodeKafkaConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientSasl {
#[serde(flatten)]
pub config: Option<KafkaClientSaslConfig>,
pub config: KafkaClientSaslConfig,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum KafkaClientSaslConfig {
Plain { username: String, password: String },
}

#[cfg(test)]
impl KafkaClientSaslConfig {
pub fn new_plain(username: String, password: String) -> Self {
Self::Plain { username, password }
}
}

impl KafkaClientSaslConfig {
/// Converts to [`SaslConfig`].
pub fn to_sasl_config(&self) -> SaslConfig {
match &self {
KafkaClientSaslConfig::Plain { username, password } => SaslConfig::Plain {
username: username.to_string(),
password: password.to_string(),
},
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct KafkaClientTls {
server_ca_cert_path: String,
client_cert_path: String,
client_key_path: String,
pub server_ca_cert_path: String,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
}

impl KafkaClientTls {
/// Builds the [`ClientConfig`].
pub fn to_tsl_config(&self) -> Result<Arc<ClientConfig>> {
let builder = ClientConfig::builder();
let mut roots = RootCertStore::empty();

let root_cert_bytes =
std::fs::read(&self.server_ca_cert_path).context(error::ReadFileSnafu {
path: &self.server_ca_cert_path,
})?;
let mut cursor = Cursor::new(root_cert_bytes);
for cert in rustls_pemfile::certs(&mut cursor)
.collect::<std::result::Result<Vec<_>, _>>()
.context(error::ReadCertsSnafu {
path: &self.server_ca_cert_path,
})?
{
roots.add(cert).context(error::AddCertSnafu)?;
}
let builder = builder.with_root_certificates(roots);

let config = if let (Some(cert_path), Some(key_path)) =
(&self.client_cert_path, &self.client_key_path)
{
let cert_bytes =
std::fs::read(cert_path).context(error::ReadFileSnafu { path: cert_path })?;
let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
.collect::<std::result::Result<Vec<_>, _>>()
.context(error::ReadCertsSnafu { path: cert_path })?;

let key_bytes = std::fs::read(key_path).unwrap();
let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
.context(error::ReadKeySnafu { path: key_path })?
.context(error::KeyNotFoundSnafu { path: key_path })?;

builder
.with_client_auth_cert(client_certs, client_key)
.context(error::SetClientAuthCertSnafu)?
} else {
builder.with_no_client_auth()
};

Ok(Arc::new(config))
}
}

impl Default for DatanodeKafkaConfig {
Expand Down
60 changes: 58 additions & 2 deletions src/common/wal/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use common_macro::stack_trace_debug;
use snafu::Snafu;
use snafu::{Location, Snafu};

#[derive(Snafu)]
#[snafu(visibility(pub))]
Expand All @@ -24,10 +24,66 @@ pub enum Error {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))]
EndpointIPV4NotFound { broker_endpoint: String },
EndpointIPV4NotFound {
broker_endpoint: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to read file, path: {}", path))]
ReadFile {
path: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to add root cert"))]
AddCert {
#[snafu(source)]
error: rustls::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to read cert, path: {}", path))]
ReadCerts {
path: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to read key, path: {}", path))]
ReadKey {
path: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to parse key, path: {}", path))]
KeyNotFound {
path: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to set client auth cert"))]
SetClientAuthCert {
#[snafu(source)]
error: rustls::Error,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
7 changes: 7 additions & 0 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ use crate::kafka::producer::ProduceRequest;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to create TLS Config"))]
TlsConfig {
#[snafu(implicit)]
location: Location,
source: common_wal::error::Error,
},

#[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
InvalidProvider {
#[snafu(implicit)]
Expand Down
Loading

0 comments on commit 50415f1

Please sign in to comment.