Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

forget previous generation #108

Merged
merged 5 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl Chitchat {
}

pub(crate) fn create_syn_message(&self) -> ChitchatMessage {
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let digest = self.compute_digest(&dead_nodes);
let digest = self.compute_digest();
ChitchatMessage::Syn {
cluster_id: self.config.cluster_id.clone(),
digest,
Expand All @@ -107,7 +106,7 @@ impl Chitchat {
}
// Ensure for every reply from this node, at least the heartbeat is changed.
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest(&dead_nodes);
let self_digest = self.compute_digest();
let empty_delta = Delta::default();
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE
- syn_ack_serialized_len(&self_digest, &empty_delta);
Expand Down Expand Up @@ -168,6 +167,7 @@ impl Chitchat {
}
} else {
self.failure_detector.report_unknown(chitchat_id);
self.failure_detector.update_node_liveness(chitchat_id);
}
}
}
Expand Down Expand Up @@ -280,8 +280,8 @@ impl Chitchat {
}

/// Computes the node's digest.
fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest {
self.cluster_state.compute_digest(dead_nodes)
fn compute_digest(&self) -> Digest {
self.cluster_state.compute_digest()
}

/// Subscribes a callback that will be called every time a key matching the supplied prefix
Expand Down
10 changes: 5 additions & 5 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,11 @@ mod tests {
test_transport.send(server_addr, syn_ack).await.unwrap();

// Wait for delta to ensure heartbeat key was incremented.
let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap();
let delta = if let ChitchatMessage::Ack { delta } = chitchat_message {
delta
} else {
panic!("Expected ack");
let delta = loop {
let (_, chitchat_message) = timeout(test_transport.recv()).await.unwrap();
if let ChitchatMessage::Ack { delta } = chitchat_message {
break delta;
};
};

let node_delta = &delta.node_deltas.get(&server_id).unwrap();
Expand Down
37 changes: 22 additions & 15 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,11 @@ impl ClusterState {
}
}

pub fn compute_digest(&self, dead_nodes: &HashSet<&ChitchatId>) -> Digest {
pub fn compute_digest(&self) -> Digest {
Digest {
node_digests: self
.node_states
.iter()
.filter(|(chitchat_id, _)| !dead_nodes.contains(chitchat_id))
.map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest()))
.collect(),
}
Expand Down Expand Up @@ -366,21 +365,39 @@ impl ClusterState {
}
let mut delta_writer = DeltaWriter::with_mtu(mtu);

for chitchat_id in nodes_to_reset {
if !delta_writer.add_node_to_reset(chitchat_id.clone()) {
for chitchat_id in &nodes_to_reset {
if !delta_writer.add_node_to_reset((*chitchat_id).clone()) {
break;
}
}
for stale_node in stale_nodes.into_iter() {
if !delta_writer.add_node(stale_node.chitchat_id.clone(), stale_node.heartbeat) {
break;
}
let mut added_something = false;
for (key, versioned_value) in stale_node.stale_key_values() {
added_something = true;
if !delta_writer.add_kv(key, versioned_value.clone()) {
let delta: Delta = delta_writer.into();
return delta;
}
}
if !added_something && nodes_to_reset.contains(&stale_node.chitchat_id) {
trinity-1686a marked this conversation as resolved.
Show resolved Hide resolved
// send a sentinel element to update the max_version. Otherwise the node's vision
// of max_version will be 0, and it may accept writes that are supposed to be
// stale, but it can tell they are.
if !delta_writer.add_kv(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch does not seem unit-tested.

"",
VersionedValue {
value: String::new(),
version: stale_node.node_state.max_version,
tombstone: Some(0),
},
) {
let delta: Delta = delta_writer.into();
return delta;
}
}
}
delta_writer.into()
}
Expand Down Expand Up @@ -817,23 +834,13 @@ mod tests {
node2_state.set("key_a", "");
node2_state.set("key_b", "");

let dead_nodes = HashSet::new();
let digest = cluster_state.compute_digest(&dead_nodes);
let digest = cluster_state.compute_digest();

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);
expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2);

assert_eq!(&digest, &expected_node_digests);

// Consider node 1 dead:
let dead_nodes = HashSet::from_iter([&node1]);
let digest = cluster_state.compute_digest(&dead_nodes);

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node2.clone(), Heartbeat(0), 2);

assert_eq!(&digest, &expected_node_digests);
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion chitchat/tests/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() {
},
// Relink node 3
Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_3.clone()),
Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()),
Operation::AddNetworkLink(chitchat_id_2.clone(), chitchat_id_3.clone()),
Operation::NodeStateAssert {
server_chitchat_id: chitchat_id_3.clone(),
chitchat_id: chitchat_id_1.clone(),
Expand Down
Loading