Skip to content

Commit

Permalink
refactor: unify the argument between PutKeyValue and DeleteKeyValue
Browse files Browse the repository at this point in the history
Signed-off-by: Phoeniix Zhao <[email protected]>
  • Loading branch information
Phoenix500526 committed Jul 14, 2023
1 parent a976510 commit 9eae316
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 44 deletions.
18 changes: 9 additions & 9 deletions xline/src/storage/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use super::{
kv_store::KV_TABLE,
lease_store::LEASE_TABLE,
storage_api::StorageApi,
ExecuteError, Revision,
ExecuteError,
};
use crate::{
rpc::{PbLease, Role, User},
Expand Down Expand Up @@ -133,8 +133,7 @@ impl StorageApi for DB {
.collect::<HashMap<_, _>>();
for op in ops {
let wop = match op {
WriteOp::PutKeyValue(rev, value) => {
let key = rev.encode_to_vec();
WriteOp::PutKeyValue(key, value) => {
WriteOperation::new_put(KV_TABLE, key, value.clone())
}
WriteOp::PutAppliedIndex(index) => WriteOperation::new_put(
Expand Down Expand Up @@ -198,7 +197,7 @@ impl StorageApi for DB {
#[non_exhaustive]
pub enum WriteOp<'a> {
/// Put a key-value pair to kv table
PutKeyValue(Revision, Vec<u8>),
PutKeyValue(Vec<u8>, Vec<u8>),
/// Put the applied index to meta table
PutAppliedIndex(u64),
/// Put a lease to lease table
Expand Down Expand Up @@ -231,14 +230,15 @@ mod test {
use test_macros::abort_on_panic;

use super::*;
use crate::storage::Revision;
#[tokio::test]
#[abort_on_panic]
async fn test_reset() -> Result<(), ExecuteError> {
let data_dir = PathBuf::from("/tmp/test_reset");
let db = DB::open(&StorageConfig::RocksDB(data_dir.clone()))?;

let revision = Revision::new(1, 1);
let key = revision.encode_to_vec();
let revision = Revision::new(1, 1).encode_to_vec();
let key = revision.clone();
let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())];
db.flush_ops(ops)?;
let res = db.get_value(KV_TABLE, &key)?;
Expand All @@ -264,8 +264,8 @@ mod test {
let snapshot_path = dir.join("snapshot");
let origin_db = DB::open(&StorageConfig::RocksDB(origin_db_path))?;

let revision = Revision::new(1, 1);
let key = revision.encode_to_vec();
let revision = Revision::new(1, 1).encode_to_vec();
let key: Vec<u8> = revision.clone();
let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())];
origin_db.flush_ops(ops)?;

Expand Down Expand Up @@ -336,7 +336,7 @@ mod test {
};
let role_bytes = role.encode_to_vec();
let write_ops = vec![
WriteOp::PutKeyValue(Revision::new(1, 2), "value".into()),
WriteOp::PutKeyValue(Revision::new(1, 2).encode_to_vec(), "value".into()),
WriteOp::PutAppliedIndex(5),
WriteOp::PutLease(lease),
WriteOp::PutAuthEnable(true),
Expand Down
48 changes: 16 additions & 32 deletions xline/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,44 +299,28 @@ impl IndexOperate for Index {
inner.index.iter_mut().for_each(|(key, revisions)| {
if let Some(revision) = revisions.first() {
if revision.mod_revision < at_rev {
match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) {
Ok(idx) => {
let key_rev = revisions.get(idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=idx)
} else {
revisions.drain(..idx)
};
revs.extend(compact_revs.into_iter());
}
Err(idx) => {
let compacted_last_idx = idx.overflow_sub(1);
let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| {
unreachable!(
"{idx} is out of range, len of revisions is {}",
revisions.len()
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=compacted_last_idx)
} else {
revisions.drain(..compacted_last_idx)
};
revs.extend(compact_revs.into_iter());
}
}
let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev);
let compacted_last_idx = pivot.overflow_sub(1);
// There is at least 1 element in the first partition, so the key revision at `compacted_last_idx`
// must exist.
let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| {
unreachable!(
"Oops, the key revision at {compacted_last_idx} should not be None",
)
});
let compact_revs = if key_rev.is_deleted() {
revisions.drain(..=compacted_last_idx)
} else {
revisions.drain(..compacted_last_idx)
};
revs.extend(compact_revs.into_iter());

if revisions.is_empty() {
del_keys.push(key.clone());
}
}
}
});

for key in del_keys {
let _ignore = inner.index.remove(&key);
}
Expand Down
4 changes: 2 additions & 2 deletions xline/src/storage/kv_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ where
.unwrap_or_else(|e| panic!("unexpected error from lease Attach: {e}"));
}
ops.push(WriteOp::PutKeyValue(
new_rev.as_revision(),
new_rev.as_revision().encode_to_vec(),
kv.encode_to_vec(),
));
let event = Event {
Expand Down Expand Up @@ -793,7 +793,7 @@ where
..KeyValue::default()
};
let value = del_kv.encode_to_vec();
WriteOp::PutKeyValue(new_rev, value)
WriteOp::PutKeyValue(new_rev.encode_to_vec(), value)
})
.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion xline/src/storage/revision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub(super) struct KeyRevision {

/// Revision
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub struct Revision {
pub(crate) struct Revision {
/// Main revision
revision: i64,
/// Sub revision in one transaction or range deletion
Expand Down

0 comments on commit 9eae316

Please sign in to comment.