From eed9a583f9fc2e5271cb1d7d1a53e1d2f05b45b1 Mon Sep 17 00:00:00 2001 From: lxl66566 Date: Thu, 19 Sep 2024 20:58:03 +0800 Subject: [PATCH] refactor(xlineapi): introduce BytesAffineRef and KeyRangeRef Signed-off-by: lxl66566 --- crates/xline/src/storage/index.rs | 32 +++--- crates/xline/src/storage/kv_store.rs | 32 +++--- crates/xlineapi/src/keyrange.rs | 166 ++++++++++++++++++++++----- 3 files changed, 170 insertions(+), 60 deletions(-) diff --git a/crates/xline/src/storage/index.rs b/crates/xline/src/storage/index.rs index 500266117..27a7b32a8 100644 --- a/crates/xline/src/storage/index.rs +++ b/crates/xline/src/storage/index.rs @@ -8,14 +8,14 @@ use crossbeam_skiplist::{map::Entry, SkipMap}; use itertools::Itertools; use parking_lot::{Mutex, RwLock}; use utils::parking_lot_lock::RwLockMap; -use xlineapi::keyrange::{BytesAffine, EtcdKeyRange, KeyRange, StdBoundRange}; +use xlineapi::keyrange::{BytesAffine, EtcdKeyRange, KeyRange, KeyRangeRef, StdBoundRange}; use super::revision::{KeyRevision, Revision}; /// Operations for `Index` pub(crate) trait IndexOperate { /// Get `Revision` of keys, get the latest `Revision` when revision <= 0 - fn get(&self, keyrange: KeyRange, revision: i64) -> Vec; + fn get(&self, keyrange: KeyRangeRef, revision: i64) -> Vec; /// Register a new `KeyRevision` of the given key /// @@ -258,22 +258,22 @@ where } impl IndexOperate for Index { - fn get(&self, keyrange: KeyRange, revision: i64) -> Vec { + fn get(&self, keyrange: KeyRangeRef, revision: i64) -> Vec { match keyrange { - KeyRange::OneKey(key) => self + KeyRangeRef::OneKey(key) => self .inner - .get(&key) + .get(key) .and_then(fmap_value(|revs| Index::get_revision(revs, revision))) .map(|rev| vec![rev]) .unwrap_or_default(), - KeyRange::Range(_) if keyrange.is_all_keys() => self + KeyRangeRef::Range(_) if keyrange.is_all_keys() => self .inner .iter() .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) .collect(), - KeyRange::Range(_) => self + KeyRangeRef::Range(_) => self .inner - .range(keyrange) + .range(keyrange.to_owned()) .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) .collect(), } @@ -504,20 +504,20 @@ impl IndexState<'_> { } impl IndexOperate for IndexState<'_> { - fn get(&self, keyrange: KeyRange, revision: i64) -> Vec { + fn get(&self, keyrange: KeyRangeRef, revision: i64) -> Vec { match keyrange { - KeyRange::OneKey(key) => { - Index::get_revision(&self.one_key_revisions(&key, &self.state.lock()), revision) + KeyRangeRef::OneKey(key) => { + Index::get_revision(&self.one_key_revisions(key, &self.state.lock()), revision) .map(|rev| vec![rev]) .unwrap_or_default() } - KeyRange::Range(_) if keyrange.is_all_keys() => self + KeyRangeRef::Range(_) if keyrange.is_all_keys() => self .all_key_revisions() .into_iter() .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) .collect(), - KeyRange::Range(_) => self - .range_key_revisions(keyrange) + KeyRangeRef::Range(_) => self + .range_key_revisions(keyrange.to_owned()) .into_iter() .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) .collect(), @@ -674,11 +674,11 @@ mod test { let index = init_and_test_insert(); let txn = index.state(); assert_eq!( - txn.get(KeyRange::new_one_key("key"), 0), + txn.get(KeyRange::new_one_key("key").to_ref(), 0), vec![Revision::new(3, 1)] ); assert_eq!( - txn.get(KeyRange::new_one_key("key"), 1), + txn.get(KeyRange::new_one_key("key").to_ref(), 1), vec![Revision::new(1, 3)] ); txn.commit(); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index 4c4dce6d2..667d0e200 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -17,7 +17,7 @@ use utils::table_names::{KV_TABLE, META_TABLE}; use xlineapi::{ command::{CommandResponse, SyncResponse}, execute_error::ExecuteError, - keyrange::KeyRange, + keyrange::{KeyRange, KeyRangeRef}, }; use super::{ @@ -109,7 +109,7 @@ impl KvStoreInner { fn get_range( txn_db: &T, index: &dyn IndexOperate, - keyrange: KeyRange, + keyrange: KeyRangeRef, revision: i64, ) -> Result, ExecuteError> where @@ -124,7 +124,7 @@ impl KvStoreInner { fn get_range_with_opts( txn_db: &T, index: &dyn IndexOperate, - keyrange: KeyRange, + keyrange: KeyRangeRef, revision: i64, limit: usize, count_only: bool, @@ -149,7 +149,7 @@ impl KvStoreInner { Self::get_range( self.db.as_ref(), self.index.as_ref(), - KeyRange::OneKey(kv.key.clone()), + KeyRangeRef::OneKey(&kv.key), kv.mod_revision.overflow_sub(1), ) .ok()? @@ -473,7 +473,7 @@ impl KvStore { let kvs = KvStoreInner::get_range( txn_db, index, - KeyRange::new_etcd(cmp.key.clone(), cmp.range_end.clone()), + KeyRangeRef::new_etcd(&cmp.key, &cmp.range_end), 0, ) .unwrap_or_default(); @@ -628,7 +628,7 @@ impl KvStore { let (mut kvs, total) = KvStoreInner::get_range_with_opts( tnx_db, index, - KeyRange::new_etcd(req.key.clone(), req.range_end.clone()), + KeyRangeRef::new_etcd(&req.key, &req.range_end), req.revision, storage_fetch_limit.numeric_cast(), req.count_only, @@ -767,7 +767,7 @@ impl KvStore { let prev_kvs = KvStoreInner::get_range( txn_db, index, - KeyRange::new_etcd(req.key.clone(), req.range_end.clone()), + KeyRangeRef::new_etcd(&req.key, &req.range_end), 0, )?; let mut response = DeleteRangeResponse { @@ -1692,14 +1692,14 @@ mod test { let txn_db = store.inner.db.transaction(); let index = store.inner.index.state(); assert_eq!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 2) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 2) .unwrap() .len(), 1, "(a, 1) should not be removed" ); assert_eq!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b"), 3) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b").to_ref(), 3) .unwrap() .len(), 1, @@ -1709,20 +1709,20 @@ mod test { let target_revisions = index_compact(&store, 4); store.compact(target_revisions.as_ref())?; assert!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 2) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 2) .unwrap() .is_empty(), "(a, 1) should be removed" ); assert_eq!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b"), 3) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b").to_ref(), 3) .unwrap() .len(), 1, "(b, 2) should not be removed" ); assert_eq!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 4) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 4) .unwrap() .len(), 1, @@ -1732,26 +1732,26 @@ mod test { let target_revisions = index_compact(&store, 5); store.compact(target_revisions.as_ref())?; assert!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 2) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 2) .unwrap() .is_empty(), "(a, 1) should be removed" ); assert_eq!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b"), 3) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("b").to_ref(), 3) .unwrap() .len(), 1, "(b, 2) should not be removed" ); assert!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 4) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 4) .unwrap() .is_empty(), "(a, 3) should be removed" ); assert!( - KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a"), 5) + KvStoreInner::get_range(&txn_db, &index, KeyRange::new_one_key("a").to_ref(), 5) .unwrap() .is_empty(), "(a, 4) should be removed" diff --git a/crates/xlineapi/src/keyrange.rs b/crates/xlineapi/src/keyrange.rs index 901169973..1e0edf246 100644 --- a/crates/xlineapi/src/keyrange.rs +++ b/crates/xlineapi/src/keyrange.rs @@ -57,26 +57,71 @@ impl BytesAffine { pub fn new_unbounded() -> Self { Self::Unbounded } + + pub fn to_ref<'a>(&'a self) -> BytesAffineRef<'a> { + match self { + BytesAffine::Bytes(k) => BytesAffineRef::Bytes(k), + BytesAffine::Unbounded => BytesAffineRef::Unbounded, + } + } } impl PartialOrd for BytesAffine { + fn partial_cmp(&self, other: &Self) -> Option { + return self.to_ref().partial_cmp(&other.to_ref()); + } +} + +impl Ord for BytesAffine { + fn cmp(&self, other: &Self) -> cmp::Ordering { + return self.to_ref().cmp(&other.to_ref()); + } +} + +/// A Ref of a [`BytesAffine`] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +pub enum BytesAffineRef<'a> { + /// Bytes bound, could be either Included or Excluded + Bytes(&'a [u8]), + /// Unbounded + Unbounded, +} + +impl<'a> BytesAffineRef<'a> { + pub fn new_key(bytes: &'a [u8]) -> Self { + Self::Bytes(bytes) + } + + pub fn new_unbounded() -> Self { + Self::Unbounded + } + + pub fn to_owned(&self) -> BytesAffine { + match self { + Self::Bytes(k) => BytesAffine::Bytes(k.to_vec()), + Self::Unbounded => BytesAffine::Unbounded, + } + } +} + +impl PartialOrd for BytesAffineRef<'_> { fn partial_cmp(&self, other: &Self) -> Option { match (self, other) { - (BytesAffine::Bytes(x), BytesAffine::Bytes(y)) => x.partial_cmp(y), - (BytesAffine::Bytes(_), BytesAffine::Unbounded) => Some(cmp::Ordering::Less), - (BytesAffine::Unbounded, BytesAffine::Bytes(_)) => Some(cmp::Ordering::Greater), - (BytesAffine::Unbounded, BytesAffine::Unbounded) => Some(cmp::Ordering::Equal), + (BytesAffineRef::Bytes(x), BytesAffineRef::Bytes(y)) => x.partial_cmp(y), + (BytesAffineRef::Bytes(_), BytesAffineRef::Unbounded) => Some(cmp::Ordering::Less), + (BytesAffineRef::Unbounded, BytesAffineRef::Bytes(_)) => Some(cmp::Ordering::Greater), + (BytesAffineRef::Unbounded, BytesAffineRef::Unbounded) => Some(cmp::Ordering::Equal), } } } -impl Ord for BytesAffine { +impl Ord for BytesAffineRef<'_> { fn cmp(&self, other: &Self) -> cmp::Ordering { match (self, other) { - (BytesAffine::Bytes(x), BytesAffine::Bytes(y)) => x.cmp(y), - (BytesAffine::Bytes(_), BytesAffine::Unbounded) => cmp::Ordering::Less, - (BytesAffine::Unbounded, BytesAffine::Bytes(_)) => cmp::Ordering::Greater, - (BytesAffine::Unbounded, BytesAffine::Unbounded) => cmp::Ordering::Equal, + (BytesAffineRef::Bytes(x), BytesAffineRef::Bytes(y)) => x.cmp(y), + (BytesAffineRef::Bytes(_), BytesAffineRef::Unbounded) => cmp::Ordering::Less, + (BytesAffineRef::Unbounded, BytesAffineRef::Bytes(_)) => cmp::Ordering::Greater, + (BytesAffineRef::Unbounded, BytesAffineRef::Unbounded) => cmp::Ordering::Equal, } } } @@ -171,24 +216,13 @@ impl KeyRange { #[must_use] #[inline] pub fn contains_key(&self, key: &[u8]) -> bool { - match self { - Self::OneKey(k) => k == key, - Self::Range(r) => { - let key_aff = BytesAffine::Bytes(key.to_vec()); - r.low <= key_aff && key_aff < r.high - } - } + self.to_ref().contains_key(key) } /// Check if `KeyRange` overlaps with another `KeyRange` #[inline] pub fn overlaps(&self, other: &Self) -> bool { - match (self, other) { - (Self::OneKey(k1), Self::OneKey(k2)) => k1 == k2, - (Self::Range(r1), Self::Range(r2)) => r1.overlaps(r2), - (Self::OneKey(k), Self::Range(_)) => other.contains_key(k), - (Self::Range(_), Self::OneKey(k)) => self.contains_key(&k), - } + self.to_ref().overlaps(&other.to_ref()) } /// Get end of range with prefix @@ -204,12 +238,7 @@ impl KeyRange { #[must_use] #[inline] pub fn is_all_keys(&self) -> bool { - match self { - Self::OneKey(_) => false, - Self::Range(r) => { - r.low == BytesAffine::Bytes(UNBOUNDED.into()) && r.high == BytesAffine::Unbounded - } - } + return self.to_ref().is_all_keys(); } /// unpack `KeyRange` to `BytesAffine` tuple @@ -272,6 +301,13 @@ impl KeyRange { }, } } + + pub fn to_ref<'a>(&'a self) -> KeyRangeRef<'a> { + match self { + Self::OneKey(k) => KeyRangeRef::OneKey(k), + Self::Range(r) => KeyRangeRef::Range(Interval::new(r.low.to_ref(), r.high.to_ref())), + } + } } impl std::ops::RangeBounds> for KeyRange { @@ -375,6 +411,80 @@ impl ConflictCheck for KeyRange { } } +/// A Ref of a [`KeyRange`]. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +pub enum KeyRangeRef<'a> { + /// OneKey, to distinguish from `Prefix` because they all have [a, a+1) form + OneKey(&'a [u8]), + /// A [start, end) range. + /// + /// Note: The `low` of [`KeyRange::Range`] Interval must be Bytes, because a Interval `low` + /// must less than `high`, but [`BytesAffine::Unbounded`] is always greater than any Bytes. + Range(Interval>), +} + +impl<'a> KeyRangeRef<'a> { + /// convert to owned [`KeyRange`], this will clone the inner key. + pub fn to_owned(&self) -> KeyRange { + match self { + Self::OneKey(k) => KeyRange::OneKey(k.to_vec()), + Self::Range(r) => KeyRange::Range(Interval::new(r.low.to_owned(), r.high.to_owned())), + } + } + + /// create a new `KeyRangeRef` from `start` and `end` in etcd form + pub fn new_etcd(start: &'a [u8], end: &'a [u8]) -> Self { + let range_end = match end { + ONE_KEY => return Self::OneKey(start), + UNBOUNDED => BytesAffineRef::Unbounded, + _ => BytesAffineRef::Bytes(end), + }; + let key = BytesAffineRef::Bytes(start); // `low` must be Bytes + debug_assert!( + key < range_end, + "key `{key:?}` must be less than range_end `{range_end:?}`" + ); + Self::Range(Interval::new(key, range_end)) + } + + /// if this range contains all keys + #[must_use] + #[inline] + pub fn is_all_keys(&self) -> bool { + match self { + Self::OneKey(_) => false, + Self::Range(r) => { + r.low == BytesAffineRef::Bytes(UNBOUNDED.into()) + && r.high == BytesAffineRef::Unbounded + } + } + } + + /// Check if `KeyRangeRef` contains a key + #[must_use] + #[inline] + pub fn contains_key(&self, key: &[u8]) -> bool { + match self { + Self::OneKey(k) => *k == key, + Self::Range(r) => { + let key_aff = BytesAffineRef::Bytes(key); + r.low <= key_aff && key_aff < r.high + } + } + } + + /// Check if `KeyRangeRef` overlaps with another `KeyRangeRef` + #[inline] + pub fn overlaps(&self, other: &Self) -> bool { + match (self, other) { + (Self::OneKey(k1), Self::OneKey(k2)) => k1 == k2, + (Self::Range(r1), Self::Range(r2)) => r1.overlaps(r2), + (Self::OneKey(k), Self::Range(_)) => other.contains_key(k), + (Self::Range(_), Self::OneKey(k)) => self.contains_key(&k), + } + } +} + #[cfg(test)] mod tests { use super::*;