diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml
index 8b4e0b8416ad..13bbc41db125 100644
--- a/.github/workflows/develop.yml
+++ b/.github/workflows/develop.yml
@@ -719,6 +719,7 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
+ GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v4
diff --git a/Cargo.lock b/Cargo.lock
index 255d07496839..21993b96d03f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2280,6 +2280,9 @@ dependencies = [
"futures-util",
"humantime-serde",
"rskafka",
+ "rustls 0.23.10",
+ "rustls-native-certs",
+ "rustls-pemfile 2.1.2",
"serde",
"serde_json",
"serde_with",
@@ -2445,6 +2448,15 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
+[[package]]
+name = "core2"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505"
+dependencies = [
+ "memchr",
+]
+
[[package]]
name = "cpp_demangle"
version = "0.4.3"
@@ -8253,7 +8265,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
dependencies = [
"bytes",
- "heck 0.4.1",
+ "heck 0.5.0",
"itertools 0.12.1",
"log",
"multimap",
@@ -8274,7 +8286,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
dependencies = [
"bytes",
- "heck 0.4.1",
+ "heck 0.5.0",
"itertools 0.13.0",
"log",
"multimap",
@@ -9133,11 +9145,27 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "rsasl"
+version = "2.0.2"
+source = "git+https://github.com/wenyxu/rsasl.git?rev=06ebb683d5539c3410de4ce9fa37ff9b97e790a4#06ebb683d5539c3410de4ce9fa37ff9b97e790a4"
+dependencies = [
+ "base64 0.22.1",
+ "core2",
+ "digest",
+ "hmac",
+ "pbkdf2",
+ "rand",
+ "serde_json",
+ "sha2",
+ "stringprep",
+ "thiserror",
+]
+
[[package]]
name = "rskafka"
version = "0.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "132ecfa3cd9c3825208524a80881f115337762904ad3f0174e87975b2d79162c"
+source = "git+https://github.com/WenyXu/rskafka.git?rev=940c6030012c5b746fad819fb72e3325b26e39de#940c6030012c5b746fad819fb72e3325b26e39de"
dependencies = [
"async-trait",
"bytes",
@@ -9150,11 +9178,14 @@ dependencies = [
"parking_lot 0.12.3",
"pin-project-lite",
"rand",
+ "rsasl",
+ "rustls 0.23.10",
"snap",
"thiserror",
"tokio",
+ "tokio-rustls 0.26.0",
"tracing",
- "zstd 0.12.4",
+ "zstd 0.13.1",
]
[[package]]
@@ -9423,9 +9454,9 @@ dependencies = [
[[package]]
name = "rustls-native-certs"
-version = "0.7.0"
+version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.1.2",
@@ -10127,12 +10158,13 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.117"
+version = "1.0.122"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3"
+checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da"
dependencies = [
"indexmap 2.2.6",
"itoa",
+ "memchr",
"ryu",
"serde",
]
@@ -10565,7 +10597,7 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5"
dependencies = [
- "heck 0.4.1",
+ "heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.66",
@@ -11694,18 +11726,18 @@ checksum = "23d434d3f8967a09480fb04132ebe0a3e088c173e6d0ee7897abbdf4eab0f8b9"
[[package]]
name = "thiserror"
-version = "1.0.61"
+version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709"
+checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.61"
+version = "1.0.63"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533"
+checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
diff --git a/Cargo.toml b/Cargo.toml
index 2f4258f01baa..32a65dac6e01 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -151,7 +151,10 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
-rskafka = "0.5"
+# SCRAM-SHA-512 requires https://github.com/dequbed/rsasl/pull/48, https://github.com/influxdata/rskafka/pull/247
+rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "940c6030012c5b746fad819fb72e3325b26e39de", features = [
+ "transport-tls",
+] }
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
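The pinned fork's builder API is exercised throughout this patch; below is a minimal sketch (not part of the patch) of what `transport-tls` plus the SASL patches enable. The broker address and credentials are the placeholder values used by the test fixtures later in this diff.

```rust
// Sketch only: connect to a SASL-protected listener using the pinned
// rskafka fork. Broker address and credentials match the docker-compose
// fixture added at the end of this patch.
use rskafka::client::{ClientBuilder, Credentials, SaslConfig};

async fn connect_with_scram() -> Result<(), Box<dyn std::error::Error>> {
    let _client = ClientBuilder::new(vec!["127.0.0.1:9093".to_string()])
        // SCRAM-SHA-512 is the mechanism that requires the patched rsasl/rskafka.
        .sasl_config(SaslConfig::ScramSha512(Credentials::new(
            "user_kafka".to_string(),
            "secret".to_string(),
        )))
        .build()
        .await?;
    Ok(())
}
```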
diff --git a/config/config.md b/config/config.md
index 0a7e8290f162..dfd2ab889c6d 100644
--- a/config/config.md
+++ b/config/config.md
@@ -67,6 +67,11 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
+| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
+| `wal.create_topic_timeout` | String | `30s` | The timeout after which a topic creation operation is cancelled.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 7754542c6238..81cbc4703c4d 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -187,6 +187,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
+# The Kafka SASL configuration.
+# **It's only used when the provider is `kafka`**.
+# Available SASL mechanisms:
+# - `PLAIN`
+# - `SCRAM-SHA-256`
+# - `SCRAM-SHA-512`
+# [wal.sasl]
+# type = "SCRAM-SHA-512"
+# username = "user_kafka"
+# password = "secret"
+
+# The Kafka TLS configuration.
+# **It's only used when the provider is `kafka`**.
+# [wal.tls]
+# server_ca_cert_path = "/path/to/server_cert"
+# client_cert_path = "/path/to/client_cert"
+# client_key_path = "/path/to/key"
+
# Example of using S3 as the storage.
# [storage]
# type = "S3"
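The commented-out `[wal.sasl]` and `[wal.tls]` tables above map onto the new `KafkaConnectionConfig` introduced later in this patch; `type` selects the SASL mechanism through serde's tagged-enum dispatch. Below is a sketch of the deserialization path, mirroring the unit test in `src/common/wal/src/config.rs` (the tables are written without the `wal.` prefix because only the `[wal]` section body is being parsed):

```rust
// Sketch only: parse a datanode WAL section with SASL and TLS enabled.
use common_wal::config::DatanodeWalConfig;

fn parse_wal_section() -> DatanodeWalConfig {
    let toml_str = r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9092"]
        [sasl]
        type = "SCRAM-SHA-512"
        username = "user_kafka"
        password = "secret"
        [tls]
        server_ca_cert_path = "/path/to/server_cert"
    "#;
    // `provider = "kafka"` picks the Kafka variant; the flattened
    // `KafkaConnectionConfig` absorbs `broker_endpoints`, `sasl`, and `tls`.
    toml::from_str(toml_str).unwrap()
}
```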
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index f748d8586d0c..b4ed23b2fe02 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -124,6 +124,24 @@ backoff_base = 2
## Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
backoff_deadline = "5mins"
+# The Kafka SASL configuration.
+# **It's only used when the provider is `kafka`**.
+# Available SASL mechanisms:
+# - `PLAIN`
+# - `SCRAM-SHA-256`
+# - `SCRAM-SHA-512`
+# [wal.sasl]
+# type = "SCRAM-SHA-512"
+# username = "user_kafka"
+# password = "secret"
+
+# The Kafka TLS configuration.
+# **It's only used when the provider is `kafka`**.
+# [wal.tls]
+# server_ca_cert_path = "/path/to/server_cert"
+# client_cert_path = "/path/to/client_cert"
+# client_key_path = "/path/to/key"
+
## The logging options.
[logging]
## The directory to store the log files.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 73775b9fc1c4..43f36373706b 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -171,6 +171,28 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
+## Number of topics to be created upon start.
+## **It's only used when the provider is `kafka`**.
+num_topics = 64
+
+## Topic selector type.
+## Available selector types:
+## - `round_robin` (default)
+## **It's only used when the provider is `kafka`**.
+selector_type = "round_robin"
+
+## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+## **It's only used when the provider is `kafka`**.
+topic_name_prefix = "greptimedb_wal_topic"
+
+## Expected number of replicas of each partition.
+## **It's only used when the provider is `kafka`**.
+replication_factor = 1
+
+## The timeout after which a topic creation operation is cancelled.
+## **It's only used when the provider is `kafka`**.
+create_topic_timeout = "30s"
+
## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.
@@ -196,6 +218,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"
+# The Kafka SASL configuration.
+# **It's only used when the provider is `kafka`**.
+# Available SASL mechanisms:
+# - `PLAIN`
+# - `SCRAM-SHA-256`
+# - `SCRAM-SHA-512`
+# [wal.sasl]
+# type = "SCRAM-SHA-512"
+# username = "user_kafka"
+# password = "secret"
+
+# The Kafka TLS configuration.
+# **It's only used when the provider is `kafka`**.
+# [wal.tls]
+# server_ca_cert_path = "/path/to/server_cert"
+# client_cert_path = "/path/to/client_cert"
+# client_key_path = "/path/to/key"
+
## Metadata storage options.
[metadata_store]
## Kv file size in bytes.
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index ccd887345c07..6d436edae03e 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -499,6 +499,13 @@ pub enum Error {
error: rskafka::client::error::Error,
},
+ #[snafu(display("Failed to create TLS Config"))]
+ TlsConfig {
+ #[snafu(implicit)]
+ location: Location,
+ source: common_wal::error::Error,
+ },
+
#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },
@@ -714,7 +721,8 @@ impl ErrorExt for Error {
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| MismatchPrefix { .. }
- | DelimiterNotFound { .. } => StatusCode::InvalidArguments,
+ | DelimiterNotFound { .. }
+ | TlsConfig { .. } => StatusCode::InvalidArguments,
FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
diff --git a/src/common/meta/src/wal_options_allocator.rs b/src/common/meta/src/wal_options_allocator.rs
index 5fb3db6e20eb..ba0c6f407fda 100644
--- a/src/common/meta/src/wal_options_allocator.rs
+++ b/src/common/meta/src/wal_options_allocator.rs
@@ -123,7 +123,7 @@ pub fn prepare_wal_options(
#[cfg(test)]
mod tests {
- use common_wal::config::kafka::common::KafkaTopicConfig;
+ use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;
@@ -166,7 +166,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
kafka_topic,
..Default::default()
};
diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
index ec88e37cd14d..060f82d8d71e 100644
--- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
@@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
- ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
+ ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
@@ -117,15 +117,22 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
- let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints)
- .await
- .context(ResolveKafkaEndpointSnafu)?;
- let client = ClientBuilder::new(broker_endpoints)
- .backoff_config(backoff_config)
+ let broker_endpoints =
+ common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints)
+ .await
+ .context(ResolveKafkaEndpointSnafu)?;
+ let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
+ if let Some(sasl) = &self.config.connection.sasl {
+ builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
+ };
+ if let Some(tls) = &self.config.connection.tls {
+ builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
+ };
+ let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
- broker_endpoints: self.config.broker_endpoints.clone(),
+ broker_endpoints: self.config.connection.broker_endpoints.clone(),
})?;
let control_client = client
@@ -242,7 +249,7 @@ impl TopicManager {
#[cfg(test)]
mod tests {
- use common_wal::config::kafka::common::KafkaTopicConfig;
+ use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;
use super::*;
@@ -289,7 +296,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
kafka_topic,
..Default::default()
};
diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml
index a39baf438f19..0bced0dd38d8 100644
--- a/src/common/wal/Cargo.toml
+++ b/src/common/wal/Cargo.toml
@@ -18,6 +18,9 @@ common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
+rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
+rustls-native-certs = "0.7"
+rustls-pemfile = "2.1"
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 6edee1703c81..9bf3280c5a29 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -23,6 +23,7 @@ use crate::config::raft_engine::RaftEngineConfig;
/// Wal configurations for metasrv.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[serde(tag = "provider", rename_all = "snake_case")]
+#[allow(clippy::large_enum_variant)]
pub enum MetasrvWalConfig {
#[default]
RaftEngine,
@@ -48,7 +49,7 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
match config {
DatanodeWalConfig::RaftEngine(_) => Self::RaftEngine,
DatanodeWalConfig::Kafka(config) => Self::Kafka(MetasrvKafkaConfig {
- broker_endpoints: config.broker_endpoints,
+ connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
}),
@@ -61,7 +62,7 @@ impl From<MetasrvWalConfig> for DatanodeWalConfig {
match config {
MetasrvWalConfig::RaftEngine => Self::RaftEngine(RaftEngineConfig::default()),
MetasrvWalConfig::Kafka(config) => Self::Kafka(DatanodeKafkaConfig {
- broker_endpoints: config.broker_endpoints,
+ connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
..Default::default()
@@ -75,6 +76,9 @@ mod tests {
use std::time::Duration;
use common_base::readable_size::ReadableSize;
+ use kafka::common::{
+ KafkaClientSasl, KafkaClientSaslConfig, KafkaClientTls, KafkaConnectionConfig,
+ };
use tests::kafka::common::KafkaTopicConfig;
use super::*;
@@ -144,12 +148,31 @@ mod tests {
replication_factor = 1
create_topic_timeout = "30s"
topic_name_prefix = "greptimedb_wal_topic"
+ [tls]
+ server_ca_cert_path = "/path/to/server.pem"
+ [sasl]
+ type = "SCRAM-SHA-512"
+ username = "hi"
+ password = "test"
"#;
// Deserialized to MetasrvWalConfig.
let metasrv_wal_config: MetasrvWalConfig = toml::from_str(toml_str).unwrap();
let expected = MetasrvKafkaConfig {
- broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+ connection: KafkaConnectionConfig {
+ broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+ sasl: Some(KafkaClientSasl {
+ config: KafkaClientSaslConfig::ScramSha512 {
+ username: "hi".to_string(),
+ password: "test".to_string(),
+ },
+ }),
+ tls: Some(KafkaClientTls {
+ server_ca_cert_path: Some("/path/to/server.pem".to_string()),
+ client_cert_path: None,
+ client_key_path: None,
+ }),
+ },
backoff: BackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
@@ -170,7 +193,20 @@ mod tests {
// Deserialized to DatanodeWalConfig.
let datanode_wal_config: DatanodeWalConfig = toml::from_str(toml_str).unwrap();
let expected = DatanodeKafkaConfig {
- broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+ connection: KafkaConnectionConfig {
+ broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+ sasl: Some(KafkaClientSasl {
+ config: KafkaClientSaslConfig::ScramSha512 {
+ username: "hi".to_string(),
+ password: "test".to_string(),
+ },
+ }),
+ tls: Some(KafkaClientTls {
+ server_ca_cert_path: Some("/path/to/server.pem".to_string()),
+ client_cert_path: None,
+ client_key_path: None,
+ }),
+ },
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig {
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index e61823938546..f68ddfa5d8b2 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -12,16 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::io::Cursor;
+use std::sync::Arc;
use std::time::Duration;
+use rskafka::client::{Credentials, SaslConfig};
+use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;
+use snafu::{OptionExt, ResultExt};
-use crate::{TopicSelectorType, TOPIC_NAME_PREFIX};
+use crate::error::{self, Result};
+use crate::{TopicSelectorType, BROKER_ENDPOINT, TOPIC_NAME_PREFIX};
with_prefix!(pub backoff_prefix "backoff_");
-/// Backoff configurations for kafka clients.
+/// Backoff configurations for the Kafka client.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct BackoffConfig {
@@ -49,6 +55,134 @@ impl Default for BackoffConfig {
}
}
+/// The SASL configurations for the Kafka client.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct KafkaClientSasl {
+ #[serde(flatten)]
+ pub config: KafkaClientSaslConfig,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "type", rename_all = "SCREAMING-KEBAB-CASE")]
+pub enum KafkaClientSaslConfig {
+ Plain {
+ username: String,
+ password: String,
+ },
+ #[serde(rename = "SCRAM-SHA-256")]
+ ScramSha256 {
+ username: String,
+ password: String,
+ },
+ #[serde(rename = "SCRAM-SHA-512")]
+ ScramSha512 {
+ username: String,
+ password: String,
+ },
+}
+
+impl KafkaClientSaslConfig {
+ /// Converts to [`SaslConfig`].
+ pub fn into_sasl_config(self) -> SaslConfig {
+ match self {
+ KafkaClientSaslConfig::Plain { username, password } => {
+ SaslConfig::Plain(Credentials::new(username, password))
+ }
+ KafkaClientSaslConfig::ScramSha256 { username, password } => {
+ SaslConfig::ScramSha256(Credentials::new(username, password))
+ }
+ KafkaClientSaslConfig::ScramSha512 { username, password } => {
+ SaslConfig::ScramSha512(Credentials::new(username, password))
+ }
+ }
+ }
+}
+
+/// The TLS configurations for the Kafka client.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct KafkaClientTls {
+ pub server_ca_cert_path: Option<String>,
+ pub client_cert_path: Option<String>,
+ pub client_key_path: Option<String>,
+}
+
+impl KafkaClientTls {
+ /// Builds the [`ClientConfig`].
+ pub async fn to_tls_config(&self) -> Result<Arc<ClientConfig>> {
+ let builder = ClientConfig::builder();
+ let mut roots = RootCertStore::empty();
+
+ if let Some(server_ca_cert_path) = &self.server_ca_cert_path {
+ let root_cert_bytes =
+ tokio::fs::read(&server_ca_cert_path)
+ .await
+ .context(error::ReadFileSnafu {
+ path: server_ca_cert_path,
+ })?;
+ let mut cursor = Cursor::new(root_cert_bytes);
+ for cert in rustls_pemfile::certs(&mut cursor)
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .context(error::ReadCertsSnafu {
+ path: server_ca_cert_path,
+ })?
+ {
+ roots.add(cert).context(error::AddCertSnafu)?;
+ }
+ };
+ roots.add_parsable_certificates(
+ rustls_native_certs::load_native_certs().context(error::LoadSystemCertsSnafu)?,
+ );
+
+ let builder = builder.with_root_certificates(roots);
+ let config = if let (Some(cert_path), Some(key_path)) =
+ (&self.client_cert_path, &self.client_key_path)
+ {
+ let cert_bytes = tokio::fs::read(cert_path)
+ .await
+ .context(error::ReadFileSnafu { path: cert_path })?;
+ let client_certs = rustls_pemfile::certs(&mut Cursor::new(cert_bytes))
+ .collect::<std::result::Result<Vec<_>, _>>()
+ .context(error::ReadCertsSnafu { path: cert_path })?;
+ let key_bytes = tokio::fs::read(key_path)
+ .await
+ .context(error::ReadFileSnafu { path: key_path })?;
+ let client_key = rustls_pemfile::private_key(&mut Cursor::new(key_bytes))
+ .context(error::ReadKeySnafu { path: key_path })?
+ .context(error::KeyNotFoundSnafu { path: key_path })?;
+
+ builder
+ .with_client_auth_cert(client_certs, client_key)
+ .context(error::SetClientAuthCertSnafu)?
+ } else {
+ builder.with_no_client_auth()
+ };
+
+ Ok(Arc::new(config))
+ }
+}
+
+/// The connection configurations for kafka clients.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct KafkaConnectionConfig {
+ /// The broker endpoints of the Kafka cluster.
+ pub broker_endpoints: Vec<String>,
+ /// Client SASL.
+ pub sasl: Option<KafkaClientSasl>,
+ /// Client TLS config.
+ pub tls: Option<KafkaClientTls>,
+}
+
+impl Default for KafkaConnectionConfig {
+ fn default() -> Self {
+ Self {
+ broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
+ sasl: None,
+ tls: None,
+ }
+ }
+}
+
/// Topic configurations for kafka clients.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
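Taken together, the two halves above convert into what rskafka's `ClientBuilder` expects: `into_sasl_config` is a pure 1:1 mapping onto `SaslConfig`, while `to_tls_config` performs async file I/O (hence the `Result`) and layers any configured CA on top of the system roots. A usage sketch with placeholder paths:

```rust
// Sketch only: build the two client-side pieces that the log store and the
// topic manager feed into rskafka's ClientBuilder further down in this patch.
use common_wal::config::kafka::common::{KafkaClientSaslConfig, KafkaClientTls};

async fn build_connection_parts() -> common_wal::error::Result<()> {
    let sasl = KafkaClientSaslConfig::ScramSha512 {
        username: "user_kafka".to_string(),
        password: "secret".to_string(),
    };
    let _sasl_config = sasl.into_sasl_config(); // rskafka::client::SaslConfig

    let tls = KafkaClientTls {
        server_ca_cert_path: Some("/path/to/server.pem".to_string()),
        client_cert_path: None, // no mutual TLS in this sketch
        client_key_path: None,
    };
    // Reads the CA file, adds the system roots, returns Arc<rustls::ClientConfig>.
    let _tls_config = tls.to_tls_config().await?;
    Ok(())
}
```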
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index b01e0635f637..a1260c05effd 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -17,15 +17,16 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
+use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
-use crate::BROKER_ENDPOINT;
/// Kafka wal configurations for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct DatanodeKafkaConfig {
- /// The broker endpoints of the Kafka cluster.
- pub broker_endpoints: Vec<String>,
+ /// The kafka connection config.
+ #[serde(flatten)]
+ pub connection: KafkaConnectionConfig,
/// TODO(weny): Remove the alias once we release v0.9.
/// The max size of a single producer batch.
#[serde(alias = "max_batch_size")]
@@ -44,7 +45,7 @@ pub struct DatanodeKafkaConfig {
impl Default for DatanodeKafkaConfig {
fn default() -> Self {
Self {
- broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
+ connection: KafkaConnectionConfig::default(),
// Warning: Kafka has a default limit of 1MB per message in a topic.
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index 519992e17579..f61047315cda 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -14,15 +14,16 @@
use serde::{Deserialize, Serialize};
+use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
-use crate::BROKER_ENDPOINT;
/// Kafka wal configurations for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct MetasrvKafkaConfig {
- /// The broker endpoints of the Kafka cluster.
- pub broker_endpoints: Vec<String>,
+ /// The kafka connection config.
+ #[serde(flatten)]
+ pub connection: KafkaConnectionConfig,
/// The backoff config.
#[serde(flatten, with = "backoff_prefix")]
pub backoff: BackoffConfig,
@@ -30,14 +31,3 @@ pub struct MetasrvKafkaConfig {
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
}
-
-impl Default for MetasrvKafkaConfig {
- fn default() -> Self {
- let broker_endpoints = vec![BROKER_ENDPOINT.to_string()];
- Self {
- broker_endpoints,
- backoff: BackoffConfig::default(),
- kafka_topic: KafkaTopicConfig::default(),
- }
- }
-}
diff --git a/src/common/wal/src/error.rs b/src/common/wal/src/error.rs
index 147eeb293da1..d4427c5ba204 100644
--- a/src/common/wal/src/error.rs
+++ b/src/common/wal/src/error.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use common_macro::stack_trace_debug;
-use snafu::Snafu;
+use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -24,10 +24,74 @@ pub enum Error {
broker_endpoint: String,
#[snafu(source)]
error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
},
#[snafu(display("Failed to find ipv4 endpoint: {:?}", broker_endpoint))]
- EndpointIPV4NotFound { broker_endpoint: String },
+ EndpointIPV4NotFound {
+ broker_endpoint: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to read file, path: {}", path))]
+ ReadFile {
+ path: String,
+ #[snafu(source)]
+ error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to add root cert"))]
+ AddCert {
+ #[snafu(source)]
+ error: rustls::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to read cert, path: {}", path))]
+ ReadCerts {
+ path: String,
+ #[snafu(source)]
+ error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to read key, path: {}", path))]
+ ReadKey {
+ path: String,
+ #[snafu(source)]
+ error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to parse key, path: {}", path))]
+ KeyNotFound {
+ path: String,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to set client auth cert"))]
+ SetClientAuthCert {
+ #[snafu(source)]
+ error: rustls::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
+ #[snafu(display("Failed to load ca certs from system"))]
+ LoadSystemCerts {
+ #[snafu(source)]
+ error: std::io::Error,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/common/wal/src/lib.rs b/src/common/wal/src/lib.rs
index 086846ab3960..659a045f57eb 100644
--- a/src/common/wal/src/lib.rs
+++ b/src/common/wal/src/lib.rs
@@ -61,6 +61,9 @@ async fn resolve_to_ipv4_one<T: AsRef<str>>(endpoint: T) -> Result<String> {
mod tests {
use std::assert_matches::assert_matches;
+ use common_telemetry::warn;
+ use rskafka::client::{Credentials, SaslConfig};
+
use super::*;
use crate::error::Error;
@@ -86,4 +89,44 @@ mod tests {
let got = resolve_to_ipv4_one(host).await;
assert_matches!(got.unwrap_err(), Error::ResolveEndpoint { .. });
}
+
+ #[tokio::test]
+ async fn test_sasl() {
+ common_telemetry::init_default_ut_logging();
+ let Ok(broker_endpoints) = std::env::var("GT_KAFKA_SASL_ENDPOINTS") else {
+ warn!("The endpoints is empty, skipping the test 'test_sasl'");
+ return;
+ };
+ let broker_endpoints = broker_endpoints
+ .split(',')
+ .map(|s| s.trim().to_string())
+ .collect::<Vec<_>>();
+
+ let username = "user_kafka";
+ let password = "secret";
+ let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
+ .sasl_config(SaslConfig::Plain(Credentials::new(
+ username.to_string(),
+ password.to_string(),
+ )))
+ .build()
+ .await
+ .unwrap();
+ let _ = rskafka::client::ClientBuilder::new(broker_endpoints.clone())
+ .sasl_config(SaslConfig::ScramSha256(Credentials::new(
+ username.to_string(),
+ password.to_string(),
+ )))
+ .build()
+ .await
+ .unwrap();
+ let _ = rskafka::client::ClientBuilder::new(broker_endpoints)
+ .sasl_config(SaslConfig::ScramSha512(Credentials::new(
+ username.to_string(),
+ password.to_string(),
+ )))
+ .build()
+ .await
+ .unwrap();
+ }
}
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index 4918bdf3567b..222725d06ac7 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -27,6 +27,13 @@ use crate::kafka::producer::ProduceRequest;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
+ #[snafu(display("Failed to create TLS Config"))]
+ TlsConfig {
+ #[snafu(implicit)]
+ location: Location,
+ source: common_wal::error::Error,
+ },
+
#[snafu(display("Invalid provider type, expected: {}, actual: {}", expected, actual))]
InvalidProvider {
#[snafu(implicit)]
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 089f05f008c4..64523e6d0b08 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -25,7 +25,7 @@ use tokio::sync::{Mutex, RwLock};
use super::producer::OrderedBatchProducer;
use crate::error::{
- BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result,
+ BuildClientSnafu, BuildPartitionClientSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kafka::producer::OrderedBatchProducerRef;
@@ -80,16 +80,20 @@ impl ClientManager {
base: config.backoff.base as f64,
deadline: config.backoff.deadline,
};
- let broker_endpoints = common_wal::resolve_to_ipv4(&config.broker_endpoints)
+ let broker_endpoints = common_wal::resolve_to_ipv4(&config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
- let client = ClientBuilder::new(broker_endpoints)
- .backoff_config(backoff_config)
- .build()
- .await
- .with_context(|_| BuildClientSnafu {
- broker_endpoints: config.broker_endpoints.clone(),
- })?;
+ let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
+ if let Some(sasl) = &config.connection.sasl {
+ builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
+ };
+ if let Some(tls) = &config.connection.tls {
+ builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
+ };
+
+ let client = builder.build().await.with_context(|_| BuildClientSnafu {
+ broker_endpoints: config.connection.broker_endpoints.clone(),
+ })?;
Ok(Self {
client,
@@ -161,6 +165,7 @@ impl ClientManager {
#[cfg(test)]
mod tests {
+ use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::test_util::run_test_with_kafka_wal;
use tokio::sync::Barrier;
@@ -206,7 +211,10 @@ mod tests {
.await;
let config = DatanodeKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 19518575315e..23fb19461789 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -360,6 +360,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::tracing::warn;
+ use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use futures::TryStreamExt;
use rand::prelude::SliceRandom;
@@ -461,7 +462,10 @@ mod tests {
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
let config = DatanodeKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
max_batch_bytes: ReadableSize::kb(32),
..Default::default()
};
@@ -530,7 +534,10 @@ mod tests {
.map(|s| s.trim().to_string())
.collect::<Vec<_>>();
let config = DatanodeKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
max_batch_bytes: ReadableSize::kb(8),
..Default::default()
};
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index dacdf5088227..f78b5a965d0c 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -15,6 +15,7 @@
use std::path::Path;
use common_base::readable_size::ReadableSize;
+use common_wal::config::kafka::common::KafkaConnectionConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
@@ -34,7 +35,10 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEngineLogStore {
/// Create a [KafkaLogStore].
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
KafkaLogStore::try_new(&DatanodeKafkaConfig {
- broker_endpoints,
+ connection: KafkaConnectionConfig {
+ broker_endpoints,
+ ..Default::default()
+ },
..Default::default()
})
.await
diff --git a/tests-integration/fixtures/kafka/docker-compose-standalone.yml b/tests-integration/fixtures/kafka/docker-compose-standalone.yml
index 9c257418a5d8..4208fe3f67ed 100644
--- a/tests-integration/fixtures/kafka/docker-compose-standalone.yml
+++ b/tests-integration/fixtures/kafka/docker-compose-standalone.yml
@@ -1,21 +1,28 @@
version: '3.8'
services:
+ zookeeper:
+ image: bitnami/zookeeper:3.7
+ ports:
+ - '2181:2181'
+ environment:
+ - ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:3.6.0
container_name: kafka
ports:
- 9092:9092
+ - 9093:9093
environment:
# KRaft settings
- KAFKA_KRAFT_CLUSTER_ID: Kmp-xkTnSf-WWXhWmiorDg
- KAFKA_ENABLE_KRAFT: "yes"
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: broker,controller
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:2181
# Listeners
- KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
+ KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092,SECURE://localhost:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181
+ KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SECURE:SASL_PLAINTEXT
+ KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:2181,SECURE://:9093
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_BROKER_ID: "1"
+ KAFKA_CLIENT_USERS: "user_kafka"
+ KAFKA_CLIENT_PASSWORDS: "secret"
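With this fixture up, the broker keeps the PLAINTEXT listener on 9092 and adds a SASL_PLAINTEXT listener on 9093 guarded by the `user_kafka`/`secret` credentials set via `KAFKA_CLIENT_USERS`/`KAFKA_CLIENT_PASSWORDS`. The CI change at the top of this patch exports `GT_KAFKA_SASL_ENDPOINTS=127.0.0.1:9093`, which is exactly what the new `test_sasl` test in `src/common/wal/src/lib.rs` reads before exercising PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 handshakes; when the variable is unset, the test skips itself.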
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index 491a93086953..0f93766ec7e9 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -21,7 +21,7 @@ use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
-use common_wal::config::kafka::common::KafkaTopicConfig;
+use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::{DatanodeKafkaConfig, MetasrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use frontend::instance::Instance;
@@ -227,11 +227,17 @@ pub(crate) async fn standalone_with_kafka_wal() -> Option