Skip to content

Commit

Permalink
Add catch-up callback
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Feb 26, 2024
1 parent fa59b4c commit 01288ad
Show file tree
Hide file tree
Showing 12 changed files with 293 additions and 118 deletions.
1 change: 1 addition & 0 deletions chitchat-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ async fn main() -> anyhow::Result<()> {
..FailureDetectorConfig::default()
},
marked_for_deletion_grace_period: Duration::from_secs(60),
catchup_callback: None,
extra_liveness_predicate: None,
};
let chitchat_handler = spawn_chitchat(config, Vec::new(), &UdpTransport).await?;
Expand Down
11 changes: 9 additions & 2 deletions chitchat/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use std::time::Duration;

use crate::{ChitchatId, FailureDetectorConfig, NodeState};

/// User-defined predicate liveness predication applied on top of the output of the failure
/// detector.
/// An optional user-defined callback executed when the self node is lagging behind.
pub type CatchupCallback = Box<dyn Fn() + Send>;

/// An optional user-defined predicate liveness predication applied on top of the output of the
/// failure detector.
pub type ExtraLivenessPredicate = Box<dyn Fn(&NodeState) -> bool + Send>;

/// A struct for configuring a Chitchat instance.
Expand All @@ -27,6 +30,8 @@ pub struct ChitchatConfig {
// - Apply delta: for a node flagged "to be reset", Chitchat will remove the node state and
// populate a fresh new node state with the keys and values present in the delta.
pub marked_for_deletion_grace_period: Duration,
/// An optional callback executed when the self node is lagging behind.
pub catchup_callback: Option<CatchupCallback>,
// Extra lifeness predicate that can be used to define what a node being "live" means.
// It can be used for instance, to only surface the nodes that are both alive according
// to the failure detector, but also have a given set of required keys.
Expand All @@ -46,6 +51,7 @@ impl ChitchatConfig {
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: Duration::from_secs(10_000),
catchup_callback: None,
extra_liveness_predicate: None,
}
}
Expand All @@ -64,6 +70,7 @@ impl Default for ChitchatConfig {
seed_nodes: Vec::new(),
failure_detector_config: Default::default(),
marked_for_deletion_grace_period: Duration::from_secs(3_600 * 2), // 2h
catchup_callback: None,
extra_liveness_predicate: None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<'a> Serializable for DeltaOpRef<'a> {
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.tombstone.is_some().serialize(buf);
versioned_value.is_tombstone().serialize(buf);
}
Self::NodeToReset(chitchat_id) => {
buf.push(DeltaOpTag::NodeToReset.into());
Expand Down
6 changes: 6 additions & 0 deletions chitchat/src/failure_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ mod tests {
use crate::failure_detector::{FailureDetector, FailureDetectorConfig};
use crate::ChitchatId;

impl FailureDetector {
pub fn contains_node(&self, chitchat_id: &ChitchatId) -> bool {
self.node_samples.contains_key(chitchat_id)
}
}

#[test]
fn test_failure_detector_does_not_see_a_node_as_alive_with_a_single_heartbeat() {
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());
Expand Down
176 changes: 149 additions & 27 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub use listener::ListenerHandle;
pub use serialize::Serializable;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tracing::{error, warn};
use tracing::{error, info, warn};

pub use self::configuration::ChitchatConfig;
pub use self::state::{ClusterStateSnapshot, NodeState};
Expand Down Expand Up @@ -103,17 +103,30 @@ impl Chitchat {
}

fn process_delta(&mut self, delta: Delta) {
self.maybe_trigger_catchup_callback(&delta);
self.cluster_state.apply_delta(delta);
}

/// Executes the catchup callback if necessary.
fn maybe_trigger_catchup_callback(&self, delta: &Delta) {
if !delta.nodes_to_reset.is_empty() {
if let Some(catchup_callback) = &self.config.catchup_callback {
info!("executing catchup callback");
catchup_callback();
}
}
}

pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option<ChitchatMessage> {
self.update_self_heartbeat();

match msg {
ChitchatMessage::Syn { cluster_id, digest } => {
if cluster_id != self.cluster_id() {
warn!(
our_cluster_id=%self.cluster_id(),
their_cluster_id=%cluster_id,
"Received SYN message addressed to a different cluster."
"received SYN message addressed to a different cluster"
);
return Some(ChitchatMessage::BadCluster);
}
Expand Down Expand Up @@ -149,7 +162,7 @@ impl Chitchat {
None
}
ChitchatMessage::BadCluster => {
warn!("Message rejected by peer: wrong cluster.");
warn!("message rejected by peer: wrong cluster");
None
}
}
Expand Down Expand Up @@ -244,10 +257,14 @@ impl Chitchat {
/// - updates its max version
///
/// Heartbeats are not notified.
pub fn live_nodes_watcher(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>> {
pub fn live_nodes_watch_stream(&self) -> WatchStream<BTreeMap<ChitchatId, NodeState>> {
WatchStream::new(self.live_nodes_watcher_rx.clone())
}

pub fn live_nodes_watcher(&self) -> watch::Receiver<BTreeMap<ChitchatId, NodeState>> {
self.live_nodes_watcher_rx.clone()
}

/// Returns the set of nodes considered dead by the failure detector.
pub fn dead_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
self.failure_detector.dead_nodes()
Expand Down Expand Up @@ -277,6 +294,34 @@ impl Chitchat {
ClusterStateSnapshot::from(&self.cluster_state)
}

/// Resets the entire node state.
///
/// Updated key-values will see their listeners called.
/// The order of calls is arbitrary.
///
/// Existing key-values that are not present in `key_values` will be deleted
/// (not marked with a tombstone).
pub fn reset_node_state(
&mut self,
chitchat_id: &ChitchatId,
key_values: impl Iterator<Item = (String, VersionedValue)>,
max_version: Version,
last_gc_version: Version,
) {
let node_state = self.cluster_state.node_state_mut(chitchat_id);

// If the node is new, we must ensure that the failure detector is aware of it.
if node_state.max_version() == 0 {
self.failure_detector.report_heartbeat(chitchat_id);
}
node_state.retain_key_values(|_key, value| value.version > max_version);
node_state.set_last_gc_version(last_gc_version);

for (key, value) in key_values {
node_state.set_versioned_value(key, value)
}
}

pub(crate) fn update_self_heartbeat(&mut self) {
self.self_node_state().inc_heartbeat();
}
Expand Down Expand Up @@ -362,10 +407,14 @@ mod tests {
assert!(peer_node.process_message(ack_message).is_none());
}

/// Checks that all of the non-deleted key-values pairs are the same in
/// lhs and rhs.
///
/// This does NOT check for deleted KVs.
fn assert_cluster_state_eq(lhs: &NodeState, rhs: &NodeState) {
assert_eq!(lhs.num_key_values(), rhs.num_key_values());
for (key, value) in lhs.key_values() {
assert_eq!(rhs.get_versioned(key), Some(value));
assert_eq!(rhs.get(key), Some(value));
}
}

Expand Down Expand Up @@ -408,6 +457,7 @@ mod tests {
..Default::default()
},
marked_for_deletion_grace_period: Duration::from_secs(3_600),
catchup_callback: None,
extra_liveness_predicate: None,
};
start_node_with_config(transport, config).await
Expand Down Expand Up @@ -447,7 +497,7 @@ mod tests {
chitchat
.lock()
.await
.live_nodes_watcher()
.live_nodes_watch_stream()
.skip_while(|live_nodes| {
if live_nodes.len() != expected_nodes.len() {
return true;
Expand Down Expand Up @@ -502,11 +552,11 @@ mod tests {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let nodes = setup_nodes(20001..=20005, &transport).await;
let node2 = nodes.get(1).unwrap();
let mut live_rx = node2.chitchat().lock().await.live_nodes_watcher();
let mut live_nodes_stream = node2.chitchat().lock().await.live_nodes_watch_stream();
let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 5 {
break live_members;
let live_nodes = live_nodes_stream.next().await.unwrap();
if live_nodes.len() == 5 {
break live_nodes;
}
};
for node in &nodes {
Expand All @@ -532,6 +582,7 @@ mod tests {
..Default::default()
},
marked_for_deletion_grace_period: Duration::from_secs(3_600),
catchup_callback: None,
extra_liveness_predicate: Some(Box::new(|node_state| {
node_state.get("READY") == Some("true")
})),
Expand All @@ -543,20 +594,20 @@ mod tests {
nodes.push(chitchat_handle);
}

let mut num_nodes = 0;
let mut num_live_nodes = 0;
assert!(tokio::time::timeout(Duration::from_secs(1), async {
let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
let mut live_nodes_stream = nodes[2].chitchat().lock().await.live_nodes_watch_stream();
loop {
let live_members = live_rx.next().await.unwrap();
num_nodes = live_members.len();
if live_members.len() == 3 {
break live_members;
let live_nodes = live_nodes_stream.next().await.unwrap();
num_live_nodes = live_nodes.len();
if live_nodes.len() == 3 {
break live_nodes;
}
}
})
.await
.is_err());
assert_eq!(num_nodes, 0);
assert_eq!(num_live_nodes, 0);

nodes[0]
.chitchat()
Expand All @@ -577,11 +628,11 @@ mod tests {
.self_node_state()
.set("READY", "true");

let mut live_rx = nodes[2].chitchat().lock().await.live_nodes_watcher();
let mut live_nodes_stream = nodes[2].chitchat().lock().await.live_nodes_watch_stream();
let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 3 {
break live_members;
let live_nodes = live_nodes_stream.next().await.unwrap();
if live_nodes.len() == 3 {
break live_nodes;
}
};
for node in &nodes {
Expand All @@ -596,9 +647,9 @@ mod tests {
.mark_for_deletion("READY");

let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 2 {
break live_members;
let live_nodes = live_nodes_stream.next().await.unwrap();
if live_nodes.len() == 2 {
break live_nodes;
}
};
assert!(live_members.contains_key(&chitchat_ids[1]));
Expand All @@ -612,9 +663,9 @@ mod tests {
.set("READY", "false");

let live_members = loop {
let live_members = live_rx.next().await.unwrap();
if live_members.len() == 1 {
break live_members;
let live_nodes = live_nodes_stream.next().await.unwrap();
if live_nodes.len() == 1 {
break live_nodes;
}
};

Expand Down Expand Up @@ -968,4 +1019,75 @@ mod tests {
assert_eq!(counter_self_key.load(Ordering::SeqCst), 2);
assert_eq!(counter_other_key.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn test_maybe_trigger_catchup_callback() {
let catchup_callback_counter = Arc::new(AtomicUsize::new(0));
let catchup_callback_counter_clone = catchup_callback_counter.clone();

let mut config = ChitchatConfig::for_test(10_001);
config.catchup_callback = Some(Box::new(move || {
catchup_callback_counter_clone.fetch_add(1, Ordering::Release);
}));
let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default());

let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new());
let delta = Delta::default();
node.process_delta(delta);

let mut delta = Delta::default();
delta.add_node_to_reset(ChitchatId::for_local_test(10_002));
node.process_delta(delta);

assert_eq!(catchup_callback_counter.load(Ordering::Acquire), 1);
}

#[tokio::test]
async fn test_reset_node_state() {
let config = ChitchatConfig::for_test(10_001);
let (_seed_addrs_rx, seed_addrs_tx) = watch::channel(Default::default());
let mut node = Chitchat::with_chitchat_id_and_seeds(config, seed_addrs_tx, Vec::new());

let chitchat_id = ChitchatId::for_local_test(10_002);
node.reset_node_state(
&chitchat_id,
[(
"foo".to_string(),
VersionedValue::new("bar".to_string(), 1, false),
)]
.into_iter(),
1,
1337,
);
node.failure_detector.contains_node(&chitchat_id);

let node_state = node.cluster_state.node_state(&chitchat_id).unwrap();
assert_eq!(node_state.num_key_values(), 1);
assert_eq!(node_state.get("foo"), Some("bar"));
assert_eq!(node_state.max_version(), 1);
assert_eq!(node_state.last_gc_version(), 1337);

let chitchat_id = ChitchatId::for_local_test(10_003);
let node_state = node.cluster_state.node_state_mut(&chitchat_id);
node_state.set("foo", "bar");
node_state.set("qux", "baz");
node_state.set("toto", "titi");

node.reset_node_state(
&chitchat_id,
[(
"qux".to_string(),
VersionedValue::new("baz".to_string(), 2, false),
)]
.into_iter(),
2,
1337,
);
let node_state = node.cluster_state.node_state(&chitchat_id).unwrap();
assert_eq!(node_state.num_key_values(), 2);
assert_eq!(node_state.get("qux"), Some("baz"));
assert_eq!(node_state.get("toto"), Some("titi"));
assert_eq!(node_state.max_version(), 3);
assert_eq!(node_state.last_gc_version(), 1337);
}
}
Loading

0 comments on commit 01288ad

Please sign in to comment.