Skip to content

Commit

Permalink
Updating policy
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 23, 2024
1 parent 257f81e commit 728e715
Showing 1 changed file with 33 additions and 24 deletions.
57 changes: 33 additions & 24 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,12 @@ impl NodeState {
}
}

fn set_with_version(&mut self, key: String, value: String, version: Version) {
fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: Version) {
assert!(version > self.max_version);
self.set_versioned_value(
key,
key.to_string(),
VersionedValue {
value,
value: value.to_string(),
version,
tombstone: None,
},
Expand Down Expand Up @@ -468,21 +468,32 @@ impl ClusterState {
///
/// Number of stale key-value pairs carried by the node. A key-value is considered stale if its
/// local version is higher than the max version of the digest, also called "floor version".
#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct Staleness {
is_unknown: bool,
max_version: u64,
num_stale_key_values: usize,
}

/// The ord should be considered a "priority". The higher, the faster a node's
/// information is gossiped.
impl Ord for Staleness {
fn cmp(&self, other: &Self) -> Ordering {
// Nodes are gossiped in order of priority:
// Nodes get gossiped in priority.
// Unknown nodes get gossiped first.
// If several nodes are unknown, the one with the lowest max_version gets gossiped first.
// This is a bit of a hack to make sure we know about the metastore
// as soon as possible in quickwit, even when the indexer's chitchat state is bloated.
//
// Within known nodes, the one with the highest number of stale records gets gossiped first,
// as described in the scuttlebutt paper.
self.is_unknown.cmp(&other.is_unknown).then_with(|| {
// Then nodes with the highest number of stale records get higher priority.
self.num_stale_key_values.cmp(&other.num_stale_key_values)
if self.is_unknown {
self.max_version.cmp(&other.max_version).reverse()
} else {
// Then nodes with the highest number of stale records get higher priority.
self.num_stale_key_values.cmp(&other.num_stale_key_values)
}
})
}
}
Expand Down Expand Up @@ -522,6 +533,7 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option<Stalene
}
Some(Staleness {
is_unknown,
max_version: node_state.max_version,
num_stale_key_values,
})
}
Expand All @@ -530,10 +542,9 @@ impl<'a> SortedStaleNodes<'a> {
/// Adds a node to the list of stale nodes.
fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) {
let Some(staleness) = staleness_score(node_state, 0u64) else {
// The node does not have any stale KV.
// The node does not have any stale key values.
return;
};
let heartbeat = node_state.heartbeat;

let floor_version = 0;
let stale_node = StaleNode {
Expand Down Expand Up @@ -711,28 +722,24 @@ mod tests {
assert_eq!(stale_nodes.stale_nodes.len(), 1);

let mut node2_state = NodeState::for_test();
node2_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node2_state.set_with_version("key_a", "value_a", 1);
stale_nodes.insert(&node2, &node2_state);
let expected_staleness = Staleness {
is_unknown: true,
num_stale_key_values: 1,
max_version: 0,
num_stale_key_values: 0,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);

let mut node3_state = NodeState::for_test();
node3_state
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node3_state
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));
node3_state.set_with_version("key_b", "value_b", 2);
node3_state.set_with_version("key_c", "value_c", 3);

stale_nodes.insert(&node3, &node3_state);
let expected_staleness = Staleness {
is_unknown: true,
num_stale_key_values: 2,
max_version: 3,
num_stale_key_values: 3,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}
Expand Down Expand Up @@ -771,6 +778,7 @@ mod tests {
assert_eq!(stale_nodes.stale_nodes.len(), 1);
let expected_staleness = Staleness {
is_unknown: false,
max_version: 1,
num_stale_key_values: 2,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
Expand Down Expand Up @@ -856,7 +864,7 @@ mod tests {
.into_iter()
.map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port())
.collect::<Vec<_>>(),
vec![10_006, 10_005, 10_004, 10_001, 10_002]
vec![10_005, 10_006, 10_004, 10_001, 10_002]
);
}

Expand Down Expand Up @@ -1315,12 +1323,13 @@ mod tests {
&HashSet::new(),
);
let mut expected_delta = Delta::default();
expected_delta.add_node(node2.clone(), 0u64);
expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false);
expected_delta.add_node_to_reset(node1.clone());
expected_delta.add_node(node1.clone(), 3u64);
expected_delta.add_kv(&node1, "key_b", "2", 2, false);
expected_delta.add_node(node2.clone(), 0u64);
expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false);
expected_delta.set_serialized_len(80);
expected_delta.set_serialized_len(83);
assert_eq!(&delta, &expected_delta);
}
}

Expand Down

0 comments on commit 728e715

Please sign in to comment.