From c55c8834ef1a089a1c6d7826cba1db03790cd986 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 23 Feb 2024 19:57:27 +0900 Subject: [PATCH 1/8] Bugfix: validation of deltas before applying them. Due to the nature of UDP, the existence of resets and the fact that we are gossipping to several nodes at the same time, it is possible for obsolete deltas to arrive. This PR adds some validation to detect if the delta is valid, and whether it will bring us to a better state or not. It also removes the nodes-to-reset information, which was actually taking a large amount of the MTU on large clusters. (For 20 nodes, around 1KB) Reset is now just expressed by sending the delta with `from_version = 0`. Closes #129 - Removing hidden contract We avoid computing the tombstone's `Instant` upon deserialization. Doing so relied on an implicit contract that forced us to deserialize mutations in the order of their versions. With this change, we defer the computation of the instant to the call of the apply_delta method. All of the tombstones from a delta get the exact same `Instant`. --- chitchat/src/delta.rs | 282 +++++++-------- chitchat/src/lib.rs | 8 +- chitchat/src/message.rs | 18 +- chitchat/src/server.rs | 6 + chitchat/src/state.rs | 609 ++++++++++++++++++++++----------- chitchat/src/types.rs | 65 +++- chitchat/tests/cluster_test.rs | 3 +- 7 files changed, 622 insertions(+), 369 deletions(-) diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 01efde7..f8e8021 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,8 +1,7 @@ use std::collections::HashSet; -use tokio::time::Instant; - use crate::serialize::*; +use crate::types::{KeyValueMutation, KeyValueMutationRef}; use crate::{ChitchatId, Version, VersionedValue}; /// A delta is the message we send to another node to update it. @@ -11,7 +10,6 @@ use crate::{ChitchatId, Version, VersionedValue}; /// encoded one after the other in a compressed stream. 
#[derive(Debug, Eq, PartialEq)] pub struct Delta { - pub(crate) nodes_to_reset: Vec, pub(crate) node_deltas: Vec, serialized_len: usize, } @@ -19,7 +17,6 @@ pub struct Delta { impl Default for Delta { fn default() -> Self { Delta { - nodes_to_reset: Vec::new(), node_deltas: Vec::new(), serialized_len: 1, } @@ -28,44 +25,48 @@ impl Default for Delta { impl Delta { fn get_operations(&self) -> impl Iterator> { - let nodes_to_reset_ops = self.nodes_to_reset.iter().map(DeltaOpRef::NodeToReset); - let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { + self.node_deltas.iter().flat_map(|node_delta| { std::iter::once(DeltaOpRef::Node { chitchat_id: &node_delta.chitchat_id, last_gc_version: node_delta.last_gc_version, + from_version_excluded: node_delta.from_version_excluded, }) - .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { - DeltaOpRef::KeyValue { - key, - versioned_value, - } - })) - }); - nodes_to_reset_ops.chain(node_deltas) + .chain( + node_delta + .key_values + .iter() + .map(|key_value_mutation| DeltaOpRef::KeyValue(key_value_mutation.into())), + ) + .chain({ + node_delta + .max_version + .map(|max_version| DeltaOpRef::SetMaxVersion { max_version }) + }) + }) } } enum DeltaOp { - NodeToReset(ChitchatId), Node { chitchat_id: ChitchatId, last_gc_version: Version, + from_version_excluded: u64, }, - KeyValue { - key: String, - versioned_value: VersionedValue, + KeyValue(KeyValueMutation), + SetMaxVersion { + max_version: Version, }, } enum DeltaOpRef<'a> { - NodeToReset(&'a ChitchatId), Node { chitchat_id: &'a ChitchatId, last_gc_version: Version, + from_version_excluded: u64, }, - KeyValue { - key: &'a str, - versioned_value: &'a VersionedValue, + KeyValue(KeyValueMutationRef<'a>), + SetMaxVersion { + max_version: Version, }, } @@ -73,7 +74,7 @@ enum DeltaOpRef<'a> { enum DeltaOpTag { Node = 0u8, KeyValue = 1u8, - NodeToReset = 2u8, + SetMaxVersion = 2u8, } impl TryFrom for DeltaOpTag { @@ -83,7 +84,6 @@ impl TryFrom for DeltaOpTag { 
match tag_byte { 0u8 => Ok(DeltaOpTag::Node), 1u8 => Ok(DeltaOpTag::KeyValue), - 2u8 => Ok(DeltaOpTag::NodeToReset), _ => { anyhow::bail!("Unknown tag: {tag_byte}") } @@ -102,16 +102,14 @@ impl Deserializable for DeltaOp { let tag_bytes: [u8; 1] = Deserializable::deserialize(buf)?; let tag = DeltaOpTag::try_from(tag_bytes[0])?; match tag { - DeltaOpTag::NodeToReset => { - let chitchat_id = ChitchatId::deserialize(buf)?; - Ok(DeltaOp::NodeToReset(chitchat_id)) - } DeltaOpTag::Node => { let chitchat_id = ChitchatId::deserialize(buf)?; let last_gc_version = Version::deserialize(buf)?; + let from_version_excluded = u64::deserialize(buf)?; Ok(DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, }) } DeltaOpTag::KeyValue => { @@ -119,16 +117,16 @@ impl Deserializable for DeltaOp { let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; let deleted = bool::deserialize(buf)?; - let tombstone = if deleted { Some(Instant::now()) } else { None }; - let versioned_value: VersionedValue = VersionedValue { + Ok(DeltaOp::KeyValue(KeyValueMutation { + key, value, version, - tombstone, - }; - Ok(DeltaOp::KeyValue { - key, - versioned_value, - }) + tombstone: deleted, + })) + } + DeltaOpTag::SetMaxVersion => { + let max_version = Version::deserialize(buf)?; + Ok(DeltaOp::SetMaxVersion { max_version }) } } } @@ -140,18 +138,18 @@ impl DeltaOp { DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, } => DeltaOpRef::Node { chitchat_id, last_gc_version: *last_gc_version, + from_version_excluded: *from_version_excluded, }, - DeltaOp::KeyValue { - key, - versioned_value, - } => DeltaOpRef::KeyValue { - key, - versioned_value, + DeltaOp::KeyValue(key_value_mutation) => { + DeltaOpRef::KeyValue(key_value_mutation.into()) + } + DeltaOp::SetMaxVersion { max_version } => DeltaOpRef::SetMaxVersion { + max_version: *max_version, }, - DeltaOp::NodeToReset(node_to_reset) => DeltaOpRef::NodeToReset(node_to_reset), } } } @@ -172,24 +170,20 @@ 
impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, + from_version_excluded: from_version, } => { buf.push(DeltaOpTag::Node.into()); chitchat_id.serialize(buf); last_gc_version.serialize(buf); + from_version.serialize(buf); } - Self::KeyValue { - key, - versioned_value, - } => { + Self::KeyValue(key_value_mutation_ref) => { buf.push(DeltaOpTag::KeyValue.into()); - key.serialize(buf); - versioned_value.value.serialize(buf); - versioned_value.version.serialize(buf); - versioned_value.is_tombstone().serialize(buf); + key_value_mutation_ref.serialize(buf); } - Self::NodeToReset(chitchat_id) => { - buf.push(DeltaOpTag::NodeToReset.into()); - chitchat_id.serialize(buf); + Self::SetMaxVersion { max_version } => { + buf.push(DeltaOpTag::SetMaxVersion.into()); + max_version.serialize(buf); } } } @@ -199,17 +193,14 @@ impl<'a> Serializable for DeltaOpRef<'a> { Self::Node { chitchat_id, last_gc_version, - } => chitchat_id.serialized_len() + last_gc_version.serialized_len(), - Self::KeyValue { - key, - versioned_value, + from_version_excluded: from_version, } => { - key.serialized_len() - + versioned_value.value.serialized_len() - + versioned_value.version.serialized_len() - + 1 + chitchat_id.serialized_len() + + last_gc_version.serialized_len() + + from_version.serialized_len() } - Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(), + Self::KeyValue(key_value_mutation_ref) => key_value_mutation_ref.serialized_len(), + Self::SetMaxVersion { max_version } => max_version.serialized_len(), } } } @@ -252,7 +243,12 @@ impl Delta { .sum() } - pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) { + pub(crate) fn add_node( + &mut self, + chitchat_id: ChitchatId, + last_gc_version: Version, + from_version: Version, + ) { assert!(!self .node_deltas .iter() @@ -260,7 +256,9 @@ impl Delta { self.node_deltas.push(NodeDelta { chitchat_id, last_gc_version, + from_version_excluded: from_version, key_values: 
Vec::new(), + max_version: None, }); } @@ -277,15 +275,12 @@ impl Delta { .iter_mut() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); - let tombstone = if deleted { Some(Instant::now()) } else { None }; - node_delta.key_values.push(( - key.to_string(), - VersionedValue { - value: value.to_string(), - version, - tombstone, - }, - )); + node_delta.key_values.push(KeyValueMutation { + key: key.to_string(), + value: value.to_string(), + version, + tombstone: deleted, + }); } pub(crate) fn set_serialized_len(&mut self, serialized_len: usize) { @@ -297,17 +292,30 @@ impl Delta { .iter() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) } - - pub(crate) fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) { - self.nodes_to_reset.push(chitchat_id); - } } #[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { pub chitchat_id: ChitchatId, + // `from_version` and `last_gc_version` express which states + // this delta can be applied to. + // + // `last_gc_version` expresses that this delta has been computed from a state where no + // keys > `last_gc_version` have been removed. + // + // `from_version` expresses that from this state, ALL of the records in + // (`from_version`.. `max_version`] are present in this delta + // (where `max_version` is the maximum version in the delta). + // + // In other words, the only gaps in this interval are deleted key-values with a version + // <= `last_gc_version`. + // + // Inspect the code in `prepare_apply_delta(..)` to see the rules on how `from_version` + // and `last_gc_version` are used. 
+ pub from_version_excluded: Version, pub last_gc_version: Version, - pub key_values: Vec<(String, VersionedValue)>, + pub key_values: Vec, + pub max_version: Option, } #[cfg(test)] @@ -336,6 +344,7 @@ impl DeltaBuilder { DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded, } => { self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); @@ -343,34 +352,28 @@ impl DeltaBuilder { self.current_node_delta = Some(NodeDelta { chitchat_id, last_gc_version, + from_version_excluded, key_values: Vec::new(), + max_version: None, }); } - DeltaOp::KeyValue { - key, - versioned_value, - } => { + DeltaOp::KeyValue(key_value_mutation) => { let Some(current_node_delta) = self.current_node_delta.as_mut() else { anyhow::bail!("received a key-value op without a node op before."); }; - if let Some((_last_key, last_versioned_value)) = - current_node_delta.key_values.last() - { + if let Some(previous_key_value_mutation) = current_node_delta.key_values.last() { anyhow::ensure!( - last_versioned_value.version < versioned_value.version, + previous_key_value_mutation.version < key_value_mutation.version, "kv version should be increasing" ); } - current_node_delta - .key_values - .push((key.to_string(), versioned_value)); + current_node_delta.key_values.push(key_value_mutation); } - DeltaOp::NodeToReset(chitchat_id) => { - anyhow::ensure!( - self.delta.node_deltas.is_empty(), - "nodes_to_reset should be encoded before node_deltas" - ); - self.delta.nodes_to_reset.push(chitchat_id); + DeltaOp::SetMaxVersion { max_version } => { + let Some(current_node_delta) = self.current_node_delta.as_mut() else { + anyhow::bail!("received a key-value op without a node op before."); + }; + current_node_delta.max_version = Some(max_version); } } Ok(()) @@ -411,11 +414,9 @@ impl DeltaSerializer { } } - /// Returns false if the node to reset could not be added because the payload would exceed the - /// mtu. 
- pub fn try_add_node_to_reset(&mut self, chitchat_id: ChitchatId) -> bool { - let delta_op = DeltaOp::NodeToReset(chitchat_id); - self.try_add_op(delta_op) + pub fn try_set_max_version(&mut self, max_version: Version) -> bool { + let key_value_op = DeltaOp::SetMaxVersion { max_version }; + self.try_add_op(key_value_op) } fn try_add_op(&mut self, delta_op: DeltaOp) -> bool { @@ -433,18 +434,27 @@ impl DeltaSerializer { /// Returns false if the KV could not be added because the payload would exceed the mtu. pub fn try_add_kv(&mut self, key: &str, versioned_value: VersionedValue) -> bool { - let key_value_op = DeltaOp::KeyValue { + let key_value_mutation = KeyValueMutation { key: key.to_string(), - versioned_value, + value: versioned_value.value, + version: versioned_value.version, + tombstone: versioned_value.tombstone.is_some(), }; + let key_value_op = DeltaOp::KeyValue(key_value_mutation); self.try_add_op(key_value_op) } /// Returns false if the node could not be added because the payload would exceed the mtu. - pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool { + pub fn try_add_node( + &mut self, + chitchat_id: ChitchatId, + last_gc_version: Version, + from_version: Version, + ) -> bool { let new_node_op = DeltaOp::Node { chitchat_id, last_gc_version, + from_version_excluded: from_version, }; self.try_add_op(new_node_op) } @@ -457,6 +467,8 @@ impl DeltaSerializer { #[cfg(test)] mod tests { + use tokio::time::Instant; + use super::*; #[test] @@ -472,7 +484,7 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 0u64)); // +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) + // 1 bytes (empty tombstone). 
@@ -497,7 +509,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes - assert!(delta_writer.try_add_node(node2, 0)); + assert!(delta_writer.try_add_node(node2, 0, 0u64)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -523,12 +535,12 @@ mod tests { #[test] fn test_delta_serialization_simple_node() { // 1 bytes (End tag) - let mut delta_writer = DeltaSerializer::with_mtu(128); + let mut delta_writer = DeltaSerializer::with_mtu(140); // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length) - assert!(delta_writer.try_add_node(node1, 0)); + assert!(delta_writer.try_add_node(node1, 0, 0u64)); // +24 bytes (kv + op tag) assert!(delta_writer.try_add_kv( @@ -552,7 +564,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node2, 0)); + assert!(delta_writer.try_add_node(node2, 0, 0u64)); test_aux_delta_writer(delta_writer, 80); } @@ -569,13 +581,12 @@ mod tests { // +27 bytes (ChitchatId) + 1 (op tag) + 3 bytes (block len) // = 32 bytes - assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000))); let node1 = ChitchatId::for_local_test(10_001); // +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic // new block) = 71 - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 0u64)); // +23 bytes (kv) + 1 (op tag) // = 95 @@ -601,9 +612,9 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag) // = 155 - assert!(delta_writer.try_add_node(node2, 0u64)); + assert!(delta_writer.try_add_node(node2, 0u64, 0)); // The block got compressed. 
- test_aux_delta_writer(delta_writer, 85); + test_aux_delta_writer(delta_writer, 80); } #[test] @@ -613,7 +624,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(delta_writer.try_add_node(node1, 0)); + assert!(delta_writer.try_add_node(node1, 0, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -636,42 +647,7 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(!delta_writer.try_add_node(node2, 0u64)); - - // The block got compressed. - test_aux_delta_writer(delta_writer, 72); - } - - #[test] - fn test_delta_serialization_exceed_mtu_on_add_node_to_reset() { - // 4 bytes. - let mut delta_writer = DeltaSerializer::with_mtu(100); - - let node1 = ChitchatId::for_local_test(10_001); - // +37 bytes. - assert!(delta_writer.try_add_node(node1, 0u64)); - - // +23 bytes. - assert!(delta_writer.try_add_kv( - "key11", - VersionedValue { - value: "val11".to_string(), - version: 1, - tombstone: None, - } - )); - // +23 bytes. - assert!(delta_writer.try_add_kv( - "key12", - VersionedValue { - value: "val12".to_string(), - version: 2, - tombstone: None, - } - )); - - let node2 = ChitchatId::for_local_test(10_002); - assert!(!delta_writer.try_add_node_to_reset(node2)); + assert!(!delta_writer.try_add_node(node2, 0u64, 1u64)); // The block got compressed. 
test_aux_delta_writer(delta_writer, 72); @@ -686,7 +662,7 @@ mod tests { // + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag) // = 40 - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); // +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag) // = 67 @@ -709,7 +685,7 @@ mod tests { tombstone: None, } )); - test_aux_delta_writer(delta_writer, 64); + test_aux_delta_writer(delta_writer, 72); } #[test] @@ -718,7 +694,7 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(62); let node1 = ChitchatId::for_local_test(10_001); - assert!(delta_writer.try_add_node(node1, 0u64)); + assert!(delta_writer.try_add_node(node1, 0u64, 1u64)); assert!(delta_writer.try_add_kv( "key11", @@ -756,6 +732,6 @@ mod tests { num_valid_tags += 1; } } - assert_eq!(num_valid_tags, 3); + assert_eq!(num_valid_tags, 2); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 79ba476..7f93a5c 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -109,7 +109,10 @@ impl Chitchat { /// Executes the catch-up callback if necessary. 
fn maybe_trigger_catchup_callback(&self, delta: &Delta) { - if !delta.nodes_to_reset.is_empty() { + let has_reset = delta.node_deltas.iter().any(|node_delta| { + node_delta.from_version_excluded == 0 && node_delta.last_gc_version > 0 + }); + if has_reset { if let Some(catchup_callback) = &self.config.catchup_callback { info!("executing catch-up callback"); catchup_callback(); @@ -1046,7 +1049,8 @@ mod tests { node.process_delta(delta); let mut delta = Delta::default(); - delta.add_node_to_reset(ChitchatId::for_local_test(10_002)); + let chitchat_id = ChitchatId::for_local_test(10_002); + delta.add_node(chitchat_id, 1000u64, 0u64); node.process_delta(delta); assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1); diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index b79e67b..b0b74a6 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -162,15 +162,19 @@ mod tests { // 4 bytes let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); - // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64); + // +27 bytes (ChitchatId) + // + 2 bytes (node delta len) + // + 8 bytes (last_gc_version) + // + 8 bytes (from_version). + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + // That's compression kicking in. + delta.set_serialized_len(60); let syn_ack = ChitchatMessage::SynAck { digest, delta }; // 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta). - test_serdeser_aux(&syn_ack, 108); + test_serdeser_aux(&syn_ack, 1 + 60 + 45); } } @@ -186,12 +190,12 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), 0u64); + delta.add_node(node.clone(), 0u64, 0u64); // +29 bytes. 
delta.add_kv(&node, "key", "value", 0, true); - delta.set_serialized_len(62); + delta.set_serialized_len(60); let ack = ChitchatMessage::Ack { delta }; - test_serdeser_aux(&ack, 63); + test_serdeser_aux(&ack, 1 + 60); } } diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 9fddbce..81bc446 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -578,6 +578,12 @@ mod tests { let server_handle = spawn_chitchat(server_config, Vec::new(), &transport) .await .unwrap(); + server_handle + .chitchat() + .lock() + .await + .self_node_state() + .set("key", "value"); // Add our test socket to the server's nodes. server_handle diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index a7d171a..ca5383b 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::time::Instant; -use tracing::warn; +use tracing::{info, warn}; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; use crate::digest::{Digest, NodeDigest}; @@ -24,14 +24,29 @@ pub struct NodeState { chitchat_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, - max_version: Version, #[serde(skip)] listeners: Listeners, + max_version: Version, // This is the maximum version of the last tombstone GC. // - // After we GC the tombstones, we cannot safely do replication with - // nodes that are asking for a diff from a version lower than this. + // Due to the garbage collection of tombstones, we cannot + // safely do replication with nodes that are asking for a + // diff from a version lower than this. + // + // `last_gc_version` expresses the idea: what is the oldest version from which I can + // confidently emit delta from. The reason why we update it here, is + // because a node that was just reset or just joined the cluster will get updates + // from another node that are actually only make sense in the context of the + // emission of delta from a `last_gc_version`. 
last_gc_version: Version, + // A proper interpretation of `max_version` and `last_gc_version` is the following: + // The state contains exactly: + // - all of the (non-deleted) key values present at snapshot `max_version`. + // - all of the tombstones of the entry that were deleted between (`last_gc_version`, + // `max_version]`. + // + // It does not contain any trace of the tombstones of the entries that were deleted before + // `<= last_gc_version`. } impl Debug for NodeState { @@ -104,6 +119,123 @@ impl NodeState { .map(|(key, versioned_value)| (key, versioned_value.value.as_str())) } + pub fn set_max_version(&mut self, max_version: Version) { + self.max_version = max_version; + } + + // Prepare the node state to receive a delta. + // Returns `true` if the delta can be applied. In that case, the node state may be mutated (if a + // reset is required) Returns `false` if the delta cannot be applied. In that case, the node + // state is not modified. + #[must_use] + fn prepare_apply_delta(&mut self, node_delta: &NodeDelta) -> bool { + if node_delta.from_version_excluded > self.max_version { + // This delta is coming from the future. + // We probably experienced a reset and this delta is not usable for us anymore. + // This is not a bug, it can happen, but we just need to ignore it! + info!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received delta from the future, ignoring it" + ); + return false; + } + + if self.max_version > node_delta.last_gc_version { + // The GCed tombstone have all been already received. + // We won't miss anything by applying the delta! + return true; + } + + // This delta might be missing tombstones with version within + // (`node_state.max_version`..`last_gc_version`]. + // + // It is ok if we don't have the associated values to begin + // with. 
+ if self.last_gc_version >= node_delta.last_gc_version { + return true; + } + + if node_delta.from_version_excluded > 0 { + warn!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received an inapplicable delta, ignoring it"); + } + + let Some(delta_max_version) = node_delta + .key_values + .iter() + .map(|key_value_mutation| key_value_mutation.version) + .max() + .or(node_delta.max_version) + else { + // This can happen if we just hit the mtu at the moment + // of writing the SetMaxVersion operation. + return false; + }; + + if delta_max_version <= self.max_version() { + // There is no point in applying this delta, as it does not bring us to a newer state. + warn!( + node=?node_delta.chitchat_id, + from_version=node_delta.from_version_excluded, + delta_max_version=delta_max_version, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "received a delta that does not bring us to a fresher state, ignoring it"); + return false; + } + + // We are out of sync. This delta is an invitation to `reset` our state. + info!( + node=?node_delta.chitchat_id, + last_gc_version=node_delta.last_gc_version, + current_last_gc_version=self.last_gc_version, + "resetting node"); + *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); + if let Some(max_version) = node_delta.max_version { + self.max_version = max_version; + } + // We need to reset our `last_gc_version`. + self.last_gc_version = node_delta.last_gc_version; + true + } + + fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) { + info!(delta=?node_delta); + if !self.prepare_apply_delta(&node_delta) { + return; + } + let current_max_version = self.max_version(); + for key_value_mutation in node_delta.key_values { + if key_value_mutation.version <= current_max_version { + // We already know about this KV. 
+ continue; + } + if key_value_mutation.tombstone { + // We don't want to keep any tombstone before `last_gc_version`. + if key_value_mutation.version <= self.last_gc_version { + continue; + } + } + let versioned_value = VersionedValue { + value: key_value_mutation.value, + version: key_value_mutation.version, + tombstone: if key_value_mutation.tombstone { + Some(now) + } else { + None + }, + }; + self.set_versioned_value(key_value_mutation.key, versioned_value); + } + } + /// Returns key values matching a prefix pub fn iter_prefix<'a>( &'a self, @@ -344,53 +476,11 @@ impl ClusterState { } pub(crate) fn apply_delta(&mut self, delta: Delta) { - // Remove nodes to reset. - if !delta.nodes_to_reset.is_empty() { - tracing::info!(nodes_to_reset=?delta.nodes_to_reset, "nodes to reset"); - } - - // Clearing the node of the states to reset. - for node_to_reset in delta.nodes_to_reset { - // We don't want to remove the entire state here: the node could be alive and the - // live watcher panics if the state is missing. - if let Some(node_state) = self.node_states.get_mut(&node_to_reset) { - *node_state = NodeState::new(node_to_reset, self.listeners.clone()); - } - } - + let now = Instant::now(); // Apply delta. for node_delta in delta.node_deltas { - let NodeDelta { - chitchat_id, - last_gc_version, - key_values, - } = node_delta; - let node_state = self.node_state_mut(&chitchat_id); - if node_state.last_gc_version == 0u64 { - // You may have expected `node_state.last_gc_version = max(last_gc_version, - // node_state.last_gc_version)`. This is correct too of course, but - // slightly too restrictive. - // - // `last_gc_version` expresses the idea: what is the oldest version from which I can - // confidently emit delta from. The reason why we update it here, is - // because a node that was just reset or just joined the cluster will get updates - // from another node that are actually only make sense in the context of the - // emission of delta from a `last_gc_version`. 
- // - // We want to avoid the case where: - // - Cluster with Node A, B, C - // - Node A, inserts Key K that gets replicated. - // - Network partition as {A} {B,C} - // - A deletes Key K and GCs it. - // - Network restoration - // - B gossips with A and get reset - // - C gossips with B and does NOT get reset. - node_state.last_gc_version = last_gc_version; - } - for (key, versioned_value) in key_values { - node_state.max_version = node_state.max_version.max(versioned_value.version); - node_state.set_versioned_value(key, versioned_value); - } + let node_state = self.node_state_mut(&node_delta.chitchat_id); + node_state.apply_delta(node_delta, now); } } @@ -421,70 +511,64 @@ impl ClusterState { scheduled_for_deletion: &HashSet<&ChitchatId>, ) -> Delta { let mut stale_nodes = SortedStaleNodes::default(); - let mut nodes_to_reset = Vec::new(); for (chitchat_id, node_state) in &self.node_states { if scheduled_for_deletion.contains(chitchat_id) { continue; } - let Some(node_digest) = digest.node_digests.get(chitchat_id) else { - stale_nodes.insert(chitchat_id, node_state); + + let digest_max_version: Version = digest + .node_digests + .get(chitchat_id) + .map(|node_digest| node_digest.max_version) + .unwrap_or(0u64); + + if node_state.max_version <= digest_max_version { + // Our version is actually older than the version of the digest. + // We have no update to offer. continue; - }; - // TODO: We have problem here. If after the delta we end up with a max version that is - // not high enough to bring us to `last_gc_version`, we might get reset again - // and again. - let should_reset = - node_state.last_gc_version > node_digest.max_version && node_digest.max_version > 0; - if should_reset { + } + + // We have garbage collected some tombstones that the other node does not know about + // yet. A reset is needed. 
+ let should_reset = digest_max_version < node_state.last_gc_version; + let from_version_excluded = if should_reset { warn!( "Node to reset {chitchat_id:?} last gc version: {} max version: {}", - node_state.last_gc_version, node_digest.max_version + node_state.last_gc_version, digest_max_version ); - nodes_to_reset.push(chitchat_id); - stale_nodes.insert(chitchat_id, node_state); - continue; - } - stale_nodes.offer(chitchat_id, node_state, node_digest); - } - let mut delta_serializer = DeltaSerializer::with_mtu(mtu); + 0u64 + } else { + digest_max_version + }; - for chitchat_id in &nodes_to_reset { - if !delta_serializer.try_add_node_to_reset((*chitchat_id).clone()) { - break; - } + stale_nodes.offer(chitchat_id, node_state, from_version_excluded); } + let mut delta_serializer = DeltaSerializer::with_mtu(mtu); for stale_node in stale_nodes.into_iter() { if !delta_serializer.try_add_node( stale_node.chitchat_id.clone(), stale_node.node_state.last_gc_version, + stale_node.from_version_excluded, ) { break; - } + }; + let mut added_something = false; for (key, versioned_value) in stale_node.stale_key_values() { - added_something = true; if !delta_serializer.try_add_kv(key, versioned_value.clone()) { return delta_serializer.finish(); } + added_something = true; } - if !added_something && nodes_to_reset.contains(&stale_node.chitchat_id) { - // send a sentinel element to update the max_version. Otherwise the node's vision - // of max_version will be 0, and it may accept writes that are supposed to be - // stale, but it can tell they are. - if !delta_serializer.try_add_kv( - "__reset_sentinel", - VersionedValue { - value: String::new(), - version: stale_node.node_state.max_version, - tombstone: Some(Instant::now()), - }, - ) { - return delta_serializer.finish(); - } + // There aren't any key-values in the state_node apparently. + // Let's add a specific instruction to the delta to set the max version. 
+ if !added_something { + delta_serializer.try_set_max_version(stale_node.node_state.max_version); } } + delta_serializer.finish() } } @@ -547,15 +631,15 @@ struct SortedStaleNodes<'a> { /// If no KV is stale, there is nothing to gossip, and we simply return `None`: /// the node is not a candidate for gossip. fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option { + if node_state.max_version() <= floor_version { + return None; + } let is_unknown = floor_version == 0u64; let num_stale_key_values = if is_unknown { node_state.num_key_values() } else { node_state.stale_key_values(floor_version).count() }; - if !is_unknown && num_stale_key_values == 0 { - return None; - } Some(Staleness { is_unknown, max_version: node_state.max_version, @@ -564,41 +648,23 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option SortedStaleNodes<'a> { - /// Adds a node to the list of stale nodes. - fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { - let Some(staleness) = staleness_score(node_state, 0u64) else { - // The node does not have any stale key values. - return; - }; - - let floor_version = 0; - let stale_node = StaleNode { - chitchat_id, - node_state, - floor_version, - }; - self.stale_nodes - .entry(staleness) - .or_default() - .push(stale_node); - } - - /// Evaluates whether the node should be added to the list of stale nodes. + /// Adds a node to the list of stale nodes. + /// If the node is not stale (meaning we have no fresher Key Values to share), then this + /// function simply returns. fn offer( &mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState, - node_digest: &NodeDigest, + from_version_excluded: u64, ) { - let floor_version = node_digest.max_version; - let Some(staleness) = staleness_score(node_state, floor_version) else { + let Some(staleness) = staleness_score(node_state, from_version_excluded) else { // The node does not have any stale KV. 
return; }; let stale_node = StaleNode { chitchat_id, node_state, - floor_version, + from_version_excluded, }; self.stale_nodes .entry(staleness) @@ -626,14 +692,14 @@ impl<'a> SortedStaleNodes<'a> { struct StaleNode<'a> { chitchat_id: &'a ChitchatId, node_state: &'a NodeState, - floor_version: u64, + from_version_excluded: u64, } impl<'a> StaleNode<'a> { /// Iterates over the stale key-value pairs in decreasing order of staleness. fn stale_key_values(&self) -> impl Iterator { self.node_state - .stale_key_values(self.floor_version) + .stale_key_values(self.from_version_excluded) .sorted_unstable_by_key(|(_, versioned_value)| versioned_value.version) } } @@ -684,6 +750,7 @@ fn random_generator() -> impl Rng { mod tests { use super::*; use crate::serialize::Serializable; + use crate::types::KeyValueMutation; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -694,7 +761,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 0, + from_version_excluded: 0u64, }; assert!(stale_node.stale_key_values().next().is_none()); } @@ -714,7 +781,7 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, node_state: &node_state, - floor_version: 1, + from_version_excluded: 1u64, }; assert_eq!( stale_node.stale_key_values().collect::>(), @@ -742,16 +809,18 @@ mod tests { // No stale KV. We still insert the node! // That way it will get a node state, and be a candidate for gossip later. 
- let node_state1 = NodeState::for_test(); - stale_nodes.insert(&node1, &node_state1); + let mut node_state1 = NodeState::for_test(); + node_state1.set_max_version(2); + + stale_nodes.offer(&node1, &node_state1, 0u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let mut node2_state = NodeState::for_test(); node2_state.set_with_version("key_a", "value_a", 1); - stale_nodes.insert(&node2, &node2_state); + stale_nodes.offer(&node2, &node2_state, 0u64); let expected_staleness = Staleness { is_unknown: true, - max_version: 0, + max_version: 1, num_stale_key_values: 0, }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); @@ -760,7 +829,7 @@ mod tests { node3_state.set_with_version("key_b", "value_b", 2); node3_state.set_with_version("key_c", "value_c", 3); - stale_nodes.insert(&node3, &node3_state); + stale_nodes.offer(&node3, &node3_state, 0u64); let expected_staleness = Staleness { is_unknown: true, max_version: 3, @@ -775,31 +844,23 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); let node1_state = NodeState::for_test(); - stale_nodes.offer(&node1, &node1_state, &NodeDigest::new(Heartbeat(0), 1)); + stale_nodes.offer(&node1, &node1_state, 1u64); // No stale records. This is not a candidate for gossip. assert!(stale_nodes.stale_nodes.is_empty()); let node2 = ChitchatId::for_local_test(10_002); let mut node2_state = NodeState::for_test(); - node2_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.offer(&node2, &node2_state, &NodeDigest::new(Heartbeat(0), 1)); + node2_state.set_with_version("key_a", "value_a", 1); + stale_nodes.offer(&node2, &node2_state, 1u64); // No stale records (due to the floor version). This is not a candidate for gossip. 
assert!(stale_nodes.stale_nodes.is_empty()); let node3 = ChitchatId::for_local_test(10_002); let mut node3_state = NodeState::for_test(); - node3_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node3_state - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node3_state - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node3_state, &NodeDigest::new(Heartbeat(0), 1)); + node3_state.set_with_version("key_a", "value_a", 1); + node3_state.set_with_version("key_b", "value_b", 2); + node3_state.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node3, &node3_state, 1u64); assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, @@ -814,74 +875,46 @@ mod tests { let mut stale_nodes = SortedStaleNodes::default(); let node1 = ChitchatId::for_local_test(10_001); let mut node_state1 = NodeState::for_test(); - node_state1 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state1 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state1 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node1, &node_state1, &NodeDigest::new(Heartbeat(0), 1)); + node_state1.set_with_version("key_a", "value_a", 1); + node_state1.set_with_version("key_b", "value_b", 2); + node_state1.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node1, &node_state1, 1u64); // 2 stale values. 
let node2 = ChitchatId::for_local_test(10_002); let mut node_state2 = NodeState::for_test(); - node_state2 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state2 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state2 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 5)); - stale_nodes.offer(&node2, &node_state2, &NodeDigest::new(Heartbeat(0), 2)); + node_state2.set_with_version("key_a", "value", 1); + node_state2.set_with_version("key_b", "value_b", 2); + node_state2.set_with_version("key_c", "value_c", 5); + stale_nodes.offer(&node2, &node_state2, 2u64); // 1 stale value. let node3 = ChitchatId::for_local_test(10_003); let mut node_state3 = NodeState::for_test(); - node_state3 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state3 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state3 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); - stale_nodes.offer(&node3, &node_state3, &NodeDigest::new(Heartbeat(0), 7)); + node_state3.set_with_version("key_a", "value_a", 1); + node_state3.set_with_version("key_b", "value_b", 2); + node_state3.set_with_version("key_c", "value_c", 3); + stale_nodes.offer(&node3, &node_state3, 7u64); // 0 stale values. 
let node4 = ChitchatId::for_local_test(10_004); let mut node_state4 = NodeState::for_test(); - node_state4 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - node_state4 - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node_state4 - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 5)); - node_state4 - .key_values - .insert("key_d".to_string(), VersionedValue::for_test("value_d", 7)); - stale_nodes.offer(&node4, &node_state4, &NodeDigest::new(Heartbeat(0), 1)); + node_state4.set_with_version("key_a", "value_a", 1); + node_state4.set_with_version("key_b", "value_b", 2); + node_state4.set_with_version("key_c", "value_c", 5); + node_state4.set_with_version("key_d", "value_d", 7); + stale_nodes.offer(&node4, &node_state4, 1); // 3 stale values let node5 = ChitchatId::for_local_test(10_005); let node_state5 = NodeState::for_test(); - stale_nodes.insert(&node5, &node_state5); + stale_nodes.offer(&node5, &node_state5, 0); // 0 stale values let node6 = ChitchatId::for_local_test(10_006); let mut node_state6 = NodeState::for_test(); - node_state6 - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); - stale_nodes.insert(&node6, &node_state6); + node_state6.set_with_version("key_a", "value_a", 1); + stale_nodes.offer(&node6, &node_state6, 0u64); // 1 stale values assert_eq!( @@ -889,7 +922,7 @@ mod tests { .into_iter() .map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port()) .collect::>(), - vec![10_005, 10_006, 10_004, 10_001, 10_002] + vec![10_006, 10_004, 10_001, 10_002] ); } @@ -1083,13 +1116,12 @@ mod tests { node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 let mut delta = Delta::default(); - delta.add_node(node1.clone(), 0u64); + delta.add_node(node1.clone(), 0u64, 0u64); delta.add_kv(&node1, "key_a", "4", 4, false); delta.add_kv(&node1, "key_b", "2", 2, false); - // Reset node 2. 
- delta.add_node_to_reset(node2.clone()); - delta.add_node(node2.clone(), 0u64); + // We reset node 2 + delta.add_node(node2.clone(), 3, 0); delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); @@ -1150,7 +1182,7 @@ mod tests { for (num_entries, &mtu) in mtu_per_num_entries.iter().enumerate() { let mut expected_delta = Delta::default(); for &(node, key, val, version, tombstone) in &expected_delta_atoms[..num_entries] { - expected_delta.add_node(node.clone(), 0u64); + expected_delta.add_node(node.clone(), 0u64, 0u64); expected_delta.add_kv(node, key, val, version, tombstone); } { @@ -1296,13 +1328,12 @@ mod tests { MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), ); - assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.set_serialized_len(73); + expected_delta.set_serialized_len(76); assert_eq!(delta, expected_delta); } @@ -1321,14 +1352,13 @@ mod tests { MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), ); - assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node(node1.clone(), 0u64); + expected_delta.add_node(node1.clone(), 0u64, 1u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_kv(&node1, "key_a", "", 3, true); - expected_delta.set_serialized_len(87); + expected_delta.set_serialized_len(90); assert_eq!(delta, expected_delta); } @@ -1337,7 +1367,7 @@ mod tests { tokio::time::advance(DELETE_GRACE_PERIOD).await; cluster_state 
.node_state_mut(&node1) - .gc_keys_marked_for_deletion(Duration::from_secs(10)); + .gc_keys_marked_for_deletion(DELETE_GRACE_PERIOD); { let mut digest = Digest::default(); @@ -1348,12 +1378,12 @@ mod tests { &HashSet::new(), ); let mut expected_delta = Delta::default(); - expected_delta.add_node(node2.clone(), 0u64); + expected_delta.add_node(node2.clone(), 0u64, 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.add_node_to_reset(node1.clone()); - expected_delta.add_node(node1.clone(), 3u64); + // Last gc set to 3 and from version to 0. That's a reset right there. + expected_delta.add_node(node1.clone(), 3u64, 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.set_serialized_len(83); + expected_delta.set_serialized_len(75); assert_eq!(&delta, &expected_delta); } } @@ -1375,4 +1405,173 @@ mod tests { .collect(); assert_eq!(node_states, &["Europe:", "Europe:Italy"]); } + + #[test] + fn test_node_apply_delta_simple() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + node_state.set_with_version("key_b", "val_a", 2); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 2, + last_gc_version: 0u64, + max_version: None, + key_values: vec![ + KeyValueMutation { + key: "key_c".to_string(), + value: "val_c".to_string(), + version: 4, + tombstone: false, + }, + KeyValueMutation { + key: "key_b".to_string(), + value: "val_b2".to_string(), + version: 3, + tombstone: false, + }, + ], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert_eq!(node_state.num_key_values(), 3); + assert_eq!(node_state.max_version(), 4); + assert_eq!(node_state.last_gc_version, 0); + assert_eq!(node_state.get("key_a").unwrap(), "val_a"); + assert_eq!(node_state.get("key_b").unwrap(), "val_b2"); + assert_eq!(node_state.get("key_c").unwrap(), "val_c"); + } + + // Here we check that the accessor that dismiss resetting a Kv to 
the same value is not + // used in apply delta. Resetting to the same value is very possible in reality several updates + // happened in a row but were shadowed by the scuttlebutt logic. We DO need to update the + // version. + #[test] + fn test_node_apply_same_value_different_version() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 1); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 1, + last_gc_version: 0, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "val_a".to_string(), + version: 3, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 3); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[test] + fn test_node_skip_delta_from_the_future() { + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 5); + assert_eq!(node_state.max_version(), 5); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 6, // we skipped version 6 here. 
+ last_gc_version: 0, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 7, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 5); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "val_a"); + } + + #[tokio::test] + async fn test_node_apply_delta_different_last_gc_is_ok_if_below_max_version() { + tokio::time::pause(); + const GC_PERIOD: Duration = Duration::from_secs(10); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + node_state.mark_for_deletion("key_a"); + tokio::time::advance(GC_PERIOD).await; + node_state.gc_keys_marked_for_deletion(GC_PERIOD); + assert_eq!(node_state.last_gc_version, 18); + assert_eq!(node_state.max_version(), 18); + node_state.set_with_version("key_a", "val_a", 31); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 31, // we skipped version 6 here. + last_gc_version: 30, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_a".to_string(), + value: "new_val".to_string(), + version: 32, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + let versioned_a = node_state.get_versioned("key_a").unwrap(); + assert_eq!(versioned_a.version, 32); + assert_eq!(node_state.max_version(), 32); + assert!(versioned_a.tombstone.is_none()); + assert_eq!(&versioned_a.value, "new_val"); + } + + #[tokio::test] + async fn test_node_apply_delta_on_reset_fresher_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 17); + assert_eq!(node_state.max_version(), 17); + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. 
+ last_gc_version: 30, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 32, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert!(node_state.get_versioned("key_a").is_none()); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + } + + #[tokio::test] + async fn test_node_apply_delta_no_reset_if_older_version() { + tokio::time::pause(); + let mut node_state = NodeState::for_test(); + node_state.set_with_version("key_a", "val_a", 31); + node_state.set_with_version("key_b", "val_b2", 32); + assert_eq!(node_state.max_version(), 32); + // This does look like a valid reset, but we are already at version 32. + // Let's ignore this. + let node_delta = NodeDelta { + chitchat_id: node_state.chitchat_id.clone(), + from_version_excluded: 0, // we skipped version 6 here. + last_gc_version: 17, + max_version: None, + key_values: vec![KeyValueMutation { + key: "key_b".to_string(), + value: "val_b".to_string(), + version: 30, + tombstone: false, + }], + }; + node_state.apply_delta(node_delta, Instant::now()); + assert_eq!(node_state.max_version, 32); + let versioned_b = node_state.get_versioned("key_b").unwrap(); + assert_eq!(versioned_b.version, 32); + assert_eq!(versioned_b.value, "val_b2"); + } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index fc6a2a3..27f8664 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -4,6 +4,9 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; use tokio::time::Instant; +use crate::serialize::Deserializable; +use crate::Serializable; + /// For the lifetime of a cluster, nodes can go down and come back up multiple times. They may also /// die permanently. A [`ChitchatId`] is composed of three components: /// - `node_id`: an identifier unique across the cluster. 
@@ -30,7 +33,9 @@ impl Debug for ChitchatId { write!( f, "{}:{}:{}", - &self.node_id, self.generation_id, self.gossip_advertise_addr + self.node_id.as_str(), + self.generation_id, + self.gossip_advertise_addr ) } } @@ -107,6 +112,64 @@ impl PartialEq for VersionedValue { } } +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)] +pub(crate) struct KeyValueMutation { + pub(crate) key: String, + pub(crate) value: String, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> From<&'a KeyValueMutation> for KeyValueMutationRef<'a> { + fn from(mutation: &'a KeyValueMutation) -> KeyValueMutationRef<'a> { + KeyValueMutationRef { + key: mutation.key.as_str(), + value: mutation.value.as_str(), + version: mutation.version, + tombstone: mutation.tombstone, + } + } +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize)] +pub(crate) struct KeyValueMutationRef<'a> { + pub(crate) key: &'a str, + pub(crate) value: &'a str, + pub(crate) version: Version, + pub(crate) tombstone: bool, +} + +impl<'a> Serializable for KeyValueMutationRef<'a> { + fn serialize(&self, buf: &mut Vec) { + Serializable::serialize(self.key, buf); + Serializable::serialize(self.value, buf); + Serializable::serialize(&self.version, buf); + Serializable::serialize(&self.tombstone, buf); + } + + fn serialized_len(&self) -> usize { + Serializable::serialized_len(self.key) + + Serializable::serialized_len(self.value) + + Serializable::serialized_len(&self.version) + + Serializable::serialized_len(&self.tombstone) + } +} + +impl Deserializable for KeyValueMutation { + fn deserialize(buf: &mut &[u8]) -> anyhow::Result { + let key: String = Deserializable::deserialize(buf)?; + let value: String = Deserializable::deserialize(buf)?; + let version: u64 = Deserializable::deserialize(buf)?; + let tombstone: bool = Deserializable::deserialize(buf)?; + Ok(KeyValueMutation { + key, + value, + version, + tombstone, + }) + } +} + #[derive(Serialize, Deserialize)] struct 
VersionedValueForSerialization { pub value: String, diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index d6c6533..919dd00 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -83,7 +83,7 @@ impl Simulator { pub async fn execute(&mut self, operations: Vec) { for operation in operations { - debug!("Execute operation {operation:?}"); + info!("Execute operation {operation:?}"); match operation { Operation::AddNode { chitchat_id, @@ -405,6 +405,7 @@ async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { } #[tokio::test] async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { + let _ = tracing_subscriber::fmt::try_init(); const TIMEOUT: Duration = Duration::from_millis(500); let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); let chitchat_id_1 = test_chitchat_id(1); From 9f5f049f86327dbc350ce329016377525baa77ae Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 5 Mar 2024 11:26:04 +0900 Subject: [PATCH 2/8] Removing info log --- chitchat/src/state.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index ca5383b..78bb69c 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -207,7 +207,6 @@ impl NodeState { } fn apply_delta(&mut self, node_delta: NodeDelta, now: Instant) { - info!(delta=?node_delta); if !self.prepare_apply_delta(&node_delta) { return; } From 094d98667a0f60bdb9cccc1632b6cde110672ddd Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 11:20:57 +0900 Subject: [PATCH 3/8] CR comment --- chitchat/src/delta.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index f8e8021..65cfbee 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -84,6 +84,7 @@ impl TryFrom for DeltaOpTag { match tag_byte { 0u8 => Ok(DeltaOpTag::Node), 1u8 => Ok(DeltaOpTag::KeyValue), + 2u8 => 
Ok(DeltaOpTag::SetMaxVersion), _ => { anyhow::bail!("Unknown tag: {tag_byte}") } @@ -297,10 +298,10 @@ impl Delta { #[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { pub chitchat_id: ChitchatId, - // `from_version` and `last_gc_version` are here to express on which states + // `from_version_excluded` and `last_gc_version` are here to express on which states // this delta can be applied to. // - // `last_gc_version` expresses that this delta has be computed from a state where no + // `last_gc_version` expresses that this delta was computed from a state where no // keys > `last_gc_version` have been removed. // // `from_version` expresses that from this state, ALL of the records in @@ -732,6 +733,6 @@ mod tests { num_valid_tags += 1; } } - assert_eq!(num_valid_tags, 2); + assert_eq!(num_valid_tags, 3); } } From f0065cd620574c54795a74f14fa7ec0ee919ad01 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 13:33:43 +0900 Subject: [PATCH 4/8] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raphaël Marinier --- chitchat/src/state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 78bb69c..a638ac2 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -42,10 +42,10 @@ pub struct NodeState { // A proper interpretation of `max_version` and `last_gc_version` is the following: // The state contains exactly: // - all of the (non-deleted) key values present at snapshot `max_version`. - // - all of the tombstones of the entry that were deleted between (`last_gc_version`, + // - all of the tombstones of the entry that were marked for deletion between (`last_gc_version`, // `max_version]`. 
// - // It does not contain any trace of the tombstones of the entries that were deleted before + // It does not contain any trace of the tombstones of the entries that were marked for deletion before // `<= last_gc_version`. } @@ -180,7 +180,7 @@ impl NodeState { }; if delta_max_version <= self.max_version() { - // There is not point apply in this delta as it is not bringing us to a newer state. + // There is not point applying this delta as it is not bringing us to a newer state. warn!( node=?node_delta.chitchat_id, from_version=node_delta.from_version_excluded, From 2a6ad9787167a0f5ac238d520af5843c099491d4 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 13:43:15 +0900 Subject: [PATCH 5/8] CR comment --- chitchat/src/state.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index a638ac2..c1bdc63 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -47,6 +47,11 @@ pub struct NodeState { // // It does not contain any trace of the tombstones of the entries that were marked for deletion before // `<= last_gc_version`. + // + // Disclaimer: We do not necessarily have max_version >= last_gc_version. + // After a reset, a node will have its `last_gc_version` set to the version of the node + // it is getting its KV from, and it will receive a possible partial set of KVs from that node. + // As a result it is possible for node to have `last_gc_version` > `max_version`. } impl Debug for NodeState { @@ -149,8 +154,8 @@ impl NodeState { return true; } - // This delta might be missing tombstones with version within - // (`node_state.max_version`..`last_gc_version`]. + // This delta might be missing tombstones with a version within + // (`node_state.max_version`..`node_delta.last_gc_version`]. // // It is ok if we don't have the associated values to begin // with. 
From 65dfd8c8c396bd101e874b38c6b70fc5ff1fdd28 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 16:07:04 +0900 Subject: [PATCH 6/8] CR comments --- ALGORITHM.md | 66 +++++++++++++++++++++++++++++++++++++++++ chitchat/src/delta.rs | 18 +++++++++++ chitchat/src/digest.rs | 23 +++++++++++--- chitchat/src/lib.rs | 40 +++++++++++++++++++++++++ chitchat/src/message.rs | 8 ++--- chitchat/src/state.rs | 49 +++++++++++++++++------------- 6 files changed, 176 insertions(+), 28 deletions(-) create mode 100644 ALGORITHM.md diff --git a/ALGORITHM.md b/ALGORITHM.md new file mode 100644 index 0000000..409ee01 --- /dev/null +++ b/ALGORITHM.md @@ -0,0 +1,66 @@ +Chitchat relies on an extension of the Scuttlebutt reconciliation algorithm, to handle: +- Key-Value deletion +- Members leaving the cluster. + +For simplification, we will just discuss the replication of a single map on a cluster of n nodes, in which only one node can write. +(In scuttlebutt, each node has ownership over such a map.) + +There are two kinds of update events on this map: +- Set(K, V) +- Deletion(K) + +The writer associates an incremental version to all of the updates. + +Let's assume the writer node has done V updates and its map has reached a final +state $\mathcal{S}_f$. +We can use the notation $ ({u_{0}}, ..., {u_i}, ..., {u}_{v=V}) $ to describe the sequence of updates, ordered by version. + +At any point in time, each node in the cluster has a view of the state map $\mathcal{S}$. This view is not necessarily up-to-date. + +Every second or so, a node $N$ will contact a peer $N'$, requesting updates. The peer will then send back a subset of updates $(u_i, ..., u_j)$. + +By design the algorithm attempts to work with a limited budget. The update is truncated to fit a maximum size called the `mtu`. In `chitchat`, the protocol used is UDP, and the default mtu is around `65KB`. + +We call such an update a `delta`. 
+ +Upon reception of the updates, the node $N$ may apply them to update its state. +We can write this operation as follows: + +$$\mathcal{S}' = \mathcal{S}~ \top \left(u_i, ..., u_j\right)$$ + +In order to inform the peer about where the updates should start from, the +node will send two parameters describing how much of the state is replicated: + +- `max_version`: One big difference with reliable broadcast is that this `max_version` does NOT mean that the node's state reflects all of the events earlier than +`max_version`. Rather, it says that if the node were to receive all of the events between `max_version` and $V$, then it would end up up to date. +The difference is subtle, but comes from an important optimization of scuttlebutt. +If a node asks for an update above $max\_version = m$, and for a given key $k$ there are two updates after $m$, there is no need to send the first one: it will eventually be overridden by the second one. + +- `last_gc_version`: Rather than deleting entries right away, the algorithm uses a tombstone mechanism. Key entries are kept but marked as deleted. From the client point of view, everything looks like the key really has been deleted. Eventually, in order +to free memory, we garbage collect all of the key values marked for deletion that are older than a few hours. We then keep track of the maximum `last_gc_version`. + +At each round of the protocol, a node makes sure to increase its pair $(last\_gc\_version, max\_version)$ in the lexicographical sense. + +We also define the subsequence of missing updates $(u')$ extracted from $(u_i)$ by keeping only the updates matching the predicate: + +- $i > max\_version$ if $u_i$ is a set operation + +- $i > max(max\_version, last\_gc\_version)$ if $u_i$ is a delete operation. + +The key of the algorithm is to maintain the following invariant. 
+$$ \mathcal{S}~\top (u') = \mathcal{S}_f$$ + +In plain words, the protocol attempts to make sure that, if any node were to apply +- all insert updates above its `max_version` +- all delete updates above its `max(max_version, last_gc_version)` +it would have managed to fully replicate $\mathcal{S}_f$. + +Knowing the `max_version` and the `last_gc_version` of a peer, a node will emit a delta that, if applied to such a valid state, would maintain the invariant above and +increase $(last\_gc\_version, max\_version)$ lexicographically. + +Due to the nature of UDP and to the concurrent handshake, deltas may arrive out of order or be duplicated. For this reason, upon reception of a delta, a node must ensure that the delta can be applied to the current state without breaking the invariant, +while increasing $(last\_gc\_version, max\_version)$. + + + + diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 65cfbee..e95bc17 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -415,6 +415,7 @@ impl DeltaSerializer { } } + #[must_use] pub fn try_set_max_version(&mut self, max_version: Version) -> bool { let key_value_op = DeltaOp::SetMaxVersion { max_version }; self.try_add_op(key_value_op) @@ -477,6 +478,23 @@ mod tests { test_serdeser_aux(&Delta::default(), 1); } + #[test] + fn test_delta_serialization_with_set_max_version() { + // 4 bytes + let mut delta_writer = DeltaSerializer::with_mtu(198); + + // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). + let node1 = ChitchatId::for_local_test(10_001); + + // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). 
+ assert!(delta_writer.try_add_node(node1, 80u64, 50u64)); + + // +9 bytes: +1 bytes + 8 bytes (version) + assert!(delta_writer.try_set_max_version(100)); + + test_aux_delta_writer(delta_writer, 1 + 56); + } + #[test] fn test_delta_serialization_simple_foo() { // 4 bytes diff --git a/chitchat/src/digest.rs b/chitchat/src/digest.rs index 6cb50e9..9bb6f7a 100644 --- a/chitchat/src/digest.rs +++ b/chitchat/src/digest.rs @@ -6,13 +6,19 @@ use crate::{ChitchatId, Heartbeat, Version}; #[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] pub(crate) struct NodeDigest { pub(crate) heartbeat: Heartbeat, + pub(crate) last_gc_version: Version, pub(crate) max_version: Version, } impl NodeDigest { - pub(crate) fn new(heartbeat: Heartbeat, max_version: Version) -> Self { + pub(crate) fn new( + heartbeat: Heartbeat, + last_gc_version: Version, + max_version: Version, + ) -> Self { Self { heartbeat, + last_gc_version, max_version, } } @@ -30,8 +36,14 @@ pub struct Digest { #[cfg(test)] impl Digest { - pub fn add_node(&mut self, node: ChitchatId, heartbeat: Heartbeat, max_version: Version) { - let node_digest = NodeDigest::new(heartbeat, max_version); + pub fn add_node( + &mut self, + node: ChitchatId, + heartbeat: Heartbeat, + last_gc_version: Version, + max_version: Version, + ) { + let node_digest = NodeDigest::new(heartbeat, last_gc_version, max_version); self.node_digests.insert(node, node_digest); } } @@ -42,6 +54,7 @@ impl Serializable for Digest { for (chitchat_id, node_digest) in &self.node_digests { chitchat_id.serialize(buf); node_digest.heartbeat.serialize(buf); + node_digest.last_gc_version.serialize(buf); node_digest.max_version.serialize(buf); } } @@ -50,6 +63,7 @@ impl Serializable for Digest { for (chitchat_id, node_digest) in &self.node_digests { len += chitchat_id.serialized_len(); len += node_digest.heartbeat.serialized_len(); + len += node_digest.last_gc_version.serialized_len(); len += node_digest.max_version.serialized_len(); } len @@ -65,7 +79,8 @@ impl 
Deserializable for Digest { let chitchat_id = ChitchatId::deserialize(buf)?; let heartbeat = Heartbeat::deserialize(buf)?; let max_version = u64::deserialize(buf)?; - let node_digest = NodeDigest::new(heartbeat, max_version); + let last_gc_version = u64::deserialize(buf)?; + let node_digest = NodeDigest::new(heartbeat, last_gc_version, max_version); node_digests.insert(chitchat_id, node_digest); } Ok(Digest { node_digests }) diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 7f93a5c..77af8d1 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -424,6 +424,7 @@ mod tests { /// lhs and rhs. /// /// This does NOT check for deleted KVs. + #[track_caller] fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) { assert_eq!(lhs.num_key_values(), rhs.num_key_values()); for (key, value) in lhs.key_values() { @@ -431,6 +432,7 @@ mod tests { } } + #[track_caller] fn assert_nodes_sync(nodes: &[&Chitchat]) { let first_node_states = &nodes[0].cluster_state.node_states; for other_node in nodes.iter().skip(1) { @@ -560,6 +562,44 @@ mod tests { assert_nodes_sync(&[&node1, &node2]); } + #[tokio::test] + async fn test_chitchat_no_need_to_reset_if_last_gc_version_is_higher() { + // This test checks what happens if a node is trailing behind too much, + // needs a reset, and a single delta would: + // - not increase its max version after reset. + // - not even bring the state to a max_version >= last_gc_version + let _ = tracing_subscriber::fmt::try_init(); + tokio::time::pause(); + let node_config1 = ChitchatConfig::for_test(10_001); + let empty_seeds = watch::channel(Default::default()).1; + let mut node1 = + Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone(), vec![]); + let node_config2 = ChitchatConfig::for_test(10_002); + let mut node2 = Chitchat::with_chitchat_id_and_seeds(node_config2, empty_seeds, vec![]); + // Because of compression, we need a lot of keys to reach the MTU. 
+ for i in 0..20_000 { + let key = format!("k{}", i); + node1.self_node_state().set(&key, "first_value"); + } + for _ in 0..2 { + run_chitchat_handshake(&mut node1, &mut node2); + } + + assert_nodes_sync(&[&node1, &node2]); + + node1.self_node_state().mark_for_deletion("k1"); + + // Advance time before triggering the GC of that deleted key + tokio::time::advance(Duration::from_secs(3_600 * 3)).await; + node1.gc_keys_marked_for_deletion(); + + for _ in 0..2 { + run_chitchat_handshake(&mut node1, &mut node2); + } + + assert_nodes_sync(&[&node1, &node2]); + } + #[tokio::test] async fn test_live_node_channel() { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index b0b74a6..714d4fb 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -132,13 +132,13 @@ mod tests { { let mut digest = Digest::default(); let node = ChitchatId::for_local_test(10_001); - digest.add_node(node, Heartbeat(0), 0); + digest.add_node(node, Heartbeat(0), 0, 0); let syn = ChitchatMessage::Syn { cluster_id: "cluster-a".to_string(), digest, }; - test_serdeser_aux(&syn, 57); + test_serdeser_aux(&syn, 65); } } @@ -157,7 +157,7 @@ mod tests { let mut digest = Digest::default(); let node = ChitchatId::for_local_test(10_001); // +43 bytes = 27 bytes (ChitchatId) + 8 (hearbeat) + 8 (max_version). - digest.add_node(node, Heartbeat(0), 0); + digest.add_node(node, Heartbeat(0), 0, 0); // 4 bytes let mut delta = Delta::default(); @@ -174,7 +174,7 @@ mod tests { let syn_ack = ChitchatMessage::SynAck { digest, delta }; // 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta). 
- test_serdeser_aux(&syn_ack, 1 + 60 + 45); + test_serdeser_aux(&syn_ack, 1 + 53 + 60); } } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index c1bdc63..071b39d 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -42,11 +42,11 @@ pub struct NodeState { // A proper interpretation of `max_version` and `last_gc_version` is the following: // The state contains exactly: // - all of the (non-deleted) key values present at snapshot `max_version`. - // - all of the tombstones of the entry that were marked for deletion between (`last_gc_version`, - // `max_version]`. + // - all of the tombstones of the entry that were marked for deletion between + // (`last_gc_version`, `max_version]`. // - // It does not contain any trace of the tombstones of the entries that were marked for deletion before - // `<= last_gc_version`. + // It does not contain any trace of the tombstones of the entries that were marked for deletion + // before `<= last_gc_version`. // // Disclaimer: We do not necessarily have max_version >= last_gc_version. // After a reset, a node will have its `last_gc_version` set to the version of the node @@ -184,7 +184,9 @@ impl NodeState { return false; }; - if delta_max_version <= self.max_version() { + if (node_delta.last_gc_version, delta_max_version) + <= (self.last_gc_version, self.max_version()) + { // There is not point applying this delta as it is not bringing us to a newer state. 
warn!( node=?node_delta.chitchat_id, @@ -324,6 +326,7 @@ impl NodeState { fn digest(&self) -> NodeDigest { NodeDigest { heartbeat: self.heartbeat, + last_gc_version: self.last_gc_version, max_version: self.max_version, } } @@ -521,11 +524,11 @@ impl ClusterState { continue; } - let digest_max_version: Version = digest + let (digest_last_gc_version, digest_max_version) = digest .node_digests .get(chitchat_id) - .map(|node_digest| node_digest.max_version) - .unwrap_or(0u64); + .map(|node_digest| (node_digest.last_gc_version, node_digest.max_version)) + .unwrap_or((0u64, 0u64)); if node_state.max_version <= digest_max_version { // Our version is actually older than the version of the digest. @@ -535,7 +538,9 @@ impl ClusterState { // We have garbage collected some tombstones that the other node does not know about // yet. A reset is needed. - let should_reset = digest_max_version < node_state.last_gc_version; + let should_reset = digest_last_gc_version < node_state.last_gc_version + && digest_max_version < node_state.last_gc_version; + let from_version_excluded = if should_reset { warn!( "Node to reset {chitchat_id:?} last gc version: {} max version: {}", @@ -569,7 +574,10 @@ impl ClusterState { // There aren't any key-values in the state_node apparently. // Let's add a specific instruction to the delta to set the max version. if !added_something { - delta_serializer.try_set_max_version(stale_node.node_state.max_version); + // This call returns false if the mtu has been reached. + // + // In that case, this empty node update is useless but does not hurt correctness. 
+ let _ = delta_serializer.try_set_max_version(stale_node.node_state.max_version); } } @@ -1050,14 +1058,15 @@ mod tests { let node2 = ChitchatId::for_local_test(10_002); let node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_last_gc_version(10u64); node2_state.set("key_a", ""); node2_state.set("key_b", ""); let digest = cluster_state.compute_digest(&HashSet::new()); let mut expected_node_digests = Digest::default(); - expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1); - expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2); + expected_node_digests.add_node(node1.clone(), Heartbeat(0), 0, 1); + expected_node_digests.add_node(node2.clone(), Heartbeat(0), 10u64, 2); assert_eq!(&digest, &expected_node_digests); } @@ -1228,8 +1237,8 @@ mod tests { let mut digest = Digest::default(); let node1 = ChitchatId::for_local_test(10_001); let node2 = ChitchatId::for_local_test(10_002); - digest.add_node(node1.clone(), Heartbeat(0), 1); - digest.add_node(node2.clone(), Heartbeat(0), 2); + digest.add_node(node1.clone(), Heartbeat(0), 0, 1); + digest.add_node(node2.clone(), Heartbeat(0), 0, 2); test_with_varying_max_transmitted_kv_helper( &cluster_state, @@ -1250,8 +1259,8 @@ mod tests { let mut digest = Digest::default(); let node1 = ChitchatId::for_local_test(10_001); let node2 = ChitchatId::for_local_test(10_002); - digest.add_node(node1.clone(), Heartbeat(0), 1); - digest.add_node(node2.clone(), Heartbeat(0), 2); + digest.add_node(node1.clone(), Heartbeat(0), 0, 1); + digest.add_node(node2.clone(), Heartbeat(0), 0, 2); test_with_varying_max_transmitted_kv_helper( &cluster_state, @@ -1272,7 +1281,7 @@ mod tests { let mut digest = Digest::default(); let node1 = ChitchatId::for_local_test(10_001); let node2 = ChitchatId::for_local_test(10_002); - digest.add_node(node2.clone(), Heartbeat(0), 3); + digest.add_node(node2.clone(), Heartbeat(0), 0, 3); test_with_varying_max_transmitted_kv_helper( &cluster_state, @@ -1326,7 +1335,7 @@ mod tests 
{ { let mut digest = Digest::default(); - digest.add_node(node1.clone(), Heartbeat(0), 1); + digest.add_node(node1.clone(), Heartbeat(0), 0, 1); let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, @@ -1350,7 +1359,7 @@ mod tests { { let mut digest = Digest::default(); let node1 = ChitchatId::for_local_test(10_001); - digest.add_node(node1.clone(), Heartbeat(0), 1); + digest.add_node(node1.clone(), Heartbeat(0), 0, 1); let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, @@ -1375,7 +1384,7 @@ mod tests { { let mut digest = Digest::default(); - digest.add_node(node1.clone(), Heartbeat(0), 1); + digest.add_node(node1.clone(), Heartbeat(0), 0, 1); let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, From e84780ed991edbef665272282f413dedced6abfd Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 20:41:03 +0900 Subject: [PATCH 7/8] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raphaël Marinier --- ALGORITHM.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ALGORITHM.md b/ALGORITHM.md index 409ee01..0ddd6aa 100644 --- a/ALGORITHM.md +++ b/ALGORITHM.md @@ -2,10 +2,10 @@ Chitchat relies on an extension of the Scuttlebutt reconciliation algorithm, to - Key-Value deletion - Member leaving the cluster. -For simplification, we will just discuss the replication of a single map on a cluster of n node, one which only one node can write. +For simplification, we will just discuss the replication of a single map on a cluster of n node, one which only one node can write to. (In scuttlebutt, each node has ownership over such a map.) 
-There are two kinds of update event on this map: +There are two kinds of update events on this map: - Set(K, V) - Deletion(K) @@ -34,12 +34,12 @@ node will send two parameters describing how much of the state is replicated: - `max_version`: One big difference with reliable broadcast, is that this `max version` does NOT mean that the node's state reflects all of the events earlier than `max_version`. Rather, it says that if the node were to receive all of the events within between `max_version` and $V$ then it will end up to date. The difference is subtle, but comes from an important optimization of scuttlebutt. -If a node ask for an update above $max_version = m$, and that for a given key $k$ there are two updates after $m$, there is no need to send the first one: it will eventually be overridden by the second one. +If a node asks for an update above $max_version = m$, and that for a given key $k$ there are two updates after $m$, there is no need to send the first one: it will eventually be overridden by the second one. - `last_gc_version`: Rather than being deleted right away, the algorithm is using a tombstone mechanism. Key entries are kept by marked as deleted. From the client point of view, everything looks like the key really has been deleted. Eventually, in order to free memory, we garbage collect all of the key values marked for deletion that are older than a few hours. We then keep track of the maximum `last_gc_version`. -At each round of the protocol, a node makes sure to its pair $(last\_gc\_version, max\_version)$ in the lexicographical sense. +At each round of the protocol, a node maintains a pair $(last\_gc\_version, max\_version)$ which can only increase in lexicographical order. 
We also defined the subsequence of missing updates $(u')$ extracted from $(u_i)$ by keeping only the update matching the predicate: From 5b649379a3ff509e38afa25eb7d1403fa83d0529 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Thu, 7 Mar 2024 16:07:04 +0900 Subject: [PATCH 8/8] CR comments --- ALGORITHM.md | 6 ++++-- chitchat/src/state.rs | 10 +++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/ALGORITHM.md b/ALGORITHM.md index 0ddd6aa..bef4c6b 100644 --- a/ALGORITHM.md +++ b/ALGORITHM.md @@ -2,10 +2,11 @@ Chitchat relies on an extension of the Scuttlebutt reconciliation algorithm, to - Key-Value deletion - Member leaving the cluster. -For simplification, we will just discuss the replication of a single map on a cluster of n node, one which only one node can write to. +For simplification, we will just discuss the replication of a single map on a cluster of n node, on which only one node can write to. + (In scuttlebutt, each node has ownership over such a map.) -There are two kinds of update events on this map: +There are two kinds of update event on this map: - Set(K, V) - Deletion(K) @@ -34,6 +35,7 @@ node will send two parameters describing how much of the state is replicated: - `max_version`: One big difference with reliable broadcast, is that this `max version` does NOT mean that the node's state reflects all of the events earlier than `max_version`. Rather, it says that if the node were to receive all of the events within between `max_version` and $V$ then it will end up to date. The difference is subtle, but comes from an important optimization of scuttlebutt. + If a node asks for an update above $max_version = m$, and that for a given key $k$ there are two updates after $m$, there is no need to send the first one: it will eventually be overridden by the second one. - `last_gc_version`: Rather than being deleted right away, the algorithm is using a tombstone mechanism. Key entries are kept by marked as deleted. 
From the client point of view, everything looks like the key really has been deleted. Eventually, in order diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 071b39d..ceb8162 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -205,8 +205,16 @@ impl NodeState { current_last_gc_version=self.last_gc_version, "resetting node"); *self = NodeState::new(node_delta.chitchat_id.clone(), self.listeners.clone()); + // The node_delta max_version is only applied when the delta carries no key values. if let Some(max_version) = node_delta.max_version { - self.max_version = max_version; + if node_delta.key_values.is_empty() { + self.max_version = max_version; + } else { + warn!( + "Received a delta with a max_version, and key_values as well. This is \ + unexpected, please report." + ); + } } // We need to reset our `last_gc_version`. self.last_gc_version = node_delta.last_gc_version;