From a7fc0c2a2261b693223b4766a477a84ddc243610 Mon Sep 17 00:00:00 2001 From: lxl66566 Date: Wed, 7 Aug 2024 17:16:48 +0800 Subject: [PATCH] refactor(xlineapi): use KeyRange instead of directly pass &[u8] to avoid clone Signed-off-by: lxl66566 --- crates/xline/src/storage/index.rs | 185 +++++++++++--------- crates/xline/src/storage/kv_store.rs | 140 ++++++++++----- crates/xline/src/storage/lease_store/mod.rs | 6 +- crates/xlineapi/src/keyrange.rs | 34 +--- 4 files changed, 211 insertions(+), 154 deletions(-) diff --git a/crates/xline/src/storage/index.rs b/crates/xline/src/storage/index.rs index cbbe0c6f4..6f223163e 100644 --- a/crates/xline/src/storage/index.rs +++ b/crates/xline/src/storage/index.rs @@ -8,14 +8,14 @@ use crossbeam_skiplist::{map::Entry, SkipMap}; use itertools::Itertools; use parking_lot::{Mutex, RwLock}; use utils::parking_lot_lock::RwLockMap; -use xlineapi::keyrange::{EtcdKeyRange, KeyRange, RangeType, StdBoundRange}; +use xlineapi::keyrange::{BytesAffine, EtcdKeyRange, KeyRange, StdBoundRange}; use super::revision::{KeyRevision, Revision}; /// Operations for `Index` pub(crate) trait IndexOperate { /// Get `Revision` of keys, get the latest `Revision` when revision <= 0 - fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec; + fn get(&self, keyrange: KeyRange, revision: i64) -> Vec; /// Register a new `KeyRevision` of the given key /// @@ -37,8 +37,7 @@ pub(crate) trait IndexOperate { /// return all revision pairs and all keys in range fn delete( &self, - key: &[u8], - range_end: &[u8], + keyrange: KeyRange, revision: i64, sub_revision: i64, ) -> (Vec<(Revision, Revision)>, Vec>); @@ -134,35 +133,33 @@ impl Index { impl Index { /// Get `Revision` of keys from one revision - pub(super) fn get_from_rev( - &self, - key: &[u8], - range_end: &[u8], - revision: i64, - ) -> Vec { - match RangeType::get_range_type(key, range_end) { - RangeType::OneKey => self + pub(super) fn get_from_rev(&self, keyrange: KeyRange, revision: i64) -> Vec { + match keyrange { + KeyRange::OneKey(key) => self .inner - .get(key) + .get(&key) .map(|entry| { entry .value() .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) }) .unwrap_or_default(), - RangeType::AllKeys => self - .inner - .iter() - .flat_map(|entry| { - entry - .value() - .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) - }) - .sorted() - .collect(), - RangeType::Range => self + KeyRange::Range(r) + if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded => + { + self.inner + .iter() + .flat_map(|entry| { + entry + .value() + .map_read(|revs| Self::filter_revision(revs.as_ref(), revision)) + }) + .sorted() + .collect() + } + KeyRange::Range(_) => self .inner - .range(KeyRange::new_etcd(key, range_end)) + .range(keyrange) .flat_map(|entry| { entry .value() @@ -264,22 +261,25 @@ where } impl IndexOperate for Index { - fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { - match RangeType::get_range_type(key, range_end) { - RangeType::OneKey => self + fn get(&self, keyrange: KeyRange, revision: i64) -> Vec { + match keyrange { + KeyRange::OneKey(key) => self .inner - .get(key) + .get(&key) .and_then(fmap_value(|revs| Index::get_revision(revs, revision))) .map(|rev| vec![rev]) .unwrap_or_default(), - RangeType::AllKeys => self - .inner - .iter() - .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) - .collect(), - RangeType::Range => self + KeyRange::Range(r) + if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded => + { + self.inner + .iter() + .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) + .collect() + } + KeyRange::Range(_) => self .inner - .range(KeyRange::new_etcd(key, range_end)) + .range(keyrange) .filter_map(fmap_value(|revs| Index::get_revision(revs, revision))) .collect(), } @@ -338,42 +338,44 @@ impl IndexOperate for Index { fn delete( &self, - key: &[u8], - range_end: &[u8], + keyrange: KeyRange, revision: i64, sub_revision: i64, ) -> (Vec<(Revision, Revision)>, Vec>) { - let (pairs, keys) = match RangeType::get_range_type(key, range_end) { - RangeType::OneKey => { + let (pairs, keys) = match keyrange { + KeyRange::OneKey(key) => { let pairs: Vec<(Revision, Revision)> = self .inner - .get(key) + .get(&key) .into_iter() .filter_map(fmap_value_mut(|revs| { Self::gen_del_revision(revs, revision, sub_revision) })) .collect(); - let keys = if pairs.is_empty() { - vec![] - } else { - vec![key.to_vec()] - }; + let keys = if pairs.is_empty() { vec![] } else { vec![key] }; (pairs, keys) } - RangeType::AllKeys => self - .inner - .iter() - .zip(0..) - .filter_map(|(entry, i)| { - entry.value().map_write(|mut revs| { - Self::gen_del_revision(&mut revs, revision, sub_revision.overflow_add(i)) + KeyRange::Range(r) + if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded => + { + self.inner + .iter() + .zip(0..) + .filter_map(|(entry, i)| { + entry.value().map_write(|mut revs| { + Self::gen_del_revision( + &mut revs, + revision, + sub_revision.overflow_add(i), + ) .map(|pair| (pair, entry.key().clone())) + }) }) - }) - .unzip(), - RangeType::Range => self + .unzip() + } + KeyRange::Range(_) => self .inner - .range(KeyRange::new_etcd(key, range_end)) + .range(keyrange) .zip(0..) .filter_map(|(entry, i)| { entry.value().map_write(|mut revs| { @@ -515,20 +517,23 @@ impl IndexState<'_> { } impl IndexOperate for IndexState<'_> { - fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec { - match RangeType::get_range_type(key, range_end) { - RangeType::OneKey => { - Index::get_revision(&self.one_key_revisions(key, &self.state.lock()), revision) + fn get(&self, keyrange: KeyRange, revision: i64) -> Vec { + match keyrange { + KeyRange::OneKey(key) => { + Index::get_revision(&self.one_key_revisions(&key, &self.state.lock()), revision) .map(|rev| vec![rev]) .unwrap_or_default() } - RangeType::AllKeys => self - .all_key_revisions() - .into_iter() - .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) - .collect(), - RangeType::Range => self - .range_key_revisions(KeyRange::new_etcd(key, range_end)) + KeyRange::Range(r) + if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded => + { + self.all_key_revisions() + .into_iter() + .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) + .collect() + } + KeyRange::Range(_) => self + .range_key_revisions(keyrange) .into_iter() .filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision)) .collect(), @@ -604,20 +609,21 @@ impl IndexOperate for IndexState<'_> { fn delete( &self, - key: &[u8], - range_end: &[u8], + keyrange: KeyRange, revision: i64, sub_revision: i64, ) -> (Vec<(Revision, Revision)>, Vec>) { - let (pairs, keys) = match RangeType::get_range_type(key, range_end) { - RangeType::OneKey => self - .delete_one(key, revision, sub_revision) + let (pairs, keys) = match keyrange { + KeyRange::OneKey(key) => self + .delete_one(&key, revision, sub_revision) .into_iter() .unzip(), - RangeType::AllKeys => self.delete_all(revision, sub_revision), - RangeType::Range => { - self.delete_range(KeyRange::new_etcd(key, range_end), revision, sub_revision) + KeyRange::Range(r) + if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded => + { + self.delete_all(revision, sub_revision) } + KeyRange::Range(_) => self.delete_range(keyrange, revision, sub_revision), }; (pairs, keys) @@ -687,15 +693,21 @@ mod test { fn test_get() { let index = init_and_test_insert(); let txn = index.state(); - assert_eq!(txn.get(b"key", b"", 0), vec![Revision::new(3, 1)]); - assert_eq!(txn.get(b"key", b"", 1), vec![Revision::new(1, 3)]); + assert_eq!( + txn.get(KeyRange::new_one_key("key"), 0), + vec![Revision::new(3, 1)] + ); + assert_eq!( + txn.get(KeyRange::new_one_key("key"), 1), + vec![Revision::new(1, 3)] + ); txn.commit(); assert_eq!( - index.get_from_rev(b"key", b"", 2), + index.get_from_rev(KeyRange::new_one_key("key"), 2), // one key vec![Revision::new(2, 2), Revision::new(3, 1)] ); assert_eq!( - index.get_from_rev(b"a", b"g", 3), + index.get_from_rev(KeyRange::new_etcd("a", "g"), 3), // range vec![ Revision::new(4, 5), Revision::new(5, 4), @@ -706,7 +718,7 @@ mod test { ] ); assert_eq!( - index.get_from_rev(b"\0", b"\0", 3), + index.get_from_rev(KeyRange::new_all_keys(), 3), // all keys vec![ Revision::new(3, 1), Revision::new(4, 5), @@ -725,7 +737,7 @@ mod test { let mut txn = index.state(); assert_eq!( - txn.delete(b"key", b"", 10, 0), + txn.delete(KeyRange::new_one_key("key"), 10, 0), ( vec![(Revision::new(3, 1), Revision::new(10, 0))], vec![b"key".to_vec()] @@ -733,7 +745,7 @@ mod test { ); assert_eq!( - txn.delete(b"a", b"g", 11, 0), + txn.delete(KeyRange::new_etcd("a", "g"), 11, 0), ( vec![ (Revision::new(9, 9), Revision::new(11, 0)), @@ -743,7 +755,10 @@ mod test { ) ); - assert_eq!(txn.delete(b"\0", b"\0", 12, 0), (vec![], vec![])); + assert_eq!( + txn.delete(KeyRange::new_all_keys(), 12, 0), + (vec![], vec![]) + ); txn.commit(); @@ -827,7 +842,7 @@ mod test { let index = init_and_test_insert(); let mut txn = index.state(); - txn.delete(b"a", b"g", 10, 0); + txn.delete(KeyRange::new_etcd("a", "g"), 10, 0); txn.register_revision(b"bar".to_vec(), 11, 0); txn.commit(); diff --git a/crates/xline/src/storage/kv_store.rs b/crates/xline/src/storage/kv_store.rs index fa330dc67..5f1f5ff98 100644 --- a/crates/xline/src/storage/kv_store.rs +++ b/crates/xline/src/storage/kv_store.rs @@ -17,7 +17,7 @@ use utils::table_names::{KV_TABLE, META_TABLE}; use xlineapi::{ command::{CommandResponse, SyncResponse}, execute_error::ExecuteError, - keyrange::{KeyRange, ONE_KEY}, + keyrange::KeyRange, }; use super::{ @@ -102,25 +102,18 @@ impl KvStoreInner { /// Get `KeyValue` of a range /// /// If `range_end` is `&[]`, this function will return one or zero `KeyValue`. - fn get_range( - &self, - key: &[u8], - range_end: &[u8], - revision: i64, - ) -> Result, ExecuteError> { - let revisions = self.index.get(key, range_end, revision); + fn get_range(&self, keyrange: KeyRange, revision: i64) -> Result, ExecuteError> { + let revisions = self.index.get(keyrange, revision); self.get_values(&revisions) } /// Get `KeyValue` start from a revision and convert to `Event` pub(crate) fn get_event_from_revision( &self, - key_range: KeyRange, + keyrange: KeyRange, revision: i64, ) -> Result, ExecuteError> { - let revisions = - self.index - .get_from_rev(key_range.range_start(), key_range.range_end(), revision); + let revisions = self.index.get_from_rev(keyrange, revision); let events: Vec = self .get_values(&revisions)? .into_iter() @@ -146,9 +139,12 @@ impl KvStoreInner { /// Get previous `KeyValue` of a `KeyValue` pub(crate) fn get_prev_kv(&self, kv: &KeyValue) -> Option { - self.get_range(&kv.key, ONE_KEY, kv.mod_revision.overflow_sub(1)) - .ok()? - .pop() + self.get_range( + KeyRange::new_one_key(kv.key.clone()), + kv.mod_revision.overflow_sub(1), + ) + .ok()? + .pop() } /// Get compacted revision of KV store @@ -159,13 +155,12 @@ impl KvStoreInner { /// Get `KeyValue` of a range with limit and count only, return kvs and total count fn get_range_with_opts( &self, - key: &[u8], - range_end: &[u8], + keyrange: KeyRange, revision: i64, limit: usize, count_only: bool, ) -> Result<(Vec, usize), ExecuteError> { - let mut revisions = self.index.get(key, range_end, revision); + let mut revisions = self.index.get(keyrange, revision); let total = revisions.len(); if count_only || total == 0 { return Ok((vec![], total)); @@ -446,7 +441,10 @@ impl KvStore { fn check_compare(&self, cmp: &Compare) -> bool { let kvs = self .inner - .get_range(&cmp.key, &cmp.range_end, 0) + .get_range( + KeyRange::new_etcd(cmp.key.clone(), cmp.range_end.clone()), + 0, + ) .unwrap_or_default(); if kvs.is_empty() { if let Some(TargetUnion::Value(_)) = cmp.target_union { @@ -566,8 +564,7 @@ impl KvStore { req.limit.overflow_add(1) // get one extra for "more" flag }; let (mut kvs, total) = self.inner.get_range_with_opts( - &req.key, - &req.range_end, + KeyRange::new_etcd(req.key.clone(), req.range_end.clone()), req.revision, storage_fetch_limit.numeric_cast(), req.count_only, @@ -611,7 +608,10 @@ impl KvStore { return Err(ExecuteError::LeaseNotFound(req.lease)); }; if req.prev_kv || req.ignore_lease || req.ignore_value { - let prev_kv = self.inner.get_range(&req.key, &[], 0)?.pop(); + let prev_kv = self + .inner + .get_range(KeyRange::new_one_key(req.key.clone()), 0)? + .pop(); if prev_kv.is_none() && (req.ignore_lease || req.ignore_value) { return Err(ExecuteError::KeyNotFound); } @@ -627,7 +627,10 @@ impl KvStore { &self, req: &DeleteRangeRequest, ) -> Result { - let prev_kvs = self.inner.get_range(&req.key, &req.range_end, 0)?; + let prev_kvs = self.inner.get_range( + KeyRange::new_etcd(req.key.clone(), req.range_end.clone()), + 0, + )?; let mut response = DeleteRangeResponse { header: Some(self.header_gen.gen_header()), ..DeleteRangeResponse::default() @@ -795,7 +798,10 @@ impl KvStore { }; if req.ignore_lease || req.ignore_value { let pre_mod_rev = prev_rev.ok_or(ExecuteError::KeyNotFound)?.mod_revision; - let prev_kv = self.inner.get_range(&req.key, &[], pre_mod_rev)?.pop(); + let prev_kv = self + .inner + .get_range(KeyRange::new_one_key(req.key.clone()), pre_mod_rev)? + .pop(); let prev = prev_kv.as_ref().ok_or(ExecuteError::KeyNotFound)?; if req.ignore_lease { kv.lease = prev.lease; @@ -872,8 +878,7 @@ impl KvStore { Self::delete_keys( &self.inner.index, &self.lease_collection, - &req.key, - &req.range_end, + KeyRange::new_etcd(req.key.clone(), req.range_end.clone()), revision, sub_revision, ) @@ -883,13 +888,12 @@ impl KvStore { pub(crate) fn delete_keys<'a>( index: &Index, lease_collection: &LeaseCollection, - key: &[u8], - range_end: &[u8], + keyrange: KeyRange, revision: i64, sub_revision: i64, ) -> (Vec>, Vec) { let mut ops = Vec::new(); - let (revisions, keys) = index.delete(key, range_end, revision, sub_revision); + let (revisions, keys) = index.delete(keyrange, revision, sub_revision); let mut del_ops = Self::mark_deletions(&revisions, &keys); ops.append(&mut del_ops); for k in &keys { @@ -1161,7 +1165,14 @@ mod test { let ops = vec![WriteOp::PutScheduledCompactRevision(8)]; db.write_ops(ops)?; let (store, _rev_gen) = init_store(Arc::clone(&db)).await?; - assert_eq!(store.inner.index.get_from_rev(b"z", b"", 5).len(), 3); + assert_eq!( + store + .inner + .index + .get_from_rev(KeyRange::new_one_key("z"), 5) + .len(), + 3 + ); let new_store = init_empty_store(db); @@ -1181,7 +1192,14 @@ mod test { assert_eq!(res.kvs[0].key, b"a"); assert_eq!(new_store.compacted_revision(), 8); tokio::time::sleep(Duration::from_millis(500)).await; - assert_eq!(new_store.inner.index.get_from_rev(b"z", b"", 5).len(), 2); + assert_eq!( + new_store + .inner + .index + .get_from_rev(KeyRange::new_one_key("z"), 5) + .len(), + 2 + ); Ok(()) } @@ -1261,7 +1279,10 @@ mod test { } }); tokio::time::sleep(std::time::Duration::from_micros(50)).await; - let revs = store.inner.index.get_from_rev(b"foo", b"", 1); + let revs = store + .inner + .index + .get_from_rev(KeyRange::new_one_key("foo"), 1); let kvs = store.inner.get_values(&revs).unwrap(); assert_eq!( kvs.len(), @@ -1272,6 +1293,7 @@ mod test { } #[tokio::test(flavor = "multi_thread")] + #[allow(clippy::too_many_lines)] async fn test_compaction() -> Result<(), ExecuteError> { let db = DB::open(&EngineConfig::Memory)?; let store = init_empty_store(db); @@ -1309,12 +1331,20 @@ mod test { let target_revisions = index_compact(&store, 3); store.compact(target_revisions.as_ref())?; assert_eq!( - store.inner.get_range(b"a", b"", 2).unwrap().len(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 2) + .unwrap() + .len(), 1, "(a, 1) should not be removed" ); assert_eq!( - store.inner.get_range(b"b", b"", 3).unwrap().len(), + store + .inner + .get_range(KeyRange::new_one_key("b"), 3) + .unwrap() + .len(), 1, "(b, 2) should not be removed" ); @@ -1322,16 +1352,28 @@ mod test { let target_revisions = index_compact(&store, 4); store.compact(target_revisions.as_ref())?; assert!( - store.inner.get_range(b"a", b"", 2).unwrap().is_empty(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 2) + .unwrap() + .is_empty(), "(a, 1) should be removed" ); assert_eq!( - store.inner.get_range(b"b", b"", 3).unwrap().len(), + store + .inner + .get_range(KeyRange::new_one_key("b"), 3) + .unwrap() + .len(), 1, "(b, 2) should not be removed" ); assert_eq!( - store.inner.get_range(b"a", b"", 4).unwrap().len(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 4) + .unwrap() + .len(), 1, "(a, 3) should not be removed" ); @@ -1339,20 +1381,36 @@ mod test { let target_revisions = index_compact(&store, 5); store.compact(target_revisions.as_ref())?; assert!( - store.inner.get_range(b"a", b"", 2).unwrap().is_empty(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 2) + .unwrap() + .is_empty(), "(a, 1) should be removed" ); assert_eq!( - store.inner.get_range(b"b", b"", 3).unwrap().len(), + store + .inner + .get_range(KeyRange::new_one_key("b"), 3) + .unwrap() + .len(), 1, "(b, 2) should not be removed" ); assert!( - store.inner.get_range(b"a", b"", 4).unwrap().is_empty(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 4) + .unwrap() + .is_empty(), "(a, 3) should be removed" ); assert!( - store.inner.get_range(b"a", b"", 5).unwrap().is_empty(), + store + .inner + .get_range(KeyRange::new_one_key("a"), 5) + .unwrap() + .is_empty(), "(a, 4) should be removed" ); diff --git a/crates/xline/src/storage/lease_store/mod.rs b/crates/xline/src/storage/lease_store/mod.rs index ed68a15bb..0ba1a2c2d 100644 --- a/crates/xline/src/storage/lease_store/mod.rs +++ b/crates/xline/src/storage/lease_store/mod.rs @@ -24,6 +24,7 @@ use utils::table_names::LEASE_TABLE; use xlineapi::{ command::{CommandResponse, SyncResponse}, execute_error::ExecuteError, + keyrange::KeyRange, }; pub(crate) use self::{lease::Lease, lease_collection::LeaseCollection}; @@ -334,12 +335,11 @@ impl LeaseStore { return Ok(Vec::new()); } - for (key, sub_revision) in del_keys.iter().zip(0..) { + for (key, sub_revision) in del_keys.into_iter().zip(0..) { let (mut del_ops, mut del_event) = KvStore::delete_keys( &self.index, &self.lease_collection, - key, - &[], + KeyRange::new_one_key(key), revision, sub_revision, ); diff --git a/crates/xlineapi/src/keyrange.rs b/crates/xlineapi/src/keyrange.rs index 4edd8aa50..87c6de060 100644 --- a/crates/xlineapi/src/keyrange.rs +++ b/crates/xlineapi/src/keyrange.rs @@ -136,6 +136,15 @@ impl KeyRange { Self::OneKey(key_vec) } + /// New `KeyRange` of all keys + #[inline] + pub fn new_all_keys() -> Self { + Self::Range(Interval::new( + BytesAffine::Bytes(UNBOUNDED.into()), + BytesAffine::Unbounded, + )) + } + /// Construct `KeyRange` directly from [`start`, `end`], both included /// /// # Panics @@ -354,31 +363,6 @@ impl ConflictCheck for KeyRange { } } -/// Type of `KeyRange` -#[derive(Debug)] -pub enum RangeType { - /// `KeyRange` contains only one key - OneKey, - /// `KeyRange` contains all keys - AllKeys, - /// `KeyRange` contains the keys in the range - Range, -} - -impl RangeType { - /// Get `RangeType` by given `key` and `range_end` - #[inline] - pub fn get_range_type(key: &[u8], range_end: &[u8]) -> Self { - if range_end == ONE_KEY { - RangeType::OneKey - } else if key == UNBOUNDED && range_end == UNBOUNDED { - RangeType::AllKeys - } else { - RangeType::Range - } - } -} - #[cfg(test)] mod tests { use super::*;