From 0438947a8d348d7214f5868f99b00b8a0e6a983d Mon Sep 17 00:00:00 2001 From: arriqaaq Date: Fri, 31 Jan 2025 09:27:40 +0530 Subject: [PATCH 1/3] allow storing single values --- .github/workflows/ci.yml | 8 - .github/workflows/release.yml | 8 + benches/vart_bench.rs | 96 +++- src/art.rs | 284 +++++++---- src/iter.rs | 5 +- src/lib.rs | 2 +- src/node.rs | 328 +++++-------- src/version.rs | 879 ++++++++++++++++++++++++++++++++++ 8 files changed, 1301 insertions(+), 309 deletions(-) create mode 100644 src/version.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 551b8bf..2aa82fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,11 +57,3 @@ jobs: - name: Format run: cargo fmt --all -- --check - - semver: - name: semver - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - name: Check semver - uses: obi1kenobi/cargo-semver-checks-action@v2 \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4fb4a51..750e27f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -6,6 +6,14 @@ on: workflow_dispatch: jobs: + semver: + name: semver + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check semver + uses: obi1kenobi/cargo-semver-checks-action@v2 + release: name: Process Release runs-on: ubuntu-latest diff --git a/benches/vart_bench.rs b/benches/vart_bench.rs index 96f65cc..66206ab 100644 --- a/benches/vart_bench.rs +++ b/benches/vart_bench.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::str::FromStr; use std::time::Instant; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; @@ -6,7 +7,7 @@ use rand::prelude::SliceRandom; use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; -use vart::art::Tree; +use vart::art::{QueryType, Tree}; use vart::{FixedSizeKey, VariableSizeKey}; fn seeded_rng(alter: u64) -> impl Rng { @@ -305,7 +306,7 @@ fn variable_size_bulk_insert_mut(c: &mut Criterion) { // Test different combinations of key sizes, value sizes, and number of keys let key_sizes = vec![16, 32, 64, 128]; - let num_keys_list = vec![100_000, 500_000, 1_000_000]; + let num_keys_list = vec![100_000]; for &num_keys in &num_keys_list { for &key_size in &key_sizes { @@ -330,6 +331,92 @@ fn variable_size_bulk_insert_mut(c: &mut Criterion) { group.finish(); } +fn setup_benchmark_tree(num_keys: usize, versions_per_key: usize) -> Tree { + let mut tree = Tree::::new(); + + for key_idx in 0..num_keys { + let key = VariableSizeKey::from_str(&format!("key_{}", key_idx)).unwrap(); + for version in 0..versions_per_key { + let value = (key_idx * versions_per_key + version) as i32; + let ts = version as u64; + tree.insert_unchecked(&key, value, version as u64, ts) + .unwrap(); + } + } + + tree +} + +fn bench_query_types(c: &mut Criterion) { + let num_keys = 100; + let versions_per_key = 10_000; + let tree = setup_benchmark_tree(num_keys, versions_per_key); + let test_key = VariableSizeKey::from_str("key_50").unwrap(); + let mid_ts = (versions_per_key / 2) as u64; + + let mut group = c.benchmark_group("tree_queries"); + group.sample_size(10); // Adjust based on your needs + group.measurement_time(std::time::Duration::from_secs(5)); + + // Benchmark LatestByVersion + group.bench_with_input( + BenchmarkId::new("LatestByVersion", mid_ts), + &mid_ts, + |b, &ts| { + b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LatestByVersion(ts)))) + }, + ); + + // Benchmark LatestByTs + group.bench_with_input(BenchmarkId::new("LatestByTs", mid_ts), &mid_ts, |b, &ts| { + b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LatestByTs(ts)))) + }); + + // Benchmark LastLessThanTs + group.bench_with_input( + BenchmarkId::new("LastLessThanTs", mid_ts), + &mid_ts, + |b, &ts| { + b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LastLessThanTs(ts)))) + }, + ); + + // Benchmark LastLessOrEqualTs + group.bench_with_input( + BenchmarkId::new("LastLessOrEqualTs", mid_ts), + &mid_ts, + |b, &ts| { + b.iter(|| { + black_box(tree.get_value_by_query(&test_key, QueryType::LastLessOrEqualTs(ts))) + }) + }, + ); + + // Benchmark FirstGreaterThanTs + group.bench_with_input( + BenchmarkId::new("FirstGreaterThanTs", mid_ts), + &mid_ts, + |b, &ts| { + b.iter(|| { + black_box(tree.get_value_by_query(&test_key, QueryType::FirstGreaterThanTs(ts))) + }) + }, + ); + + // Benchmark FirstGreaterOrEqualTs + group.bench_with_input( + BenchmarkId::new("FirstGreaterOrEqualTs", mid_ts), + &mid_ts, + |b, &ts| { + b.iter(|| { + black_box(tree.get_value_by_query(&test_key, QueryType::FirstGreaterOrEqualTs(ts))) + }) + }, + ); + + group.finish(); +} + criterion_group!(delete_benches, seq_delete, rand_delete); criterion_group!( insert_benches, @@ -342,10 +429,13 @@ criterion_group!( criterion_group!(read_benches, seq_get, rand_get, rand_get_str); criterion_group!(iter_benches, iter_benchmark); criterion_group!(range_benches, range_benchmark); +criterion_group!(ts_benches, bench_query_types); + criterion_main!( insert_benches, read_benches, delete_benches, iter_benches, - range_benches + range_benches, + ts_benches ); diff --git a/src/art.rs b/src/art.rs index b6fcc7f..ec2b313 100644 --- a/src/art.rs +++ b/src/art.rs @@ -6,7 +6,9 @@ use std::sync::Arc; use crate::iter::IterItem; use crate::iter::{scan_node, Iter, Range}; -use crate::node::{FlatNode, LeafValue, Node256, Node48, NodeTrait, TwigNode}; +use crate::node::{FlatNode, Node256, Node48, NodeTrait, TwigNode}; +use crate::version::LeafValue; +use crate::version::LeavesType; use crate::{KeyTrait, TrieError}; // Minimum and maximum number of children for Node4 @@ -144,9 +146,16 @@ impl Node { /// Returns a new `Node` instance with a Twig node containing the provided key, value, and version. /// #[inline] - pub(crate) fn new_twig(prefix: P, key: P, value: V, version: u64, ts: u64) -> Node { + pub(crate) fn new_twig( + prefix: P, + key: P, + value: V, + version: u64, + ts: u64, + leaves_type: LeavesType, + ) -> Node { // Create a new TwigNode instance using the provided prefix and key. - let mut twig = TwigNode::new(prefix, key); + let mut twig = TwigNode::new(prefix, key, leaves_type); // Insert the provided value into the TwigNode along with the version. twig.insert_mut(value, version, ts); @@ -796,7 +805,7 @@ impl Node { commit_version: u64, ts: u64, depth: usize, - replace: bool, + leaves_type: LeavesType, ) -> NodeArc { let (key_prefix, new_prefix, shared_prefix, is_prefix_match, shared_prefix_length) = Self::common_insert_logic(cur_node.prefix(), key, depth); @@ -814,6 +823,7 @@ impl Node { value, commit_version, ts, + leaves_type, ); let mut n4 = Node::new_node4(shared_prefix); @@ -831,7 +841,7 @@ impl Node { // If the current node is a Twig node and the prefixes match up to the end of both prefixes, // update the existing value in the Twig node. if let NodeType::Twig(twig) = &cur_node.node_type { - let new_twig = twig.insert_or_replace(value, commit_version, ts, replace); + let new_twig = twig.insert(value, commit_version, ts); Arc::new(Node { node_type: NodeType::Twig(new_twig), }) @@ -839,10 +849,13 @@ impl Node { // If the current node is an inner node, then either insert the new value // in its existing inner Twig node, or create new one. let leaf = match cur_node.get_inner_twig() { - Some(twig) => twig.insert_or_replace(value, commit_version, ts, replace), + Some(twig) => twig.insert(value, commit_version, ts), None => { - let mut new_twig = - TwigNode::new(cur_node.prefix().clone(), key.as_slice().into()); + let mut new_twig = TwigNode::new( + cur_node.prefix().clone(), + key.as_slice().into(), + leaves_type, + ); new_twig.insert_mut(value, commit_version, ts); new_twig } @@ -867,8 +880,14 @@ impl Node { if let NodeType::Twig(twig) = &cur_node.node_type { let mut n4 = Node::new_node4(shared_prefix); n4.set_inner_twig(twig.clone()); - let new_twig = - Node::new_twig(new_prefix, key.clone(), value, commit_version, ts); + let new_twig = Node::new_twig( + new_prefix, + key.clone(), + value, + commit_version, + ts, + leaves_type, + ); n4.add_child_mut(k, new_twig); Arc::new(n4) } else { @@ -881,7 +900,7 @@ impl Node { commit_version, ts, depth + shared_prefix_length, - replace, + leaves_type, ); let new_node = cur_node.replace_child(k, new_child); return Arc::new(new_node); @@ -894,6 +913,7 @@ impl Node { value, commit_version, ts, + leaves_type, ); let new_node = cur_node.add_child(k, new_twig); Arc::new(new_node) @@ -908,7 +928,7 @@ impl Node { // Current node is also replaced by a new Node4, but this time its prefix is // adjusted and it becomes the Node4's child, while the new Twig node becomes // Node4's inner Twig. - let mut inner_twig = TwigNode::new(key_prefix.into(), key.clone()); + let mut inner_twig = TwigNode::new(key_prefix.into(), key.clone(), leaves_type); inner_twig.insert_mut(value, commit_version, ts); let old_node_key = new_prefix.at(0); let mut old_node = cur_node.clone_node(); @@ -929,7 +949,7 @@ impl Node { commit_version: u64, ts: u64, depth: usize, - replace: bool, + leaves_type: LeavesType, ) { let (key_prefix, new_prefix, shared_prefix, is_prefix_match, shared_prefix_length) = Self::common_insert_logic(cur_node.prefix(), key, depth); @@ -952,6 +972,7 @@ impl Node { value, commit_version, ts, + leaves_type, ); cur_node.add_child_mut(k1, old_node); cur_node.add_child_mut(k2, new_twig); @@ -969,31 +990,20 @@ impl Node { // If the current node is a Twig node and the prefixes match up to the // end of both prefixes, update the existing value in the Twig node. if let NodeType::Twig(ref mut twig) = &mut cur_node.node_type { - if replace { - // Only replace if the provided value is more recent than - // the existing ones. This is important because this method - // is used for loading the index in SurrealKV and thus must - // be able to handle segments left by an unfinished compaction - // where older versions can end up in more recent segments - // after the newer versions. - twig.replace_if_newer_mut(value, commit_version, ts); - } else { - twig.insert_mut(value, commit_version, ts); - } + twig.insert_mut(value, commit_version, ts); } else { // If the current node is an inner node, then either insert the new value // in its existing inner Twig node, or create new one. match cur_node.get_inner_twig_mut() { Some(twig) => { - if replace { - twig.replace_if_newer_mut(value, commit_version, ts); - } else { - twig.insert_mut(value, commit_version, ts); - } + twig.insert_mut(value, commit_version, ts); } None => { - let mut new_twig = - TwigNode::new(cur_node.prefix().clone(), key.as_slice().into()); + let mut new_twig = TwigNode::new( + cur_node.prefix().clone(), + key.as_slice().into(), + leaves_type, + ); new_twig.insert_mut(value, commit_version, ts); cur_node.set_inner_twig(new_twig); } @@ -1018,8 +1028,14 @@ impl Node { NodeType::Twig(n) => cur_node.set_inner_twig(n), _ => panic!("must be Twig"), } - let new_twig = - Node::new_twig(new_prefix, key.clone(), value, commit_version, ts); + let new_twig = Node::new_twig( + new_prefix, + key.clone(), + value, + commit_version, + ts, + leaves_type, + ); cur_node.add_child_mut(k, new_twig); } else { // Case 2b2: Continue traversal with existing child @@ -1031,7 +1047,7 @@ impl Node { commit_version, ts, depth + shared_prefix_length, - replace, + leaves_type, ); return; } @@ -1044,6 +1060,7 @@ impl Node { value, commit_version, ts, + leaves_type, ); cur_node.add_child_mut(k, new_twig); } @@ -1060,7 +1077,7 @@ impl Node { // Node4's inner Twig. let n4 = Node::new_node4(key_prefix.into()); let mut old_node = std::mem::replace(cur_node, n4); - let mut inner_twig = TwigNode::new(key_prefix.into(), key.clone()); + let mut inner_twig = TwigNode::new(key_prefix.into(), key.clone(), leaves_type); inner_twig.insert_mut(value, commit_version, ts); cur_node.set_inner_twig(inner_twig); let old_node_key = new_prefix.at(0); @@ -1154,6 +1171,7 @@ pub struct Tree { pub(crate) root: Option>>, pub size: usize, pub version: u64, + pub leaves_type: LeavesType, } // A type alias for a node reference. @@ -1185,6 +1203,7 @@ impl Clone for Tree { root: self.root.as_ref().cloned(), size: self.size, version: self.version, + leaves_type: self.leaves_type, } } } @@ -1195,6 +1214,16 @@ impl Tree { root: None, size: 0, version: 0, + leaves_type: LeavesType::Vector, + } + } + + pub fn with_leaves_type(leaves_type: LeavesType) -> Self { + Tree { + root: None, + size: 0, + version: 0, + leaves_type, } } @@ -1215,7 +1244,6 @@ impl Tree { version: u64, ts: u64, check_version: bool, - replace: bool, ) -> Result<(), TrieError> { let new_root = match &self.root { None => { @@ -1226,6 +1254,7 @@ impl Tree { value, commit_version, ts, + self.leaves_type, )) } Some(root) => { @@ -1236,7 +1265,7 @@ impl Tree { } else if check_version && curr_version > version { return Err(TrieError::VersionIsOld); } - Node::insert_recurse(root, key, value, commit_version, ts, 0, replace) + Node::insert_recurse(root, key, value, commit_version, ts, 0, self.leaves_type) } }; @@ -1254,7 +1283,6 @@ impl Tree { version: u64, ts: u64, check_version: bool, - replace: bool, ) -> Result<(), TrieError> { if let Some(root_arc) = self.root.as_mut() { let curr_version = self.version; @@ -1265,7 +1293,7 @@ impl Tree { return Err(TrieError::VersionIsOld); } if let Some(root) = Arc::get_mut(root_arc) { - Node::insert_recurse_mut(root, key, value, commit_version, ts, 0, replace) + Node::insert_recurse_mut(root, key, value, commit_version, ts, 0, self.leaves_type) } else { return Err(TrieError::RootIsNotUniquelyOwned); } @@ -1277,6 +1305,7 @@ impl Tree { value, commit_version, ts, + self.leaves_type, ))); } self.size += 1; @@ -1311,32 +1340,7 @@ impl Tree { /// /// Returns an error if the given version is older than the root's current version. pub fn insert(&mut self, key: &P, value: V, version: u64, ts: u64) -> Result<(), TrieError> { - self.insert_common(key, value, version, ts, true, false) - } - - /// Inserts or replaces a key-value pair in the trie. - /// - /// This function inserts a new key-value pair into the trie or replaces the existing value - /// if the key already exists. It ensures that the insertion is checked, meaning it will - /// validate the keys are inserted in increasing order of version numbers. - /// - /// # Parameters - /// - `key`: A reference to the key to be inserted or replaced. - /// - `value`: The value to be associated with the key. - /// - `version`: The version number for the key-value pair. - /// - `ts`: The timestamp for the key-value pair. - /// - /// # Returns - /// - `Result<(), TrieError>`: Returns `Ok(())` if the insertion or replacement is successful, - /// or a `TrieError` if an error occurs during the operation. - pub fn insert_or_replace( - &mut self, - key: &P, - value: V, - version: u64, - ts: u64, - ) -> Result<(), TrieError> { - self.insert_common(key, value, version, ts, true, true) + self.insert_common(key, value, version, ts, true) } /// Inserts a key-value pair into the trie without checking for existing keys. @@ -1366,33 +1370,7 @@ impl Tree { version: u64, ts: u64, ) -> Result<(), TrieError> { - self.insert_common_mut(key, value, version, ts, false, false) - } - - /// Inserts or replaces a key-value pair in the trie without checking for existing keys. - /// - /// This function inserts a new key-value pair into the trie or replaces the existing value - /// if the key already exists. It is an unchecked insertion, meaning it will not check if the versions - /// are incremental during insertion. This can be faster but may lead to inconsistencies - /// if not used carefully. - /// - /// # Parameters - /// - `key`: A reference to the key to be inserted or replaced. - /// - `value`: The value to be associated with the key. - /// - `version`: The version number for the key-value pair. - /// - `ts`: The timestamp for the key-value pair. - /// - /// # Returns - /// - `Result<(), TrieError>`: Returns `Ok(())` if the insertion or replacement is successful, - /// or a `TrieError` if an error occurs during the operation. - pub fn insert_or_replace_unchecked( - &mut self, - key: &P, - value: V, - version: u64, - ts: u64, - ) -> Result<(), TrieError> { - self.insert_common_mut(key, value, version, ts, false, true) + self.insert_common_mut(key, value, version, ts, false) } /// Removes the key from the current node or one of its child nodes @@ -1739,10 +1717,12 @@ impl Tree { mod tests { use super::Tree; use crate::art::QueryType; - use crate::{FixedSizeKey, VariableSizeKey}; + use crate::version::LeavesType; + use crate::{FixedSizeKey, KeyTrait, VariableSizeKey}; use rand::{seq::SliceRandom, thread_rng, Rng}; use std::ops::RangeFull; use std::str::FromStr; + use std::time::Instant; use rand::distributions::Alphanumeric; use std::fs::File; @@ -3288,11 +3268,11 @@ mod tests { #[test] fn test_insert_or_replace() { - let mut tree: Tree = Tree::new(); + let mut tree: Tree = Tree::with_leaves_type(LeavesType::Single); let key = VariableSizeKey::from_str("key").unwrap(); - tree.insert_or_replace(&key, 1, 10, 100).unwrap(); - tree.insert_or_replace(&key, 2, 20, 200).unwrap(); + tree.insert(&key, 1, 10, 100).unwrap(); + tree.insert(&key, 2, 20, 200).unwrap(); let history = tree.get_version_history(&key).unwrap(); assert_eq!(history.len(), 1); @@ -3301,12 +3281,12 @@ mod tests { #[test] fn test_insert_or_replace_unchecked() { - let mut tree: Tree = Tree::new(); + let mut tree: Tree = Tree::with_leaves_type(LeavesType::Single); let key = VariableSizeKey::from_str("key").unwrap(); // Scenario 1: the second value is more recent than the first one. - tree.insert_or_replace_unchecked(&key, 1, 10, 100).unwrap(); - tree.insert_or_replace_unchecked(&key, 2, 20, 200).unwrap(); + tree.insert_unchecked(&key, 1, 10, 100).unwrap(); + tree.insert_unchecked(&key, 2, 20, 200).unwrap(); let history = tree.get_version_history(&key).unwrap(); assert_eq!(history.len(), 1); @@ -3315,7 +3295,7 @@ mod tests { // Scenario 2: the new value has the smaller version and hence // is older than the one already in the tree. Discard the new // value. - tree.insert_or_replace_unchecked(&key, 1, 1, 1).unwrap(); + tree.insert_unchecked(&key, 1, 1, 1).unwrap(); let history = tree.get_version_history(&key).unwrap(); assert_eq!(history.len(), 1); @@ -3541,4 +3521,108 @@ mod tests { assert!(tree.remove(&key1)); } } + + #[test] + fn test_insert_multiple_version_keys() { + let mut tree = Tree::::new(); + + let start = std::time::Instant::now(); + + let num_keys = 100; // Number of keys + let versions_per_key = 10_000; // Number of versions per key + + // Insert 100,00 versions for each of the 100 keys + for key_index in 0..num_keys { + let key = VariableSizeKey::from_str(&format!("key_{}", key_index)).unwrap(); + + for version_index in 0..versions_per_key { + let value = key_index * versions_per_key + version_index; // Value for versioning + tree.insert_unchecked(&key, value, 0, 0).unwrap(); + } + } + + println!( + "Insertion time for 1M key-version pairs: {:?}", + start.elapsed() + ); + } + + fn run_query_benchmark( + tree: &Tree, + key: &P, + query_type: QueryType, + iterations: u32, + ) -> std::time::Duration { + let start = Instant::now(); + for _ in 0..iterations { + let _ = tree.get_value_by_query(key, query_type); + } + start.elapsed() + } + + #[test] + fn benchmark_timestamp_queries() { + let mut tree = Tree::::with_leaves_type(LeavesType::Vector); + + // Test parameters + let num_keys = 100; + let versions_per_key = 10_000; + let query_iterations = 1; + + println!("Setting up test data..."); + let setup_start = Instant::now(); + + // Insert test data with incrementing timestamps + for key_idx in 0..num_keys { + let key = VariableSizeKey::from_str(&format!("key_{}", key_idx)).unwrap(); + for version in 0..versions_per_key { + let value = key_idx * versions_per_key + version; + let ts = version as u64; // Using version as timestamp for predictable ordering + tree.insert_unchecked(&key, value, version as u64, ts) + .unwrap(); + } + } + + println!("Setup completed in {:?}", setup_start.elapsed()); + + // Select a key in the middle for testing + let test_key = VariableSizeKey::from_str("key_50").unwrap(); + let mid_ts = (versions_per_key / 2) as u64; + + // Test cases + let test_cases = vec![ + ("LatestByVersion", QueryType::LatestByVersion(mid_ts)), + ("LatestByTs", QueryType::LatestByTs(mid_ts)), + ("LastLessThanTs", QueryType::LastLessThanTs(mid_ts)), + ("LastLessOrEqualTs", QueryType::LastLessOrEqualTs(mid_ts)), + ("FirstGreaterThanTs", QueryType::FirstGreaterThanTs(mid_ts)), + ( + "FirstGreaterOrEqualTs", + QueryType::FirstGreaterOrEqualTs(mid_ts), + ), + ]; + + println!("\nRunning performance tests..."); + println!( + "Each query type will be executed {} times", + query_iterations + ); + println!( + "Tree contains {} keys with {} versions each", + num_keys, versions_per_key + ); + println!("\nResults:"); + println!( + "{:<25} {:<15} {:<10}", + "Query Type", "Total Time", "Avg Time" + ); + println!("{}", "-".repeat(50)); + + for (name, query_type) in test_cases { + let duration = run_query_benchmark(&tree, &test_key, query_type, query_iterations); + let avg_duration = duration.div_f64(query_iterations as f64); + + println!("{:<25} {:?} {:?}", name, duration, avg_duration); + } + } } diff --git a/src/iter.rs b/src/iter.rs index 3c46cfe..640b163 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -3,7 +3,8 @@ use std::ops::RangeBounds; use std::sync::Arc; use crate::art::{Node, NodeType, QueryType}; -use crate::node::{LeafValue, TwigNode}; +use crate::node::TwigNode; +use crate::version::LeafValue; use crate::KeyTrait; type NodeIterator<'a, P, V> = Box>> + 'a>; @@ -276,7 +277,7 @@ impl<'a, P: KeyTrait + 'a, V: Clone> ForwardIterState<'a, P, V> { let mut iters = Vec::new(); if let NodeType::Twig(twig) = &node.node_type { if range.contains(&twig.key) { - if let Some(v) = twig.get_leaf_by_query_ref(query_type) { + if let Some(v) = twig.get_leaf_by_query(query_type) { leafs.push_back(Leaf(&twig.key, v)); } } diff --git a/src/lib.rs b/src/lib.rs index b770dd8..62950a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ -// #[allow(warnings)] pub mod art; pub mod iter; pub mod node; +pub mod version; use std::cmp::{Ord, Ordering, PartialOrd}; use std::error::Error; diff --git a/src/node.rs b/src/node.rs index 5b5e41c..ac2a685 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,6 +1,7 @@ use std::slice::from_ref; use std::sync::Arc; +use crate::version::{LeafValue, Leaves, LeavesTrait, LeavesType}; use crate::{art::QueryType, KeyTrait}; /* @@ -22,119 +23,39 @@ pub(crate) trait NodeTrait { pub(crate) struct TwigNode { pub(crate) prefix: K, pub(crate) key: K, - pub(crate) values: Vec>>, + pub(crate) storage: Leaves, pub(crate) version: u64, // Version for the twig node } -// Timestamp-Version Ordering Constraint Explanation: -// Given two internal keys associated with the same user key, represented as: -// (key, version1, ts1) and (key, version2, ts2), -// the following ordering constraints apply: -// - If version1 < version2, then it must be that ts1 <= ts2. -// - If ts1 < ts2, then it must be that version1 < version2. -// This ensures a consistent ordering of versions based on their timestamps. -// -#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] -pub(crate) struct LeafValue { - pub(crate) value: V, - pub(crate) version: u64, - pub(crate) ts: u64, -} - -impl LeafValue { - pub(crate) fn new(value: V, version: u64, ts: u64) -> Self { - LeafValue { value, version, ts } - } -} - impl TwigNode { - pub(crate) fn new(prefix: K, key: K) -> Self { + pub(crate) fn new(prefix: K, key: K, leaves_type: LeavesType) -> Self { TwigNode { prefix, key, - values: Vec::new(), + storage: Leaves::new(leaves_type), version: 0, } } - fn insert_common(values: &mut Vec>>, value: V, version: u64, ts: u64) { - let new_leaf_value = LeafValue::new(value, version, ts); - - // Check if a LeafValue with the same version exists and update or insert accordingly - match values.binary_search_by(|v| v.version.cmp(&new_leaf_value.version)) { - Ok(index) => { - // If an entry with the same version and timestamp exists, just put the same value - if values[index].ts == ts { - values[index] = Arc::new(new_leaf_value); - } else { - // If an entry with the same version and different timestamp exists, add a new entry - // Determine the direction to scan based on the comparison of timestamps - let mut insert_position = index; - if values[index].ts < ts { - // Scan forward to find the first entry with a timestamp greater than the new entry's timestamp - insert_position += - values[index..].iter().take_while(|v| v.ts <= ts).count(); - } else { - // Scan backward to find the insertion point before the first entry with a timestamp less than the new entry's timestamp - insert_position -= values[..index] - .iter() - .rev() - .take_while(|v| v.ts >= ts) - .count(); - } - values.insert(insert_position, Arc::new(new_leaf_value)); - } - } - Err(index) => { - // If no entry with the same version exists, insert the new value at the correct position - values.insert(index, Arc::new(new_leaf_value)); - } - } - } - - pub(crate) fn insert(&self, value: V, version: u64, ts: u64) -> TwigNode { - let mut new_values = self.values.clone(); - Self::insert_common(&mut new_values, value, version, ts); + pub(crate) fn insert(&self, value: V, version: u64, ts: u64) -> Self { + let mut new_storage = self.storage.clone(); + new_storage.insert_mut(value, version, ts); TwigNode { prefix: self.prefix.clone(), key: self.key.clone(), - values: new_values, + storage: new_storage, version: version.max(self.version), } } pub(crate) fn insert_mut(&mut self, value: V, version: u64, ts: u64) { - Self::insert_common(&mut self.values, value, version, ts); - self.version = version.max(self.version); // Update LeafNode's version - } - - pub(crate) fn replace_if_newer_mut(&mut self, value: V, version: u64, ts: u64) { - if version > self.version { - self.values.clear(); - self.insert_mut(value, version, ts); - } - } - - pub(crate) fn insert_or_replace( - &self, - value: V, - version: u64, - ts: u64, - replace: bool, - ) -> TwigNode { - if replace { - // Create a replacement Twig node with the new value only. - let mut new_twig = TwigNode::new(self.prefix.clone(), self.key.clone()); - new_twig.insert_mut(value, version, ts); - new_twig - } else { - self.insert(value, version, ts) - } + self.storage.insert_mut(value, version, ts); + self.version = version.max(self.version); } pub(crate) fn iter(&self) -> impl DoubleEndedIterator>> { - self.values.iter() + self.storage.iter() } } @@ -142,14 +63,6 @@ impl TwigNode { impl TwigNode { #[inline] pub(crate) fn get_leaf_by_query(&self, query_type: QueryType) -> Option<&Arc>> { - self.get_leaf_by_query_ref(query_type) - } - - #[inline] - pub(crate) fn get_leaf_by_query_ref( - &self, - query_type: QueryType, - ) -> Option<&Arc>> { match query_type { QueryType::LatestByVersion(version) => self.get_leaf_by_version(version), QueryType::LatestByTs(ts) => self.get_leaf_by_ts(ts), @@ -162,39 +75,26 @@ impl TwigNode { #[inline] pub(crate) fn get_latest_leaf(&self) -> Option<&Arc>> { - self.values.iter().max_by_key(|value| value.version) + self.storage.get_latest_leaf() } #[inline] pub(crate) fn get_leaf_by_version(&self, version: u64) -> Option<&Arc>> { - self.values - .iter() - .filter(|value| value.version <= version) - .max_by_key(|value| value.version) + self.storage.get_leaf_by_version(version) } #[inline] pub(crate) fn get_leaf_by_ts(&self, ts: u64) -> Option<&Arc>> { - self.values - .iter() - .filter(|value| value.ts <= ts) - .max_by_key(|value| value.ts) + self.storage.get_leaf_by_ts(ts) } - #[inline] pub(crate) fn get_all_versions(&self) -> Vec<(V, u64, u64)> { - self.values - .iter() - .map(|value| (value.value.clone(), value.version, value.ts)) - .collect() + self.storage.get_all_versions() } #[inline] pub(crate) fn last_less_than_ts(&self, ts: u64) -> Option<&Arc>> { - self.values - .iter() - .filter(|value| value.ts < ts) - .max_by_key(|value| value.ts) + self.storage.last_less_than_ts(ts) } #[inline] @@ -204,18 +104,12 @@ impl TwigNode { #[inline] pub(crate) fn first_greater_than_ts(&self, ts: u64) -> Option<&Arc>> { - self.values - .iter() - .filter(|value| value.ts > ts) - .min_by_key(|value| value.ts) + self.storage.first_greater_than_ts(ts) } #[inline] pub(crate) fn first_greater_or_equal_ts(&self, ts: u64) -> Option<&Arc>> { - self.values - .iter() - .filter(|value| value.ts >= ts) - .min_by_key(|value| value.ts) + self.storage.first_greater_or_equal_ts(ts) } } @@ -647,7 +541,7 @@ impl NodeTrait for Node256 { #[cfg(test)] mod tests { - use crate::FixedSizeKey; + use crate::{version::LeavesType, FixedSizeKey}; use super::{FlatNode, Node256, Node48, NodeTrait, TwigNode}; use rand::prelude::SliceRandom; @@ -842,35 +736,52 @@ mod tests { } } + fn setup_twig() -> TwigNode, usize> { + let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); + let node: TwigNode, usize> = TwigNode::, usize>::new( + dummy_prefix.clone(), + dummy_prefix, + LeavesType::Vector, + ); + node + } + #[test] fn twig_insert() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); + let node = setup_twig(); - let node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + // Test initial state + assert!(node.iter().next().is_none()); + // Test after insert let new_node = node.insert(42, 123, 0); - assert_eq!(node.values.len(), 0); - assert_eq!(new_node.values.len(), 1); - assert_eq!(new_node.values[0].value, 42); - assert_eq!(new_node.values[0].version, 123); + assert!(node.iter().next().is_none()); // Original node unchanged + + let mut new_iter = new_node.iter(); + let leaf = new_iter.next().unwrap(); + assert_eq!(leaf.version, 123); + assert_eq!(leaf.value, 42); + assert_eq!(leaf.ts, 0); + assert!(new_iter.next().is_none()); } #[test] fn twig_insert_mut() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); - - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(42, 123, 0); - assert_eq!(node.values.len(), 1); - assert_eq!(node.values[0].value, 42); - assert_eq!(node.values[0].version, 123); + + let mut iter = node.iter(); + let leaf = iter.next().unwrap(); + assert_eq!(leaf.version, 123); + assert_eq!(leaf.value, 42); + assert_eq!(leaf.ts, 0); + assert!(iter.next().is_none()); } #[test] fn twig_get_latest_leaf() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(42, 123, 0); node.insert_mut(43, 124, 1); let latest_leaf = node.get_latest_leaf(); @@ -879,8 +790,7 @@ mod tests { #[test] fn twig_get_leaf_by_version() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(42, 123, 0); node.insert_mut(43, 124, 1); let leaf = node.get_leaf_by_version(123); @@ -891,8 +801,7 @@ mod tests { #[test] fn twig_iter() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("foo".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(42, 123, 0); node.insert_mut(43, 124, 1); let mut iter = node.iter(); @@ -969,69 +878,82 @@ mod tests { #[test] fn twig_get_leaf_by_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); + // Inserting leaves with different timestamps node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Test case 1: Retrieve leaf by exact timestamp - let leaf_by_ts = node.get_leaf_by_ts(10); - assert_eq!(leaf_by_ts.unwrap().value, 50); + let leaf = node.get_leaf_by_ts(10).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Test case 2: Retrieve leaf by another exact timestamp - let leaf_by_ts = node.get_leaf_by_ts(20); - assert_eq!(leaf_by_ts.unwrap().value, 51); + let leaf = node.get_leaf_by_ts(20).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); // Test case 3: Attempt to retrieve leaf by a non-existent timestamp - let leaf_by_ts = node.get_leaf_by_ts(30); - assert_eq!(leaf_by_ts.unwrap().value, 51); + let leaf = node.get_leaf_by_ts(30).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); } #[test] fn test_get_leaf_by_version() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Exact version match - let leaf = node.get_leaf_by_version(200); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.get_leaf_by_version(200).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Version not present, should get closest lower version let leaf = node.get_leaf_by_version(199); assert!(leaf.is_none()); // Higher version, should get the highest available version - let leaf = node.get_leaf_by_version(202); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.get_leaf_by_version(202).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); } #[test] fn test_get_leaf_by_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Exact timestamp match - let leaf = node.get_leaf_by_ts(10); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.get_leaf_by_ts(10).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Timestamp not present, should get closest lower timestamp - let leaf = node.get_leaf_by_ts(15); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.get_leaf_by_ts(15).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Higher timestamp, should get the highest available timestamp - let leaf = node.get_leaf_by_ts(25); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.get_leaf_by_ts(25).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); } #[test] fn test_get_all_versions() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 @@ -1043,81 +965,97 @@ mod tests { #[test] fn test_last_less_than_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Timestamp just below an existing timestamp - let leaf = node.last_less_than_ts(20); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.last_less_than_ts(20).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Timestamp well below any existing timestamp let leaf = node.last_less_than_ts(5); assert!(leaf.is_none()); // Timestamp above all existing timestamps - let leaf = node.last_less_than_ts(25); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.last_less_than_ts(25).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); } #[test] fn test_last_less_or_equal_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Exact timestamp - let leaf = node.last_less_or_equal_ts(10); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.last_less_or_equal_ts(10).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Timestamp not present, should get closest lower timestamp - let leaf = node.last_less_or_equal_ts(15); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.last_less_or_equal_ts(15).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Higher timestamp, should get the highest available timestamp - let leaf = node.last_less_or_equal_ts(25); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.last_less_or_equal_ts(25).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); } #[test] fn test_first_greater_than_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Timestamp just above an existing timestamp - let leaf = node.first_greater_than_ts(10); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.first_greater_than_ts(10).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); // Timestamp well above any existing timestamp let leaf = node.first_greater_than_ts(25); assert!(leaf.is_none()); // Timestamp below all existing timestamps - let leaf = node.first_greater_than_ts(5); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.first_greater_than_ts(5).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); } #[test] fn test_first_greater_or_equal_ts() { - let dummy_prefix: FixedSizeKey<8> = FixedSizeKey::create_key("bar".as_bytes()); - let mut node = TwigNode::, usize>::new(dummy_prefix.clone(), dummy_prefix); + let mut node = setup_twig(); node.insert_mut(50, 200, 10); // value: 50, version: 200, timestamp: 10 node.insert_mut(51, 201, 20); // value: 51, version: 201, timestamp: 20 // Exact timestamp - let leaf = node.first_greater_or_equal_ts(10); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.first_greater_or_equal_ts(10).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); // Timestamp not present, should get closest higher timestamp - let leaf = node.first_greater_or_equal_ts(15); - assert_eq!(leaf.unwrap().value, 51); + let leaf = node.first_greater_or_equal_ts(15).unwrap(); + assert_eq!(leaf.version, 201); + assert_eq!(leaf.value, 51); + assert_eq!(leaf.ts, 20); // Lower timestamp, should get the lowest available timestamp - let leaf = node.first_greater_or_equal_ts(5); - assert_eq!(leaf.unwrap().value, 50); + let leaf = node.first_greater_or_equal_ts(5).unwrap(); + assert_eq!(leaf.version, 200); + assert_eq!(leaf.value, 50); + assert_eq!(leaf.ts, 10); } } diff --git a/src/version.rs b/src/version.rs new file mode 100644 index 0000000..32765b6 --- /dev/null +++ b/src/version.rs @@ -0,0 +1,879 @@ +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) enum Leaves { + Vector(Vector), + Single(Single), +} + +#[derive(Clone, Copy, Debug)] +pub enum LeavesType { + Vector, + Single, +} + +// Timestamp-Version Ordering Constraint Explanation: +// Given two internal keys associated with the same user key, represented as: +// (key, version1, ts1) and (key, version2, ts2), +// the following ordering constraints apply: +// - If version1 < version2, then it must be that ts1 <= ts2. +// - If ts1 < ts2, then it must be that version1 < version2. +// This ensures a consistent ordering of versions based on their timestamps. +// +#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)] +pub(crate) struct LeafValue { + pub(crate) value: V, + pub(crate) version: u64, + pub(crate) ts: u64, +} + +#[allow(unused)] +pub(crate) trait LeavesTrait: Clone { + type Iter<'a>: DoubleEndedIterator>> + 'a + where + Self: 'a, + V: 'a; + + fn new(storage_type: LeavesType) -> Self; + fn insert_mut(&mut self, value: V, version: u64, ts: u64); + fn clear(&mut self); + fn get_latest_leaf(&self) -> Option<&Arc>>; + fn get_leaf_by_version(&self, version: u64) -> Option<&Arc>>; + fn get_leaf_by_ts(&self, ts: u64) -> Option<&Arc>>; + fn last_less_than_ts(&self, ts: u64) -> Option<&Arc>>; + fn first_greater_than_ts(&self, ts: u64) -> Option<&Arc>>; + fn first_greater_or_equal_ts(&self, ts: u64) -> Option<&Arc>>; + fn get_all_versions(&self) -> Vec<(V, u64, u64)>; + fn iter(&self) -> Self::Iter<'_>; +} + +impl LeavesTrait for Leaves { + type Iter<'a> + = Box>> + 'a> + where + Self: 'a; + + fn new(storage_type: LeavesType) -> Self { + match storage_type { + LeavesType::Vector => Leaves::Vector(Vector::new()), + LeavesType::Single => Leaves::Single(Single::new()), + } + } + + fn insert_mut(&mut self, value: V, version: u64, ts: u64) { + match self { + Leaves::Vector(storage) => storage.insert_mut(value, version, ts), + Leaves::Single(storage) => storage.insert_mut(value, version, ts), + } + } + + fn clear(&mut self) { + match self { + Leaves::Vector(storage) => storage.clear(), + Leaves::Single(storage) => storage.clear(), + } + } + + fn get_latest_leaf(&self) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.get_latest_leaf(), + Leaves::Single(storage) => storage.get_latest_leaf(), + } + } + + fn get_leaf_by_version(&self, version: u64) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.get_leaf_by_version(version), + Leaves::Single(storage) => storage.get_leaf_by_version(version), + } + } + + fn get_leaf_by_ts(&self, ts: u64) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.get_leaf_by_ts(ts), + Leaves::Single(storage) => storage.get_leaf_by_ts(ts), + } + } + + fn last_less_than_ts(&self, ts: u64) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.last_less_than_ts(ts), + Leaves::Single(storage) => storage.last_less_than_ts(ts), + } + } + + fn first_greater_than_ts(&self, ts: u64) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.first_greater_than_ts(ts), + Leaves::Single(storage) => storage.first_greater_than_ts(ts), + } + } + + fn first_greater_or_equal_ts(&self, ts: u64) -> Option<&Arc>> { + match self { + Leaves::Vector(storage) => storage.first_greater_or_equal_ts(ts), + Leaves::Single(storage) => storage.first_greater_or_equal_ts(ts), + } + } + + fn get_all_versions(&self) -> Vec<(V, u64, u64)> { + match self { + Leaves::Vector(storage) => storage.get_all_versions(), + Leaves::Single(storage) => storage.get_all_versions(), + } + } + + fn iter(&self) -> Self::Iter<'_> { + match self { + Leaves::Vector(storage) => Box::new(storage.iter()), + Leaves::Single(storage) => Box::new(storage.iter()), + } + } +} + +#[derive(Clone)] +pub(crate) struct Vector { + values: Vec>>, +} + +impl Vector { + fn new() -> Self { + Vector { values: Vec::new() } + } + + fn insert_mut(&mut self, value: V, version: u64, ts: u64) { + let leaf_value = Arc::new(LeafValue { value, version, ts }); + + // Check if a LeafValue with the same version exists and update or insert accordingly + match self.values.binary_search_by(|v| v.version.cmp(&version)) { + Ok(index) => { + // If an entry with the same version and timestamp exists, just put the same value + if self.values[index].ts == ts { + self.values[index] = leaf_value; + } else { + // If an entry with the same version and different timestamp exists, add a new entry + // Determine the direction to scan based on the comparison of timestamps + + // Scan forward to find the first entry with a timestamp greater than the new entry's timestamp + let insert_position = if self.values[index].ts < ts { + index + + self.values[index..] + .iter() + .take_while(|v| v.ts <= ts) + .count() + } else { + // Scan backward to find the insertion point before the first entry with a timestamp less than the new entry's timestamp + index + - self.values[..index] + .iter() + .rev() + .take_while(|v| v.ts >= ts) + .count() + }; + self.values.insert(insert_position, leaf_value); + } + } + Err(index) => self.values.insert(index, leaf_value), + } + } + + #[allow(unused)] + fn clear(&mut self) { + self.values.clear(); + } + + fn get_latest_leaf(&self) -> Option<&Arc>> { + self.values.iter().max_by_key(|value| value.version) + } + + fn get_leaf_by_version(&self, version: u64) -> Option<&Arc>> { + self.values + .iter() + .filter(|value| value.version <= version) + .max_by_key(|value| value.version) + } + + fn get_leaf_by_ts(&self, ts: u64) -> Option<&Arc>> { + self.values + .iter() + .filter(|value| value.ts <= ts) + .max_by_key(|value| value.ts) + } + + fn last_less_than_ts(&self, ts: u64) -> Option<&Arc>> { + self.values + .iter() + .filter(|value| value.ts < ts) + .max_by_key(|value| value.ts) + } + + fn first_greater_than_ts(&self, ts: u64) -> Option<&Arc>> { + self.values + .iter() + .filter(|value| value.ts > ts) + .min_by_key(|value| value.ts) + } + + fn first_greater_or_equal_ts(&self, ts: u64) -> Option<&Arc>> { + self.values + .iter() + .filter(|value| value.ts >= ts) + .min_by_key(|value| value.ts) + } + + fn get_all_versions(&self) -> Vec<(V, u64, u64)> { + self.values + .iter() + .map(|value| (value.value.clone(), value.version, value.ts)) + .collect() + } + + fn iter(&self) -> impl DoubleEndedIterator>> + '_ { + self.values.iter() + } +} + +#[derive(Clone)] +pub(crate) struct Single { + value: Option>>, +} + +impl Single { + fn new() -> Self { + Single { value: None } + } + + fn insert_mut(&mut self, value: V, version: u64, ts: u64) { + // Only replace if the provided value is more recent than + // the existing ones. This is important because this method + // is used for loading the index in SurrealKV and thus must + // be able to handle segments left by an unfinished compaction + // where older versions can end up in more recent segments + // after the newer versions. + if version > self.value.as_ref().map_or(0, |v| v.version) { + self.value = Some(Arc::new(LeafValue { value, version, ts })); + } + } + + #[allow(unused)] + fn clear(&mut self) { + self.value = None; + } + + fn get_latest_leaf(&self) -> Option<&Arc>> { + self.value.as_ref() + } + + fn get_leaf_by_version(&self, version: u64) -> Option<&Arc>> { + self.value + .as_ref() + .and_then(|v| if v.version <= version { Some(v) } else { None }) + } + + fn get_leaf_by_ts(&self, ts: u64) -> Option<&Arc>> { + self.value + .as_ref() + .and_then(|v| if v.ts <= ts { Some(v) } else { None }) + } + + fn last_less_than_ts(&self, ts: u64) -> Option<&Arc>> { + self.value + .as_ref() + .and_then(|v| if v.ts < ts { Some(v) } else { None }) + } + + fn first_greater_than_ts(&self, ts: u64) -> Option<&Arc>> { + self.value + .as_ref() + .and_then(|v| if v.ts > ts { Some(v) } else { None }) + } + + fn first_greater_or_equal_ts(&self, ts: u64) -> Option<&Arc>> { + self.value + .as_ref() + .and_then(|v| if v.ts >= ts { Some(v) } else { None }) + } + + fn get_all_versions(&self) -> Vec<(V, u64, u64)> { + self.value + .as_ref() + .map(|v| vec![(v.value.clone(), v.version, v.ts)]) + .unwrap_or_default() + } + + fn iter(&self) -> impl DoubleEndedIterator>> + '_ { + self.value.iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_storage_types() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + + // Test empty state + assert!(storage.get_latest_leaf().is_none()); + + // Test single insert + storage.insert_mut(42, 1, 100); + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!(leaf.value, 42); + + // Test version query + assert!(storage.get_leaf_by_version(0).is_none()); + assert_eq!(storage.get_leaf_by_version(1).unwrap().value, 42); + assert_eq!(storage.get_leaf_by_version(2).unwrap().value, 42); + + // Test timestamp query + assert!(storage.get_leaf_by_ts(50).is_none()); + assert_eq!(storage.get_leaf_by_ts(100).unwrap().value, 42); + assert_eq!(storage.get_leaf_by_ts(150).unwrap().value, 42); + + // Test range queries + assert!(storage.last_less_than_ts(100).is_none()); + assert_eq!(storage.first_greater_than_ts(50).unwrap().value, 42); + assert_eq!(storage.first_greater_or_equal_ts(100).unwrap().value, 42); + + // Test multiple inserts + storage.insert_mut(43, 2, 200); + let versions = storage.get_all_versions(); + match storage_type { + LeavesType::Single => assert_eq!(versions.len(), 1), + _ => assert_eq!(versions.len(), 2), + } + + // Test clear + storage.clear(); + assert!(storage.get_latest_leaf().is_none()); + } + } + + #[test] + fn test_single_storage_specific() { + let mut storage: Leaves = Leaves::new(LeavesType::Single); + + // Test that single storage only keeps the latest value + storage.insert_mut(1, 1, 100); + storage.insert_mut(2, 2, 200); + storage.insert_mut(3, 3, 300); + + let versions = storage.get_all_versions(); + assert_eq!(versions.len(), 1); + assert_eq!(versions[0].0, 3); + + // Test that older versions are not accessible + assert!(storage.get_leaf_by_version(1).is_none()); + assert!(storage.get_leaf_by_ts(100).is_none()); + } + + fn setup_test_data(storage: &mut Leaves) { + // Insert test data with different versions and timestamps + // version: 1, ts: 100 - First entry + // version: 2, ts: 200 - Middle entry + // version: 3, ts: 300 - Latest entry + storage.insert_mut(1, 1, 100); + storage.insert_mut(2, 2, 200); + storage.insert_mut(3, 3, 300); + } + + #[test] + fn test_version_queries() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + setup_test_data(&mut storage); + + // Test get_latest_leaf + let latest = storage.get_latest_leaf().unwrap(); + match storage_type { + LeavesType::Single => { + assert_eq!(latest.value, 3); + assert_eq!(latest.version, 3); + assert_eq!(latest.ts, 300); + } + _ => { + assert_eq!(latest.value, 3); + assert_eq!(latest.version, 3); + assert_eq!(latest.ts, 300); + } + } + + // Test get_leaf_by_version + match storage_type { + LeavesType::Single => { + assert!(storage.get_leaf_by_version(1).is_none()); // Single only has latest + assert!(storage.get_leaf_by_version(2).is_none()); + let leaf = storage.get_leaf_by_version(3).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + let leaf = storage.get_leaf_by_version(4).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + _ => { + let leaf = storage.get_leaf_by_version(1).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.get_leaf_by_version(2).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + let leaf = storage.get_leaf_by_version(3).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + let leaf = storage.get_leaf_by_version(4).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + } + + // Test non-existent version + assert!(storage.get_leaf_by_version(0).is_none()); + } + } + + #[test] + fn test_timestamp_queries() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + setup_test_data(&mut storage); + + // Test get_leaf_by_ts + match storage_type { + LeavesType::Single => { + assert!(storage.get_leaf_by_ts(100).is_none()); // Single only has latest + assert!(storage.get_leaf_by_ts(200).is_none()); + let leaf = storage.get_leaf_by_ts(300).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + let leaf = storage.get_leaf_by_ts(400).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + _ => { + let leaf = storage.get_leaf_by_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.get_leaf_by_ts(200).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + let leaf = storage.get_leaf_by_ts(300).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + let leaf = storage.get_leaf_by_ts(400).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + } + + // Test last_less_than_ts + match storage_type { + LeavesType::Single => { + assert!(storage.last_less_than_ts(300).is_none()); // Single only has latest + let leaf = storage.last_less_than_ts(400).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + _ => { + assert!(storage.last_less_than_ts(100).is_none()); // Nothing before first entry + let leaf = storage.last_less_than_ts(200).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.last_less_than_ts(300).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + let leaf = storage.last_less_than_ts(400).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + } + + // Test first_greater_than_ts + match storage_type { + LeavesType::Single => { + let leaf = storage.first_greater_than_ts(200).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + assert!(storage.first_greater_than_ts(300).is_none()); // Nothing after latest + } + _ => { + let leaf = storage.first_greater_than_ts(0).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.first_greater_than_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + let leaf = storage.first_greater_than_ts(200).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + assert!(storage.first_greater_than_ts(300).is_none()); // Nothing after latest + } + } + + // Test first_greater_or_equal_ts + match storage_type { + LeavesType::Single => { + let leaf = storage.first_greater_or_equal_ts(300).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + assert!(storage.first_greater_or_equal_ts(301).is_none()); // Nothing after latest + } + _ => { + let leaf = storage.first_greater_or_equal_ts(0).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.first_greater_or_equal_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.first_greater_or_equal_ts(200).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + let leaf = storage.first_greater_or_equal_ts(300).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + assert!(storage.first_greater_or_equal_ts(301).is_none()); // Nothing after latest + } + } + } + } + + #[test] + fn test_get_all_versions() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + setup_test_data(&mut storage); + + let versions = storage.get_all_versions(); + match storage_type { + LeavesType::Single => { + assert_eq!(versions.len(), 1); + assert_eq!(versions[0], (3, 3, 300)); // Only latest version + } + _ => { + assert_eq!(versions.len(), 3); + assert_eq!(versions[0], (1, 1, 100)); + assert_eq!(versions[1], (2, 2, 200)); + assert_eq!(versions[2], (3, 3, 300)); + } + } + } + } + + #[test] + fn test_iter() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + setup_test_data(&mut storage); + + let iter_vec: Vec<_> = storage.iter().collect(); + match storage_type { + LeavesType::Single => { + assert_eq!(iter_vec.len(), 1); + let leaf = iter_vec[0]; + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + _ => { + assert_eq!(iter_vec.len(), 3); + assert_eq!( + (iter_vec[0].value, iter_vec[0].version, iter_vec[0].ts), + (1, 1, 100) + ); + assert_eq!( + (iter_vec[1].value, iter_vec[1].version, iter_vec[1].ts), + (2, 2, 200) + ); + assert_eq!( + (iter_vec[2].value, iter_vec[2].version, iter_vec[2].ts), + (3, 3, 300) + ); + } + } + + // Test reverse iteration + let rev_iter_vec: Vec<_> = storage.iter().rev().collect(); + match storage_type { + LeavesType::Single => { + assert_eq!(rev_iter_vec.len(), 1); + let leaf = rev_iter_vec[0]; + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 300)); + } + _ => { + assert_eq!(rev_iter_vec.len(), 3); + assert_eq!( + ( + rev_iter_vec[0].value, + rev_iter_vec[0].version, + rev_iter_vec[0].ts + ), + (3, 3, 300) + ); + assert_eq!( + ( + rev_iter_vec[1].value, + rev_iter_vec[1].version, + rev_iter_vec[1].ts + ), + (2, 2, 200) + ); + assert_eq!( + ( + rev_iter_vec[2].value, + rev_iter_vec[2].version, + rev_iter_vec[2].ts + ), + (1, 1, 100) + ); + } + } + } + } + + #[test] + fn test_edge_cases() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + + // Test empty storage + assert!(storage.get_latest_leaf().is_none()); + assert!(storage.get_leaf_by_version(1).is_none()); + assert!(storage.get_leaf_by_ts(100).is_none()); + assert!(storage.last_less_than_ts(100).is_none()); + assert!(storage.first_greater_than_ts(100).is_none()); + assert!(storage.first_greater_or_equal_ts(100).is_none()); + assert_eq!(storage.get_all_versions().len(), 0); + assert_eq!(storage.iter().count(), 0); + + // Test single value + storage.insert_mut(42, 1, 100); + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (42, 1, 100)); + let leaf = storage.get_leaf_by_version(1).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (42, 1, 100)); + let leaf = storage.get_leaf_by_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (42, 1, 100)); + assert!(storage.last_less_than_ts(100).is_none()); + assert!(storage.first_greater_than_ts(100).is_none()); + let leaf = storage.first_greater_or_equal_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (42, 1, 100)); + + // Test clear + storage.clear(); + assert!(storage.get_latest_leaf().is_none()); + assert_eq!(storage.get_all_versions().len(), 0); + } + } + + #[test] + fn test_large_dataset() { + let test_cases = vec![LeavesType::Vector, LeavesType::Single]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + + // Insert 1000 versions with increasing timestamps + for i in 0..1000 { + storage.insert_mut(i, i as u64, i as u64 * 100); + } + + match storage_type { + LeavesType::Single => { + // Single storage should only have the latest value + assert_eq!(storage.get_all_versions().len(), 1); + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + + // Only latest version should be accessible + assert!(storage.get_leaf_by_version(500).is_none()); + let leaf = storage.get_leaf_by_version(999).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + + // Only latest timestamp should be accessible + assert!(storage.get_leaf_by_ts(50000).is_none()); + let leaf = storage.get_leaf_by_ts(99900).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + } + _ => { + // Test version queries + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + let leaf = storage.get_leaf_by_version(500).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (500, 500, 50000)); + let leaf = storage.get_leaf_by_version(999).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + + // Test timestamp queries + let leaf = storage.get_leaf_by_ts(50000).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (500, 500, 50000)); + let leaf = storage.get_leaf_by_ts(99900).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (999, 999, 99900)); + + // Test range queries + let leaf = storage.last_less_than_ts(50000).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (499, 499, 49900)); + let leaf = storage.first_greater_than_ts(50000).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (501, 501, 50100)); + let leaf = storage.first_greater_or_equal_ts(50000).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (500, 500, 50000)); + + // Test iterator + let all_versions = storage.get_all_versions(); + assert_eq!(all_versions.len(), 1000); + assert_eq!(all_versions[0], (0, 0, 0)); + assert_eq!(all_versions[499], (499, 499, 49900)); + assert_eq!(all_versions[999], (999, 999, 99900)); + + // Test reverse iteration + let rev_iter: Vec<_> = storage.iter().rev().collect(); + assert_eq!( + (rev_iter[0].value, rev_iter[0].version, rev_iter[0].ts), + (999, 999, 99900) + ); + assert_eq!( + (rev_iter[499].value, rev_iter[499].version, rev_iter[499].ts), + (500, 500, 50000) + ); + assert_eq!( + (rev_iter[999].value, rev_iter[999].version, rev_iter[999].ts), + (0, 0, 0) + ); + } + } + } + } + + #[test] + fn test_non_monotonic_inserts() { + // LeavesType::Single does not support non-monotonic inserts + let test_cases = vec![LeavesType::Vector]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + + // Insert values in non-monotonic order + storage.insert_mut(3, 3, 300); + storage.insert_mut(1, 1, 100); + storage.insert_mut(2, 2, 200); + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (2, 2, 200)); + assert_eq!(storage.get_all_versions().len(), 1); + } + _ => { + // Verify correct ordering + let versions = storage.get_all_versions(); + assert_eq!(versions.len(), 3); + assert_eq!(versions[0], (1, 1, 100)); + assert_eq!(versions[1], (2, 2, 200)); + assert_eq!(versions[2], (3, 3, 300)); + } + } + } + } + + #[test] + fn test_with_duplicates() { + // LeavesType::Single does not support non-monotonic or duplicate inserts + let test_cases = vec![LeavesType::Vector]; + + for storage_type in test_cases { + let mut storage: Leaves = Leaves::new(storage_type); + + // Test duplicate timestamps with different versions + storage.insert_mut(1, 1, 100); + storage.insert_mut(2, 2, 100); // Same timestamp + storage.insert_mut(3, 3, 100); // Same timestamp + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_leaf_by_ts(100).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 100)); + } + _ => { + let versions_at_ts = storage + .get_all_versions() + .iter() + .filter(|(_, _, ts)| *ts == 100) + .count(); + assert_eq!(versions_at_ts, 3); + } + } + + storage.clear(); + + // Test duplicate versions with different timestamps + storage.insert_mut(1, 5, 100); + storage.insert_mut(2, 5, 200); // Same version + storage.insert_mut(3, 5, 300); // Same version + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_leaf_by_version(5).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 5, 300)); + } + _ => { + let leaf = storage.get_leaf_by_version(5).unwrap(); + assert_eq!(leaf.value, 3); // Should get latest by timestamp + } + } + + storage.clear(); + + // Test large gaps + storage.insert_mut(1, 1, 100); + storage.insert_mut(2, 1_000_000, u64::MAX / 2); // Large version gap + storage.insert_mut(3, 1_000_001, u64::MAX); // Large timestamp gap + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!( + (leaf.value, leaf.version, leaf.ts), + (3, 1_000_001, u64::MAX) + ); + } + _ => { + let leaf = storage.get_leaf_by_version(500_000).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + let leaf = storage.get_leaf_by_ts(u64::MAX / 4).unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (1, 1, 100)); + } + } + + storage.clear(); + + // Test maximum u64 values + storage.insert_mut(1, u64::MAX - 2, 100); + storage.insert_mut(2, u64::MAX - 1, u64::MAX - 1); + storage.insert_mut(3, u64::MAX, u64::MAX); + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, u64::MAX, u64::MAX)); + } + _ => { + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, u64::MAX, u64::MAX)); + let leaf = storage.get_leaf_by_version(u64::MAX - 1).unwrap(); + assert_eq!( + (leaf.value, leaf.version, leaf.ts), + (2, u64::MAX - 1, u64::MAX - 1) + ); + } + } + + storage.clear(); + + // Test non-sequential timestamp ordering with same versions + storage.insert_mut(1, 1, 300); // Higher timestamp first + storage.insert_mut(2, 2, 200); // Middle timestamp second + storage.insert_mut(3, 3, 100); // Lowest timestamp last + + match storage_type { + LeavesType::Single => { + let leaf = storage.get_latest_leaf().unwrap(); + assert_eq!((leaf.value, leaf.version, leaf.ts), (3, 3, 100)); + } + _ => { + let versions = storage.get_all_versions(); + assert_eq!(versions.len(), 3); + // Should be ordered by version + assert_eq!(versions[0], (1, 1, 300)); + assert_eq!(versions[1], (2, 2, 200)); + assert_eq!(versions[2], (3, 3, 100)); + } + } + } + } +} From 7d382efb5e1a6ad5741a9906fc38da6f3807e60a Mon Sep 17 00:00:00 2001 From: arriqaaq Date: Fri, 7 Feb 2025 05:40:46 +0530 Subject: [PATCH 2/3] optimise insert_mut on vector --- benches/vart_bench.rs | 21 ---- src/version.rs | 237 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 209 insertions(+), 49 deletions(-) diff --git a/benches/vart_bench.rs b/benches/vart_bench.rs index 66206ab..179ae7d 100644 --- a/benches/vart_bench.rs +++ b/benches/vart_bench.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::str::FromStr; use std::time::Instant; @@ -41,16 +40,6 @@ pub fn seq_insert_mut(c: &mut Criterion) { }) }); - // Benchmark for BTreeMap - group.bench_function("btreemap", |b| { - let mut btree = BTreeMap::new(); - let mut key = 0u64; - b.iter(|| { - btree.insert(key, key); - key += 1; - }) - }); - group.finish(); } @@ -87,16 +76,6 @@ pub fn rand_insert_mut(c: &mut Criterion) { }) }); - // Benchmark for BTreeMap - group.bench_function("btreemap", |b| { - let mut btree = BTreeMap::new(); - let mut rng = seeded_rng(0xE080D1A42C207DAF); - b.iter(|| { - let key = &keys[rng.gen_range(0..keys.len())]; - btree.insert(key.clone(), key.clone()); - }) - }); - group.finish(); } diff --git a/src/version.rs b/src/version.rs index 32765b6..08982d5 100644 --- a/src/version.rs +++ b/src/version.rs @@ -144,36 +144,28 @@ impl Vector { fn insert_mut(&mut self, value: V, version: u64, ts: u64) { let leaf_value = Arc::new(LeafValue { value, version, ts }); - // Check if a LeafValue with the same version exists and update or insert accordingly - match self.values.binary_search_by(|v| v.version.cmp(&version)) { - Ok(index) => { - // If an entry with the same version and timestamp exists, just put the same value - if self.values[index].ts == ts { - self.values[index] = leaf_value; - } else { - // If an entry with the same version and different timestamp exists, add a new entry - // Determine the direction to scan based on the comparison of timestamps - - // Scan forward to find the first entry with a timestamp greater than the new entry's timestamp - let insert_position = if self.values[index].ts < ts { - index - + self.values[index..] - .iter() - .take_while(|v| v.ts <= ts) - .count() - } else { - // Scan backward to find the insertion point before the first entry with a timestamp less than the new entry's timestamp - index - - self.values[..index] - .iter() - .rev() - .take_while(|v| v.ts >= ts) - .count() - }; - self.values.insert(insert_position, leaf_value); + let version = leaf_value.version; + // Find the start and end of the version group + let start = self.values.partition_point(|v| v.version < version); + let end = self.values.partition_point(|v| v.version <= version); + + if start == end { + // No existing entries with this version; insert at the correct position + self.values.insert(start, leaf_value); + } else { + // Within the version group, find the position based on ts + let ts = leaf_value.ts; + let version_group = &self.values[start..end]; + match version_group.binary_search_by(|v| v.ts.cmp(&ts)) { + Ok(pos) => { + // Replace existing entry with the same version and ts + self.values[start + pos] = leaf_value; + } + Err(pos) => { + // Insert the new entry at the correct position to maintain order + self.values.insert(start + pos, leaf_value); } } - Err(index) => self.values.insert(index, leaf_value), } } @@ -876,4 +868,193 @@ mod tests { } } } + + #[test] + fn test_version_ordering() { + let mut vec = Vector::new(); + + // Insert out of order versions + vec.insert_mut(1, 2, 100); + vec.insert_mut(2, 1, 100); + vec.insert_mut(3, 3, 100); + + // Verify versions are ordered + let versions: Vec<_> = vec.values.iter().map(|v| v.version).collect(); + assert_eq!(versions, vec![1, 2, 3]); + } + + #[test] + fn test_timestamp_ordering_within_version() { + let mut vec = Vector::new(); + + // Insert same version with different timestamps out of order + vec.insert_mut(1, 1, 200); + vec.insert_mut(2, 1, 100); + vec.insert_mut(3, 1, 150); + + // Check values are ordered by timestamp within version 1 + let timestamps: Vec<_> = vec.values.iter().map(|v| v.ts).collect(); + assert_eq!(timestamps, vec![100, 150, 200]); + } + + #[test] + fn test_mixed_ordering() { + let mut vec = Vector::new(); + + // Mix of versions and timestamps + vec.insert_mut(1, 1, 200); // version 1, late ts + vec.insert_mut(2, 2, 100); // version 2, early ts + vec.insert_mut(3, 1, 100); // version 1, early ts + + let values: Vec<_> = vec.values.iter().map(|v| v.value).collect(); + assert_eq!(values, vec![3, 1, 2]); // Ordered by version first, then ts + } + + #[test] + fn test_replace_exact_match() { + let mut vec = Vector::new(); + + // Insert initial value + vec.insert_mut(1, 1, 100); + // Replace with same version and timestamp + vec.insert_mut(2, 1, 100); + + assert_eq!(vec.values.len(), 1); + assert_eq!(vec.values[0].value, 2); + } + + #[test] + fn test_multiple_versions() { + let mut vec = Vector::new(); + + // Insert multiple entries for each version + vec.insert_mut(1, 1, 100); + vec.insert_mut(2, 1, 200); + vec.insert_mut(3, 2, 150); + vec.insert_mut(4, 2, 250); + + let version_groups: Vec<_> = vec.values.iter().map(|v| (v.version, v.ts)).collect(); + + assert_eq!(version_groups, vec![(1, 100), (1, 200), (2, 150), (2, 250)]); + } + + #[test] + fn test_insert_between_existing() { + let mut vec = Vector::new(); + + vec.insert_mut(1, 1, 100); + vec.insert_mut(2, 1, 300); + vec.insert_mut(3, 1, 200); // Should insert between the two + + let timestamps: Vec<_> = vec.values.iter().map(|v| v.ts).collect(); + assert_eq!(timestamps, vec![100, 200, 300]); + } + + #[test] + fn test_edge_case_orderings() { + let mut vec = Vector::new(); + + // Test edge cases: + // 1. Multiple entries with same version, increasing timestamps + vec.insert_mut(1, 1, 100); + vec.insert_mut(2, 1, 101); + vec.insert_mut(3, 1, 102); + + // 2. Multiple entries with same version, decreasing timestamps + vec.insert_mut(4, 2, 103); + vec.insert_mut(5, 2, 102); + vec.insert_mut(6, 2, 101); + + // 3. Same timestamp, different versions + vec.insert_mut(7, 3, 100); + vec.insert_mut(8, 4, 100); + vec.insert_mut(9, 5, 100); + + // 4. Interleaved versions and timestamps + vec.insert_mut(10, 6, 102); + vec.insert_mut(11, 6, 101); + vec.insert_mut(12, 7, 101); + vec.insert_mut(13, 7, 102); + + // Verify the final ordering + let entries: Vec<_> = vec + .values + .iter() + .map(|v| (v.version, v.ts, v.value)) + .collect(); + + assert_eq!( + entries, + vec![ + (1, 100, 1), + (1, 101, 2), + (1, 102, 3), + (2, 101, 6), + (2, 102, 5), + (2, 103, 4), + (3, 100, 7), + (4, 100, 8), + (5, 100, 9), + (6, 101, 11), + (6, 102, 10), + (7, 101, 12), + (7, 102, 13), + ] + ); + } + + #[test] + fn test_boundary_value_ordering() { + let mut vec = Vector::new(); + + // Test boundary values for timestamps and versions + vec.insert_mut(1, 0, 0); // Minimum values + vec.insert_mut(2, u64::MAX, 0); // Max version, min ts + vec.insert_mut(3, 0, u64::MAX); // Min version, max ts + vec.insert_mut(4, u64::MAX, u64::MAX); // Maximum values + + let entries: Vec<_> = vec.values.iter().map(|v| (v.version, v.ts)).collect(); + + assert_eq!( + entries, + vec![(0, 0), (0, u64::MAX), (u64::MAX, 0), (u64::MAX, u64::MAX),] + ); + } + + #[test] + fn test_sparse_dense_ordering() { + let mut vec = Vector::new(); + + // Test sparse vs dense version numbers + vec.insert_mut(1, 1, 100); + vec.insert_mut(2, 1000, 100); // Sparse + vec.insert_mut(3, 1001, 100); // Dense after sparse + vec.insert_mut(4, 2, 100); // Dense after initial + vec.insert_mut(5, 999, 100); // Fill gap + + let versions: Vec<_> = vec.values.iter().map(|v| v.version).collect(); + + assert_eq!(versions, vec![1, 2, 999, 1000, 1001]); + } + + #[test] + fn test_duplicate_handling() { + let mut vec = Vector::new(); + + // Insert same version+ts multiple times + vec.insert_mut(1, 1, 100); + vec.insert_mut(2, 1, 100); // Should replace + vec.insert_mut(3, 1, 100); // Should replace again + + assert_eq!(vec.values.len(), 1); + assert_eq!(vec.values[0].value, 3); + + // Verify ordering isn't affected by duplicates + vec.insert_mut(4, 1, 99); + vec.insert_mut(5, 1, 101); + + let entries: Vec<_> = vec.values.iter().map(|v| (v.ts, v.value)).collect(); + + assert_eq!(entries, vec![(99, 4), (100, 3), (101, 5),]); + } } From 64c10ce06311840c401385f96459f7576f52bb07 Mon Sep 17 00:00:00 2001 From: arriqaaq Date: Fri, 7 Feb 2025 05:46:58 +0530 Subject: [PATCH 3/3] shorten PR --- benches/vart_bench.rs | 117 +++++++++--------------------------------- 1 file changed, 24 insertions(+), 93 deletions(-) diff --git a/benches/vart_bench.rs b/benches/vart_bench.rs index 179ae7d..96f65cc 100644 --- a/benches/vart_bench.rs +++ b/benches/vart_bench.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::collections::BTreeMap; use std::time::Instant; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; @@ -6,7 +6,7 @@ use rand::prelude::SliceRandom; use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; -use vart::art::{QueryType, Tree}; +use vart::art::Tree; use vart::{FixedSizeKey, VariableSizeKey}; fn seeded_rng(alter: u64) -> impl Rng { @@ -40,6 +40,16 @@ pub fn seq_insert_mut(c: &mut Criterion) { }) }); + // Benchmark for BTreeMap + group.bench_function("btreemap", |b| { + let mut btree = BTreeMap::new(); + let mut key = 0u64; + b.iter(|| { + btree.insert(key, key); + key += 1; + }) + }); + group.finish(); } @@ -76,6 +86,16 @@ pub fn rand_insert_mut(c: &mut Criterion) { }) }); + // Benchmark for BTreeMap + group.bench_function("btreemap", |b| { + let mut btree = BTreeMap::new(); + let mut rng = seeded_rng(0xE080D1A42C207DAF); + b.iter(|| { + let key = &keys[rng.gen_range(0..keys.len())]; + btree.insert(key.clone(), key.clone()); + }) + }); + group.finish(); } @@ -285,7 +305,7 @@ fn variable_size_bulk_insert_mut(c: &mut Criterion) { // Test different combinations of key sizes, value sizes, and number of keys let key_sizes = vec![16, 32, 64, 128]; - let num_keys_list = vec![100_000]; + let num_keys_list = vec![100_000, 500_000, 1_000_000]; for &num_keys in &num_keys_list { for &key_size in &key_sizes { @@ -310,92 +330,6 @@ fn variable_size_bulk_insert_mut(c: &mut Criterion) { group.finish(); } -fn setup_benchmark_tree(num_keys: usize, versions_per_key: usize) -> Tree { - let mut tree = Tree::::new(); - - for key_idx in 0..num_keys { - let key = VariableSizeKey::from_str(&format!("key_{}", key_idx)).unwrap(); - for version in 0..versions_per_key { - let value = (key_idx * versions_per_key + version) as i32; - let ts = version as u64; - tree.insert_unchecked(&key, value, version as u64, ts) - .unwrap(); - } - } - - tree -} - -fn bench_query_types(c: &mut Criterion) { - let num_keys = 100; - let versions_per_key = 10_000; - let tree = setup_benchmark_tree(num_keys, versions_per_key); - let test_key = VariableSizeKey::from_str("key_50").unwrap(); - let mid_ts = (versions_per_key / 2) as u64; - - let mut group = c.benchmark_group("tree_queries"); - group.sample_size(10); // Adjust based on your needs - group.measurement_time(std::time::Duration::from_secs(5)); - - // Benchmark LatestByVersion - group.bench_with_input( - BenchmarkId::new("LatestByVersion", mid_ts), - &mid_ts, - |b, &ts| { - b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LatestByVersion(ts)))) - }, - ); - - // Benchmark LatestByTs - group.bench_with_input(BenchmarkId::new("LatestByTs", mid_ts), &mid_ts, |b, &ts| { - b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LatestByTs(ts)))) - }); - - // Benchmark LastLessThanTs - group.bench_with_input( - BenchmarkId::new("LastLessThanTs", mid_ts), - &mid_ts, - |b, &ts| { - b.iter(|| black_box(tree.get_value_by_query(&test_key, QueryType::LastLessThanTs(ts)))) - }, - ); - - // Benchmark LastLessOrEqualTs - group.bench_with_input( - BenchmarkId::new("LastLessOrEqualTs", mid_ts), - &mid_ts, - |b, &ts| { - b.iter(|| { - black_box(tree.get_value_by_query(&test_key, QueryType::LastLessOrEqualTs(ts))) - }) - }, - ); - - // Benchmark FirstGreaterThanTs - group.bench_with_input( - BenchmarkId::new("FirstGreaterThanTs", mid_ts), - &mid_ts, - |b, &ts| { - b.iter(|| { - black_box(tree.get_value_by_query(&test_key, QueryType::FirstGreaterThanTs(ts))) - }) - }, - ); - - // Benchmark FirstGreaterOrEqualTs - group.bench_with_input( - BenchmarkId::new("FirstGreaterOrEqualTs", mid_ts), - &mid_ts, - |b, &ts| { - b.iter(|| { - black_box(tree.get_value_by_query(&test_key, QueryType::FirstGreaterOrEqualTs(ts))) - }) - }, - ); - - group.finish(); -} - criterion_group!(delete_benches, seq_delete, rand_delete); criterion_group!( insert_benches, @@ -408,13 +342,10 @@ criterion_group!( criterion_group!(read_benches, seq_get, rand_get, rand_get_str); criterion_group!(iter_benches, iter_benchmark); criterion_group!(range_benches, range_benchmark); -criterion_group!(ts_benches, bench_query_types); - criterion_main!( insert_benches, read_benches, delete_benches, iter_benches, - range_benches, - ts_benches + range_benches );