diff --git a/grovedb/src/bidirectional_references.rs b/grovedb/src/bidirectional_references.rs index 1741bc16..8e817631 100644 --- a/grovedb/src/bidirectional_references.rs +++ b/grovedb/src/bidirectional_references.rs @@ -36,6 +36,8 @@ pub struct BidirectionalReference { pub flags: Option, } +/// Insert bidirectional reference at specified location performing required +/// checks and updates pub(crate) fn process_bidirectional_reference_insertion<'db, 'b, 'k, B: AsRef<[u8]>>( merk_cache: &mut MerkCache<'db, 'b, B>, path: SubtreePath<'b, B>, @@ -162,7 +164,7 @@ pub(crate) fn process_bidirectional_reference_insertion<'db, 'b, 'k, B: AsRef<[u .. } = cost_return_on_error!( &mut cost, - util::follow_reference( + util::follow_reference_once( merk_cache, path.derive_owned(), key, @@ -206,13 +208,137 @@ pub(crate) fn process_bidirectional_reference_insertion<'db, 'b, 'k, B: AsRef<[u Ok(()).wrap_with_cost(cost) } -pub(crate) fn process_update_element_with_backward_references<'db, 'b, 'k, B: AsRef<[u8]>>( - merk_cache: &mut MerkCache<'db, 'b, B>, +/// Post-processing of possible backward references relationships after +/// insertion of anything but bidirectional reference +pub(crate) fn process_update_element_with_backward_references<'db, 'b, 'c, B: AsRef<[u8]>>( + merk_cache: &'c mut MerkCache<'db, 'b, B>, + merk: MerkHandle<'db, 'c>, path: SubtreePath<'b, B>, key: &[u8], - Delta { new, old }: Delta, + delta: Delta, +) -> CostResult<(), Error> { + let mut cost = Default::default(); + + // On no changes no propagations shall happen: + if !delta.has_changed() { + return Ok(()).wrap_with_cost(cost); + } + + // If there was no overwrite we short-circuit as well: + let Some(old) = delta.old else { + return Ok(()).wrap_with_cost(cost); + }; + + match (old, delta.new) { + ( + Element::ItemWithBackwardsReferences(..) | Element::SumItemWithBackwardsReferences(..), + Element::ItemWithBackwardsReferences(..) | Element::SumItemWithBackwardsReferences(..), + ) => { + // Update with another backward references-compatible element variant, that + // means value hash propagation across backward references' chains: + cost_return_on_error!( + &mut cost, + propagate_backward_references( + merk_cache, + merk, + path.derive_owned(), + key.to_vec(), + cost_return_on_error!(&mut cost, delta.new.value_hash(&merk_cache.version)) + ) + ); + } + ( + Element::ItemWithBackwardsReferences(..) | Element::SumItemWithBackwardsReferences(..), + _, + ) => { + // Update with non backward references-compatible element, equals to cascade + // deletion of references' chains: + cost_return_on_error!( + &mut cost, + delete_backward_references_recursively( + merk_cache, + merk, + path.derive_owned(), + key.to_vec() + ) + ); + } + _ => { + // All other overwrites don't require special attention + } + } + + Ok(()).wrap_with_cost(cost) +} + +/// Recursively deletes all backward references' chains of a key if all of them +/// allow cascade deletion. +fn delete_backward_references_recursively<'db, 'b, 'c, B: AsRef<[u8]>>( + merk_cache: &'c MerkCache<'db, 'b, B>, + merk: MerkHandle<'db, 'c>, + path: SubtreePathBuilder<'b, B>, + key: Vec, ) -> CostResult<(), Error> { - todo!() + let mut cost = Default::default(); + let mut queue = VecDeque::new(); + + queue.push_back((merk, path, key)); + + // Just like with propagation we follow all references chains... + while let Some((mut current_merk, current_path, current_key)) = queue.pop_front() { + let backward_references = cost_return_on_error!( + &mut cost, + get_backward_references(&mut current_merk, ¤t_key) + ); + for (idx, backward_ref) in backward_references.into_iter() { + if !backward_ref.cascade_on_update { + return Err(Error::BidirectionalReferenceRule( + "deletion of backward references through deletion of an element requires \ + `cascade_on_update` setting" + .to_owned(), + )) + .wrap_with_cost(cost); + } + + let ResolvedReference { + target_merk: origin_bidi_merk, + target_path: origin_bidi_path, + target_key: origin_bidi_key, + .. + } = cost_return_on_error!( + &mut cost, + util::follow_reference_once( + merk_cache, + current_path.clone(), + ¤t_key, + backward_ref.inverted_reference + ) + ); + + // ... except removing backward references from meta... + cost_return_on_error!( + &mut cost, + remove_backward_reference(&mut current_merk, ¤t_key, idx) + ); + + queue.push_back((origin_bidi_merk, origin_bidi_path, origin_bidi_key)); + } + + // ... and the element altogether + cost_return_on_error!( + &mut cost, + current_merk.for_merk(|m| Element::delete( + m, + current_key, + None, + false, + false, + &merk_cache.version + )) + ); + } + + Ok(()).wrap_with_cost(cost) } /// Recursively updates all backward references' chains of a key. @@ -233,7 +359,7 @@ fn propagate_backward_references<'db, 'b, 'c, B: AsRef<[u8]>>( &mut cost, get_backward_references(&mut current_merk, ¤t_key) ); - for backward_ref in backward_references.into_iter() { + for (_, backward_ref) in backward_references.into_iter() { let ResolvedReference { target_merk: mut origin_bidi_merk, target_path: origin_bidi_path, @@ -383,7 +509,7 @@ fn add_backward_reference<'db, 'c>( fn get_backward_references<'db, 'c>( merk: &mut MerkHandle<'db, 'c>, key: &[u8], -) -> CostResult, Error> { +) -> CostResult, Error> { let mut cost = Default::default(); let (prefix, bits) = @@ -412,9 +538,9 @@ fn get_backward_references<'db, 'c>( }) ); - backward_references.push(cost_return_on_error_no_add!( - cost, - BackwardReference::deserialize(&bytes) + backward_references.push(( + idx, + cost_return_on_error_no_add!(cost, BackwardReference::deserialize(&bytes)), )); } @@ -522,9 +648,12 @@ mod tests { .unwrap() .into_iter() .map( - |BackwardReference { - inverted_reference, .. - }| inverted_reference + |( + _, + BackwardReference { + inverted_reference, .. + }, + )| inverted_reference ) .collect::>(), vec![ diff --git a/grovedb/src/element/insert.rs b/grovedb/src/element/insert.rs index 0d768c38..8c654ef0 100644 --- a/grovedb/src/element/insert.rs +++ b/grovedb/src/element/insert.rs @@ -14,12 +14,12 @@ use crate::{Element, Element::SumItem, Error, Hash}; pub struct Delta<'e> { pub new: &'e Element, - pub old: Element, + pub old: Option, } impl Delta<'_> { pub(crate) fn has_changed(&self) -> bool { - &self.old != self.new + self.old.as_ref().map(|o| o != self.new).unwrap_or(true) } } @@ -200,19 +200,14 @@ impl Element { #[cfg(feature = "full")] /// Insert an element in Merk under a key if the value is different from - /// what already exists; path should be resolved and proper Merk should - /// be loaded by this moment If transaction is not passed, the batch - /// will be written immediately. If transaction is passed, the operation - /// will be committed on the transaction commit. - /// The bool represents if we indeed inserted. - /// If the value changed we return the old element. + /// what already exists, returning delta. pub fn insert_if_changed_value<'db, S: StorageContext<'db>>( &self, merk: &mut Merk, key: &[u8], options: Option, grove_version: &GroveVersion, - ) -> CostResult<(bool, Option), Error> { + ) -> CostResult { check_grovedb_v0_with_cost!( "insert_if_changed_value", grove_version @@ -226,16 +221,16 @@ impl Element { &mut cost, Self::get_optional_from_storage(&merk.storage, key, grove_version) ); - let needs_insert = match &previous_element { - None => true, - Some(previous_element) => previous_element != self, + let delta = Delta { + new: self, + old: previous_element, }; - if !needs_insert { - Ok((false, None)).wrap_with_cost(cost) - } else { + + if delta.has_changed() { cost_return_on_error!(&mut cost, self.insert(merk, key, options, grove_version)); - Ok((true, previous_element)).wrap_with_cost(cost) } + + Ok(delta).wrap_with_cost(cost) } #[cfg(feature = "full")] @@ -515,15 +510,15 @@ mod tests { merk.commit(grove_version); - let (inserted, previous) = Element::new_item(b"value".to_vec()) + let element = Element::new_item(b"value".to_vec()); + let delta = element .insert_if_changed_value(&mut merk, b"another-key", None, grove_version) .unwrap() .expect("expected successful insertion 2"); merk.commit(grove_version); - assert!(!inserted); - assert_eq!(previous, None); + assert!(!delta.has_changed()); assert_eq!( Element::get(&merk, b"another-key", true, grove_version) .unwrap() @@ -557,13 +552,14 @@ mod tests { let batch = StorageBatch::new(); let mut merk = empty_path_merk(&*storage, &batch, &tx, grove_version); - let (inserted, previous) = Element::new_item(b"value2".to_vec()) + let element = Element::new_item(b"value2".to_vec()); + let delta = element .insert_if_changed_value(&mut merk, b"another-key", None, grove_version) .unwrap() .expect("expected successful insertion 2"); - assert!(inserted); - assert_eq!(previous, Some(Element::new_item(b"value".to_vec())),); + assert!(delta.has_changed()); + assert_eq!(delta.old, Some(Element::new_item(b"value".to_vec())),); storage .commit_multi_context_batch(batch, Some(&tx)) @@ -587,13 +583,14 @@ mod tests { .insert(&mut merk, b"mykey", None, grove_version) .unwrap() .expect("expected successful insertion"); - let (inserted, previous) = Element::new_item(b"value2".to_vec()) + let element = Element::new_item(b"value2".to_vec()); + let delta = element .insert_if_changed_value(&mut merk, b"another-key", None, grove_version) .unwrap() .expect("expected successful insertion 2"); - assert!(inserted); - assert_eq!(previous, None); + assert!(delta.has_changed()); + assert_eq!(delta.old, None); assert_eq!( Element::get(&merk, b"another-key", true, grove_version)