Skip to content

Commit

Permalink
Change in chitchat gossiping priority.
Browse files Browse the repository at this point in the history
Following the original paper, chitchat currently first shares nodes with the highest number of stale values.

As a side effect, nodes that are not emitting many KVs are gossiped last.
In quickwit, under a little bit of load (1000 indexes on 10 indexers), it has a very dire effect.

Indexers that reconnect have to gossip the entire cluster state (~10MB) before being able to get any
information about the metastore.
Knowing at least one node with the metastore service is required for nodes to declare themselves as live.

This PR changes the gossip order.
It prioritizes nodes for which the node that originated the Syn message
has not received any KV yet.

This is identified by the fact that either the node was not part of the
digest at all, or the floor_version is equal to 0.

The latter (floor_version == 0) should not happen today, but this case
is handled in preparation for another PR updating heartbeat on Syn.
  • Loading branch information
fulmicoton committed Feb 19, 2024
1 parent 173f391 commit 3b3a2e7
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 91 deletions.
2 changes: 2 additions & 0 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ mod tests {
Chitchat::with_chitchat_id_and_seeds(test_config, empty_seeds(), Vec::new());
let mut test_transport = transport.open(test_addr).await.unwrap();

dbg!("1");
let server_config = ChitchatConfig::for_test(2);
let server_id = server_config.chitchat_id.clone();
let server_addr = server_config.chitchat_id.gossip_advertise_addr;
Expand All @@ -582,6 +583,7 @@ mod tests {
server_chitchat.update_heartbeat();
let syn = server_chitchat.create_syn_message();
let syn_ack = test_chitchat.process_message(syn).unwrap();
dbg!(&syn_ack);
server_chitchat.process_message(syn_ack);
})
.await;
Expand Down
282 changes: 191 additions & 91 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashSet};
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -402,20 +403,77 @@ impl ClusterState {
}
}

/// Score used to decide which member should be gossiped first.
///
/// Number of stale key-value pairs carried by the node. A key-value is considered stale if its
/// local version is higher than the max version of the digest, also called "floor version".
type Staleness = usize;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct Staleness {
    /// `true` when the digest's floor version for the node is 0, i.e. the node
    /// that sent the digest has not received any KV from it yet.
    is_unknown: bool,
    /// Number of key-values that still have to be sent for this node.
    num_stale_records: usize,
}

/// The order is a gossip *priority*: the greater the value, the sooner the
/// node's information is gossiped.
impl Ord for Staleness {
    fn cmp(&self, other: &Self) -> Ordering {
        // `false < true`, so an unknown node always outranks a known one,
        // regardless of how many stale records either of them carries.
        self.is_unknown
            .cmp(&other.is_unknown)
            // Within the same group, more stale records means higher priority.
            .then_with(|| self.num_stale_records.cmp(&other.num_stale_records))
    }
}

impl PartialOrd for Staleness {
    /// Delegates to [`Ord::cmp`] so that both orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Stale nodes grouped by their `Staleness` score.
///
/// The `BTreeMap` keeps the groups ordered by score, and all nodes sharing the
/// same score are stored together in one `Vec`.
#[derive(Debug, Default)]
#[derive(Default)]
struct SortedStaleNodes<'a> {
stale_nodes: BTreeMap<Staleness, Vec<StaleNode<'a>>>,
}

/// Computes the gossip priority of a node, or `None` when it is not a
/// candidate for gossip.
///
/// `floor_version` is the version (transmitted in the digest) below which
/// all of the records have already been received.
///
/// There is no such thing as a KV for version 0, so `floor_version == 0`
/// means the node is entirely new. Those nodes are artificially prioritized
/// to make sure their membership (in quickwit the service key, for instance)
/// and initial KVs spread rapidly.
///
/// For any other node, a score is only returned if at least one KV is stale.
fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option<Staleness> {
    if floor_version == 0u64 {
        // An unknown node always gets a score, even when it carries no KV at
        // all: we want to make sure it is added to the neighbors' list.
        return Some(Staleness {
            is_unknown: true,
            num_stale_records: node_state.num_key_values(),
        });
    }
    match node_state.stale_key_values(floor_version).count() {
        // Nothing newer than the floor version: nothing to gossip.
        0 => None,
        num_stale_records => Some(Staleness {
            is_unknown: false,
            num_stale_records,
        }),
    }
}

impl<'a> SortedStaleNodes<'a> {
/// Adds a node to the list of stale nodes.
fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) {
let staleness = node_state.num_key_values() + 1; // +1 for the heartbeat.
let Some(staleness) = staleness_score(node_state, 0u64) else {
// The node does not have any stale KV.
return;
};
let heartbeat = node_state.heartbeat;
let floor_version = 0;

Expand All @@ -440,22 +498,20 @@ impl<'a> SortedStaleNodes<'a> {
) {
let heartbeat: Heartbeat = node_digest.heartbeat.max(node_state.heartbeat);
let floor_version = node_digest.max_version;
let mut staleness = node_state.stale_key_values(floor_version).count();
if heartbeat > node_digest.heartbeat {
staleness += 1;
}
if staleness > 0 {
let stale_node = StaleNode {
chitchat_id,
heartbeat,
node_state,
floor_version,
};
self.stale_nodes
.entry(staleness)
.or_default()
.push(stale_node);
}
let Some(staleness) = staleness_score(node_state, floor_version) else {
// The node does not have any stale KV.
return;
};
let stale_node = StaleNode {
chitchat_id,
heartbeat,
node_state,
floor_version,
};
self.stale_nodes
.entry(staleness)
.or_default()
.push(stale_node);
}

/// Returns an iterator over the stale nodes sorted in decreasing order of staleness.
Expand Down Expand Up @@ -592,38 +648,40 @@ mod tests {
let mut stale_nodes = SortedStaleNodes::default();

let node1 = ChitchatId::for_local_test(10_001);
let node1_state = NodeState::for_test();
stale_nodes.insert(&node1, &node1_state);
let node2 = ChitchatId::for_local_test(10_002);
let node3 = ChitchatId::for_local_test(10_003);

let expected_staleness = 1;
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
// No stale KV. We still insert the node!
// That way it will get a node state, and be a candidate for gossip later.
let node_state1 = NodeState::for_test();
stale_nodes.insert(&node1, &node_state1);
assert_eq!(stale_nodes.stale_nodes.len(), 1);

let node2 = ChitchatId::for_local_test(10_002);
let mut node2_state = NodeState::for_test();
node2_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
stale_nodes.insert(&node2, &node2_state);

let expected_staleness = 2;
let expected_staleness = Staleness {
is_unknown: true,
num_stale_records: 1,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);

let node3 = ChitchatId::for_local_test(10_003);
let mut node3_state = NodeState::for_test();
node3_state
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
stale_nodes.insert(&node3, &node3_state);

let expected_staleness = 2;
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 2);
node3_state
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));

let num_nodes = stale_nodes
.stale_nodes
.values()
.map(|nodes| nodes.len())
.sum::<usize>();
assert_eq!(num_nodes, 3);
stale_nodes.insert(&node3, &node3_state);
let expected_staleness = Staleness {
is_unknown: true,
num_stale_records: 2,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}

#[test]
Expand All @@ -632,79 +690,121 @@ mod tests {

let node1 = ChitchatId::for_local_test(10_001);
let node1_state = NodeState::for_test();
stale_nodes.offer(&node1, &node1_state, &NodeDigest::new(Heartbeat(0), 0));

assert_eq!(stale_nodes.stale_nodes.len(), 0);
stale_nodes.offer(&node1, &node1_state, &NodeDigest::new(Heartbeat(0), 1));
// No stale records. This is not a candidate for gossip.
assert!(stale_nodes.stale_nodes.is_empty());

let node2 = ChitchatId::for_local_test(10_002);
let mut node2_state = NodeState::for_test();
node2_state.heartbeat = Heartbeat(1);
stale_nodes.offer(&node2, &node2_state, &NodeDigest::new(Heartbeat(0), 0));

let expected_staleness = 1;
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
node2_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
stale_nodes.offer(&node2, &node2_state, &NodeDigest::new(Heartbeat(0), 1));
// No stale records (due to the floor version). This is not a candidate for gossip.
assert!(stale_nodes.stale_nodes.is_empty());

let node3 = ChitchatId::for_local_test(10_003);
let node3 = ChitchatId::for_local_test(10_002);
let mut node3_state = NodeState::for_test();
node3_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
stale_nodes.offer(&node3, &node3_state, &NodeDigest::new(Heartbeat(0), 0));

let expected_staleness = 1;
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 2);

let node4 = ChitchatId::for_local_test(10_004);
let mut node4_state = NodeState::for_test();
node4_state.heartbeat = Heartbeat(1);
node4_state
node3_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
stale_nodes.offer(&node4, &node4_state, &NodeDigest::new(Heartbeat(0), 0));

let expected_staleness = 2;
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node3_state
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));
stale_nodes.offer(&node3, &node3_state, &NodeDigest::new(Heartbeat(0), 1));
assert_eq!(stale_nodes.stale_nodes.len(), 1);
let expected_staleness = Staleness {
is_unknown: false,
num_stale_records: 2,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}

#[test]
fn test_sorted_stale_nodes_into_iter() {
let mut stale_nodes = SortedStaleNodes::default();
let stale_node1 = StaleNode {
chitchat_id: &ChitchatId::for_local_test(10_001),
heartbeat: Heartbeat(0),
node_state: &NodeState::for_test(),
floor_version: 0,
};
stale_nodes.stale_nodes.insert(1, vec![stale_node1]);

let stale_node2 = StaleNode {
chitchat_id: &ChitchatId::for_local_test(10_002),
heartbeat: Heartbeat(0),
node_state: &NodeState::for_test(),
floor_version: 0,
};
let stale_node3 = StaleNode {
chitchat_id: &ChitchatId::for_local_test(10_003),
heartbeat: Heartbeat(0),
node_state: &NodeState::for_test(),
floor_version: 0,
};
let stale_node4 = StaleNode {
chitchat_id: &ChitchatId::for_local_test(10_004),
heartbeat: Heartbeat(0),
node_state: &NodeState::for_test(),
floor_version: 0,
};
stale_nodes
.stale_nodes
.insert(2, vec![stale_node2, stale_node3, stale_node4]);
let node1 = ChitchatId::for_local_test(10_001);
let mut node_state1 = NodeState::for_test();
node_state1
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node_state1
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node_state1
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));
stale_nodes.offer(&node1, &node_state1, &NodeDigest::new(Heartbeat(0), 1));
// 2 stale values.

let node2 = ChitchatId::for_local_test(10_002);
let mut node_state2 = NodeState::for_test();
node_state2
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node_state2
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node_state2
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 5));
stale_nodes.offer(&node2, &node_state2, &NodeDigest::new(Heartbeat(0), 2));
// 1 stale value.

let node3 = ChitchatId::for_local_test(10_003);
let mut node_state3 = NodeState::for_test();
node_state3
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node_state3
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node_state3
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));
stale_nodes.offer(&node3, &node_state3, &NodeDigest::new(Heartbeat(0), 7));
// 0 stale values.

let node4 = ChitchatId::for_local_test(10_004);
let mut node_state4 = NodeState::for_test();
node_state4
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node_state4
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node_state4
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 5));
node_state4
.key_values
.insert("key_d".to_string(), VersionedValue::for_test("value_d", 7));
stale_nodes.offer(&node4, &node_state4, &NodeDigest::new(Heartbeat(0), 1));

// 3 stale values
let node5 = ChitchatId::for_local_test(10_005);
let node_state5 = NodeState::for_test();
stale_nodes.insert(&node5, &node_state5);

// 0 stale values
let node6 = ChitchatId::for_local_test(10_006);
let mut node_state6 = NodeState::for_test();
node_state6
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
stale_nodes.insert(&node6, &node_state6);

// 1 stale values
assert_eq!(
stale_nodes
.into_iter()
.map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port())
.collect::<Vec<_>>(),
vec![10_003, 10_002, 10_004, 10_001]
vec![10_006, 10_005, 10_004, 10_001, 10_002]
);
}

Expand Down Expand Up @@ -1119,11 +1219,11 @@ mod tests {
);
assert!(delta.nodes_to_reset.is_empty());
let mut expected_delta = Delta::default();
expected_delta.add_node(node1.clone(), Heartbeat(10_000));
expected_delta.add_kv(&node1, "key_b", "2", 2, None);
expected_delta.add_node(node2.clone(), Heartbeat(0));
expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, None);
expected_delta.set_serialized_len(78);
expected_delta.add_node(node1.clone(), Heartbeat(10_000));
expected_delta.add_kv(&node1, "key_b", "2", 2, None);
expected_delta.set_serialized_len(76);
assert_eq!(delta, expected_delta);
}
{
Expand Down

0 comments on commit 3b3a2e7

Please sign in to comment.