Skip to content

Commit

Permalink
feat(compactor): add compact logic in recover process
Browse files Browse the repository at this point in the history
Refs: #188
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 authored and mergify[bot] committed Jul 25, 2023
1 parent dd2e492 commit 1a1f446
Showing 1 changed file with 75 additions and 35 deletions.
110 changes: 75 additions & 35 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ use crate::{
PutResponse, RangeRequest, RangeResponse, Request, RequestWithToken, RequestWrapper,
ResponseWrapper, SortOrder, SortTarget, TargetUnion, TxnRequest, TxnResponse,
},
server::command::{CommandResponse, KeyRange, SyncResponse},
storage::{db::WriteOp, ExecuteError},
server::command::{CommandResponse, KeyRange, SyncResponse, META_TABLE},
storage::{
db::{WriteOp, COMPACT_REVISION},
ExecuteError,
},
};

/// KV table name
Expand Down Expand Up @@ -116,8 +119,18 @@ where
self.attach(lease_id, key)?;
}

// compact Lock free

if let Some(revision_bytes) = self.db.get_value(META_TABLE, COMPACT_REVISION)? {
let compacted_revision =
i64::from_le_bytes(revision_bytes.try_into().map_err(|e| {
ExecuteError::DbError(format!(
"cannot decode compacted revision from META_TABLE: {e:?}"
))
})?);
self.update_compacted_revision(compacted_revision);
if let Err(e) = self.compact_task_tx.send((compacted_revision, None)) {
panic!("the compactor exited unexpectedly: {e:?}");
}
}
Ok(())
}

Expand Down Expand Up @@ -157,12 +170,12 @@ where
self.revision.get()
}

/// Get compacted revision of KV stre
/// Get compacted revision of KV store
pub(crate) fn compacted_revision(&self) -> i64 {
self.compacted_rev.load(Relaxed)
}

#[cfg(test)]
/// Update compacted revision of KV store
pub(crate) fn update_compacted_revision(&self, revision: i64) {
self.compacted_rev.store(revision, Relaxed);
}
Expand Down Expand Up @@ -591,7 +604,7 @@ where
target_revision > self.compacted_revision(),
"required revision should not be compacted"
);
self.compacted_rev.store(target_revision, Relaxed);
self.update_compacted_revision(target_revision);
CompactionResponse {
header: Some(self.header_gen.gen_header_without_revision()),
}
Expand Down Expand Up @@ -832,7 +845,7 @@ mod test {
use crate::{
revision_number::RevisionNumberGenerator,
rpc::RequestOp,
storage::{db::DB, kvwatcher::KvWatcher},
storage::{compact::compactor, db::DB, kvwatcher::KvWatcher},
};

const CHANNEL_SIZE: usize = 1024;
Expand All @@ -851,8 +864,8 @@ mod test {
db: Arc<DB>,
) -> Result<(Arc<KvStore<DB>>, RevisionNumberGenerator), ExecuteError> {
let store = init_empty_store(db);
let keys = vec!["a", "b", "c", "d", "e"];
let vals = vec!["a", "b", "c", "d", "e"];
let keys = vec!["a", "b", "c", "d", "e", "z", "z", "z"];
let vals = vec!["a", "b", "c", "d", "e", "z1", "z2", "z3"];
let revision = RevisionNumberGenerator::default();
for (key, val) in keys.into_iter().zip(vals.into_iter()) {
let req = RequestWithToken::new(
Expand All @@ -869,13 +882,13 @@ mod test {
}

fn init_empty_store(db: Arc<DB>) -> Arc<KvStore<DB>> {
let (compact_tx, _compact_rx) = mpsc::unbounded_channel();
let (compact_tx, compact_rx) = mpsc::unbounded_channel();
let (kv_update_tx, kv_update_rx) = mpsc::channel(CHANNEL_SIZE);
let lease_collection = Arc::new(LeaseCollection::new(0));
let header_gen = Arc::new(HeaderGenerator::new(0, 0));
let index = Arc::new(Index::new());
let storage = Arc::new(KvStore::new(
index,
Arc::clone(&index),
db,
header_gen,
kv_update_tx,
Expand All @@ -889,6 +902,13 @@ mod test {
shutdown_trigger,
Duration::from_millis(10),
);
let _compactor = tokio::spawn(compactor(
Arc::clone(&storage),
index,
1000,
Duration::from_millis(10),
compact_rx,
));
storage
}

Expand Down Expand Up @@ -925,7 +945,7 @@ mod test {
..Default::default()
};
let response = store.handle_range_request(&request)?;
assert_eq!(response.kvs.len(), 5);
assert_eq!(response.kvs.len(), 6);
for kv in response.kvs {
assert!(kv.value.is_empty());
}
Expand Down Expand Up @@ -968,7 +988,7 @@ mod test {
..Default::default()
};
let response = store.handle_range_request(&request)?;
assert_eq!(response.count, 5);
assert_eq!(response.count, 6);
assert_eq!(response.kvs.len(), 2);
assert_eq!(response.kvs[0].create_revision, 2);
assert_eq!(response.kvs[1].create_revision, 3);
Expand All @@ -981,8 +1001,9 @@ mod test {
async fn test_range_sort() -> Result<(), ExecuteError> {
let db = DB::open(&StorageConfig::Memory)?;
let (store, _rev) = init_store(db).await?;
let keys = ["a", "b", "c", "d", "e"];
let reversed_keys = ["e", "d", "c", "b", "a"];
let keys = ["a", "b", "c", "d", "e", "z"];
let reversed_keys = ["z", "e", "d", "c", "b", "a"];
let version_keys = ["z", "a", "b", "c", "d", "e"];

for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] {
for target in [
Expand All @@ -992,30 +1013,43 @@ mod test {
SortTarget::Value,
] {
let response = store.handle_range_request(&sort_req(order, target))?;
assert_eq!(response.count, 5);
assert_eq!(response.kvs.len(), 5);
let expected = match order {
assert_eq!(response.count, 6);
assert_eq!(response.kvs.len(), 6);
let expected: [&str; 6] = match order {
SortOrder::Descend => reversed_keys,
SortOrder::Ascend | SortOrder::None => keys,
};
let is_identical = response
.kvs
.iter()
.zip(expected.iter())
.all(|(kv, want)| kv.key == want.as_bytes());
assert!(is_identical);
for (kv, want) in response.kvs.iter().zip(expected.iter()) {
assert_eq!(
kv.key,
want.as_bytes(),
"order: {:?}, target: {:?}, key {:?}, want {:?}",
order,
target,
kv.key,
want.as_bytes(),
);
}
}
}
for order in [SortOrder::Ascend, SortOrder::Descend, SortOrder::None] {
let response = store.handle_range_request(&sort_req(order, SortTarget::Version))?;
assert_eq!(response.count, 5);
assert_eq!(response.kvs.len(), 5);
let is_identical = response
.kvs
.iter()
.zip(keys.iter())
.all(|(kv, want)| kv.key == want.as_bytes());
assert!(is_identical);
assert_eq!(response.count, 6);
assert_eq!(response.kvs.len(), 6);
let expected = match order {
SortOrder::Ascend | SortOrder::None => keys,
SortOrder::Descend => version_keys,
};
for (kv, want) in response.kvs.iter().zip(expected.iter()) {
assert_eq!(
kv.key,
want.as_bytes(),
"order: {:?}, key {:?}, want {:?}",
order,
kv.key,
want.as_bytes(),
);
}
}
Ok(())
}
Expand All @@ -1024,7 +1058,10 @@ mod test {
#[abort_on_panic]
async fn test_recover() -> Result<(), ExecuteError> {
let db = DB::open(&StorageConfig::Memory)?;
let _store = init_store(Arc::clone(&db)).await?;
let ops = vec![WriteOp::PutCompactRevision(8)];
db.flush_ops(ops)?;
let (store, _rev_gen) = init_store(Arc::clone(&db)).await?;
assert_eq!(store.index.get_from_rev(b"z", b"", 5).len(), 3);

let new_store = init_empty_store(db);

Expand All @@ -1035,13 +1072,16 @@ mod test {
};
let res = new_store.handle_range_request(&range_req)?;
assert_eq!(res.kvs.len(), 0);
assert_eq!(new_store.compacted_revision(), -1);

new_store.recover()?;

let res = new_store.handle_range_request(&range_req)?;
assert_eq!(res.kvs.len(), 1);
assert_eq!(res.kvs[0].key, b"a");

assert_eq!(new_store.compacted_revision(), 8);
tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(new_store.index.get_from_rev(b"z", b"", 5).len(), 2);
Ok(())
}

Expand Down

0 comments on commit 1a1f446

Please sign in to comment.