diff --git a/xline/src/server/command.rs b/xline/src/server/command.rs index 745bf0149..bdda89d79 100644 --- a/xline/src/server/command.rs +++ b/xline/src/server/command.rs @@ -488,29 +488,26 @@ impl ConflictCheck for Command { if (this_req.is_txn_request() && other_req.is_compaction_request()) || (this_req.is_compaction_request() && other_req.is_txn_request()) { - let conflict = match (this_req.clone(), other_req.clone()) { + match (this_req, other_req) { ( - RequestWrapper::CompactionRequest(ref com_req), - RequestWrapper::TxnRequest(ref txn_req), + &RequestWrapper::CompactionRequest(ref com_req), + &RequestWrapper::TxnRequest(ref txn_req), ) | ( - RequestWrapper::TxnRequest(ref txn_req), - RequestWrapper::CompactionRequest(ref com_req), + &RequestWrapper::TxnRequest(ref txn_req), + &RequestWrapper::CompactionRequest(ref com_req), ) => { let target_revision = com_req.revision; - txn_req.is_conflict_with_rev(target_revision) + return txn_req.is_conflict_with_rev(target_revision) } - _ => false, - }; - if conflict { - return true; + _ => unreachable!("The request must be either a transaction or a compaction request! \nthis_req = {this_req:?} \nother_req = {other_req:?}") } } let this_lease_ids = get_lease_ids(this_req); let other_lease_ids = get_lease_ids(other_req); let lease_conflict = !this_lease_ids.is_disjoint(&other_lease_ids); - let key_conflict: bool = self + let key_conflict = self .keys() .iter() .cartesian_product(other.keys().iter()) @@ -699,7 +696,7 @@ mod test { lease: 123, ..Default::default() })), - ProposeId::new("id8".to_owned()), + ProposeId::new("id5".to_owned()), ); let txn_with_lease_id_cmd = Command::new( vec![KeyRange::new_one_key("key")], diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 7864930ba..84dd2c933 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use tokio::time::timeout; use tracing::{debug, instrument}; use uuid::Uuid; +use xlineapi::ResponseWrapper; use super::{ auth_server::get_token, @@ -211,16 +212,23 @@ where "required revision {} is higher than current revision {}", req.revision, current_revision ))) - } else if req.revision < compacted_revision { - Err(tonic::Status::invalid_argument(format!( - "required revision {} has been compacted, compacted revision is {}", - req.revision, compacted_revision - ))) } else { - Ok(()) + Self::check_range_compacted(req.revision, compacted_revision) } } + /// check whether the required revision is compacted or not + fn check_range_compacted( + range_revision: i64, + compacted_revision: i64, + ) -> Result<(), tonic::Status> { + (range_revision >= compacted_revision) + .then_some(()) + .ok_or(tonic::Status::invalid_argument(format!( + "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" + ))) + } + /// Validate compact request before handle fn check_compact_request( req: &CompactionRequest, @@ -439,6 +447,7 @@ where self.kv_storage.compacted_revision(), self.kv_storage.revision(), )?; + let range_required_revision = range_req.revision; let is_serializable = range_req.serializable; let token = get_token(request.metadata()); let wrapper = RequestWithToken::new_with_token(request.into_inner().into(), token); @@ -446,6 +455,13 @@ where let cmd = Self::command_from_request_wrapper(propose_id, wrapper); if !is_serializable { self.wait_read_state(&cmd).await?; + // Double check whether the range request is compacted or not since the compaction request + // may be executed during the process of `wait_read_state` which results in the result of + // previous `check_range_request` outdated. + Self::check_range_compacted( + range_required_revision, + self.kv_storage.compacted_revision(), + )?; } self.serializable_range(cmd.request()) } @@ -547,18 +563,23 @@ where debug!("Receive CompactionRequest {:?}", request); let compacted_revision = self.kv_storage.compacted_revision(); let current_revision = self.kv_storage.revision(); - Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?; - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Not Implemented".to_owned(), - )) + let req = request.get_ref(); + Self::check_compact_request(req, compacted_revision, current_revision)?; + + let is_fast_path = !req.physical; + let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?; + let resp = cmd_res.decode(); + + if let ResponseWrapper::CompactionResponse(response) = resp { + Ok(tonic::Response::new(response)) + } else { + panic!("Receive wrong response {resp:?} for CompactionRequest"); + } } } #[cfg(test)] mod test { - use test_macros::abort_on_panic; - use super::*; use crate::storage::db::DB; diff --git a/xline/src/storage/compact.rs b/xline/src/storage/compact.rs index ae14fc263..03ac75200 100644 --- a/xline/src/storage/compact.rs +++ b/xline/src/storage/compact.rs @@ -25,6 +25,7 @@ pub(crate) async fn compactor( .into_iter() .map(|key_rev| key_rev.as_revision().encode_to_vec()) .collect::>>(); + // Given that the Xline uses a lim-tree database with smaller write amplification as the storage backend , does using progressive compaction really good at improving performance? for revision_chunk in target_revisions.chunks(batch_limit) { if let Err(e) = kv_store.compact(revision_chunk) { panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index fd96758be..b60ef1562 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -644,6 +644,7 @@ where ) -> Result<(Vec, Vec), ExecuteError> { let revision = req.revision; let ops = vec![WriteOp::PutCompactRevision(revision)]; + // TODO: Remove the physical process logic here. It's better to move into the KvServer #[allow(clippy::collapsible_else_if)] if req.physical { let event = Arc::new(event_listener::Event::new());