From ea3179e4b88ed772bf4d0e1aabf53e30ac8ed960 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Mon, 3 Jul 2023 17:15:30 +0800 Subject: [PATCH] feat(compactor): make compact interval and batch_size configurable Refs: #188 Signed-off-by: Phoeniix Zhao --- utils/src/config.rs | 70 ++++++++++++++++++++++++++++++++ xline-test-utils/src/lib.rs | 3 +- xline/src/main.rs | 30 ++++++++++---- xline/src/server/xline_server.rs | 8 +++- xline/src/storage/compact.rs | 11 +++-- 5 files changed, 105 insertions(+), 17 deletions(-) diff --git a/utils/src/config.rs b/utils/src/config.rs index 706d666d1..d721d2e43 100644 --- a/utils/src/config.rs +++ b/utils/src/config.rs @@ -24,6 +24,9 @@ pub struct XlineServerConfig { /// auth configuration object #[getset(get = "pub")] auth: AuthConfig, + /// compactor configuration object + #[getset(get = "pub")] + compact: CompactConfig, } /// Cluster Range type alias @@ -118,6 +121,56 @@ impl ClusterConfig { } } +/// Compaction configuration +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)] +#[allow(clippy::module_name_repetitions)] +pub struct CompactConfig { + /// The max number of historical versions processed in a single compact operation + #[getset(get = "pub")] + #[serde(default = "default_compact_batch_size")] + compact_batch_size: usize, + /// The interval between two compact operations + #[getset(get = "pub")] + #[serde(with = "duration_format", default = "default_compact_interval")] + compact_interval: Duration, +} + +impl Default for CompactConfig { + #[inline] + fn default() -> Self { + Self { + compact_batch_size: default_compact_batch_size(), + compact_interval: default_compact_interval(), + } + } +} + +impl CompactConfig { + /// Create a new compact config + #[must_use] + #[inline] + pub fn new(compact_batch_size: usize, compact_interval: Duration) -> Self { + Self { + compact_batch_size, + compact_interval, + } + } +} + +/// default compact batch size +#[must_use] +#[inline] +pub const fn 
default_compact_batch_size() -> usize { + 1000 +} + +/// default compact interval +#[must_use] +#[inline] +pub const fn default_compact_interval() -> Duration { + Duration::from_millis(10) +} + /// Curp server timeout settings #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)] #[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)] @@ -647,6 +700,7 @@ impl XlineServerConfig { log: LogConfig, trace: TraceConfig, auth: AuthConfig, + compact: CompactConfig, ) -> Self { Self { cluster, @@ -654,6 +708,7 @@ impl XlineServerConfig { log, trace, auth, + compact, } } } @@ -692,6 +747,10 @@ mod tests { [storage] engine = 'memory' + [compact] + compact_batch_size = 123 + compact_interval = '5ms' + [log] path = '/var/log/xline' rotation = 'daily' @@ -761,6 +820,14 @@ mod tests { LevelConfig::INFO ) ); + + assert_eq!( + config.compact, + CompactConfig { + compact_batch_size: 123, + compact_interval: Duration::from_millis(5) + } + ); } #[allow(clippy::unwrap_used)] @@ -785,6 +852,8 @@ mod tests { engine = 'rocksdb' data_dir = '/usr/local/xline/data-dir' + [compact] + [trace] jaeger_online = false jaeger_offline = false @@ -836,5 +905,6 @@ mod tests { LevelConfig::INFO ) ); + assert_eq!(config.compact, CompactConfig::default()); } } diff --git a/xline-test-utils/src/lib.rs b/xline-test-utils/src/lib.rs index 9a68abc6c..05a16bb58 100644 --- a/xline-test-utils/src/lib.rs +++ b/xline-test-utils/src/lib.rs @@ -12,7 +12,7 @@ use tokio::{ sync::broadcast::{self, Sender}, time::{self, Duration}, }; -use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig}; +use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig}; use xline::{client::Client, server::XlineServer, storage::db::DB}; /// Cluster @@ -86,6 +86,7 @@ impl Cluster { ClientTimeout::default(), ServerTimeout::default(), StorageConfig::Memory, + CompactConfig::default(), ); let signal = async { let _ = rx.recv().await; diff --git 
a/xline/src/main.rs b/xline/src/main.rs index cf6eb1495..0403f2fc9 100644 --- a/xline/src/main.rs +++ b/xline/src/main.rs @@ -151,14 +151,14 @@ use tracing_subscriber::{fmt::format, prelude::*}; use utils::{ config::{ default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks, - default_client_wait_synced_timeout, default_cmd_workers, default_follower_timeout_ticks, - default_gc_interval, default_heartbeat_interval, default_log_entries_cap, - default_log_level, default_propose_timeout, default_range_retry_timeout, - default_retry_timeout, default_rotation, default_rpc_timeout, - default_server_wait_synced_timeout, default_sync_victims_interval, - default_watch_progress_notify_interval, file_appender, AuthConfig, ClientTimeout, - ClusterConfig, CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout, - StorageConfig, TraceConfig, XlineServerConfig, + default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size, + default_compact_interval, default_follower_timeout_ticks, default_gc_interval, + default_heartbeat_interval, default_log_entries_cap, default_log_level, + default_propose_timeout, default_range_retry_timeout, default_retry_timeout, + default_rotation, default_rpc_timeout, default_server_wait_synced_timeout, + default_sync_victims_interval, default_watch_progress_notify_interval, file_appender, + AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig, + LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig, }, parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation, }; @@ -263,6 +263,12 @@ struct ServerArgs { /// Curp command workers count #[clap(long, default_value_t = default_cmd_workers())] cmd_workers: u8, + /// The max number of historical versions processed in a single compact operation [default: 1000] + #[clap(long, default_value_t = default_compact_batch_size())] + compact_batch_size: usize, + /// 
Interval between two compaction operations [default: 10ms] + #[clap(long, value_parser = parse_duration)] + compact_interval: Option<Duration>, } impl From<ServerArgs> for XlineServerConfig { @@ -329,7 +335,12 @@ impl From<ServerArgs> for XlineServerConfig { args.jaeger_level, ); let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key); - XlineServerConfig::new(cluster, storage, log, trace, auth) + let compact = CompactConfig::new( + args.compact_batch_size, + args.compact_interval + .unwrap_or_else(default_compact_interval), + ); + XlineServerConfig::new(cluster, storage, log, trace, auth, compact) } } @@ -480,6 +491,7 @@ async fn main() -> Result<()> { *cluster_config.client_timeout(), *cluster_config.server_timeout(), config.storage().clone(), + *config.compact(), ); debug!("{:?}", server); server.start(self_addr, db_proxy, key_pair).await?; diff --git a/xline/src/server/xline_server.rs b/xline/src/server/xline_server.rs index ade3d9d76..a0f40c883 100644 --- a/xline/src/server/xline_server.rs +++ b/xline/src/server/xline_server.rs @@ -9,7 +9,7 @@ use tokio::{net::TcpListener, sync::mpsc::unbounded_channel}; use tokio_stream::wrappers::TcpListenerStream; use tonic::transport::Server; use tonic_health::ServingStatus; -use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig}; +use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig}; use super::{ auth_server::AuthServer, @@ -58,6 +58,8 @@ pub struct XlineServer { client_timeout: ClientTimeout, /// Storage config, storage_cfg: StorageConfig, + /// Compact config + compact_cfg: CompactConfig, /// Server timeout server_timeout: ServerTimeout, /// Shutdown trigger @@ -79,6 +81,7 @@ impl XlineServer { client_timeout: ClientTimeout, server_timeout: ServerTimeout, storage_config: StorageConfig, + compact_config: CompactConfig, ) -> Self { Self { cluster_info, @@ -86,6 +89,7 @@ impl XlineServer { curp_cfg: Arc::new(curp_config), client_timeout, storage_cfg: storage_config, + compact_cfg: 
compact_config, server_timeout, shutdown_trigger: Arc::new(Event::new()), } @@ -135,6 +139,8 @@ impl XlineServer { let _hd = tokio::spawn(compactor( Arc::clone(&kv_storage), Arc::clone(&index), + *self.compact_cfg.compact_batch_size(), + *self.compact_cfg.compact_interval(), compact_task_rx, )); // TODO: Boot up the compact policy scheduler diff --git a/xline/src/storage/compact.rs b/xline/src/storage/compact.rs index 4b34bef47..ae14fc263 100644 --- a/xline/src/storage/compact.rs +++ b/xline/src/storage/compact.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use event_listener::Event; use tokio::{sync::mpsc::UnboundedReceiver, time::sleep}; @@ -13,24 +13,23 @@ use super::{ pub(crate) async fn compactor<DB>( kv_store: Arc<KvStore<DB>>, index: Arc<Index>, + batch_limit: usize, + interval: Duration, mut compact_task_rx: UnboundedReceiver<(i64, Option<Arc<Event>>)>, ) where DB: StorageApi, { - // TODO: make compact_interval and compact_batch_limit configurable - let compact_interval = std::time::Duration::from_millis(10); - let compact_batch_limit = 1000; while let Some((revision, listener)) = compact_task_rx.recv().await { let target_revisions = index .compact(revision) .into_iter() .map(|key_rev| key_rev.as_revision().encode_to_vec()) .collect::<Vec<Vec<u8>>>(); - for revision_chunk in target_revisions.chunks(compact_batch_limit) { + for revision_chunk in target_revisions.chunks(batch_limit) { if let Err(e) = kv_store.compact(revision_chunk) { panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); } - sleep(compact_interval).await; + sleep(interval).await; } if let Some(notifier) = listener { notifier.notify(usize::MAX);