From fa59b4c8b5d15638274d3dc4f3037f2163f790aa Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 23 Feb 2024 14:57:16 +0900 Subject: [PATCH] Resetting interval window when a node starts failing. (#126) --- chitchat/src/failure_detector.rs | 38 ++++++++++++++++++++++++++++---- chitchat/src/state.rs | 11 +++++++-- chitchat/src/types.rs | 10 +++++++-- 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index 2415642..027dfa1 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -61,10 +61,11 @@ impl FailureDetector { if !self.dead_nodes.contains_key(chitchat_id) { self.dead_nodes.insert(chitchat_id.clone(), Instant::now()); } - // Remove current sampling window so that when the node + // Remove all samples, so that when the node // comes back online, we start with a fresh sampling window. - // TODO is this the right idea? - // self.node_samples.remove(chitchat_id); + if let Some(node_sample) = self.node_samples.get_mut(chitchat_id) { + node_sample.reset(); + } } } @@ -77,8 +78,9 @@ impl FailureDetector { garbage_collected_nodes.push(chitchat_id.clone()) } } - for chitchat_id in garbage_collected_nodes.iter() { + for chitchat_id in &garbage_collected_nodes { self.dead_nodes.remove(chitchat_id); + self.node_samples.remove(chitchat_id); } garbage_collected_nodes } @@ -222,6 +224,11 @@ impl SamplingWindow { self.last_heartbeat = Some(now); } + /// Forget about all previous intervals. + pub fn reset(&mut self) { + self.intervals.clear(); + } + /// Computes the sampling window's phi value. /// Returns `None` if have not received two heartbeat yet. 
pub fn phi(&self) -> Option<f64> { @@ -279,6 +286,12 @@ impl BoundedArrayStats { } } + pub fn clear(&mut self) { + self.index = 0; + self.is_filled = false; + self.sum = 0f64; + } + fn len(&self) -> usize { if self.is_filled { return self.values.len(); @@ -514,6 +527,23 @@ mod tests { tokio::time::advance(Duration::from_secs(2)).await; assert_nearly_equal(sampling_window.phi().unwrap(), 2.0f64 / mean); + + tokio::time::advance(Duration::from_secs(100)).await; + sampling_window.reset(); + + // To revive, a single sample is not sufficient. + sampling_window.report_heartbeat(); + assert!(sampling_window.phi().is_none()); + + tokio::time::advance(Duration::from_secs(2)).await; + sampling_window.report_heartbeat(); + + tokio::time::advance(Duration::from_secs(4)).await; + + // Now intervals window is: [2.0]. With additive smoothing we get: + let new_mean = (2.0 + 2.0 * 5.0) / (1.0f64 + 5.0f64); + + assert_nearly_equal(sampling_window.phi().unwrap(), 4.0f64 / new_mean); } #[track_caller] diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 032b5b4..4115c73 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -413,9 +413,16 @@ impl ClusterState { stale_nodes.insert(chitchat_id, node_state); continue; }; - let should_reset = node_state.last_gc_version > node_digest.max_version; + // TODO: We have problem here. If after the delta we end up with a max version that is + // not high enough to bring us to `last_gc_version`, we might get reset again + // and again. 
+ let should_reset = + node_state.last_gc_version > node_digest.max_version && node_digest.max_version > 0; if should_reset { - warn!("Node to reset {chitchat_id:?}"); + warn!( + "Node to reset {chitchat_id:?} last gc version: {} max version: {}", + node_state.last_gc_version, node_digest.max_version + ); nodes_to_reset.push(chitchat_id); stale_nodes.insert(chitchat_id, node_state); continue; diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 1e14e35..0367cbc 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::{fmt::Debug, net::SocketAddr}; use serde::{Deserialize, Serialize}; use tokio::time::Instant; @@ -14,7 +14,7 @@ use tokio::time::Instant; /// leaves and rejoins the cluster. Backends such as Cassandra or Quickwit typically use the node's /// startup time as the `generation_id`. Applications with stable state across restarts can use a /// constant `generation_id`, for instance, `0`. -#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Eq, PartialEq, Hash, Ord, PartialOrd, Serialize, Deserialize)] pub struct ChitchatId { /// An identifier unique across the cluster. pub node_id: String, @@ -24,6 +24,12 @@ pub struct ChitchatId { pub gossip_advertise_addr: SocketAddr, } +impl Debug for ChitchatId { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}:{}:{}", self.node_id.as_str(), self.generation_id, self.gossip_advertise_addr) + } +} + impl ChitchatId { pub fn new(node_id: String, generation_id: u64, gossip_advertise_addr: SocketAddr) -> Self { Self {