Skip to content

Commit

Permalink
refactor(xlineapi): use KeyRange instead of directly pass &[u8] to av…
Browse files Browse the repository at this point in the history
…oid clone

Signed-off-by: lxl66566 <[email protected]>
  • Loading branch information
lxl66566 committed Aug 8, 2024
1 parent 94f24e0 commit a7fc0c2
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 154 deletions.
185 changes: 100 additions & 85 deletions crates/xline/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use crossbeam_skiplist::{map::Entry, SkipMap};
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use utils::parking_lot_lock::RwLockMap;
use xlineapi::keyrange::{EtcdKeyRange, KeyRange, RangeType, StdBoundRange};
use xlineapi::keyrange::{BytesAffine, EtcdKeyRange, KeyRange, StdBoundRange};

use super::revision::{KeyRevision, Revision};

/// Operations for `Index`
pub(crate) trait IndexOperate {
/// Get `Revision` of keys, get the latest `Revision` when revision <= 0
fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec<Revision>;
fn get(&self, keyrange: KeyRange, revision: i64) -> Vec<Revision>;

/// Register a new `KeyRevision` of the given key
///
Expand All @@ -37,8 +37,7 @@ pub(crate) trait IndexOperate {
/// return all revision pairs and all keys in range
fn delete(
&self,
key: &[u8],
range_end: &[u8],
keyrange: KeyRange,
revision: i64,
sub_revision: i64,
) -> (Vec<(Revision, Revision)>, Vec<Vec<u8>>);
Expand Down Expand Up @@ -134,35 +133,33 @@ impl Index {

impl Index {
/// Get `Revision` of keys from one revision
pub(super) fn get_from_rev(
&self,
key: &[u8],
range_end: &[u8],
revision: i64,
) -> Vec<Revision> {
match RangeType::get_range_type(key, range_end) {
RangeType::OneKey => self
pub(super) fn get_from_rev(&self, keyrange: KeyRange, revision: i64) -> Vec<Revision> {
match keyrange {
KeyRange::OneKey(key) => self
.inner
.get(key)
.get(&key)
.map(|entry| {
entry
.value()
.map_read(|revs| Self::filter_revision(revs.as_ref(), revision))
})
.unwrap_or_default(),
RangeType::AllKeys => self
.inner
.iter()
.flat_map(|entry| {
entry
.value()
.map_read(|revs| Self::filter_revision(revs.as_ref(), revision))
})
.sorted()
.collect(),
RangeType::Range => self
KeyRange::Range(r)
if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded =>
{
self.inner
.iter()
.flat_map(|entry| {
entry
.value()
.map_read(|revs| Self::filter_revision(revs.as_ref(), revision))
})
.sorted()
.collect()
}
KeyRange::Range(_) => self
.inner
.range(KeyRange::new_etcd(key, range_end))
.range(keyrange)
.flat_map(|entry| {
entry
.value()
Expand Down Expand Up @@ -264,22 +261,25 @@ where
}

impl IndexOperate for Index {
fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec<Revision> {
match RangeType::get_range_type(key, range_end) {
RangeType::OneKey => self
fn get(&self, keyrange: KeyRange, revision: i64) -> Vec<Revision> {
match keyrange {
KeyRange::OneKey(key) => self
.inner
.get(key)
.get(&key)
.and_then(fmap_value(|revs| Index::get_revision(revs, revision)))
.map(|rev| vec![rev])
.unwrap_or_default(),
RangeType::AllKeys => self
.inner
.iter()
.filter_map(fmap_value(|revs| Index::get_revision(revs, revision)))
.collect(),
RangeType::Range => self
KeyRange::Range(r)
if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded =>
{
self.inner
.iter()
.filter_map(fmap_value(|revs| Index::get_revision(revs, revision)))
.collect()
}
KeyRange::Range(_) => self
.inner
.range(KeyRange::new_etcd(key, range_end))
.range(keyrange)
.filter_map(fmap_value(|revs| Index::get_revision(revs, revision)))
.collect(),
}
Expand Down Expand Up @@ -338,42 +338,44 @@ impl IndexOperate for Index {

fn delete(
&self,
key: &[u8],
range_end: &[u8],
keyrange: KeyRange,
revision: i64,
sub_revision: i64,
) -> (Vec<(Revision, Revision)>, Vec<Vec<u8>>) {
let (pairs, keys) = match RangeType::get_range_type(key, range_end) {
RangeType::OneKey => {
let (pairs, keys) = match keyrange {
KeyRange::OneKey(key) => {
let pairs: Vec<(Revision, Revision)> = self
.inner
.get(key)
.get(&key)
.into_iter()
.filter_map(fmap_value_mut(|revs| {
Self::gen_del_revision(revs, revision, sub_revision)
}))
.collect();
let keys = if pairs.is_empty() {
vec![]
} else {
vec![key.to_vec()]
};
let keys = if pairs.is_empty() { vec![] } else { vec![key] };
(pairs, keys)
}
RangeType::AllKeys => self
.inner
.iter()
.zip(0..)
.filter_map(|(entry, i)| {
entry.value().map_write(|mut revs| {
Self::gen_del_revision(&mut revs, revision, sub_revision.overflow_add(i))
KeyRange::Range(r)
if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded =>
{
self.inner
.iter()
.zip(0..)
.filter_map(|(entry, i)| {
entry.value().map_write(|mut revs| {
Self::gen_del_revision(
&mut revs,
revision,
sub_revision.overflow_add(i),
)
.map(|pair| (pair, entry.key().clone()))
})
})
})
.unzip(),
RangeType::Range => self
.unzip()
}
KeyRange::Range(_) => self
.inner
.range(KeyRange::new_etcd(key, range_end))
.range(keyrange)
.zip(0..)
.filter_map(|(entry, i)| {
entry.value().map_write(|mut revs| {
Expand Down Expand Up @@ -515,20 +517,23 @@ impl IndexState<'_> {
}

impl IndexOperate for IndexState<'_> {
fn get(&self, key: &[u8], range_end: &[u8], revision: i64) -> Vec<Revision> {
match RangeType::get_range_type(key, range_end) {
RangeType::OneKey => {
Index::get_revision(&self.one_key_revisions(key, &self.state.lock()), revision)
fn get(&self, keyrange: KeyRange, revision: i64) -> Vec<Revision> {
match keyrange {
KeyRange::OneKey(key) => {
Index::get_revision(&self.one_key_revisions(&key, &self.state.lock()), revision)
.map(|rev| vec![rev])
.unwrap_or_default()
}
RangeType::AllKeys => self
.all_key_revisions()
.into_iter()
.filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision))
.collect(),
RangeType::Range => self
.range_key_revisions(KeyRange::new_etcd(key, range_end))
KeyRange::Range(r)
if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded =>
{
self.all_key_revisions()
.into_iter()
.filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision))
.collect()
}
KeyRange::Range(_) => self
.range_key_revisions(keyrange)
.into_iter()
.filter_map(|(_, revs)| Index::get_revision(revs.as_ref(), revision))
.collect(),
Expand Down Expand Up @@ -604,20 +609,21 @@ impl IndexOperate for IndexState<'_> {

fn delete(
&self,
key: &[u8],
range_end: &[u8],
keyrange: KeyRange,
revision: i64,
sub_revision: i64,
) -> (Vec<(Revision, Revision)>, Vec<Vec<u8>>) {
let (pairs, keys) = match RangeType::get_range_type(key, range_end) {
RangeType::OneKey => self
.delete_one(key, revision, sub_revision)
let (pairs, keys) = match keyrange {
KeyRange::OneKey(key) => self
.delete_one(&key, revision, sub_revision)
.into_iter()
.unzip(),
RangeType::AllKeys => self.delete_all(revision, sub_revision),
RangeType::Range => {
self.delete_range(KeyRange::new_etcd(key, range_end), revision, sub_revision)
KeyRange::Range(r)
if r.low == BytesAffine::Unbounded && r.high == BytesAffine::Unbounded =>
{
self.delete_all(revision, sub_revision)
}
KeyRange::Range(_) => self.delete_range(keyrange, revision, sub_revision),
};

(pairs, keys)
Expand Down Expand Up @@ -687,15 +693,21 @@ mod test {
fn test_get() {
let index = init_and_test_insert();
let txn = index.state();
assert_eq!(txn.get(b"key", b"", 0), vec![Revision::new(3, 1)]);
assert_eq!(txn.get(b"key", b"", 1), vec![Revision::new(1, 3)]);
assert_eq!(
txn.get(KeyRange::new_one_key("key"), 0),
vec![Revision::new(3, 1)]
);
assert_eq!(
txn.get(KeyRange::new_one_key("key"), 1),
vec![Revision::new(1, 3)]
);
txn.commit();
assert_eq!(
index.get_from_rev(b"key", b"", 2),
index.get_from_rev(KeyRange::new_one_key("key"), 2), // one key
vec![Revision::new(2, 2), Revision::new(3, 1)]
);
assert_eq!(
index.get_from_rev(b"a", b"g", 3),
index.get_from_rev(KeyRange::new_etcd("a", "g"), 3), // range
vec![
Revision::new(4, 5),
Revision::new(5, 4),
Expand All @@ -706,7 +718,7 @@ mod test {
]
);
assert_eq!(
index.get_from_rev(b"\0", b"\0", 3),
index.get_from_rev(KeyRange::new_all_keys(), 3), // all keys
vec![
Revision::new(3, 1),
Revision::new(4, 5),
Expand All @@ -725,15 +737,15 @@ mod test {
let mut txn = index.state();

assert_eq!(
txn.delete(b"key", b"", 10, 0),
txn.delete(KeyRange::new_one_key("key"), 10, 0),
(
vec![(Revision::new(3, 1), Revision::new(10, 0))],
vec![b"key".to_vec()]
)
);

assert_eq!(
txn.delete(b"a", b"g", 11, 0),
txn.delete(KeyRange::new_etcd("a", "g"), 11, 0),
(
vec![
(Revision::new(9, 9), Revision::new(11, 0)),
Expand All @@ -743,7 +755,10 @@ mod test {
)
);

assert_eq!(txn.delete(b"\0", b"\0", 12, 0), (vec![], vec![]));
assert_eq!(
txn.delete(KeyRange::new_all_keys(), 12, 0),
(vec![], vec![])
);

txn.commit();

Expand Down Expand Up @@ -827,7 +842,7 @@ mod test {
let index = init_and_test_insert();
let mut txn = index.state();

txn.delete(b"a", b"g", 10, 0);
txn.delete(KeyRange::new_etcd("a", "g"), 10, 0);
txn.register_revision(b"bar".to_vec(), 11, 0);
txn.commit();

Expand Down
Loading

0 comments on commit a7fc0c2

Please sign in to comment.