From 004ef186484384e16c0fd9d9669390dfa4667e07 Mon Sep 17 00:00:00 2001 From: Phoeniix Zhao Date: Mon, 3 Jul 2023 18:13:06 +0800 Subject: [PATCH] feat(compactor): implement compact consensus process Refs: 188 Signed-off-by: Phoeniix Zhao --- xline/src/storage/db.rs | 10 ++++++ xline/src/storage/kv_store.rs | 60 +++++++++++++++++++++++++++++++---- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs index 92d9724d7..2b31e4a16 100644 --- a/xline/src/storage/db.rs +++ b/xline/src/storage/db.rs @@ -26,6 +26,9 @@ pub(crate) const XLINE_TABLES: [&str; 6] = [ ROLE_TABLE, ]; +/// Key of compacted revision +pub(crate) const COMPACT_REVISION: &str = "compact_revision"; + /// Database to store revision to kv mapping #[derive(Debug)] pub struct DB { @@ -144,6 +147,11 @@ impl StorageApi for DB { lease.id.encode_to_vec(), lease.encode_to_vec(), ), + WriteOp::PutCompactRevision(rev) => WriteOperation::new_put( + META_TABLE, + COMPACT_REVISION.as_bytes().to_vec(), + rev.to_le_bytes().to_vec(), + ), WriteOp::DeleteKeyValue(rev) => WriteOperation::new_delete(KV_TABLE, rev), WriteOp::DeleteLease(lease_id) => { let key = del_lease_key_buffer.get(&lease_id).unwrap_or_else(|| { @@ -195,6 +203,8 @@ pub enum WriteOp<'a> { PutAppliedIndex(u64), /// Put a lease to lease table PutLease(PbLease), + /// Put a compacted revision into meta table + PutCompactRevision(i64), /// Delete a key-value pair from kv table DeleteKeyValue(&'a [u8]), /// Delete a lease from lease table diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index e0cef03f3..fa468c9ef 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -22,10 +22,10 @@ use crate::{ header_gen::HeaderGenerator, revision_number::RevisionNumberGenerator, rpc::{ - Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, Event, - EventType, KeyValue, PutRequest, PutResponse, RangeRequest, RangeResponse, Request, - RequestWithToken, RequestWrapper, ResponseWrapper, SortOrder, SortTarget, TargetUnion, - TxnRequest, TxnResponse, + CompactionRequest, CompactionResponse, Compare, CompareResult, CompareTarget, + DeleteRangeRequest, DeleteRangeResponse, Event, EventType, KeyValue, PutRequest, + PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper, + ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse, }, server::command::{CommandResponse, KeyRange, SyncResponse}, storage::{db::WriteOp, ExecuteError}, @@ -35,7 +35,6 @@ use crate::{ pub(crate) const KV_TABLE: &str = "kv"; /// KV store -#[allow(dead_code)] #[derive(Debug)] pub(crate) struct KvStore where @@ -465,7 +464,14 @@ where RequestWrapper::DeleteRangeRequest(ref req) => { self.handle_delete_range_request(req).map(Into::into) } - RequestWrapper::TxnRequest(ref req) => self.handle_txn_request(req).map(Into::into), + RequestWrapper::TxnRequest(ref req) => { + debug!("Receive TxnRequest {:?}", req); + self.handle_txn_request(req).map(Into::into) + } + RequestWrapper::CompactionRequest(ref req) => { + debug!("Receive CompactionRequest {:?}", req); + Ok(self.handle_compaction_request(req).into()) + } _ => unreachable!("Other request should not be sent to this store"), }; res @@ -578,6 +584,19 @@ where }) } + /// Handle `CompactionRequest` + fn handle_compaction_request(&self, req: &CompactionRequest) -> CompactionResponse { + let target_revision = req.revision; + debug_assert!( + target_revision > self.compacted_revision(), + "required revision should not be compacted" + ); + self.compacted_rev.store(target_revision, Relaxed); + CompactionResponse { + header: Some(self.header_gen.gen_header_without_revision()), + } + } + /// Sync requests in kv store async fn sync_request( &self, @@ -593,6 +612,9 @@ where self.sync_delete_range_request(req, revision, 0) } RequestWrapper::TxnRequest(ref req) => self.sync_txn_request(req, revision)?, + RequestWrapper::CompactionRequest(ref req) => { + self.sync_compaction_request(req, revision).await? + } _ => { unreachable!("only kv requests can be sent to kv store"); } @@ -601,6 +623,32 @@ where Ok((revision, ops)) } + /// Sync `CompactionRequest` and return if kvstore is changed + async fn sync_compaction_request( + &self, + req: &CompactionRequest, + _revision: i64, + ) -> Result<(Vec, Vec), ExecuteError> { + let revision = req.revision; + let ops = vec![WriteOp::PutCompactRevision(revision)]; + #[allow(clippy::collapsible_else_if)] + if req.physical { + let event = Arc::new(event_listener::Event::new()); + if let Err(e) = self + .compact_task_tx + .send((revision, Some(Arc::clone(&event)))) + { + panic!("the compactor exited unexpectedly: {e:?}"); + } + event.listen().await; + } else { + if let Err(e) = self.compact_task_tx.send((revision, None)) { + panic!("the compactor exited unexpectedly: {e:?}"); + } + } + Ok((ops, Vec::new())) + } + /// Sync `TxnRequest` and return if kvstore is changed fn sync_txn_request( &self,