diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 7864930ba..c606016c5 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use tokio::time::timeout; use tracing::{debug, instrument}; use uuid::Uuid; +use xlineapi::ResponseWrapper; use super::{ auth_server::get_token, @@ -547,18 +548,23 @@ where debug!("Receive CompactionRequest {:?}", request); let compacted_revision = self.kv_storage.compacted_revision(); let current_revision = self.kv_storage.revision(); - Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?; - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Not Implemented".to_owned(), - )) + let req = request.get_ref(); + Self::check_compact_request(req, compacted_revision, current_revision)?; + + let is_fast_path = !req.physical; + let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?; + let resp = cmd_res.decode(); + + if let ResponseWrapper::CompactionResponse(response) = resp { + Ok(tonic::Response::new(response)) + } else { + panic!("Receive wrong response {resp:?} for CompactionRequest"); + } } } #[cfg(test)] mod test { - use test_macros::abort_on_panic; - use super::*; use crate::storage::db::DB; diff --git a/xline/src/storage/compact.rs b/xline/src/storage/compact.rs index ae14fc263..03ac75200 100644 --- a/xline/src/storage/compact.rs +++ b/xline/src/storage/compact.rs @@ -25,6 +25,7 @@ pub(crate) async fn compactor( .into_iter() .map(|key_rev| key_rev.as_revision().encode_to_vec()) .collect::>>(); + // Given that the Xline uses a lim-tree database with smaller write amplification as the storage backend , does using progressive compaction really good at improving performance? for revision_chunk in target_revisions.chunks(batch_limit) { if let Err(e) = kv_store.compact(revision_chunk) { panic!("failed to compact revision chunk {revision_chunk:?} due to {e}"); diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index fd96758be..b60ef1562 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -644,6 +644,7 @@ where ) -> Result<(Vec, Vec), ExecuteError> { let revision = req.revision; let ops = vec![WriteOp::PutCompactRevision(revision)]; + // TODO: Remove the physical process logic here. It's better to move into the KvServer #[allow(clippy::collapsible_else_if)] if req.physical { let event = Arc::new(event_listener::Event::new());