From 5653389063be730f4fd50420a5de412ddbac8222 Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Tue, 2 Jan 2024 16:31:37 +0900
Subject: [PATCH] feat!: correct the kafka config option (#3065)

* feat: correct the kafka config option

* refactor: rewrite the verbose comments
---
 config/datanode.example.toml         |  2 +-
 config/standalone.example.toml       | 20 ++++++++++----------
 src/common/config/src/wal.rs         |  4 ++--
 src/common/config/src/wal/kafka.rs   | 18 ++++++++----------
 src/common/meta/src/wal/kafka.rs     |  2 +-
 src/log-store/src/kafka/log_store.rs |  2 +-
 6 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index bd3f8fc2eec9..8f81829eab40 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -53,7 +53,7 @@ sync_write = false
 # broker_endpoints = ["127.0.0.1:9092"]
 # max_batch_size = "4MB"
 # linger = "200ms"
-# produce_record_timeout = "100ms"
+# consumer_wait_timeout = "100ms"
 # backoff_init = "500ms"
 # backoff_max = "10s"
 # backoff_base = 2
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 0fa58dd413c6..cb2c0bd61ab9 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -100,29 +100,29 @@ provider = "raft_engine"
 # Available selector types:
 # - "round_robin" (default)
 # selector_type = "round_robin"
-# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+# The prefix of topic name.
 # topic_name_prefix = "greptimedb_wal_topic"
 # Number of partitions per topic.
 # num_partitions = 1
-# Expected number of replicas of each partition.
+# The number of replicas of each partition.
 # replication_factor = 1
-# The maximum log size a kafka batch producer could buffer.
+# The max size of a single producer batch.
 # max_batch_size = "4MB"
-# The linger duration of a kafka batch producer.
+# The linger duration.
 # linger = "200ms"
-# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
-# produce_record_timeout = "100ms"
-# Above which a topic creation operation will be cancelled.
+# The consumer wait timeout.
+# consumer_wait_timeout = "100ms"
+# Create topic timeout.
 # create_topic_timeout = "30s"
-# The initial backoff for kafka clients.
+# The initial backoff delay.
 # backoff_init = "500ms"
-# The maximum backoff for kafka clients.
+# The maximum backoff delay.
 # backoff_max = "10s"
 # Exponential backoff rate, i.e. next backoff = base * current backoff.
 # backoff_base = 2
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
+# The deadline of retries.
 # backoff_deadline = "5mins"
 
 # WAL data directory
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index f9c492758e63..6f0a9867804c 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -94,7 +94,7 @@ mod tests {
             broker_endpoints = ["127.0.0.1:9092"]
             max_batch_size = "4MB"
             linger = "200ms"
-            produce_record_timeout = "100ms"
+            consumer_wait_timeout = "100ms"
             backoff_init = "500ms"
             backoff_max = "10s"
             backoff_base = 2
@@ -106,7 +106,7 @@ mod tests {
             compression: RsKafkaCompression::default(),
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
-            produce_record_timeout: Duration::from_millis(100),
+            consumer_wait_timeout: Duration::from_millis(100),
             backoff: KafkaBackoffConfig {
                 init: Duration::from_millis(500),
                 max: Duration::from_secs(10),
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index 858991264bb6..d510e973451c 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -42,14 +42,14 @@ pub struct KafkaConfig {
     #[serde(skip)]
     #[serde(default)]
     pub compression: RsKafkaCompression,
-    /// The maximum log size a kafka batch producer could buffer.
+    /// The max size of a single producer batch.
     pub max_batch_size: ReadableSize,
     /// The linger duration of a kafka batch producer.
     #[serde(with = "humantime_serde")]
     pub linger: Duration,
-    /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
+    /// The consumer wait timeout.
     #[serde(with = "humantime_serde")]
-    pub produce_record_timeout: Duration,
+    pub consumer_wait_timeout: Duration,
     /// The backoff config.
     #[serde(flatten, with = "kafka_backoff")]
     pub backoff: KafkaBackoffConfig,
@@ -62,7 +62,7 @@ impl Default for KafkaConfig {
             compression: RsKafkaCompression::NoCompression,
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
-            produce_record_timeout: Duration::from_millis(100),
+            consumer_wait_timeout: Duration::from_millis(100),
             backoff: KafkaBackoffConfig::default(),
         }
     }
@@ -73,17 +73,15 @@ with_prefix!(pub kafka_backoff "backoff_");
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(default)]
 pub struct KafkaBackoffConfig {
-    /// The initial backoff for kafka clients.
+    /// The initial backoff delay.
     #[serde(with = "humantime_serde")]
     pub init: Duration,
-    /// The maximum backoff for kafka clients.
+    /// The maximum backoff delay.
     #[serde(with = "humantime_serde")]
     pub max: Duration,
     /// Exponential backoff rate, i.e. next backoff = base * current backoff.
-    // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait.
     pub base: u32,
-    /// Stop reconnecting if the total wait time reaches the deadline.
-    /// If it's None, the reconnecting won't terminate.
+    /// The deadline of retries. `None` stands for no deadline.
     #[serde(with = "humantime_serde")]
     pub deadline: Option<Duration>,
 }
@@ -114,7 +112,7 @@ pub struct StandaloneKafkaConfig {
     pub num_partitions: i32,
     /// The replication factor of each topic.
     pub replication_factor: i16,
-    /// Above which a topic creation operation will be cancelled.
+    /// The timeout of topic creation.
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
 }
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs
index 6719f2f63849..703dfa7e3de0 100644
--- a/src/common/meta/src/wal/kafka.rs
+++ b/src/common/meta/src/wal/kafka.rs
@@ -41,7 +41,7 @@ pub struct KafkaConfig {
     pub num_partitions: i32,
     /// The replication factor of each topic.
     pub replication_factor: i16,
-    /// Above which a topic creation operation will be cancelled.
+    /// The timeout of topic creation.
     #[serde(with = "humantime_serde")]
     pub create_topic_timeout: Duration,
     /// The backoff config.
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 20bcd4e7cf50..2e5543341506 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -162,7 +162,7 @@ impl LogStore for KafkaLogStore {
         let mut stream_consumer =
             StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
                 .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
-                .with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
+                .with_max_wait_ms(self.config.consumer_wait_timeout.as_millis() as i32)
                 .build();
 
         debug!(
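
Note on the breaking rename (illustration only, not part of the patch): after this change, `consumer_wait_timeout` is the key the WAL Kafka config recognizes for this timeout, which is why the commit carries the `feat!` marker. The sketch below shows the serde behavior with a simplified stand-in struct, not the crate's actual `KafkaConfig`; the `serde`, `toml`, and `humantime_serde` dependencies and the `deny_unknown_fields` attribute are assumptions made for the demo.

```rust
use std::time::Duration;

use serde::Deserialize;

// Simplified stand-in for the patched `KafkaConfig`; only the renamed field is kept.
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct KafkaWalConfig {
    // Humantime strings such as "100ms" are parsed into a `Duration`.
    #[serde(with = "humantime_serde")]
    consumer_wait_timeout: Duration,
}

fn main() {
    // The new key deserializes as expected.
    let cfg: KafkaWalConfig = toml::from_str(r#"consumer_wait_timeout = "100ms""#).unwrap();
    assert_eq!(cfg.consumer_wait_timeout, Duration::from_millis(100));

    // The old key no longer maps to any field; in this sketch (unknown fields
    // denied, no default for the new field) it is a hard deserialization error.
    let old = toml::from_str::<KafkaWalConfig>(r#"produce_record_timeout = "100ms""#);
    assert!(old.is_err());
}
```

Deployments that still set `produce_record_timeout` in their TOML config need to rename the key to `consumer_wait_timeout` when upgrading.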