diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 90b2d6a..2e62e88 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -2,8 +2,8 @@ use std::collections::HashSet; use tokio::time::Instant; -use crate::serialize::*; -use crate::{ChitchatId, Heartbeat, VersionedValue}; +use crate::{serialize::*, Version}; +use crate::{ChitchatId, VersionedValue}; /// A delta is the message we send to another node to update it. /// @@ -32,7 +32,7 @@ impl Delta { let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { std::iter::once(DeltaOpRef::Node { chitchat_id: &node_delta.chitchat_id, - heartbeat: node_delta.heartbeat, + last_gc_version: node_delta.last_gc_version, }) .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { DeltaOpRef::KeyValue { @@ -49,7 +49,7 @@ enum DeltaOp { NodeToReset(ChitchatId), Node { chitchat_id: ChitchatId, - heartbeat: Heartbeat, + last_gc_version: Version, }, KeyValue { key: String, @@ -61,7 +61,7 @@ enum DeltaOpRef<'a> { NodeToReset(&'a ChitchatId), Node { chitchat_id: &'a ChitchatId, - heartbeat: Heartbeat, + last_gc_version: Version, }, KeyValue { key: &'a str, @@ -108,10 +108,10 @@ impl Deserializable for DeltaOp { } DeltaOpTag::Node => { let chitchat_id = ChitchatId::deserialize(buf)?; - let heartbeat = Heartbeat::deserialize(buf)?; + let last_gc_version = Version::deserialize(buf)?; Ok(DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, }) } DeltaOpTag::KeyValue => { @@ -139,10 +139,10 @@ impl DeltaOp { match self { DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, } => DeltaOpRef::Node { chitchat_id, - heartbeat: *heartbeat, + last_gc_version: *last_gc_version, }, DeltaOp::KeyValue { key, @@ -171,11 +171,11 @@ impl<'a> Serializable for DeltaOpRef<'a> { match self { Self::Node { chitchat_id, - heartbeat, + last_gc_version, } => { buf.push(DeltaOpTag::Node.into()); chitchat_id.serialize(buf); - heartbeat.serialize(buf); + last_gc_version.serialize(buf); } Self::KeyValue { key, 
@@ -198,8 +198,8 @@ impl<'a> Serializable for DeltaOpRef<'a> { 1 + match self { Self::Node { chitchat_id, - heartbeat, - } => chitchat_id.serialized_len() + heartbeat.serialized_len(), + last_gc_version, + } => chitchat_id.serialized_len() + last_gc_version.serialized_len(), Self::KeyValue { key, versioned_value, @@ -252,14 +252,14 @@ impl Delta { .sum() } - pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) { + pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) { assert!(!self .node_deltas .iter() .any(|node_delta| { node_delta.chitchat_id == chitchat_id })); self.node_deltas.push(NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values: Vec::new(), }); } @@ -306,7 +306,7 @@ impl Delta { #[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { pub chitchat_id: ChitchatId, - pub heartbeat: Heartbeat, + pub last_gc_version: Version, pub key_values: Vec<(String, VersionedValue)>, } @@ -335,14 +335,14 @@ impl DeltaBuilder { match op { DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version } => { self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); self.existing_nodes.insert(chitchat_id.clone()); self.current_node_delta = Some(NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values: Vec::new(), }); } @@ -441,10 +441,10 @@ impl DeltaSerializer { } /// Returns false if the node could not be added because the payload would exceed the mtu. - pub fn try_add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool { + pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool { let new_node_op = DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, }; self.try_add_op(new_node_op) } @@ -471,9 +471,8 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). 
let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) + // 1 bytes (empty tombstone). @@ -497,9 +496,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); // +37 bytes - assert!(delta_writer.try_add_node(node2, heartbeat)); + assert!(delta_writer.try_add_node(node2, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -529,9 +527,8 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - // +37 bytes = 8 bytes (heartbeat) + 27 bytes (node) + 2bytes (block length) - assert!(delta_writer.try_add_node(node1, heartbeat)); + // +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length) + assert!(delta_writer.try_add_node(node1, 0)); // +24 bytes (kv + op tag) assert!(delta_writer.try_add_kv( @@ -554,9 +551,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); - // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node2, heartbeat)); + // +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node). 
+ assert!(delta_writer.try_add_node(node2, 0)); test_aux_delta_writer(delta_writer, 80); } @@ -576,11 +572,10 @@ mod tests { assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000))); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - // +8 bytes (heartbeat) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new + // +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new // block) = 71 - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes (kv) + 1 (op tag) // = 95 @@ -604,10 +599,9 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); - // +8 bytes (heartbeat) + 27 bytes (ChitchatId) + 1 byte (op tag) + // +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag) // = 155 - assert!(delta_writer.try_add_node(node2, heartbeat)); + assert!(delta_writer.try_add_node(node2, 0u64)); // The block got compressed. test_aux_delta_writer(delta_writer, 85); } @@ -618,9 +612,8 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -642,9 +635,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(!delta_writer.try_add_node(node2, heartbeat)); + assert!(!delta_writer.try_add_node(node2, 0u64)); // The block got compressed. 
test_aux_delta_writer(delta_writer, 72); @@ -656,9 +648,8 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes. - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -692,11 +683,10 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag) // = 40 - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag) // = 67 @@ -728,8 +718,7 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(62); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); assert!(delta_writer.try_add_kv( "key11", diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index d8b8038..b0ac77e 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -32,21 +33,6 @@ impl FailureDetector { /// Reports node heartbeat. 
pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) { debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat."); - let heartbeat_window = self - .node_samples - .entry(chitchat_id.clone()) - .or_insert_with(|| { - SamplingWindow::new( - self.config.sampling_window_size, - self.config.max_interval, - self.config.initial_interval, - ) - }); - heartbeat_window.report_heartbeat(); - } - - pub fn report_unknown(&mut self, chitchat_id: &ChitchatId) { - debug!(node_id=%chitchat_id.node_id, "reporting unknown node heartbeat."); self.node_samples .entry(chitchat_id.clone()) .or_insert_with(|| { @@ -55,23 +41,30 @@ impl FailureDetector { self.config.max_interval, self.config.initial_interval, ) - }); + }) + .report_heartbeat(); } /// Marks the node as dead or alive based on the current phi value. pub fn update_node_liveness(&mut self, chitchat_id: &ChitchatId) { - if let Some(phi) = self.phi(chitchat_id) { - debug!(node_id=%chitchat_id.node_id, phi=phi, "updating node liveness"); - if phi > self.config.phi_threshold { - self.live_nodes.remove(chitchat_id); + let phi_opt = self.phi(chitchat_id); + let is_alive = self + .phi(chitchat_id) + .map(|phi| phi <= self.config.phi_threshold) + .unwrap_or(false); + debug!(node_id=%chitchat_id.node_id, phi=?phi_opt, is_alive=is_alive, "computing node liveness"); + if is_alive { + self.live_nodes.insert(chitchat_id.clone()); + self.dead_nodes.remove(chitchat_id); + } else { + self.live_nodes.remove(chitchat_id); + if !self.dead_nodes.contains_key(&chitchat_id) { self.dead_nodes.insert(chitchat_id.clone(), Instant::now()); - // Remove current sampling window so that when the node - // comes back online, we start with a fresh sampling window. - self.node_samples.remove(chitchat_id); - } else { - self.live_nodes.insert(chitchat_id.clone()); - self.dead_nodes.remove(chitchat_id); } + // Remove current sampling window so that when the node + // comes back online, we start with a fresh sampling window. 
+ // TODO is this the right idea? + // self.node_samples.remove(chitchat_id); } } @@ -118,10 +111,10 @@ impl FailureDetector { } /// Returns the current phi value of a node. + /// + /// If we have received less than 2 heartbeat, `phi()` returns `None`. fn phi(&mut self, chitchat_id: &ChitchatId) -> Option { - self.node_samples - .get(chitchat_id) - .map(|sampling_window| sampling_window.phi()) + self.node_samples.get(chitchat_id)?.phi() } } @@ -170,6 +163,18 @@ impl Default for FailureDetectorConfig { } } +#[derive(Debug)] +struct AdditiveSmoothing { + prior_mean: f64, + prior_weight: f64, +} + +impl AdditiveSmoothing { + fn compute_mean(&self, len: NonZeroUsize, sum: f64) -> f64 { + (sum + self.prior_weight * self.prior_mean) / (len.get() as f64 + self.prior_weight) + } +} + /// A fixed-sized window that keeps track of the most recent heartbeat arrival intervals. #[derive(Debug)] struct SamplingWindow { @@ -179,44 +184,55 @@ struct SamplingWindow { last_heartbeat: Option, /// Heartbeat intervals greater than this value are ignored. max_interval: Duration, - /// The initial interval on startup. - initial_interval: Duration, + /// We may not have many intervals in the beginning. + /// For this reason we use additive smoothing to make sure we are + /// lenient on the first few intervals, and we don't have nodes flapping from + /// life to death. + additive_smoothing: AdditiveSmoothing, } impl SamplingWindow { // Construct a new instance. 
- pub fn new(window_size: usize, max_interval: Duration, initial_interval: Duration) -> Self { - Self { - intervals: BoundedArrayStats::new(window_size), - last_heartbeat: None, + pub fn new(window_size: usize, max_interval: Duration, prior_interval: Duration) -> Self { + let additive_smoothing = AdditiveSmoothing { + prior_mean: prior_interval.as_secs_f64(), + prior_weight: 5.0f64, + }; + SamplingWindow { + intervals: BoundedArrayStats::with_capacity(window_size), + last_heartbeat: None, max_interval, - initial_interval, + additive_smoothing, } } /// Reports a heartbeat. pub fn report_heartbeat(&mut self) { - if let Some(last_value) = &self.last_heartbeat { - let interval = last_value.elapsed(); + let now = Instant::now(); + if let Some(last_value) = self.last_heartbeat { + let interval = now.duration_since(last_value); if interval <= self.max_interval { self.intervals.append(interval.as_secs_f64()); } } else { - self.intervals.append(self.initial_interval.as_secs_f64()); - }; - self.last_heartbeat = Some(Instant::now()); + // This is our first heartbeat. + // No way to compute an interval. + // This is fine. + } + self.last_heartbeat = Some(now); } /// Computes the sampling window's phi value. - pub fn phi(&self) -> f64 { - if let Some(last_heartbeat) = self.last_heartbeat { - assert!(self.intervals.mean() > 0.0); - let elapsed_time = last_heartbeat.elapsed().as_secs_f64(); - elapsed_time / self.intervals.mean() - } else { - // if we phi is called before we have a sample, we assume the node isn't really alive. - f64::INFINITY - } + /// Returns `None` if have not received two heartbeat yet. + pub fn phi(&self) -> Option { + // We avoid computing phi if we have only received one heartbeat. + // It could be data from an old dead node after all. 
+ let len_non_zero = NonZeroUsize::new(self.intervals.len())?; + let sum = self.intervals.sum(); + let last_heartbeat = self.last_heartbeat?; + let interval_mean = self.additive_smoothing.compute_mean(len_non_zero, sum); + let elapsed_time = last_heartbeat.elapsed().as_secs_f64(); + Some(elapsed_time / interval_mean) } } @@ -224,57 +240,48 @@ impl SamplingWindow { #[derive(Debug)] struct BoundedArrayStats { /// The values. - data: Vec, - /// Number of accumulated values. - size: usize, + values: Box<[f64]>, /// Is the values array filled? is_filled: bool, - /// Position of the index within the values array. + /// Position of the next value to be written in the values array. index: usize, /// The accumulated sum of values. sum: f64, - /// The accumulated mean of values. - mean: f64, } impl BoundedArrayStats { - pub fn new(size: usize) -> Self { + pub fn with_capacity(capacity: usize) -> Self { Self { - data: vec![0.0; size], - size, + values: vec![0.0; capacity].into_boxed_slice(), is_filled: false, index: 0, sum: 0.0, - mean: 0.0, } } /// Returns the mean. - pub fn mean(&self) -> f64 { - self.mean + pub fn sum(&self) -> f64 { + self.sum } /// Appends a new value and updates the statistics. 
pub fn append(&mut self, interval: f64) { - if self.index == self.size { - self.is_filled = true; - self.index = 0; - } - if self.is_filled { - self.sum -= self.data[self.index]; + self.sum -= self.values[self.index]; } + self.values[self.index] = interval; self.sum += interval; - - self.data[self.index] = interval; - self.index += 1; - - self.mean = self.sum / self.len() as f64; + if self.index == self.values.len() - 1 { + self.is_filled = true; + self.index = 0; + } else { + self.index += 1; + } } fn len(&self) -> usize { if self.is_filled { - return self.size; + return self.values.len(); } self.index } @@ -282,6 +289,7 @@ impl BoundedArrayStats { #[cfg(test)] mod tests { + use std::collections::VecDeque; use std::time::Duration; use rand::prelude::*; @@ -290,6 +298,17 @@ mod tests { use crate::failure_detector::{FailureDetector, FailureDetectorConfig}; use crate::ChitchatId; + #[test] + fn test_failure_detector_does_not_see_a_node_as_alive_with_a_single_heartbeat() { + let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + let chitchat_id = ChitchatId::for_local_test(10_001); + failure_detector.report_heartbeat(&chitchat_id); + failure_detector.update_node_liveness(&chitchat_id); + let dead_nodes: Vec<&ChitchatId> = failure_detector.dead_nodes().collect(); + assert_eq!(dead_nodes.len(), 1); + assert!(failure_detector.live_nodes().next().is_none()); + } + #[tokio::test] async fn test_failure_detector() { tokio::time::pause(); @@ -413,14 +432,22 @@ mod tests { } #[tokio::test] - async fn test_failure_detector_node_state_after_initial_interval() { + async fn test_failure_detector_node_state_additive_smoothing_predominant_in_the_beginning() { tokio::time::pause(); - let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + // We add a few very short samples. 
let chitchat_id = ChitchatId::for_local_test(10_001); failure_detector.report_heartbeat(&chitchat_id); tokio::time::advance(Duration::from_secs(1)).await; + let chitchat_id = ChitchatId::for_local_test(10_001); + for _ in 0..5 { + tokio::time::advance(Duration::from_millis(200)).await; + failure_detector.report_heartbeat(&chitchat_id); + } + + tokio::time::advance(Duration::from_secs(6)).await; failure_detector.update_node_liveness(&chitchat_id); let live_nodes = failure_detector @@ -428,6 +455,7 @@ mod tests { .map(|chitchat_id| chitchat_id.node_id.as_str()) .collect::>(); assert_eq!(live_nodes, vec!["node-10001"]); + tokio::time::advance(Duration::from_secs(40)).await; failure_detector.update_node_liveness(&chitchat_id); @@ -438,6 +466,29 @@ mod tests { assert_eq!(live_nodes, Vec::<&str>::new()); } + #[tokio::test] + async fn test_failure_detector_node_state_additive_smoothing_effect_fades_off() { + tokio::time::pause(); + let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + + // We add a few very short samples. + let chitchat_id = ChitchatId::for_local_test(10_001); + failure_detector.report_heartbeat(&chitchat_id); + + let chitchat_id = ChitchatId::for_local_test(10_001); + for _ in 0..1000 { + tokio::time::advance(Duration::from_millis(200)).await; + failure_detector.report_heartbeat(&chitchat_id); + } + + tokio::time::advance(Duration::from_secs(6)).await; + + failure_detector.update_node_liveness(&chitchat_id); + + assert!(failure_detector.live_nodes().next().is_none()); + } + + #[tokio::test] async fn test_sampling_window() { tokio::time::pause(); @@ -448,43 +499,57 @@ mod tests { tokio::time::advance(Duration::from_secs(3)).await; sampling_window.report_heartbeat(); - // Now intervals window is: [2.0, 3.0]. - let mean = (2.0 + 3.0) / 2.0; + // Now intervals window is: [3.0]. + let mean = (3.0 + 2.0 * 5.0) / (1.0f64 + 5.0f64); // 0s elapsed since last reported heartbeat. 
- assert!((sampling_window.phi() - (0.0 / mean)).abs() < f64::EPSILON); + assert_nearly_equal(sampling_window.phi().unwrap(), 0.0f64); // 1s elapsed since last reported heartbeat. tokio::time::advance(Duration::from_secs(1)).await; - assert!((sampling_window.phi() - (1.0 / mean)).abs() < f64::EPSILON); + assert_nearly_equal(sampling_window.phi().unwrap(), 1.0f64 / mean); // Check reported heartbeat later than max_interval is ignore. tokio::time::advance(Duration::from_secs(5)).await; sampling_window.report_heartbeat(); tokio::time::advance(Duration::from_secs(2)).await; + + assert_nearly_equal(sampling_window.phi().unwrap(), 2.0f64 / mean); + } + + #[track_caller] + fn assert_nearly_equal(value: f64, expected: f64) { assert!( - (sampling_window.phi() - (2.0 / mean)).abs() < f64::EPSILON, - "Mean value should not change." + (value - expected).abs() < f64::EPSILON, + "value ({value}) is not not nearly equal to expected {expected}" ); } #[test] fn test_bounded_array_stats() { - let mut bounded_array = BoundedArrayStats::new(10); - for i in 1..10 { + let capacity = 5; + let mut bounded_array = BoundedArrayStats::with_capacity(capacity); + let mut queue: VecDeque = VecDeque::new(); + for i in 1..=capacity { + assert!(bounded_array.len() < capacity); + assert!(!bounded_array.is_filled); bounded_array.append(i as f64); + queue.push_back(i as f64); + assert_eq!(bounded_array.len(), i); + assert_eq!(queue.len(), i); + assert_nearly_equal(bounded_array.sum(), queue.iter().copied().sum::()); } - assert_eq!(bounded_array.index, 9); - assert_eq!(bounded_array.len(), 9); - assert!(!bounded_array.is_filled); - assert!((bounded_array.mean() - 5.0f64).abs() < f64::EPSILON); - for i in 10..14 { + assert!(bounded_array.is_filled); + assert_nearly_equal(bounded_array.sum(), queue.iter().copied().sum()); + + for i in capacity + 1..capacity * 2 { bounded_array.append(i as f64); + queue.push_back(i as f64); + queue.pop_front(); + assert_nearly_equal(bounded_array.sum(), 
queue.iter().copied().sum::()); + assert_eq!(queue.len(), capacity); + assert_eq!(bounded_array.len(), capacity); } - assert_eq!(bounded_array.index, 3); - assert_eq!(bounded_array.len(), 10); - assert!(bounded_array.is_filled); - assert!((bounded_array.mean() - 8.5f64).abs() < f64::EPSILON); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index f21b7bb..6064611 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -76,7 +76,7 @@ impl Chitchat { let self_node_state = chitchat.self_node_state(); // Immediately mark the node as alive to ensure it responds to SYN messages. - self_node_state.update_heartbeat(); + self_node_state.inc_heartbeat(); // Set initial key/value pairs. for (key, value) in initial_key_values { @@ -94,15 +94,19 @@ impl Chitchat { } } + /// Digest contains important information about the list of members in + /// the cluster. + fn report_heartbeats_in_digest(&mut self, digest: &Digest) { + for (chitchat_id, node_digest) in &digest.node_digests { + self.report_heartbeat(chitchat_id, node_digest.heartbeat); + } + } + fn process_delta(&mut self, delta: Delta) { - // Warning: order matters here. - // `report_heartbeats` will compare the current known heartbeat with the one - // in the delta, while `apply_delta` is actually updating this heartbeat. 
- self.report_heartbeats(&delta); self.cluster_state.apply_delta(delta); } - pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option { + self.update_self_heartbeat(); match msg { ChitchatMessage::Syn { cluster_id, digest } => { if cluster_id != self.cluster_id() { @@ -113,7 +117,7 @@ impl Chitchat { ); return Some(ChitchatMessage::BadCluster); } - + self.report_heartbeats_in_digest(&digest); let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect(); let self_digest = self.compute_digest(&scheduled_for_deletion); @@ -129,6 +133,7 @@ impl Chitchat { }) } ChitchatMessage::SynAck { digest, delta } => { + self.report_heartbeats_in_digest(&digest); self.process_delta(delta); let scheduled_for_deletion = self.scheduled_for_deletion_nodes().collect::>(); @@ -157,32 +162,21 @@ impl Chitchat { /// Reports heartbeats to the failure detector for nodes in the delta for which we received an /// update. - fn report_heartbeats(&mut self, delta: &Delta) { - for node_delta in &delta.node_deltas { - if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) { - if node_state.heartbeat() < node_delta.heartbeat { - self.failure_detector - .report_heartbeat(&node_delta.chitchat_id); - } - } else { - self.failure_detector - .report_unknown(&node_delta.chitchat_id); - self.failure_detector - .update_node_liveness(&node_delta.chitchat_id); - } + fn report_heartbeat(&mut self, chitchat_id: &ChitchatId, heartbeat: Heartbeat) { + let node_state = self.cluster_state.node_state_mut(chitchat_id); + if node_state.try_set_heartbeat(heartbeat) { + self.failure_detector.report_heartbeat(chitchat_id); } } /// Marks the node as dead or alive depending on the new phi values and updates the live nodes /// watcher accordingly. 
pub(crate) fn update_nodes_liveness(&mut self) { - self.cluster_state - .nodes() - .filter(|&chitchat_id| *chitchat_id != self.config.chitchat_id) - .for_each(|chitchat_id| { + for chitchat_id in self.cluster_state.nodes() { + if chitchat_id != &self.config.chitchat_id { self.failure_detector.update_node_liveness(chitchat_id); - }); - + } + } let current_live_nodes = self .live_nodes() .map(|chitchat_id| { @@ -277,8 +271,8 @@ impl Chitchat { ClusterStateSnapshot::from(&self.cluster_state) } - pub(crate) fn update_heartbeat(&mut self) { - self.self_node_state().update_heartbeat(); + pub(crate) fn update_self_heartbeat(&mut self) { + self.self_node_state().inc_heartbeat(); } pub(crate) fn cluster_state(&self) -> &ClusterState { @@ -578,7 +572,7 @@ mod tests { } #[tokio::test] - async fn test_dead_node_kvs_are_when_node_joins() -> anyhow::Result<()> { + async fn test_dead_node_kvs_are_gossiped_too_when_node_joins() -> anyhow::Result<()> { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); // starting 2 nodes. let mut nodes = setup_nodes(40001..=40002, &transport).await; @@ -607,7 +601,7 @@ mod tests { ) .await; let node2_chitchat = node2.chitchat(); - // We have received node3's key + // We have received node1's key let value = node2_chitchat .lock() .await @@ -621,30 +615,30 @@ mod tests { // Take node 1 down. let node1 = nodes.remove(0); - assert_eq!(node1.chitchat_id().advertise_port(), 40001); + assert_eq!(node1.chitchat_id().advertise_port(), 40_001); node1.shutdown().await.unwrap(); // Node 2 has detected that node 1 is missing. 
- { + let node_id2 = { let node2 = nodes.first().unwrap(); - assert_eq!(node2.chitchat_id().advertise_port(), 40002); + assert_eq!(node2.chitchat_id().advertise_port(), 40_002); wait_for_chitchat_state(node2.chitchat(), &[ChitchatId::for_local_test(40_002)]).await; - } + node2.chitchat_id().clone() + }; // Restart node at localhost:40001 with new name let mut new_config = ChitchatConfig::for_test(40_001); new_config.chitchat_id.node_id = "new_node".to_string(); let new_chitchat_id = new_config.chitchat_id.clone(); - let seed_addr = ChitchatId::for_local_test(40_001).gossip_advertise_addr; + let seed_addr = ChitchatId::for_local_test(40_002).gossip_advertise_addr; new_config.seed_nodes = vec![seed_addr.to_string()]; let new_node_chitchat_handle = spawn_chitchat(new_config, Vec::new(), &transport) .await .unwrap(); - let new_node_chitchat = new_node_chitchat_handle.chitchat(); wait_for_chitchat_state( new_node_chitchat.clone(), - &[ChitchatId::for_local_test(40_002), new_chitchat_id], + &[new_chitchat_id.clone(), node_id2.clone()], ) .await; @@ -734,6 +728,7 @@ mod tests { .node_state(&dead_chitchat_id) .is_some()); } + // Wait a bit more than `dead_node_grace_period` since all nodes will not // notice cluster change at the same time. let wait_for = DEAD_NODE_GRACE_PERIOD.add(Duration::from_secs(5)); @@ -748,6 +743,7 @@ mod tests { .node_state(&dead_chitchat_id) .is_none()); } + shutdown_nodes(nodes).await?; Ok(()) } diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index 8952669..d2b2caa 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -163,7 +163,7 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), Heartbeat(0)); + delta.add_node(node.clone(), 0u64); // +29 bytes. 
delta.add_kv(&node, "key", "value", 0, true); delta.set_serialized_len(62); @@ -186,7 +186,7 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), Heartbeat(0)); + delta.add_node(node.clone(), 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); delta.set_serialized_len(62); diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 4124607..e5c656c 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -272,7 +272,12 @@ impl Server { .dead_nodes() .map(|chitchat_id| chitchat_id.gossip_advertise_addr) .collect::>(); - let seed_nodes: HashSet = chitchat_guard.seed_nodes(); + let seed_nodes: HashSet = chitchat_guard.seed_nodes() + .into_iter() + .filter(|addr| { + *addr != chitchat_guard.self_chitchat_id().gossip_advertise_addr + }) + .collect(); let (selected_nodes, random_dead_node_opt, random_seed_node_opt) = select_nodes_for_gossip( &mut self.rng, peer_nodes, @@ -281,7 +286,7 @@ impl Server { seed_nodes, ); - chitchat_guard.update_heartbeat(); + chitchat_guard.update_self_heartbeat(); chitchat_guard.gc_keys_marked_for_deletion(); // Drop lock to prevent deadlock in [`UdpSocket::gossip`]. @@ -579,13 +584,21 @@ mod tests { // Add our test socket to the server's nodes. 
server_handle .with_chitchat(|server_chitchat| { - server_chitchat.update_heartbeat(); + server_chitchat.update_self_heartbeat(); let syn = server_chitchat.create_syn_message(); let syn_ack = test_chitchat.process_message(syn).unwrap(); server_chitchat.process_message(syn_ack); }) .await; + let node_state = test_chitchat + .cluster_state() + .node_state(&server_id) + .unwrap(); + let heartbeat = node_state.heartbeat(); + + assert_eq!(heartbeat, Heartbeat(2)); + // Wait for syn, with updated heartbeat let (_, syn) = timeout(test_transport.recv()).await.unwrap(); @@ -602,8 +615,8 @@ mod tests { }; let node_delta = delta.get(&server_id).unwrap(); - let heartbeat = node_delta.heartbeat; - assert_eq!(heartbeat, Heartbeat(3)); + let heartbeat = node_delta.last_gc_version; + assert_eq!(heartbeat, 0u64); server_handle.shutdown().await.unwrap(); } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index b719750..d1b3574 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -30,7 +30,7 @@ pub struct NodeState { // // After we GC the tombstones, we cannot safely do replication with // nodes that are asking for a diff from with version lower than this. - max_garbage_collected_tombstone_version: Option, + last_gc_version: Version, } impl Debug for NodeState { @@ -49,9 +49,9 @@ impl NodeState { node_id, heartbeat: Heartbeat(0), key_values: Default::default(), - max_version: Default::default(), + max_version: 0u64, listeners, - max_garbage_collected_tombstone_version: None, + last_gc_version: 0u64, } } @@ -66,7 +66,7 @@ impl NodeState { key_values: Default::default(), max_version: Default::default(), listeners: Listeners::default(), - max_garbage_collected_tombstone_version: None, + last_gc_version: 0u64, } } @@ -146,10 +146,22 @@ impl NodeState { versioned_value.value = "".to_string(); } - pub(crate) fn update_heartbeat(&mut self) { + pub(crate) fn inc_heartbeat(&mut self) { self.heartbeat.inc(); } + /// Attempts to set the heartbeat of another node. 
+ /// If the value is actually not an update, just ignore the data and return false. + /// Otherwise, returns true. + pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool { + if heartbeat_new_value > self.heartbeat { + self.heartbeat = heartbeat_new_value; + true + } else { + false + } + } + fn digest(&self) -> NodeDigest { NodeDigest { heartbeat: self.heartbeat, @@ -168,7 +180,7 @@ impl NodeState { /// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`. fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) { let now = Instant::now(); - let mut max_deleted_version = self.max_garbage_collected_tombstone_version; + let mut max_deleted_version = self.last_gc_version; self.key_values .retain(|_, versioned_value: &mut VersionedValue| { let Some(deleted_instant) = versioned_value.tombstone else { @@ -180,10 +192,10 @@ impl NodeState { return true; } // We have exceeded the tombstone grace period. Time to remove it. - max_deleted_version = Some(versioned_value.version).max(max_deleted_version); + max_deleted_version = versioned_value.version.max(max_deleted_version); false }); - self.max_garbage_collected_tombstone_version = max_deleted_version; + self.last_gc_version = max_deleted_version; } /// Returns an iterator over the versioned values that are strictly greater than @@ -280,6 +292,8 @@ impl ClusterState { pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> &mut NodeState { // TODO use the `hash_raw_entry` feature once it gets stabilized. + // Most of the time the entry is already present. We avoid cloning chitchat_id with + // this if statement. self.node_states .entry(chitchat_id.clone()) .or_insert_with(|| NodeState::new(chitchat_id.clone(), self.listeners.clone())) @@ -303,6 +317,9 @@ impl ClusterState { pub(crate) fn apply_delta(&mut self, delta: Delta) { // Remove nodes to reset. 
+ if delta.nodes_to_reset.len() > 0 { + tracing::error!(nodes_to_reset=?delta.nodes_to_reset, "nodes to reset"); + }; self.node_states .retain(|chitchat_id, _| !delta.nodes_to_reset.contains(chitchat_id)); @@ -310,15 +327,12 @@ impl ClusterState { for node_delta in delta.node_deltas { let NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values, } = node_delta; - let node_state = self - .node_states - .entry(chitchat_id.clone()) - .or_insert_with(|| NodeState::new(chitchat_id, self.listeners.clone())); - if node_state.heartbeat < heartbeat { - node_state.heartbeat = heartbeat; + let node_state = self.node_state_mut(&chitchat_id); + if node_state.last_gc_version == 0u64 { + node_state.last_gc_version = last_gc_version; } for (key, versioned_value) in key_values { node_state.max_version = node_state.max_version.max(versioned_value.version); @@ -364,12 +378,7 @@ impl ClusterState { stale_nodes.insert(chitchat_id, node_state); continue; }; - let should_reset = - if let Some(max_gc_version) = node_state.max_garbage_collected_tombstone_version { - max_gc_version >= node_digest.max_version - } else { - false - }; + let should_reset = node_state.last_gc_version > node_digest.max_version; if should_reset { warn!("Node to reset {chitchat_id:?}"); nodes_to_reset.push(chitchat_id); @@ -387,7 +396,7 @@ impl ClusterState { } for stale_node in stale_nodes.into_iter() { - if !delta_serializer.try_add_node(stale_node.chitchat_id.clone(), stale_node.heartbeat) + if !delta_serializer.try_add_node(stale_node.chitchat_id.clone(), stale_node.node_state.last_gc_version) { break; } @@ -432,12 +441,9 @@ impl<'a> SortedStaleNodes<'a> { /// Adds a node to the list of stale nodes. fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { let staleness = node_state.num_key_values() + 1; // +1 for the heartbeat. 
- let heartbeat = node_state.heartbeat; let floor_version = 0; - let stale_node = StaleNode { chitchat_id, - heartbeat, node_state, floor_version, }; @@ -463,7 +469,6 @@ impl<'a> SortedStaleNodes<'a> { if staleness > 0 { let stale_node = StaleNode { chitchat_id, - heartbeat, node_state, floor_version, }; @@ -493,7 +498,6 @@ impl<'a> SortedStaleNodes<'a> { #[derive(Debug)] struct StaleNode<'a> { chitchat_id: &'a ChitchatId, - heartbeat: Heartbeat, node_state: &'a NodeState, floor_version: u64, } @@ -562,7 +566,6 @@ mod tests { let node_state = NodeState::for_test(); let stale_node = StaleNode { chitchat_id: &node, - heartbeat: Heartbeat(0), node_state: &node_state, floor_version: 0, }; @@ -583,7 +586,6 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, - heartbeat: Heartbeat(0), node_state: &node_state, floor_version: 1, }; @@ -687,7 +689,6 @@ mod tests { let mut stale_nodes = SortedStaleNodes::default(); let stale_node1 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_001), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; @@ -695,19 +696,16 @@ mod tests { let stale_node2 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_002), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; let stale_node3 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_003), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; let stale_node4 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_004), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; @@ -914,13 +912,13 @@ mod tests { node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 let mut delta = Delta::default(); - delta.add_node(node1.clone(), Heartbeat(0)); + delta.add_node(node1.clone(), 0u64); delta.add_kv(&node1, "key_a", "4", 4, false); delta.add_kv(&node1, "key_b", "2", 2, false); // Reset node 2. 
delta.add_node_to_reset(node2.clone()); - delta.add_node(node2.clone(), Heartbeat(0)); + delta.add_node(node2.clone(), 0u64); delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); @@ -981,7 +979,7 @@ mod tests { for (num_entries, &mtu) in mtu_per_num_entries.iter().enumerate() { let mut expected_delta = Delta::default(); for &(node, key, val, version, tombstone) in &expected_delta_atoms[..num_entries] { - expected_delta.add_node(node.clone(), Heartbeat(0)); + expected_delta.add_node(node.clone(), 0u64); expected_delta.add_kv(node, key, val, version, tombstone); } { @@ -1130,11 +1128,11 @@ mod tests { ); assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + expected_delta.add_node(node1.clone(), 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(78); + expected_delta.set_serialized_len(73); assert_eq!(delta, expected_delta); } @@ -1155,12 +1153,12 @@ mod tests { ); assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + expected_delta.add_node(node1.clone(), 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_kv(&node1, "key_a", "", 3, true); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(90); + expected_delta.set_serialized_len(83); assert_eq!(delta, expected_delta); } @@ -1181,12 +1179,12 @@ mod tests { ); let mut expected_delta = Delta::default(); expected_delta.add_node_to_reset(node1.clone()); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + 
expected_delta.add_node(node1.clone(), 3u64); // expected_delta.add_kv(&node1, "key_a", "1", 1, false); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(81); + expected_delta.set_serialized_len(80); assert_eq!(delta, expected_delta); } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index afcf37b..1e14e35 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -122,7 +122,7 @@ pub struct Heartbeat(pub(crate) u64); impl Heartbeat { pub(crate) fn inc(&mut self) { - self.0 += 1; + self.0 = self.0.wrapping_add(1); } } diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index 807a380..8cdc7f1 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -232,6 +232,14 @@ pub fn create_chitchat_id(id: &str) -> ChitchatId { } } +pub fn test_chitchat_id(port: u16) -> ChitchatId { + ChitchatId { + node_id: format!("node_{port}"), + generation_id: 0, + gossip_advertise_addr: ([127, 0, 0, 1], port).into(), + } +} + /// Copy-pasted from Quickwit repo. /// Finds a random available TCP port. 
/// @@ -324,15 +332,88 @@ async fn test_simple_simulation_with_network_partition() { simulator.execute(operations).await; } + + #[tokio::test] -async fn test_marked_for_deletion_gc_with_network_partition() { - const TIMEOUT: Duration = Duration::from_millis(500); +async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { // let _ = tracing_subscriber::fmt::try_init(); + const TIMEOUT: Duration = Duration::from_millis(500); let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); - let chitchat_id_1 = create_chitchat_id("node-1"); - let chitchat_id_2 = create_chitchat_id("node-2"); - let chitchat_id_3 = create_chitchat_id("node-3"); - let chitchat_id_4 = create_chitchat_id("node-4"); + let chitchat_id_1 = test_chitchat_id(1); + let chitchat_id_2 = test_chitchat_id(2); + let peer_seeds = vec![ + chitchat_id_1.clone(), + chitchat_id_2.clone(), + ]; + let operations = vec![ + Operation::AddNode { + chitchat_id: chitchat_id_1.clone(), + peer_seeds: Some(peer_seeds.clone()), + }, + Operation::AddNode { + chitchat_id: chitchat_id_2.clone(), + peer_seeds: Some(peer_seeds.clone()), + }, + Operation::InsertKeysValues { + chitchat_id: chitchat_id_1.clone(), + keys_values: vec![("key_a".to_string(), "0".to_string())], + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), + timeout_opt: Some(TIMEOUT), + }, + // Isolate node 2. + Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), + Operation::Wait(Duration::from_secs(5)), + // Mark the key for deletion. + Operation::MarkKeyForDeletion { + chitchat_id: chitchat_id_1.clone(), + key: "key_a".to_string(), + }, + // Check that the deletion mark is not propagated to node 2. 
+ Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), false), + timeout_opt: None, + }, + // Wait for garbage collection: grace period * heartbeat ~ 1 second + margin of 1 second. + Operation::Wait(Duration::from_secs(2)), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_1.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: None, + }, + Operation::Wait(TIMEOUT), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), + timeout_opt: None, + }, + // Relink node 2 + Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + // The key should be deleted. + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: Some(TIMEOUT), + }, + ]; + simulator.execute(operations).await; +} +#[tokio::test] +async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { + const TIMEOUT: Duration = Duration::from_millis(500); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); + let chitchat_id_1 = test_chitchat_id(1); + let chitchat_id_2 = test_chitchat_id(2); + let chitchat_id_3 = test_chitchat_id(3); + let chitchat_id_4 = test_chitchat_id(4); let peer_seeds = vec![ chitchat_id_1.clone(), chitchat_id_2.clone(), @@ -431,6 +512,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), + // The key should be deleted. 
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), timeout_opt: Some(TIMEOUT), }, @@ -438,7 +520,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(TIMEOUT), + timeout_opt: Some(TIMEOUT * 10), }, ]; simulator.execute(operations).await;