Skip to content

Commit

Permalink
feat(compactor): add kv server compact implementation
Browse files Browse the repository at this point in the history
Refs: 188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 10, 2023
1 parent eb95ff7 commit add7c47
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 27 deletions.
6 changes: 4 additions & 2 deletions xline-client/tests/watch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! The following tests are originally from `etcd-client`
use crate::common::get_cluster_client;
use xline_client::{error::Result, types::kv::PutRequest, types::watch::WatchRequest};
use xline_client::{
error::Result,
types::{kv::PutRequest, watch::WatchRequest},
};
use xlineapi::EventType;

use crate::common::get_cluster_client;
Expand Down
21 changes: 9 additions & 12 deletions xline/src/server/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,29 +488,26 @@ impl ConflictCheck for Command {
if (this_req.is_txn_request() && other_req.is_compaction_request())
|| (this_req.is_compaction_request() && other_req.is_txn_request())
{
let conflict = match (this_req.clone(), other_req.clone()) {
match (this_req, other_req) {
(
RequestWrapper::CompactionRequest(ref com_req),
RequestWrapper::TxnRequest(ref txn_req),
&RequestWrapper::CompactionRequest(ref com_req),
&RequestWrapper::TxnRequest(ref txn_req),
)
| (
RequestWrapper::TxnRequest(ref txn_req),
RequestWrapper::CompactionRequest(ref com_req),
&RequestWrapper::TxnRequest(ref txn_req),
&RequestWrapper::CompactionRequest(ref com_req),
) => {
let target_revision = com_req.revision;
txn_req.is_conflict_with_rev(target_revision)
return txn_req.is_conflict_with_rev(target_revision)
}
_ => false,
};
if conflict {
return true;
_ => unreachable!("The request must be either a transaction or a compaction request! \nthis_req = {this_req:?} \nother_req = {other_req:?}")
}
}

let this_lease_ids = get_lease_ids(this_req);
let other_lease_ids = get_lease_ids(other_req);
let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids);
let key_conflict: bool = self
let key_conflict = self
.keys()
.iter()
.cartesian_product(other.keys().iter())
Expand Down Expand Up @@ -699,7 +696,7 @@ mod test {
lease: 123,
..Default::default()
})),
ProposeId::new("id8".to_owned()),
ProposeId::new("id5".to_owned()),
);
let txn_with_lease_id_cmd = Command::new(
vec![KeyRange::new_one_key("key")],
Expand Down
47 changes: 34 additions & 13 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::future::join_all;
use tokio::time::timeout;
use tracing::{debug, instrument};
use uuid::Uuid;
use xlineapi::ResponseWrapper;

use super::{
auth_server::get_token,
Expand Down Expand Up @@ -211,16 +212,23 @@ where
"required revision {} is higher than current revision {}",
req.revision, current_revision
)))
} else if req.revision < compacted_revision {
Err(tonic::Status::invalid_argument(format!(
"required revision {} has been compacted, compacted revision is {}",
req.revision, compacted_revision
)))
} else {
Ok(())
Self::check_range_compacted(req.revision, compacted_revision)
}
}

/// check whether the required revision is compacted or not
fn check_range_compacted(
range_revision: i64,
compacted_revision: i64,
) -> Result<(), tonic::Status> {
(range_revision >= compacted_revision)
.then_some(())
.ok_or(tonic::Status::invalid_argument(format!(
"required revision {range_revision} has been compacted, compacted revision is {compacted_revision}"
)))
}

/// Validate compact request before handle
fn check_compact_request(
req: &CompactionRequest,
Expand Down Expand Up @@ -439,13 +447,21 @@ where
self.kv_storage.compacted_revision(),
self.kv_storage.revision(),
)?;
let range_required_revision = range_req.revision;
let is_serializable = range_req.serializable;
let token = get_token(request.metadata());
let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token);
let propose_id = self.generate_propose_id();
let cmd = Self::command_from_request_wrapper(propose_id, wrapper);
if !is_serializable {
self.wait_read_state(&cmd).await?;
// Double check whether the range request is compacted or not since the compaction request
// may be executed during the process of `wait_read_state` which results in the result of
// previous `check_range_request` outdated.
Self::check_range_compacted(
range_required_revision,
self.kv_storage.compacted_revision(),
)?;
}
self.serializable_range(cmd.request())
}
Expand Down Expand Up @@ -547,18 +563,23 @@ where
debug!("Receive CompactionRequest {:?}", request);
let compacted_revision = self.kv_storage.compacted_revision();
let current_revision = self.kv_storage.revision();
Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?;
Err(tonic::Status::new(
tonic::Code::Unimplemented,
"Not Implemented".to_owned(),
))
let req = request.get_ref();
Self::check_compact_request(req, compacted_revision, current_revision)?;

let is_fast_path = !req.physical;
let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?;
let resp = cmd_res.decode();

if let ResponseWrapper::CompactionResponse(response) = resp {
Ok(tonic::Response::new(response))
} else {
panic!("Receive wrong response {resp:?} for CompactionRequest");
}
}
}

#[cfg(test)]
mod test {
use test_macros::abort_on_panic;

use super::*;
use crate::storage::db::DB;

Expand Down
1 change: 1 addition & 0 deletions xline/src/storage/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) async fn compactor<DB>(
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>();
// Given that the Xline uses a lim-tree database with smaller write amplification as the storage backend , does using progressive compaction really good at improving performance?
for revision_chunk in target_revisions.chunks(batch_limit) {
if let Err(e) = kv_store.compact(revision_chunk) {
panic!("failed to compact revision chunk {revision_chunk:?} due to {e}");
Expand Down
1 change: 1 addition & 0 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ where
) -> Result<(Vec<WriteOp>, Vec<Event>), ExecuteError> {
let revision = req.revision;
let ops = vec![WriteOp::PutCompactRevision(revision)];
// TODO: Remove the physical process logic here. It's better to move into the KvServer
#[allow(clippy::collapsible_else_if)]
if req.physical {
let event = Arc::new(event_listener::Event::new());
Expand Down

0 comments on commit add7c47

Please sign in to comment.