diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index 890d9cb..5eedc50 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -86,8 +86,7 @@ impl Chitchat { } pub(crate) fn create_syn_message(&self) -> ChitchatMessage { - let dead_nodes: HashSet<_> = self.dead_nodes().collect(); - let digest = self.compute_digest(&dead_nodes); + let digest = self.compute_digest(); ChitchatMessage::Syn { cluster_id: self.config.cluster_id.clone(), digest, @@ -107,7 +106,7 @@ impl Chitchat { } // Ensure for every reply from this node, at least the heartbeat is changed. let dead_nodes: HashSet<_> = self.dead_nodes().collect(); - let self_digest = self.compute_digest(&dead_nodes); + let self_digest = self.compute_digest(); let empty_delta = Delta::default(); let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - syn_ack_serialized_len(&self_digest, &empty_delta); @@ -168,6 +167,7 @@ impl Chitchat { } } else { self.failure_detector.report_unknown(chitchat_id); + self.failure_detector.update_node_liveness(chitchat_id); } } } @@ -280,8 +280,8 @@ impl Chitchat { } /// Computes the node's digest. - fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest { - self.cluster_state.compute_digest(dead_nodes) + fn compute_digest(&self) -> Digest { + self.cluster_state.compute_digest() } /// Subscribes a callback that will be called every time a key matching the supplied prefix diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 3602c0f..43ad6dc 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -594,11 +594,11 @@ mod tests { test_transport.send(server_addr, syn_ack).await.unwrap(); // Wait for delta to ensure heartbeat key was incremented. 
- let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap(); - let delta = if let ChitchatMessage::Ack { delta } = chitchat_message { - delta - } else { - panic!("Expected ack"); + let delta = loop { + let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap(); + if let ChitchatMessage::Ack { delta } = chitchat_message { + break delta; + }; }; let node_delta = &delta.node_deltas.get(&server_id).unwrap(); diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 7a71e14..35384a0 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -313,12 +313,11 @@ impl ClusterState { } } - pub fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest { + pub fn compute_digest(&self) -> Digest { Digest { node_digests: self .node_states .iter() - .filter(|(chitchat_id, _)| !dead_nodes.contains(chitchat_id)) .map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest())) .collect(), } @@ -366,8 +365,8 @@ impl ClusterState { } let mut delta_writer = DeltaWriter::with_mtu(mtu); - for chitchat_id in nodes_to_reset { - if !delta_writer.add_node_to_reset(chitchat_id.clone()) { + for chitchat_id in &nodes_to_reset { + if !delta_writer.add_node_to_reset((*chitchat_id).clone()) { break; } } @@ -375,12 +374,30 @@ impl ClusterState { if !delta_writer.add_node(stale_node.chitchat_id.clone(), stale_node.heartbeat) { break; } + let mut added_something = false; for (key, versioned_value) in stale_node.stale_key_values() { + added_something = true; if !delta_writer.add_kv(key, versioned_value.clone()) { let delta: Delta = delta_writer.into(); return delta; } } + if !added_something && nodes_to_reset.contains(&stale_node.chitchat_id) { + // send a sentinel element to update the max_version. Otherwise the node's vision + // of max_version will be 0, and it may accept writes that are supposed to be + // stale, but it can't tell they are. 
+ if !delta_writer.add_kv( + "", + VersionedValue { + value: String::new(), + version: stale_node.node_state.max_version, + tombstone: Some(0), + }, + ) { + let delta: Delta = delta_writer.into(); + return delta; + } + } } delta_writer.into() } @@ -817,23 +834,13 @@ mod tests { node2_state.set("key_a", ""); node2_state.set("key_b", ""); - let dead_nodes = HashSet::new(); - let digest = cluster_state.compute_digest(&dead_nodes); + let digest = cluster_state.compute_digest(); let mut expected_node_digests = Digest::default(); expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1); expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2); assert_eq!(&digest, &expected_node_digests); - - // Consider node 1 dead: - let dead_nodes = HashSet::from_iter([&node1]); - let digest = cluster_state.compute_digest(&dead_nodes); - - let mut expected_node_digests = Digest::default(); - expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2); - - assert_eq!(&digest, &expected_node_digests); } #[test] diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index a41906d..a711f11 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -426,7 +426,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { }, // Relink node 3 Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()), - Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), + Operation::AddNetworkLink(chitchat_id_2.clone(), chitchat_id_3.clone()), Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(),