diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 01efde7..f8e8021 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,8 +1,7 @@ use std::collections::HashSet; -use tokio::time::Instant; - use crate::serialize::*; +use crate::types::{KeyValueMutation, KeyValueMutationRef}; use crate::{ChitchatId, Version, VersionedValue}; /// A delta is the message we send to another node to update it. @@ -11,7 +10,6 @@ use crate::{ChitchatId, Version, VersionedValue}; /// encoded one after the other in a compressed stream. #[derive(Debug, Eq, PartialEq)] pub struct Delta { - pub(crate) nodes_to_reset: Vec, pub(crate) node_deltas: Vec, serialized_len: usize, } @@ -19,7 +17,6 @@ pub struct Delta { impl Default for Delta { fn default() -> Self { Delta { - nodes_to_reset: Vec::new(), node_deltas: Vec::new(), serialized_len: 1, } @@ -28,44 +25,48 @@ impl Default for Delta { impl Delta { fn get_operations(&self) -> impl Iterator> { - let nodes_to_reset_ops = self.nodes_to_reset.iter().map(DeltaOpRef::NodeToReset); - let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { + self.node_deltas.iter().flat_map(|node_delta| { std::iter::once(DeltaOpRef::Node { chitchat_id: &node_delta.chitchat_id, last_gc_version: node_delta.last_gc_version, + from_version_excluded: node_delta.from_version_excluded, }) - .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { - DeltaOpRef::KeyValue { - key, - versioned_value, - } - })) - }); - nodes_to_reset_ops.chain(node_deltas) + .chain( + node_delta + .key_values + .iter() + .map(|key_value_mutation| DeltaOpRef::KeyValue(key_value_mutation.into())), + ) + .chain({ + node_delta + .max_version + .map(|max_version| DeltaOpRef::SetMaxVersion { max_version }) + }) + }) } } enum DeltaOp { - NodeToReset(ChitchatId), Node { chitchat_id: ChitchatId, last_gc_version: Version, + from_version_excluded: u64, }, - KeyValue { - key: String, - versioned_value: VersionedValue, + KeyValue(KeyValueMutation), + SetMaxVersion { + max_version: Version, }, } enum DeltaOpRef<'a> { - NodeToReset(&'a ChitchatId), Node { chitchat_id: &'a ChitchatId, last_gc_version: Version, + from_version_excluded: u64, }, - KeyValue { - key: &'a str, - versioned_value: &'a VersionedValue, + KeyValue(KeyValueMutationRef<'a>), + SetMaxVersion { + max_version: Version, }, } @@ -73,7 +74,7 @@ enum DeltaOpRef<'a> { enum DeltaOpTag { Node = 0u8, KeyValue = 1u8, - NodeToReset = 2u8, + SetMaxVersion = 2u8, } impl TryFrom for DeltaOpTag { @@ -83,7 +84,6 @@ impl TryFrom for DeltaOpTag { match tag_byte { 0u8 => Ok(DeltaOpTag::Node), 1u8 => Ok(DeltaOpTag::KeyValue), - 2u8 => Ok(DeltaOpTag::NodeToReset), _ => { anyhow::bail!("Unknown tag: {tag_byte}") } @@ -102,16 +102,14 @@ impl Deserializable for DeltaOp { let tag_bytes: [u8; 1] = Deserializable::deserialize(buf)?; let tag = DeltaOpTag::try_from(tag_bytes[0])?; match tag { - DeltaOpTag::NodeToReset => { - let chitchat_id = ChitchatId::deserialize(buf)?; - Ok(DeltaOp::NodeToReset(chitchat_id)) - } DeltaOpTag::Node => { let chitchat_id = ChitchatId::deserialize(buf)?; let last_gc_version = Version::deserialize(buf)?; + let from_version_excluded = u64::deserialize(buf)?; Ok(DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, }) } DeltaOpTag::KeyValue => { @@ -119,16 +117,16 @@ impl Deserializable for DeltaOp { let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; let deleted = bool::deserialize(buf)?; - let tombstone = if deleted { Some(Instant::now()) } else { None }; - let versioned_value: VersionedValue = VersionedValue { + Ok(DeltaOp::KeyValue(KeyValueMutation { + key, value, version, - tombstone, - }; - Ok(DeltaOp::KeyValue { - key, - versioned_value, - }) + tombstone: deleted, + })) + } + DeltaOpTag::SetMaxVersion => { + let max_version = Version::deserialize(buf)?; + Ok(DeltaOp::SetMaxVersion { max_version }) } } } @@ -140,18 +138,18 @@ impl DeltaOp { DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, } => DeltaOpRef::Node { chitchat_id, last_gc_version: *last_gc_version, + from_version_excluded: *from_version_excluded, }, - DeltaOp::KeyValue { - key, - versioned_value, - } => DeltaOpRef::KeyValue { - key, - versioned_value, + DeltaOp::KeyValue(key_value_mutation) => { + DeltaOpRef::KeyValue(key_value_mutation.into()) + } + DeltaOp::SetMaxVersion { max_version } => DeltaOpRef::SetMaxVersion { + max_version: *max_version, }, - DeltaOp::NodeToReset(node_to_reset) => DeltaOpRef::NodeToReset(node_to_reset), } } } @@ -172,24 +170,20 @@ impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, + from_version_excluded: from_version, } => { buf.push(DeltaOpTag::Node.into()); chitchat_id.serialize(buf); last_gc_version.serialize(buf); + from_version.serialize(buf); } - Self::KeyValue { - key, - versioned_value, - } => { + Self::KeyValue(key_value_mutation_ref) => { buf.push(DeltaOpTag::KeyValue.into()); - key.serialize(buf); - versioned_value.value.serialize(buf); - versioned_value.version.serialize(buf); - versioned_value.is_tombstone().serialize(buf); + key_value_mutation_ref.serialize(buf); } - Self::NodeToReset(chitchat_id) => { - buf.push(DeltaOpTag::NodeToReset.into()); - chitchat_id.serialize(buf); + Self::SetMaxVersion { max_version } => { + buf.push(DeltaOpTag::SetMaxVersion.into()); + max_version.serialize(buf); } } } @@ -199,17 +193,14 @@ impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, - } => chitchat_id.serialized_len() + last_gc_version.serialized_len(), - Self::KeyValue { - key, - versioned_value, + from_version_excluded: from_version, } => { - key.serialized_len() - + versioned_value.value.serialized_len() - + versioned_value.version.serialized_len() - + 1 + chitchat_id.serialized_len() + + last_gc_version.serialized_len() + + from_version.serialized_len() } - Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(), + Self::KeyValue(key_value_mutation_ref) => key_value_mutation_ref.serialized_len(), + Self::SetMaxVersion { max_version } => max_version.serialized_len(), } } } @@ -252,7 +243,12 @@ impl Delta { .sum() } - pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) { + pub(crate) fn add_node( + &mut self, + chitchat_id: ChitchatId, + last_gc_version: Version, + from_version: Version, + ) { assert!(!self .node_deltas .iter() @@ -260,7 +256,9 @@ impl Delta { self.node_deltas.push(NodeDelta { chitchat_id, last_gc_version, + from_version_excluded: from_version, key_values: Vec::new(), + max_version: None, }); } @@ -277,15 +275,12 @@ impl Delta { .iter_mut() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); - let tombstone = if deleted { Some(Instant::now()) } else { None }; - node_delta.key_values.push(( - key.to_string(), - VersionedValue { - value: value.to_string(), - version, - tombstone, - }, - )); + node_delta.key_values.push(KeyValueMutation { + key: key.to_string(), + value: value.to_string(), + version, + tombstone: deleted, + }); } pub(crate) fn set_serialized_len(&mut self, serialized_len: usize) { @@ -297,17 +292,30 @@ impl Delta { .iter() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) } - - pub(crate) fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) { - self.nodes_to_reset.push(chitchat_id); - } } #[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { pub chitchat_id: ChitchatId, + // `from_version` and `last_gc_version` are here to express on which states + // this delta can be applied to. + // + // `last_gc_version` expresses that this delta has be computed from a state where no + // keys > `last_gc_version` have been removed. + // + // `from_version` expresses that from this state, ALL of the records in + // (`from_version`.. `max_version`] are present in this delta + // (where max_version is maximum version in the delta.) + // + // In other words, the only gaps in this interval are deleted key-values with a version + // <= `last gc version`. + // + // Inspect the code in `prepare_apply_delta(..)` to see the rules on how `from_version` + // and `last_gc_version` are used. + pub from_version_excluded: Version, pub last_gc_version: Version, - pub key_values: Vec<(String, VersionedValue)>, + pub key_values: Vec, + pub max_version: Option, } #[cfg(test)] @@ -336,6 +344,7 @@ impl DeltaBuilder { DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, } => { self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); @@ -343,34 +352,28 @@ impl DeltaBuilder { self.current_node_delta = Some(NodeDelta { chitchat_id, last_gc_version, + from_version_excluded, key_values: Vec::new(), + max_version: None, }); } - DeltaOp::KeyValue { - key, - versioned_value, - } => { + DeltaOp::KeyValue(key_value_mutation) => { let Some(current_node_delta) = self.current_node_delta.as_mut() else { anyhow::bail!("received a key-value op without a node op before."); }; - if let Some((_last_key, last_versioned_value)) = - current_node_delta.key_values.last() - { + if let Some(previous_key_value_mutation) = current_node_delta.key_values.last() { anyhow::ensure!( - last_versioned_value.version < versioned_value.version, + previous_key_value_mutation.version < key_value_mutation.version, "kv version should be increasing" ); } - current_node_delta - .key_values - .push((key.to_string(), versioned_value)); + current_node_delta.key_values.push(key_value_mutation); } - DeltaOp::NodeToReset(chitchat_id) => { - anyhow::ensure!( - self.delta.node_deltas.is_empty(), - "nodes_to_reset should be encoded before node_deltas" - ); - self.delta.nodes_to_reset.push(chitchat_id); + DeltaOp::SetMaxVersion { max_version } => { + let Some(current_node_delta) = self.current_node_delta.as_mut() else { + anyhow::bail!("received a key-value op without a node op before."); + }; + current_node_delta.max_version = Some(max_version); } } Ok(()) @@ -411,11 +414,9 @@ impl DeltaSerializer { } } - /// Returns false if the node to reset could not be added because the payload would exceed the - /// mtu. - pub fn try_add_node_to_reset(&mut self, chitchat_id: ChitchatId) -> bool { - let delta_op = DeltaOp::NodeToReset(chitchat_id); - self.try_add_op(delta_op) + pub fn try_set_max_version(&mut self, max_version: Version) -> bool { + let key_value_op = DeltaOp::SetMaxVersion { max_version }; + self.try_add_op(key_value_op) } fn try_add_op(&mut self, delta_op: DeltaOp) -> bool { @@ -433,18 +434,27 @@ impl DeltaSerializer { /// Returns false if the KV could not be added because the payload would exceed the mtu. pub fn try_add_kv(&mut self, key: &str, versioned_value: VersionedValue) -> bool { - let key_value_op = DeltaOp::KeyValue { + let key_value_mutation = KeyValueMutation { key: key.to_string(), - versioned_value, + value: versioned_value.value, + version: versioned_value.version, + tombstone: versioned_value.tombstone.is_some(), }; + let key_value_op = DeltaOp::KeyValue(key_value_mutation); self.try_add_op(key_value_op) } /// Returns false if the node could not be added because the payload would exceed the mtu. - pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool { + pub fn try_add_node( + &mut self, + chitchat_id: ChitchatId, + last_gc_version: Version, + from_version: Version, + ) -> bool { let new_node_op = DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded: from_version, }; self.try_add_op(new_node_op) } @@ -457,6 +467,8 @@ impl DeltaSerializer { #[cfg(test)] mod tests { + use tokio::time::Instant; + use super::*; #[test] @@ -472,7 +484,7 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 0u64)); // +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) + // 1 bytes (empty tombstone). @@ -497,7 +509,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes - assert!(delta_writer.try_add_node(node2, 0)); + assert!(delta_writer.try_add_node(node2, 0, 0u64)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -523,12 +535,12 @@ mod tests { #[test] fn test_delta_serialization_simple_node() { // 1 bytes (End tag) - let mut delta_writer = DeltaSerializer::with_mtu(128); + let mut delta_writer = DeltaSerializer::with_mtu(140); // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length) - assert!(delta_writer.try_add_node(node1, 0)); + assert!(delta_writer.try_add_node(node1, 0, 0u64)); // +24 bytes (kv + op tag) assert!(delta_writer.try_add_kv( @@ -552,7 +564,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node2, 0)); + assert!(delta_writer.try_add_node(node2, 0, 0u64)); test_aux_delta_writer(delta_writer, 80); } @@ -569,13 +581,12 @@ mod tests { // +27 bytes (ChitchatId) + 1 (op tag) + 3 bytes (block len) // = 32 bytes - assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000))); let node1 = ChitchatId::for_local_test(10_001); // +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic // new block) = 71 - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 0u64)); // +23 bytes (kv) + 1 (op tag) // = 95 @@ -601,9 +612,9 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag) // = 155 - assert!(delta_writer.try_add_node(node2, 0u64)); + assert!(delta_writer.try_add_node(node2, 0u64, 0)); // The block got compressed. - test_aux_delta_writer(delta_writer, 85); + test_aux_delta_writer(delta_writer, 80); } #[test] @@ -613,7 +624,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(delta_writer.try_add_node(node1, 0)); + assert!(delta_writer.try_add_node(node1, 0, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -636,42 +647,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(!delta_writer.try_add_node(node2, 0u64)); - - // The block got compressed. - test_aux_delta_writer(delta_writer, 72); - } - - #[test] - fn test_delta_serialization_exceed_mtu_on_add_node_to_reset() { - // 4 bytes. - let mut delta_writer = DeltaSerializer::with_mtu(100); - - let node1 = ChitchatId::for_local_test(10_001); - // +37 bytes. - assert!(delta_writer.try_add_node(node1, 0u64)); - - // +23 bytes. - assert!(delta_writer.try_add_kv( - "key11", - VersionedValue { - value: "val11".to_string(), - version: 1, - tombstone: None, - } - )); - // +23 bytes. - assert!(delta_writer.try_add_kv( - "key12", - VersionedValue { - value: "val12".to_string(), - version: 2, - tombstone: None, - } - )); - - let node2 = ChitchatId::for_local_test(10_002); - assert!(!delta_writer.try_add_node_to_reset(node2)); + assert!(!delta_writer.try_add_node(node2, 0u64, 1u64)); // The block got compressed. test_aux_delta_writer(delta_writer, 72); @@ -686,7 +662,7 @@ mod tests { // + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag) // = 40 - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); // +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag) // = 67 @@ -709,7 +685,7 @@ mod tests { tombstone: None, } )); - test_aux_delta_writer(delta_writer, 64); + test_aux_delta_writer(delta_writer, 72); } #[test] @@ -718,7 +694,7 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(62); let node1 = ChitchatId::for_local_test(10_001); - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); assert!(delta_writer.try_add_kv( "key11", @@ -756,6 +732,6 @@ mod tests { num_valid_tags += 1; } } - assert_eq!(num_valid_tags, 3); + assert_eq!(num_valid_tags, 2); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 79ba476..7f93a5c 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -109,7 +109,10 @@ impl Chitchat { /// Executes the catch-up callback if necessary. fn maybe_trigger_catchup_callback(&self, delta: &Delta) { - if !delta.nodes_to_reset.is_empty() { + let has_reset = delta.node_deltas.iter().any(|node_delta| { + node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0 + }); + if has_reset { if let Some(catchup_callback) = &self.config.catchup_callback { info!("executing catch-up callback"); catchup_callback(); @@ -1046,7 +1049,8 @@ mod tests { node.process_delta(delta); let mut delta = Delta::default(); - delta.add_node_to_reset(ChitchatId::for_local_test(10_002)); + let chitchat_id = ChitchatId::for_local_test(10_002); + delta.add_node(chitchat_id, 1000u64, 0u64); node.process_delta(delta); assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1); diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index b79e67b..b0b74a6 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -162,15 +162,19 @@ mod tests { // 4 bytes let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); - // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64); + // +27 bytes (ChitchatId) + // + 2 bytes (node delta len) + // + 8 bytes (last_gc_version) + // + 8 bytes (from_version). + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + // That's compression kicking in. + delta.set_serialized_len(60); let syn_ack = ChitchatMessage::SynAck { digest, delta }; // 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta). - test_serdeser_aux(&syn_ack, 108); + test_serdeser_aux(&syn_ack, 1 + 60 + 45); } } @@ -186,12 +190,12 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64); + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + delta.set_serialized_len(60); let ack = ChitchatMessage::Ack { delta }; - test_serdeser_aux(&ack, 63); + test_serdeser_aux(&ack, 1 + 60); } } diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 9fddbce..81bc446 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -578,6 +578,12 @@ mod tests { let server_handle = spawn_chitchat(server_config, Vec::new(), &transport) .await .unwrap(); + server_handle + .chitchat() + .lock() + .await + .self_node_state() + .set("key", "value"); // Add our test socket to the server's nodes. server_handle diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index a7d171a..ca5383b 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::time::Instant; -use tracing::warn; +use tracing::{info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; @@ -24,14 +24,29 @@ pub struct NodeState { chitchat_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, - max_version: Version, #[serde(skip)] listeners: Listeners, + max_version: Version, // This is the maximum version of the last tombstone GC. // - // After we GC the tombstones, we cannot safely do replication with - // nodes that are asking for a diff from a version lower than this. + // Due to the garbage collection of tombstones, we cannot + // safely do replication with nodes that are asking for a + // diff from a version lower than this. + // + // `last_gc_version` expresses the idea: what is the oldest version from which I can + // confidently emit delta from. The reason why we update it here, is + // because a node that was just reset or just joined the cluster will get updates + // from another node that are actually only make sense in the context of the + // emission of delta from a `last_gc_version`. last_gc_version: Version, + // A proper interpretation of `max_version` and `last_gc_version` is the following: + // The state contains exactly: + // - all of the (non-deleted) key values present at snapshot `max_version`. + // - all of the tombstones of the entry that were deleted between (`last_gc_version`, + // `max_version]`. + // + // It does not contain any trace of the tombstones of the entries that were deleted before + // `<= last_gc_version`. } impl Debug for NodeState { @@ -104,6 +119,123 @@ impl NodeState { .map(|(key, versioned_value)| (key, versioned_value.value.as_str())) } + pub fn set_max_version(&mut self, max_version: Version) { + self.max_version = max_version; + } + + // Prepare the node state to receive a delta. + // Returns `true` if the delta can be applied. In that case, the node state may be mutated (if a + // reset is required) Returns `false` if the delta cannot be applied. In that case, the node + // state is not modified. + #[must_use] + fn prepare_apply_delta(&mut self, node_delta: &NodeDelta) -> bool { + if node_delta.from_version_excluded > self.max_version { + // This delta is coming from the future. + // We probably experienced a reset and this delta is not usable for us anymore. + // This is not a bug, it can happen, but we just need to ignore it! + info!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received delta from the future, ignoring it" + ); + return false; + } + + if self.max_version > node_delta.last_gc_version { + // The GCed tombstone have all been already received. + // We won't miss anything by applying the delta! + return true; + } + + // This delta might be missing tombstones with version within + // (`node_state.max_version`..`last_gc_version`]. + // + // It is ok if we don't have the associated values to begin + // with. + if self.last_gc_version >= node_delta.last_gc_version { + return true; + } + + if node_delta.from_version_excluded > 0 { + warn!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received an inapplicable delta, ignoring it"); + } + + let Some(delta_max_version) = node_delta + .key_values + .iter() + .map(|key_value_mutation| key_value_mutation.version) + .max() + .or(node_delta.max_version) + else { + // This can happen if we just hit the mtu at the moment + // of writing the SetMaxVersion operation. + return false; + }; + + if delta_max_version <= self.max_version() { + // There is not point apply in this delta as it is not bringing us to a newer state. + warn!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + delta_max_version=delta_max_version, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received a delta that does not bring us to a fresher state, ignoring it"); + return false; + } + + // We are out of sync. This delta is an invitation to `reset` our state. + info!( + node=?node_delta.chitchat_id, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "resetting node"); + *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); + if let Some(max_version) = node_delta.max_version { + self.max_version = max_version; + } + // We need to reset our `last_gc_version`. + self.last_gc_version = node_delta.last_gc_version; + true + } + + fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) { + info!(delta=?node_delta); + if !self.prepare_apply_delta(&node_delta) { + return; + } + let current_max_version = self.max_version(); + for key_value_mutation in node_delta.key_values { + if key_value_mutation.version <= current_max_version { + // We already know about this KV. + continue; + } + if key_value_mutation.tombstone { + // We don't want to keep any tombstone before `last_gc_version`. + if key_value_mutation.version <= self.last_gc_version { + continue; + } + } + let versioned_value = VersionedValue { + value: key_value_mutation.value, + version: key_value_mutation.version, + tombstone: if key_value_mutation.tombstone { + Some(now) + } else { + None + }, + }; + self.set_versioned_value(key_value_mutation.key, versioned_value); + } + } + /// Returns key values matching a prefix pub fn iter_prefix<'a>( &'a self, @@ -344,53 +476,11 @@ impl ClusterState { } pub(crate) fn apply_delta(&mut self, delta: Delta) { - // Remove nodes to reset. - if !delta.nodes_to_reset.is_empty() { - tracing::info!(nodes_to_reset=?delta.nodes_to_reset, "nodes to reset"); - } - - // Clearing the node of the states to reset. - for node_to_reset in delta.nodes_to_reset { - // We don't want to remove the entire state here: the node could be alive and the - // live watcher panics if the state is missing. - if let Some(node_state) = self.node_states.get_mut(&node_to_reset) { - *node_state = NodeState::new(node_to_reset, self.listeners.clone()); - } - } - + let now = Instant::now(); // Apply delta. for node_delta in delta.node_deltas { - let NodeDelta { - chitchat_id, - last_gc_version, - key_values, - } = node_delta; - let node_state = self.node_state_mut(&chitchat_id); - if node_state.last_gc_version == 0u64 { - // You may have expected `node_state.last_gc_version = max(last_gc_version, - // node_state.last_gc_version)`. This is correct too of course, but - // slightly too restrictive. - // - // `last_gc_version` expresses the idea: what is the oldest version from which I can - // confidently emit delta from. The reason why we update it here, is - // because a node that was just reset or just joined the cluster will get updates - // from another node that are actually only make sense in the context of the - // emission of delta from a `last_gc_version`. - // - // We want to avoid the case where: - // - Cluster with Node A, B, C - // - Node A, inserts Key K that gets replicated. - // - Network partition as {A} {B,C} - // - A deletes Key K and GCs it. - // - Network restoration - // - B gossips with A and get reset - // - C gossips with B and does NOT get reset. - node_state.last_gc_version = last_gc_version; - } - for (key, versioned_value) in key_values { - node_state.max_version = node_state.max_version.max(versioned_value.version); - node_state.set_versioned_value(key, versioned_value); - } + let node_state = self.node_state_mut(&node_delta.chitchat_id); + node_state.apply_delta(node_delta, now); } } @@ -421,70 +511,64 @@ impl ClusterState { scheduled_for_deletion: &HashSet<&ChitchatId>, ) -> Delta { let mut stale_nodes = SortedStaleNodes::default(); - let mut nodes_to_reset = Vec::new(); for (chitchat_id, node_state) in &self.node_states { if scheduled_for_deletion.contains(chitchat_id) { continue; } - let Some(node_digest) = digest.node_digests.get(chitchat_id) else { - stale_nodes.insert(chitchat_id, node_state); + + let digest_max_version: Version = digest + .node_digests + .get(chitchat_id) + .map(|node_digest| node_digest.max_version) + .unwrap_or(0u64); + + if node_state.max_version <= digest_max_version { + // Our version is actually older than the version of the digest. + // We have no update to offer. continue; - }; - // TODO: We have problem here. If after the delta we end up with a max version that is - // not high enough to bring us to `last_gc_version`, we might get reset again - // and again. - let should_reset = - node_state.last_gc_version > node_digest.max_version && node_digest.max_version > 0; - if should_reset { + } + + // We have garbage collected some tombstones that the other node does not know about + // yet. A reset is needed. + let should_reset = digest_max_version < node_state.last_gc_version; + let from_version_excluded = if should_reset { warn!( "Node to reset {chitchat_id:?} last gc version: {} max version: {}", - node_state.last_gc_version, node_digest.max_version + node_state.last_gc_version, digest_max_version ); - nodes_to_reset.push(chitchat_id); - stale_nodes.insert(chitchat_id, node_state); - continue; - } - stale_nodes.offer(chitchat_id, node_state, node_digest); - } - let mut delta_serializer = DeltaSerializer::with_mtu(mtu); + 0u64 + } else { + digest_max_version + }; - for chitchat_id in &nodes_to_reset { - if !delta_serializer.try_add_node_to_reset((*chitchat_id).clone()) { - break; - } + stale_nodes.offer(chitchat_id, node_state, from_version_excluded); } + let mut delta_serializer = DeltaSerializer::with_mtu(mtu); for stale_node in stale_nodes.into_iter() { if !delta_serializer.try_add_node( stale_node.chitchat_id.clone(), stale_node.node_state.last_gc_version, + stale_node.from_version_excluded, ) { break; - } + }; + let mut added_something = false; for (key, versioned_value) in stale_node.stale_key_values() { - added_something = true; if !delta_serializer.try_add_kv(key, versioned_value.clone()) { return delta_serializer.finish(); } + added_something = true; } - if !added_something && nodes_to_reset.contains(&stale_node.chitchat_id) { - // send a sentinel element to update the max_version. Otherwise the node's vision - // of max_version will be 0, and it may accept writes that are supposed to be - // stale, but it can tell they are. - if !delta_serializer.try_add_kv( - "__reset_sentinel", - VersionedValue { - value: String::new(), - version: stale_node.node_state.max_version, - tombstone: Some(Instant::now()), - }, - ) { - return delta_serializer.finish(); - } + // There aren't any key-values in the state_node apparently. + // Let's add a specific instruction to the delta to set the max version. + if !added_something { + delta_serializer.try_set_max_version(stale_node.node_state.max_version); } } + delta_serializer.finish() } } @@ -547,15 +631,15 @@ struct SortedStaleNodes<'a> { /// If no KV is stale, there is nothing to gossip, and we simply return `None`: /// the node is not a candidate for gossip. fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option { + if node_state.max_version() <= floor_version { + return None; + } let is_unknown = floor_version == 0u64; let num_stale_key_values = if is_unknown { node_state.num_key_values() } else { node_state.stale_key_values(floor_version).count() }; - if !is_unknown && num_stale_key_values == 0 { - return None; - } Some(Staleness { is_unknown, max_version: node_state.max_version, @@ -564,41 +648,23 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option SortedStaleNodes<'a> { - /// Adds a node to the list of stale nodes. - fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { - let Some(staleness) = staleness_score(node_state, 0u64) else { - // The node does not have any stale key values. - return; - }; - - let floor_version = 0; - let stale_node = StaleNode { - chitchat_id, - node_state, - floor_version, - }; - self.stale_nodes - .entry(staleness) - .or_default() - .push(stale_node); - } - - /// Evaluates whether the node should be added to the list of stale nodes. + /// Adds a to the list of stale nodes. + /// If the node is not stale (meaning we have no fresher Key Values to share), then this + /// function simply returns. fn offer( &mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState, - node_digest: &NodeDigest, + from_version_excluded: u64, ) { - let floor_version = node_digest.max_version; - let Some(staleness) = staleness_score(node_state, floor_version) else { + let Some(staleness) = staleness_score(node_state, from_version_excluded) else { // The node does not have any stale KV. return; }; let stale_node = StaleNode { chitchat_id, node_state, - floor_version, + from_version_excluded, }; self.stale_nodes .entry(staleness) @@ -626,14 +692,14 @@ impl<'a> SortedStaleNodes<'a> { struct StaleNode<'a> { chitchat_id: &'a ChitchatId, node_state: &'a NodeState, - floor_version: u64, + from_version_excluded: u64, } impl<'a> StaleNode<'a> { /// Iterates over the stale key-value pairs in decreasing order of staleness. fn stale_key_values(&self) -> impl Iterator { self.node_state - .stale_key_values(self.floor_version) + .stale_key_values(self.from_version_excluded) .sorted_unstable_by_key(|(_, versioned_value)| versioned_value.version) } } @@ -684,6 +750,7 @@ fn random_generator() -> impl Rng { mod tests { use super::*; use crate::serialize::Serializable; + use crate::types::KeyValueMutation; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -694,7 +761,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 0, + from_version_excluded: 0u64, }; assert!(stale_node.stale_key_values().next().is_none()); } @@ -714,7 +781,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 1, + from_version_excluded: 1u64, }; assert_eq!( stale_node.stale_key_values().collect::>(), @@ -742,16 +809,18 @@ mod tests { // No stale KV. We still insert the node! // That way it will get a node state, and be a candidate for gossip later. - let node_state1 = NodeState::for_test(); - stale_nodes.insert(&node1, &node_state1); + let mut node_state1 = NodeState::for_test(); + node_state1.set_max_version(2); + + stale_nodes.offer(&node1, &node_state1, 0u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let mut node2_state = NodeState::for_test(); node2_state.set_with_version("key_a", "value_a", 1); - stale_nodes.insert(&node2, &node2_state); + stale_nodes.offer(&node2, &node2_state, 0u64); let expected_staleness = Staleness { is_unknown: true, - max_version: 0, + max_version: 1, num_stale_key_values: 0, }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); @@ -760,7 +829,7 @@ mod tests { node3_state.set_with_version("key_b", "value_b", 2); node3_state.set_with_version("key_c", "value_c", 3); - stale_nodes.insert(&node3, &node3_state); + stale_nodes.offer(&node3, &node3_state, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 3, @@ -775,31 +844,23 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); let node1_state = NodeState::for_test(); - stale_nodes.offer(&node1, &node1_state, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node1, &node1_state, 1u64); // No stale records. This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); let node2 = ChitchatId::for_local_test(10_002); let mut node2_state = NodeState::for_test(); - node2_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.offer(&node2, &node2_state, &NodeDigest::new(Heartbeat(0), 1)); + node2_state.set_with_version("key_a", "value_a", 1); + stale_nodes.offer(&node2, &node2_state, 1u64); // No stale records (due to the floor version). This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); let node3 = ChitchatId::for_local_test(10_002); let mut node3_state = NodeState::for_test(); - node3_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node3_state - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node3_state - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node3_state, &NodeDigest::new(Heartbeat(0), 1)); + node3_state.set_with_version("key_a", "value_a", 1); + node3_state.set_with_version("key_b", "value_b", 2); + node3_state.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node3, &node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, @@ -814,74 +875,46 @@ mod tests { let mut stale_nodes = SortedStaleNodes::default(); let node1 = ChitchatId::for_local_test(10_001); let mut node_state1 = NodeState::for_test(); - node_state1 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state1 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state1 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node1, &node_state1, &NodeDigest::new(Heartbeat(0), 1)); + node_state1.set_with_version("key_a", "value_a", 1); + node_state1.set_with_version("key_b", "value_b", 2); + node_state1.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node1, &node_state1, 1u64); // 2 stale values. let node2 = ChitchatId::for_local_test(10_002); let mut node_state2 = NodeState::for_test(); - node_state2 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state2 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state2 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 5)); - stale_nodes.offer(&node2, &node_state2, &NodeDigest::new(Heartbeat(0), 2)); + node_state2.set_with_version("key_a", "value", 1); + node_state2.set_with_version("key_b", "value_b", 2); + node_state2.set_with_version("key_c", "value_c", 5); + stale_nodes.offer(&node2, &node_state2, 2u64); // 1 stale value. let node3 = ChitchatId::for_local_test(10_003); let mut node_state3 = NodeState::for_test(); - node_state3 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state3 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state3 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node_state3, &NodeDigest::new(Heartbeat(0), 7)); + node_state3.set_with_version("key_a", "value_a", 1); + node_state3.set_with_version("key_b", "value_b", 2); + node_state3.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node3, &node_state3, 7u64); // 0 stale values. let node4 = ChitchatId::for_local_test(10_004); let mut node_state4 = NodeState::for_test(); - node_state4 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state4 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state4 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 5)); - node_state4 - .key_values - .insert("key_d".to_string(), VersionedValue::for_test("value_d", 7)); - stale_nodes.offer(&node4, &node_state4, &NodeDigest::new(Heartbeat(0), 1)); + node_state4.set_with_version("key_a", "value_a", 1); + node_state4.set_with_version("key_b", "value_b", 2); + node_state4.set_with_version("key_c", "value_c", 5); + node_state4.set_with_version("key_d", "value_d", 7); + stale_nodes.offer(&node4, &node_state4, 1); // 3 stale values let node5 = ChitchatId::for_local_test(10_005); let node_state5 = NodeState::for_test(); - stale_nodes.insert(&node5, &node_state5); + stale_nodes.offer(&node5, &node_state5, 0); // 0 stale values let node6 = ChitchatId::for_local_test(10_006); let mut node_state6 = NodeState::for_test(); - node_state6 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.insert(&node6, &node_state6); + node_state6.set_with_version("key_a", "value_a", 1); + stale_nodes.offer(&node6, &node_state6, 0u64); // 1 stale values assert_eq!( @@ -889,7 +922,7 @@ mod tests { .into_iter() .map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port()) .collect::>(), - vec![10_005, 10_006, 10_004, 10_001, 10_002] + vec![10_006, 10_004, 10_001, 10_002] ); } @@ -1083,13 +1116,12 @@ mod tests { node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 let mut delta = Delta::default(); - delta.add_node(node1.clone(), 0u64); + delta.add_node(node1.clone(), 0u64, 0u64); delta.add_kv(&node1, "key_a", "4", 4, false); delta.add_kv(&node1, "key_b", "2", 2, false); - // Reset node 2. - delta.add_node_to_reset(node2.clone()); - delta.add_node(node2.clone(), 0u64); + // We reset node 2 + delta.add_node(node2.clone(), 3, 0); delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); @@ -1150,7 +1182,7 @@ mod tests { for (num_entries, &mtu) in mtu_per_num_entries.iter().enumerate() { let mut expected_delta = Delta::default(); for &(node, key, val, version, tombstone) in &expected_delta_atoms[..num_entries] { - expected_delta.add_node(node.clone(), 0u64); + expected_delta.add_node(node.clone(), 0u64, 0u64); expected_delta.add_kv(node, key, val, version, tombstone); } { @@ -1296,13 +1328,12 @@ mod tests { MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), ); - assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.set_serialized_len(73); + expected_delta.set_serialized_len(76); assert_eq!(delta, expected_delta); } @@ -1321,14 +1352,13 @@ mod tests { MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), ); - assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_kv(&node1, "key_a", "", 3, true); - expected_delta.set_serialized_len(87); + expected_delta.set_serialized_len(90); assert_eq!(delta, expected_delta); } @@ -1337,7 +1367,7 @@ mod tests { tokio::time::advance(DELETE_GRACE_PERIOD).await; cluster_state .node_state_mut(&node1) - .gc_keys_marked_for_deletion(Duration::from_secs(10)); + .gc_keys_marked_for_deletion(DELETE_GRACE_PERIOD); { let mut digest = Digest::default(); @@ -1348,12 +1378,12 @@ mod tests { &HashSet::new(), ); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node_to_reset(node1.clone()); - expected_delta.add_node(node1.clone(), 3u64); + // Last gc set to 3 and from version to 0. That's a reset right there. + expected_delta.add_node(node1.clone(), 3u64, 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.set_serialized_len(83); + expected_delta.set_serialized_len(75); assert_eq!(&delta, &expected_delta); } } @@ -1375,4 +1405,173 @@ mod tests { .collect(); assert_eq!(node_states, &["Europe:", "Europe:Italy"]); } + + #[test] + fn test_node_apply_delta_simple() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + node_state.set_with_version("key_b", "val_a", 2); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 2, + last_gc_version: 0u64, + max_version: None, + key_values: vec![ + KeyValueMutation { + key: "key_c".to_string(), + value: "val_c".to_string(), + version: 4, + tombstone: false, + }, + KeyValueMutation { + key: "key_b".to_string(), + value: "val_b2".to_string(), + version: 3, + tombstone: false, + }, + ], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert_eq!(node_state.num_key_values(), 3); + assert_eq!(node_state.max_version(), 4); + assert_eq!(node_state.last_gc_version, 0); + assert_eq!(node_state.get("key_a").unwrap(), "val_a"); + assert_eq!(node_state.get("key_b").unwrap(), "val_b2"); + assert_eq!(node_state.get("key_c").unwrap(), "val_c"); + } + + // Here we check that the accessor that dismiss resetting a Kv to the same value is not + // used in apply delta. Resetting to the same value is very possible in reality several updates + // happened in a row but were shadowed by the scuttlebutt logic. We DO need to update the + // version. + #[test] + fn test_node_apply_same_value_different_version() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 1, + last_gc_version: 0, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "val_a".to_string(), + version: 3, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 3); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[test] + fn test_node_skip_delta_from_the_future() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 5); + assert_eq!(node_state.max_version(), 5); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 6, // we skipped version 6 here. + last_gc_version: 0, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 7, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 5); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[tokio::test] + async fn test_node_apply_delta_different_last_gc_is_ok_if_below_max_version() { + tokio::time::pause(); + const GC_PERIOD: Duration = Duration::from_secs(10); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + node_state.mark_for_deletion("key_a"); + tokio::time::advance(GC_PERIOD).await; + node_state.gc_keys_marked_for_deletion(GC_PERIOD); + assert_eq!(node_state.last_gc_version, 18); + assert_eq!(node_state.max_version(), 18); + node_state.set_with_version("key_a", "val_a", 31); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 31, // we skipped version 6 here. + last_gc_version: 30, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 32, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 32); + assert_eq!(node_state.max_version(), 32); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "new_val"); + } + + #[tokio::test] + async fn test_node_apply_delta_on_reset_fresher_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + assert_eq!(node_state.max_version(), 17); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. + last_gc_version: 30, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 32, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert!(node_state.get_versioned("key_a").is_none()); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + } + + #[tokio::test] + async fn test_node_apply_delta_no_reset_if_older_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 31); + node_state.set_with_version("key_b", "val_b2", 32); + assert_eq!(node_state.max_version(), 32); + // This does look like a valid reset, but we are already at version 32. + // Let's ignore this. + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. + last_gc_version: 17, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 30, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert_eq!(node_state.max_version, 32); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + assert_eq!(versioned_b.value, "val_b2"); + } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index fc6a2a3..27f8664 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -4,6 +4,9 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use crate::serialize::Deserializable; +use crate::Serializable; + /// For the lifetime of a cluster, nodes can go down and come back up multiple times. They may also /// die permanently. A [`ChitchatId`] is composed of three components: /// - `node_id`: an identifier unique across the cluster. @@ -30,7 +33,9 @@ impl Debug for ChitchatId { write!( f, "{}:{}:{}", - &self.node_id, self.generation_id, self.gossip_advertise_addr + self.node_id.as_str(), + self.generation_id, + self.gossip_advertise_addr ) } } @@ -107,6 +112,64 @@ impl PartialEq for VersionedValue { } } +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub(crate) struct KeyValueMutation { + pub(crate) key: String, + pub(crate) value: String, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> From<&'a KeyValueMutation> for KeyValueMutationRef<'a> { + fn from(mutation: &'a KeyValueMutation) -> KeyValueMutationRef<'a> { + KeyValueMutationRef { + key: mutation.key.as_str(), + value: mutation.value.as_str(), + version: mutation.version, + tombstone: mutation.tombstone, + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize)] +pub(crate) struct KeyValueMutationRef<'a> { + pub(crate) key: &'a str, + pub(crate) value: &'a str, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> Serializable for KeyValueMutationRef<'a> { + fn serialize(&self, buf: &mut Vec) { + Serializable::serialize(self.key, buf); + Serializable::serialize(self.value, buf); + Serializable::serialize(&self.version, buf); + Serializable::serialize(&self.tombstone, buf); + } + + fn serialized_len(&self) -> usize { + Serializable::serialized_len(self.key) + + Serializable::serialized_len(self.value) + + Serializable::serialized_len(&self.version) + + Serializable::serialized_len(&self.tombstone) + } +} + +impl Deserializable for KeyValueMutation { + fn deserialize(buf: &mut &[u8]) -> anyhow::Result { + let key: String = Deserializable::deserialize(buf)?; + let value: String = Deserializable::deserialize(buf)?; + let version: u64 = Deserializable::deserialize(buf)?; + let tombstone: bool = Deserializable::deserialize(buf)?; + Ok(KeyValueMutation { + key, + value, + version, + tombstone, + }) + } +} + #[derive(Serialize, Deserialize)] struct VersionedValueForSerialization { pub value: String, diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index d6c6533..919dd00 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -83,7 +83,7 @@ impl Simulator { pub async fn execute(&mut self, operations: Vec) { for operation in operations { - debug!("Execute operation {operation:?}"); + info!("Execute operation {operation:?}"); match operation { Operation::AddNode { chitchat_id, @@ -405,6 +405,7 @@ async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { } #[tokio::test] async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { + let _ = tracing_subscriber::fmt::try_init(); const TIMEOUT: Duration = Duration::from_millis(500); let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); let chitchat_id_1 = test_chitchat_id(1);