From a99d609666d242143a07cefd1002a69c434a9c93 Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 23 Feb 2024 13:42:02 -0500 Subject: [PATCH] Expose live nodes `watch::Receiver` --- chitchat/src/lib.rs | 18 +++++++++++------- chitchat/src/server.rs | 2 +- chitchat/tests/perf_test.rs | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 8b153ec..3ce6793 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -109,8 +109,8 @@ impl Chitchat { /// Trigggers the catchup callback if the self node is included in the nodes to reset. fn maybe_trigger_catchup_callback(&self, delta: &Delta) { - if let Some(catchup_callback) = &self.config.catchup_callback { - if delta.nodes_to_reset.contains(self.self_chitchat_id()) { + if !delta.nodes_to_reset.is_empty() { + if let Some(catchup_callback) = &self.config.catchup_callback { info!("triggering catchup callback"); catchup_callback(); } @@ -257,10 +257,14 @@ impl Chitchat { /// - updates its max version /// /// Heartbeats are not notified. - pub fn live_nodes_watcher(&self) -> WatchStream> { + pub fn live_nodes_watch_stream(&self) -> WatchStream> { WatchStream::new(self.live_nodes_watcher_rx.clone()) } + pub fn live_nodes_watcher(&self) -> watch::Receiver> { + self.live_nodes_watcher_rx.clone() + } + /// Returns the set of nodes considered dead by the failure detector. pub fn dead_nodes(&self) -> impl Iterator { self.failure_detector.dead_nodes() @@ -475,7 +479,7 @@ mod tests { chitchat .lock() .await - .live_nodes_watcher() + .live_nodes_watch_stream() .skip_while(|live_nodes| { if live_nodes.len() != expected_nodes.len() { return true; @@ -530,7 +534,7 @@ mod tests { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); let nodes = setup_nodes(20001..=20005, &transport).await; let node2 = nodes.get(1).unwrap(); - let mut live_rx = node2.chitchat().lock().await.live_nodes_watcher(); + let mut live_rx = node2.chitchat().lock().await.live_nodes_watch_stream(); let live_members = loop { let live_members = live_rx.next().await.unwrap(); if live_members.len() == 5 { @@ -574,7 +578,7 @@ mod tests { let mut num_nodes = 0; assert!(tokio::time::timeout(Duration::from_secs(1), async { - let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher(); + let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watch_stream(); loop { let live_members = live_rx.next().await.unwrap(); num_nodes = live_members.len(); @@ -606,7 +610,7 @@ mod tests { .self_node_state() .set("READY", "true"); - let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher(); + let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watch_stream(); let live_members = loop { let live_members = live_rx.next().await.unwrap(); if live_members.len() == 3 { diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 148cdde..9fddbce 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -633,7 +633,7 @@ mod tests { .chitchat() .lock() .await - .live_nodes_watcher() + .live_nodes_watch_stream() .skip_while(|live_nodes| live_nodes.is_empty()); { diff --git a/chitchat/tests/perf_test.rs b/chitchat/tests/perf_test.rs index b6d6083..ad7ef02 100644 --- a/chitchat/tests/perf_test.rs +++ b/chitchat/tests/perf_test.rs @@ -49,7 +49,7 @@ async fn wait_until) -> bool>( predicate: P, ) -> Duration { let start = Instant::now(); - let mut node_watcher = handle.chitchat().lock().await.live_nodes_watcher(); + let mut node_watcher = handle.chitchat().lock().await.live_nodes_watch_stream(); while let Some(nodes) = node_watcher.next().await { if predicate(&nodes) { break;