Skip to content

Commit

Permalink
Updating policy
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 23, 2024
1 parent a703bb0 commit ca80b6c
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl NodeState {
node_id,
heartbeat: Heartbeat(0),
key_values: Default::default(),
max_version: Default::default(),
max_version: 0u64,
listeners,
}
}
Expand Down Expand Up @@ -212,12 +212,12 @@ impl NodeState {
}
}

fn set_with_version(&mut self, key: String, value: String, version: Version) {
fn set_with_version(&mut self, key: impl ToString, value: impl ToString, version: Version) {
assert!(version > self.max_version);
self.set_versioned_value(
key,
key.to_string(),
VersionedValue {
value,
value: value.to_string(),
version,
tombstone: None,
},
Expand Down Expand Up @@ -407,21 +407,32 @@ impl ClusterState {
///
/// Number of stale key-value pairs carried by the node. A key-value is considered stale if its
/// local version is higher than the max version of the digest, also called "floor version".
#[derive(Clone, Copy, Eq, PartialEq)]
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct Staleness {
is_unknown: bool,
max_version: u64,
num_stale_key_values: usize,
}

/// The ordering should be read as a "priority": the greater a value compares,
/// the sooner that node's information is gossiped.
impl Ord for Staleness {
fn cmp(&self, other: &Self) -> Ordering {
// Nodes are gossiped in order of priority:
// Nodes get gossiped in order of priority.
// Unknown nodes get gossiped first.
// If several nodes are unknown, the one with the lowest max_version gets gossiped first.
// This is a bit of a hack to make sure we know about the metastore
// as soon as possible in quickwit, even when the indexer's chitchat state is bloated.
//
// Within known nodes, the one with the highest number of stale records gets gossiped first,
// as described in the scuttlebutt paper.
self.is_unknown.cmp(&other.is_unknown).then_with(|| {
// Then nodes with the highest number of stale records get higher priority.
self.num_stale_key_values.cmp(&other.num_stale_key_values)
if self.is_unknown {
self.max_version.cmp(&other.max_version).reverse()
} else {
// Then nodes with the highest number of stale records get higher priority.
self.num_stale_key_values.cmp(&other.num_stale_key_values)
}
})
}
}
Expand Down Expand Up @@ -461,6 +472,7 @@ fn staleness_score(node_state: &NodeState, floor_version: u64) -> Option<Stalene
}
Some(Staleness {
is_unknown,
max_version: node_state.max_version,
num_stale_key_values,
})
}
Expand All @@ -469,7 +481,7 @@ impl<'a> SortedStaleNodes<'a> {
/// Adds a node to the list of stale nodes.
fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) {
let Some(staleness) = staleness_score(node_state, 0u64) else {
// The node does not have any stale KV.
// The node does not have any stale key values.
return;
};
let heartbeat = node_state.heartbeat;
Expand Down Expand Up @@ -656,28 +668,24 @@ mod tests {
assert_eq!(stale_nodes.stale_nodes.len(), 1);

let mut node2_state = NodeState::for_test();
node2_state
.key_values
.insert("key_a".to_string(), VersionedValue::for_test("value_a", 1));
node2_state.set_with_version("key_a", "value_a", 1);
stale_nodes.insert(&node2, &node2_state);
let expected_staleness = Staleness {
is_unknown: true,
num_stale_key_values: 1,
max_version: 0,
num_stale_key_values: 0,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);

let mut node3_state = NodeState::for_test();
node3_state
.key_values
.insert("key_b".to_string(), VersionedValue::for_test("value_b", 2));
node3_state
.key_values
.insert("key_c".to_string(), VersionedValue::for_test("value_c", 3));
node3_state.set_with_version("key_b", "value_b", 2);
node3_state.set_with_version("key_c", "value_c", 3);

stale_nodes.insert(&node3, &node3_state);
let expected_staleness = Staleness {
is_unknown: true,
num_stale_key_values: 2,
max_version: 3,
num_stale_key_values: 3,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
}
Expand Down Expand Up @@ -716,6 +724,7 @@ mod tests {
assert_eq!(stale_nodes.stale_nodes.len(), 1);
let expected_staleness = Staleness {
is_unknown: false,
max_version: 1,
num_stale_key_values: 2,
};
assert_eq!(stale_nodes.stale_nodes[&expected_staleness].len(), 1);
Expand Down Expand Up @@ -802,7 +811,7 @@ mod tests {
.into_iter()
.map(|stale_node| stale_node.chitchat_id.gossip_advertise_addr.port())
.collect::<Vec<_>>(),
vec![10_006, 10_005, 10_004, 10_001, 10_002]
vec![10_005, 10_006, 10_004, 10_001, 10_002]
);
}

Expand Down

0 comments on commit ca80b6c

Please sign in to comment.