Skip to content

Commit

Permalink
feat(compactor): implement compact consensus process
Browse files Browse the repository at this point in the history
Refs: 188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 3, 2023
1 parent fcf9406 commit 004ef18
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 6 deletions.
10 changes: 10 additions & 0 deletions xline/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub(crate) const XLINE_TABLES: [&str; 6] = [
ROLE_TABLE,
];

/// Key of compacted revision
pub(crate) const COMPACT_REVISION: &str = "compact_revision";

/// Database to store revision to kv mapping
#[derive(Debug)]
pub struct DB {
Expand Down Expand Up @@ -144,6 +147,11 @@ impl StorageApi for DB {
lease.id.encode_to_vec(),
lease.encode_to_vec(),
),
WriteOp::PutCompactRevision(rev) => WriteOperation::new_put(
META_TABLE,
COMPACT_REVISION.as_bytes().to_vec(),
rev.to_le_bytes().to_vec(),
),
WriteOp::DeleteKeyValue(rev) => WriteOperation::new_delete(KV_TABLE, rev),
WriteOp::DeleteLease(lease_id) => {
let key = del_lease_key_buffer.get(&lease_id).unwrap_or_else(|| {
Expand Down Expand Up @@ -195,6 +203,8 @@ pub enum WriteOp<'a> {
PutAppliedIndex(u64),
/// Put a lease to lease table
PutLease(PbLease),
/// Put a compacted revision into meta table
PutCompactRevision(i64),
/// Delete a key-value pair from kv table
DeleteKeyValue(&'a [u8]),
/// Delete a lease from lease table
Expand Down
60 changes: 54 additions & 6 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use crate::{
header_gen::HeaderGenerator,
revision_number::RevisionNumberGenerator,
rpc::{
Compare, CompareResult, CompareTarget, DeleteRangeRequest, DeleteRangeResponse, Event,
EventType, KeyValue, PutRequest, PutResponse, RangeRequest, RangeResponse, Request,
RequestWithToken, RequestWrapper, ResponseWrapper, SortOrder, SortTarget, TargetUnion,
TxnRequest, TxnResponse,
CompactionRequest, CompactionResponse, Compare, CompareResult, CompareTarget,
DeleteRangeRequest, DeleteRangeResponse, Event, EventType, KeyValue, PutRequest,
PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper,
ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse,
},
server::command::{CommandResponse, KeyRange, SyncResponse},
storage::{db::WriteOp, ExecuteError},
Expand All @@ -35,7 +35,6 @@ use crate::{
pub(crate) const KV_TABLE: &str = "kv";

/// KV store
#[allow(dead_code)]
#[derive(Debug)]
pub(crate) struct KvStore<DB>
where
Expand Down Expand Up @@ -465,7 +464,14 @@ where
RequestWrapper::DeleteRangeRequest(ref req) => {
self.handle_delete_range_request(req).map(Into::into)
}
RequestWrapper::TxnRequest(ref req) => self.handle_txn_request(req).map(Into::into),
RequestWrapper::TxnRequest(ref req) => {
debug!("Receive TxnRequest {:?}", req);
self.handle_txn_request(req).map(Into::into)
}
RequestWrapper::CompactionRequest(ref req) => {
debug!("Receive CompactionRequest {:?}", req);
Ok(self.handle_compaction_request(req).into())
}
_ => unreachable!("Other request should not be sent to this store"),
};
res
Expand Down Expand Up @@ -578,6 +584,19 @@ where
})
}

/// Handle `CompactionRequest`
fn handle_compaction_request(&self, req: &CompactionRequest) -> CompactionResponse {
let target_revision = req.revision;
debug_assert!(
target_revision > self.compacted_revision(),
"required revision should not be compacted"
);
self.compacted_rev.store(target_revision, Relaxed);
CompactionResponse {
header: Some(self.header_gen.gen_header_without_revision()),
}
}

/// Sync requests in kv store
async fn sync_request(
&self,
Expand All @@ -593,6 +612,9 @@ where
self.sync_delete_range_request(req, revision, 0)
}
RequestWrapper::TxnRequest(ref req) => self.sync_txn_request(req, revision)?,
RequestWrapper::CompactionRequest(ref req) => {
self.sync_compaction_request(req, revision).await?
}
_ => {
unreachable!("only kv requests can be sent to kv store");
}
Expand All @@ -601,6 +623,32 @@ where
Ok((revision, ops))
}

/// Sync `CompactionRequest` and return if kvstore is changed
async fn sync_compaction_request(
&self,
req: &CompactionRequest,
_revision: i64,
) -> Result<(Vec<WriteOp>, Vec<Event>), ExecuteError> {
let revision = req.revision;
let ops = vec![WriteOp::PutCompactRevision(revision)];
#[allow(clippy::collapsible_else_if)]
if req.physical {
let event = Arc::new(event_listener::Event::new());
if let Err(e) = self
.compact_task_tx
.send((revision, Some(Arc::clone(&event))))
{
panic!("the compactor exited unexpectedly: {e:?}");
}
event.listen().await;
} else {
if let Err(e) = self.compact_task_tx.send((revision, None)) {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
Ok((ops, Vec::new()))
}

/// Sync `TxnRequest` and return if kvstore is changed
fn sync_txn_request(
&self,
Expand Down

0 comments on commit 004ef18

Please sign in to comment.