feat!: correct the kafka config option (#3065)
* feat: correct the kafka config option

* refactor: rewrite the verbose comments
WenyXu authored Jan 2, 2024
1 parent c4d7b0d commit 5653389
Showing 6 changed files with 23 additions and 25 deletions.
config/datanode.example.toml (2 changes: 1 addition & 1 deletion)
@@ -53,7 +53,7 @@ sync_write = false
# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# linger = "200ms"
-# produce_record_timeout = "100ms"
+# consumer_wait_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2
config/standalone.example.toml (20 changes: 10 additions & 10 deletions)
@@ -100,29 +100,29 @@ provider = "raft_engine"
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
-# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+# The prefix of topic name.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
-# Expected number of replicas of each partition.
+# The number of replicas of each partition.
# replication_factor = 1

-# The maximum log size a kafka batch producer could buffer.
+# The max size of a single producer batch.
# max_batch_size = "4MB"
-# The linger duration of a kafka batch producer.
+# The linger duration.
# linger = "200ms"
-# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
-# produce_record_timeout = "100ms"
-# Above which a topic creation operation will be cancelled.
+# The consumer wait timeout.
+# consumer_wait_timeout = "100ms"
+# Create topic timeout.
# create_topic_timeout = "30s"

-# The initial backoff for kafka clients.
+# The initial backoff delay.
# backoff_init = "500ms"
-# The maximum backoff for kafka clients.
+# The maximum backoff delay.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
+# The deadline of retries.
# backoff_deadline = "5mins"

# WAL data directory
src/common/config/src/wal.rs (4 changes: 2 additions & 2 deletions)
@@ -94,7 +94,7 @@ mod tests {
broker_endpoints = ["127.0.0.1:9092"]
max_batch_size = "4MB"
linger = "200ms"
-produce_record_timeout = "100ms"
+consumer_wait_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
@@ -106,7 +106,7 @@
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
-produce_record_timeout: Duration::from_millis(100),
+consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
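
A note on the duration syntax in the test above: values such as "100ms" are parsed by the humantime_serde crate, which the struct fields opt into via #[serde(with = "humantime_serde")]. A minimal, self-contained sketch of that mechanism (the DemoConfig struct is illustrative, not the crate's actual KafkaConfig; assumes the serde, toml, and humantime_serde crates):

use std::time::Duration;

use serde::Deserialize;

// Illustrative stand-in for KafkaConfig; only the renamed field is shown.
#[derive(Debug, Deserialize)]
struct DemoConfig {
    // humantime_serde parses strings like "100ms" or "10s" into a Duration.
    #[serde(with = "humantime_serde")]
    consumer_wait_timeout: Duration,
}

fn main() {
    let config: DemoConfig = toml::from_str(r#"consumer_wait_timeout = "100ms""#).unwrap();
    assert_eq!(config.consumer_wait_timeout, Duration::from_millis(100));
}
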
src/common/config/src/wal/kafka.rs (18 changes: 8 additions & 10 deletions)
@@ -42,14 +42,14 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
-/// The maximum log size a kafka batch producer could buffer.
+/// The max size of a single producer batch.
pub max_batch_size: ReadableSize,
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
-/// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
+/// The consumer wait timeout.
#[serde(with = "humantime_serde")]
-pub produce_record_timeout: Duration,
+pub consumer_wait_timeout: Duration,
/// The backoff config.
#[serde(flatten, with = "kafka_backoff")]
pub backoff: KafkaBackoffConfig,
@@ -62,7 +62,7 @@ impl Default for KafkaConfig {
compression: RsKafkaCompression::NoCompression,
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
-produce_record_timeout: Duration::from_millis(100),
+consumer_wait_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig::default(),
}
}
@@ -73,17 +73,15 @@ with_prefix!(pub kafka_backoff "backoff_");
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaBackoffConfig {
-/// The initial backoff for kafka clients.
+/// The initial backoff delay.
#[serde(with = "humantime_serde")]
pub init: Duration,
-/// The maximum backoff for kafka clients.
+/// The maximum backoff delay.
#[serde(with = "humantime_serde")]
pub max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
-// Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait.
pub base: u32,
-/// Stop reconnecting if the total wait time reaches the deadline.
-/// If it's None, the reconnecting won't terminate.
+/// The deadline of retries. `None` stands for no deadline.
#[serde(with = "humantime_serde")]
pub deadline: Option<Duration>,
}
@@ -114,7 +112,7 @@ pub struct StandaloneKafkaConfig {
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
-/// Above which a topic creation operation will be cancelled.
+/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
}
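
For context on the backoff_* keys: KafkaBackoffConfig is embedded with #[serde(flatten, with = "kafka_backoff")], where with_prefix!(pub kafka_backoff "backoff_") comes from the serde_with crate. That pairing is what maps the flat backoff_init / backoff_max / backoff_base keys in the TOML files onto the nested struct. A minimal sketch of the pattern (the Demo* names are illustrative; assumes the serde, serde_with, toml, and humantime_serde crates):

use std::time::Duration;

use serde::Deserialize;
use serde_with::with_prefix;

// Generates a serde adapter that reads the nested struct's fields
// under a "backoff_" prefix, e.g. `backoff_init`, `backoff_base`.
with_prefix!(demo_backoff "backoff_");

#[derive(Debug, Deserialize)]
struct DemoBackoff {
    #[serde(with = "humantime_serde")]
    init: Duration,
    base: u32,
}

#[derive(Debug, Deserialize)]
struct DemoConfig {
    // Flattening merges the prefixed keys into the parent table.
    #[serde(flatten, with = "demo_backoff")]
    backoff: DemoBackoff,
}

fn main() {
    let cfg: DemoConfig = toml::from_str("backoff_init = \"500ms\"\nbackoff_base = 2").unwrap();
    assert_eq!(cfg.backoff.init, Duration::from_millis(500));
    assert_eq!(cfg.backoff.base, 2);
}
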
src/common/meta/src/wal/kafka.rs (2 changes: 1 addition & 1 deletion)
@@ -41,7 +41,7 @@ pub struct KafkaConfig {
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
-/// Above which a topic creation operation will be cancelled.
+/// The timeout of topic creation.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The backoff config.
src/log-store/src/kafka/log_store.rs (2 changes: 1 addition & 1 deletion)
@@ -162,7 +162,7 @@ impl LogStore for KafkaLogStore {

let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset))
.with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
-.with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
+.with_max_wait_ms(self.config.consumer_wait_timeout.as_millis() as i32)
.build();

debug!(
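
A side note on this hunk: as_millis() returns a u128, and the plain `as i32` cast wraps for durations beyond roughly 24.8 days. A hypothetical saturating helper (not part of this commit) could guard the conversion:

use std::time::Duration;

// Hypothetical helper, not in the commit: clamp a Duration to the
// i32 millisecond range expected by rskafka's `with_max_wait_ms`.
fn to_max_wait_ms(timeout: Duration) -> i32 {
    i32::try_from(timeout.as_millis()).unwrap_or(i32::MAX)
}

fn main() {
    assert_eq!(to_max_wait_ms(Duration::from_millis(100)), 100);
    assert_eq!(to_max_wait_ms(Duration::from_secs(30 * 24 * 3600)), i32::MAX); // saturates
}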
