From 3297d5f6578940701294ed2fcce9e72c00c74dca Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 26 Aug 2024 16:34:27 +0800 Subject: [PATCH] feat: allow skipping topic creation (#4616) * feat: introduce `create_topics` opt * feat: allow skipping topic creation * chore: refine docs * chore: apply suggestions from CR --- config/config.md | 10 ++++++---- config/metasrv.example.toml | 8 +++++++- config/standalone.example.toml | 8 +++++++- .../wal_options_allocator/kafka/topic_manager.rs | 4 ++++ src/common/wal/src/config.rs | 2 ++ src/common/wal/src/config/kafka/common.rs | 2 +- src/common/wal/src/config/kafka/datanode.rs | 4 ++++ src/common/wal/src/config/kafka/metasrv.rs | 15 ++++++++++++++- 8 files changed, 45 insertions(+), 8 deletions(-) diff --git a/config/config.md b/config/config.md index c66eb44d4ed4..6d4837ae46d1 100644 --- a/config/config.md +++ b/config/config.md @@ -67,9 +67,10 @@ | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.
**It's only used when the provider is `kafka`**. | +| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | +| `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
**It's only used when the provider is `kafka`**. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
**It's only used when the provider is `kafka`**. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.
**It's only used when the provider is `kafka`**. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.
**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. | @@ -287,9 +288,10 @@ | `wal` | -- | -- | -- | | `wal.provider` | String | `raft_engine` | -- | | `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. | -| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. | +| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | +| `wal.num_topics` | Integer | `64` | Number of topics. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) | -| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. | +| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. | | `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. | | `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. | | `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. | diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 41e9306ebd78..57be17b52021 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -99,7 +99,12 @@ provider = "raft_engine" ## The broker endpoints of the Kafka cluster. broker_endpoints = ["127.0.0.1:9092"] -## Number of topics to be created upon start. +## Automatically create topics for WAL. +## Set to `true` to automatically create topics for WAL. +## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` +auto_create_topics = true + +## Number of topics. num_topics = 64 ## Topic selector type. @@ -108,6 +113,7 @@ num_topics = 64 selector_type = "round_robin" ## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +## e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. topic_name_prefix = "greptimedb_wal_topic" ## Expected number of replicas of each partition. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 9258397bb82d..7a9a09dc5ab4 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -171,7 +171,12 @@ sync_period = "10s" ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] -## Number of topics to be created upon start. +## Automatically create topics for WAL. +## Set to `true` to automatically create topics for WAL. +## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` +auto_create_topics = true + +## Number of topics. ## **It's only used when the provider is `kafka`**. 
num_topics = 64 @@ -182,6 +187,7 @@ num_topics = 64 selector_type = "round_robin" ## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +## e.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. ## **It's only used when the provider is `kafka`**. topic_name_prefix = "greptimedb_wal_topic" diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs index 060f82d8d71e..3f1ffb4c45c5 100644 --- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs +++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs @@ -76,6 +76,10 @@ impl TopicManager { /// The initializer first tries to restore persisted topics from the kv backend. /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics. pub async fn start(&self) -> Result<()> { + // Skip creating topics. + if !self.config.auto_create_topics { + return Ok(()); + } let num_topics = self.config.kafka_topic.num_topics; ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics }); diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs index 90f3e44f9c4a..052311b5af4d 100644 --- a/src/common/wal/src/config.rs +++ b/src/common/wal/src/config.rs @@ -53,6 +53,7 @@ impl From for MetasrvWalConfig { connection: config.connection, backoff: config.backoff, kafka_topic: config.kafka_topic, + auto_create_topics: config.auto_create_topics, }), } } @@ -188,6 +189,7 @@ mod tests { replication_factor: 1, create_topic_timeout: Duration::from_secs(30), }, + auto_create_topics: true, }; assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected)); diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index f68ddfa5d8b2..d12c651f23ba 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -187,7 +187,7 @@ impl Default for 
KafkaConnectionConfig { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct KafkaTopicConfig { - /// Number of topics to be created upon start. + /// Number of topics. pub num_topics: usize, /// Number of partitions per topic. pub num_partitions: i32, diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs index 84e9da6bccfa..27f693204014 100644 --- a/src/common/wal/src/config/kafka/datanode.rs +++ b/src/common/wal/src/config/kafka/datanode.rs @@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig { /// The kafka topic config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + // Automatically create topics for WAL. + pub auto_create_topics: bool, + // Create index for WAL. pub create_index: bool, #[serde(with = "humantime_serde")] pub dump_index_interval: Duration, @@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig { consumer_wait_timeout: Duration::from_millis(100), backoff: BackoffConfig::default(), kafka_topic: KafkaTopicConfig::default(), + auto_create_topics: true, create_index: true, dump_index_interval: Duration::from_secs(60), } diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs index f61047315cda..abde6b74448b 100644 --- a/src/common/wal/src/config/kafka/metasrv.rs +++ b/src/common/wal/src/config/kafka/metasrv.rs @@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig; use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig}; /// Kafka wal configurations for metasrv. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvKafkaConfig { /// The kafka connection config. @@ -30,4 +30,17 @@ pub struct MetasrvKafkaConfig { /// The kafka config. #[serde(flatten)] pub kafka_topic: KafkaTopicConfig, + // Automatically create topics for WAL. 
+ pub auto_create_topics: bool, +} + +impl Default for MetasrvKafkaConfig { + fn default() -> Self { + Self { + connection: Default::default(), + backoff: Default::default(), + kafka_topic: Default::default(), + auto_create_topics: true, + } + } }