Skip to content

Commit

Permalink
Change in the liveness detection.
Browse files Browse the repository at this point in the history
- Updating heartbeat on Digest messages (in addition to process delta)
- Avoid making up an interval when we have received a single heartbeat.
- Never marking nodes with less than 2 heartbeat recorded as live.
- Using additive smoothing to deal with the problem of the inability to
  assess liveness on the first few samples.
  • Loading branch information
fulmicoton committed Feb 15, 2024
1 parent 31f8d02 commit 8b22dd6
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 123 deletions.
224 changes: 139 additions & 85 deletions chitchat/src/failure_detector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::time::Duration;
#[cfg(not(test))]
use std::time::Instant;
Expand Down Expand Up @@ -35,21 +36,6 @@ impl FailureDetector {
/// Reports node heartbeat.
pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) {
debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat.");
let heartbeat_window = self
.node_samples
.entry(chitchat_id.clone())
.or_insert_with(|| {
SamplingWindow::new(
self.config.sampling_window_size,
self.config.max_interval,
self.config.initial_interval,
)
});
heartbeat_window.report_heartbeat();
}

pub fn report_unknown(&mut self, chitchat_id: &ChitchatId) {
debug!(node_id=%chitchat_id.node_id, "reporting unknown node heartbeat.");
self.node_samples
.entry(chitchat_id.clone())
.or_insert_with(|| {
Expand All @@ -58,23 +44,28 @@ impl FailureDetector {
self.config.max_interval,
self.config.initial_interval,
)
});
})
.report_heartbeat();
}

/// Marks the node as dead or alive based on the current phi value.
pub fn update_node_liveness(&mut self, chitchat_id: &ChitchatId) {
if let Some(phi) = self.phi(chitchat_id) {
debug!(node_id=%chitchat_id.node_id, phi=phi, "updating node liveness");
if phi > self.config.phi_threshold {
self.live_nodes.remove(chitchat_id);
self.dead_nodes.insert(chitchat_id.clone(), Instant::now());
// Remove current sampling window so that when the node
// comes back online, we start with a fresh sampling window.
self.node_samples.remove(chitchat_id);
} else {
self.live_nodes.insert(chitchat_id.clone());
self.dead_nodes.remove(chitchat_id);
}
let phi_opt = self.phi(chitchat_id);
let is_alive = self
.phi(chitchat_id)
.map(|phi| phi <= self.config.phi_threshold)
.unwrap_or(false);
debug!(node_id=%chitchat_id.node_id, phi=?phi_opt, is_alive=is_alive, "computing node liveness");
if is_alive {
self.live_nodes.insert(chitchat_id.clone());
self.dead_nodes.remove(chitchat_id);
} else {
self.live_nodes.remove(chitchat_id);
self.dead_nodes.insert(chitchat_id.clone(), Instant::now());
// Remove current sampling window so that when the node
// comes back online, we start with a fresh sampling window.
// TODO is this the right idea?
self.node_samples.remove(chitchat_id);
}
}

Expand Down Expand Up @@ -121,10 +112,10 @@ impl FailureDetector {
}

/// Returns the current phi value of a node.
///
/// If we have received less than 2 heartbeat, `phi()` returns `None`.
fn phi(&mut self, chitchat_id: &ChitchatId) -> Option<f64> {
self.node_samples
.get(chitchat_id)
.map(|sampling_window| sampling_window.phi())
self.node_samples.get(chitchat_id)?.phi()
}
}

Expand Down Expand Up @@ -182,102 +173,119 @@ struct SamplingWindow {
last_heartbeat: Option<Instant>,
/// Heartbeat intervals greater than this value are ignored.
max_interval: Duration,
/// The initial interval on startup.
initial_interval: Duration,
/// We may not have many intervals in the beginning.
/// For this reason we use additive smoothing to make sure we are
/// lenient on the first few intervals, and we don't have nodes flapping from
/// life to death.
additive_smoothing: AdditiveSmoothing,
}

impl SamplingWindow {
// Construct a new instance.
pub fn new(window_size: usize, max_interval: Duration, initial_interval: Duration) -> Self {
Self {
intervals: BoundedArrayStats::new(window_size),
pub fn new(window_size: usize, max_interval: Duration, prior_interval: Duration) -> Self {
let additive_smoothing = AdditiveSmoothing {
prior_mean: prior_interval.as_secs_f64(),
prior_weight: 5.0f64,
};
SamplingWindow {
intervals: BoundedArrayStats::with_capacity(window_size),
last_heartbeat: None,
max_interval,
initial_interval,
additive_smoothing,
}
}

/// Reports a heartbeat.
pub fn report_heartbeat(&mut self) {
if let Some(last_value) = &self.last_heartbeat {
let interval = last_value.elapsed();
let now = Instant::now();
if let Some(last_value) = self.last_heartbeat {
let interval = now.duration_since(last_value);
if interval <= self.max_interval {
self.intervals.append(interval.as_secs_f64());
}
} else {
self.intervals.append(self.initial_interval.as_secs_f64());
};
// This is our first heartbeat.
// No way to compute an interval.
// This is fine.
}
self.last_heartbeat = Some(Instant::now());
}

/// Computes the sampling window's phi value.
pub fn phi(&self) -> f64 {
if let Some(last_heartbeat) = self.last_heartbeat {
assert!(self.intervals.mean() > 0.0);
let elapsed_time = last_heartbeat.elapsed().as_secs_f64();
elapsed_time / self.intervals.mean()
} else {
// if we phi is called before we have a sample, we assume the node isn't really alive.
f64::INFINITY
}
/// Returns `None` if have not received two heartbeat yet.
pub fn phi(&self) -> Option<f64> {
// We avoid computing phi if we have only received one heartbeat.
// It could be data from an old dead node after all.
let len_non_zero = NonZeroUsize::new(self.intervals.len())?;
let sum = self.intervals.sum();
let last_heartbeat = self.last_heartbeat?;
let interval_mean = self.additive_smoothing.compute_mean(len_non_zero, sum);
let elapsed_time = last_heartbeat.elapsed().as_secs_f64();
Some(elapsed_time / interval_mean)
}
}

/// An array that retains a fixed number of streaming values.
#[derive(Debug)]
struct BoundedArrayStats {
/// The values.
data: Vec<f64>,
/// Number of accumulated values.
size: usize,
values: Box<[f64]>,
/// Is the values array filled?
is_filled: bool,
/// Position of the index within the values array.
/// Position of the next value to be written in the values array.
index: usize,
/// The accumulated sum of values.
sum: f64,
/// The accumulated mean of values.
mean: f64,
}

#[derive(Debug)]
struct AdditiveSmoothing {
prior_mean: f64,
prior_weight: f64,
}

impl AdditiveSmoothing {
fn compute_mean(&self, len: NonZeroUsize, sum: f64) -> f64 {
(sum + self.prior_weight * self.prior_mean) / (len.get() as f64 + self.prior_weight)
}
}

impl BoundedArrayStats {
pub fn new(size: usize) -> Self {
pub fn with_capacity(capacity: usize) -> Self {
Self {
data: vec![0.0; size],
size,
values: vec![0.0; capacity].into_boxed_slice(),
is_filled: false,
index: 0,
sum: 0.0,
mean: 0.0,
}
}

/// Returns the mean.
pub fn mean(&self) -> f64 {
self.mean
pub fn sum(&self) -> f64 {
self.sum
}

/// Appends a new value and updates the statistics.
pub fn append(&mut self, interval: f64) {
if self.index == self.size {
fn increment_index(&mut self) {
self.index = self.index + 1;

Check failure on line 269 in chitchat/src/failure_detector.rs

View workflow job for this annotation

GitHub Actions / clippy

manual implementation of an assign operation

error: manual implementation of an assign operation --> chitchat/src/failure_detector.rs:269:9 | 269 | self.index = self.index + 1; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `self.index += 1` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#assign_op_pattern = note: `-D clippy::assign-op-pattern` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::assign_op_pattern)]`

Check failure on line 269 in chitchat/src/failure_detector.rs

View workflow job for this annotation

GitHub Actions / clippy

manual implementation of an assign operation

error: manual implementation of an assign operation --> chitchat/src/failure_detector.rs:269:9 | 269 | self.index = self.index + 1; | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: replace it with: `self.index += 1` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#assign_op_pattern = note: `-D clippy::assign-op-pattern` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(clippy::assign_op_pattern)]`
if self.index == self.values.len() {
self.is_filled = true;
self.index = 0;
}
}

/// Appends a new value and updates the statistics.
pub fn append(&mut self, interval: f64) {
self.values[self.index] = interval;
self.sum += interval;
if self.is_filled {
self.sum -= self.data[self.index];
self.sum -= self.values[self.index];
}
self.sum += interval;

self.data[self.index] = interval;
self.index += 1;

self.mean = self.sum / self.len() as f64;
self.increment_index();
}

fn len(&self) -> usize {
if self.is_filled {
return self.size;
return self.values.len();
}
self.index
}
Expand All @@ -294,6 +302,17 @@ mod tests {
use crate::failure_detector::{FailureDetector, FailureDetectorConfig};
use crate::ChitchatId;

#[test]
fn test_failure_detector_does_not_see_a_node_as_alive_with_a_single_heartbeat() {
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());
let chitchat_id = ChitchatId::for_local_test(10_001);
failure_detector.report_heartbeat(&chitchat_id);
failure_detector.update_node_liveness(&chitchat_id);
let dead_nodes: Vec<&ChitchatId> = failure_detector.dead_nodes().collect();
assert_eq!(dead_nodes.len(), 1);
assert!(failure_detector.live_nodes().next().is_none());
}

#[test]
fn test_failure_detector() {
let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -415,20 +434,28 @@ mod tests {
}

#[test]
fn test_failure_detector_node_state_after_initial_interval() {
fn test_failure_detector_node_state_additive_smoothing_predominant_in_the_beginning() {
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());

// We add a few very short samples.
let chitchat_id = ChitchatId::for_local_test(10_001);
failure_detector.report_heartbeat(&chitchat_id);

MockClock::advance(Duration::from_secs(1));
let chitchat_id = ChitchatId::for_local_test(10_001);
for _ in 0..5 {
MockClock::advance(Duration::from_millis(200));
failure_detector.report_heartbeat(&chitchat_id);
}

MockClock::advance(Duration::from_secs(6));
failure_detector.update_node_liveness(&chitchat_id);

let live_nodes = failure_detector
.live_nodes()
.map(|chitchat_id| chitchat_id.node_id.as_str())
.collect::<Vec<_>>();
assert_eq!(live_nodes, vec!["node-10001"]);

MockClock::advance(Duration::from_secs(40));
failure_detector.update_node_liveness(&chitchat_id);

Expand All @@ -439,6 +466,27 @@ mod tests {
assert_eq!(live_nodes, Vec::<&str>::new());
}

#[test]
fn test_failure_detector_node_state_additive_smoothing_effect_fades_off() {
let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default());

// We add a few very short samples.
let chitchat_id = ChitchatId::for_local_test(10_001);
failure_detector.report_heartbeat(&chitchat_id);

let chitchat_id = ChitchatId::for_local_test(10_001);
for _ in 0..1000 {
MockClock::advance(Duration::from_millis(200));
failure_detector.report_heartbeat(&chitchat_id);
}

MockClock::advance(Duration::from_secs(6));

failure_detector.update_node_liveness(&chitchat_id);

assert!(failure_detector.live_nodes().next().is_none());
}

#[test]
fn test_sampling_window() {
let mut sampling_window =
Expand All @@ -448,43 +496,49 @@ mod tests {
MockClock::advance(Duration::from_secs(3));
sampling_window.report_heartbeat();

// Now intervals window is: [2.0, 3.0].
let mean = (2.0 + 3.0) / 2.0;
// Now intervals window is: [3.0].
let mean = (3.0 + 2.0 * 5.0) / (1.0f64 + 5.0f64);

// 0s elapsed since last reported heartbeat.
assert!((sampling_window.phi() - (0.0 / mean)).abs() < f64::EPSILON);
assert_nearly_equal(sampling_window.phi().unwrap(), 0.0f64);

// 1s elapsed since last reported heartbeat.
MockClock::advance(Duration::from_secs(1));
assert!((sampling_window.phi() - (1.0 / mean)).abs() < f64::EPSILON);

assert_nearly_equal(sampling_window.phi().unwrap(), 1.0f64 / mean);

// Check reported heartbeat later than max_interval is ignore.
MockClock::advance(Duration::from_secs(5));
sampling_window.report_heartbeat();
MockClock::advance(Duration::from_secs(2));

assert_nearly_equal(sampling_window.phi().unwrap(), 2.0f64 / mean);
}

#[track_caller]
fn assert_nearly_equal(value: f64, expected: f64) {
assert!(
(sampling_window.phi() - (2.0 / mean)).abs() < f64::EPSILON,
"Mean value should not change."
(value - expected).abs() < f64::EPSILON,
"value ({value}) is not not nearly equal to expected {expected}"
);
}

#[test]
fn test_bounded_array_stats() {
let mut bounded_array = BoundedArrayStats::new(10);
let mut bounded_array = BoundedArrayStats::with_capacity(10);
for i in 1..10 {
bounded_array.append(i as f64);
}
assert_eq!(bounded_array.index, 9);
assert_eq!(bounded_array.len(), 9);
assert!(!bounded_array.is_filled);
assert!((bounded_array.mean() - 5.0f64).abs() < f64::EPSILON);

assert_nearly_equal(bounded_array.sum(), 45.0f64);
for i in 10..14 {
bounded_array.append(i as f64);
}
assert_eq!(bounded_array.index, 3);
assert_eq!(bounded_array.len(), 10);
assert!(bounded_array.is_filled);
assert!((bounded_array.mean() - 8.5f64).abs() < f64::EPSILON);
assert_nearly_equal(bounded_array.sum(), 55.0f64);
}
}
Loading

0 comments on commit 8b22dd6

Please sign in to comment.