diff --git a/config/config.md b/config/config.md
index c66eb44d4ed4..6d4837ae46d1 100644
--- a/config/config.md
+++ b/config/config.md
@@ -67,9 +67,10 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. |
-| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.
**It's only used when the provider is `kafka`**. |
+| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
+| `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. |
-| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
**It's only used when the provider is `kafka`**. |
+| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.
**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.
**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. |
@@ -287,9 +288,10 @@
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
-| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. |
+| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
+| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default) |
-| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. |
+| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 41e9306ebd78..57be17b52021 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -99,7 +99,12 @@ provider = "raft_engine"
## The broker endpoints of the Kafka cluster.
broker_endpoints = ["127.0.0.1:9092"]
-## Number of topics to be created upon start.
+## Automatically create topics for WAL.
+## Set to `true` to automatically create topics for WAL.
+## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
+auto_create_topics = true
+
+## Number of topics.
num_topics = 64
## Topic selector type.
@@ -108,6 +113,7 @@ num_topics = 64
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
topic_name_prefix = "greptimedb_wal_topic"
## Expected number of replicas of each partition.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 9258397bb82d..7a9a09dc5ab4 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -171,7 +171,12 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
-## Number of topics to be created upon start.
+## Automatically create topics for WAL.
+## Set to `true` to automatically create topics for WAL.
+## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
+auto_create_topics = true
+
+## Number of topics.
## **It's only used when the provider is `kafka`**.
num_topics = 64
@@ -182,6 +187,7 @@ num_topics = 64
selector_type = "round_robin"
## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"
diff --git a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
index 060f82d8d71e..3f1ffb4c45c5 100644
--- a/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
+++ b/src/common/meta/src/wal_options_allocator/kafka/topic_manager.rs
@@ -76,6 +76,10 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
+ // Skip creating topics.
+ if !self.config.auto_create_topics {
+ return Ok(());
+ }
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });
diff --git a/src/common/wal/src/config.rs b/src/common/wal/src/config.rs
index 90f3e44f9c4a..052311b5af4d 100644
--- a/src/common/wal/src/config.rs
+++ b/src/common/wal/src/config.rs
@@ -53,6 +53,7 @@ impl From for MetasrvWalConfig {
connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
+ auto_create_topics: config.auto_create_topics,
}),
}
}
@@ -188,6 +189,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
+ auto_create_topics: true,
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));
diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs
index f68ddfa5d8b2..d12c651f23ba 100644
--- a/src/common/wal/src/config/kafka/common.rs
+++ b/src/common/wal/src/config/kafka/common.rs
@@ -187,7 +187,7 @@ impl Default for KafkaConnectionConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
- /// Number of topics to be created upon start.
+ /// Number of topics.
pub num_topics: usize,
/// Number of partitions per topic.
pub num_partitions: i32,
diff --git a/src/common/wal/src/config/kafka/datanode.rs b/src/common/wal/src/config/kafka/datanode.rs
index 84e9da6bccfa..27f693204014 100644
--- a/src/common/wal/src/config/kafka/datanode.rs
+++ b/src/common/wal/src/config/kafka/datanode.rs
@@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
+ // Automatically create topics for WAL.
+ pub auto_create_topics: bool,
+ // Create index for WAL.
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
@@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
+ auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
diff --git a/src/common/wal/src/config/kafka/metasrv.rs b/src/common/wal/src/config/kafka/metasrv.rs
index f61047315cda..abde6b74448b 100644
--- a/src/common/wal/src/config/kafka/metasrv.rs
+++ b/src/common/wal/src/config/kafka/metasrv.rs
@@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};
/// Kafka wal configurations for metasrv.
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvKafkaConfig {
/// The kafka connection config.
@@ -30,4 +30,17 @@ pub struct MetasrvKafkaConfig {
/// The kafka config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
+ // Automatically create topics for WAL.
+ pub auto_create_topics: bool,
+}
+
+impl Default for MetasrvKafkaConfig {
+ fn default() -> Self {
+ Self {
+ connection: Default::default(),
+ backoff: Default::default(),
+ kafka_topic: Default::default(),
+ auto_create_topics: true,
+ }
+ }
}