From 01288adebb8fdab2f09012825653b74041ab3cae Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 23 Feb 2024 11:03:02 -0500 Subject: [PATCH] Add catch-up callback --- chitchat-test/src/main.rs | 1 + chitchat/src/configuration.rs | 11 +- chitchat/src/delta.rs | 2 +- chitchat/src/failure_detector.rs | 6 ++ chitchat/src/lib.rs | 176 ++++++++++++++++++++++++++----- chitchat/src/message.rs | 16 +-- chitchat/src/server.rs | 8 +- chitchat/src/state.rs | 112 ++++++++++++-------- chitchat/src/transport/udp.rs | 4 +- chitchat/src/types.rs | 69 +++++++----- chitchat/tests/cluster_test.rs | 3 +- chitchat/tests/perf_test.rs | 3 +- 12 files changed, 293 insertions(+), 118 deletions(-) diff --git a/chitchat-test/src/main.rs b/chitchat-test/src/main.rs index b3373cf..56f2d76 100644 --- a/chitchat-test/src/main.rs +++ b/chitchat-test/src/main.rs @@ -100,6 +100,7 @@ async fn main() -> anyhow::Result<()> { ..FailureDetectorConfig::default() }, marked_for_deletion_grace_period: Duration::from_secs(60), + catchup_callback: None, extra_liveness_predicate: None, }; let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?; diff --git a/chitchat/src/configuration.rs b/chitchat/src/configuration.rs index 1792fc5..c04758a 100644 --- a/chitchat/src/configuration.rs +++ b/chitchat/src/configuration.rs @@ -5,8 +5,11 @@ use std::time::Duration; use crate::{ChitchatId, FailureDetectorConfig, NodeState}; -/// User-defined predicate liveness predication applied on top of the output of the failure -/// detector. +/// An optional user-defined callback executed when the self node is lagging behind. +pub type CatchupCallback = Box; + +/// An optional user-defined predicate liveness predication applied on top of the output of the +/// failure detector. pub type ExtraLivenessPredicate = Box bool + Send>; /// A struct for configuring a Chitchat instance. @@ -27,6 +30,8 @@ pub struct ChitchatConfig { // - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and // populate a fresh new node state with the keys and values present in the delta. pub marked_for_deletion_grace_period: Duration, + /// An optional callback executed when the self node is lagging behind. + pub catchup_callback: Option, // Extra lifeness predicate that can be used to define what a node being "live" means. // It can be used for instance, to only surface the nodes that are both alive according // to the failure detector, but also have a given set of required keys. @@ -46,6 +51,7 @@ impl ChitchatConfig { seed_nodes: Vec::new(), failure_detector_config: Default::default(), marked_for_deletion_grace_period: Duration::from_secs(10_000), + catchup_callback: None, extra_liveness_predicate: None, } } @@ -64,6 +70,7 @@ impl Default for ChitchatConfig { seed_nodes: Vec::new(), failure_detector_config: Default::default(), marked_for_deletion_grace_period: Duration::from_secs(3_600 * 2), // 2h + catchup_callback: None, extra_liveness_predicate: None, } } diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 79f9ccd..01efde7 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -185,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> { key.serialize(buf); versioned_value.value.serialize(buf); versioned_value.version.serialize(buf); - versioned_value.tombstone.is_some().serialize(buf); + versioned_value.is_tombstone().serialize(buf); } Self::NodeToReset(chitchat_id) => { buf.push(DeltaOpTag::NodeToReset.into()); diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index 027dfa1..6b80c81 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -311,6 +311,12 @@ mod tests { use crate::failure_detector::{FailureDetector, FailureDetectorConfig}; use crate::ChitchatId; + impl FailureDetector { + pub fn contains_node(&self, chitchat_id: &ChitchatId) -> bool { + self.node_samples.contains_key(chitchat_id) + } + } + #[test] fn test_failure_detector_does_not_see_a_node_as_alive_with_a_single_heartbeat() { let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index f935fe7..8703029 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -24,7 +24,7 @@ pub use listener::ListenerHandle; pub use serialize::Serializable; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; -use tracing::{error, warn}; +use tracing::{error, info, warn}; pub use self::configuration::ChitchatConfig; pub use self::state::{ClusterStateSnapshot, NodeState}; @@ -103,17 +103,30 @@ impl Chitchat { } fn process_delta(&mut self, delta: Delta) { + self.maybe_trigger_catchup_callback(&delta); self.cluster_state.apply_delta(delta); } + + /// Executes the catchup callback if necessary. + fn maybe_trigger_catchup_callback(&self, delta: &Delta) { + if !delta.nodes_to_reset.is_empty() { + if let Some(catchup_callback) = &self.config.catchup_callback { + info!("executing catchup callback"); + catchup_callback(); + } + } + } + pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option { self.update_self_heartbeat(); + match msg { ChitchatMessage::Syn { cluster_id, digest } => { if cluster_id != self.cluster_id() { warn!( our_cluster_id=%self.cluster_id(), their_cluster_id=%cluster_id, - "Received SYN message addressed to a different cluster." + "received SYN message addressed to a different cluster" ); return Some(ChitchatMessage::BadCluster); } @@ -149,7 +162,7 @@ impl Chitchat { None } ChitchatMessage::BadCluster => { - warn!("Message rejected by peer: wrong cluster."); + warn!("message rejected by peer: wrong cluster"); None } } @@ -244,10 +257,14 @@ impl Chitchat { /// - updates its max version /// /// Heartbeats are not notified. - pub fn live_nodes_watcher(&self) -> WatchStream> { + pub fn live_nodes_watch_stream(&self) -> WatchStream> { WatchStream::new(self.live_nodes_watcher_rx.clone()) } + pub fn live_nodes_watcher(&self) -> watch::Receiver> { + self.live_nodes_watcher_rx.clone() + } + /// Returns the set of nodes considered dead by the failure detector. pub fn dead_nodes(&self) -> impl Iterator { self.failure_detector.dead_nodes() @@ -277,6 +294,34 @@ impl Chitchat { ClusterStateSnapshot::from(&self.cluster_state) } + /// Resets the entire node state. + /// + /// Updated key-values will see their listeners called. + /// The order of calls is arbitrary. + /// + /// Existing key-values that are not present in `key_values` will be deleted + /// (not marked with a tombstone). + pub fn reset_node_state( + &mut self, + chitchat_id: &ChitchatId, + key_values: impl Iterator, + max_version: Version, + last_gc_version: Version, + ) { + let node_state = self.cluster_state.node_state_mut(chitchat_id); + + // If the node is new, we must ensure that the failure detector is aware of it. + if node_state.max_version() == 0 { + self.failure_detector.report_heartbeat(chitchat_id); + } + node_state.retain_key_values(|_key, value| value.version > max_version); + node_state.set_last_gc_version(last_gc_version); + + for (key, value) in key_values { + node_state.set_versioned_value(key, value) + } + } + pub(crate) fn update_self_heartbeat(&mut self) { self.self_node_state().inc_heartbeat(); } @@ -362,10 +407,14 @@ mod tests { assert!(peer_node.process_message(ack_message).is_none()); } + /// Checks that all of the non-deleted key-values pairs are the same in + /// lhs and rhs. + /// + /// This does NOT check for deleted KVs. fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) { assert_eq!(lhs.num_key_values(), rhs.num_key_values()); for (key, value) in lhs.key_values() { - assert_eq!(rhs.get_versioned(key), Some(value)); + assert_eq!(rhs.get(key), Some(value)); } } @@ -408,6 +457,7 @@ mod tests { ..Default::default() }, marked_for_deletion_grace_period: Duration::from_secs(3_600), + catchup_callback: None, extra_liveness_predicate: None, }; start_node_with_config(transport, config).await @@ -447,7 +497,7 @@ mod tests { chitchat .lock() .await - .live_nodes_watcher() + .live_nodes_watch_stream() .skip_while(|live_nodes| { if live_nodes.len() != expected_nodes.len() { return true; @@ -502,11 +552,11 @@ mod tests { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); let nodes = setup_nodes(20001..=20005, &transport).await; let node2 = nodes.get(1).unwrap(); - let mut live_rx = node2.chitchat().lock().await.live_nodes_watcher(); + let mut live_nodes_stream = node2.chitchat().lock().await.live_nodes_watch_stream(); let live_members = loop { - let live_members = live_rx.next().await.unwrap(); - if live_members.len() == 5 { - break live_members; + let live_nodes = live_nodes_stream.next().await.unwrap(); + if live_nodes.len() == 5 { + break live_nodes; } }; for node in &nodes { @@ -532,6 +582,7 @@ mod tests { ..Default::default() }, marked_for_deletion_grace_period: Duration::from_secs(3_600), + catchup_callback: None, extra_liveness_predicate: Some(Box::new(|node_state| { node_state.get("READY") == Some("true") })), @@ -543,20 +594,20 @@ mod tests { nodes.push(chitchat_handle); } - let mut num_nodes = 0; + let mut num_live_nodes = 0; assert!(tokio::time::timeout(Duration::from_secs(1), async { - let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher(); + let mut live_nodes_stream = nodes[2].chitchat().lock().await.live_nodes_watch_stream(); loop { - let live_members = live_rx.next().await.unwrap(); - num_nodes = live_members.len(); - if live_members.len() == 3 { - break live_members; + let live_nodes = live_nodes_stream.next().await.unwrap(); + num_live_nodes = live_nodes.len(); + if live_nodes.len() == 3 { + break live_nodes; } } }) .await .is_err()); - assert_eq!(num_nodes, 0); + assert_eq!(num_live_nodes, 0); nodes[0] .chitchat() @@ -577,11 +628,11 @@ mod tests { .self_node_state() .set("READY", "true"); - let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher(); + let mut live_nodes_stream = nodes[2].chitchat().lock().await.live_nodes_watch_stream(); let live_members = loop { - let live_members = live_rx.next().await.unwrap(); - if live_members.len() == 3 { - break live_members; + let live_nodes = live_nodes_stream.next().await.unwrap(); + if live_nodes.len() == 3 { + break live_nodes; } }; for node in &nodes { @@ -596,9 +647,9 @@ mod tests { .mark_for_deletion("READY"); let live_members = loop { - let live_members = live_rx.next().await.unwrap(); - if live_members.len() == 2 { - break live_members; + let live_nodes = live_nodes_stream.next().await.unwrap(); + if live_nodes.len() == 2 { + break live_nodes; } }; assert!(live_members.contains_key(&chitchat_ids[1])); @@ -612,9 +663,9 @@ mod tests { .set("READY", "false"); let live_members = loop { - let live_members = live_rx.next().await.unwrap(); - if live_members.len() == 1 { - break live_members; + let live_nodes = live_nodes_stream.next().await.unwrap(); + if live_nodes.len() == 1 { + break live_nodes; } }; @@ -968,4 +1019,75 @@ mod tests { assert_eq!(counter_self_key.load(Ordering::SeqCst), 2); assert_eq!(counter_other_key.load(Ordering::SeqCst), 1); } + + #[tokio::test] + async fn test_maybe_trigger_catchup_callback() { + let catchup_callback_counter = Arc::new(AtomicUsize::new(0)); + let catchup_callback_counter_clone = catchup_callback_counter.clone(); + + let mut config = ChitchatConfig::for_test(10_001); + config.catchup_callback = Some(Box::new(move || { + catchup_callback_counter_clone.fetch_add(1, Ordering::Release); + })); + let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default()); + + let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new()); + let delta = Delta::default(); + node.process_delta(delta); + + let mut delta = Delta::default(); + delta.add_node_to_reset(ChitchatId::for_local_test(10_002)); + node.process_delta(delta); + + assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1); + } + + #[tokio::test] + async fn test_reset_node_state() { + let config = ChitchatConfig::for_test(10_001); + let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default()); + let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new()); + + let chitchat_id = ChitchatId::for_local_test(10_002); + node.reset_node_state( + &chitchat_id, + [( + "foo".to_string(), + VersionedValue::new("bar".to_string(), 1, false), + )] + .into_iter(), + 1, + 1337, + ); + node.failure_detector.contains_node(&chitchat_id); + + let node_state = node.cluster_state.node_state(&chitchat_id).unwrap(); + assert_eq!(node_state.num_key_values(), 1); + assert_eq!(node_state.get("foo"), Some("bar")); + assert_eq!(node_state.max_version(), 1); + assert_eq!(node_state.last_gc_version(), 1337); + + let chitchat_id = ChitchatId::for_local_test(10_003); + let node_state = node.cluster_state.node_state_mut(&chitchat_id); + node_state.set("foo", "bar"); + node_state.set("qux", "baz"); + node_state.set("toto", "titi"); + + node.reset_node_state( + &chitchat_id, + [( + "qux".to_string(), + VersionedValue::new("baz".to_string(), 2, false), + )] + .into_iter(), + 2, + 1337, + ); + let node_state = node.cluster_state.node_state(&chitchat_id).unwrap(); + assert_eq!(node_state.num_key_values(), 2); + assert_eq!(node_state.get("qux"), Some("baz")); + assert_eq!(node_state.get("toto"), Some("titi")); + assert_eq!(node_state.max_version(), 3); + assert_eq!(node_state.last_gc_version(), 1337); + } } diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index d2b2caa..b79e67b 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -11,17 +11,17 @@ use crate::serialize::{Deserializable, Serializable}; /// Each variant represents a step of the gossip "handshake" /// between node A and node B. /// The names {SYN, SYN-ACK, ACK} of the different steps are borrowed from -/// TCP Handshake. +/// TCP handshake. #[derive(Debug, Eq, PartialEq)] pub enum ChitchatMessage { - /// Node A initiates a handshake and sends its digest. + /// Scuttlebutt SYN: node A initiates a handshake and sends its digest. Syn { cluster_id: String, digest: Digest }, - /// Node B returns a partial update as described - /// in the Scuttlebutt reconciliation algorithm, - /// and returns its own digest. + /// Scuttlebutt SYN-ACK: node B returns a partial update as described in the Scuttlebutt + /// reconciliation algorithm and its own digest. SynAck { digest: Digest, delta: Delta }, - /// Node A returns a partial update for B. + /// Scuttlebutt ACK: node A returns a partial update for B. Ack { delta: Delta }, + /// Node B rejects the SYN message because node A and B belong to different clusters. BadCluster, } @@ -91,9 +91,9 @@ impl Deserializable for ChitchatMessage { fn deserialize(buf: &mut &[u8]) -> anyhow::Result { let code = buf .first() - .cloned() + .copied() .and_then(MessageType::from_code) - .context("Invalid message type")?; + .context("invalid message type")?; buf.consume(1); match code { MessageType::Syn => { diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index fbf848d..9fddbce 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -65,12 +65,12 @@ async fn resolve_seed_host(seed_host: &str, seed_addrs: &mut HashSet Ok(resolved_seed_addrs) => { for seed_addr in resolved_seed_addrs { if seed_addrs.insert(seed_addr) { - debug!(seed_host=%seed_host, seed_addr=%seed_addr, "Resolved peer seed host."); + debug!(seed_host=%seed_host, seed_addr=%seed_addr, "resolved peer seed host"); } } } Err(error) => { - warn!(seed_host=%seed_host, error=?error, "Failed to lookup host."); + warn!(seed_host=%seed_host, error=?error, "failed to lookup host"); } }; } @@ -201,7 +201,7 @@ impl Server { chitchat: Arc>, transport: Box, ) -> Self { - let rng = SmallRng::from_rng(thread_rng()).expect("Failed to seed random generator"); + let rng = SmallRng::from_rng(thread_rng()).expect("failed to seed random generator"); Self { chitchat, command_rx, @@ -633,7 +633,7 @@ mod tests { .chitchat() .lock() .await - .live_nodes_watcher() + .live_nodes_watch_stream() .skip_while(|live_nodes| live_nodes.is_empty()); { diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 4115c73..125f224 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -21,7 +21,7 @@ use crate::{ChitchatId, Heartbeat, KeyChangeEvent, Version, VersionedValue}; #[derive(Clone, Serialize, Deserialize)] pub struct NodeState { - node_id: ChitchatId, + chitchat_id: ChitchatId, heartbeat: Heartbeat, key_values: BTreeMap, max_version: Version, @@ -45,9 +45,9 @@ impl Debug for NodeState { } impl NodeState { - fn new(node_id: ChitchatId, listeners: Listeners) -> NodeState { + fn new(chitchat_id: ChitchatId, listeners: Listeners) -> NodeState { NodeState { - node_id, + chitchat_id, heartbeat: Heartbeat(0), key_values: Default::default(), max_version: 0u64, @@ -56,9 +56,17 @@ impl NodeState { } } + pub fn last_gc_version(&self) -> Version { + self.last_gc_version + } + + pub(crate) fn set_last_gc_version(&mut self, last_gc_version: Version) { + self.last_gc_version = last_gc_version; + } + pub fn for_test() -> NodeState { NodeState { - node_id: ChitchatId { + chitchat_id: ChitchatId { node_id: "test-node".to_string(), generation_id: 0, gossip_advertise_addr: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 7280), @@ -81,11 +89,19 @@ impl NodeState { self.max_version } - /// Returns an iterator over the keys matching the given predicate, excluding keys marked for - /// deletion. - pub fn key_values(&self) -> impl Iterator { - self.internal_iter_key_values() - .filter(|&(_, versioned_value)| versioned_value.tombstone.is_none()) + /// Returns an iterator over keys matching the given predicate. + /// Disclaimer: This also returns keys marked for deletion. + pub fn key_values_including_deleted(&self) -> impl Iterator { + self.key_values + .iter() + .map(|(key, versioned_value)| (key.as_str(), versioned_value)) + } + + /// Returns an iterator over all of the (non-deleted) key-values. + pub fn key_values(&self) -> impl Iterator { + self.key_values_including_deleted() + .filter(|(_, versioned_value)| !versioned_value.is_tombstone()) + .map(|(key, versioned_value)| (key, versioned_value.value.as_str())) } /// Returns key values matching a prefix @@ -97,16 +113,13 @@ impl NodeState { self.key_values .range::(range) .take_while(move |(key, _)| key.starts_with(prefix)) - .map(|(key, record)| (key.as_str(), record)) - .filter(|&(_, versioned_value)| versioned_value.tombstone.is_none()) + .filter(|&(_, versioned_value)| !versioned_value.is_tombstone()) + .map(|(key, versioned_value)| (key.as_str(), versioned_value)) } /// Returns the number of key-value pairs, excluding keys marked for deletion. pub fn num_key_values(&self) -> usize { - self.key_values - .values() - .filter(|versioned_value| versioned_value.tombstone.is_none()) - .count() + self.key_values().count() } /// Returns false if the key is inexistant or marked for deletion. @@ -116,7 +129,7 @@ impl NodeState { pub fn get(&self, key: &str) -> Option<&str> { let versioned_value = self.get_versioned(key)?; - if versioned_value.tombstone.is_some() { + if versioned_value.is_tombstone() { return None; } Some(versioned_value.value.as_str()) @@ -151,9 +164,9 @@ impl NodeState { return; }; self.max_version += 1; - versioned_value.tombstone = Some(Instant::now()); versioned_value.version = self.max_version; versioned_value.value = "".to_string(); + versioned_value.tombstone = Some(Instant::now()); } pub(crate) fn inc_heartbeat(&mut self) { @@ -179,35 +192,37 @@ impl NodeState { } } - /// Returns an iterator over keys matching the given predicate. - /// Not public as it returns also keys marked for deletion. - fn internal_iter_key_values(&self) -> impl Iterator { - self.key_values - .iter() - .map(|(key, record)| (key.as_str(), record)) - } - /// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`. fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) { let now = Instant::now(); let mut max_deleted_version = self.last_gc_version; - self.key_values - .retain(|_, versioned_value: &mut VersionedValue| { - let Some(deleted_instant) = versioned_value.tombstone else { - // The KV is not deleted. We keep it! - return true; - }; - if now < deleted_instant + grace_period { - // We haved not passed the grace period yet. We keep it! - return true; - } - // We have exceeded the tombstone grace period. Time to remove it. - max_deleted_version = versioned_value.version.max(max_deleted_version); - false - }); + self.retain_key_values(|_, versioned_value: &VersionedValue| { + let Some(deleted_instant) = versioned_value.tombstone else { + // The KV is not deleted. We keep it! + return true; + }; + if now < deleted_instant + grace_period { + // We haved not passed the grace period yet. We keep it! + return true; + } + // We have exceeded the tombstone grace period. Time to remove it. + max_deleted_version = versioned_value.version.max(max_deleted_version); + false + }); self.last_gc_version = max_deleted_version; } + /// Retains the key-value pairs for which the predicate returns `true` and removes the other + /// ones definitively. In other words, this method does not add tombstones. + /// + /// Most often, you don't want to call this method but rather `mark_for_deletion`. + pub(crate) fn retain_key_values( + &mut self, + mut predicate: impl FnMut(&str, &VersionedValue) -> bool, + ) { + self.key_values.retain(|key, value| predicate(key, value)); + } + /// Returns an iterator over the versioned values that are strictly greater than /// `floor_version`. The floor version typically comes from the max version of a digest. /// @@ -217,22 +232,27 @@ impl NodeState { floor_version: u64, ) -> impl Iterator { // TODO optimize by checking the max version. - self.internal_iter_key_values() + self.key_values_including_deleted() .filter(move |(_key, versioned_value)| versioned_value.version > floor_version) } /// Sets a new versioned value to associate to a given key. - /// This operation is ignored if the key value inserted has a version that is obsolete. + /// This operation is ignored if the key value inserted has a version that is obsolete. /// /// This method also update the max_version if necessary. - fn set_versioned_value(&mut self, key: String, versioned_value_update: VersionedValue) { + pub(crate) fn set_versioned_value( + &mut self, + key: String, + versioned_value_update: VersionedValue, + ) { let key_clone = key.clone(); let key_change_event = KeyChangeEvent { key: key_clone.as_str(), value: &versioned_value_update.value, - node: &self.node_id, + node: &self.chitchat_id, }; self.max_version = versioned_value_update.version.max(self.max_version); + match self.key_values.entry(key) { Entry::Occupied(mut occupied) => { let occupied_versioned_value = occupied.get_mut(); @@ -246,7 +266,7 @@ impl NodeState { vacant.insert(versioned_value_update.clone()); } }; - if versioned_value_update.tombstone.is_none() { + if !versioned_value_update.is_tombstone() { self.listeners.trigger_event(key_change_event); } } @@ -973,7 +993,7 @@ mod tests { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, ""); assert_eq!(versioned_value.version, 2u64); - assert!(&versioned_value.tombstone.is_some()); + assert!(&versioned_value.is_tombstone()); } // Overriding the same key @@ -982,7 +1002,7 @@ mod tests { let versioned_value = node_state.get_versioned("key").unwrap(); assert_eq!(&versioned_value.value, "2"); assert_eq!(versioned_value.version, 3u64); - assert!(&versioned_value.tombstone.is_none()); + assert!(!versioned_value.is_tombstone()); } } diff --git a/chitchat/src/transport/udp.rs b/chitchat/src/transport/udp.rs index 43edb0f..0715049 100644 --- a/chitchat/src/transport/udp.rs +++ b/chitchat/src/transport/udp.rs @@ -28,7 +28,7 @@ impl UdpSocket { pub async fn open(bind_addr: SocketAddr) -> anyhow::Result { let socket = tokio::net::UdpSocket::bind(bind_addr) .await - .with_context(|| format!("Failed to bind to {bind_addr}/UDP for gossip."))?; + .with_context(|| format!("failed to bind to {bind_addr}/UDP for gossip"))?; Ok(UdpSocket { buf_send: Vec::with_capacity(MAX_UDP_DATAGRAM_PAYLOAD_SIZE), buf_recv: Box::new([0u8; MAX_UDP_DATAGRAM_PAYLOAD_SIZE]), @@ -81,7 +81,7 @@ impl UdpSocket { self.socket .send_to(payload, to_addr) .await - .context("Failed to send chitchat message to target")?; + .context("failed to send chitchat message to peer")?; Ok(()) } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index 0367cbc..fc6a2a3 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -1,4 +1,5 @@ -use std::{fmt::Debug, net::SocketAddr}; +use std::fmt::Debug; +use std::net::SocketAddr; use serde::{Deserialize, Serialize}; use tokio::time::Instant; @@ -26,7 +27,11 @@ pub struct ChitchatId { impl Debug for ChitchatId { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}:{}:{}", self.node_id.as_str(), self.generation_id, self.gossip_advertise_addr) + write!( + f, + "{}:{}:{}", + &self.node_id, self.generation_id, self.gossip_advertise_addr + ) } } @@ -64,14 +69,41 @@ pub struct VersionedValue { pub version: Version, // The tombstone instant is transient: // Only the presence of a tombstone or not is serialized, and used in partial eq eq. - pub tombstone: Option, + pub(crate) tombstone: Option, +} + +impl VersionedValue { + pub fn new(value: String, version: Version, is_tombstone: bool) -> VersionedValue { + VersionedValue { + value, + version, + tombstone: if is_tombstone { + Some(Instant::now()) + } else { + None + }, + } + } + + pub fn is_tombstone(&self) -> bool { + self.tombstone.is_some() + } + + #[cfg(test)] + pub fn for_test(value: &str, version: Version) -> Self { + Self { + value: value.to_string(), + version, + tombstone: None, + } + } } impl PartialEq for VersionedValue { fn eq(&self, other: &Self) -> bool { self.value.eq(&other.value) && self.version.eq(&other.version) - && self.tombstone.is_some().eq(&other.tombstone.is_some()) + && self.is_tombstone().eq(&other.is_tombstone()) } } @@ -79,20 +111,16 @@ impl PartialEq for VersionedValue { struct VersionedValueForSerialization { pub value: String, pub version: Version, - pub tombstone: bool, + pub is_tombstone: bool, } impl From for VersionedValue { fn from(versioned_value: VersionedValueForSerialization) -> Self { - VersionedValue { - value: versioned_value.value, - version: versioned_value.version, - tombstone: if versioned_value.tombstone { - Some(Instant::now()) - } else { - None - }, - } + VersionedValue::new( + versioned_value.value, + versioned_value.version, + versioned_value.is_tombstone, + ) } } @@ -101,18 +129,7 @@ impl From for VersionedValueForSerialization { VersionedValueForSerialization { value: versioned_value.value, version: versioned_value.version, - tombstone: versioned_value.tombstone.is_some(), - } - } -} - -#[cfg(test)] -impl VersionedValue { - pub fn for_test(value: &str, version: Version) -> Self { - Self { - value: value.to_string(), - version, - tombstone: None, + is_tombstone: versioned_value.tombstone.is_some(), } } } diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index ea4b9a2..d6c6533 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -58,7 +58,7 @@ impl NodeStatePredicate { } NodeStatePredicate::MarkedForDeletion(key, marked) => { debug!(key=%key, marked=marked, "assert-key-marked-for-deletion"); - node_state.get_versioned(key).unwrap().tombstone.is_some() == *marked + node_state.get_versioned(key).unwrap().is_tombstone() == *marked } } } @@ -215,6 +215,7 @@ impl Simulator { ..Default::default() }, marked_for_deletion_grace_period: self.marked_for_deletion_key_grace_period, + catchup_callback: None, extra_liveness_predicate: None, }; let handle = spawn_chitchat(config, Vec::new(), &self.transport) diff --git a/chitchat/tests/perf_test.rs b/chitchat/tests/perf_test.rs index 9f3758c..ad7ef02 100644 --- a/chitchat/tests/perf_test.rs +++ b/chitchat/tests/perf_test.rs @@ -29,6 +29,7 @@ async fn spawn_one(chitchat_id: u16, transport: &dyn Transport) -> ChitchatHandl ..Default::default() }, marked_for_deletion_grace_period: Duration::from_secs(10_000), + catchup_callback: None, extra_liveness_predicate: None, }; spawn_chitchat(config, Vec::new(), transport).await.unwrap() @@ -48,7 +49,7 @@ async fn wait_until) -> bool>( predicate: P, ) -> Duration { let start = Instant::now(); - let mut node_watcher = handle.chitchat().lock().await.live_nodes_watcher(); + let mut node_watcher = handle.chitchat().lock().await.live_nodes_watch_stream(); while let Some(nodes) = node_watcher.next().await { if predicate(&nodes) { break;