Skip to content

Commit

Permalink
Ignore updates from node from previous generations
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Mar 7, 2024
1 parent 57fba8b commit eb8ec27
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 22 deletions.
2 changes: 1 addition & 1 deletion quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ base64 = "0.21"
bytes = { version = "1", features = ["serde"] }
bytesize = { version = "1.3.0", features = ["serde"] }
bytestring = "1.3.0"
chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "867207e" }

chitchat = { git = "https://github.com/quickwit-oss/chitchat.git", rev = "78f8aff" }
chrono = { version = "0.4", default-features = false, features = [
"clock",
"std",
Expand Down
62 changes: 57 additions & 5 deletions quickwit/quickwit-cluster/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async fn compute_cluster_change_events_on_added(
warn!(
node_id=%new_chitchat_id.node_id,
generation_id=%new_chitchat_id.generation_id,
"node `{}` has rejoined the cluster with a lower generation ID and will be ignored",
"ignoring node `{}` rejoining the cluster with a lower generation ID",
new_chitchat_id.node_id
);
return events;
Expand Down Expand Up @@ -201,6 +201,16 @@ async fn compute_cluster_change_events_on_updated(
previous_nodes: &mut BTreeMap<NodeId, ClusterNode>,
) -> Option<ClusterChange> {
let previous_node = previous_nodes.get(&updated_chitchat_id.node_id)?.clone();

if previous_node.chitchat_id().generation_id > updated_chitchat_id.generation_id {
warn!(
node_id=%updated_chitchat_id.node_id,
generation_id=%updated_chitchat_id.generation_id,
"ignoring node `{}` update with a lower generation ID",
updated_chitchat_id.node_id
);
return None;
}
let previous_channel = previous_node.channel();
let is_self_node = self_chitchat_id == updated_chitchat_id;
let updated_node = try_new_node_with_channel(
Expand Down Expand Up @@ -337,7 +347,7 @@ pub mod for_test {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::collections::HashSet;
use std::net::SocketAddr;

Expand All @@ -362,9 +372,7 @@ mod tests {
fn default() -> Self {
Self {
enabled_services: QuickwitService::supported_services(),
grpc_advertise_addr: "127.0.0.1:7281"
.parse()
.expect("`127.0.0.1:7281` should be a valid socket address"),
grpc_advertise_addr: "127.0.0.1:7281".parse().unwrap(),
readiness: false,
key_values: Vec::new(),
}
Expand Down Expand Up @@ -721,6 +729,50 @@ mod tests {
&node
);
}
{
// Ignore node update with a lower generation ID.
let port = 1235;
let grpc_advertise_addr: SocketAddr = ([127, 0, 0, 1], port + 1).into();
let updated_chitchat_id = ChitchatId::for_local_test(port);
let updated_node_id: NodeId = updated_chitchat_id.node_id.clone().into();
let mut previous_chitchat_id = updated_chitchat_id.clone();
previous_chitchat_id.generation_id += 1;
let previous_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
.with_readiness(true)
.build();
let previous_channel = Channel::from_static("http://127.0.0.1:12345/").connect_lazy();
let is_self_node = true;
let previous_node = ClusterNode::try_new(
previous_chitchat_id.clone(),
&previous_node_state,
previous_channel,
is_self_node,
)
.unwrap();
let mut previous_nodes =
BTreeMap::from_iter([(updated_node_id, previous_node.clone())]);

let updated_node_state = NodeStateBuilder::default()
.with_grpc_advertise_addr(grpc_advertise_addr)
.with_readiness(false)
.with_key_value("my-key", "my-value")
.build();
let event_opt = compute_cluster_change_events_on_updated(
&cluster_id,
&self_chitchat_id,
&updated_chitchat_id,
&updated_node_state,
&mut previous_nodes,
)
.await;
assert!(event_opt.is_none());

assert_eq!(
previous_nodes.get(&updated_chitchat_id.node_id).unwrap(),
&previous_node
);
}
}

#[tokio::test]
Expand Down
67 changes: 53 additions & 14 deletions quickwit/quickwit-cluster/src/grpc_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ async fn wait_for_gossip_candidates(
.skip_while(|node_states| {
node_states.len() < MAX_GOSSIP_ROUNDS
&& node_states
.iter()
.filter(|(chitchat_id, node_state)| {
*chitchat_id != self_chitchat_id && is_candidate_for_gossip(node_state)
.values()
.filter(|node_state| {
find_gossip_candidate_grpc_addr(self_chitchat_id, node_state).is_some()
})
.count()
< MAX_GOSSIP_ROUNDS
Expand All @@ -177,23 +177,27 @@ fn select_gossip_candidates(
) -> (Vec<String>, Vec<SocketAddr>) {
live_nodes_rx
.borrow()
.iter()
.filter_map(|(chitchat_id, node_state)| {
if chitchat_id != self_chitchat_id && node_state.is_ready() {
if let Ok(grpc_advertise_addr) = node_state.grpc_advertise_addr() {
return Some((&chitchat_id.node_id, grpc_advertise_addr));
}
}
None
.values()
.filter_map(|node_state| {
find_gossip_candidate_grpc_addr(self_chitchat_id, node_state)
.map(|grpc_addr| (&node_state.chitchat_id().node_id, grpc_addr))
})
.choose_multiple(&mut rand::thread_rng(), MAX_GOSSIP_ROUNDS)
.into_iter()
.map(|(node_id, grpc_advertise_addr)| (node_id.clone(), grpc_advertise_addr))
.map(|(node_id, grpc_addr)| (node_id.clone(), grpc_addr))
.unzip()
}

fn is_candidate_for_gossip(node_state: &NodeState) -> bool {
node_state.is_ready() && node_state.grpc_advertise_addr().is_ok()
/// Resolves the gRPC advertise address to gossip with for `node_state`, or
/// `None` when the node must not be selected as a gossip peer.
///
/// A node is selected only when all of the following hold:
/// - its node ID differs from the node ID of `self_chitchat_id`; only the
///   `node_id` field is compared, so any other generation of the self node is
///   excluded as well;
/// - it reports itself as ready;
/// - its gRPC advertise address can be read successfully.
fn find_gossip_candidate_grpc_addr(
    self_chitchat_id: &ChitchatId,
    node_state: &NodeState,
) -> Option<SocketAddr> {
    let is_other_node = self_chitchat_id.node_id != node_state.chitchat_id().node_id;

    if is_other_node && node_state.is_ready() {
        // A missing or invalid address simply disqualifies the node.
        node_state.grpc_advertise_addr().ok()
    } else {
        None
    }
}

#[cfg(test)]
Expand All @@ -205,9 +209,44 @@ mod tests {
};

use super::*;
use crate::change::tests::NodeStateBuilder;
use crate::create_cluster_for_test;
use crate::member::{GRPC_ADVERTISE_ADDR_KEY, READINESS_KEY, READINESS_VALUE_READY};

/// Exercises `find_gossip_candidate_grpc_addr` against ready and non-ready
/// node states, and against a self ID that shares its node ID with the state.
#[tokio::test]
async fn test_find_gossip_candidate_grpc_addr() {
    let gossip_addr = "127.0.0.1:10000".parse::<SocketAddr>().unwrap();
    let expected_grpc_addr = "127.0.0.1:10001".parse::<SocketAddr>().unwrap();

    // Self ID whose node ID is expected to differ from the one carried by the
    // states built below.
    // NOTE(review): presumably `NodeStateBuilder::default()` uses the node ID
    // `test-node` — confirm against the builder.
    let foreign_self_id = ChitchatId::new("test-node-foo".to_string(), 1, gossip_addr);

    // Ready node with an advertised gRPC address: selected.
    let ready_state = NodeStateBuilder::default()
        .with_readiness(true)
        .with_grpc_advertise_addr(expected_grpc_addr)
        .build();
    assert_eq!(
        find_gossip_candidate_grpc_addr(&foreign_self_id, &ready_state),
        Some(expected_grpc_addr)
    );

    // Same address but the node is not ready: rejected.
    let not_ready_state = NodeStateBuilder::default()
        .with_readiness(false)
        .with_grpc_advertise_addr(expected_grpc_addr)
        .build();
    assert_eq!(
        find_gossip_candidate_grpc_addr(&foreign_self_id, &not_ready_state),
        None
    );

    // Not ready and no explicit gRPC address override: rejected.
    let bare_not_ready_state = NodeStateBuilder::default().with_readiness(false).build();
    assert_eq!(
        find_gossip_candidate_grpc_addr(&foreign_self_id, &bare_not_ready_state),
        None
    );

    // Ready node, but the self ID is now `test-node`: the test expects it to be
    // treated as the self node and rejected.
    let matching_self_id = ChitchatId::new("test-node".to_string(), 1, gossip_addr);
    assert_eq!(
        find_gossip_candidate_grpc_addr(&matching_self_id, &ready_state),
        None
    );
}

#[tokio::test]
async fn test_perform_grpc_gossip_rounds() {
let peer_seeds = Vec::new();
Expand Down

0 comments on commit eb8ec27

Please sign in to comment.