From 398a86b41eca706a04554175f1f42a0c731af03d Mon Sep 17 00:00:00 2001 From: Harsh1s Date: Thu, 7 Mar 2024 04:09:57 +0530 Subject: [PATCH] refactor: split config into multiple modules Signed-off-by: Harsh1s --- crates/benchmark/src/runner.rs | 2 +- crates/curp-test-utils/src/test_cmd.rs | 4 +- crates/curp/src/client/mod.rs | 2 +- crates/curp/src/server/cmd_worker/mod.rs | 2 +- crates/curp/src/server/curp_node.rs | 5 +- crates/curp/src/server/mod.rs | 2 +- crates/curp/src/server/raw_curp/log.rs | 2 +- crates/curp/src/server/raw_curp/mod.rs | 2 +- crates/curp/src/server/raw_curp/tests.rs | 2 +- crates/curp/src/server/storage/db.rs | 2 +- crates/curp/tests/it/common/curp_group.rs | 3 +- crates/curp/tests/it/server.rs | 2 +- crates/utils/src/config.rs | 1500 ------------------ crates/utils/src/config/auth_config.rs | 28 + crates/utils/src/config/client_config.rs | 157 ++ crates/utils/src/config/cluster_config.rs | 139 ++ crates/utils/src/config/compact_config.rs | 99 ++ crates/utils/src/config/curp_config.rs | 223 +++ crates/utils/src/config/engine_config.rs | 26 + crates/utils/src/config/log_config.rs | 150 ++ crates/utils/src/config/metrics_config.rs | 151 ++ crates/utils/src/config/mod.rs | 445 ++++++ crates/utils/src/config/server_config.rs | 106 ++ crates/utils/src/config/storage_config.rs | 43 + crates/utils/src/config/tls_config.rs | 52 + crates/utils/src/config/trace_config.rs | 56 + crates/utils/src/parser.rs | 6 +- crates/xline-client/src/lib.rs | 2 +- crates/xline-test-utils/src/lib.rs | 6 +- crates/xline/src/server/maintenance.rs | 2 +- crates/xline/src/server/watch_server.rs | 4 +- crates/xline/src/server/xline_server.rs | 5 +- crates/xline/src/storage/auth_store/store.rs | 2 +- crates/xline/src/storage/compact/mod.rs | 2 +- crates/xline/src/storage/db.rs | 2 +- crates/xline/src/storage/kv_store.rs | 2 +- crates/xline/src/storage/kvwatcher.rs | 2 +- crates/xline/src/storage/lease_store/mod.rs | 2 +- crates/xline/src/utils/args.rs | 35 +- crates/xline/src/utils/metrics.rs | 2 +- crates/xline/src/utils/trace.rs | 2 +- crates/xline/tests/it/auth_test.rs | 5 +- crates/xlinectl/src/main.rs | 2 +- 43 files changed, 1739 insertions(+), 1549 deletions(-) delete mode 100644 crates/utils/src/config.rs create mode 100644 crates/utils/src/config/auth_config.rs create mode 100644 crates/utils/src/config/client_config.rs create mode 100644 crates/utils/src/config/cluster_config.rs create mode 100644 crates/utils/src/config/compact_config.rs create mode 100644 crates/utils/src/config/curp_config.rs create mode 100644 crates/utils/src/config/engine_config.rs create mode 100644 crates/utils/src/config/log_config.rs create mode 100644 crates/utils/src/config/metrics_config.rs create mode 100644 crates/utils/src/config/mod.rs create mode 100644 crates/utils/src/config/server_config.rs create mode 100644 crates/utils/src/config/storage_config.rs create mode 100644 crates/utils/src/config/tls_config.rs create mode 100644 crates/utils/src/config/trace_config.rs diff --git a/crates/benchmark/src/runner.rs b/crates/benchmark/src/runner.rs index 9fa5ad40e..60054a93b 100644 --- a/crates/benchmark/src/runner.rs +++ b/crates/benchmark/src/runner.rs @@ -19,7 +19,7 @@ use tokio::{ time::{Duration, Instant}, }; use tracing::debug; -use utils::config::ClientConfig; +use utils::config::client_config::ClientConfig; use xline_client::{types::kv::PutRequest, ClientOptions}; use crate::{args::Commands, bench_client::BenchClient, Benchmark}; diff --git a/crates/curp-test-utils/src/test_cmd.rs b/crates/curp-test-utils/src/test_cmd.rs index 79bfdd502..948c3ac11 100644 --- a/crates/curp-test-utils/src/test_cmd.rs +++ b/crates/curp-test-utils/src/test_cmd.rs @@ -18,7 +18,7 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::{sync::mpsc, time::sleep}; use tracing::debug; -use utils::config::EngineConfig; +use utils::config::engine_config::EngineConfig; use crate::{META_TABLE, REVISION_TABLE, TEST_TABLE}; @@ -411,7 +411,7 @@ impl TestCE { after_sync_sender: mpsc::UnboundedSender<(TestCommand, LogIndex)>, engine_cfg: EngineConfig, ) -> Self { - let engine_type = match engine_cfg { + let engine_type: EngineType = match engine_cfg { EngineConfig::Memory => EngineType::Memory, EngineConfig::RocksDB(path) => EngineType::Rocks(path), _ => unreachable!("Not supported storage type"), diff --git a/crates/curp/src/client/mod.rs b/crates/curp/src/client/mod.rs index 34c02c1c1..e62a976df 100644 --- a/crates/curp/src/client/mod.rs +++ b/crates/curp/src/client/mod.rs @@ -26,9 +26,9 @@ use futures::{stream::FuturesUnordered, StreamExt}; #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; use tracing::debug; +use utils::{build_endpoint, config::client_config::ClientConfig}; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; use self::{ retry::{Retry, RetryConfig}, diff --git a/crates/curp/src/server/cmd_worker/mod.rs b/crates/curp/src/server/cmd_worker/mod.rs index 8ede8acd7..ae3cab48c 100644 --- a/crates/curp/src/server/cmd_worker/mod.rs +++ b/crates/curp/src/server/cmd_worker/mod.rs @@ -415,7 +415,7 @@ mod tests { use test_macros::abort_on_panic; use tokio::{sync::mpsc, time::Instant}; use tracing_test::traced_test; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::{log_entry::LogEntry, rpc::ProposeId}; diff --git a/crates/curp/src/server/curp_node.rs b/crates/curp/src/server/curp_node.rs index 61da4da19..6252d8d12 100644 --- a/crates/curp/src/server/curp_node.rs +++ b/crates/curp/src/server/curp_node.rs @@ -18,12 +18,9 @@ use tokio::{ #[cfg(not(madsim))] use tonic::transport::ClientTlsConfig; use tracing::{debug, error, info, trace, warn}; +use utils::{config::curp_config::CurpConfig, task_manager::{tasks::TaskName, Listener, State, TaskManager}}; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{ - config::CurpConfig, - task_manager::{tasks::TaskName, Listener, State, TaskManager}, -}; use super::{ cmd_board::{CmdBoardRef, CommandBoard}, diff --git a/crates/curp/src/server/mod.rs b/crates/curp/src/server/mod.rs index 66470ceec..c520a0078 100644 --- a/crates/curp/src/server/mod.rs +++ b/crates/curp/src/server/mod.rs @@ -7,7 +7,7 @@ use tonic::transport::ClientTlsConfig; use tracing::instrument; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{config::CurpConfig, task_manager::TaskManager, tracing::Extract}; +use utils::{config::curp_config::CurpConfig, task_manager::TaskManager, tracing::Extract}; use self::curp_node::CurpNode; pub use self::raw_curp::RawCurp; diff --git a/crates/curp/src/server/raw_curp/log.rs b/crates/curp/src/server/raw_curp/log.rs index 4aee089d3..be1f5ca9c 100644 --- a/crates/curp/src/server/raw_curp/log.rs +++ b/crates/curp/src/server/raw_curp/log.rs @@ -455,7 +455,7 @@ mod tests { use std::{iter::repeat, ops::Index, sync::Arc}; use curp_test_utils::test_cmd::TestCommand; - use utils::config::{default_batch_max_size, default_log_entries_cap}; + use utils::config::curp_config::{default_batch_max_size, default_log_entries_cap}; use super::*; diff --git a/crates/curp/src/server/raw_curp/mod.rs b/crates/curp/src/server/raw_curp/mod.rs index 298677e75..46aba96d3 100644 --- a/crates/curp/src/server/raw_curp/mod.rs +++ b/crates/curp/src/server/raw_curp/mod.rs @@ -38,7 +38,7 @@ use tracing::{ #[cfg(madsim)] use utils::ClientTlsConfig; use utils::{ - config::CurpConfig, + config::curp_config::CurpConfig, parking_lot_lock::{MutexMap, RwLockMap}, task_manager::TaskManager, }; diff --git a/crates/curp/src/server/raw_curp/tests.rs b/crates/curp/src/server/raw_curp/tests.rs index 827213497..3b5a1251a 100644 --- a/crates/curp/src/server/raw_curp/tests.rs +++ b/crates/curp/src/server/raw_curp/tests.rs @@ -7,7 +7,7 @@ use tokio::{ time::{sleep, Instant}, }; use tracing_test::traced_test; -use utils::config::{ +use utils::config::curp_config::{ default_candidate_timeout_ticks, default_follower_timeout_ticks, default_heartbeat_interval, CurpConfigBuilder, }; diff --git a/crates/curp/src/server/storage/db.rs b/crates/curp/src/server/storage/db.rs index 1e96ccf81..dd9a0a4f3 100644 --- a/crates/curp/src/server/storage/db.rs +++ b/crates/curp/src/server/storage/db.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use async_trait::async_trait; use engine::{Engine, EngineType, StorageEngine, WriteOperation}; use prost::Message; -use utils::config::EngineConfig; +use utils::config::engine_config::EngineConfig; use super::{StorageApi, StorageError}; use crate::{ diff --git a/crates/curp/tests/it/common/curp_group.rs b/crates/curp/tests/it/common/curp_group.rs index 6cf8bb0c8..dfdd640a9 100644 --- a/crates/curp/tests/it/common/curp_group.rs +++ b/crates/curp/tests/it/common/curp_group.rs @@ -36,7 +36,8 @@ use tracing::debug; use utils::{ build_endpoint, config::{ - default_quota, ClientConfig, CurpConfig, CurpConfigBuilder, EngineConfig, StorageConfig, + client_config::ClientConfig, curp_config::CurpConfig, curp_config::CurpConfigBuilder, + engine_config::EngineConfig, storage_config::default_quota, storage_config::StorageConfig, }, task_manager::{tasks::TaskName, Listener, TaskManager}, }; diff --git a/crates/curp/tests/it/server.rs b/crates/curp/tests/it/server.rs index edf890f3c..9a0a1ab5b 100644 --- a/crates/curp/tests/it/server.rs +++ b/crates/curp/tests/it/server.rs @@ -15,7 +15,7 @@ use curp_test_utils::{ use madsim::rand::{thread_rng, Rng}; use test_macros::abort_on_panic; use tokio::net::TcpListener; -use utils::{config::ClientConfig, timestamp}; +use utils::{config::client_config::ClientConfig, timestamp}; use crate::common::curp_group::{ commandpb::ProposeId, CurpGroup, FetchClusterRequest, ProposeRequest, ProposeResponse, diff --git a/crates/utils/src/config.rs b/crates/utils/src/config.rs deleted file mode 100644 index 3b197cb77..000000000 --- a/crates/utils/src/config.rs +++ /dev/null @@ -1,1500 +0,0 @@ -use std::{collections::HashMap, path::PathBuf, time::Duration}; - -use derive_builder::Builder; -use getset::Getters; -use serde::Deserialize; -use tracing_appender::rolling::RollingFileAppender; - -/// Xline server configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct XlineServerConfig { - /// cluster configuration object - #[getset(get = "pub")] - cluster: ClusterConfig, - /// xline storage configuration object - #[getset(get = "pub")] - storage: StorageConfig, - /// log configuration object - #[getset(get = "pub")] - log: LogConfig, - /// trace configuration object - #[getset(get = "pub")] - trace: TraceConfig, - /// auth configuration object - #[getset(get = "pub")] - auth: AuthConfig, - /// compactor configuration object - #[getset(get = "pub")] - compact: CompactConfig, - /// tls configuration object - #[getset(get = "pub")] - tls: TlsConfig, - /// Metrics config - #[getset(get = "pub")] - #[serde(default = "MetricsConfig::default")] - metrics: MetricsConfig, -} - -/// Cluster Range type alias -pub type ClusterRange = std::ops::Range; -/// Log verbosity level alias -#[allow(clippy::module_name_repetitions)] -pub type LevelConfig = tracing::metadata::LevelFilter; - -/// `Duration` deserialization formatter -pub mod duration_format { - use std::time::Duration; - - use serde::{self, Deserialize, Deserializer}; - - use crate::parse_duration; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // the false positive case blocks us - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_duration(&s).map_err(serde::de::Error::custom) - } -} - -/// batch size deserialization formatter -pub mod bytes_format { - use serde::{self, Deserialize, Deserializer}; - - use crate::parse_batch_bytes; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // the false positive case blocks us - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_batch_bytes(&s).map_err(serde::de::Error::custom) - } -} - -/// Cluster configuration object, including cluster relevant configuration fields -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct ClusterConfig { - /// Get xline server name - #[getset(get = "pub")] - name: String, - /// Xline server peer listen urls - #[getset(get = "pub")] - peer_listen_urls: Vec, - /// Xline server peer advertise urls - #[getset(get = "pub")] - peer_advertise_urls: Vec, - /// Xline server client listen urls - #[getset(get = "pub")] - client_listen_urls: Vec, - /// Xline server client advertise urls - #[getset(get = "pub")] - client_advertise_urls: Vec, - /// All the nodes in the xline cluster - #[getset(get = "pub")] - peers: HashMap>, - /// Leader node. - #[getset(get = "pub")] - is_leader: bool, - /// Curp server timeout settings - #[getset(get = "pub")] - #[serde(default = "CurpConfig::default")] - curp_config: CurpConfig, - /// Curp client config settings - #[getset(get = "pub")] - #[serde(default = "ClientConfig::default")] - client_config: ClientConfig, - /// Xline server timeout settings - #[getset(get = "pub")] - #[serde(default = "ServerTimeout::default")] - server_timeout: ServerTimeout, - /// Xline server initial state - #[getset(get = "pub")] - #[serde(with = "state_format", default = "InitialClusterState::default")] - initial_cluster_state: InitialClusterState, -} - -impl Default for ClusterConfig { - #[inline] - fn default() -> Self { - Self { - name: "default".to_owned(), - peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], - peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], - client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], - client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], - peers: HashMap::from([( - "default".to_owned(), - vec!["http://127.0.0.1:2379".to_owned()], - )]), - is_leader: false, - curp_config: CurpConfig::default(), - client_config: ClientConfig::default(), - server_timeout: ServerTimeout::default(), - initial_cluster_state: InitialClusterState::default(), - } - } -} - -/// Initial cluster state of xline server -#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] -#[non_exhaustive] -pub enum InitialClusterState { - /// Create a new cluster - #[default] - New, - /// Join an existing cluster - Existing, -} - -/// `InitialClusterState` deserialization formatter -pub mod state_format { - use serde::{self, Deserialize, Deserializer}; - - use super::InitialClusterState; - use crate::parse_state; - - /// deserializes a cluster log rotation strategy - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_state(&s).map_err(serde::de::Error::custom) - } -} - -impl ClusterConfig { - /// Generate a new `ClusterConfig` object - #[must_use] - #[inline] - #[allow(clippy::too_many_arguments)] - pub fn new( - name: String, - peer_listen_urls: Vec, - peer_advertise_urls: Vec, - client_listen_urls: Vec, - client_advertise_urls: Vec, - members: HashMap>, - is_leader: bool, - curp: CurpConfig, - client_config: ClientConfig, - server_timeout: ServerTimeout, - initial_cluster_state: InitialClusterState, - ) -> Self { - Self { - name, - peer_listen_urls, - peer_advertise_urls, - client_listen_urls, - client_advertise_urls, - peers: members, - is_leader, - curp_config: curp, - client_config, - server_timeout, - initial_cluster_state, - } - } -} - -/// Compaction configuration -#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] -#[allow(clippy::module_name_repetitions)] -pub struct CompactConfig { - /// The max number of historical versions processed in a single compact operation - #[getset(get = "pub")] - #[serde(default = "default_compact_batch_size")] - compact_batch_size: usize, - /// The interval between two compaction batches - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_compact_sleep_interval")] - compact_sleep_interval: Duration, - /// The auto compactor config - #[getset(get = "pub")] - auto_compact_config: Option, -} - -impl Default for CompactConfig { - #[inline] - fn default() -> Self { - Self { - compact_batch_size: default_compact_batch_size(), - compact_sleep_interval: default_compact_sleep_interval(), - auto_compact_config: None, - } - } -} - -impl CompactConfig { - /// Create a new compact config - #[must_use] - #[inline] - pub fn new( - compact_batch_size: usize, - compact_sleep_interval: Duration, - auto_compact_config: Option, - ) -> Self { - Self { - compact_batch_size, - compact_sleep_interval, - auto_compact_config, - } - } -} - -/// default compact batch size -#[must_use] -#[inline] -pub const fn default_compact_batch_size() -> usize { - 1000 -} - -/// default compact interval -#[must_use] -#[inline] -pub const fn default_compact_sleep_interval() -> Duration { - Duration::from_millis(10) -} - -/// Curp server timeout settings -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] -#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] -pub struct CurpConfig { - /// Heartbeat Interval - #[builder(default = "default_heartbeat_interval()")] - #[serde(with = "duration_format", default = "default_heartbeat_interval")] - pub heartbeat_interval: Duration, - - /// Curp wait sync timeout - #[builder(default = "default_server_wait_synced_timeout()")] - #[serde( - with = "duration_format", - default = "default_server_wait_synced_timeout" - )] - pub wait_synced_timeout: Duration, - - /// Curp propose retry count - #[builder(default = "default_retry_count()")] - #[serde(default = "default_retry_count")] - pub retry_count: usize, - - /// Curp rpc timeout - #[builder(default = "default_rpc_timeout()")] - #[serde(with = "duration_format", default = "default_rpc_timeout")] - pub rpc_timeout: Duration, - - /// Curp append entries batch timeout - /// If the `batch_timeout` has expired, then it will be dispatched - /// whether its size reaches the `BATCHING_MSG_MAX_SIZE` or not. - #[builder(default = "default_batch_timeout()")] - #[serde(with = "duration_format", default = "default_batch_timeout")] - pub batch_timeout: Duration, - - /// The maximum number of bytes per batch. - #[builder(default = "default_batch_max_size()")] - #[serde(with = "bytes_format", default = "default_batch_max_size")] - pub batch_max_size: u64, - - /// How many ticks a follower is allowed to miss before it starts a new round of election - /// The actual timeout will be randomized and in between heartbeat_interval * [follower_timeout_ticks, 2 * follower_timeout_ticks) - #[builder(default = "default_follower_timeout_ticks()")] - #[serde(default = "default_follower_timeout_ticks")] - pub follower_timeout_ticks: u8, - - /// How many ticks a candidate needs to wait before it starts a new round of election - /// It should be smaller than `follower_timeout_ticks` - /// The actual timeout will be randomized and in between heartbeat_interval * [candidate_timeout_ticks, 2 * candidate_timeout_ticks) - #[builder(default = "default_candidate_timeout_ticks()")] - #[serde(default = "default_candidate_timeout_ticks")] - pub candidate_timeout_ticks: u8, - - /// Curp storage path - #[builder(default = "EngineConfig::default()")] - #[serde(default = "EngineConfig::default")] - pub engine_cfg: EngineConfig, - - /// Number of command execute workers - #[builder(default = "default_cmd_workers()")] - #[serde(default = "default_cmd_workers")] - pub cmd_workers: u8, - - /// How often should the gc task run - #[builder(default = "default_gc_interval()")] - #[serde(with = "duration_format", default = "default_gc_interval")] - pub gc_interval: Duration, - - /// Number of log entries to keep in memory - #[builder(default = "default_log_entries_cap()")] - #[serde(default = "default_log_entries_cap")] - pub log_entries_cap: usize, -} - -/// default heartbeat interval -#[must_use] -#[inline] -pub const fn default_heartbeat_interval() -> Duration { - Duration::from_millis(300) -} - -/// default batch timeout -#[must_use] -#[inline] -pub const fn default_batch_timeout() -> Duration { - Duration::from_millis(15) -} - -/// default batch timeout -#[must_use] -#[inline] -#[allow(clippy::arithmetic_side_effects)] -pub const fn default_batch_max_size() -> u64 { - 2 * 1024 * 1024 -} - -/// default wait synced timeout -#[must_use] -#[inline] -pub const fn default_server_wait_synced_timeout() -> Duration { - Duration::from_secs(5) -} - -/// default initial retry timeout -#[must_use] -#[inline] -pub const fn default_initial_retry_timeout() -> Duration { - Duration::from_millis(1500) -} - -/// default max retry timeout -#[must_use] -#[inline] -pub const fn default_max_retry_timeout() -> Duration { - Duration::from_millis(10_000) -} - -/// default retry count -#[cfg(not(madsim))] -#[must_use] -#[inline] -pub const fn default_retry_count() -> usize { - 3 -} -/// default retry count -#[cfg(madsim)] -#[must_use] -#[inline] -pub const fn default_retry_count() -> usize { - 10 -} - -/// default use backoff -#[must_use] -#[inline] -pub const fn default_fixed_backoff() -> bool { - false -} - -/// default rpc timeout -#[must_use] -#[inline] -pub const fn default_rpc_timeout() -> Duration { - Duration::from_millis(50) -} - -/// default candidate timeout ticks -#[must_use] -#[inline] -pub const fn default_candidate_timeout_ticks() -> u8 { - 2 -} - -/// default client wait synced timeout -#[must_use] -#[inline] -pub const fn default_client_wait_synced_timeout() -> Duration { - Duration::from_secs(2) -} - -/// default client propose timeout -#[must_use] -#[inline] -pub const fn default_propose_timeout() -> Duration { - Duration::from_secs(1) -} - -/// default follower timeout -#[must_use] -#[inline] -pub const fn default_follower_timeout_ticks() -> u8 { - 5 -} - -/// default number of execute workers -#[must_use] -#[inline] -pub const fn default_cmd_workers() -> u8 { - 8 -} - -/// default range retry timeout -#[must_use] -#[inline] -pub const fn default_range_retry_timeout() -> Duration { - Duration::from_secs(2) -} - -/// default compact timeout -#[must_use] -#[inline] -pub const fn default_compact_timeout() -> Duration { - Duration::from_secs(5) -} - -/// default sync victims interval -#[must_use] -#[inline] -pub const fn default_sync_victims_interval() -> Duration { - Duration::from_millis(10) -} - -/// default gc interval -#[must_use] -#[inline] -pub const fn default_gc_interval() -> Duration { - Duration::from_secs(20) -} - -/// default number of log entries to keep in memory -#[must_use] -#[inline] -pub const fn default_log_entries_cap() -> usize { - 5000 -} - -/// default watch progress notify interval -#[must_use] -#[inline] -pub const fn default_watch_progress_notify_interval() -> Duration { - Duration::from_secs(600) -} - -impl Default for CurpConfig { - #[inline] - fn default() -> Self { - Self { - heartbeat_interval: default_heartbeat_interval(), - wait_synced_timeout: default_server_wait_synced_timeout(), - retry_count: default_retry_count(), - rpc_timeout: default_rpc_timeout(), - batch_timeout: default_batch_timeout(), - batch_max_size: default_batch_max_size(), - follower_timeout_ticks: default_follower_timeout_ticks(), - candidate_timeout_ticks: default_candidate_timeout_ticks(), - engine_cfg: EngineConfig::default(), - cmd_workers: default_cmd_workers(), - gc_interval: default_gc_interval(), - log_entries_cap: default_log_entries_cap(), - } - } -} - -/// Curp client settings -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -#[allow(clippy::module_name_repetitions)] -pub struct ClientConfig { - /// Curp client wait sync timeout - #[getset(get = "pub")] - #[serde( - with = "duration_format", - default = "default_client_wait_synced_timeout" - )] - wait_synced_timeout: Duration, - - /// Curp client propose request timeout - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_propose_timeout")] - propose_timeout: Duration, - - /// Curp client initial retry interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_initial_retry_timeout")] - initial_retry_timeout: Duration, - - /// Curp client max retry interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_max_retry_timeout")] - max_retry_timeout: Duration, - - /// Curp client retry interval - #[getset(get = "pub")] - #[serde(default = "default_retry_count")] - retry_count: usize, - - /// Whether to use exponential backoff in retries - #[getset(get = "pub")] - #[serde(default = "default_fixed_backoff")] - fixed_backoff: bool, -} - -impl ClientConfig { - /// Create a new client timeout - /// - /// # Panics - /// - /// Panics if `initial_retry_timeout` is larger than `max_retry_timeout` - #[must_use] - #[inline] - pub fn new( - wait_synced_timeout: Duration, - propose_timeout: Duration, - initial_retry_timeout: Duration, - max_retry_timeout: Duration, - retry_count: usize, - fixed_backoff: bool, - ) -> Self { - assert!( - initial_retry_timeout <= max_retry_timeout, - "`initial_retry_timeout` should less or equal to `max_retry_timeout`" - ); - Self { - wait_synced_timeout, - propose_timeout, - initial_retry_timeout, - max_retry_timeout, - retry_count, - fixed_backoff, - } - } -} - -impl Default for ClientConfig { - #[inline] - fn default() -> Self { - Self { - wait_synced_timeout: default_client_wait_synced_timeout(), - propose_timeout: default_propose_timeout(), - initial_retry_timeout: default_initial_retry_timeout(), - max_retry_timeout: default_max_retry_timeout(), - retry_count: default_retry_count(), - fixed_backoff: default_fixed_backoff(), - } - } -} - -/// Xline server settings -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct ServerTimeout { - /// Range request retry timeout settings - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_range_retry_timeout")] - range_retry_timeout: Duration, - /// Range request retry timeout settings - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_compact_timeout")] - compact_timeout: Duration, - /// Sync victims interval - #[getset(get = "pub")] - #[serde(with = "duration_format", default = "default_sync_victims_interval")] - sync_victims_interval: Duration, - /// Watch progress notify interval settings - #[getset(get = "pub")] - #[serde( - with = "duration_format", - default = "default_watch_progress_notify_interval" - )] - watch_progress_notify_interval: Duration, -} - -impl ServerTimeout { - /// Create a new server timeout - #[must_use] - #[inline] - pub fn new( - range_retry_timeout: Duration, - compact_timeout: Duration, - sync_victims_interval: Duration, - watch_progress_notify_interval: Duration, - ) -> Self { - Self { - range_retry_timeout, - compact_timeout, - sync_victims_interval, - watch_progress_notify_interval, - } - } -} - -impl Default for ServerTimeout { - #[inline] - fn default() -> Self { - Self { - range_retry_timeout: default_range_retry_timeout(), - compact_timeout: default_compact_timeout(), - sync_victims_interval: default_sync_victims_interval(), - watch_progress_notify_interval: default_watch_progress_notify_interval(), - } - } -} - -/// Auto Compactor Configuration -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] -#[serde( - tag = "mode", - content = "retention", - rename_all(deserialize = "lowercase") -)] -pub enum AutoCompactConfig { - /// auto periodic compactor - #[serde(with = "duration_format")] - Periodic(Duration), - /// auto revision compactor - Revision(i64), -} - -/// Engine Configuration -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde( - tag = "type", - content = "data_dir", - rename_all(deserialize = "lowercase") -)] -pub enum EngineConfig { - /// Memory Storage Engine - Memory, - /// RocksDB Storage Engine - RocksDB(PathBuf), -} - -impl Default for EngineConfig { - #[inline] - fn default() -> Self { - Self::Memory - } -} - -/// /// Storage Configuration -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -pub struct StorageConfig { - /// Engine Configuration - #[serde(default = "EngineConfig::default")] - pub engine: EngineConfig, - /// Quota - #[serde(default = "default_quota")] - pub quota: u64, -} - -impl StorageConfig { - /// Create a new storage config - #[inline] - #[must_use] - pub fn new(engine: EngineConfig, quota: u64) -> Self { - Self { engine, quota } - } -} - -impl Default for StorageConfig { - #[inline] - fn default() -> Self { - Self { - engine: EngineConfig::default(), - quota: default_quota(), - } - } -} - -/// Default quota: 8GB -#[inline] -#[must_use] -pub fn default_quota() -> u64 { - // 8 * 1024 * 1024 * 1024 - 0x0002_0000_0000 -} - -/// Log configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct LogConfig { - /// Log file path - #[getset(get = "pub")] - #[serde(default)] - path: Option, - /// Log rotation strategy - #[getset(get = "pub")] - #[serde(with = "rotation_format", default = "default_rotation")] - rotation: RotationConfig, - /// Log verbosity level - #[getset(get = "pub")] - #[serde(with = "level_format", default = "default_log_level")] - level: LevelConfig, -} - -impl Default for LogConfig { - #[inline] - fn default() -> Self { - Self { - path: None, - rotation: default_rotation(), - level: default_log_level(), - } - } -} - -/// `LevelConfig` deserialization formatter -pub mod level_format { - use serde::{self, Deserialize, Deserializer}; - - use super::LevelConfig; - use crate::parse_log_level; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_log_level(&s).map_err(serde::de::Error::custom) - } -} - -/// default log level -#[must_use] -#[inline] -pub const fn default_log_level() -> LevelConfig { - LevelConfig::INFO -} - -impl LogConfig { - /// Generate a new `LogConfig` object - #[must_use] - #[inline] - pub fn new(path: PathBuf, rotation: RotationConfig, level: LevelConfig) -> Self { - Self { - path: Some(path), - rotation, - level, - } - } -} - -/// Xline log rotation strategy -#[non_exhaustive] -#[allow(clippy::module_name_repetitions)] -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)] -#[serde(rename_all(deserialize = "lowercase"))] -pub enum RotationConfig { - /// Rotate log file in every hour - Hourly, - /// Rotate log file every day - Daily, - /// Never rotate log file - Never, -} - -impl std::fmt::Display for RotationConfig { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - RotationConfig::Hourly => write!(f, "hourly"), - RotationConfig::Daily => write!(f, "daily"), - RotationConfig::Never => write!(f, "never"), - } - } -} - -/// `RotationConfig` deserialization formatter -pub mod rotation_format { - use serde::{self, Deserialize, Deserializer}; - - use super::RotationConfig; - use crate::parse_rotation; - - /// deserializes a cluster log rotation strategy - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_rotation(&s).map_err(serde::de::Error::custom) - } -} - -/// default log rotation strategy -#[must_use] -#[inline] -pub const fn default_rotation() -> RotationConfig { - RotationConfig::Daily -} - -/// Generates a `RollingFileAppender` from the given `RotationConfig` and `name` -#[must_use] -#[inline] -pub fn file_appender( - rotation: RotationConfig, - file_path: &PathBuf, - name: &str, -) -> RollingFileAppender { - match rotation { - RotationConfig::Hourly => { - tracing_appender::rolling::hourly(file_path, format!("xline_{name}.log")) - } - RotationConfig::Daily => { - tracing_appender::rolling::daily(file_path, format!("xline_{name}.log")) - } - RotationConfig::Never => { - tracing_appender::rolling::never(file_path, format!("xline_{name}.log")) - } - #[allow(unreachable_patterns)] - // It's ok because `parse_rotation` have check the validity before. - _ => unreachable!("should not call file_appender when parse_rotation failed"), - } -} - -/// Xline tracing configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct TraceConfig { - /// Open jaeger online, sending data to jaeger agent directly - #[getset(get = "pub")] - jaeger_online: bool, - /// Open jaeger offline, saving data to the `jaeger_output_dir` - #[getset(get = "pub")] - jaeger_offline: bool, - /// The dir path to save the data when `jaeger_offline` is on - #[getset(get = "pub")] - jaeger_output_dir: PathBuf, - /// The verbosity level of tracing - #[getset(get = "pub")] - #[serde(with = "level_format", default = "default_log_level")] - jaeger_level: LevelConfig, -} - -impl Default for TraceConfig { - #[inline] - fn default() -> Self { - Self { - jaeger_online: false, - jaeger_offline: false, - jaeger_output_dir: "".into(), - jaeger_level: default_log_level(), - } - } -} - -impl TraceConfig { - /// Generate a new `TraceConfig` object - #[must_use] - #[inline] - pub fn new( - jaeger_online: bool, - jaeger_offline: bool, - jaeger_output_dir: PathBuf, - jaeger_level: LevelConfig, - ) -> Self { - Self { - jaeger_online, - jaeger_offline, - jaeger_output_dir, - jaeger_level, - } - } -} - -/// Xline tracing configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct AuthConfig { - /// The public key file - #[getset(get = "pub")] - auth_public_key: Option, - /// The private key file - #[getset(get = "pub")] - auth_private_key: Option, -} - -impl AuthConfig { - /// Generate a new `AuthConfig` object - #[must_use] - #[inline] - pub fn new(auth_public_key: Option, auth_private_key: Option) -> Self { - Self { - auth_public_key, - auth_private_key, - } - } -} - -/// Xline tls configuration object -#[allow(clippy::module_name_repetitions)] -#[non_exhaustive] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] -pub struct TlsConfig { - /// The CA certificate file used by server to verify client certificates - #[getset(get = "pub")] - pub server_ca_cert_path: Option, - /// The public key file used by server - #[getset(get = "pub")] - pub server_cert_path: Option, - /// The private key file used by server - #[getset(get = "pub")] - pub server_key_path: Option, - /// The CA certificate file used by client to verify server certificates - #[getset(get = "pub")] - pub client_ca_cert_path: Option, - /// The public key file used by client - #[getset(get = "pub")] - pub client_cert_path: Option, - /// The private key file used by client - #[getset(get = "pub")] - pub client_key_path: Option, -} - -impl TlsConfig { - /// Create a new `TlsConfig` object - #[must_use] - #[inline] - pub fn new( - server_ca_cert_path: Option, - server_cert_path: Option, - server_key_path: Option, - client_ca_cert_path: Option, - client_cert_path: Option, - client_key_path: Option, - ) -> Self { - Self { - server_ca_cert_path, - server_cert_path, - server_key_path, - client_ca_cert_path, - client_cert_path, - client_key_path, - } - } -} - -/// Xline metrics push protocol -#[non_exhaustive] -#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Default)] -#[serde(rename_all(deserialize = "lowercase"))] -pub enum MetricsPushProtocol { - /// HTTP protocol - HTTP, - /// GRPC protocol - #[default] - GRPC, -} - -impl std::fmt::Display for MetricsPushProtocol { - #[inline] - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match *self { - MetricsPushProtocol::HTTP => write!(f, "http"), - MetricsPushProtocol::GRPC => write!(f, "grpc"), - } - } -} - -/// Metrics push protocol format -pub mod protocol_format { - use serde::{self, Deserialize, Deserializer}; - - use super::MetricsPushProtocol; - use crate::parse_metrics_push_protocol; - - /// deserializes a cluster duration - #[allow(single_use_lifetimes)] - pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - parse_metrics_push_protocol(&s).map_err(serde::de::Error::custom) - } -} - -/// Xline metrics configuration object -#[allow(clippy::module_name_repetitions)] -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] -pub struct MetricsConfig { - /// Enable or not - #[getset(get = "pub")] - #[serde(default = "default_metrics_enable")] - enable: bool, - /// The http port to expose - #[getset(get = "pub")] - #[serde(default = "default_metrics_port")] - port: u16, - /// The http path to expose - #[getset(get = "pub")] - #[serde(default = "default_metrics_path")] - path: String, - /// Enable push or not - #[getset(get = "pub")] - #[serde(default = "default_metrics_push")] - push: bool, - /// Push endpoint - #[getset(get = "pub")] - #[serde(default = "default_metrics_push_endpoint")] - push_endpoint: String, - /// Push protocol - #[getset(get = "pub")] - #[serde(with = "protocol_format", default = "default_metrics_push_protocol")] - push_protocol: MetricsPushProtocol, -} - -impl MetricsConfig { - /// Create a new `MetricsConfig` - #[must_use] - #[inline] - pub fn new( - enable: bool, - port: u16, - path: String, - push: bool, - push_endpoint: String, - push_protocol: MetricsPushProtocol, - ) -> Self { - Self { - enable, - port, - path, - push, - push_endpoint, - push_protocol, - } - } -} - -impl Default for MetricsConfig { - #[inline] - fn default() -> Self { - Self { - enable: default_metrics_enable(), - port: default_metrics_port(), - path: default_metrics_path(), - push: default_metrics_push(), - push_endpoint: default_metrics_push_endpoint(), - push_protocol: default_metrics_push_protocol(), - } - } -} - -/// Default metrics enable -#[must_use] -#[inline] -pub const fn default_metrics_enable() -> bool { - true -} - -/// Default metrics port -#[must_use] -#[inline] -pub const fn default_metrics_port() -> u16 { - 9100 -} - -/// Default metrics path -#[must_use] -#[inline] -pub fn default_metrics_path() -> String { - "/metrics".to_owned() -} - -/// Default metrics push option -#[must_use] -#[inline] -pub fn default_metrics_push() -> bool { - false -} - -/// Default metrics push protocol -#[must_use] -#[inline] -pub fn default_metrics_push_protocol() -> MetricsPushProtocol { - MetricsPushProtocol::GRPC -} - -/// Default metrics push endpoint -#[must_use] -#[inline] -pub fn default_metrics_push_endpoint() -> String { - "http://127.0.0.1:4318".to_owned() -} - -impl XlineServerConfig { - /// Generates a new `XlineServerConfig` object - #[must_use] - #[inline] - #[allow(clippy::too_many_arguments)] - pub fn new( - cluster: ClusterConfig, - storage: StorageConfig, - log: LogConfig, - trace: TraceConfig, - auth: AuthConfig, - compact: CompactConfig, - tls: TlsConfig, - metrics: MetricsConfig, - ) -> Self { - Self { - cluster, - storage, - log, - trace, - auth, - compact, - tls, - metrics, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[allow(clippy::too_many_lines)] // just a testcase, not too bad - #[test] - fn test_xline_server_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - r#"[cluster] - name = 'node1' - is_leader = true - initial_cluster_state = 'new' - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.server_timeout] - range_retry_timeout = '3s' - compact_timeout = '5s' - sync_victims_interval = '20ms' - watch_progress_notify_interval = '1s' - - [cluster.peers] - node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.curp_config] - heartbeat_interval = '200ms' - wait_synced_timeout = '100ms' - rpc_timeout = '100ms' - retry_timeout = '100ms' - - [cluster.client_config] - initial_retry_timeout = '5s' - max_retry_timeout = '50s' - - [storage] - engine = { type = 'memory'} - - [compact] - compact_batch_size = 123 - compact_sleep_interval = '5ms' - - [compact.auto_compact_config] - mode = 'periodic' - retention = '10h' - - [log] - path = '/var/log/xline' - rotation = 'daily' - level = 'info' - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - auth_public_key = './public_key.pem' - auth_private_key = './private_key.pem' - - [tls] - server_cert_path = './cert.pem' - server_key_path = './key.pem' - client_ca_cert_path = './ca.pem' - - [metrics] - enable = true - port = 9100 - path = "/metrics" - push = true - push_endpoint = 'http://some-endpoint.com:4396' - push_protocol = 'http' - "#, - ) - .unwrap(); - - let curp_config = CurpConfigBuilder::default() - .heartbeat_interval(Duration::from_millis(200)) - .wait_synced_timeout(Duration::from_millis(100)) - .rpc_timeout(Duration::from_millis(100)) - .build() - .unwrap(); - - let client_config = ClientConfig::new( - default_client_wait_synced_timeout(), - default_propose_timeout(), - Duration::from_secs(5), - Duration::from_secs(50), - default_retry_count(), - default_fixed_backoff(), - ); - - let server_timeout = ServerTimeout::new( - Duration::from_secs(3), - Duration::from_secs(5), - Duration::from_millis(20), - Duration::from_secs(1), - ); - - assert_eq!( - config.cluster, - ClusterConfig::new( - "node1".to_owned(), - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - HashMap::from_iter([ - ( - "node1".to_owned(), - vec!["127.0.0.1:2378".to_owned(), "127.0.0.1:2379".to_owned()] - ), - ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), - ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), - ]), - true, - curp_config, - client_config, - server_timeout, - InitialClusterState::New - ) - ); - - assert_eq!( - config.storage, - StorageConfig::new(EngineConfig::Memory, default_quota()) - ); - - assert_eq!( - config.log, - LogConfig::new( - PathBuf::from("/var/log/xline"), - RotationConfig::Daily, - LevelConfig::INFO - ) - ); - assert_eq!( - config.trace, - TraceConfig::new( - false, - false, - PathBuf::from("./jaeger_jsons"), - LevelConfig::INFO - ) - ); - - assert_eq!( - config.compact, - CompactConfig { - compact_batch_size: 123, - compact_sleep_interval: Duration::from_millis(5), - auto_compact_config: Some(AutoCompactConfig::Periodic(Duration::from_secs( - 10 * 60 * 60 - ))) - } - ); - - assert_eq!( - config.auth, - AuthConfig { - auth_private_key: Some(PathBuf::from("./private_key.pem")), - auth_public_key: Some(PathBuf::from("./public_key.pem")), - } - ); - - assert_eq!( - config.tls, - TlsConfig { - server_cert_path: Some(PathBuf::from("./cert.pem")), - server_key_path: Some(PathBuf::from("./key.pem")), - client_ca_cert_path: Some(PathBuf::from("./ca.pem")), - ..Default::default() - } - ); - - assert_eq!( - config.metrics, - MetricsConfig { - enable: true, - port: 9100, - path: "/metrics".to_owned(), - push: true, - push_endpoint: "http://some-endpoint.com:4396".to_owned(), - push_protocol: MetricsPushProtocol::HTTP, - }, - ); - } - - #[test] - fn test_xline_server_default_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - r#"[cluster] - name = 'node1' - is_leader = true - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.peers] - node1 = ['127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.storage] - - [log] - path = '/var/log/xline' - - [storage] - engine = { type = 'rocksdb', data_dir = '/usr/local/xline/data-dir' } - - [compact] - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - - [tls] - "#, - ) - .unwrap(); - - assert_eq!( - config.cluster, - ClusterConfig::new( - "node1".to_owned(), - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2380".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - vec!["127.0.0.1:2379".to_owned()], - HashMap::from([ - ("node1".to_owned(), vec!["127.0.0.1:2379".to_owned()]), - ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), - ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), - ]), - true, - CurpConfigBuilder::default().build().unwrap(), - ClientConfig::default(), - ServerTimeout::default(), - InitialClusterState::default() - ) - ); - - if let EngineConfig::RocksDB(path) = config.storage.engine { - assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir")); - } else { - unreachable!(); - } - - assert_eq!( - config.log, - LogConfig::new( - PathBuf::from("/var/log/xline"), - RotationConfig::Daily, - LevelConfig::INFO - ) - ); - assert_eq!( - config.trace, - TraceConfig::new( - false, - false, - PathBuf::from("./jaeger_jsons"), - LevelConfig::INFO - ) - ); - assert_eq!(config.compact, CompactConfig::default()); - assert_eq!(config.auth, AuthConfig::default()); - assert_eq!(config.tls, TlsConfig::default()); - assert_eq!(config.metrics, MetricsConfig::default()); - } - - #[test] - fn test_auto_revision_compactor_config_should_be_loaded() { - let config: XlineServerConfig = toml::from_str( - r#"[cluster] - name = 'node1' - is_leader = true - peer_listen_urls = ['127.0.0.1:2380'] - peer_advertise_urls = ['127.0.0.1:2380'] - client_listen_urls = ['127.0.0.1:2379'] - client_advertise_urls = ['127.0.0.1:2379'] - - [cluster.peers] - node1 = ['127.0.0.1:2379'] - node2 = ['127.0.0.1:2380'] - node3 = ['127.0.0.1:2381'] - - [cluster.storage] - - [log] - path = '/var/log/xline' - - [storage] - engine = { type = 'memory' } - - [compact] - - [compact.auto_compact_config] - mode = 'revision' - retention = 10000 - - [trace] - jaeger_online = false - jaeger_offline = false - jaeger_output_dir = './jaeger_jsons' - jaeger_level = 'info' - - [auth] - - [tls] - "#, - ) - .unwrap(); - - assert_eq!( - config.compact, - CompactConfig { - auto_compact_config: Some(AutoCompactConfig::Revision(10000)), - ..Default::default() - } - ); - } -} diff --git a/crates/utils/src/config/auth_config.rs b/crates/utils/src/config/auth_config.rs new file mode 100644 index 000000000..2ec2f6f75 --- /dev/null +++ b/crates/utils/src/config/auth_config.rs @@ -0,0 +1,28 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline auth configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct AuthConfig { + /// The public key file + #[getset(get = "pub")] + pub auth_public_key: Option, + /// The private key file + #[getset(get = "pub")] + pub auth_private_key: Option, +} + +impl AuthConfig { + /// Generate a new `AuthConfig` object + #[must_use] + #[inline] + pub fn new(auth_public_key: Option, auth_private_key: Option) -> Self { + Self { + auth_public_key, + auth_private_key, + } + } +} diff --git a/crates/utils/src/config/client_config.rs b/crates/utils/src/config/client_config.rs new file mode 100644 index 000000000..e850f641f --- /dev/null +++ b/crates/utils/src/config/client_config.rs @@ -0,0 +1,157 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Curp client settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct ClientConfig { + /// Curp client wait sync timeout + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_client_wait_synced_timeout" + )] + wait_synced_timeout: Duration, + + /// Curp client propose request timeout + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_propose_timeout")] + propose_timeout: Duration, + + /// Curp client initial retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_initial_retry_timeout")] + initial_retry_timeout: Duration, + + /// Curp client max retry interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_max_retry_timeout")] + max_retry_timeout: Duration, + + /// Curp client retry interval + #[getset(get = "pub")] + #[serde(default = "default_retry_count")] + retry_count: usize, + + /// Whether to use exponential backoff in retries + #[getset(get = "pub")] + #[serde(default = "default_fixed_backoff")] + fixed_backoff: bool, +} + +impl ClientConfig { + /// Create a new client timeout + /// + /// # Panics + /// + /// Panics if `initial_retry_timeout` is larger than `max_retry_timeout` + #[must_use] + #[inline] + pub fn new( + wait_synced_timeout: Duration, + propose_timeout: Duration, + initial_retry_timeout: Duration, + max_retry_timeout: Duration, + retry_count: usize, + fixed_backoff: bool, + ) -> Self { + assert!( + initial_retry_timeout <= max_retry_timeout, + "`initial_retry_timeout` should less or equal to `max_retry_timeout`" + ); + Self { + wait_synced_timeout, + propose_timeout, + initial_retry_timeout, + max_retry_timeout, + retry_count, + fixed_backoff, + } + } +} + +impl Default for ClientConfig { + #[inline] + fn default() -> Self { + Self { + wait_synced_timeout: default_client_wait_synced_timeout(), + propose_timeout: default_propose_timeout(), + initial_retry_timeout: default_initial_retry_timeout(), + max_retry_timeout: default_max_retry_timeout(), + retry_count: default_retry_count(), + fixed_backoff: default_fixed_backoff(), + } + } +} + +/// default client wait synced timeout +#[must_use] +#[inline] +pub const fn default_client_wait_synced_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default client propose timeout +#[must_use] +#[inline] +pub const fn default_propose_timeout() -> Duration { + Duration::from_secs(1) +} + +/// default initial retry timeout +#[must_use] +#[inline] +pub const fn default_initial_retry_timeout() -> Duration { + Duration::from_millis(1500) +} + +/// default max retry timeout +#[must_use] +#[inline] +pub const fn default_max_retry_timeout() -> Duration { + Duration::from_millis(10_000) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default use backoff +#[must_use] +#[inline] +pub const fn default_fixed_backoff() -> bool { + false +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/cluster_config.rs b/crates/utils/src/config/cluster_config.rs new file mode 100644 index 000000000..2a56e4de3 --- /dev/null +++ b/crates/utils/src/config/cluster_config.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; + +use getset::Getters; +use serde::Deserialize; + +use super::client_config::ClientConfig; +use super::curp_config::CurpConfig; +use super::server_config::ServerTimeout; + +/// Cluster configuration object, including cluster relevant configuration fields +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct ClusterConfig { + /// Get xline server name + #[getset(get = "pub")] + name: String, + /// Xline server peer listen urls + #[getset(get = "pub")] + peer_listen_urls: Vec, + /// Xline server peer advertise urls + #[getset(get = "pub")] + peer_advertise_urls: Vec, + /// Xline server client listen urls + #[getset(get = "pub")] + client_listen_urls: Vec, + /// Xline server client advertise urls + #[getset(get = "pub")] + client_advertise_urls: Vec, + /// All the nodes in the xline cluster + #[getset(get = "pub")] + peers: HashMap>, + /// Leader node. + #[getset(get = "pub")] + is_leader: bool, + /// Curp server timeout settings + #[getset(get = "pub")] + #[serde(default = "CurpConfig::default")] + curp_config: CurpConfig, + /// Curp client config settings + #[getset(get = "pub")] + #[serde(default = "ClientConfig::default")] + client_config: ClientConfig, + /// Xline server timeout settings + #[getset(get = "pub")] + #[serde(default = "ServerTimeout::default")] + server_timeout: ServerTimeout, + /// Xline server initial state + #[getset(get = "pub")] + #[serde(with = "state_format", default = "InitialClusterState::default")] + initial_cluster_state: InitialClusterState, +} + +impl Default for ClusterConfig { + #[inline] + fn default() -> Self { + Self { + name: "default".to_owned(), + peer_listen_urls: vec!["http://127.0.0.1:2380".to_owned()], + peer_advertise_urls: vec!["http://127.0.0.1:2380".to_owned()], + client_listen_urls: vec!["http://127.0.0.1:2379".to_owned()], + client_advertise_urls: vec!["http://127.0.0.1:2379".to_owned()], + peers: HashMap::from([( + "default".to_owned(), + vec!["http://127.0.0.1:2379".to_owned()], + )]), + is_leader: false, + curp_config: CurpConfig::default(), + client_config: ClientConfig::default(), + server_timeout: ServerTimeout::default(), + initial_cluster_state: InitialClusterState::default(), + } + } +} + +impl ClusterConfig { + /// Generate a new `ClusterConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + name: String, + peer_listen_urls: Vec, + peer_advertise_urls: Vec, + client_listen_urls: Vec, + client_advertise_urls: Vec, + members: HashMap>, + is_leader: bool, + curp: CurpConfig, + client_config: ClientConfig, + server_timeout: ServerTimeout, + initial_cluster_state: InitialClusterState, + ) -> Self { + Self { + name, + peer_listen_urls, + peer_advertise_urls, + client_listen_urls, + client_advertise_urls, + peers: members, + is_leader, + curp_config: curp, + client_config, + server_timeout, + initial_cluster_state, + } + } +} + +/// Cluster Range type alias +pub type ClusterRange = std::ops::Range; + +/// Initial cluster state of xline server +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize)] +#[non_exhaustive] +pub enum InitialClusterState { + /// Create a new cluster + #[default] + New, + /// Join an existing cluster + Existing, +} + +/// `InitialClusterState` deserialization formatter +pub mod state_format { + use serde::{self, Deserialize, Deserializer}; + + use super::InitialClusterState; + use crate::parse_state; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_state(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/compact_config.rs b/crates/utils/src/config/compact_config.rs new file mode 100644 index 000000000..e5eea2aa7 --- /dev/null +++ b/crates/utils/src/config/compact_config.rs @@ -0,0 +1,99 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Compaction configuration +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct CompactConfig { + /// The max number of historical versions processed in a single compact operation + #[getset(get = "pub")] + #[serde(default = "default_compact_batch_size")] + pub compact_batch_size: usize, + /// The interval between two compaction batches + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_sleep_interval")] + pub compact_sleep_interval: Duration, + /// The auto compactor config + #[getset(get = "pub")] + pub auto_compact_config: Option, +} + +impl Default for CompactConfig { + #[inline] + fn default() -> Self { + Self { + compact_batch_size: default_compact_batch_size(), + compact_sleep_interval: default_compact_sleep_interval(), + auto_compact_config: None, + } + } +} + +impl CompactConfig { + /// Create a new compact config + #[must_use] + #[inline] + pub fn new( + compact_batch_size: usize, + compact_sleep_interval: Duration, + auto_compact_config: Option, + ) -> Self { + Self { + compact_batch_size, + compact_sleep_interval, + auto_compact_config, + } + } +} + +/// Auto Compactor Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde( + tag = "mode", + content = "retention", + rename_all(deserialize = "lowercase") +)] +pub enum AutoCompactConfig { + /// auto periodic compactor + #[serde(with = "duration_format")] + Periodic(Duration), + /// auto revision compactor + Revision(i64), +} + +/// default compact batch size +#[must_use] +#[inline] +pub const fn default_compact_batch_size() -> usize { + 1000 +} + +/// default compact interval +#[must_use] +#[inline] +pub const fn default_compact_sleep_interval() -> Duration { + Duration::from_millis(10) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/curp_config.rs b/crates/utils/src/config/curp_config.rs new file mode 100644 index 000000000..5492fe67c --- /dev/null +++ b/crates/utils/src/config/curp_config.rs @@ -0,0 +1,223 @@ +use std::time::Duration; + +use derive_builder::Builder; +use getset::Getters; +use serde::Deserialize; + +use super::engine_config::EngineConfig; + +/// Curp server timeout settings +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] +#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] +pub struct CurpConfig { + /// Heartbeat Interval + #[builder(default = "default_heartbeat_interval()")] + #[serde(with = "duration_format", default = "default_heartbeat_interval")] + pub heartbeat_interval: Duration, + + /// Curp wait sync timeout + #[builder(default = "default_server_wait_synced_timeout()")] + #[serde( + with = "duration_format", + default = "default_server_wait_synced_timeout" + )] + pub wait_synced_timeout: Duration, + + /// Curp propose retry count + #[builder(default = "default_retry_count()")] + #[serde(default = "default_retry_count")] + pub retry_count: usize, + + /// Curp rpc timeout + #[builder(default = "default_rpc_timeout()")] + #[serde(with = "duration_format", default = "default_rpc_timeout")] + pub rpc_timeout: Duration, + + /// Curp append entries batch timeout + /// If the `batch_timeout` has expired, then it will be dispatched + /// whether its size reaches the `BATCHING_MSG_MAX_SIZE` or not. + #[builder(default = "default_batch_timeout()")] + #[serde(with = "duration_format", default = "default_batch_timeout")] + pub batch_timeout: Duration, + + /// The maximum number of bytes per batch. + #[builder(default = "default_batch_max_size()")] + #[serde(with = "bytes_format", default = "default_batch_max_size")] + pub batch_max_size: u64, + + /// How many ticks a follower is allowed to miss before it starts a new round of election + /// The actual timeout will be randomized and in between heartbeat_interval * [follower_timeout_ticks, 2 * follower_timeout_ticks) + #[builder(default = "default_follower_timeout_ticks()")] + #[serde(default = "default_follower_timeout_ticks")] + pub follower_timeout_ticks: u8, + + /// How many ticks a candidate needs to wait before it starts a new round of election + /// It should be smaller than `follower_timeout_ticks` + /// The actual timeout will be randomized and in between heartbeat_interval * [candidate_timeout_ticks, 2 * candidate_timeout_ticks) + #[builder(default = "default_candidate_timeout_ticks()")] + #[serde(default = "default_candidate_timeout_ticks")] + pub candidate_timeout_ticks: u8, + + /// Curp storage path + #[builder(default = "EngineConfig::default()")] + #[serde(default = "EngineConfig::default")] + pub engine_cfg: EngineConfig, + + /// Number of command execute workers + #[builder(default = "default_cmd_workers()")] + #[serde(default = "default_cmd_workers")] + pub cmd_workers: u8, + + /// How often should the gc task run + #[builder(default = "default_gc_interval()")] + #[serde(with = "duration_format", default = "default_gc_interval")] + pub gc_interval: Duration, + + /// Number of log entries to keep in memory + #[builder(default = "default_log_entries_cap()")] + #[serde(default = "default_log_entries_cap")] + pub log_entries_cap: usize, +} + +impl Default for CurpConfig { + #[inline] + fn default() -> Self { + Self { + heartbeat_interval: default_heartbeat_interval(), + wait_synced_timeout: default_server_wait_synced_timeout(), + retry_count: default_retry_count(), + rpc_timeout: default_rpc_timeout(), + batch_timeout: default_batch_timeout(), + batch_max_size: default_batch_max_size(), + follower_timeout_ticks: default_follower_timeout_ticks(), + candidate_timeout_ticks: default_candidate_timeout_ticks(), + engine_cfg: EngineConfig::default(), + cmd_workers: default_cmd_workers(), + gc_interval: default_gc_interval(), + log_entries_cap: default_log_entries_cap(), + } + } +} + +/// default heartbeat interval +#[must_use] +#[inline] +pub const fn default_heartbeat_interval() -> Duration { + Duration::from_millis(300) +} + +/// default wait synced timeout +#[must_use] +#[inline] +pub const fn default_server_wait_synced_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default retry count +#[cfg(not(madsim))] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 3 +} + +/// default retry count +#[cfg(madsim)] +#[must_use] +#[inline] +pub const fn default_retry_count() -> usize { + 10 +} + +/// default rpc timeout +#[must_use] +#[inline] +pub const fn default_rpc_timeout() -> Duration { + Duration::from_millis(50) +} + +/// default batch timeout +#[must_use] +#[inline] +pub const fn default_batch_timeout() -> Duration { + Duration::from_millis(15) +} + +/// default batch timeout +#[must_use] +#[inline] +#[allow(clippy::arithmetic_side_effects)] +pub const fn default_batch_max_size() -> u64 { + 2 * 1024 * 1024 +} + +/// default follower timeout +#[must_use] +#[inline] +pub const fn default_follower_timeout_ticks() -> u8 { + 5 +} + +/// default candidate timeout ticks +#[must_use] +#[inline] +pub const fn default_candidate_timeout_ticks() -> u8 { + 2 +} + +/// default number of execute workers +#[must_use] +#[inline] +pub const fn default_cmd_workers() -> u8 { + 8 +} + +/// default gc interval +#[must_use] +#[inline] +pub const fn default_gc_interval() -> Duration { + Duration::from_secs(20) +} + +/// default number of log entries to keep in memory +#[must_use] +#[inline] +pub const fn default_log_entries_cap() -> usize { + 5000 +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} + +/// batch size deserialization formatter +pub mod bytes_format { + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_batch_bytes; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_batch_bytes(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/engine_config.rs b/crates/utils/src/config/engine_config.rs new file mode 100644 index 000000000..12a8a5057 --- /dev/null +++ b/crates/utils/src/config/engine_config.rs @@ -0,0 +1,26 @@ +use std::path::PathBuf; + +use serde::Deserialize; + +/// Engine Configuration +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde( + tag = "type", + content = "data_dir", + rename_all(deserialize = "lowercase") +)] +pub enum EngineConfig { + /// Memory Storage Engine + Memory, + /// RocksDB Storage Engine + RocksDB(PathBuf), +} + +impl Default for EngineConfig { + #[inline] + fn default() -> Self { + Self::Memory + } +} diff --git a/crates/utils/src/config/log_config.rs b/crates/utils/src/config/log_config.rs new file mode 100644 index 000000000..d555739ac --- /dev/null +++ b/crates/utils/src/config/log_config.rs @@ -0,0 +1,150 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; +use tracing_appender::rolling::RollingFileAppender; + +/// Log verbosity level alias +#[allow(clippy::module_name_repetitions)] +pub type LevelConfig = tracing::metadata::LevelFilter; + +/// Log configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct LogConfig { + /// Log file path + #[getset(get = "pub")] + #[serde(default)] + path: Option, + /// Log rotation strategy + #[getset(get = "pub")] + #[serde(with = "rotation_format", default = "default_rotation")] + rotation: RotationConfig, + /// Log verbosity level + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + level: LevelConfig, +} + +impl Default for LogConfig { + #[inline] + fn default() -> Self { + Self { + path: None, + rotation: default_rotation(), + level: default_log_level(), + } + } +} + +/// `LevelConfig` deserialization formatter +pub mod level_format { + use serde::{self, Deserialize, Deserializer}; + + use super::LevelConfig; + use crate::parse_log_level; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_log_level(&s).map_err(serde::de::Error::custom) + } +} + +/// default log level +#[must_use] +#[inline] +pub const fn default_log_level() -> LevelConfig { + LevelConfig::INFO +} + +impl LogConfig { + /// Generate a new `LogConfig` object + #[must_use] + #[inline] + pub fn new(path: PathBuf, rotation: RotationConfig, level: LevelConfig) -> Self { + Self { + path: Some(path), + rotation, + level, + } + } +} + +/// Xline log rotation strategy +#[non_exhaustive] +#[allow(clippy::module_name_repetitions)] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(rename_all(deserialize = "lowercase"))] +pub enum RotationConfig { + /// Rotate log file in every hour + Hourly, + /// Rotate log file every day + Daily, + /// Never rotate log file + Never, +} + +impl std::fmt::Display for RotationConfig { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + RotationConfig::Hourly => write!(f, "hourly"), + RotationConfig::Daily => write!(f, "daily"), + RotationConfig::Never => write!(f, "never"), + } + } +} + +/// `RotationConfig` deserialization formatter +pub mod rotation_format { + use serde::{self, Deserialize, Deserializer}; + + use super::RotationConfig; + use crate::parse_rotation; + + /// deserializes a cluster log rotation strategy + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_rotation(&s).map_err(serde::de::Error::custom) + } +} + +/// default log rotation strategy +#[must_use] +#[inline] +pub const fn default_rotation() -> RotationConfig { + RotationConfig::Daily +} + +/// Generates a `RollingFileAppender` from the given `RotationConfig` and `name` +#[must_use] +#[inline] +pub fn file_appender( + rotation: RotationConfig, + file_path: &PathBuf, + name: &str, +) -> RollingFileAppender { + match rotation { + RotationConfig::Hourly => { + tracing_appender::rolling::hourly(file_path, format!("xline_{name}.log")) + } + RotationConfig::Daily => { + tracing_appender::rolling::daily(file_path, format!("xline_{name}.log")) + } + RotationConfig::Never => { + tracing_appender::rolling::never(file_path, format!("xline_{name}.log")) + } + #[allow(unreachable_patterns)] + // It's ok because `parse_rotation` have check the validity before. + _ => unreachable!("should not call file_appender when parse_rotation failed"), + } +} diff --git a/crates/utils/src/config/metrics_config.rs b/crates/utils/src/config/metrics_config.rs new file mode 100644 index 000000000..9da6517c7 --- /dev/null +++ b/crates/utils/src/config/metrics_config.rs @@ -0,0 +1,151 @@ +use getset::Getters; +use serde::Deserialize; + +/// Xline metrics configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct MetricsConfig { + /// Enable or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_enable")] + pub enable: bool, + /// The http port to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_port")] + pub port: u16, + /// The http path to expose + #[getset(get = "pub")] + #[serde(default = "default_metrics_path")] + pub path: String, + /// Enable push or not + #[getset(get = "pub")] + #[serde(default = "default_metrics_push")] + pub push: bool, + /// Push endpoint + #[getset(get = "pub")] + #[serde(default = "default_metrics_push_endpoint")] + pub push_endpoint: String, + /// Push protocol + #[getset(get = "pub")] + #[serde(with = "protocol_format", default = "default_metrics_push_protocol")] + pub push_protocol: MetricsPushProtocol, +} + +impl MetricsConfig { + /// Create a new `MetricsConfig` + #[must_use] + #[inline] + pub fn new( + enable: bool, + port: u16, + path: String, + push: bool, + push_endpoint: String, + push_protocol: MetricsPushProtocol, + ) -> Self { + Self { + enable, + port, + path, + push, + push_endpoint, + push_protocol, + } + } +} + +impl Default for MetricsConfig { + #[inline] + fn default() -> Self { + Self { + enable: default_metrics_enable(), + port: default_metrics_port(), + path: default_metrics_path(), + push: default_metrics_push(), + push_endpoint: default_metrics_push_endpoint(), + push_protocol: default_metrics_push_protocol(), + } + } +} + +/// Default metrics enable +#[must_use] +#[inline] +pub const fn default_metrics_enable() -> bool { + true +} + +/// Default metrics port +#[must_use] +#[inline] +pub const fn default_metrics_port() -> u16 { + 9100 +} + +/// Default metrics path +#[must_use] +#[inline] +pub fn default_metrics_path() -> String { + "/metrics".to_owned() +} + +/// Default metrics push option +#[must_use] +#[inline] +pub fn default_metrics_push() -> bool { + false +} + +/// Default metrics push protocol +#[must_use] +#[inline] +pub fn default_metrics_push_protocol() -> MetricsPushProtocol { + MetricsPushProtocol::GRPC +} + +/// Default metrics push endpoint +#[must_use] +#[inline] +pub fn default_metrics_push_endpoint() -> String { + "http://127.0.0.1:4318".to_owned() +} + +/// Xline metrics push protocol +#[non_exhaustive] +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Default)] +#[serde(rename_all(deserialize = "lowercase"))] +pub enum MetricsPushProtocol { + /// HTTP protocol + HTTP, + /// GRPC protocol + #[default] + GRPC, +} + +impl std::fmt::Display for MetricsPushProtocol { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match *self { + MetricsPushProtocol::HTTP => write!(f, "http"), + MetricsPushProtocol::GRPC => write!(f, "grpc"), + } + } +} + +/// Metrics push protocol format +pub mod protocol_format { + use serde::{self, Deserialize, Deserializer}; + + use super::MetricsPushProtocol; + use crate::parse_metrics_push_protocol; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_metrics_push_protocol(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/mod.rs b/crates/utils/src/config/mod.rs new file mode 100644 index 000000000..4a1fc8152 --- /dev/null +++ b/crates/utils/src/config/mod.rs @@ -0,0 +1,445 @@ +/// Xline auth configuration module +pub mod auth_config; +/// Curp client module +pub mod client_config; +/// Cluster configuration module +pub mod cluster_config; +/// Compaction configuration module +pub mod compact_config; +/// Curp server module +pub mod curp_config; +/// Engine Configuration module +pub mod engine_config; +/// Log configuration module +pub mod log_config; +/// Xline metrics configuration module +pub mod metrics_config; +/// Xline server module +pub mod server_config; +/// Storage Configuration module +pub mod storage_config; +/// Xline tls configuration module +pub mod tls_config; +/// Xline tracing configuration module +pub mod trace_config; + +use getset::Getters; +use serde::Deserialize; + +use crate::config::cluster_config::ClusterConfig; +use crate::config::compact_config::CompactConfig; +use crate::config::log_config::LogConfig; +use crate::config::metrics_config::MetricsConfig; +use crate::config::storage_config::StorageConfig; +use crate::config::trace_config::TraceConfig; + +use crate::config::{auth_config::AuthConfig, tls_config::TlsConfig}; + +/// Xline server configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct XlineServerConfig { + /// cluster configuration object + #[getset(get = "pub")] + cluster: ClusterConfig, + /// xline storage configuration object + #[getset(get = "pub")] + storage: StorageConfig, + /// log configuration object + #[getset(get = "pub")] + log: LogConfig, + /// trace configuration object + #[getset(get = "pub")] + trace: TraceConfig, + /// auth configuration object + #[getset(get = "pub")] + auth: AuthConfig, + /// compactor configuration object + #[getset(get = "pub")] + compact: CompactConfig, + /// tls configuration object + #[getset(get = "pub")] + tls: TlsConfig, + /// Metrics config + #[getset(get = "pub")] + #[serde(default = "MetricsConfig::default")] + metrics: MetricsConfig, +} + +impl XlineServerConfig { + /// Generates a new `XlineServerConfig` object + #[must_use] + #[inline] + #[allow(clippy::too_many_arguments)] + pub fn new( + cluster: ClusterConfig, + storage: StorageConfig, + log: LogConfig, + trace: TraceConfig, + auth: AuthConfig, + compact: CompactConfig, + tls: TlsConfig, + metrics: MetricsConfig, + ) -> Self { + Self { + cluster, + storage, + log, + trace, + auth, + compact, + tls, + metrics, + } + } +} + +#[cfg(test)] +mod tests { + use crate::{ + config::{ + client_config::{ + default_client_wait_synced_timeout, default_fixed_backoff, default_propose_timeout, + default_retry_count, ClientConfig, + }, + compact_config::AutoCompactConfig, + curp_config::CurpConfigBuilder, + engine_config::EngineConfig, + server_config::ServerTimeout, + storage_config::default_quota, + }, + InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, + }; + + use super::*; + use std::{collections::HashMap, path::PathBuf, time::Duration}; + + #[allow(clippy::too_many_lines)] // just a testcase, not too bad + #[test] + fn test_xline_server_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + initial_cluster_state = 'new' + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.server_timeout] + range_retry_timeout = '3s' + compact_timeout = '5s' + sync_victims_interval = '20ms' + watch_progress_notify_interval = '1s' + + [cluster.peers] + node1 = ['127.0.0.1:2378', '127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.curp_config] + heartbeat_interval = '200ms' + wait_synced_timeout = '100ms' + rpc_timeout = '100ms' + retry_timeout = '100ms' + + [cluster.client_config] + initial_retry_timeout = '5s' + max_retry_timeout = '50s' + + [storage] + engine = { type = 'memory'} + + [compact] + compact_batch_size = 123 + compact_sleep_interval = '5ms' + + [compact.auto_compact_config] + mode = 'periodic' + retention = '10h' + + [log] + path = '/var/log/xline' + rotation = 'daily' + level = 'info' + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + auth_public_key = './public_key.pem' + auth_private_key = './private_key.pem' + + [tls] + server_cert_path = './cert.pem' + server_key_path = './key.pem' + client_ca_cert_path = './ca.pem' + + [metrics] + enable = true + port = 9100 + path = "/metrics" + push = true + push_endpoint = 'http://some-endpoint.com:4396' + push_protocol = 'http' + "#, + ) + .unwrap(); + + let curp_config = CurpConfigBuilder::default() + .heartbeat_interval(Duration::from_millis(200)) + .wait_synced_timeout(Duration::from_millis(100)) + .rpc_timeout(Duration::from_millis(100)) + .build() + .unwrap(); + + let client_config = ClientConfig::new( + default_client_wait_synced_timeout(), + default_propose_timeout(), + Duration::from_secs(5), + Duration::from_secs(50), + default_retry_count(), + default_fixed_backoff(), + ); + + let server_timeout = ServerTimeout::new( + Duration::from_secs(3), + Duration::from_secs(5), + Duration::from_millis(20), + Duration::from_secs(1), + ); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from_iter([ + ( + "node1".to_owned(), + vec!["127.0.0.1:2378".to_owned(), "127.0.0.1:2379".to_owned()] + ), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + curp_config, + client_config, + server_timeout, + InitialClusterState::New + ) + ); + + assert_eq!( + config.storage, + StorageConfig::new(EngineConfig::Memory, default_quota()) + ); + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + + assert_eq!( + config.compact, + CompactConfig::new( + 123, + Duration::from_millis(5), + Some(AutoCompactConfig::Periodic(Duration::from_secs( + 10 * 60 * 60 + ))) + ) + ); + + assert_eq!( + config.auth, + AuthConfig { + auth_private_key: Some(PathBuf::from("./private_key.pem")), + auth_public_key: Some(PathBuf::from("./public_key.pem")), + } + ); + + assert_eq!( + config.tls, + TlsConfig { + server_cert_path: Some(PathBuf::from("./cert.pem")), + server_key_path: Some(PathBuf::from("./key.pem")), + client_ca_cert_path: Some(PathBuf::from("./ca.pem")), + ..Default::default() + } + ); + + assert_eq!( + config.metrics, + MetricsConfig { + enable: true, + port: 9100, + path: "/metrics".to_owned(), + push: true, + push_endpoint: "http://some-endpoint.com:4396".to_owned(), + push_protocol: MetricsPushProtocol::HTTP, + }, + ); + } + + #[test] + fn test_xline_server_default_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'rocksdb', data_dir = '/usr/local/xline/data-dir' } + + [compact] + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + "#, + ) + .unwrap(); + + assert_eq!( + config.cluster, + ClusterConfig::new( + "node1".to_owned(), + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2380".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + vec!["127.0.0.1:2379".to_owned()], + HashMap::from([ + ("node1".to_owned(), vec!["127.0.0.1:2379".to_owned()]), + ("node2".to_owned(), vec!["127.0.0.1:2380".to_owned()]), + ("node3".to_owned(), vec!["127.0.0.1:2381".to_owned()]), + ]), + true, + CurpConfigBuilder::default().build().unwrap(), + ClientConfig::default(), + ServerTimeout::default(), + InitialClusterState::default() + ) + ); + + if let EngineConfig::RocksDB(path) = config.storage.engine { + assert_eq!(path, PathBuf::from("/usr/local/xline/data-dir")); + } else { + unreachable!(); + } + + assert_eq!( + config.log, + LogConfig::new( + PathBuf::from("/var/log/xline"), + RotationConfig::Daily, + LevelConfig::INFO + ) + ); + assert_eq!( + config.trace, + TraceConfig::new( + false, + false, + PathBuf::from("./jaeger_jsons"), + LevelConfig::INFO + ) + ); + assert_eq!(config.compact, CompactConfig::default()); + assert_eq!(config.auth, AuthConfig::default()); + assert_eq!(config.tls, TlsConfig::default()); + assert_eq!(config.metrics, MetricsConfig::default()); + } + + #[test] + fn test_auto_revision_compactor_config_should_be_loaded() { + let config: XlineServerConfig = toml::from_str( + r#"[cluster] + name = 'node1' + is_leader = true + peer_listen_urls = ['127.0.0.1:2380'] + peer_advertise_urls = ['127.0.0.1:2380'] + client_listen_urls = ['127.0.0.1:2379'] + client_advertise_urls = ['127.0.0.1:2379'] + + [cluster.peers] + node1 = ['127.0.0.1:2379'] + node2 = ['127.0.0.1:2380'] + node3 = ['127.0.0.1:2381'] + + [cluster.storage] + + [log] + path = '/var/log/xline' + + [storage] + engine = { type = 'memory' } + + [compact] + + [compact.auto_compact_config] + mode = 'revision' + retention = 10000 + + [trace] + jaeger_online = false + jaeger_offline = false + jaeger_output_dir = './jaeger_jsons' + jaeger_level = 'info' + + [auth] + + [tls] + "#, + ) + .unwrap(); + + assert_eq!( + config.compact, + CompactConfig { + auto_compact_config: Some(AutoCompactConfig::Revision(10000)), + ..Default::default() + } + ); + } +} diff --git a/crates/utils/src/config/server_config.rs b/crates/utils/src/config/server_config.rs new file mode 100644 index 000000000..578e8c876 --- /dev/null +++ b/crates/utils/src/config/server_config.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use getset::Getters; +use serde::Deserialize; + +/// Xline server settings +#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct ServerTimeout { + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_range_retry_timeout")] + range_retry_timeout: Duration, + /// Range request retry timeout settings + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_timeout")] + compact_timeout: Duration, + /// Sync victims interval + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_sync_victims_interval")] + sync_victims_interval: Duration, + /// Watch progress notify interval settings + #[getset(get = "pub")] + #[serde( + with = "duration_format", + default = "default_watch_progress_notify_interval" + )] + watch_progress_notify_interval: Duration, +} + +impl ServerTimeout { + /// Create a new server timeout + #[must_use] + #[inline] + pub fn new( + range_retry_timeout: Duration, + compact_timeout: Duration, + sync_victims_interval: Duration, + watch_progress_notify_interval: Duration, + ) -> Self { + Self { + range_retry_timeout, + compact_timeout, + sync_victims_interval, + watch_progress_notify_interval, + } + } +} + +impl Default for ServerTimeout { + #[inline] + fn default() -> Self { + Self { + range_retry_timeout: default_range_retry_timeout(), + compact_timeout: default_compact_timeout(), + sync_victims_interval: default_sync_victims_interval(), + watch_progress_notify_interval: default_watch_progress_notify_interval(), + } + } +} + +/// default range retry timeout +#[must_use] +#[inline] +pub const fn default_range_retry_timeout() -> Duration { + Duration::from_secs(2) +} + +/// default compact timeout +#[must_use] +#[inline] +pub const fn default_compact_timeout() -> Duration { + Duration::from_secs(5) +} + +/// default sync victims interval +#[must_use] +#[inline] +pub const fn default_sync_victims_interval() -> Duration { + Duration::from_millis(10) +} + +/// default watch progress notify interval +#[must_use] +#[inline] +pub const fn default_watch_progress_notify_interval() -> Duration { + Duration::from_secs(600) +} + +/// `Duration` deserialization formatter +pub mod duration_format { + use std::time::Duration; + + use serde::{self, Deserialize, Deserializer}; + + use crate::parse_duration; + + /// deserializes a cluster duration + #[allow(single_use_lifetimes)] // the false positive case blocks us + pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + parse_duration(&s).map_err(serde::de::Error::custom) + } +} diff --git a/crates/utils/src/config/storage_config.rs b/crates/utils/src/config/storage_config.rs new file mode 100644 index 000000000..74442d565 --- /dev/null +++ b/crates/utils/src/config/storage_config.rs @@ -0,0 +1,43 @@ +use serde::Deserialize; + +use crate::config::engine_config::EngineConfig; + +/// Storage Configuration +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +pub struct StorageConfig { + /// Engine Configuration + #[serde(default = "EngineConfig::default")] + pub engine: EngineConfig, + /// Quota + #[serde(default = "default_quota")] + pub quota: u64, +} + +impl StorageConfig { + /// Create a new storage config + #[inline] + #[must_use] + pub fn new(engine: EngineConfig, quota: u64) -> Self { + Self { engine, quota } + } +} + +impl Default for StorageConfig { + #[inline] + fn default() -> Self { + Self { + engine: EngineConfig::default(), + quota: default_quota(), + } + } +} + +/// Default quota: 8GB +#[inline] +#[must_use] +pub fn default_quota() -> u64 { + // 8 * 1024 * 1024 * 1024 + 0x0002_0000_0000 +} diff --git a/crates/utils/src/config/tls_config.rs b/crates/utils/src/config/tls_config.rs new file mode 100644 index 000000000..99981a1cc --- /dev/null +++ b/crates/utils/src/config/tls_config.rs @@ -0,0 +1,52 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +/// Xline tls configuration object +#[allow(clippy::module_name_repetitions)] +#[non_exhaustive] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Default)] +pub struct TlsConfig { + /// The CA certificate file used by server to verify client certificates + #[getset(get = "pub")] + pub server_ca_cert_path: Option, + /// The public key file used by server + #[getset(get = "pub")] + pub server_cert_path: Option, + /// The private key file used by server + #[getset(get = "pub")] + pub server_key_path: Option, + /// The CA certificate file used by client to verify server certificates + #[getset(get = "pub")] + pub client_ca_cert_path: Option, + /// The public key file used by client + #[getset(get = "pub")] + pub client_cert_path: Option, + /// The private key file used by client + #[getset(get = "pub")] + pub client_key_path: Option, +} + +impl TlsConfig { + /// Create a new `TlsConfig` object + #[must_use] + #[inline] + pub fn new( + server_ca_cert_path: Option, + server_cert_path: Option, + server_key_path: Option, + client_ca_cert_path: Option, + client_cert_path: Option, + client_key_path: Option, + ) -> Self { + Self { + server_ca_cert_path, + server_cert_path, + server_key_path, + client_ca_cert_path, + client_cert_path, + client_key_path, + } + } +} diff --git a/crates/utils/src/config/trace_config.rs b/crates/utils/src/config/trace_config.rs new file mode 100644 index 000000000..866941935 --- /dev/null +++ b/crates/utils/src/config/trace_config.rs @@ -0,0 +1,56 @@ +use std::path::PathBuf; + +use getset::Getters; +use serde::Deserialize; + +use super::log_config::{default_log_level, level_format, LevelConfig}; + +/// Xline tracing configuration object +#[allow(clippy::module_name_repetitions)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters)] +pub struct TraceConfig { + /// Open jaeger online, sending data to jaeger agent directly + #[getset(get = "pub")] + jaeger_online: bool, + /// Open jaeger offline, saving data to the `jaeger_output_dir` + #[getset(get = "pub")] + jaeger_offline: bool, + /// The dir path to save the data when `jaeger_offline` is on + #[getset(get = "pub")] + jaeger_output_dir: PathBuf, + /// The verbosity level of tracing + #[getset(get = "pub")] + #[serde(with = "level_format", default = "default_log_level")] + jaeger_level: LevelConfig, +} + +impl Default for TraceConfig { + #[inline] + fn default() -> Self { + Self { + jaeger_online: false, + jaeger_offline: false, + jaeger_output_dir: "".into(), + jaeger_level: default_log_level(), + } + } +} + +impl TraceConfig { + /// Generate a new `TraceConfig` object + #[must_use] + #[inline] + pub fn new( + jaeger_online: bool, + jaeger_offline: bool, + jaeger_output_dir: PathBuf, + jaeger_level: LevelConfig, + ) -> Self { + Self { + jaeger_online, + jaeger_offline, + jaeger_output_dir, + jaeger_level, + } + } +} diff --git a/crates/utils/src/parser.rs b/crates/utils/src/parser.rs index ba7186074..5442066a7 100644 --- a/crates/utils/src/parser.rs +++ b/crates/utils/src/parser.rs @@ -3,8 +3,10 @@ use std::{collections::HashMap, time::Duration}; use clippy_utilities::OverflowArithmetic; use thiserror::Error; -use crate::config::{ - ClusterRange, InitialClusterState, LevelConfig, MetricsPushProtocol, RotationConfig, +pub use crate::config::{ + cluster_config::{ClusterRange, InitialClusterState}, + log_config::{LevelConfig, RotationConfig}, + metrics_config::MetricsPushProtocol, }; /// seconds per minute diff --git a/crates/xline-client/src/lib.rs b/crates/xline-client/src/lib.rs index e1bf11c5b..940751fa1 100644 --- a/crates/xline-client/src/lib.rs +++ b/crates/xline-client/src/lib.rs @@ -167,7 +167,7 @@ use tonic::transport::ClientTlsConfig; use tower::Service; #[cfg(madsim)] use utils::ClientTlsConfig; -use utils::{build_endpoint, config::ClientConfig}; +use utils::{build_endpoint, config::client_config::ClientConfig}; use xlineapi::command::{Command, CurpClient}; use crate::{ diff --git a/crates/xline-test-utils/src/lib.rs b/crates/xline-test-utils/src/lib.rs index 52a3517d5..8e4c4cc6d 100644 --- a/crates/xline-test-utils/src/lib.rs +++ b/crates/xline-test-utils/src/lib.rs @@ -8,8 +8,10 @@ use tokio::{ time::{self, Duration}, }; use utils::config::{ - default_quota, AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, - LogConfig, MetricsConfig, StorageConfig, TlsConfig, TraceConfig, XlineServerConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, cluster_config::InitialClusterState, + compact_config::CompactConfig, engine_config::EngineConfig, log_config::LogConfig, + metrics_config::MetricsConfig, storage_config::default_quota, storage_config::StorageConfig, + tls_config::TlsConfig, trace_config::TraceConfig, XlineServerConfig, }; use xline::server::XlineServer; pub use xline_client::{types, Client, ClientOptions}; diff --git a/crates/xline/src/server/maintenance.rs b/crates/xline/src/server/maintenance.rs index 17741e8ea..95e891002 100644 --- a/crates/xline/src/server/maintenance.rs +++ b/crates/xline/src/server/maintenance.rs @@ -291,7 +291,7 @@ mod test { use test_macros::abort_on_panic; use tokio_stream::StreamExt; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/server/watch_server.rs b/crates/xline/src/server/watch_server.rs index 2b071aa62..6dcc8ef01 100644 --- a/crates/xline/src/server/watch_server.rs +++ b/crates/xline/src/server/watch_server.rs @@ -435,7 +435,9 @@ mod test { sync::mpsc, time::{sleep, timeout}, }; - use utils::config::{default_watch_progress_notify_interval, EngineConfig}; + use utils::config::{ + engine_config::EngineConfig, server_config::default_watch_progress_notify_interval, + }; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/server/xline_server.rs b/crates/xline/src/server/xline_server.rs index a6277d517..2bd1a316d 100644 --- a/crates/xline/src/server/xline_server.rs +++ b/crates/xline/src/server/xline_server.rs @@ -24,8 +24,9 @@ use tonic::transport::{server::Router, Server}; use tracing::{info, warn}; use utils::{ config::{ - AuthConfig, ClusterConfig, CompactConfig, EngineConfig, InitialClusterState, StorageConfig, - TlsConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, + cluster_config::InitialClusterState, compact_config::CompactConfig, + engine_config::EngineConfig, storage_config::StorageConfig, tls_config::TlsConfig, }, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/auth_store/store.rs b/crates/xline/src/storage/auth_store/store.rs index 4c6e6b848..35d0da78f 100644 --- a/crates/xline/src/storage/auth_store/store.rs +++ b/crates/xline/src/storage/auth_store/store.rs @@ -1145,7 +1145,7 @@ mod test { use std::collections::HashMap; use merged_range::MergedRange; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::{ diff --git a/crates/xline/src/storage/compact/mod.rs b/crates/xline/src/storage/compact/mod.rs index 772a76088..9fde5e8df 100644 --- a/crates/xline/src/storage/compact/mod.rs +++ b/crates/xline/src/storage/compact/mod.rs @@ -7,7 +7,7 @@ use periodic_compactor::PeriodicCompactor; use revision_compactor::RevisionCompactor; use tokio::{sync::mpsc::Receiver, time::sleep}; use utils::{ - config::AutoCompactConfig, + config::compact_config::AutoCompactConfig, task_manager::{tasks::TaskName, Listener, TaskManager}, }; use xlineapi::{command::Command, execute_error::ExecuteError, RequestWrapper}; diff --git a/crates/xline/src/storage/db.rs b/crates/xline/src/storage/db.rs index 9eaf04aae..ceeb9b27d 100644 --- a/crates/xline/src/storage/db.rs +++ b/crates/xline/src/storage/db.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, path::Path, sync::Arc}; use engine::{Engine, EngineType, Snapshot, StorageEngine, WriteOperation}; use prost::Message; use utils::{ - config::EngineConfig, + config::engine_config::EngineConfig, table_names::{ ALARM_TABLE, AUTH_TABLE, KV_TABLE, LEASE_TABLE, META_TABLE, ROLE_TABLE, USER_TABLE, XLINE_TABLES, diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index d3029aa0d..a4969ad20 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -931,7 +931,7 @@ mod test { use test_macros::abort_on_panic; use tokio::{runtime::Handle, task::block_in_place}; use utils::{ - config::EngineConfig, + config::engine_config::EngineConfig, task_manager::{tasks::TaskName, TaskManager}, }; diff --git a/crates/xline/src/storage/kvwatcher.rs b/crates/xline/src/storage/kvwatcher.rs index 174ee0a9e..17aa0fff1 100644 --- a/crates/xline/src/storage/kvwatcher.rs +++ b/crates/xline/src/storage/kvwatcher.rs @@ -593,7 +593,7 @@ mod test { use clippy_utilities::Cast; use test_macros::abort_on_panic; use tokio::time::{sleep, timeout}; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use xlineapi::RequestWrapper; use super::*; diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index efedf8e27..3414c2596 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -373,7 +373,7 @@ mod test { use std::{error::Error, time::Duration}; use test_macros::abort_on_panic; - use utils::config::EngineConfig; + use utils::config::engine_config::EngineConfig; use super::*; use crate::storage::db::DB; diff --git a/crates/xline/src/utils/args.rs b/crates/xline/src/utils/args.rs index 98997023f..303a01457 100644 --- a/crates/xline/src/utils/args.rs +++ b/crates/xline/src/utils/args.rs @@ -5,19 +5,28 @@ use clap::Parser; use tokio::fs; use utils::{ config::{ - default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, - default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size, - default_compact_sleep_interval, default_compact_timeout, default_follower_timeout_ticks, - default_gc_interval, default_heartbeat_interval, default_initial_retry_timeout, - default_log_entries_cap, default_log_level, default_max_retry_timeout, - default_metrics_enable, default_metrics_path, default_metrics_port, - default_metrics_push_endpoint, default_metrics_push_protocol, default_propose_timeout, - default_quota, default_range_retry_timeout, default_retry_count, default_rotation, - default_rpc_timeout, default_server_wait_synced_timeout, default_sync_victims_interval, - default_watch_progress_notify_interval, AuthConfig, AutoCompactConfig, ClientConfig, - ClusterConfig, CompactConfig, CurpConfigBuilder, EngineConfig, InitialClusterState, - LevelConfig, LogConfig, MetricsConfig, MetricsPushProtocol, RotationConfig, ServerTimeout, - StorageConfig, TlsConfig, TraceConfig, XlineServerConfig, + auth_config::AuthConfig, client_config::default_client_wait_synced_timeout, + client_config::default_initial_retry_timeout, client_config::default_max_retry_timeout, + client_config::default_propose_timeout, client_config::default_retry_count, + client_config::ClientConfig, cluster_config::ClusterConfig, + cluster_config::InitialClusterState, compact_config::default_compact_batch_size, + compact_config::default_compact_sleep_interval, compact_config::AutoCompactConfig, + compact_config::CompactConfig, curp_config::default_batch_max_size, + curp_config::default_batch_timeout, curp_config::default_candidate_timeout_ticks, + curp_config::default_cmd_workers, curp_config::default_follower_timeout_ticks, + curp_config::default_gc_interval, curp_config::default_heartbeat_interval, + curp_config::default_log_entries_cap, curp_config::default_rpc_timeout, + curp_config::default_server_wait_synced_timeout, curp_config::CurpConfigBuilder, + engine_config::EngineConfig, log_config::default_log_level, log_config::default_rotation, + log_config::LevelConfig, log_config::LogConfig, log_config::RotationConfig, + metrics_config::default_metrics_enable, metrics_config::default_metrics_path, + metrics_config::default_metrics_port, metrics_config::default_metrics_push_endpoint, + metrics_config::default_metrics_push_protocol, metrics_config::MetricsConfig, + metrics_config::MetricsPushProtocol, server_config::default_compact_timeout, + server_config::default_range_retry_timeout, server_config::default_sync_victims_interval, + server_config::default_watch_progress_notify_interval, server_config::ServerTimeout, + storage_config::default_quota, storage_config::StorageConfig, tls_config::TlsConfig, + trace_config::TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_metrics_push_protocol, parse_rotation, parse_state, ConfigFileError, diff --git a/crates/xline/src/utils/metrics.rs b/crates/xline/src/utils/metrics.rs index c06eebfd9..fcb8235a1 100644 --- a/crates/xline/src/utils/metrics.rs +++ b/crates/xline/src/utils/metrics.rs @@ -2,7 +2,7 @@ use opentelemetry::global; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{metrics::MeterProvider, runtime::Tokio}; use tracing::info; -use utils::config::{MetricsConfig, MetricsPushProtocol}; +use utils::config::metrics_config::{MetricsConfig, MetricsPushProtocol}; /// Start metrics server /// # Errors diff --git a/crates/xline/src/utils/trace.rs b/crates/xline/src/utils/trace.rs index 443d72e79..d4c095364 100644 --- a/crates/xline/src/utils/trace.rs +++ b/crates/xline/src/utils/trace.rs @@ -3,7 +3,7 @@ use opentelemetry_contrib::trace::exporter::jaeger_json::JaegerJsonExporter; use opentelemetry_sdk::runtime::Tokio; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt::format, layer::SubscriberExt, util::SubscriberInitExt, Layer}; -use utils::config::{file_appender, LogConfig, TraceConfig}; +use utils::config::{log_config::file_appender, log_config::LogConfig, trace_config::TraceConfig}; /// init tracing subscriber /// # Errors diff --git a/crates/xline/tests/it/auth_test.rs b/crates/xline/tests/it/auth_test.rs index 27804a088..6a91f91e4 100644 --- a/crates/xline/tests/it/auth_test.rs +++ b/crates/xline/tests/it/auth_test.rs @@ -2,8 +2,9 @@ use std::{error::Error, iter, path::PathBuf}; use test_macros::abort_on_panic; use utils::config::{ - AuthConfig, ClusterConfig, CompactConfig, LogConfig, MetricsConfig, StorageConfig, TlsConfig, - TraceConfig, XlineServerConfig, + auth_config::AuthConfig, cluster_config::ClusterConfig, compact_config::CompactConfig, + log_config::LogConfig, metrics_config::MetricsConfig, storage_config::StorageConfig, + tls_config::TlsConfig, trace_config::TraceConfig, XlineServerConfig, }; use xline_test_utils::{ types::{ diff --git a/crates/xlinectl/src/main.rs b/crates/xlinectl/src/main.rs index 1481ba9fc..bf5a957ce 100644 --- a/crates/xlinectl/src/main.rs +++ b/crates/xlinectl/src/main.rs @@ -163,7 +163,7 @@ use std::{path::PathBuf, time::Duration}; use anyhow::Result; use clap::{arg, value_parser, Command}; use command::compaction; -use ext_utils::config::ClientConfig; +use ext_utils::config::client_config::ClientConfig; use tokio::fs; use tonic::transport::{Certificate, ClientTlsConfig}; use xline_client::{Client, ClientOptions};