refactor: refactor standalone wal config
WenyXu committed Dec 26, 2023
1 parent d2cf0b6 commit 5ffe15f
Showing 12 changed files with 178 additions and 117 deletions.
2 changes: 1 addition & 1 deletion config/datanode.example.toml
@@ -53,7 +53,7 @@ sync_write = false
# broker_endpoints = ["127.0.0.1:9090"]
# max_batch_size = "4MB"
# linger = "200ms"
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
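What this one-line change means in practice: the TOML key must now match the renamed field. A minimal sketch, assuming the `toml` and `humantime_serde` crates are available; the struct here is hypothetical and only the key names come from the diff:

```rust
use std::time::Duration;

use serde::Deserialize;

// Hypothetical stand-in for the real producer config; only the
// `produce_record_timeout` key mirrors this commit.
#[derive(Debug, Deserialize)]
struct ProducerSketch {
    // humantime_serde parses strings like "100ms" into a Duration.
    #[serde(with = "humantime_serde")]
    produce_record_timeout: Duration,
}

fn main() {
    let parsed: ProducerSketch =
        toml::from_str(r#"produce_record_timeout = "100ms""#).unwrap();
    assert_eq!(parsed.produce_record_timeout, Duration::from_millis(100));

    // No serde alias is added for the old key, so a config file still
    // using `max_wait_time` is missing the required field and fails.
    assert!(toml::from_str::<ProducerSketch>(r#"max_wait_time = "100ms""#).is_err());
}
```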
30 changes: 7 additions & 23 deletions config/standalone.example.toml
@@ -80,17 +80,18 @@ enable = true
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

-[wal_meta]
+[wal]
# Available wal providers:
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

-# There're none raft-engine wal config since meta srv only involves in remote wal currently.

-# Kafka wal config.
+# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]

# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
@@ -103,33 +104,16 @@ provider = "raft_engine"
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
-# Above which a topic creation operation will be cancelled.
-# create_topic_timeout = "30s"
-# The initial backoff for kafka clients.
-# backoff_init = "500ms"
-# The maximum backoff for kafka clients.
-# backoff_max = "10s"
-# Exponential backoff rate, i.e. next backoff = base * current backoff.
-# backoff_base = 2.0
-# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
-# backoff_deadline = "5mins"

-# WAL options for datanode.
-[wal_datanode]
-# Available wal providers:
-# - "RaftEngine" (default)
-# - "Kafka"
-provider = "raft_engine"

-# Kafka wal options.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# linger = "200ms"
# The maximum amount of time to wait for Kafka records to be returned.
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
# The timeout after which a topic creation operation is cancelled.
# create_topic_timeout = "30s"

# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
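With the merge above, a standalone deployment configures a single `[wal]` table carrying both the topic-management keys (formerly `[wal_meta]`) and the producer keys (formerly `[wal_datanode]`). A hedged decoding sketch, assuming the crate paths from this diff and the `toml` crate:

```rust
use common_config::wal::StandaloneWalConfig;

fn main() {
    // Keys from both former sections now sit side by side in one table.
    let merged = r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9090"]
        num_topics = 64
        replication_factor = 3
        create_topic_timeout = "30s"
        linger = "200ms"
        produce_record_timeout = "100ms"
        backoff_init = "500ms"
    "#;
    // The `provider` tag picks the enum variant; the remaining keys are
    // optional because the config structs are `#[serde(default)]`.
    let wal: StandaloneWalConfig = toml::from_str(merged).unwrap();
    assert!(matches!(wal, StandaloneWalConfig::Kafka(_)));
}
```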
28 changes: 11 additions & 17 deletions src/cmd/src/standalone.rs
@@ -18,7 +18,8 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::{metadata_store_dir, KvBackendConfig, WalConfig as DatanodeWalConfig};
+use common_config::wal::StandaloneWalConfig;
+use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -27,9 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::{
-    WalConfig as MetaSrvWalConfig, WalOptionsAllocator, WalOptionsAllocatorRef,
-};
+use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
@@ -106,8 +105,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
-pub wal_meta: MetaSrvWalConfig,
-pub wal_datanode: DatanodeWalConfig,
+pub wal: StandaloneWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
@@ -130,8 +128,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
-wal_meta: MetaSrvWalConfig::default(),
-wal_datanode: DatanodeWalConfig::default(),
+wal: StandaloneWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
@@ -170,7 +167,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
-wal: self.wal_datanode,
+wal: self.wal.into(),
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
@@ -342,7 +339,7 @@ impl StartCommand {
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
-let wal_meta = opts.wal_meta.clone();
+let wal_meta = opts.wal.clone().into();
let datanode = opts.datanode_options().clone();

Ok(Options::Standalone(Box::new(MixOptions {
@@ -484,6 +481,7 @@ mod tests {

use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
+use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;
@@ -526,10 +524,7 @@ mod tests {
enable_memory_catalog = true
-[wal_meta]
-provider = "raft_engine"
-[wal_datanode]
+[wal]
provider = "raft_engine"
dir = "/tmp/greptimedb/test/wal"
file_size = "1GB"
@@ -594,7 +589,7 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

-let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
+let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());
@@ -740,8 +735,7 @@ mod tests {
assert_eq!(options.opentsdb, default_options.opentsdb);
assert_eq!(options.influxdb, default_options.influxdb);
assert_eq!(options.prom_store, default_options.prom_store);
-assert_eq!(options.wal_meta, default_options.wal_meta);
-assert_eq!(options.wal_datanode, default_options.wal_datanode);
+assert_eq!(options.wal, default_options.wal);
assert_eq!(options.metadata_store, default_options.metadata_store);
assert_eq!(options.procedure, default_options.procedure);
assert_eq!(options.logging, default_options.logging);
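The net effect in this file: two wal fields collapse into one, and each component derives its own view from it. A sketch of the fan-out — the datanode-side `From` impl is part of this diff, while the metasrv-side conversion is assumed from `opts.wal.clone().into()` above:

```rust
use common_config::wal::StandaloneWalConfig;
use common_config::WalConfig as DatanodeWalConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;

/// One standalone wal config fans out into the per-component configs.
fn split_wal(wal: StandaloneWalConfig) -> (DatanodeWalConfig, MetaSrvWalConfig) {
    // The datanode keeps only the shared producer settings (`config.base`
    // for Kafka); the metasrv keeps the topic-management settings.
    (wal.clone().into(), wal.into())
}
```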
47 changes: 35 additions & 12 deletions src/common/config/src/wal.rs
@@ -18,6 +18,7 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

+use self::kafka::StandaloneKafkaConfig;
pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::raft_engine::RaftEngineConfig;

@@ -27,30 +28,49 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options";

/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider")]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
#[serde(rename = "raft_engine")]
RaftEngine(RaftEngineConfig),
#[serde(rename = "kafka")]
Kafka(KafkaConfig),
}

+impl From<StandaloneWalConfig> for WalConfig {
+    fn from(value: StandaloneWalConfig) -> Self {
+        match value {
+            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
+            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
+        }
+    }
+}

impl Default for WalConfig {
fn default() -> Self {
WalConfig::RaftEngine(RaftEngineConfig::default())
}
}

+/// Wal config for standalone.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "provider", rename_all = "snake_case")]
+pub enum StandaloneWalConfig {
+    RaftEngine(RaftEngineConfig),
+    Kafka(StandaloneKafkaConfig),
+}
+
+impl Default for StandaloneWalConfig {
+    fn default() -> Self {
+        StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
+    }
+}

/// Wal options allocated to a region.
/// Wal options are encoded by the metasrv with `serde_json::to_string`, and then decoded
/// by the datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
#[default]
#[serde(rename = "raft_engine")]
RaftEngine,
#[serde(rename = "kafka")]
#[serde(with = "prefix_wal_kafka")]
Kafka(KafkaWalOptions),
}
@@ -64,6 +84,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;

+use crate::wal::kafka::KafkaBackoffConfig;
use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};

#[test]
@@ -72,7 +93,7 @@
broker_endpoints = ["127.0.0.1:9090"]
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
produce_record_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
Expand All @@ -84,11 +105,13 @@ mod tests {
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
-max_wait_time: Duration::from_millis(100),
-backoff_init: Duration::from_millis(500),
-backoff_max: Duration::from_secs(10),
-backoff_base: 2,
-backoff_deadline: Some(Duration::from_secs(60 * 5)),
+produce_record_timeout: Duration::from_millis(100),
+backoff: KafkaBackoffConfig {
+    init: Duration::from_millis(500),
+    max: Duration::from_secs(10),
+    base: 2,
+    deadline: Some(Duration::from_secs(60 * 5)),
+},
};
assert_eq!(decoded, expected);
}
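The doc comment above describes a metasrv-to-datanode handshake: allocated wal options travel as a JSON string. A round-trip sketch; the `topic` field name on `KafkaWalOptions` is an assumption, while the JSON shape follows from the `wal.provider` tag and `wal.kafka` prefix in this file:

```rust
use common_config::wal::{KafkaWalOptions, WalOptions};

fn main() {
    // Metasrv side: allocate options for a region and encode them.
    let options = WalOptions::Kafka(KafkaWalOptions {
        topic: "greptimedb_wal_topic_0".to_string(), // field name assumed
    });
    let encoded = serde_json::to_string(&options).unwrap();
    // Expected shape, given the serde attributes above:
    // {"wal.provider":"kafka","wal.kafka.topic":"greptimedb_wal_topic_0"}

    // Datanode side: decode the string received in the region options.
    let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, options);
}
```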
91 changes: 76 additions & 15 deletions src/common/config/src/wal/kafka.rs
@@ -17,12 +17,21 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};
+use serde_with::with_prefix;

/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;

+/// The type of the topic selector, i.e., the strategy used to select a topic.
+#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum TopicSelectorType {
+    #[default]
+    RoundRobin,
+}
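The selector logic itself lives outside this file; purely as an illustration of the strategy the variant names, round-robin selection hands out topics in rotating order (hypothetical helper, non-empty topic list assumed):

```rust
// Illustration only; not the project's actual selector.
fn next_topic<'a>(topics: &'a [String], cursor: &mut usize) -> &'a str {
    let topic = &topics[*cursor % topics.len()]; // panics on an empty list
    *cursor += 1;
    topic
}
```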

/// Configurations for kafka wal.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
@@ -40,34 +49,86 @@ pub struct KafkaConfig {
pub linger: Duration,
/// The maximum amount of time to wait for Kafka records to be returned.
#[serde(with = "humantime_serde")]
-pub max_wait_time: Duration,
+pub produce_record_timeout: Duration,
+/// The backoff config.
+#[serde(flatten, with = "kafka_backoff")]
+pub backoff: KafkaBackoffConfig,
}

+impl Default for KafkaConfig {
+    fn default() -> Self {
+        Self {
+            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            compression: RsKafkaCompression::NoCompression,
+            max_batch_size: ReadableSize::mb(4),
+            linger: Duration::from_millis(200),
+            produce_record_timeout: Duration::from_millis(100),
+            backoff: KafkaBackoffConfig::default(),
+        }
+    }
+}
+
+with_prefix!(pub kafka_backoff "backoff_");

+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct KafkaBackoffConfig {
/// The initial backoff for kafka clients.
#[serde(with = "humantime_serde")]
-pub backoff_init: Duration,
+pub init: Duration,
/// The maximum backoff for kafka clients.
#[serde(with = "humantime_serde")]
-pub backoff_max: Duration,
+pub max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
// Set to u32 since some structs containing this config need to derive the Eq trait.
-pub backoff_base: u32,
+pub base: u32,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
-pub backoff_deadline: Option<Duration>,
+pub deadline: Option<Duration>,
}

-impl Default for KafkaConfig {
+impl Default for KafkaBackoffConfig {
fn default() -> Self {
Self {
-broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-compression: RsKafkaCompression::NoCompression,
-max_batch_size: ReadableSize::mb(4),
-linger: Duration::from_millis(200),
-max_wait_time: Duration::from_millis(100),
-backoff_init: Duration::from_millis(500),
-backoff_max: Duration::from_secs(10),
-backoff_base: 2,
-backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
+init: Duration::from_millis(500),
+max: Duration::from_secs(10),
+base: 2,
+deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}

+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct StandaloneKafkaConfig {
+    #[serde(flatten)]
+    pub base: KafkaConfig,
+    /// Number of topics to be created upon start.
+    pub num_topics: usize,
+    /// The type of the topic selector with which to select a topic for a region.
+    pub selector_type: TopicSelectorType,
+    /// Topic name prefix.
+    pub topic_name_prefix: String,
+    /// Number of partitions per topic.
+    pub num_partitions: i32,
+    /// The replication factor of each topic.
+    pub replication_factor: i16,
+    /// The timeout after which a topic creation operation is cancelled.
+    #[serde(with = "humantime_serde")]
+    pub create_topic_timeout: Duration,
+}

+impl Default for StandaloneKafkaConfig {
+    fn default() -> Self {
+        Self {
+            base: KafkaConfig::default(),
+            num_topics: 64,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 3,
+            create_topic_timeout: Duration::from_secs(30),
+        }
+    }
+}
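Because `base` is `#[serde(flatten)]`ed, standalone deployments mix the shared producer keys and the standalone-only topic keys in one flat table — there is no nested `[wal.base]` section. A hedged sketch, assuming the module is exported as `common_config::wal::kafka` and the `toml` crate is available:

```rust
use std::time::Duration;

use common_config::wal::kafka::StandaloneKafkaConfig;

fn main() {
    let config: StandaloneKafkaConfig = toml::from_str(
        r#"
        broker_endpoints = ["127.0.0.1:9090"] # shared with KafkaConfig
        backoff_init = "500ms"                # prefixed into KafkaBackoffConfig
        num_topics = 32                       # standalone-only
        create_topic_timeout = "30s"          # standalone-only
        "#,
    )
    .unwrap();
    assert_eq!(config.num_topics, 32);
    assert_eq!(config.base.backoff.init, Duration::from_millis(500));
}
```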
