Skip to content

Commit

Permalink
feat: add SASL and TLS config for Kafka client (#4536)
Browse files Browse the repository at this point in the history
* feat: add SASL and TLS config

* feat: add SASL/PLAIN and TLS config for Kafka client

* chore: use `ring`

* feat: support SASL SCRAM-SHA-256 and SCRAM-SHA-512

* fix: correct unit test

* test: add integration test

* chore: apply suggestions from CR

* refactor: introduce `KafkaConnectionConfig`

* chore: refine toml examples

* docs: add missing fields

* chore: refine examples

* feat: allow no server ca cert

* chore: refine examples

* chore: fix clippy

* feat: load system ca certs

* chore: fmt toml

* chore: unpin version

* Update src/common/wal/src/error.rs

Co-authored-by: Lei, HUANG <[email protected]>

---------

Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
WenyXu and v0y4g3r authored Aug 12, 2024
1 parent 9bcaeaa commit 2e2eacf
Show file tree
Hide file tree
Showing 24 changed files with 588 additions and 92 deletions.
1 change: 1 addition & 0 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ jobs:
GT_MINIO_ENDPOINT_URL: http://127.0.0.1:9000
GT_ETCD_ENDPOINTS: http://127.0.0.1:2379
GT_KAFKA_ENDPOINTS: 127.0.0.1:9092
GT_KAFKA_SASL_ENDPOINTS: 127.0.0.1:9093
UNITTEST_LOG_DIR: "__unittest_logs"
- name: Codecov upload
uses: codecov/codecov-action@v4
Expand Down
60 changes: 46 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = "0.5"
# SCRAM-SHA-512 requires https://github.com/dequbed/rsasl/pull/48, https://github.com/influxdata/rskafka/pull/247
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "940c6030012c5b746fad819fb72e3325b26e39de", features = [
"transport-tls",
] }
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
Expand Down
5 changes: 5 additions & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.backoff_init` | String | `500ms` | The initial backoff delay.<br/>**It's only used when the provider is `kafka`**. |
Expand Down
18 changes: 18 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

# Example of using S3 as the storage.
# [storage]
# type = "S3"
Expand Down
18 changes: 18 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,24 @@ backoff_base = 2
## Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

## The logging options.
[logging]
## The directory to store the log files.
Expand Down
40 changes: 40 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,28 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## **It's only used when the provider is `kafka`**.
num_topics = 64

## Topic selector type.
## Available selector types:
## - `round_robin` (default)
## **It's only used when the provider is `kafka`**.
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"

## Expected number of replicas of each partition.
## **It's only used when the provider is `kafka`**.
replication_factor = 1

## Above which a topic creation operation will be cancelled.
## **It's only used when the provider is `kafka`**.
create_topic_timeout = "30s"

## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**.
Expand All @@ -196,6 +218,24 @@ backoff_base = 2
## **It's only used when the provider is `kafka`**.
backoff_deadline = "5mins"

# The Kafka SASL configuration.
# **It's only used when the provider is `kafka`**.
# Available SASL mechanisms:
# - `PLAIN`
# - `SCRAM-SHA-256`
# - `SCRAM-SHA-512`
# [wal.sasl]
# type = "SCRAM-SHA-512"
# username = "user_kafka"
# password = "secret"

# The Kafka TLS configuration.
# **It's only used when the provider is `kafka`**.
# [wal.tls]
# server_ca_cert_path = "/path/to/server_cert"
# client_cert_path = "/path/to/client_cert"
# client_key_path = "/path/to/key"

## Metadata storage options.
[metadata_store]
## Kv file size in bytes.
Expand Down
10 changes: 9 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,13 @@ pub enum Error {
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to create TLS Config"))]
TlsConfig {
#[snafu(implicit)]
location: Location,
source: common_wal::error::Error,
},

#[snafu(display("Failed to resolve Kafka broker endpoint."))]
ResolveKafkaEndpoint { source: common_wal::error::Error },

Expand Down Expand Up @@ -714,7 +721,8 @@ impl ErrorExt for Error {
| AlterLogicalTablesInvalidArguments { .. }
| CreateLogicalTablesInvalidArguments { .. }
| MismatchPrefix { .. }
| DelimiterNotFound { .. } => StatusCode::InvalidArguments,
| DelimiterNotFound { .. }
| TlsConfig { .. } => StatusCode::InvalidArguments,

FlowNotFound { .. } => StatusCode::FlowNotFound,
FlowRouteNotFound { .. } => StatusCode::Unexpected,
Expand Down
7 changes: 5 additions & 2 deletions src/common/meta/src/wal_options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn prepare_wal_options(

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::config::kafka::MetasrvKafkaConfig;
use common_wal::test_util::run_test_with_kafka_wal;

Expand Down Expand Up @@ -166,7 +166,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
Expand Down
28 changes: 19 additions & 9 deletions src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, BuildKafkaPartitionClientSnafu,
CreateKafkaWalTopicSnafu, DecodeJsonSnafu, EncodeJsonSnafu, InvalidNumTopicsSnafu,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result,
ProduceRecordSnafu, ResolveKafkaEndpointSnafu, Result, TlsConfigSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
Expand Down Expand Up @@ -117,15 +117,22 @@ impl TopicManager {
base: self.config.backoff.base as f64,
deadline: self.config.backoff.deadline,
};
let broker_endpoints = common_wal::resolve_to_ipv4(&self.config.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let client = ClientBuilder::new(broker_endpoints)
.backoff_config(backoff_config)
let broker_endpoints =
common_wal::resolve_to_ipv4(&self.config.connection.broker_endpoints)
.await
.context(ResolveKafkaEndpointSnafu)?;
let mut builder = ClientBuilder::new(broker_endpoints).backoff_config(backoff_config);
if let Some(sasl) = &self.config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};
if let Some(tls) = &self.config.connection.tls {
builder = builder.tls_config(tls.to_tls_config().await.context(TlsConfigSnafu)?)
};
let client = builder
.build()
.await
.with_context(|_| BuildKafkaClientSnafu {
broker_endpoints: self.config.broker_endpoints.clone(),
broker_endpoints: self.config.connection.broker_endpoints.clone(),
})?;

let control_client = client
Expand Down Expand Up @@ -242,7 +249,7 @@ impl TopicManager {

#[cfg(test)]
mod tests {
use common_wal::config::kafka::common::KafkaTopicConfig;
use common_wal::config::kafka::common::{KafkaConnectionConfig, KafkaTopicConfig};
use common_wal::test_util::run_test_with_kafka_wal;

use super::*;
Expand Down Expand Up @@ -289,7 +296,10 @@ mod tests {
..Default::default()
};
let config = MetasrvKafkaConfig {
broker_endpoints,
connection: KafkaConnectionConfig {
broker_endpoints,
..Default::default()
},
kafka_topic,
..Default::default()
};
Expand Down
3 changes: 3 additions & 0 deletions src/common/wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
rskafka.workspace = true
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls-native-certs = "0.7"
rustls-pemfile = "2.1"
serde.workspace = true
serde_with.workspace = true
snafu.workspace = true
Expand Down
Loading

0 comments on commit 2e2eacf

Please sign in to comment.