diff --git a/xline/src/server/kv_server.rs b/xline/src/server/kv_server.rs index 84dd2c933..ccf9e3762 100644 --- a/xline/src/server/kv_server.rs +++ b/xline/src/server/kv_server.rs @@ -222,7 +222,7 @@ where range_revision: i64, compacted_revision: i64, ) -> Result<(), tonic::Status> { - (range_revision >= compacted_revision) + (range_revision <= 0 || range_revision >= compacted_revision) .then_some(()) .ok_or(tonic::Status::invalid_argument(format!( "required revision {range_revision} has been compacted, compacted revision is {compacted_revision}" diff --git a/xline/src/storage/db.rs b/xline/src/storage/db.rs index 9486c00b6..d2294aaf8 100644 --- a/xline/src/storage/db.rs +++ b/xline/src/storage/db.rs @@ -9,7 +9,7 @@ use super::{ kv_store::KV_TABLE, lease_store::LEASE_TABLE, storage_api::StorageApi, - ExecuteError, Revision, + ExecuteError, }; use crate::{ rpc::{PbLease, Role, User}, @@ -133,8 +133,7 @@ impl StorageApi for DB { .collect::>(); for op in ops { let wop = match op { - WriteOp::PutKeyValue(rev, value) => { - let key = rev.encode_to_vec(); + WriteOp::PutKeyValue(key, value) => { WriteOperation::new_put(KV_TABLE, key, value.clone()) } WriteOp::PutAppliedIndex(index) => WriteOperation::new_put( @@ -198,7 +197,7 @@ impl StorageApi for DB { #[non_exhaustive] pub enum WriteOp<'a> { /// Put a key-value pair to kv table - PutKeyValue(Revision, Vec), + PutKeyValue(Vec, Vec), /// Put the applied index to meta table PutAppliedIndex(u64), /// Put a lease to lease table @@ -231,14 +230,15 @@ mod test { use test_macros::abort_on_panic; use super::*; + use crate::storage::Revision; #[tokio::test] #[abort_on_panic] async fn test_reset() -> Result<(), ExecuteError> { let data_dir = PathBuf::from("/tmp/test_reset"); let db = DB::open(&StorageConfig::RocksDB(data_dir.clone()))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; db.flush_ops(ops)?; let res = db.get_value(KV_TABLE, &key)?; @@ -264,8 +264,8 @@ mod test { let snapshot_path = dir.join("snapshot"); let origin_db = DB::open(&StorageConfig::RocksDB(origin_db_path))?; - let revision = Revision::new(1, 1); - let key = revision.encode_to_vec(); + let revision = Revision::new(1, 1).encode_to_vec(); + let key: Vec = revision.clone(); let ops = vec![WriteOp::PutKeyValue(revision, "value1".into())]; origin_db.flush_ops(ops)?; @@ -336,7 +336,7 @@ mod test { }; let role_bytes = role.encode_to_vec(); let write_ops = vec![ - WriteOp::PutKeyValue(Revision::new(1, 2), "value".into()), + WriteOp::PutKeyValue(Revision::new(1, 2).encode_to_vec(), "value".into()), WriteOp::PutAppliedIndex(5), WriteOp::PutLease(lease), WriteOp::PutAuthEnable(true), diff --git a/xline/src/storage/index.rs b/xline/src/storage/index.rs index 9ad5b67b0..997d66ce1 100644 --- a/xline/src/storage/index.rs +++ b/xline/src/storage/index.rs @@ -299,44 +299,28 @@ impl IndexOperate for Index { inner.index.iter_mut().for_each(|(key, revisions)| { if let Some(revision) = revisions.first() { if revision.mod_revision < at_rev { - match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) { - Ok(idx) => { - let key_rev = revisions.get(idx).unwrap_or_else(|| { - unreachable!( - "{idx} is out of range, len of revisions is {}", - revisions.len() - ) - }); - let compact_revs = if key_rev.is_deleted() { - revisions.drain(..=idx) - } else { - revisions.drain(..idx) - }; - revs.extend(compact_revs.into_iter()); - } - Err(idx) => { - let compacted_last_idx = idx.overflow_sub(1); - let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { - unreachable!( - "{idx} is out of range, len of revisions is {}", - revisions.len() - ) - }); - let compact_revs = if key_rev.is_deleted() { - revisions.drain(..=compacted_last_idx) - } else { - revisions.drain(..compacted_last_idx) - }; - revs.extend(compact_revs.into_iter()); - } - } + let pivot = revisions.partition_point(|rev| rev.mod_revision <= at_rev); + let compacted_last_idx = pivot.overflow_sub(1); + // There is at least 1 element in the first partition, so the key revision at `compacted_last_idx` + // must exist. + let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { + unreachable!( + "Oops, the key revision at {compacted_last_idx} should not be None", + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=compacted_last_idx) + } else { + revisions.drain(..compacted_last_idx) + }; + revs.extend(compact_revs.into_iter()); + if revisions.is_empty() { del_keys.push(key.clone()); } } } }); - for key in del_keys { let _ignore = inner.index.remove(&key); } diff --git a/xline/src/storage/kv_store.rs b/xline/src/storage/kv_store.rs index 97cb9b6f7..5d8a4d3b5 100644 --- a/xline/src/storage/kv_store.rs +++ b/xline/src/storage/kv_store.rs @@ -747,7 +747,7 @@ where .unwrap_or_else(|e| panic!("unexpected error from lease Attach: {e}")); } ops.push(WriteOp::PutKeyValue( - new_rev.as_revision(), + new_rev.as_revision().encode_to_vec(), kv.encode_to_vec(), )); let event = Event { @@ -793,7 +793,7 @@ where ..KeyValue::default() }; let value = del_kv.encode_to_vec(); - WriteOp::PutKeyValue(new_rev, value) + WriteOp::PutKeyValue(new_rev.encode_to_vec(), value) }) .collect() } diff --git a/xline/src/storage/revision.rs b/xline/src/storage/revision.rs index 652951412..c47d061da 100644 --- a/xline/src/storage/revision.rs +++ b/xline/src/storage/revision.rs @@ -18,7 +18,7 @@ pub(super) struct KeyRevision { /// Revision #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] -pub struct Revision { +pub(crate) struct Revision { /// Main revision revision: i64, /// Sub revision in one transaction or range deletion