Skip to content

Commit

Permalink
feat: allow skipping topic creation (GreptimeTeam#4616)
Browse files Browse the repository at this point in the history
* feat: introduce `create_topics` opt

* feat: allow skipping topic creation

* chore: refine docs

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored and CookiePieWw committed Sep 17, 2024
1 parent cce44ec commit 1561ffa
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 8 deletions.
10 changes: 6 additions & 4 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>**It's only used when the provider is `kafka`**. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.<br/>**It's only used when the provider is `kafka`**. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
Expand Down Expand Up @@ -287,9 +288,10 @@
| `wal` | -- | -- | -- |
| `wal.provider` | String | `raft_engine` | -- |
| `wal.broker_endpoints` | Array | -- | The broker endpoints of the Kafka cluster. |
| `wal.num_topics` | Integer | `64` | Number of topics to be created upon start. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default) |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. |
| `wal.topic_name_prefix` | String | `greptimedb_wal_topic` | A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.<br/>i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1. |
| `wal.replication_factor` | Integer | `1` | Expected number of replicas of each partition. |
| `wal.create_topic_timeout` | String | `30s` | Above which a topic creation operation will be cancelled. |
| `wal.backoff_init` | String | `500ms` | The initial backoff for kafka clients. |
Expand Down
8 changes: 7 additions & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ provider = "raft_engine"
## The broker endpoints of the Kafka cluster.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true

## Number of topics.
num_topics = 64

## Topic selector type.
Expand All @@ -108,6 +113,7 @@ num_topics = 64
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
topic_name_prefix = "greptimedb_wal_topic"

## Expected number of replicas of each partition.
Expand Down
8 changes: 7 additions & 1 deletion config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ sync_period = "10s"
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]

## Number of topics to be created upon start.
## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`
auto_create_topics = true

## Number of topics.
## **It's only used when the provider is `kafka`**.
num_topics = 64

Expand All @@ -182,6 +187,7 @@ num_topics = 64
selector_type = "round_robin"

## A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
## i.g., greptimedb_wal_topic_0, greptimedb_wal_topic_1.
## **It's only used when the provider is `kafka`**.
topic_name_prefix = "greptimedb_wal_topic"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ impl TopicManager {
/// The initializer first tries to restore persisted topics from the kv backend.
/// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request creating more topics.
pub async fn start(&self) -> Result<()> {
// Skip creating topics.
if !self.config.auto_create_topics {
return Ok(());
}
let num_topics = self.config.kafka_topic.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

Expand Down
2 changes: 2 additions & 0 deletions src/common/wal/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl From<DatanodeWalConfig> for MetasrvWalConfig {
connection: config.connection,
backoff: config.backoff,
kafka_topic: config.kafka_topic,
auto_create_topics: config.auto_create_topics,
}),
}
}
Expand Down Expand Up @@ -188,6 +189,7 @@ mod tests {
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
},
auto_create_topics: true,
};
assert_eq!(metasrv_wal_config, MetasrvWalConfig::Kafka(expected));

Expand Down
2 changes: 1 addition & 1 deletion src/common/wal/src/config/kafka/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ impl Default for KafkaConnectionConfig {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaTopicConfig {
/// Number of topics to be created upon start.
/// Number of topics.
pub num_topics: usize,
/// Number of partitions per topic.
pub num_partitions: i32,
Expand Down
4 changes: 4 additions & 0 deletions src/common/wal/src/config/kafka/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub struct DatanodeKafkaConfig {
/// The kafka topic config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
// Create index for WAL.
pub create_index: bool,
#[serde(with = "humantime_serde")]
pub dump_index_interval: Duration,
Expand All @@ -54,6 +57,7 @@ impl Default for DatanodeKafkaConfig {
consumer_wait_timeout: Duration::from_millis(100),
backoff: BackoffConfig::default(),
kafka_topic: KafkaTopicConfig::default(),
auto_create_topics: true,
create_index: true,
dump_index_interval: Duration::from_secs(60),
}
Expand Down
15 changes: 14 additions & 1 deletion src/common/wal/src/config/kafka/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::common::KafkaConnectionConfig;
use crate::config::kafka::common::{backoff_prefix, BackoffConfig, KafkaTopicConfig};

/// Kafka wal configurations for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvKafkaConfig {
/// The kafka connection config.
Expand All @@ -30,4 +30,17 @@ pub struct MetasrvKafkaConfig {
/// The kafka config.
#[serde(flatten)]
pub kafka_topic: KafkaTopicConfig,
// Automatically create topics for WAL.
pub auto_create_topics: bool,
}

impl Default for MetasrvKafkaConfig {
fn default() -> Self {
Self {
connection: Default::default(),
backoff: Default::default(),
kafka_topic: Default::default(),
auto_create_topics: true,
}
}
}

0 comments on commit 1561ffa

Please sign in to comment.