Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/compactor #311

Merged
merged 9 commits into from
Jul 25, 2023
3 changes: 2 additions & 1 deletion simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,13 @@ impl CurpGroup {
}

pub async fn get_leader(&self) -> (ServerId, u64) {
const RETRY_INTERVAL: u64 = 100;
loop {
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
if let Some(leader) = self.try_get_leader().await {
return leader;
}
debug!("failed to get leader");
madsim::time::sleep(Duration::from_millis(100)).await;
madsim::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
}
}

Expand Down
72 changes: 71 additions & 1 deletion utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct XlineServerConfig {
/// auth configuration object
#[getset(get = "pub")]
auth: AuthConfig,
/// compactor configuration object
#[getset(get = "pub")]
compact: CompactConfig,
}

/// Cluster Range type alias
Expand Down Expand Up @@ -118,6 +121,56 @@ impl ClusterConfig {
}
}

/// Compaction configuration
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Getters)]
#[allow(clippy::module_name_repetitions)]
pub struct CompactConfig {
/// The max number of historical versions processed in a single compact operation
#[getset(get = "pub")]
#[serde(default = "default_compact_batch_size")]
compact_batch_size: usize,
/// The interval between two compaction batches
#[getset(get = "pub")]
#[serde(with = "duration_format", default = "default_compact_sleep_interval")]
compact_sleep_interval: Duration,
}

impl Default for CompactConfig {
#[inline]
fn default() -> Self {
Self {
compact_batch_size: default_compact_batch_size(),
compact_sleep_interval: default_compact_sleep_interval(),
}
}
}

impl CompactConfig {
/// Create a new compact config
#[must_use]
#[inline]
pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self {
Self {
compact_batch_size,
compact_sleep_interval,
}
}
}

/// default compact batch size
#[must_use]
#[inline]
pub const fn default_compact_batch_size() -> usize {
1000
}

/// default compact interval
#[must_use]
#[inline]
pub const fn default_compact_sleep_interval() -> Duration {
Duration::from_millis(10)
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
}

/// Curp server timeout settings
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Getters, Builder)]
#[allow(clippy::module_name_repetitions, clippy::exhaustive_structs)]
Expand Down Expand Up @@ -470,7 +523,7 @@ pub mod level_format {
use crate::parse_log_level;

/// deserializes a cluster duration
#[allow(single_use_lifetimes)] // TODO: Think is it necessary to allow this clippy??
#[allow(single_use_lifetimes)]
pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result<LevelConfig, D::Error>
where
D: Deserializer<'de>,
Expand Down Expand Up @@ -647,13 +700,15 @@ impl XlineServerConfig {
log: LogConfig,
trace: TraceConfig,
auth: AuthConfig,
compact: CompactConfig,
) -> Self {
Self {
cluster,
storage,
log,
trace,
auth,
compact,
}
}
}
Expand Down Expand Up @@ -692,6 +747,10 @@ mod tests {
[storage]
engine = 'memory'

[compact]
compact_batch_size = 123
compact_sleep_interval = '5ms'

[log]
path = '/var/log/xline'
rotation = 'daily'
Expand Down Expand Up @@ -761,6 +820,14 @@ mod tests {
LevelConfig::INFO
)
);

assert_eq!(
config.compact,
CompactConfig {
compact_batch_size: 123,
compact_sleep_interval: Duration::from_millis(5)
}
);
}

#[allow(clippy::unwrap_used)]
Expand All @@ -785,6 +852,8 @@ mod tests {
engine = 'rocksdb'
data_dir = '/usr/local/xline/data-dir'

[compact]

[trace]
jaeger_online = false
jaeger_offline = false
Expand Down Expand Up @@ -836,5 +905,6 @@ mod tests {
LevelConfig::INFO
)
);
assert_eq!(config.compact, CompactConfig::default());
}
}
3 changes: 2 additions & 1 deletion xline-test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::{
sync::broadcast::{self, Sender},
time::{self, Duration},
};
use utils::config::{ClientTimeout, CurpConfig, ServerTimeout, StorageConfig};
use utils::config::{ClientTimeout, CompactConfig, CurpConfig, ServerTimeout, StorageConfig};
use xline::{client::Client, server::XlineServer, storage::db::DB};

/// Cluster
Expand Down Expand Up @@ -86,6 +86,7 @@ impl Cluster {
ClientTimeout::default(),
ServerTimeout::default(),
StorageConfig::Memory,
CompactConfig::default(),
);
let signal = async {
let _ = rx.recv().await;
Expand Down
30 changes: 21 additions & 9 deletions xline/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ use tracing_subscriber::{fmt::format, prelude::*};
use utils::{
config::{
default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks,
default_client_wait_synced_timeout, default_cmd_workers, default_follower_timeout_ticks,
default_gc_interval, default_heartbeat_interval, default_log_entries_cap,
default_log_level, default_propose_timeout, default_range_retry_timeout,
default_retry_timeout, default_rotation, default_rpc_timeout,
default_server_wait_synced_timeout, default_sync_victims_interval,
default_watch_progress_notify_interval, file_appender, AuthConfig, ClientTimeout,
ClusterConfig, CurpConfigBuilder, LevelConfig, LogConfig, RotationConfig, ServerTimeout,
StorageConfig, TraceConfig, XlineServerConfig,
default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size,
default_compact_sleep_interval, default_follower_timeout_ticks, default_gc_interval,
default_heartbeat_interval, default_log_entries_cap, default_log_level,
default_propose_timeout, default_range_retry_timeout, default_retry_timeout,
default_rotation, default_rpc_timeout, default_server_wait_synced_timeout,
default_sync_victims_interval, default_watch_progress_notify_interval, file_appender,
AuthConfig, ClientTimeout, ClusterConfig, CompactConfig, CurpConfigBuilder, LevelConfig,
LogConfig, RotationConfig, ServerTimeout, StorageConfig, TraceConfig, XlineServerConfig,
},
parse_batch_bytes, parse_duration, parse_log_level, parse_members, parse_rotation,
};
Expand Down Expand Up @@ -263,6 +263,12 @@ struct ServerArgs {
/// Curp command workers count
#[clap(long, default_value_t = default_cmd_workers())]
cmd_workers: u8,
/// The max number of historical versions processed in a single compact operation [default: 1000]
#[clap(long, default_value_t = default_compact_batch_size())]
compact_batch_size: usize,
/// Interval between two compaction operations [default: 10ms]
#[clap(long, value_parser = parse_duration)]
compact_sleep_interval: Option<Duration>,
}

impl From<ServerArgs> for XlineServerConfig {
Expand Down Expand Up @@ -329,7 +335,12 @@ impl From<ServerArgs> for XlineServerConfig {
args.jaeger_level,
);
let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key);
XlineServerConfig::new(cluster, storage, log, trace, auth)
let compact = CompactConfig::new(
args.compact_batch_size,
args.compact_sleep_interval
.unwrap_or_else(default_compact_sleep_interval),
);
XlineServerConfig::new(cluster, storage, log, trace, auth, compact)
}
}

Expand Down Expand Up @@ -480,6 +491,7 @@ async fn main() -> Result<()> {
*cluster_config.client_timeout(),
*cluster_config.server_timeout(),
config.storage().clone(),
*config.compact(),
);
debug!("{:?}", server);
server.start(self_addr, db_proxy, key_pair).await?;
Expand Down
133 changes: 131 additions & 2 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,29 @@ impl ConflictCheck for Command {
return true;
}

if this_req.is_compaction_request() && other_req.is_compaction_request() {
return true;
Phoenix500526 marked this conversation as resolved.
Show resolved Hide resolved
}

if (this_req.is_txn_request() && other_req.is_compaction_request())
|| (this_req.is_compaction_request() && other_req.is_txn_request())
{
match (this_req, other_req) {
(
&RequestWrapper::CompactionRequest(ref com_req),
&RequestWrapper::TxnRequest(ref txn_req),
)
| (
&RequestWrapper::TxnRequest(ref txn_req),
&RequestWrapper::CompactionRequest(ref com_req),
) => {
let target_revision = com_req.revision;
return txn_req.is_conflict_with_rev(target_revision)
themanforfree marked this conversation as resolved.
Show resolved Hide resolved
}
_ => unreachable!("The request must be either a transaction or a compaction request! \nthis_req = {this_req:?} \nother_req = {other_req:?}")
}
}

let this_lease_ids = get_lease_ids(this_req);
let other_lease_ids = get_lease_ids(other_req);
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
Expand Down Expand Up @@ -577,10 +600,12 @@ impl CurpCommand for Command {

#[cfg(test)]
mod test {
use xlineapi::Compare;

use super::*;
use crate::rpc::{
AuthEnableRequest, AuthStatusRequest, LeaseGrantRequest, LeaseLeasesRequest,
LeaseRevokeRequest, PutRequest, RequestOp, TxnRequest,
AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest,
LeaseLeasesRequest, LeaseRevokeRequest, PutRequest, RangeRequest, RequestOp, TxnRequest,
};

#[test]
Expand Down Expand Up @@ -708,4 +733,108 @@ mod test {
assert!(lease_leases_cmd.is_conflict(&cmd5)); // lease read and write
assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write
}

fn generate_txn_command(
keys: Vec<KeyRange>,
compare: Vec<Compare>,
success: Vec<RequestOp>,
failure: Vec<RequestOp>,
propose_id: &str,
) -> Command {
Command::new(
keys,
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare,
success,
failure,
})),
ProposeId::new(propose_id.to_owned()),
)
}

#[test]
fn test_compaction_txn_conflict() {
let compaction_cmd_1 = Command::new(
vec![],
RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest {
revision: 3,
physical: false,
})),
ProposeId::new("id11".to_owned()),
);

let compaction_cmd_2 = Command::new(
vec![],
RequestWithToken::new(RequestWrapper::CompactionRequest(CompactionRequest {
revision: 5,
physical: false,
})),
ProposeId::new("id12".to_owned()),
);

let txn_with_lease_id_cmd = generate_txn_command(
vec![KeyRange::new_one_key("key")],
vec![],
vec![RequestOp {
request: Some(Request::RequestPut(PutRequest {
key: b"key".to_vec(),
value: b"value".to_vec(),
lease: 123,
..Default::default()
})),
}],
vec![],
"id6",
);

let txn_cmd_1 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
..Default::default()
})),
}],
vec![],
"id13",
);

let txn_cmd_2 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 3,
..Default::default()
})),
}],
vec![],
"id14",
);

let txn_cmd_3 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 7,
..Default::default()
})),
}],
vec![],
"id15",
);

assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2));
assert!(compaction_cmd_2.is_conflict(&compaction_cmd_1));
assert!(!compaction_cmd_1.is_conflict(&txn_with_lease_id_cmd));
assert!(!compaction_cmd_2.is_conflict(&txn_with_lease_id_cmd));

assert!(!compaction_cmd_2.is_conflict(&txn_cmd_1));
assert!(compaction_cmd_2.is_conflict(&txn_cmd_2));
assert!(!compaction_cmd_2.is_conflict(&txn_cmd_3));
}
}
Loading