diff --git a/README.md b/README.md index 6b356a0..f708529 100644 --- a/README.md +++ b/README.md @@ -56,23 +56,25 @@ associated with a version, and replicated using the same mechanism as a KV updat The library will then interpret this versioned tombstone before exposing kv to the user. -To avoid keeping deleted KV indefinitely, the library includes a GC mechanism. Any nodes containing a tombstone older than a given `grace period threshold` -(age is measured in ticks of heartbeat), it is safe to be deleted. +To avoid keeping deleted KV indefinitely, the library includes a GC mechanism. +Every tombstone is associated with a monotonic timestamp. +It is local in the sense that it is computed locally to the given node, and never shared with other servers. + +All KV with a timestamp older than a given `marked_for_deletion_grace_period` will be deleted upon delete operations. (Note for a given KV, GC can happen at different +times on different nodes.) This yields the following problem. If a node was disconnected for more than `marked_for_deletion_grace_period`, they could have missed the deletion of a KV and never be aware of it. -To address this problem, nodes that are too outdated have to reset their state. +To address this problem, nodes keep a record of the version of the last KV they +have GCed. Here is how it works: + +Let's assume a Node A sends a Syn message to a Node B. The digest expresses that A want for updates about Node N with a version stricly greater than `V`. +Node B will compare the version `V` of the digest with its `max_gc_version` for the node N. -More accurately, let's assume a Node A sends a Syn message to Node B with a digest with an outdated version V for a node N. -Node B will compare the version of the digest with its own version. +If `V > max_gc_version`, Node B knows that no GC has impacted Key values with a version above V. It can safely emit a normal delta to A. -If V is fresher than `own version - marked_for_deletion_grace_period`, -Node B knows that no GC has impacted Key values with a version above V. It can -safely emit a normal delta to A. -If however V is older than `own version - marked_for_deletion_grace_period`, -a GC could have been executed. Instead of sending a delta to Node A, Node B will -instruct A to reset its state. +If however V is older, a GC could have been executed. Instead of sending a delta to Node A, Node B will instruct A to reset its state. Node A will then wipe-off whatever information it has about N, and will start syncing from a blank state. diff --git a/chitchat-test/src/main.rs b/chitchat-test/src/main.rs index 3acf0d6..5159531 100644 --- a/chitchat-test/src/main.rs +++ b/chitchat-test/src/main.rs @@ -99,7 +99,7 @@ async fn main() -> anyhow::Result<()> { dead_node_grace_period: Duration::from_secs(10), ..FailureDetectorConfig::default() }, - marked_for_deletion_grace_period: 60, + marked_for_deletion_grace_period: Duration::from_secs(60), }; let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?; let chitchat = chitchat_handler.chitchat(); diff --git a/chitchat/src/configuration.rs b/chitchat/src/configuration.rs index 9533c88..c8bcd71 100644 --- a/chitchat/src/configuration.rs +++ b/chitchat/src/configuration.rs @@ -15,14 +15,14 @@ pub struct ChitchatConfig { pub failure_detector_config: FailureDetectorConfig, // Marked for deletion grace period expressed as a number of hearbeats. // Chitchat ensures a key marked for deletion is eventually deleted by three mechanisms: - // - Garbage collection: each heartbeat, marked for deletion keys with `tombstone + - // marked_for_deletion_grace_period > node.heartbeat` are deleted. + // - Garbage collection: each heartbeat, marked for deletion keys with `deletion now > instant + // + marked_for_deletion_grace_period` are deleted. // - Compute delta: for a given node digest, if `node_digest.heartbeat + // marked_for_deletion_grace_period < node_state.heartbeat` the node is flagged "to be reset" // and the delta is populated with all keys and values. // - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and // populate a fresh new node state with the keys and values present in the delta. - pub marked_for_deletion_grace_period: usize, + pub marked_for_deletion_grace_period: Duration, } impl ChitchatConfig { @@ -37,7 +37,7 @@ impl ChitchatConfig { listen_addr, seed_nodes: Vec::new(), failure_detector_config: Default::default(), - marked_for_deletion_grace_period: 10_000, + marked_for_deletion_grace_period: Duration::from_secs(10_000), } } } @@ -54,9 +54,7 @@ impl Default for ChitchatConfig { listen_addr, seed_nodes: Vec::new(), failure_detector_config: Default::default(), - // Each heartbeat increments the version, with one heartbeat each second - // 86400 ~ 24h. - marked_for_deletion_grace_period: 86400, + marked_for_deletion_grace_period: Duration::from_secs(3_600 * 2), // 2h } } } diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index e0f3234..90b2d6a 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -1,5 +1,7 @@ use std::collections::HashSet; +use tokio::time::Instant; + use crate::serialize::*; use crate::{ChitchatId, Heartbeat, VersionedValue}; @@ -116,7 +118,8 @@ impl Deserializable for DeltaOp { let key = String::deserialize(buf)?; let value = String::deserialize(buf)?; let version = u64::deserialize(buf)?; - let tombstone = Option::::deserialize(buf)?; + let deleted = bool::deserialize(buf)?; + let tombstone = if deleted { Some(Instant::now()) } else { None }; let versioned_value: VersionedValue = VersionedValue { value, version, @@ -182,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { key.serialize(buf); versioned_value.value.serialize(buf); versioned_value.version.serialize(buf); - versioned_value.tombstone.serialize(buf); + versioned_value.tombstone.is_some().serialize(buf); } Self::NodeToReset(chitchat_id) => { buf.push(DeltaOpTag::NodeToReset.into()); @@ -204,7 +207,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { key.serialized_len() + versioned_value.value.serialized_len() + versioned_value.version.serialized_len() - + versioned_value.tombstone.serialized_len() + + 1 } Self::NodeToReset(chitchat_id) => chitchat_id.serialized_len(), } @@ -267,13 +270,14 @@ impl Delta { key: &str, value: &str, version: crate::Version, - tombstone: Option, + deleted: bool, ) { let node_delta = self .node_deltas .iter_mut() .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); + let tombstone = if deleted { Some(Instant::now()) } else { None }; node_delta.key_values.push(( key.to_string(), VersionedValue { @@ -488,7 +492,7 @@ mod tests { VersionedValue { value: "".to_string(), version: 2, - tombstone: Some(0), + tombstone: Some(Instant::now()), }, )); @@ -515,7 +519,7 @@ mod tests { tombstone: None, }, )); - test_aux_delta_writer(delta_writer, 99); + test_aux_delta_writer(delta_writer, 98); } #[test] diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index adda914..dcd0a4f 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -122,7 +122,6 @@ impl Chitchat { &digest, delta_mtu, &scheduled_for_deletion, - self.config.marked_for_deletion_grace_period as u64, ); Some(ChitchatMessage::SynAck { digest: self_digest, @@ -137,7 +136,6 @@ impl Chitchat { &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1, &scheduled_for_deletion, - self.config.marked_for_deletion_grace_period as u64, ); Some(ChitchatMessage::Ack { delta }) } @@ -153,11 +151,8 @@ impl Chitchat { } fn gc_keys_marked_for_deletion(&mut self) { - let dead_nodes = self.dead_nodes().cloned().collect::>(); - self.cluster_state.gc_keys_marked_for_deletion( - self.config.marked_for_deletion_grace_period as u64, - &dead_nodes, - ); + self.cluster_state + .gc_keys_marked_for_deletion(self.config.marked_for_deletion_grace_period); } /// Reports heartbeats to the failure detector for nodes in the delta for which we received an @@ -190,11 +185,12 @@ impl Chitchat { let current_live_nodes = self .live_nodes() - .map(|chitchat_id| { - let node_state = self - .node_state(chitchat_id) - .expect("Node state should exist."); - (chitchat_id.clone(), node_state.max_version()) + .flat_map(|chitchat_id| { + if let Some(node_state) = self.node_state(chitchat_id) { + return Some((chitchat_id.clone(), node_state.max_version())); + } + warn!("node state for {chitchat_id:?} is absent"); + None }) .collect::>(); @@ -402,7 +398,7 @@ mod tests { initial_interval: Duration::from_millis(100), ..Default::default() }, - marked_for_deletion_grace_period: 10_000, + marked_for_deletion_grace_period: Duration::from_secs(3_600), }; let initial_kvs: Vec<(String, String)> = Vec::new(); spawn_chitchat(config, initial_kvs, transport) diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index 92e3635..8952669 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -165,12 +165,12 @@ mod tests { // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). delta.add_node(node.clone(), Heartbeat(0)); // +29 bytes. - delta.add_kv(&node, "key", "value", 0, Some(5)); - delta.set_serialized_len(70); + delta.add_kv(&node, "key", "value", 0, true); + delta.set_serialized_len(62); let syn_ack = ChitchatMessage::SynAck { digest, delta }; // 1 bytes (syn ack message) + 45 bytes (digest) + 69 bytes (delta). - test_serdeser_aux(&syn_ack, 116); + test_serdeser_aux(&syn_ack, 108); } } @@ -188,10 +188,10 @@ mod tests { // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). delta.add_node(node.clone(), Heartbeat(0)); // +29 bytes. - delta.add_kv(&node, "key", "value", 0, Some(5)); - delta.set_serialized_len(70); + delta.add_kv(&node, "key", "value", 0, true); + delta.set_serialized_len(62); let ack = ChitchatMessage::Ack { delta }; - test_serdeser_aux(&ack, 71); + test_serdeser_aux(&ack, 63); } } diff --git a/chitchat/src/serialize.rs b/chitchat/src/serialize.rs index 4de64c2..96c0849 100644 --- a/chitchat/src/serialize.rs +++ b/chitchat/src/serialize.rs @@ -93,33 +93,6 @@ impl Deserializable for u64 { } } -impl Serializable for Option { - fn serialize(&self, buf: &mut Vec) { - self.is_some().serialize(buf); - if let Some(tombstone) = &self { - tombstone.serialize(buf); - } - } - fn serialized_len(&self) -> usize { - if self.is_some() { - 9 - } else { - 1 - } - } -} - -impl Deserializable for Option { - fn deserialize(buf: &mut &[u8]) -> anyhow::Result { - let is_some: bool = Deserializable::deserialize(buf)?; - if is_some { - let u64_value = Deserializable::deserialize(buf)?; - return Ok(Some(u64_value)); - } - Ok(None) - } -} - impl Serializable for bool { fn serialize(&self, buf: &mut Vec) { buf.push(*self as u8); @@ -525,12 +498,6 @@ mod tests { test_serdeser_aux(&ipv6, 17); } - #[test] - fn test_serialize_option_u64() { - test_serdeser_aux(&Some(1), 9); - test_serdeser_aux(&None, 1); - } - #[test] fn test_serialize_block_type() { let mut valid_vals_count = 0; diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 17cf7dc..3397268 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -3,12 +3,14 @@ use std::collections::{BTreeMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::net::{Ipv4Addr, SocketAddr}; use std::ops::Bound; +use std::time::Duration; use itertools::Itertools; use rand::prelude::SliceRandom; use rand::Rng; use serde::{Deserialize, Serialize}; use tokio::sync::watch; +use tokio::time::Instant; use tracing::warn; use crate::delta::{Delta, DeltaSerializer, NodeDelta}; @@ -24,6 +26,11 @@ pub struct NodeState { max_version: Version, #[serde(skip)] listeners: Listeners, + // This is the maximum version of the last tombstone GC. + // + // After we GC the tombstones, we cannot safely do replication with + // nodes that are asking for a diff with version lower than this. + max_garbage_collected_tombstone_version: Option, } impl Debug for NodeState { @@ -44,6 +51,7 @@ impl NodeState { key_values: Default::default(), max_version: Default::default(), listeners, + max_garbage_collected_tombstone_version: None, } } @@ -58,6 +66,7 @@ impl NodeState { key_values: Default::default(), max_version: Default::default(), listeners: Listeners::default(), + max_garbage_collected_tombstone_version: None, } } @@ -132,7 +141,7 @@ impl NodeState { return; }; self.max_version += 1; - versioned_value.tombstone = Some(self.heartbeat.0); + versioned_value.tombstone = Some(Instant::now()); versioned_value.version = self.max_version; versioned_value.value = "".to_string(); } @@ -157,15 +166,24 @@ impl NodeState { } /// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`. - fn gc_keys_marked_for_deletion(&mut self, grace_period: u64) { - let heartbeat = self.heartbeat.0; + fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) { + let now = Instant::now(); + let mut max_deleted_version = self.max_garbage_collected_tombstone_version; self.key_values .retain(|_, versioned_value: &mut VersionedValue| { - versioned_value - .tombstone - .map(|tombstone: u64| tombstone + grace_period >= heartbeat) - .unwrap_or(true) - }) + let Some(deleted_instant) = versioned_value.tombstone else { + // The KV is not deleted. We keep it! + return true; + }; + if now < deleted_instant + grace_period { + // We haved not passed the grace period yet. We keep it! + return true; + } + // We have exceeded the tombstone grace period. Time to remove it. + max_deleted_version = Some(versioned_value.version).max(max_deleted_version); + false + }); + self.max_garbage_collected_tombstone_version = max_deleted_version; } /// Returns an iterator over the versioned values that are strictly greater than @@ -284,9 +302,14 @@ impl ClusterState { } pub(crate) fn apply_delta(&mut self, delta: Delta) { - // Remove nodes to reset. - self.node_states - .retain(|chitchat_id, _| !delta.nodes_to_reset.contains(chitchat_id)); + // Clearing the node of the states to reset. + for node_to_reset in delta.nodes_to_reset { + // We don't want to remove the entire state here: the node could be alive and the + // live watcher panics if the state is missing. + if let Some(node_state) = self.node_states.get_mut(&node_to_reset) { + *node_state = NodeState::new(node_to_reset, self.listeners.clone()); + } + } // Apply delta. for node_delta in delta.node_deltas { @@ -320,15 +343,8 @@ impl ClusterState { } } - pub fn gc_keys_marked_for_deletion( - &mut self, - marked_for_deletion_grace_period: u64, - dead_nodes: &HashSet, - ) { - for (chitchat_id, node_state) in &mut self.node_states { - if dead_nodes.contains(chitchat_id) { - continue; - } + pub fn gc_keys_marked_for_deletion(&mut self, marked_for_deletion_grace_period: Duration) { + for node_state in self.node_states.values_mut() { node_state.gc_keys_marked_for_deletion(marked_for_deletion_grace_period); } } @@ -341,7 +357,6 @@ impl ClusterState { digest: &Digest, mtu: usize, scheduled_for_deletion: &HashSet<&ChitchatId>, - marked_for_deletion_grace_period: u64, ) -> Delta { let mut stale_nodes = SortedStaleNodes::default(); let mut nodes_to_reset = Vec::new(); @@ -354,7 +369,13 @@ impl ClusterState { stale_nodes.insert(chitchat_id, node_state); continue; }; - if node_digest.heartbeat.0 + marked_for_deletion_grace_period < node_state.heartbeat.0 { + let should_reset = + if let Some(max_gc_version) = node_state.max_garbage_collected_tombstone_version { + max_gc_version >= node_digest.max_version + } else { + false + }; + if should_reset { warn!("Node to reset {chitchat_id:?}"); nodes_to_reset.push(chitchat_id); stale_nodes.insert(chitchat_id, node_state); @@ -391,7 +412,7 @@ impl ClusterState { VersionedValue { value: String::new(), version: stale_node.node_state.max_version, - tombstone: Some(0), + tombstone: Some(Instant::now()), }, ) { return delta_serializer.finish(); @@ -802,23 +823,21 @@ mod tests { node_state.heartbeat = Heartbeat(10); node_state.set("key", "1"); node_state.mark_for_deletion("key"); - assert_eq!( - node_state.get_versioned("key").unwrap(), - &VersionedValue { - value: "".to_string(), - version: 2, - tombstone: Some(10), - } - ); + { + let versioned_value = node_state.get_versioned("key").unwrap(); + assert_eq!(&versioned_value.value, ""); + assert_eq!(versioned_value.version, 2u64); + assert!(&versioned_value.tombstone.is_some()); + } + + // Overriding the same key node_state.set("key", "2"); - assert_eq!( - node_state.get_versioned("key").unwrap(), - &VersionedValue { - value: "2".to_string(), - version: 3, - tombstone: None, - } - ); + { + let versioned_value = node_state.get_versioned("key").unwrap(); + assert_eq!(&versioned_value.value, "2"); + assert_eq!(versioned_value.version, 3u64); + assert!(&versioned_value.tombstone.is_none()); + } } #[test] @@ -842,18 +861,20 @@ mod tests { assert_eq!(&digest, &expected_node_digests); } - #[test] - fn test_cluster_state_gc_keys_marked_for_deletion() { + #[tokio::test] + async fn test_cluster_state_gc_keys_marked_for_deletion() { + tokio::time::pause(); let mut cluster_state = ClusterState::default(); let node1 = ChitchatId::for_local_test(10_001); let node1_state = cluster_state.node_state_mut(&node1); - node1_state.heartbeat = Heartbeat(100); - node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 + node1_state.set("key_a", "1"); node1_state.mark_for_deletion("key_a"); // Version 2. Tombstone set to heartbeat 100. + tokio::time::advance(Duration::from_secs(5)).await; node1_state.set_with_version("key_b".to_string(), "3".to_string(), 13); // 3 node1_state.heartbeat = Heartbeat(110); - // No GC if tombstone (=100) + grace_period <= heartbeat (=110). - cluster_state.gc_keys_marked_for_deletion(10, &HashSet::new()); + // No GC as tombstone is less than 10 secs old. + cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); + cluster_state .node_state(&node1) .unwrap() @@ -866,8 +887,10 @@ mod tests { .key_values .get("key_b") .unwrap(); + // GC if tombstone (=100) + grace_period > heartbeat (=110). - cluster_state.gc_keys_marked_for_deletion(9, &HashSet::new()); + tokio::time::advance(Duration::from_secs(5)).await; + cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); assert!(cluster_state .node_state(&node1) .unwrap() @@ -897,13 +920,13 @@ mod tests { let mut delta = Delta::default(); delta.add_node(node1.clone(), Heartbeat(0)); - delta.add_kv(&node1, "key_a", "4", 4, None); - delta.add_kv(&node1, "key_b", "2", 2, None); + delta.add_kv(&node1, "key_a", "4", 4, false); + delta.add_kv(&node1, "key_b", "2", 2, false); // Reset node 2. delta.add_node_to_reset(node2.clone()); delta.add_node(node2.clone(), Heartbeat(0)); - delta.add_kv(&node2, "key_d", "4", 4, None); + delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); let node1_state = cluster_state.node_state(&node1).unwrap(); @@ -943,20 +966,15 @@ mod tests { cluster_state: &ClusterState, digest: &Digest, dead_nodes: &HashSet<&ChitchatId>, - expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, Option)], + expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, bool)], ) { - let max_delta = cluster_state.compute_partial_delta_respecting_mtu( - digest, - usize::MAX, - dead_nodes, - 10_000, - ); + let max_delta = + cluster_state.compute_partial_delta_respecting_mtu(digest, usize::MAX, dead_nodes); let mut buf = Vec::new(); max_delta.serialize(&mut buf); let mut mtu_per_num_entries = Vec::new(); for mtu in 100..buf.len() { - let delta = - cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes); let num_tuples = delta.num_tuples(); if mtu_per_num_entries.len() == num_tuples + 1 { continue; @@ -972,17 +990,13 @@ mod tests { expected_delta.add_kv(node, key, val, version, tombstone); } { - let delta = cluster_state - .compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); + let delta = + cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes); assert_eq!(&delta, &expected_delta); } { - let delta = cluster_state.compute_partial_delta_respecting_mtu( - digest, - mtu + 1, - dead_nodes, - 10_000, - ); + let delta = + cluster_state.compute_partial_delta_respecting_mtu(digest, mtu + 1, dead_nodes); assert_eq!(&delta, &expected_delta); } } @@ -1022,9 +1036,9 @@ mod tests { &digest, &HashSet::new(), &[ - (&node2, "key_c", "3", 3, None), - (&node2, "key_d", "", 5, Some(0)), - (&node1, "key_b", "2", 2, None), + (&node2, "key_c", "3", 3, false), + (&node2, "key_d", "", 5, true), + (&node1, "key_b", "2", 2, false), ], ); } @@ -1044,9 +1058,9 @@ mod tests { &digest, &HashSet::new(), &[ - (&node2, "key_c", "3", 3, None), - (&node2, "key_d", "", 5, Some(0)), - (&node1, "key_b", "2", 2, None), + (&node2, "key_c", "3", 3, false), + (&node2, "key_d", "", 5, true), + (&node1, "key_b", "2", 2, false), ], ); } @@ -1065,9 +1079,9 @@ mod tests { &digest, &HashSet::new(), &[ - (&node1, "key_a", "1", 1, None), - (&node1, "key_b", "2", 2, None), - (&node2, "key_d", "4", 4, None), + (&node1, "key_a", "1", 1, false), + (&node1, "key_b", "2", 2, false), + (&node2, "key_d", "4", 4, false), ], ); } @@ -1087,63 +1101,96 @@ mod tests { &digest, &dead_nodes, &[ - (&node1, "key_a", "1", 1, None), - (&node1, "key_b", "2", 2, None), + (&node1, "key_a", "1", 1, false), + (&node1, "key_b", "2", 2, false), ], ); } - #[test] - fn test_cluster_state_compute_delta_with_old_node_state_that_needs_reset() { + #[tokio::test] + async fn test_cluster_state_compute_delta_with_old_node_state_that_needs_reset() { + tokio::time::pause(); let mut cluster_state = ClusterState::default(); let node1 = ChitchatId::for_local_test(10_001); - let node1_state = cluster_state.node_state_mut(&node1); - node1_state.heartbeat = Heartbeat(10000); - node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 - node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 - let node2 = ChitchatId::for_local_test(10_002); - let node2_state = cluster_state.node_state_mut(&node2); - node2_state.set_with_version("key_c".to_string(), "3".to_string(), 2); // 2 + { + let node1_state = cluster_state.node_state_mut(&node1); + node1_state.heartbeat = Heartbeat(10000); + node1_state.set_with_version("key_a".to_string(), "1".to_string(), 1); // 1 + node1_state.set_with_version("key_b".to_string(), "2".to_string(), 2); // 2 + + let node2_state = cluster_state.node_state_mut(&node2); + node2_state.set_with_version("key_c".to_string(), "3".to_string(), 2); // 2 + } - let mut digest = Digest::default(); - let node1 = ChitchatId::for_local_test(10_001); - digest.add_node(node1.clone(), Heartbeat(0), 1); { + let mut digest = Digest::default(); + let node1 = ChitchatId::for_local_test(10_001); + digest.add_node(node1.clone(), Heartbeat(0), 1); let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), - 10_000, ); assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); expected_delta.add_node(node1.clone(), Heartbeat(10_000)); - expected_delta.add_kv(&node1, "key_b", "2", 2, None); + expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_node(node2.clone(), Heartbeat(0)); - expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, None); + expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); expected_delta.set_serialized_len(78); assert_eq!(delta, expected_delta); } + + cluster_state + .node_state_mut(&node1) + .mark_for_deletion("key_a"); + tokio::time::advance(Duration::from_secs(5)).await; + cluster_state.gc_keys_marked_for_deletion(Duration::from_secs(10)); + + { + let mut digest = Digest::default(); + let node1 = ChitchatId::for_local_test(10_001); + digest.add_node(node1.clone(), Heartbeat(0), 1); + let delta = cluster_state.compute_partial_delta_respecting_mtu( + &digest, + MAX_UDP_DATAGRAM_PAYLOAD_SIZE, + &HashSet::new(), + ); + assert!(delta.nodes_to_reset.is_empty()); + let mut expected_delta = Delta::default(); + expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + expected_delta.add_kv(&node1, "key_b", "2", 2, false); + expected_delta.add_kv(&node1, "key_a", "", 3, true); + expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); + expected_delta.set_serialized_len(90); + assert_eq!(delta, expected_delta); + } + + const DELETE_GRACE_PERIOD: Duration = Duration::from_secs(10); + // node1 / key a will be deleted here. + tokio::time::advance(DELETE_GRACE_PERIOD).await; + cluster_state + .node_state_mut(&node1) + .gc_keys_marked_for_deletion(Duration::from_secs(10)); + { - // Node 1 heartbeat in digest + grace period (9_999) is inferior to the - // node1's hearbeat in the cluster state. Thus we expect the cluster to compute a - // delta that will reset node 1. + let mut digest = Digest::default(); + digest.add_node(node1.clone(), Heartbeat(0), 1); let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), - 9_999, ); let mut expected_delta = Delta::default(); expected_delta.add_node_to_reset(node1.clone()); expected_delta.add_node(node1.clone(), Heartbeat(10_000)); - expected_delta.add_kv(&node1, "key_a", "1", 1, None); - expected_delta.add_kv(&node1, "key_b", "2", 2, None); + expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_node(node2.clone(), Heartbeat(0)); - expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, None); - expected_delta.set_serialized_len(91); + expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); + expected_delta.set_serialized_len(81); assert_eq!(delta, expected_delta); } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 55af70d..afcf37b 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; +use tokio::time::Instant; /// For the lifetime of a cluster, nodes can go down and come back up multiple times. They may also /// die permanently. A [`ChitchatId`] is composed of three components: @@ -47,11 +48,56 @@ impl ChitchatId { } /// A versioned key-value pair. -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, Serialize, Deserialize)] +#[serde( + into = "VersionedValueForSerialization", + from = "VersionedValueForSerialization" +)] pub struct VersionedValue { pub value: String, pub version: Version, - pub tombstone: Option, + // The tombstone instant is transient: + // Only the presence of a tombstone or not is serialized, and used in partial eq eq. + pub tombstone: Option, +} + +impl PartialEq for VersionedValue { + fn eq(&self, other: &Self) -> bool { + self.value.eq(&other.value) + && self.version.eq(&other.version) + && self.tombstone.is_some().eq(&other.tombstone.is_some()) + } +} + +#[derive(Serialize, Deserialize)] +struct VersionedValueForSerialization { + pub value: String, + pub version: Version, + pub tombstone: bool, +} + +impl From for VersionedValue { + fn from(versioned_value: VersionedValueForSerialization) -> Self { + VersionedValue { + value: versioned_value.value, + version: versioned_value.version, + tombstone: if versioned_value.tombstone { + Some(Instant::now()) + } else { + None + }, + } + } +} + +impl From for VersionedValueForSerialization { + fn from(versioned_value: VersionedValue) -> Self { + VersionedValueForSerialization { + value: versioned_value.value, + version: versioned_value.version, + tombstone: versioned_value.tombstone.is_some(), + } + } } #[cfg(test)] diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index f493a8a..807a380 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -68,11 +68,11 @@ struct Simulator { transport: ChannelTransport, node_handles: HashMap, gossip_interval: Duration, - marked_for_deletion_key_grace_period: usize, + marked_for_deletion_key_grace_period: Duration, } impl Simulator { - pub fn new(gossip_interval: Duration, marked_for_deletion_key_grace_period: usize) -> Self { + pub fn new(gossip_interval: Duration, marked_for_deletion_key_grace_period: Duration) -> Self { Self { transport: ChannelTransport::with_mtu(65_507), node_handles: HashMap::new(), @@ -246,7 +246,7 @@ pub fn find_available_tcp_port() -> anyhow::Result { #[tokio::test] async fn test_simple_simulation_insert() { // let _ = tracing_subscriber::fmt::try_init(); - let mut simulator = Simulator::new(Duration::from_millis(100), 10); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); let chitchat_id_1 = create_chitchat_id("node-1"); let chitchat_id_2 = create_chitchat_id("node-2"); let operations = vec![ @@ -285,7 +285,7 @@ async fn test_simple_simulation_insert() { #[tokio::test] async fn test_simple_simulation_with_network_partition() { // let _ = tracing_subscriber::fmt::try_init(); - let mut simulator = Simulator::new(Duration::from_millis(100), 10); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); let chitchat_id_1 = create_chitchat_id("node-1"); let chitchat_id_2 = create_chitchat_id("node-2"); let operations = vec![ @@ -328,7 +328,7 @@ async fn test_simple_simulation_with_network_partition() { async fn test_marked_for_deletion_gc_with_network_partition() { const TIMEOUT: Duration = Duration::from_millis(500); // let _ = tracing_subscriber::fmt::try_init(); - let mut simulator = Simulator::new(Duration::from_millis(100), 10); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); let chitchat_id_1 = create_chitchat_id("node-1"); let chitchat_id_2 = create_chitchat_id("node-2"); let chitchat_id_3 = create_chitchat_id("node-3"); @@ -389,7 +389,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), false), timeout_opt: None, }, - // Wait for garbage collection: grace period * heartbeat ~ 1 second + margin of 1 second. + // Wait for garbage collection: grace period + margin of 1 second. Operation::Wait(Duration::from_secs(2)), Operation::NodeStateAssert { server_chitchat_id: chitchat_id_2.clone(), @@ -451,7 +451,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { async fn test_simple_simulation_heavy_insert_delete() { // let _ = tracing_subscriber::fmt::try_init(); let mut rng = thread_rng(); - let mut simulator = Simulator::new(Duration::from_millis(100), 50); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(5)); let mut chitchat_ids = Vec::new(); for i in 0..20 { chitchat_ids.push(create_chitchat_id(&format!("node-{}", i))); @@ -520,7 +520,7 @@ async fn test_simple_simulation_heavy_insert_delete() { } // Wait for garbage collection to kick in. - // Time to wait is 10s = margin of 4s + grace_period(50) * heartbeat (100ms). + // Time to wait is 10s = grace_period(5s) + margin of 5s tokio::time::sleep(Duration::from_secs(10)).await; info!("Checking keys are deleted..."); for (chitchat_id, keys) in keys_values_inserted_per_chitchat_id.clone().into_iter() { diff --git a/chitchat/tests/perf_test.rs b/chitchat/tests/perf_test.rs index fd86cff..2ac73d4 100644 --- a/chitchat/tests/perf_test.rs +++ b/chitchat/tests/perf_test.rs @@ -28,7 +28,7 @@ async fn spawn_one(chitchat_id: u16, transport: &dyn Transport) -> ChitchatHandl initial_interval: gossip_interval, ..Default::default() }, - marked_for_deletion_grace_period: 10_000, + marked_for_deletion_grace_period: Duration::from_secs(10_000), }; spawn_chitchat(config, Vec::new(), transport).await.unwrap() }