Skip to content

Commit

Permalink
feat(compactor): add kv server compact implementation
Browse files Browse the repository at this point in the history
Refs: 188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 authored and mergify[bot] committed Jul 25, 2023
1 parent 1a1f446 commit 2ee85ae
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 96 deletions.
3 changes: 2 additions & 1 deletion simulation/src/curp_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,13 @@ impl CurpGroup {
}

pub async fn get_leader(&self) -> (ServerId, u64) {
const RETRY_INTERVAL: u64 = 100;
loop {
if let Some(leader) = self.try_get_leader().await {
return leader;
}
debug!("failed to get leader");
madsim::time::sleep(Duration::from_millis(100)).await;
madsim::time::sleep(Duration::from_millis(RETRY_INTERVAL)).await;
}
}

Expand Down
18 changes: 9 additions & 9 deletions utils/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,18 @@ pub struct CompactConfig {
#[getset(get = "pub")]
#[serde(default = "default_compact_batch_size")]
compact_batch_size: usize,
/// The interval between two compact operations
/// The interval between two compaction batches
#[getset(get = "pub")]
#[serde(with = "duration_format", default = "default_compact_interval")]
compact_interval: Duration,
#[serde(with = "duration_format", default = "default_compact_sleep_interval")]
compact_sleep_interval: Duration,
}

impl Default for CompactConfig {
#[inline]
fn default() -> Self {
Self {
compact_batch_size: default_compact_batch_size(),
compact_interval: default_compact_interval(),
compact_sleep_interval: default_compact_sleep_interval(),
}
}
}
Expand All @@ -149,10 +149,10 @@ impl CompactConfig {
/// Create a new compact config
#[must_use]
#[inline]
pub fn new(compact_batch_size: usize, compact_interval: Duration) -> Self {
pub fn new(compact_batch_size: usize, compact_sleep_interval: Duration) -> Self {
Self {
compact_batch_size,
compact_interval,
compact_sleep_interval,
}
}
}
Expand All @@ -167,7 +167,7 @@ pub const fn default_compact_batch_size() -> usize {
/// default compact interval
#[must_use]
#[inline]
pub const fn default_compact_interval() -> Duration {
pub const fn default_compact_sleep_interval() -> Duration {
Duration::from_millis(10)
}

Expand Down Expand Up @@ -749,7 +749,7 @@ mod tests {
[compact]
compact_batch_size = 123
compact_interval = '5ms'
compact_sleep_interval = '5ms'
[log]
path = '/var/log/xline'
Expand Down Expand Up @@ -825,7 +825,7 @@ mod tests {
config.compact,
CompactConfig {
compact_batch_size: 123,
compact_interval: Duration::from_millis(5)
compact_sleep_interval: Duration::from_millis(5)
}
);
}
Expand Down
8 changes: 4 additions & 4 deletions xline/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ use utils::{
config::{
default_batch_max_size, default_batch_timeout, default_candidate_timeout_ticks,
default_client_wait_synced_timeout, default_cmd_workers, default_compact_batch_size,
default_compact_interval, default_follower_timeout_ticks, default_gc_interval,
default_compact_sleep_interval, default_follower_timeout_ticks, default_gc_interval,
default_heartbeat_interval, default_log_entries_cap, default_log_level,
default_propose_timeout, default_range_retry_timeout, default_retry_timeout,
default_rotation, default_rpc_timeout, default_server_wait_synced_timeout,
Expand Down Expand Up @@ -268,7 +268,7 @@ struct ServerArgs {
compact_batch_size: usize,
/// Interval between two compaction operations [default: 10ms]
#[clap(long, value_parser = parse_duration)]
compact_interval: Option<Duration>,
compact_sleep_interval: Option<Duration>,
}

impl From<ServerArgs> for XlineServerConfig {
Expand Down Expand Up @@ -337,8 +337,8 @@ impl From<ServerArgs> for XlineServerConfig {
let auth = AuthConfig::new(args.auth_public_key, args.auth_private_key);
let compact = CompactConfig::new(
args.compact_batch_size,
args.compact_interval
.unwrap_or_else(default_compact_interval),
args.compact_sleep_interval
.unwrap_or_else(default_compact_sleep_interval),
);
XlineServerConfig::new(cluster, storage, log, trace, auth, compact)
}
Expand Down
137 changes: 73 additions & 64 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,29 +488,26 @@ impl ConflictCheck for Command {
if (this_req.is_txn_request() && other_req.is_compaction_request())
|| (this_req.is_compaction_request() && other_req.is_txn_request())
{
let conflict = match (this_req.clone(), other_req.clone()) {
match (this_req, other_req) {
(
RequestWrapper::CompactionRequest(ref com_req),
RequestWrapper::TxnRequest(ref txn_req),
&RequestWrapper::CompactionRequest(ref com_req),
&RequestWrapper::TxnRequest(ref txn_req),
)
| (
RequestWrapper::TxnRequest(ref txn_req),
RequestWrapper::CompactionRequest(ref com_req),
&RequestWrapper::TxnRequest(ref txn_req),
&RequestWrapper::CompactionRequest(ref com_req),
) => {
let target_revision = com_req.revision;
txn_req.is_conflict_with_rev(target_revision)
return txn_req.is_conflict_with_rev(target_revision)
}
_ => false,
};
if conflict {
return true;
_ => unreachable!("The request must be either a transaction or a compaction request! \nthis_req = {this_req:?} \nother_req = {other_req:?}")
}
}

let this_lease_ids = get_lease_ids(this_req);
let other_lease_ids = get_lease_ids(other_req);
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
let key_conflict: bool = self
let key_conflict = self
.keys()
.iter()
.cartesian_product(other.keys().iter())
Expand Down Expand Up @@ -603,6 +600,8 @@ impl CurpCommand for Command {

#[cfg(test)]
mod test {
use xlineapi::Compare;

use super::*;
use crate::rpc::{
AuthEnableRequest, AuthStatusRequest, CompactionRequest, LeaseGrantRequest,
Expand Down Expand Up @@ -699,7 +698,7 @@ mod test {
lease: 123,
..Default::default()
})),
ProposeId::new("id8".to_owned()),
ProposeId::new("id5".to_owned()),
);
let txn_with_lease_id_cmd = Command::new(
vec![KeyRange::new_one_key("key")],
Expand Down Expand Up @@ -735,6 +734,24 @@ mod test {
assert!(cmd6.is_conflict(&lease_leases_cmd)); // lease read and write
}

fn generate_txn_command(
keys: Vec<KeyRange>,
compare: Vec<Compare>,
success: Vec<RequestOp>,
failure: Vec<RequestOp>,
propose_id: &str,
) -> Command {
Command::new(
keys,
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare,
success,
failure,
})),
ProposeId::new(propose_id.to_owned()),
)
}

#[test]
fn test_compaction_txn_conflict() {
let compaction_cmd_1 = Command::new(
Expand All @@ -755,68 +772,60 @@ mod test {
ProposeId::new("id12".to_owned()),
);

let txn_with_lease_id_cmd = Command::new(
let txn_with_lease_id_cmd = generate_txn_command(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestPut(PutRequest {
key: b"key".to_vec(),
value: b"value".to_vec(),
lease: 123,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id6".to_owned()),
vec![],
vec![RequestOp {
request: Some(Request::RequestPut(PutRequest {
key: b"key".to_vec(),
value: b"value".to_vec(),
lease: 123,
..Default::default()
})),
}],
vec![],
"id6",
);

let txn_cmd_1 = Command::new(
let txn_cmd_1 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id13".to_owned()),
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
..Default::default()
})),
}],
vec![],
"id13",
);

let txn_cmd_2 = Command::new(
let txn_cmd_2 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 3,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id14".to_owned()),
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 3,
..Default::default()
})),
}],
vec![],
"id14",
);

let txn_cmd_3 = Command::new(
let txn_cmd_3 = generate_txn_command(
vec![KeyRange::new_one_key("key")],
RequestWithToken::new(RequestWrapper::TxnRequest(TxnRequest {
compare: vec![],
success: vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 7,
..Default::default()
})),
}],
failure: vec![],
})),
ProposeId::new("id15".to_owned()),
vec![],
vec![RequestOp {
request: Some(Request::RequestRange(RangeRequest {
key: b"key".to_vec(),
revision: 7,
..Default::default()
})),
}],
vec![],
"id15",
);

assert!(compaction_cmd_1.is_conflict(&compaction_cmd_2));
Expand Down
47 changes: 34 additions & 13 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::future::join_all;
use tokio::time::timeout;
use tracing::{debug, instrument};
use uuid::Uuid;
use xlineapi::ResponseWrapper;

use super::{
auth_server::get_token,
Expand Down Expand Up @@ -211,16 +212,23 @@ where
"required revision {} is higher than current revision {}",
req.revision, current_revision
)))
} else if req.revision < compacted_revision {
Err(tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
req.revision, compacted_revision
)))
} else {
Ok(())
Self::check_range_compacted(req.revision, compacted_revision)
}
}

/// check whether the required revision is compacted or not
fn check_range_compacted(
range_revision: i64,
compacted_revision: i64,
) -> Result<(), tonic::Status> {
(range_revision >= compacted_revision)
.then_some(())
.ok_or(tonic::Status::invalid_argument(format!(
"required revision {range_revision} has been compacted, compacted revision is {compacted_revision}"
)))
}

/// Validate compact request before handle
fn check_compact_request(
req: &CompactionRequest,
Expand Down Expand Up @@ -439,13 +447,21 @@ where
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
)?;
let range_required_revision = range_req.revision;
let is_serializable = range_req.serializable;
let token = get_token(request.metadata());
let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token);
let propose_id = self.generate_propose_id();
let cmd = Self::command_from_request_wrapper(propose_id, wrapper);
if !is_serializable {
self.wait_read_state(&cmd).await?;
// Double check whether the range request is compacted or not since the compaction request
// may be executed during the process of `wait_read_state` which results in the result of
// previous `check_range_request` outdated.
Self::check_range_compacted(
range_required_revision,
self.kv_storage.compacted_revision(),
)?;
}
self.serializable_range(cmd.request())
}
Expand Down Expand Up @@ -547,18 +563,23 @@ where
debug!("Receive CompactionRequest {:?}", request);
let compacted_revision = self.kv_storage.compacted_revision();
let current_revision = self.kv_storage.revision();
Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?;
Err(tonic::Status::new(
tonic::Code::Unimplemented,
"Not Implemented".to_owned(),
))
let req = request.get_ref();
Self::check_compact_request(req, compacted_revision, current_revision)?;

let is_fast_path = !req.physical;
let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?;
let resp = cmd_res.decode();

if let ResponseWrapper::CompactionResponse(response) = resp {
Ok(tonic::Response::new(response))
} else {
panic!("Receive wrong response {resp:?} for CompactionRequest");
}
}
}

#[cfg(test)]
mod test {
use test_macros::abort_on_panic;

use super::*;
use crate::storage::db::DB;

Expand Down
Loading

0 comments on commit 2ee85ae

Please sign in to comment.