From ca80b6ce0bafb50346487b37abbde993d292e24b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 23 Feb 2024 12:06:37 +0900 Subject: [PATCH] Updating policy --- chitchat/src/state.rs | 51 +++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 7338268..1719a66 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -43,7 +43,7 @@ impl NodeState { node_id, heartbeat: Heartbeat(0), key_values: Default::default(), - max_version: Default::default(), + max_version: 0u64, listeners, } } @@ -212,12 +212,12 @@ impl NodeState { } } - fn set_with_version(&mut self, key: String, value: String, version: Version) { + fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: Version) { assert!(version > self.max_version); self.set_versioned_value( - key, + key.to_string(), VersionedValue { - value, + value: value.to_string(), version, tombstone: None, }, @@ -407,9 +407,10 @@ impl ClusterState { /// /// Number of stale key-value pairs carried by the node. A key-value is considered stale if its /// local version is higher than the max version of the digest, also called "floor version". -#[derive(Clone, Copy, Eq, PartialEq)] +#[derive(Clone, Copy, Eq, PartialEq, Debug)] struct Staleness { is_unknown: bool, + max_version: u64, num_stale_key_values: usize, } @@ -417,11 +418,21 @@ struct Staleness { /// information is gossiped. impl Ord for Staleness { fn cmp(&self, other: &Self) -> Ordering { - // Nodes are gossiped in order of priority: + // Nodes get gossiped in priority. // Unknown nodes get gossiped first. + // If several nodes are unknown, the one with the lowest max_version gets gossiped first. + // This is a bit of a hack to make sure we know about the metastore + // as soon as possible in quickwit, even when the indexer's chitchat state is bloated. 
+ // + // Within known nodes, the one with the highest number of stale records gets gossiped first, + // as described in the scuttlebutt paper. self.is_unknown.cmp(&other.is_unknown).then_with(|| { - // Then nodes with the highest number of stale records get higher priority. - self.num_stale_key_values.cmp(&other.num_stale_key_values) + if self.is_unknown { + self.max_version.cmp(&other.max_version).reverse() + } else { + // Then nodes with the highest number of stale records get higher priority. + self.num_stale_key_values.cmp(&other.num_stale_key_values) + } }) } } @@ -461,6 +472,7 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option SortedStaleNodes<'a> { /// Adds a node to the list of stale nodes. fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { let Some(staleness) = staleness_score(node_state, 0u64) else { - // The node does not have any stale KV. + // The node does not have any stale key values. return; }; let heartbeat = node_state.heartbeat; @@ -656,28 +668,24 @@ mod tests { assert_eq!(stale_nodes.stale_nodes.len(), 1); let mut node2_state = NodeState::for_test(); - node2_state - .key_values - .insert("key_a".to_string(), VersionedValue::for_test("value_a", 1)); + node2_state.set_with_version("key_a", "value_a", 1); stale_nodes.insert(&node2, &node2_state); let expected_staleness = Staleness { is_unknown: true, - num_stale_key_values: 1, + max_version: 0, + num_stale_key_values: 0, }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); let mut node3_state = NodeState::for_test(); - node3_state - .key_values - .insert("key_b".to_string(), VersionedValue::for_test("value_b", 2)); - node3_state - .key_values - .insert("key_c".to_string(), VersionedValue::for_test("value_c", 3)); + node3_state.set_with_version("key_b", "value_b", 2); + node3_state.set_with_version("key_c", "value_c", 3); stale_nodes.insert(&node3, &node3_state); let expected_staleness = Staleness { is_unknown: true, - 
num_stale_key_values: 2, + max_version: 3, + num_stale_key_values: 3, }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); } @@ -716,6 +724,7 @@ mod tests { assert_eq!(stale_nodes.stale_nodes.len(), 1); let expected_staleness = Staleness { is_unknown: false, + max_version: 1, num_stale_key_values: 2, }; assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1); @@ -802,7 +811,7 @@ mod tests { .into_iter() .map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port()) .collect::<Vec<_>>(), - vec![10_006, 10_005, 10_004, 10_001, 10_002] + vec![10_005, 10_006, 10_004, 10_001, 10_002] ); }