Skip to content

Commit

Permalink
Merge pull request #830 from Lorak-mmk/fix_latency_crash
Browse files Browse the repository at this point in the history
Fix crash when Instant::now() returns the same value twice
  • Loading branch information
piodul authored Oct 12, 2023
2 parents e6d6d3e + e900ac5 commit c2746af
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 9 deletions.
2 changes: 1 addition & 1 deletion scylla/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ bytes = "1.0.1"
futures = "0.3.6"
histogram = "0.6.9"
num_enum = "0.6"
tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros"] }
tokio = { version = "1.27", features = ["net", "time", "io-util", "sync", "rt", "macros", "test-util"] }
snap = "1.0"
uuid = { version = "1.0", features = ["v4"] }
rand = "0.8.3"
Expand Down
61 changes: 53 additions & 8 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2132,7 +2132,8 @@ mod latency_awareness {
use futures::{future::RemoteHandle, FutureExt};
use itertools::Either;
use scylla_cql::errors::{DbError, QueryError};
use tracing::{instrument::WithSubscriber, trace};
use tokio::time::{Duration, Instant};
use tracing::{instrument::WithSubscriber, trace, warn};
use uuid::Uuid;

use crate::{load_balancing::NodeRef, transport::node::Node};
Expand All @@ -2143,7 +2144,6 @@ mod latency_awareness {
atomic::{AtomicU64, Ordering},
Arc, RwLock,
},
time::{Duration, Instant},
};

#[derive(Debug)]
Expand All @@ -2168,7 +2168,7 @@ mod latency_awareness {
}
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct TimestampedAverage {
pub(super) timestamp: Instant,
pub(super) average: Duration,
Expand All @@ -2190,15 +2190,41 @@ mod latency_awareness {
timestamp: now,
}),
Some(prev_avg) => Some({
let delay = (now - prev_avg.timestamp).as_secs_f64();
let delay = now
.saturating_duration_since(prev_avg.timestamp)
.as_secs_f64();
let scaled_delay = delay / scale_secs;
let prev_weight = (scaled_delay + 1.).ln() / scaled_delay;
let prev_weight = if scaled_delay <= 0. {
1.
} else {
(scaled_delay + 1.).ln() / scaled_delay
};

let last_latency_secs = last_latency.as_secs_f64();
let prev_avg_secs = prev_avg.average.as_secs_f64();
let average = Duration::from_secs_f64(
let average = match Duration::try_from_secs_f64(
(1. - prev_weight) * last_latency_secs + prev_weight * prev_avg_secs,
);
) {
Ok(ts) => ts,
Err(e) => {
warn!(
"Error while calculating average: {e}. \
prev_avg_secs: {prev_avg_secs}, \
last_latency_secs: {last_latency_secs}, \
prev_weight: {prev_weight}, \
scaled_delay: {scaled_delay}, \
delay: {delay}, \
prev_avg.timestamp: {:?}, \
now: {now:?}",
prev_avg.timestamp
);

// Not sure when we could enter this branch,
// so I have no idea what would be a sensible value to return here,
// this does not seem like a very bad choice.
prev_avg.average
}
};
Self {
num_measures: prev_avg.num_measures + 1,
timestamp: now,
Expand Down Expand Up @@ -2733,7 +2759,7 @@ mod latency_awareness {
},
ExecutionProfile,
};
use std::time::Instant;
use tokio::time::Instant;

trait DefaultPolicyTestExt {
fn set_nodes_latency_stats(
Expand Down Expand Up @@ -3453,5 +3479,24 @@ mod latency_awareness {

session.query("whatever", ()).await.unwrap_err();
}

#[tokio::test]
async fn timestamped_average_works_when_clock_stops() {
tokio::time::pause();
let avg = Some(TimestampedAverage {
timestamp: Instant::now(),
average: Duration::from_secs(123),
num_measures: 1,
});
let new_avg = TimestampedAverage::compute_next(avg, Duration::from_secs(456), 10.0);
assert_eq!(
new_avg,
Some(TimestampedAverage {
timestamp: Instant::now(),
average: Duration::from_secs(123),
num_measures: 2,
}),
);
}
}
}

0 comments on commit c2746af

Please sign in to comment.