diff --git a/utils/src/config.rs b/utils/src/config.rs index 474225f07..771387815 100644 --- a/utils/src/config.rs +++ b/utils/src/config.rs @@ -470,7 +470,7 @@ pub mod level_format { use crate::parse_log_level; /// deserializes a cluster duration - #[allow(single_use_lifetimes)] // TODO: Think is it necessary to allow this clippy?? + #[allow(single_use_lifetimes)] pub(crate) fn deserialize<'de, D>(deserializer: D) -> Result where D: Deserializer<'de>, diff --git a/xline/src/storage/index.rs b/xline/src/storage/index.rs index 33fb259c2..9ad5b67b0 100644 --- a/xline/src/storage/index.rs +++ b/xline/src/storage/index.rs @@ -68,10 +68,10 @@ impl Index { revision: i64, sub_revision: i64, ) -> Option<(Revision, Revision)> { - let last_rev = Self::get_revision(revs, 0)?; + let last_available_rev = Self::get_revision(revs, 0)?; let del_rev = KeyRevision::new_deletion(revision, sub_revision); revs.push(del_rev); - Some((last_rev, del_rev.as_revision())) + Some((last_available_rev, del_rev.as_revision())) } /// Mark the `KeyRevision` as available @@ -129,7 +129,10 @@ pub(super) trait IndexOperate { version: i64, ); - // TODO: fn compact(rev:i64) + /// Compact a `KeyRevision` by removing the versions with smaller or equal + /// revision than the given atRev except the largest one (If the largest one is + /// a tombstone, it will not be kept). + fn compact(&self, at_rev: i64) -> Vec; } impl IndexOperate for Index { @@ -287,6 +290,58 @@ impl IndexOperate for Index { .or_insert_with(Vec::new) .push(new_rev); } + + fn compact(&self, at_rev: i64) -> Vec { + let mut revs = Vec::new(); + let mut del_keys = Vec::new(); + + let mut inner = self.inner.lock(); + inner.index.iter_mut().for_each(|(key, revisions)| { + if let Some(revision) = revisions.first() { + if revision.mod_revision < at_rev { + match revisions.binary_search_by(|rev| rev.mod_revision.cmp(&at_rev)) { + Ok(idx) => { + let key_rev = revisions.get(idx).unwrap_or_else(|| { + unreachable!( + "{idx} is out of range, len of revisions is {}", + revisions.len() + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=idx) + } else { + revisions.drain(..idx) + }; + revs.extend(compact_revs.into_iter()); + } + Err(idx) => { + let compacted_last_idx = idx.overflow_sub(1); + let key_rev = revisions.get(compacted_last_idx).unwrap_or_else(|| { + unreachable!( + "{idx} is out of range, len of revisions is {}", + revisions.len() + ) + }); + let compact_revs = if key_rev.is_deleted() { + revisions.drain(..=compacted_last_idx) + } else { + revisions.drain(..compacted_last_idx) + }; + revs.extend(compact_revs.into_iter()); + } + } + if revisions.is_empty() { + del_keys.push(key.clone()); + } + } + } + }); + + for key in del_keys { + let _ignore = inner.index.remove(&key); + } + revs + } } #[cfg(test)] @@ -309,12 +364,20 @@ mod test { index.insert_or_update_revision(b"key", 3, 1); index.insert_or_update_revision(b"foo", 4, 5); index.insert_or_update_revision(b"bar", 5, 4); + index.insert_or_update_revision(b"foo", 6, 6); + index.insert_or_update_revision(b"bar", 7, 7); + index.insert_or_update_revision(b"foo", 8, 8); + index.insert_or_update_revision(b"bar", 9, 9); index.mark_available(1); index.mark_available(2); index.mark_available(3); index.mark_available(4); index.mark_available(5); + index.mark_available(6); + index.mark_available(7); + index.mark_available(8); + index.mark_available(9); assert_eq!( index.inner.lock().index, @@ -329,11 +392,19 @@ mod test { ), ( b"foo".to_vec(), - vec![KeyRevision::new(4, 1, 4, 5).mark_available()] + vec![ + KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available() + ] ), ( b"bar".to_vec(), - vec![KeyRevision::new(5, 1, 5, 4).mark_available()] + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + ] ) ]) ); @@ -352,14 +423,25 @@ mod test { ); assert_eq!( index.get_from_rev(b"a", b"g", 3), - vec![Revision::new(4, 5), Revision::new(5, 4)] + vec![ + Revision::new(4, 5), + Revision::new(5, 4), + Revision::new(6, 6), + Revision::new(7, 7), + Revision::new(8, 8), + Revision::new(9, 9) + ] ); assert_eq!( index.get_from_rev(b"\0", b"\0", 3), vec![ Revision::new(3, 1), Revision::new(4, 5), - Revision::new(5, 4) + Revision::new(5, 4), + Revision::new(6, 6), + Revision::new(7, 7), + Revision::new(8, 8), + Revision::new(9, 9) ] ); } @@ -369,27 +451,27 @@ mod test { let index = init_and_test_insert(); assert_eq!( - index.delete(b"key", b"", 6, 0), + index.delete(b"key", b"", 10, 0), ( - vec![(Revision::new(3, 1), Revision::new(6, 0))], + vec![(Revision::new(3, 1), Revision::new(10, 0))], vec![b"key".to_vec()] ) ); - index.mark_available(6); + index.mark_available(10); assert_eq!( - index.delete(b"a", b"g", 7, 0), + index.delete(b"a", b"g", 11, 0), ( vec![ - (Revision::new(5, 4), Revision::new(7, 0)), - (Revision::new(4, 5), Revision::new(7, 1)), + (Revision::new(9, 9), Revision::new(11, 0)), + (Revision::new(8, 8), Revision::new(11, 1)), ], vec![b"bar".to_vec(), b"foo".to_vec()] ) ); - index.mark_available(7); + index.mark_available(11); - assert_eq!(index.delete(b"\0", b"\0", 8, 0), (vec![], vec![])); + assert_eq!(index.delete(b"\0", b"\0", 12, 0), (vec![], vec![])); assert_eq!( index.inner.lock().index, @@ -400,21 +482,25 @@ mod test { KeyRevision::new(1, 1, 1, 3).mark_available(), KeyRevision::new(1, 2, 2, 2).mark_available(), KeyRevision::new(1, 3, 3, 1).mark_available(), - KeyRevision::new_deletion(6, 0).mark_available() + KeyRevision::new_deletion(10, 0).mark_available() ] ), ( b"foo".to_vec(), vec![ KeyRevision::new(4, 1, 4, 5).mark_available(), - KeyRevision::new_deletion(7, 1).mark_available() + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available(), + KeyRevision::new_deletion(11, 1).mark_available() ] ), ( b"bar".to_vec(), vec![ KeyRevision::new(5, 1, 5, 4).mark_available(), - KeyRevision::new_deletion(7, 0).mark_available() + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + KeyRevision::new_deletion(11, 0).mark_available() ] ) ]) @@ -444,4 +530,82 @@ mod test { ]) ); } + + #[test] + fn test_compact() { + let index = init_and_test_insert(); + let res = index.compact(7); + assert_eq!( + index.inner.lock().index, + BTreeMap::from_iter(vec![ + ( + b"key".to_vec(), + vec![KeyRevision::new(1, 3, 3, 1).mark_available(),] + ), + ( + b"foo".to_vec(), + vec![ + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available() + ] + ), + ( + b"bar".to_vec(), + vec![ + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + ] + ) + ]) + ); + assert_eq!( + res, + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(1, 1, 1, 3).mark_available(), + KeyRevision::new(1, 2, 2, 2).mark_available(), + ] + ); + } + + #[test] + fn test_compact_with_deletion() { + let index = init_and_test_insert(); + index.delete(b"a", b"g", 10, 0); + index.mark_available(10); + + index.insert_or_update_revision(b"bar", 11, 0); + index.mark_available(11); + + let res = index.compact(10); + assert_eq!( + index.inner.lock().index, + BTreeMap::from_iter(vec![ + ( + b"key".to_vec(), + vec![KeyRevision::new(1, 3, 3, 1).mark_available(),] + ), + ( + b"bar".to_vec(), + vec![KeyRevision::new(11, 1, 11, 0).mark_available(),] + ) + ]) + ); + assert_eq!( + res, + vec![ + KeyRevision::new(5, 1, 5, 4).mark_available(), + KeyRevision::new(5, 2, 7, 7).mark_available(), + KeyRevision::new(5, 3, 9, 9).mark_available(), + KeyRevision::new(0, 0, 10, 0).mark_available(), + KeyRevision::new(4, 1, 4, 5).mark_available(), + KeyRevision::new(4, 2, 6, 6).mark_available(), + KeyRevision::new(4, 3, 8, 8).mark_available(), + KeyRevision::new(0, 0, 10, 1).mark_available(), + KeyRevision::new(1, 1, 1, 3).mark_available(), + KeyRevision::new(1, 2, 2, 2).mark_available(), + ] + ); + } }