From ceab79475f20be698c3e0fabb20616f9796454bf Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 28 Feb 2024 22:58:30 +0900 Subject: [PATCH] Reset nodes were briefly identified as alive. The reason is that we call report_heartbeat in the reset procedure. The purpose of this call is to make sure that we have a record in the failure detector for the node. This PR just creates the failure detector entry, and avoids recording a heartbeat change. This PR also makes sure we do not count the first heartbeat record as an event. Closes #132 --- chitchat/src/failure_detector.rs | 14 +++++++++++--- chitchat/src/lib.rs | 27 ++++++++++++++++++++++++--- chitchat/src/state.rs | 8 ++++++++ 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index 6b80c81..2bbdf8d 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -31,8 +31,10 @@ impl FailureDetector { } /// Reports node heartbeat. - pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) { - debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat."); + pub(crate) fn get_or_create_sampling_window( + &mut self, + chitchat_id: &ChitchatId, + ) -> &mut SamplingWindow { self.node_samples .entry(chitchat_id.clone()) .or_insert_with(|| { @@ -42,6 +44,12 @@ impl FailureDetector { self.config.initial_interval, ) }) + } + + /// Reports node heartbeat. + pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) { + debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat."); + self.get_or_create_sampling_window(chitchat_id) .report_heartbeat(); } @@ -179,7 +187,7 @@ impl AdditiveSmoothing { /// A fixed-sized window that keeps track of the most recent heartbeat arrival intervals. #[derive(Debug)] -struct SamplingWindow { +pub(crate) struct SamplingWindow { /// The set of collected intervals. intervals: BoundedArrayStats, /// Last heartbeat reported time. 
diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 79ba476..6614114 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -313,9 +313,15 @@ impl Chitchat { if node_state.max_version() >= max_version { return; } - if node_state.max_version() == 0 { - self.failure_detector.report_heartbeat(chitchat_id); - } + + // We make sure that the node is listed in the failure detector, + // so that we won't forget to GC the state. + // + // We don't report the heartbeat however, to make sure that we + // avoid identifying resetted node as alive. + self.failure_detector + .get_or_create_sampling_window(chitchat_id); + // We don't want to call listeners for keys that are already up to date so we must do this // dance instead of clearing the node state and then setting the new values. let mut previous_keys: HashSet = node_state @@ -557,6 +563,21 @@ mod tests { assert_nodes_sync(&[&node1, &node2]); } + #[test] + fn test_chitchat_dead_node_liveness() { + let node_config1 = ChitchatConfig::for_test(10_001); + let empty_seeds = watch::channel(Default::default()).1; + let mut node1 = + Chitchat::with_chitchat_id_and_seeds(node_config1, empty_seeds.clone(), Vec::new()); + let chitchat_id = ChitchatId::for_local_test(10u16); + node1.reset_node_state(&chitchat_id, std::iter::empty(), 10_000, 10u64); + node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64)); + node1.report_heartbeat(&chitchat_id, Heartbeat(10_000u64)); + node1.update_nodes_liveness(); + let live_nodes: HashSet<&ChitchatId> = node1.live_nodes().collect(); + assert_eq!(live_nodes.len(), 1); + } + #[tokio::test] async fn test_live_node_channel() { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index a7d171a..8804718 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -175,8 +175,16 @@ impl NodeState { /// Attempts to set the heartbeat of another node. 
/// If the value is actually not an update, just ignore the data and return false. + /// As a corner case, the first value is not considered an update. + /// /// Otherwise, returns true. pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool { + if self.heartbeat.0 == 0 { + // This is the first heartbeat. + // Let's set it, but we do not consider it as an update. + self.heartbeat = heartbeat_new_value; + return false; + } if heartbeat_new_value > self.heartbeat { self.heartbeat = heartbeat_new_value; true