Skip to content

Commit

Permalink
feat(compactor): add kv server compact implementation
Browse files Browse the repository at this point in the history
Refs: 188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 6, 2023
1 parent 9858d72 commit f404ad1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
20 changes: 13 additions & 7 deletions xline/src/server/kv_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::future::join_all;
use tokio::time::timeout;
use tracing::{debug, instrument};
use uuid::Uuid;
use xlineapi::ResponseWrapper;

use super::{
auth_server::get_token,
Expand Down Expand Up @@ -547,18 +548,23 @@ where
debug!("Receive CompactionRequest {:?}", request);
let compacted_revision = self.kv_storage.compacted_revision();
let current_revision = self.kv_storage.revision();
Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?;
Err(tonic::Status::new(
tonic::Code::Unimplemented,
"Not Implemented".to_owned(),
))
let req = request.get_ref();
Self::check_compact_request(req, compacted_revision, current_revision)?;

let is_fast_path = !req.physical;
let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?;
let resp = cmd_res.decode();

if let ResponseWrapper::CompactionResponse(response) = resp {
Ok(tonic::Response::new(response))
} else {
panic!("Receive wrong response {resp:?} for CompactionRequest");
}
}
}

#[cfg(test)]
mod test {
use test_macros::abort_on_panic;

use super::*;
use crate::storage::db::DB;

Expand Down
1 change: 1 addition & 0 deletions xline/src/storage/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) async fn compactor<DB>(
.into_iter()
.map(|key_rev| key_rev.as_revision().encode_to_vec())
.collect::<Vec<Vec<_>>>();
// Given that the Xline uses a lim-tree database with smaller write amplification as the storage backend , does using progressive compaction really good at improving performance?
for revision_chunk in target_revisions.chunks(batch_limit) {
if let Err(e) = kv_store.compact(revision_chunk) {
panic!("failed to compact revision chunk {revision_chunk:?} due to {e}");
Expand Down
1 change: 1 addition & 0 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ where
) -> Result<(Vec<WriteOp>, Vec<Event>), ExecuteError> {
let revision = req.revision;
let ops = vec![WriteOp::PutCompactRevision(revision)];
// TODO: Remove the physical process logic here. It's better to move into the KvServer
#[allow(clippy::collapsible_else_if)]
if req.physical {
let event = Arc::new(event_listener::Event::new());
Expand Down

0 comments on commit f404ad1

Please sign in to comment.