From 22620201c11555381bc6d368f297cd3039ebf4dc Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Mon, 3 Jul 2023 22:59:22 +0800 Subject: [PATCH] feat(compactor): add kv server compact implementation Refs: 188 Signed-off-by: Phoeniix Zhao --- xline/src/server/kv_server.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 4d752481a..df1c2ff20 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -9,6 +9,7 @@ use futures::future::join_all; use tokio::time::timeout; use tracing::{debug, instrument}; use uuid::Uuid; +use xlineapi::ResponseWrapper; use super::{ auth_server::get_token, @@ -547,11 +548,19 @@ where debug!("Receive CompactionRequest {:?}", request); let compacted_revision = self.kv_storage.compacted_revision(); let current_revision = self.kv_storage.revision(); - Self::check_compact_request(request.get_ref(), compacted_revision, current_revision)?; - Err(tonic::Status::new( - tonic::Code::Unimplemented, - "Not Implemented".to_owned(), - )) + let req = request.get_ref(); + Self::check_compact_request(req, compacted_revision, current_revision)?; + + let is_fast_path = !req.physical; + let (cmd_res, _sync_res) = self.propose(request, is_fast_path).await?; + let resp = cmd_res.decode(); + + if let ResponseWrapper::CompactionResponse(response) = resp{ + Ok(tonic::Response::new(response)) + } else { + panic!("Receive wrong response {resp:?} for CompactionRequest"); + } + } }