From 0a91c4ebeb51e6019a4e9a8ab84dc3c7b928cbcc Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 26 Feb 2024 14:00:32 +0900 Subject: [PATCH] Bugfix: validation of delta before applying them. Due to the nature UDP, the existence of resets and the fact that we are gossipping to several nodes at the same time, it is possible for our obsolete deltas to arrive. This PR adds some validation to detect if the delta is valid, and whether it will bring us to a better state or not. It also removes the nodes to reset information, which was actually taking a large amount of the MTU on large clusters. (For 20 nodes, around 1KB) Reset is now just expressed by sending the delta with `from_version = 0`. Closes #129 --- chitchat/src/delta.rs | 44 +++--- chitchat/src/message.rs | 15 +- chitchat/src/state.rs | 340 +++++++++++++++++++++++++++++++--------- 3 files changed, 297 insertions(+), 102 deletions(-) diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index b914a6e..979388a 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -30,7 +30,7 @@ impl Delta { std::iter::once(DeltaOpRef::Node { chitchat_id: &node_delta.chitchat_id, last_gc_version: node_delta.last_gc_version, - from_version: node_delta.from_version, + from_version_excluded: node_delta.from_version_excluded, }) .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { DeltaOpRef::KeyValue { @@ -46,7 +46,7 @@ enum DeltaOp { Node { chitchat_id: ChitchatId, last_gc_version: Version, - from_version: u64, + from_version_excluded: u64, }, KeyValue { key: String, @@ -58,7 +58,7 @@ enum DeltaOpRef<'a> { Node { chitchat_id: &'a ChitchatId, last_gc_version: Version, - from_version: u64, + from_version_excluded: u64, }, KeyValue { key: &'a str, @@ -104,7 +104,7 @@ impl Deserializable for DeltaOp { Ok(DeltaOp::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, }) } DeltaOpTag::KeyValue => { @@ -133,11 +133,11 @@ impl DeltaOp { DeltaOp::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, } => DeltaOpRef::Node { chitchat_id, last_gc_version: *last_gc_version, - from_version: *from_version, + from_version_excluded: *from_version, }, DeltaOp::KeyValue { key, @@ -166,7 +166,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, } => { buf.push(DeltaOpTag::Node.into()); chitchat_id.serialize(buf); @@ -191,7 +191,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, } => { chitchat_id.serialized_len() + last_gc_version.serialized_len() @@ -261,7 +261,7 @@ impl Delta { self.node_deltas.push(NodeDelta { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, key_values: Vec::new(), }); } @@ -319,7 +319,7 @@ pub(crate) struct NodeDelta { // // Inspect the code in `prepare_apply_delta(..)` to see the rules on how `from_version` // and `last_gc_version` are used. - pub from_version: Version, + pub from_version_excluded: Version, pub last_gc_version: Version, pub key_values: Vec<(String, VersionedValue)>, } @@ -350,7 +350,7 @@ impl DeltaBuilder { DeltaOp::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded, } => { self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); @@ -358,7 +358,7 @@ impl DeltaBuilder { self.current_node_delta = Some(NodeDelta { chitchat_id, last_gc_version, - from_version, + from_version_excluded, key_values: Vec::new(), }); } @@ -452,7 +452,7 @@ impl DeltaSerializer { let new_node_op = DeltaOp::Node { chitchat_id, last_gc_version, - from_version, + from_version_excluded: from_version, }; self.try_add_op(new_node_op) } @@ -531,7 +531,7 @@ mod tests { #[test] fn test_delta_serialization_simple_node() { // 1 bytes (End tag) - let mut delta_writer = DeltaSerializer::with_mtu(128); + let mut delta_writer = DeltaSerializer::with_mtu(140); // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); @@ -608,9 +608,9 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag) // = 155 - assert!(delta_writer.try_add_node(node2, 0u64, false)); + assert!(delta_writer.try_add_node(node2, 0u64, 0)); // The block got compressed. - test_aux_delta_writer(delta_writer, 85); + test_aux_delta_writer(delta_writer, 80); } #[test] @@ -620,7 +620,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(delta_writer.try_add_node(node1, 0, false)); + assert!(delta_writer.try_add_node(node1, 0, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -643,7 +643,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(!delta_writer.try_add_node(node2, 0u64, false)); + assert!(!delta_writer.try_add_node(node2, 0u64, 1u64)); // The block got compressed. test_aux_delta_writer(delta_writer, 72); @@ -658,7 +658,7 @@ mod tests { // + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag) // = 40 - assert!(delta_writer.try_add_node(node1, 0u64, false)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); // +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag) // = 67 @@ -681,7 +681,7 @@ mod tests { tombstone: None, } )); - test_aux_delta_writer(delta_writer, 64); + test_aux_delta_writer(delta_writer, 72); } #[test] @@ -690,7 +690,7 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(62); let node1 = ChitchatId::for_local_test(10_001); - assert!(delta_writer.try_add_node(node1, 0u64, false)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); assert!(delta_writer.try_add_kv( "key11", @@ -728,6 +728,6 @@ mod tests { num_valid_tags += 1; } } - assert_eq!(num_valid_tags, 3); + assert_eq!(num_valid_tags, 2); } } diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index c040454..aaf1b7f 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -162,15 +162,16 @@ mod tests { // 4 bytes let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); - // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64, false); + // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes + // (last_gc_version) + 8 bytes (from_version). + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + delta.set_serialized_len(60); let syn_ack = ChitchatMessage::SynAck { digest, delta }; // 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta). - test_serdeser_aux(&syn_ack, 108); + test_serdeser_aux(&syn_ack, 106); } } @@ -186,12 +187,12 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64, false); + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + delta.set_serialized_len(60); let ack = ChitchatMessage::Ack { delta }; - test_serdeser_aux(&ack, 63); + test_serdeser_aux(&ack, 61); } } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 9ba539d..7f02ca2 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::time::Instant; -use tracing::{error, info, warn}; +use tracing::{info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; @@ -102,13 +102,13 @@ impl NodeState { // state is not modified. #[must_use] fn prepare_apply_delta(&mut self, node_delta: &NodeDelta) -> bool { - if node_delta.from_version > self.max_version { + if node_delta.from_version_excluded > self.max_version { // This delta is coming from the future. // We probably experienced a reset and this delta is not usable for us anymore. // This is not a bug, it can happen, but we just need to ignore it! info!( node=?node_delta.chitchat_id, - from_version=node_delta.from_version, + from_version=node_delta.from_version_excluded, last_gc_version=node_delta.last_gc_version, current_last_gc_version=self.last_gc_version, "received delta from the future, ignoring it" @@ -125,30 +125,64 @@ impl NodeState { // This delta might be missing tombstones with version within // (`node_state.max_version`..`last_gc_version`]. // - // It is ok if we never received the associated tombstones to begin + // It is ok if we don't have the associated values to begin // with. if self.last_gc_version >= node_delta.last_gc_version { return true; } - if node_delta.from_version == 0 { - // We are out of sync. This delta is an invitation to `reset` our state. - info!( + if node_delta.from_version_excluded > 0 { + warn!( node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, last_gc_version=node_delta.last_gc_version, current_last_gc_version=self.last_gc_version, - "resetting node"); - *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); - self.last_gc_version = node_delta.last_gc_version; - true - } else { - error!( + "received an inapplicable delta, ignoring it"); + } + + let Some(delta_max_version) = node_delta.key_values.iter().map(|(_, v)| v.version).max() + else { + return false; + }; + + if delta_max_version <= self.max_version() { + // There is not point apply in this delta as it is not bringing us to a newer state. + warn!( node=?node_delta.chitchat_id, - from_version=node_delta.from_version, + from_version=node_delta.from_version_excluded, last_gc_version=node_delta.last_gc_version, current_last_gc_version=self.last_gc_version, - "received an invalid delta and ignoring it, please report"); - false + "received a delta that does not bring us to a fresher state, ignoring it"); + return false; + } + + // We are out of sync. This delta is an invitation to `reset` our state. + info!( + node=?node_delta.chitchat_id, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "resetting node"); + *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); + self.last_gc_version = node_delta.last_gc_version; + true + } + + fn apply_delta(&mut self, node_delta: NodeDelta) { + if !self.prepare_apply_delta(&node_delta) { + return; + } + let current_max_version = self.max_version(); + for (key, versioned_value) in node_delta.key_values { + if versioned_value.tombstone.is_some() { + // We don't want to keep any tombstone before `last_gc_version`. + if versioned_value.version <= self.last_gc_version { + continue; + } + } + if versioned_value.version <= current_max_version { + continue; + } + self.set_versioned_value(key, versioned_value); } } @@ -400,13 +434,7 @@ impl ClusterState { // Apply delta. for node_delta in delta.node_deltas { let node_state = self.node_state_mut(&node_delta.chitchat_id); - if !node_state.prepare_apply_delta(&node_delta) { - continue; - } - for (key, versioned_value) in node_delta.key_values { - node_state.max_version = node_state.max_version.max(versioned_value.version); - node_state.set_versioned_value(key, versioned_value); - } + node_state.apply_delta(node_delta); } } @@ -451,7 +479,7 @@ impl ClusterState { // We have garbage collected some tombstones that the other node does not know about // yet. A reset is needed. let should_reset = node_digest.max_version < node_state.last_gc_version; - let from_version = if should_reset { + let from_version_excluded = if should_reset { warn!( "Node to reset {chitchat_id:?} last gc version: {} max version: {}", node_state.last_gc_version, node_digest.max_version @@ -460,7 +488,7 @@ impl ClusterState { } else { node_digest.max_version }; - stale_nodes.offer(chitchat_id, node_state, from_version); + stale_nodes.offer(chitchat_id, node_state, from_version_excluded); } let mut delta_serializer = DeltaSerializer::with_mtu(mtu); @@ -468,7 +496,7 @@ impl ClusterState { if !delta_serializer.try_add_node( stale_node.chitchat_id.clone(), stale_node.node_state.last_gc_version, - stale_node.from_version, + stale_node.from_version_exclued, ) { break; }; @@ -557,33 +585,23 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option SortedStaleNodes<'a> { - // /// Adds a node to the list of stale nodes. - // fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { - // let Some(staleness) = staleness_score(node_state, 0u64) else { - // // The node does not have any stale key values. - // return; - // }; - // let stale_node = StaleNode { - // chitchat_id, - // node_state, - // from_version: 0u64, - // }; - // self.stale_nodes - // .entry(staleness) - // .or_default() - // .push(stale_node); - // } - - /// Evaluates whether the node should be added to the list of stale nodes. - fn offer(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState, from_version: u64) { - let Some(staleness) = staleness_score(node_state, from_version) else { + /// Adds a to the list of stale nodes. + /// If the node is not stale (meaning we have no fresher Key Values to share), then this + /// function simply returns. + fn offer( + &mut self, + chitchat_id: &'a ChitchatId, + node_state: &'a NodeState, + from_version_excluded: u64, + ) { + let Some(staleness) = staleness_score(node_state, from_version_excluded) else { // The node does not have any stale KV. return; }; let stale_node = StaleNode { chitchat_id, node_state, - from_version, + from_version_exclued: from_version_excluded, }; self.stale_nodes .entry(staleness) @@ -611,14 +629,14 @@ impl<'a> SortedStaleNodes<'a> { struct StaleNode<'a> { chitchat_id: &'a ChitchatId, node_state: &'a NodeState, - from_version: u64, + from_version_exclued: u64, } impl<'a> StaleNode<'a> { /// Iterates over the stale key-value pairs in decreasing order of staleness. fn stale_key_values(&self) -> impl Iterator { self.node_state - .stale_key_values(self.from_version) + .stale_key_values(self.from_version_exclued) .sorted_unstable_by_key(|(_, versioned_value)| versioned_value.version) } } @@ -679,8 +697,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 0, - need_reset: false, + from_version_exclued: 0u64, }; assert!(stale_node.stale_key_values().next().is_none()); } @@ -700,8 +717,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 1, - need_reset: false, + from_version_exclued: 1u64, }; assert_eq!( stale_node.stale_key_values().collect::>(), @@ -730,12 +746,12 @@ mod tests { // No stale KV. We still insert the node! // That way it will get a node state, and be a candidate for gossip later. let node_state1 = NodeState::for_test(); - stale_nodes.insert(&node1, &node_state1); + stale_nodes.offer(&node1, &node_state1, 0u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let mut node2_state = NodeState::for_test(); node2_state.set_with_version("key_a", "value_a", 1); - stale_nodes.insert(&node2, &node2_state); + stale_nodes.offer(&node2, &node2_state, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 0, @@ -747,7 +763,7 @@ mod tests { node3_state.set_with_version("key_b", "value_b", 2); node3_state.set_with_version("key_c", "value_c", 3); - stale_nodes.insert(&node3, &node3_state); + stale_nodes.offer(&node3, &node3_state, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 3, @@ -762,7 +778,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); let node1_state = NodeState::for_test(); - stale_nodes.offer(&node1, &node1_state, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node1, &node1_state, 1u64); // No stale records. This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); @@ -771,7 +787,7 @@ mod tests { node2_state .key_values .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.offer(&node2, &node2_state, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node2, &node2_state, 1u64); // No stale records (due to the floor version). This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); @@ -786,7 +802,7 @@ mod tests { node3_state .key_values .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node3_state, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node3, &node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, @@ -810,7 +826,7 @@ mod tests { node_state1 .key_values .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node1, &node_state1, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node1, &node_state1, 1u64); // 2 stale values. let node2 = ChitchatId::for_local_test(10_002); @@ -824,7 +840,7 @@ mod tests { node_state2 .key_values .insert("key_c".to_string(), VersionedValue::for_test("value_c", 5)); - stale_nodes.offer(&node2, &node_state2, &NodeDigest::new(Heartbeat(0), 2)); + stale_nodes.offer(&node2, &node_state2, 2u64); // 1 stale value. let node3 = ChitchatId::for_local_test(10_003); @@ -838,7 +854,7 @@ mod tests { node_state3 .key_values .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node_state3, &NodeDigest::new(Heartbeat(0), 7)); + stale_nodes.offer(&node3, &node_state3, 7u64); // 0 stale values. let node4 = ChitchatId::for_local_test(10_004); @@ -855,12 +871,12 @@ mod tests { node_state4 .key_values .insert("key_d".to_string(), VersionedValue::for_test("value_d", 7)); - stale_nodes.offer(&node4, &node_state4, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node4, &node_state4, 1); // 3 stale values let node5 = ChitchatId::for_local_test(10_005); let node_state5 = NodeState::for_test(); - stale_nodes.insert(&node5, &node_state5); + stale_nodes.offer(&node5, &node_state5, 0); // 0 stale values let node6 = ChitchatId::for_local_test(10_006); @@ -868,7 +884,7 @@ mod tests { node_state6 .key_values .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.insert(&node6, &node_state6); + stale_nodes.offer(&node6, &node_state6, 0u64); // 1 stale values assert_eq!( @@ -1075,7 +1091,7 @@ mod tests { delta.add_kv(&node1, "key_b", "2", 2, false); // We reset node 2 - delta.add_node(node2.clone(), 0u64, true); + delta.add_node(node2.clone(), 3, 0); delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); @@ -1285,7 +1301,7 @@ mod tests { let mut expected_delta = Delta::default(); expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64, 0u64); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.set_serialized_len(76); assert_eq!(delta, expected_delta); @@ -1309,10 +1325,10 @@ mod tests { let mut expected_delta = Delta::default(); expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64, false); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_kv(&node1, "key_a", "", 3, true); - expected_delta.set_serialized_len(91); + expected_delta.set_serialized_len(90); assert_eq!(delta, expected_delta); } @@ -1321,7 +1337,7 @@ mod tests { tokio::time::advance(DELETE_GRACE_PERIOD).await; cluster_state .node_state_mut(&node1) - .gc_keys_marked_for_deletion(Duration::from_secs(10)); + .gc_keys_marked_for_deletion(DELETE_GRACE_PERIOD); { let mut digest = Digest::default(); @@ -1332,11 +1348,12 @@ mod tests { &HashSet::new(), ); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64, true); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 3u64, true); + // Last gc set to 3 and from version to 0. That's a reset right there. + expected_delta.add_node(node1.clone(), 3u64, 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.set_serialized_len(83); + expected_delta.set_serialized_len(75); assert_eq!(&delta, &expected_delta); } } @@ -1358,4 +1375,181 @@ mod tests { .collect(); assert_eq!(node_states, &["Europe:", "Europe:Italy"]); } + + #[test] + fn test_node_apply_delta_simple() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + node_state.set_with_version("key_b", "val_a", 2); + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 2, + last_gc_version: 0u64, + key_values: vec![ + ( + "key_c".to_string(), + VersionedValue { + value: "val_c".to_string(), + version: 4, + tombstone: None, + }, + ), + ( + "key_b".to_string(), + VersionedValue { + value: "val_b2".to_string(), + version: 3, + tombstone: None, + }, + ), + ], + }; + node_state.apply_delta(node_delta); + assert_eq!(node_state.num_key_values(), 3); + assert_eq!(node_state.max_version(), 4); + assert_eq!(node_state.last_gc_version, 0); + assert_eq!(node_state.get("key_a").unwrap(), "val_a"); + assert_eq!(node_state.get("key_b").unwrap(), "val_b2"); + assert_eq!(node_state.get("key_c").unwrap(), "val_c"); + } + + // Here we check that the accessor that dismiss resetting a Kv to the same value is not + // used in apply delta. Resetting to the same value is very possible in reality several updates + // happened in a row but were shadowed by the scuttlebutt logic. We DO need to update the + // version. + #[test] + fn test_node_apply_same_value_different_version() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 1, + last_gc_version: 0, + key_values: vec![( + "key_a".to_string(), + VersionedValue { + value: "val_a".to_string(), + version: 3, + tombstone: None, + }, + )], + }; + node_state.apply_delta(node_delta); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 3); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[test] + fn test_node_skip_delta_from_the_future() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 5); + assert_eq!(node_state.max_version(), 5); + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 6, // we skipped version 6 here. + last_gc_version: 0, + key_values: vec![( + "key_a".to_string(), + VersionedValue { + value: "new_val".to_string(), + version: 7, + tombstone: None, + }, + )], + }; + node_state.apply_delta(node_delta); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 5); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[tokio::test] + async fn test_node_apply_delta_different_last_gc_is_ok_if_below_max_version() { + tokio::time::pause(); + const GC_PERIOD: Duration = Duration::from_secs(10); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + node_state.mark_for_deletion("key_a"); + tokio::time::advance(GC_PERIOD).await; + node_state.gc_keys_marked_for_deletion(GC_PERIOD); + assert_eq!(node_state.last_gc_version, 18); + assert_eq!(node_state.max_version(), 18); + node_state.set_with_version("key_a", "val_a", 31); + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 31, // we skipped version 6 here. + last_gc_version: 30, + key_values: vec![( + "key_a".to_string(), + VersionedValue { + value: "new_val".to_string(), + version: 32, + tombstone: None, + }, + )], + }; + node_state.apply_delta(node_delta); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 32); + assert_eq!(node_state.max_version(), 32); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "new_val"); + } + + #[tokio::test] + async fn test_node_apply_delta_on_reset_fresher_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + assert_eq!(node_state.max_version(), 17); + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. + last_gc_version: 30, + key_values: vec![( + "key_b".to_string(), + VersionedValue { + value: "val_b".to_string(), + version: 32, + tombstone: None, + }, + )], + }; + node_state.apply_delta(node_delta); + assert!(node_state.get_versioned("key_a").is_none()); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + } + + #[tokio::test] + async fn test_node_apply_delta_no_reset_if_older_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 31); + node_state.set_with_version("key_b", "val_b2", 32); + assert_eq!(node_state.max_version(), 32); + // This does look like a valid reset, but we are already at version 32. + // Let's ignore this. + let node_delta = NodeDelta { + chitchat_id: node_state.node_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. + last_gc_version: 17, + key_values: vec![( + "key_b".to_string(), + VersionedValue { + value: "val_b".to_string(), + version: 30, + tombstone: None, + }, + )], + }; + node_state.apply_delta(node_delta); + assert_eq!(node_state.max_version, 32); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + assert_eq!(versioned_b.value, "val_b2"); + } }