feat(compactor): make compact interval and batch_size configurable
Refs: #188
Signed-off-by: Phoeniix Zhao <[email protected]>
Phoenix500526 committed Jul 3, 2023
1 parent 0a0000e commit fcf9406
Showing 5 changed files with 105 additions and 17 deletions.
70 changes: 70 additions & 0 deletions utils/src/config.rs
@@ -24,6 +24,9 @@ pub struct XlineServerConfig {
/// auth configuration object
#[getset(get = "pub")]
auth: AuthConfig,
/// compactor configuration object
#[getset(get = "pub")]
compact: CompactConfig,
}

/// Cluster Range type alias
@@ -118,6 +121,56 @@ impl ClusterConfig {
}
}

/// Compaction configuration
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)]
#[allow(clippy::module_name_repetitions)]
pub struct CompactConfig {
/// The max number of historical versions processed in a single compact operation
#[getset(get = "pub")]
#[serde(default = "default_compact_batch_size")]
compact_batch_size: usize,
/// The interval between two compact operations
#[getset(get = "pub")]
#[serde(with = "duration_format", default = "default_compact_interval")]
compact_interval: Duration,
}

impl Default for CompactConfig {
#[inline]
fn default() -> Self {
Self {
compact_batch_size: default_compact_batch_size(),
compact_interval: default_compact_interval(),
}
}
}

impl CompactConfig {
/// Create a new compact config
#[must_use]
#[inline]
pub fn new(compact_batch_size: usize, compact_interval: Duration) -> Self {
Self {
compact_batch_size,
compact_interval,
}
}
}

/// default compact batch size
#[must_use]
#[inline]
pub const fn default_compact_batch_size() -> usize {
1000
}

/// default compact interval
#[must_use]
#[inline]
pub const fn default_compact_interval() -> Duration {
Duration::from_millis(10)
}

/// Curp server timeout settings
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)]
#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)]
@@ -647,13 +700,15 @@ impl XlineServerConfig {
log: LogConfig,
trace: TraceConfig,
auth: AuthConfig,
compact: CompactConfig,
) -> Self {
Self {
cluster,
storage,
log,
trace,
auth,
compact,
}
}
}
@@ -692,6 +747,10 @@ mod tests {
[storage]
engine = 'memory'
[compact]
compact_batch_size = 123
compact_interval = '5ms'
[log]
path = '/var/log/xline'
rotation = 'daily'
@@ -761,6 +820,14 @@ mod tests {
LevelConfig::INFO
)
);

assert_eq!(
config.compact,
CompactConfig {
compact_batch_size: 123,
compact_interval: Duration::from_millis(5)
}
);
}

#[allow(clippy::unwrap_used)]
@@ -783,6 +850,8 @@
engine = 'rocksdb'
data_dir = '/usr/local/xline/data-dir'
[compact]
[trace]
jaeger_online = false
jaeger_offline = false
Expand Down Expand Up @@ -834,5 +903,6 @@ mod tests {
LevelConfig::INFO
)
);
assert_eq!(config.compact, CompactConfig::default());
}
}
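
For orientation, a minimal sketch (not part of the diff) of how a caller could build the new config while keeping the crate defaults for any knob that is left unset; it uses only items introduced above and assumes the utils crate is on the dependency path, as in the other files of this commit.

use std::time::Duration;
use utils::config::{default_compact_batch_size, default_compact_interval, CompactConfig};

/// Build a CompactConfig, falling back to the defaults above
/// (1000 revisions per batch, 10 ms between batches) when unset.
fn compact_config(batch: Option<usize>, interval: Option<Duration>) -> CompactConfig {
    CompactConfig::new(
        batch.unwrap_or_else(default_compact_batch_size),
        interval.unwrap_or_else(default_compact_interval),
    )
}

fn main() {
    assert_eq!(compact_config(None, None), CompactConfig::default());
}
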
3 changes: 2 additions & 1 deletion xline-test-utils/src/lib.rs
@@ -12,7 +12,7 @@ use tokio::{
sync::broadcast::{self, Sender},
time::{self, Duration},
};
use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig};
use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig};
use xline::{client::Client, server::XlineServer, storage::db::DB};

/// Cluster
@@ -86,6 +86,7 @@ impl Cluster {
ClientTimeout::default(),
ServerTimeout::default(),
StorageConfig::Memory,
CompactConfig::default(),
);
let signal = async {
let _ = rx.recv().await;
30 changes: 21 additions & 9 deletions xline/src/main.rs
@@ -151,14 +151,14 @@ use tracing_subscriber::{fmt::format, prelude::*};
use utils::{
config::{
default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks,
default_client_wait_synced_timeout, default_cmd_workers, default_follower_timeout_ticks,
default_gc_interval, default_heartbeat_interval, default_log_entries_cap,
default_log_level, default_propose_timeout, default_range_retry_timeout,
default_retry_timeout, default_rotation, default_rpc_timeout,
default_server_wait_synced_timeout, default_sync_victims_interval,
default_watch_progress_notify_interval, file_appender, AuthConfig, ClientTimeout,
ClusterConfig, CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout,
StorageConfig, TraceConfig, XlineServerConfig,
default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size,
default_compact_interval, default_follower_timeout_ticks, default_gc_interval,
default_heartbeat_interval, default_log_entries_cap, default_log_level,
default_propose_timeout, default_range_retry_timeout, default_retry_timeout,
default_rotation, default_rpc_timeout, default_server_wait_synced_timeout,
default_sync_victims_interval, default_watch_progress_notify_interval, file_appender,
AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig,
LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig,
},
parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation,
};
@@ -263,6 +263,12 @@ struct ServerArgs {
/// Curp command workers count
#[clap(long, default_value_t = default_cmd_workers())]
cmd_workers: u8,
/// The max number of historical versions processed in a single compact operation [default: 1000]
#[clap(long, default_value_t = default_compact_batch_size())]
compact_batch_size: usize,
/// Interval between two compaction operations [default: 10ms]
#[clap(long, value_parser = parse_duration)]
compact_interval: Option<Duration>,
}

impl From<ServerArgs> for XlineServerConfig {
@@ -326,7 +332,12 @@ impl From<ServerArgs> for XlineServerConfig {
args.jaeger_level,
);
let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key);
XlineServerConfig::new(cluster, storage, log, trace, auth)
let compact = CompactConfig::new(
args.compact_batch_size,
args.compact_interval
.unwrap_or_else(default_compact_interval),
);
XlineServerConfig::new(cluster, storage, log, trace, auth, compact)
}
}

@@ -477,6 +488,7 @@ async fn main() -> Result<()> {
*cluster_config.client_timeout(),
*cluster_config.server_timeout(),
config.storage().clone(),
*config.compact(),
);
debug!("{:?}", server);
server.start(self_addr, db_proxy, key_pair).await?;
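
With clap's default kebab-case renaming, the two new fields surface as --compact-batch-size and --compact-interval. A trimmed-down, self-contained sketch of that flag surface is below; parse_ms is a hypothetical stand-in for the project's parse_duration helper, and the real ServerArgs carries many more fields.

use std::time::Duration;
use clap::Parser;

/// Hypothetical argument struct mirroring only the two new knobs.
#[derive(Parser, Debug)]
struct CompactArgs {
    /// Max historical versions handled per compact batch.
    #[clap(long, default_value_t = 1000)]
    compact_batch_size: usize,
    /// Pause between compact batches, e.g. `--compact-interval 100`.
    #[clap(long, value_parser = parse_ms)]
    compact_interval: Option<Duration>,
}

/// Stand-in parser: interprets the value as plain milliseconds.
fn parse_ms(s: &str) -> Result<Duration, std::num::ParseIntError> {
    Ok(Duration::from_millis(s.parse()?))
}

fn main() {
    let args = CompactArgs::parse();
    println!("{args:?}");
}

As in the From<ServerArgs> conversion above, leaving --compact-interval unset yields None, and only then does the 10 ms default apply.
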
8 changes: 7 additions & 1 deletion xline/src/server/xline_server.rs
@@ -8,7 +8,7 @@ use jsonwebtoken::{DecodingKey, EncodingKey};
use tokio::{net::TcpListener, sync::mpsc::unbounded_channel};
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig};
use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig};

use super::{
auth_server::AuthServer,
@@ -57,6 +57,8 @@ pub struct XlineServer {
client_timeout: ClientTimeout,
/// Storage config,
storage_cfg: StorageConfig,
/// Compact config
compact_cfg: CompactConfig,
/// Server timeout
server_timeout: ServerTimeout,
/// Shutdown trigger
@@ -78,13 +80,15 @@
client_timeout: ClientTimeout,
server_timeout: ServerTimeout,
storage_config: StorageConfig,
compact_config: CompactConfig,
) -> Self {
Self {
cluster_info,
is_leader,
curp_cfg: Arc::new(curp_config),
client_timeout,
storage_cfg: storage_config,
compact_cfg: compact_config,
server_timeout,
shutdown_trigger: Arc::new(event_listener::Event::new()),
}
@@ -134,6 +138,8 @@ impl XlineServer {
let _hd = tokio::spawn(compactor(
Arc::clone(&kv_storage),
Arc::clone(&index),
*self.compact_cfg.compact_batch_size(),
*self.compact_cfg.compact_interval(),
compact_task_rx,
));
// TODO: Boot up the compact policy scheduler
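
A side note on the two dereferences above: the Getters derive returns &usize and &Duration, and both types are Copy, so a plain `*` hands owned values to the spawned compactor task. A minimal illustration using only the CompactConfig from this commit:

use std::time::Duration;
use utils::config::CompactConfig;

/// Turn the borrowed getters into owned values; usize and Duration are Copy.
fn compactor_knobs(cfg: &CompactConfig) -> (usize, Duration) {
    (*cfg.compact_batch_size(), *cfg.compact_interval())
}

fn main() {
    let cfg = CompactConfig::default();
    assert_eq!(compactor_knobs(&cfg), (1000, Duration::from_millis(10)));
}
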
11 changes: 5 additions & 6 deletions xline/src/storage/compact.rs
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use event_listener::Event;
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};
@@ -13,24 +13,23 @@ use super::{
pub(crate) async fn compactor<DB>(
kv_store: Arc<KvStore<DB>>,
index: Arc<Index>,
batch_limit: usize,
interval: Duration,
mut compact_task_rx: UnboundedReceiver<(i64, Option<Arc<Event>>)>,
) where
DB: StorageApi,
{
// TODO: make compact_interval and compact_batch_limit configurable
let compact_interval = std::time::Duration::from_millis(10);
let compact_batch_limit = 1000;
while let Some((revision, listener)) = compact_task_rx.recv().await {
let target_revisions = index
.compact(revision)
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>();
for revision_chunk in target_revisions.chunks(compact_batch_limit) {
for revision_chunk in target_revisions.chunks(batch_limit) {
if let Err(e) = kv_store.compact(revision_chunk) {
panic!("failed to compact revision chunk {revision_chunk:?} due to {e}");
}
sleep(compact_interval).await;
sleep(interval).await;
}
if let Some(notifier) = listener {
notifier.notify(usize::MAX);
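
To see the two new parameters in isolation, here is a self-contained sketch of the batching pattern the compactor follows: work through the target revisions in chunks of batch_limit, sleeping for interval between chunks so compaction cannot monopolize the storage backend. The closure stands in for KvStore::compact and is not the real storage API.

use std::time::Duration;
use tokio::time::sleep;

/// Compact `revisions` in chunks of `batch_limit`, pausing `interval`
/// between chunks.
async fn compact_in_batches<E, F>(
    revisions: Vec<Vec<u8>>,
    batch_limit: usize,
    interval: Duration,
    mut compact: F,
) -> Result<(), E>
where
    F: FnMut(&[Vec<u8>]) -> Result<(), E>,
{
    for chunk in revisions.chunks(batch_limit) {
        compact(chunk)?;
        sleep(interval).await;
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let revisions = (0..25u8).map(|r| vec![r]).collect();
    compact_in_batches(revisions, 10, Duration::from_millis(1), |chunk| {
        println!("compacting {} revisions", chunk.len());
        Ok::<(), ()>(())
    })
    .await
    .expect("compaction failed");
}
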
