diff --git a/src/cache/delta_reader.rs b/src/cache/delta_reader.rs index f99c4f4..1b7937a 100644 --- a/src/cache/delta_reader.rs +++ b/src/cache/delta_reader.rs @@ -16,75 +16,89 @@ pub type SnapshotIterRange<'a> = btree_map::Range<'a, SchemaKey, Operation>; /// Read-only data provider that supports a list of snapshots on top of [`DB`]. /// Maintains total ordering and respects uncommited deletions. /// Should not write to underlying [`DB`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DeltaReader { /// Set of not commited changes in chronological order. /// Meaning that the first snapshot in the vector is the oldest and the latest is the most recent. /// If keys are equal, the value from more recent snapshot is taken. snapshots: Vec>, /// Reading finalized data from here. - db: DB, + db: Arc, } impl DeltaReader { /// Creates new [`DeltaReader`] with given [`DB`] and vector with uncommited snapshots of [`SchemaBatch`]. /// Snapshots should be in chronological order. - pub fn new(db: DB, snapshots: Vec>) -> Self { + pub fn new(db: Arc, snapshots: Vec>) -> Self { Self { snapshots, db } } /// Get a value for given [`Schema`]. If value has been deleted in uncommitted changes, returns None. - pub async fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { + pub fn get(&self, key: &impl KeyCodec) -> anyhow::Result> { // Some(Operation) means that key was touched, // but in case of deletion, we early return None // Only in case of not finding operation for a key, // we go deeper - - tokio::task::block_in_place(|| { - for snapshot in self.snapshots.iter().rev() { - if let Some(operation) = snapshot.get_operation::(key)? { - return operation.decode_value::(); - } + for snapshot in self.snapshots.iter().rev() { + if let Some(operation) = snapshot.get_operation::(key)? { + return operation.decode_value::(); } + } - self.db.get(key) - }) + self.db.get(key) + } + + /// Async version of [`DeltaReader::get`]. + pub async fn get_async( + &self, + key: &impl KeyCodec, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get(key)) } /// Get a value of the largest key written value for given [`Schema`]. - pub async fn get_largest(&self) -> anyhow::Result> { - tokio::task::block_in_place(|| { - let mut iterator = self.iter_rev::()?; - if let Some((key, value)) = iterator.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) - }) + pub fn get_largest(&self) -> anyhow::Result> { + let mut iterator = self.iter_rev::()?; + if let Some((key, value)) = iterator.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + } + + /// Async version [`DeltaReader::get_largest`]. + pub async fn get_largest_async(&self) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_largest::()) } /// Get the largest value in [`Schema`] that is smaller or equal given `seek_key`. - pub async fn get_prev( + pub fn get_prev( &self, seek_key: &impl SeekKeyEncoder, ) -> anyhow::Result> { let seek_key = seek_key.encode_seek_key()?; let range = ..=seek_key; - tokio::task::block_in_place(|| { - let mut iterator = self.iter_rev_range::(range)?; - if let Some((key, value)) = iterator.next() { - let key = S::Key::decode_key(&key)?; - let value = S::Value::decode_value(&value)?; - return Ok(Some((key, value))); - } - Ok(None) - }) + let mut iterator = self.iter_rev_range::(range)?; + if let Some((key, value)) = iterator.next() { + let key = S::Key::decode_key(&key)?; + let value = S::Value::decode_value(&value)?; + return Ok(Some((key, value))); + } + Ok(None) + } + + /// Async version of [`DeltaReader::get_prev`]. + pub async fn get_prev_async( + &self, + seek_key: &impl SeekKeyEncoder, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_prev(seek_key)) } /// Get `n` keys >= `seek_key`, paginated. - pub async fn get_n_from_first_match( + pub fn get_n_from_first_match( &self, seek_key: &impl SeekKeyEncoder, n: usize, @@ -92,32 +106,39 @@ impl DeltaReader { let seek_key = seek_key.encode_seek_key()?; let range = seek_key..; - tokio::task::block_in_place(|| { - let mut iterator = self.iter_range::(range)?; - let results: Vec<(S::Key, S::Value)> = iterator - .by_ref() - .filter_map(|(key_bytes, value_bytes)| { - let key = S::Key::decode_key(&key_bytes).ok()?; - let value = S::Value::decode_value(&value_bytes).ok()?; - Some((key, value)) - }) - .take(n) - .collect(); - - let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) { - None => None, - Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?), - }; - - Ok(PaginatedResponse { - key_value: results, - next: next_start_key, + let mut iterator = self.iter_range::(range)?; + let results: Vec<(S::Key, S::Value)> = iterator + .by_ref() + .filter_map(|(key_bytes, value_bytes)| { + let key = S::Key::decode_key(&key_bytes).ok()?; + let value = S::Value::decode_value(&value_bytes).ok()?; + Some((key, value)) }) + .take(n) + .collect(); + + let next_start_key = match iterator.next().map(|(key_bytes, _)| key_bytes) { + None => None, + Some(key_bytes) => Some(S::Key::decode_key(&key_bytes)?), + }; + + Ok(PaginatedResponse { + key_value: results, + next: next_start_key, }) } + /// Async version of [`DeltaReader::get_n_from_first_match`]. + pub async fn get_n_from_first_match_async( + &self, + seek_key: &impl SeekKeyEncoder, + n: usize, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.get_n_from_first_match::(seek_key, n)) + } + /// Collects all key-value pairs in given range: `smallest..largest`. - pub async fn collect_in_range>( + pub fn collect_in_range>( &self, range: std::ops::Range, ) -> anyhow::Result> { @@ -125,17 +146,23 @@ impl DeltaReader { let upper_bound = range.end.encode_seek_key()?; let range = lower_bound..upper_bound; - tokio::task::block_in_place(|| { - let result = self - .iter_range::(range)? - .map(|(key, value)| { - let key = S::Key::decode_key(&key).unwrap(); - let value = S::Value::decode_value(&value).unwrap(); - (key, value) - }) - .collect(); - Ok(result) - }) + let result = self + .iter_range::(range)? + .map(|(key, value)| { + let key = S::Key::decode_key(&key).unwrap(); + let value = S::Value::decode_value(&value).unwrap(); + (key, value) + }) + .collect(); + Ok(result) + } + + /// Async version of [`DeltaReader::collect_in_range`]. + pub async fn collect_in_range_async>( + &self, + range: std::ops::Range, + ) -> anyhow::Result> { + tokio::task::block_in_place(|| self.collect_in_range(range)) } #[allow(dead_code)] @@ -381,12 +408,12 @@ mod tests { const FIELD_6: TestCompositeField = TestCompositeField(4, 2, 1); const FIELD_7: TestCompositeField = TestCompositeField(4, 3, 0); - fn open_db(dir: impl AsRef) -> DB { + fn open_db(dir: impl AsRef) -> Arc { let column_families = vec![DEFAULT_COLUMN_FAMILY_NAME, TestSchema::COLUMN_FAMILY_NAME]; let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); db_opts.create_missing_column_families(true); - DB::open(dir, "test", column_families, &db_opts).expect("Failed to open DB.") + Arc::new(DB::open(dir, "test", column_families, &db_opts).expect("Failed to open DB.")) } // Test utils @@ -459,14 +486,14 @@ mod tests { let delta_reader = DeltaReader::new(db, vec![]); - let largest = delta_reader.get_largest::().await.unwrap(); + let largest = delta_reader.get_largest_async::().await.unwrap(); assert!(largest.is_none()); let key = TestCompositeField::MAX; - let prev = delta_reader.get_prev::(&key).await.unwrap(); + let prev = delta_reader.get_prev_async::(&key).await.unwrap(); assert!(prev.is_none()); - let value_1 = delta_reader.get::(&FIELD_1).await.unwrap(); + let value_1 = delta_reader.get_async::(&FIELD_1).await.unwrap(); assert!(value_1.is_none()); let values: Vec<_> = delta_reader.iter::().unwrap().collect(); @@ -489,14 +516,14 @@ mod tests { assert!(values.is_empty()); let paginated_response = delta_reader - .get_n_from_first_match::(&TestCompositeField::MAX, 100) + .get_n_from_first_match_async::(&TestCompositeField::MAX, 100) .await .unwrap(); assert!(paginated_response.key_value.is_empty()); assert!(paginated_response.next.is_none()); let values: Vec<_> = delta_reader - .collect_in_range::( + .collect_in_range_async::( TestCompositeField::MIN..TestCompositeField::MAX, ) .await @@ -510,38 +537,38 @@ mod tests { let delta_reader = build_sample_delta_reader(tmpdir.path()); // From DB - let value = delta_reader.get::(&FIELD_4).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_4).await.unwrap(); assert_eq!(Some(TestField(4)), value); // From the most recent snapshot - let value = delta_reader.get::(&FIELD_3).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_3).await.unwrap(); assert_eq!(Some(TestField(3000)), value); - let value = delta_reader.get::(&FIELD_6).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_6).await.unwrap(); assert_eq!(Some(TestField(6000)), value); // From middle snapshot - let value = delta_reader.get::(&FIELD_2).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_2).await.unwrap(); assert_eq!(Some(TestField(200)), value); // Deleted values - let value = delta_reader.get::(&FIELD_7).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_7).await.unwrap(); assert!(value.is_none()); - let value = delta_reader.get::(&FIELD_5).await.unwrap(); + let value = delta_reader.get_async::(&FIELD_5).await.unwrap(); assert!(value.is_none()); // Not found let value = delta_reader - .get::(&TestCompositeField::MIN) + .get_async::(&TestCompositeField::MIN) .await .unwrap(); assert!(value.is_none()); let value = delta_reader - .get::(&TestCompositeField::MAX) + .get_async::(&TestCompositeField::MAX) .await .unwrap(); assert!(value.is_none()); let value = delta_reader - .get::(&TestCompositeField(3, 5, 1)) + .get_async::(&TestCompositeField(3, 5, 1)) .await .unwrap(); assert!(value.is_none()); @@ -638,7 +665,7 @@ mod tests { let tmpdir = tempfile::tempdir().unwrap(); let delta_reader = build_sample_delta_reader(tmpdir.path()); - let largest = delta_reader.get_largest::().await.unwrap(); + let largest = delta_reader.get_largest_async::().await.unwrap(); assert!(largest.is_some(), "largest value is not found"); let (largest_key, largest_value) = largest.unwrap(); @@ -653,28 +680,32 @@ mod tests { // MIN, should not find anything let prev = delta_reader - .get_prev::(&TestCompositeField::MIN) + .get_prev_async::(&TestCompositeField::MIN) .await .unwrap(); assert!(prev.is_none()); // Should get the lowest value in - let prev = delta_reader.get_prev::(&FIELD_1).await.unwrap(); + let prev = delta_reader.get_prev_async::(&FIELD_1).await.unwrap(); assert!(prev.is_none()); - let prev = delta_reader.get_prev::(&FIELD_2).await.unwrap(); + let prev = delta_reader.get_prev_async::(&FIELD_2).await.unwrap(); let (prev_key, prev_value) = prev.unwrap(); assert_eq!(FIELD_2, prev_key); assert_eq!(TestField(200), prev_value); // Some value in the middle - let (prev_key, prev_value) = delta_reader.get_prev::(&FIELD_3).await.unwrap().unwrap(); + let (prev_key, prev_value) = delta_reader + .get_prev_async::(&FIELD_3) + .await + .unwrap() + .unwrap(); assert_eq!(FIELD_3, prev_key); assert_eq!(TestField(3000), prev_value); // Value in between let (prev_key, prev_value) = delta_reader - .get_prev::(&TestCompositeField(3, 5, 8)) + .get_prev_async::(&TestCompositeField(3, 5, 8)) .await .unwrap() .unwrap(); @@ -688,7 +719,7 @@ mod tests { let delta_reader = build_sample_delta_reader(tmpdir.path()); let paginated_response = delta_reader - .get_n_from_first_match::(&TestCompositeField::MIN, 2) + .get_n_from_first_match_async::(&TestCompositeField::MIN, 2) .await .unwrap(); assert_eq!(2, paginated_response.key_value.len()); @@ -697,7 +728,7 @@ mod tests { assert_eq!(Some(FIELD_4), paginated_response.next); let paginated_response = delta_reader - .get_n_from_first_match::(&FIELD_4, 2) + .get_n_from_first_match_async::(&FIELD_4, 2) .await .unwrap(); assert_eq!(2, paginated_response.key_value.len()); @@ -728,7 +759,7 @@ mod tests { for (field_range, expected_range) in test_cases { let range_values = delta_reader - .collect_in_range::(field_range.clone()) + .collect_in_range_async::(field_range.clone()) .await .unwrap(); @@ -840,9 +871,9 @@ mod tests { let rt = Runtime::new().unwrap(); let _ = rt.block_on(async { - let largest = delta_reader.get_largest::().await.unwrap(); + let largest = delta_reader.get_largest_async::().await.unwrap(); let prev = delta_reader - .get_prev::(&TestCompositeField::MAX) + .get_prev_async::(&TestCompositeField::MAX) .await; prop_assert!(prev.is_ok()); let prev = prev.unwrap(); @@ -857,11 +888,11 @@ mod tests { } for (key, expected_value) in all_kv.into_iter() { - let value = delta_reader.get::(&key).await; + let value = delta_reader.get_async::(&key).await; prop_assert!(value.is_ok()); let value = value.unwrap(); prop_assert_eq!(Some(expected_value), value); - let prev_value = delta_reader.get_prev::(&key).await; + let prev_value = delta_reader.get_prev_async::(&key).await; prop_assert!(prev_value.is_ok()); let prev_value = prev_value.unwrap(); prop_assert_eq!(Some((key, expected_value)), prev_value); @@ -893,7 +924,7 @@ mod tests { "iter_rev should be sorted in reversed order" ); - // Building a reference for all K/V for validation if basic check is passed. + // Building a reference for all K/V for validation if the basic check is passed. let mut all_kv: BTreeMap = BTreeMap::new(); for (key, value) in db_entries { all_kv.insert(key, value); @@ -911,7 +942,7 @@ mod tests { let mut next_key = Some(TestCompositeField::MIN); while let Some(actual_next_key) = next_key { let paginated_response = delta_reader - .get_n_from_first_match::(&actual_next_key, n) + .get_n_from_first_match_async::(&actual_next_key, n) .await; prop_assert!(paginated_response.is_ok()); let paginated_response = paginated_response.unwrap(); @@ -949,7 +980,7 @@ mod tests { for range in def_chopped_ranges { let range_values = delta_reader - .collect_in_range::(range) + .collect_in_range_async::(range) .await; prop_assert!(range_values.is_ok()); let range_values = range_values.unwrap(); @@ -961,7 +992,7 @@ mod tests { for range in full_ranges { let range_values = delta_reader - .collect_in_range::(range) + .collect_in_range_async::(range) .await; prop_assert!(range_values.is_ok()); let range_values = range_values.unwrap(); diff --git a/src/test.rs b/src/test.rs index e5774d6..a8e2e49 100644 --- a/src/test.rs +++ b/src/test.rs @@ -97,6 +97,12 @@ impl KeyEncoder for TestField { } } +impl SeekKeyEncoder for TestField { + fn encode_seek_key(&self) -> Result, CodecError> { + Ok(self.as_bytes()) + } +} + /// KeyPrefix over single u32 pub struct KeyPrefix1(pub u32);