diff --git a/Cargo.lock b/Cargo.lock index e0be045..d379a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "assert_approx_eq" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" + [[package]] name = "async-trait" version = "0.1.83" @@ -497,6 +503,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "err-derive" version = "0.3.1" @@ -613,6 +625,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -658,6 +676,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "hashbrown" version = "0.14.5" @@ -751,6 +775,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "indexmap" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -815,6 +849,7 @@ name = "latte-cli" version = "0.28.0-scylladb" dependencies = [ "anyhow", + "assert_approx_eq", "base64 0.22.1", "bytes", "chrono", @@ -829,15 +864,18 @@ dependencies = [ "jemallocator", "lazy_static", "metrohash", + "more-asserts", "num_cpus", "openssl", "parse_duration", "pin-project", "plotters", "rand", + "rand_distr", "regex", "rmp", "rmp-serde", + "rstest", "rune", "rust-embed", "scylla", @@ -960,6 +998,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "more-asserts" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e" + [[package]] name = "musli" version = "0.0.42" @@ -1375,6 +1419,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1545,6 +1598,12 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "rmp" version = "0.8.14" @@ -1567,6 +1626,36 @@ dependencies = [ "serde", ] +[[package]] +name = "rstest" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b423f0e62bdd61734b67cd21ff50871dfaeb9cc74f869dcd6af974fbcb19936" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e1711e7d14f74b12a58411c542185ef7fb7f2e7f8ee6e2940a883628522b42" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.79", + "unicode-ident", +] + [[package]] name = "rune" version = "0.13.4" @@ -1679,6 +1768,15 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -1783,6 +1881,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5145b2263bec888224d054d1c820ceffa7d4a23723a2a822f970fcf1c5b64770" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.210" @@ -2105,6 +2209,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" + +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap", + "toml_datetime", + "winnow", +] + [[package]] name = "tracing" version = "0.1.40" @@ -2464,6 +2585,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "zerocopy" version = "0.7.35" diff --git a/Cargo.toml b/Cargo.toml index 1325330..80418db 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,12 +30,14 @@ itertools = "0.13" jemallocator = "0.5" lazy_static = "1.4.0" metrohash = "1.0" +more-asserts = "0.3" num_cpus = "1.13.0" openssl = "0.10.38" parse_duration = "2.1.1" pin-project = "1.1" plotters = { version = "0.3", default-features = false, features = ["line_series", "svg_backend", "full_palette"]} rand = { version = "0.8", default-features = false, features = ["small_rng", "std"] } +rand_distr = "0.4" regex = "1.5" rune = "0.13" rust-embed = "8" @@ -58,6 +60,8 @@ uuid = { version = "1.1", features = ["v4"] } walkdir = "2" [dev-dependencies] +assert_approx_eq = "1" +rstest = "0.22" tokio = { version = "1", features = ["rt", "test-util", "macros"] } [profile.release] diff --git a/src/autocorrelation.rs b/src/autocorrelation.rs new file mode 100644 index 0000000..9155b9a --- /dev/null +++ b/src/autocorrelation.rs @@ -0,0 +1,275 @@ +use more_asserts::assert_le; +use rand_distr::num_traits::Pow; + +/// Estimates the effective size of the sample, by taking account for +/// autocorrelation between measurements. +/// +/// In most statistical operations we assume measurements to be independent of each other. +/// However, in reality time series data are often autocorrelated, which means measurements +/// in near proximity affect each other. A single performance fluctuation of the measured system +/// can affect multiple measurements one after another. This effect decreases the effective +/// sample size and increases measurement uncertainty. +/// +/// The algorithm used for computing autocorrelation matrix is quite inaccurate as it doesn't compute +/// the full covariance matrix, but approximates it by pre-merging data points. +/// However, it is fairly fast (O(n log log n) and works in O(log n) memory incrementally. +#[derive(Clone, Debug, Default)] +pub struct EffectiveSampleSizeEstimator { + n: u64, + levels: Vec, +} + +#[derive(Clone, Debug)] +struct Level { + level: usize, + buf: Vec, + variance: VarianceEstimator, +} + +/// Estimates the effective sample size by using batch means method. +impl EffectiveSampleSizeEstimator { + /// Adds a single data point + pub fn record(&mut self, x: f64) { + self.n += 1; + self.insert(x, 0); + } + + /// Merges another estimator data into this one + pub fn add(&mut self, other: &EffectiveSampleSizeEstimator) { + self.n += other.n; + for level in &other.levels { + self.add_level(level); + } + } + + pub fn clear(&mut self) { + self.n = 0; + self.levels.clear(); + } + + fn insert(&mut self, x: f64, level: usize) { + if self.levels.len() == level { + self.levels.push(Level::new(level)); + } + if let Some(carry) = self.levels[level].record(x) { + self.insert(carry, level + 1); + } + } + + fn add_level(&mut self, level: &Level) { + if self.levels.len() == level.level { + self.levels.push(level.clone()); + } else if let Some(carry) = self.levels[level.level].add(level) { + self.insert(carry, level.level + 1); + } + } + + pub fn effective_sample_size(&self) -> u64 { + if self.n <= 1 { + return self.n; + } + + assert!(!self.levels.is_empty()); + + // Autocorrelation time can be estimated as: + // size_of_the_batch * variance_of_the_batch_mean / variance_of_the_whole_sample + // We can technically compute it from any level but there are some constraints: + // - the batch size must be greater than the autocorrelation time + // - the number of batches should be also large enough for the variance + // of the mean be accurate + let sample_variance = self.levels[0].variance.value(); + let autocorrelation_time = self + .levels + .iter() + .map(|l| { + ( + l.batch_len(), + l.batch_len() as f64 * l.variance.value() / sample_variance, + ) + }) + .take_while(|(batch_len, time)| *time > 0.2 * *batch_len as f64) + .map(|(_, time)| time) + .reduce(f64::max) + .unwrap(); + + (self.n as f64 / autocorrelation_time) as u64 + } +} + +impl Level { + fn new(level: usize) -> Self { + Level { + level, + buf: Vec::with_capacity(2), + variance: Default::default(), + } + } + + fn batch_len(&self) -> usize { + 1 << self.level + } + + fn record(&mut self, value: f64) -> Option { + self.variance.record(value); + self.buf.push(value); + self.merge() + } + + fn add(&mut self, other: &Level) -> Option { + assert_eq!(self.level, other.level); + self.variance.add(&other.variance); + // If there was more than 1 item recorded by the other level, then we must + // drop our queued item, because it is not a neighbour of the other item + if other.variance.n > 1 { + self.buf.clear(); + } + self.buf.extend(&other.buf); + assert_le!(self.buf.len(), 2); + self.merge() + } + + fn merge(&mut self) -> Option { + if self.buf.len() == 2 { + let merged = (self.buf[0] + self.buf[1]) / 2.0; + self.buf.clear(); + Some(merged) + } else { + None + } + } +} + +#[derive(Clone, Debug, Default)] +struct VarianceEstimator { + mean: f64, + var: f64, + n: u64, +} + +/// Incrementally estimates covariance of two random variables X and Y. +/// Uses Welford's online algorithm. +impl VarianceEstimator { + pub fn record(&mut self, x: f64) { + self.n += 1; + let delta1 = x - self.mean; + self.mean += delta1 / self.n as f64; + let delta2 = x - self.mean; + self.var += delta1 * delta2; + } + + pub fn add(&mut self, other: &VarianceEstimator) { + let n1 = self.n as f64; + let n2 = other.n as f64; + let m1 = self.mean; + let m2 = other.mean; + let new_mean = (m1 * n1 + m2 * n2) / (n1 + n2); + + self.n += other.n; + self.mean = new_mean; + self.var = self.var + other.var + (m1 - new_mean).pow(2) * n1 + (m2 - new_mean).pow(2) * n2; + } + + pub fn value(&self) -> f64 { + if self.n <= 1 { + f64::NAN + } else { + self.var / (self.n - 1) as f64 + } + } +} + +#[cfg(test)] +mod test { + use crate::autocorrelation::{EffectiveSampleSizeEstimator, VarianceEstimator}; + use assert_approx_eq::assert_approx_eq; + use more_asserts::{assert_gt, assert_le}; + use rand::rngs::SmallRng; + use rand::{Rng, SeedableRng}; + use rstest::rstest; + + #[test] + fn test_random() { + let mut rng = SmallRng::seed_from_u64(4); + let mut estimator = EffectiveSampleSizeEstimator::default(); + const N: u64 = 1000; + for _ in 0..N { + estimator.record(rng.gen()); + } + assert_gt!(estimator.effective_sample_size(), N / 2); + assert_le!(estimator.effective_sample_size(), N); + } + + #[rstest] + #[case(100, 2)] + #[case(100, 4)] + #[case(100, 10)] + #[case(100, 16)] + #[case(100, 50)] + #[case(100, 100)] + #[case(100, 256)] + #[case(10, 1000)] + #[case(10, 10000)] + fn test_correlated(#[case] n: u64, #[case] cluster_size: usize) { + let mut rng = SmallRng::seed_from_u64(1); + let mut estimator = EffectiveSampleSizeEstimator::default(); + for _ in 0..n { + let v = rng.gen(); + for _ in 0..cluster_size { + estimator.record(v); + } + } + assert_gt!(estimator.effective_sample_size(), n / 2); + assert_le!(estimator.effective_sample_size(), n * 2); + } + + #[test] + fn test_merge_variances() { + const COUNT: usize = 1000; + let mut rng = SmallRng::seed_from_u64(1); + let data: Vec<_> = (0..COUNT) + .map(|i| 0.001 * i as f64 + rng.gen::()) + .collect(); + let mut est = VarianceEstimator::default(); + data.iter().for_each(|x| est.record(*x)); + + let (sub1, sub2) = data.split_at(COUNT / 3); + let mut sub_est1 = VarianceEstimator::default(); + let mut sub_est2 = VarianceEstimator::default(); + sub1.iter().for_each(|x| sub_est1.record(*x)); + sub2.iter().for_each(|x| sub_est2.record(*x)); + + let mut est2 = VarianceEstimator::default(); + est2.add(&sub_est1); + est2.add(&sub_est2); + + assert_approx_eq!(est.value(), est2.value(), 0.00001); + } + + #[test] + fn test_merge_ess() { + const COUNT: usize = 10000; + let mut rng = SmallRng::seed_from_u64(1); + let mut data = Vec::new(); + data.extend((0..COUNT / 2).map(|_| rng.gen::())); + data.extend((0..COUNT / 2).map(|_| rng.gen::() + 0.2)); + + let mut est = EffectiveSampleSizeEstimator::default(); + data.iter().for_each(|x| est.record(*x)); + + let (sub1, sub2) = data.split_at(COUNT / 3); + let mut sub_est1 = EffectiveSampleSizeEstimator::default(); + let mut sub_est2 = EffectiveSampleSizeEstimator::default(); + sub1.iter().for_each(|x| sub_est1.record(*x)); + sub2.iter().for_each(|x| sub_est2.record(*x)); + + let mut est2 = EffectiveSampleSizeEstimator::default(); + est2.add(&sub_est1); + est2.add(&sub_est2); + + assert_approx_eq!( + est.effective_sample_size() as f64, + est2.effective_sample_size() as f64, + est.effective_sample_size() as f64 * 0.1 + ); + } +} diff --git a/src/bootstrap.rs b/src/bootstrap.rs deleted file mode 100644 index 4ddd812..0000000 --- a/src/bootstrap.rs +++ /dev/null @@ -1,66 +0,0 @@ -use hdrhistogram::Histogram; -use rand::distributions::weighted::alias_method::WeightedIndex; -use rand::distributions::Distribution; -use rand::rngs::ThreadRng; -use rand::thread_rng; - -/// Bootstraps a random population sample based on given distribution. -/// See: https://en.wikipedia.org/wiki/Bootstrapping_(statistics) -pub struct Bootstrap { - values: Vec, - index: WeightedIndex, - rng: ThreadRng, -} - -impl Bootstrap { - pub fn new(histogram: &Histogram) -> Bootstrap { - let mut values = Vec::new(); - let mut weights = Vec::new(); - for v in histogram.iter_recorded() { - values.push(histogram.median_equivalent(v.value_iterated_to())); - weights.push(v.count_since_last_iteration()); - } - Bootstrap { - values, - index: WeightedIndex::new(weights).unwrap(), - rng: thread_rng(), - } - } - - pub fn sample(&mut self) -> u64 { - self.values[self.index.sample(&mut self.rng)] - } -} - -#[cfg(test)] -mod test { - use super::*; - use statrs::statistics::Mean; - use statrs::statistics::Statistics; - - #[test] - fn mean() { - let mut h1 = Histogram::::new(3).unwrap(); - for i in 10..100001 { - h1.record(i); - } - //h1.record(100000); - - let mean = h1.mean(); - println!("Mean: {}", mean); - println!("Stderr: {}", h1.stdev() / (h1.len() as f64).sqrt()); - - let mut bootstrap = Bootstrap::new(&h1); - let mut means = Vec::new(); - for j in 0..10 { - let mut h = Vec::with_capacity(100000); - for i in 0..100000 { - h.push(bootstrap.sample() as f64); - } - means.push(h.mean()); - } - - println!("Means mean: {}", means.iter().mean()); - println!("Means stddev: {}", means.iter().std_dev()); - } -} diff --git a/src/context.rs b/src/context.rs index b12ee39..77e0e85 100644 --- a/src/context.rs +++ b/src/context.rs @@ -11,7 +11,6 @@ use std::sync::Arc; use bytes::Bytes; use chrono::Utc; -use hdrhistogram::Histogram; use itertools::{enumerate, Itertools}; use metrohash::{MetroHash128, MetroHash64}; use openssl::error::ErrorStack; @@ -43,6 +42,7 @@ use try_lock::TryLock; use uuid::{Variant, Version}; use crate::config::{ConnectionConf, RetryInterval, PRINT_RETRY_ERROR_LIMIT}; +use crate::latency::LatencyDistributionRecorder; use crate::LatteError; fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> { @@ -330,7 +330,7 @@ pub struct SessionStats { pub row_count: u64, pub queue_length: u64, pub mean_queue_length: f32, - pub resp_times_ns: Histogram, + pub resp_times_ns: LatencyDistributionRecorder, } impl SessionStats { @@ -354,8 +354,7 @@ impl SessionStats { rs: &Result, ) { self.queue_length -= 1; - let duration_ns = duration.as_nanos().clamp(1, u64::MAX as u128) as u64; - self.resp_times_ns.record(duration_ns).unwrap(); + self.resp_times_ns.record(duration); self.req_count += 1; match rs { Ok(rs) => { @@ -405,7 +404,7 @@ impl Default for SessionStats { row_count: 0, queue_length: 0, mean_queue_length: 0.0, - resp_times_ns: Histogram::new(3).unwrap(), + resp_times_ns: LatencyDistributionRecorder::default(), } } } diff --git a/src/latency.rs b/src/latency.rs new file mode 100644 index 0000000..a4b18b0 --- /dev/null +++ b/src/latency.rs @@ -0,0 +1,66 @@ +use crate::autocorrelation::EffectiveSampleSizeEstimator; +use crate::histogram::SerializableHistogram; +use crate::percentiles::Percentiles; +use crate::stats::Mean; +use hdrhistogram::Histogram; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +/// Captures latency mean and percentiles, with uncertainty estimates. +#[derive(Serialize, Deserialize, Debug)] +pub struct LatencyDistribution { + pub mean: Mean, + pub percentiles: Percentiles, + pub histogram: SerializableHistogram, +} + +/// Builds TimeDistribution from a stream of durations. +#[derive(Clone, Debug)] +pub struct LatencyDistributionRecorder { + histogram_ns: Histogram, + ess_estimator: EffectiveSampleSizeEstimator, +} + +impl LatencyDistributionRecorder { + pub fn record(&mut self, time: Duration) { + self.histogram_ns + .record(time.as_nanos().clamp(1, u64::MAX as u128) as u64) + .unwrap(); + self.ess_estimator.record(time.as_secs_f64()); + } + + pub fn add(&mut self, other: &LatencyDistributionRecorder) { + self.histogram_ns.add(&other.histogram_ns).unwrap(); + self.ess_estimator.add(&other.ess_estimator); + } + + pub fn clear(&mut self) { + self.histogram_ns.clear(); + self.ess_estimator.clear(); + } + + pub fn distribution(&self) -> LatencyDistribution { + LatencyDistribution { + mean: Mean::from(&self.histogram_ns, 1e-6, 1), + percentiles: Percentiles::compute(&self.histogram_ns, 1e-6), + histogram: SerializableHistogram(self.histogram_ns.clone()), + } + } + pub fn distribution_with_errors(&self) -> LatencyDistribution { + let ess = self.ess_estimator.effective_sample_size(); + LatencyDistribution { + mean: Mean::from(&self.histogram_ns, 1e-6, ess), + percentiles: Percentiles::compute_with_errors(&self.histogram_ns, 1e-6, ess), + histogram: SerializableHistogram(self.histogram_ns.clone()), + } + } +} + +impl Default for LatencyDistributionRecorder { + fn default() -> Self { + Self { + histogram_ns: Histogram::new(3).unwrap(), + ess_estimator: Default::default(), + } + } +} diff --git a/src/main.rs b/src/main.rs index 631a204..e77ecdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,6 +39,7 @@ use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder}; use crate::table::{Alignment, Table}; use crate::workload::{FnRef, Program, Workload, WorkloadStats, LOAD_FN}; +mod autocorrelation; mod chunks; mod config; mod context; @@ -46,6 +47,8 @@ mod cycle; mod error; mod exec; mod histogram; +mod latency; +mod percentiles; mod plot; mod progress; mod report; @@ -468,13 +471,13 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> { let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64); let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64); log_writer.write_histogram( - &sample.cycle_time_histograms_ns.0, + &sample.cycle_latency.histogram.0, interval_start_time, interval_duration, Tag::new(format!("{tag_prefix}cycles").as_str()), )?; log_writer.write_histogram( - &sample.resp_time_histogram_ns.0, + &sample.request_latency.histogram.0, interval_start_time, interval_duration, Tag::new(format!("{tag_prefix}requests").as_str()), diff --git a/src/percentiles.rs b/src/percentiles.rs new file mode 100644 index 0000000..571da30 --- /dev/null +++ b/src/percentiles.rs @@ -0,0 +1,195 @@ +use crate::stats::Mean; +use hdrhistogram::Histogram; +use rand::rngs::SmallRng; +use rand::{thread_rng, Rng, SeedableRng}; +use serde::{Deserialize, Serialize}; +use statrs::statistics::Statistics; +use strum::{EnumCount, EnumIter, IntoEnumIterator}; + +#[allow(non_camel_case_types)] +#[derive(Copy, Clone, EnumIter, EnumCount)] +pub enum Percentile { + Min = 0, + P1, + P2, + P5, + P10, + P25, + P50, + P75, + P90, + P95, + P98, + P99, + P99_9, + P99_99, + Max, +} + +impl Percentile { + pub fn value(&self) -> f64 { + match self { + Percentile::Min => 0.0, + Percentile::P1 => 1.0, + Percentile::P2 => 2.0, + Percentile::P5 => 5.0, + Percentile::P10 => 10.0, + Percentile::P25 => 25.0, + Percentile::P50 => 50.0, + Percentile::P75 => 75.0, + Percentile::P90 => 90.0, + Percentile::P95 => 95.0, + Percentile::P98 => 98.0, + Percentile::P99 => 99.0, + Percentile::P99_9 => 99.9, + Percentile::P99_99 => 99.99, + Percentile::Max => 100.0, + } + } + + pub fn name(&self) -> &'static str { + match self { + Percentile::Min => " Min ", + Percentile::P1 => " 1 ", + Percentile::P2 => " 2 ", + Percentile::P5 => " 5 ", + Percentile::P10 => " 10 ", + Percentile::P25 => " 25 ", + Percentile::P50 => " 50 ", + Percentile::P75 => " 75 ", + Percentile::P90 => " 90 ", + Percentile::P95 => " 95 ", + Percentile::P98 => " 98 ", + Percentile::P99 => " 99 ", + Percentile::P99_9 => " 99.9 ", + Percentile::P99_99 => " 99.99", + Percentile::Max => " Max ", + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Percentiles([Mean; Percentile::COUNT]); + +impl Percentiles { + const POPULATION_SIZE: usize = 100; + + /// Computes distribution percentiles without errors. + /// Fast. + pub fn compute(histogram: &Histogram, scale: f64) -> Percentiles { + let mut result = Vec::with_capacity(Percentile::COUNT); + for p in Percentile::iter() { + result.push(Mean { + n: Self::POPULATION_SIZE as u64, + value: histogram.value_at_percentile(p.value()) as f64 * scale, + std_err: None, + }); + } + assert_eq!(result.len(), Percentile::COUNT); + Percentiles(result.try_into().unwrap()) + } + + /// Computes distribution percentiles with errors based on a HDR histogram. + /// Caution: this is slow. Don't use it when benchmark is running! + /// Errors are estimated by bootstrapping a larger population of histograms from the + /// distribution determined by the original histogram and computing the standard error. + pub fn compute_with_errors( + histogram: &Histogram, + scale: f64, + effective_sample_size: u64, + ) -> Percentiles { + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + + let mut samples: Vec<[f64; Percentile::COUNT]> = Vec::with_capacity(Self::POPULATION_SIZE); + for _ in 0..Self::POPULATION_SIZE { + samples.push(percentiles( + &bootstrap(&mut rng, histogram, effective_sample_size), + scale, + )) + } + + let mut result = Vec::with_capacity(Percentile::COUNT); + for p in Percentile::iter() { + let std_err = samples.iter().map(|s| s[p as usize]).std_dev(); + result.push(Mean { + n: Self::POPULATION_SIZE as u64, + value: histogram.value_at_percentile(p.value()) as f64 * scale, + std_err: Some(std_err), + }); + } + + assert_eq!(result.len(), Percentile::COUNT); + Percentiles(result.try_into().unwrap()) + } + + pub fn get(&self, percentile: Percentile) -> Mean { + self.0[percentile as usize] + } +} + +/// Creates a new random histogram using another histogram as the distribution. +fn bootstrap(rng: &mut impl Rng, histogram: &Histogram, effective_n: u64) -> Histogram { + let n = histogram.len(); + if n <= 1 { + return histogram.clone(); + } + let mut result = + Histogram::new_with_bounds(histogram.low(), histogram.high(), histogram.sigfig()).unwrap(); + + for bucket in histogram.iter_recorded() { + let p = bucket.count_at_value() as f64 / n as f64; + assert!(p > 0.0, "Probability must be greater than 0.0"); + let b = rand_distr::Binomial::new(effective_n, p).unwrap(); + let count: u64 = rng.sample(b); + result.record_n(bucket.value_iterated_to(), count).unwrap() + } + result +} + +fn percentiles(hist: &Histogram, scale: f64) -> [f64; Percentile::COUNT] { + let mut percentiles = [0.0; Percentile::COUNT]; + for (i, p) in Percentile::iter().enumerate() { + percentiles[i] = hist.value_at_percentile(p.value()) as f64 * scale; + } + percentiles +} + +#[cfg(test)] +mod test { + use crate::percentiles::{Percentile, Percentiles}; + use assert_approx_eq::assert_approx_eq; + use hdrhistogram::Histogram; + use rand::{thread_rng, Rng}; + use statrs::distribution::Uniform; + + #[test] + fn test_zero_error() { + let mut histogram = Histogram::::new(3).unwrap(); + for _ in 0..100000 { + histogram.record(1000).unwrap(); + } + + let percentiles = Percentiles::compute_with_errors(&histogram, 1e-6, histogram.len()); + let median = percentiles.get(Percentile::P50); + assert_approx_eq!(median.value, 0.001, 0.00001); + assert_approx_eq!(median.std_err.unwrap(), 0.000, 1e-15); + } + + #[test] + fn test_min_max_error() { + let mut histogram = Histogram::::new(3).unwrap(); + let d = Uniform::new(0.0, 1000.0).unwrap(); + const N: usize = 100000; + for _ in 0..N { + histogram + .record(thread_rng().sample(d).round() as u64) + .unwrap(); + } + + let percentiles = Percentiles::compute_with_errors(&histogram, 1e-6, histogram.len()); + let min = percentiles.get(Percentile::Min); + let max = percentiles.get(Percentile::Max); + assert!(min.std_err.unwrap() < max.value / N as f64); + assert!(max.std_err.unwrap() < max.value / N as f64); + } +} diff --git a/src/plot.rs b/src/plot.rs index 5306fee..9252480 100644 --- a/src/plot.rs +++ b/src/plot.rs @@ -260,7 +260,7 @@ fn resp_time_series(report: &Report, color_index: usize, percentiles: &[f64]) -> for (i, p) in percentiles.iter().enumerate() { let time = s.time_s; let resp_time_ms = - s.resp_time_histogram_ns.0.value_at_percentile(*p) as f32 / 1_000_000.0; + s.request_latency.histogram.0.value_at_percentile(*p) as f32 / 1_000_000.0; series[i].data.push((time, resp_time_ms)); } } diff --git a/src/report.rs b/src/report.rs index 956fe33..447b982 100644 --- a/src/report.rs +++ b/src/report.rs @@ -1,7 +1,6 @@ use crate::config::{RunCommand, PRINT_RETRY_ERROR_LIMIT, WeightedFunction}; -use crate::stats::{ - BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution, -}; +use crate::percentiles::Percentile; +use crate::stats::{BenchmarkCmp, BenchmarkStats, Mean, Sample, Significance}; use crate::table::Row; use chrono::{DateTime, Local, TimeZone}; use console::{pad_str, style, Alignment}; @@ -9,7 +8,6 @@ use core::fmt; use err_derive::*; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use statrs::statistics::Statistics; use std::collections::BTreeSet; use std::fmt::{Display, Formatter}; use std::io::{BufReader, BufWriter}; @@ -85,14 +83,14 @@ impl Report { throughput: self.result.cycle_throughput.value, latency_p50: self .result - .resp_time_ms + .request_latency .as_ref() - .map(|t| t.percentiles[Percentile::P50 as usize].value), + .map(|t| t.percentiles.get(Percentile::P50).value), latency_p99: self .result - .resp_time_ms + .request_latency .as_ref() - .map(|t| t.percentiles[Percentile::P99 as usize].value), + .map(|t| t.percentiles.get(Percentile::P99).value), } } } @@ -161,18 +159,6 @@ impl From> for Quantity { } } -impl From<&TimeDistribution> for Quantity { - fn from(td: &TimeDistribution) -> Self { - Quantity::from(td.mean) - } -} - -impl From<&Option> for Quantity { - fn from(td: &Option) -> Self { - Quantity::from(td.as_ref().map(|td| td.mean)) - } -} - impl Display for Quantity { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match (&self.value, self.precision) { @@ -632,14 +618,14 @@ impl Display for Sample { self.cycle_count, self.cycle_error_count, self.cycle_throughput, - self.cycle_time_percentiles[Percentile::Min as usize], - self.cycle_time_percentiles[Percentile::P50 as usize], - self.cycle_time_percentiles[Percentile::P75 as usize], - self.cycle_time_percentiles[Percentile::P90 as usize], - self.cycle_time_percentiles[Percentile::P95 as usize], - self.cycle_time_percentiles[Percentile::P99 as usize], - self.cycle_time_percentiles[Percentile::P99_9 as usize], - self.cycle_time_percentiles[Percentile::Max as usize] + self.cycle_latency.percentiles.get(Percentile::Min).value, + self.cycle_latency.percentiles.get(Percentile::P50).value, + self.cycle_latency.percentiles.get(Percentile::P75).value, + self.cycle_latency.percentiles.get(Percentile::P90).value, + self.cycle_latency.percentiles.get(Percentile::P95).value, + self.cycle_latency.percentiles.get(Percentile::P99).value, + self.cycle_latency.percentiles.get(Percentile::P99_9).value, + self.cycle_latency.percentiles.get(Percentile::Max).value ) } } @@ -697,18 +683,18 @@ impl<'a> Display for BenchmarkCmp<'a> { self.line("└─", "row/req", |s| { Quantity::from(s.row_count_per_req).with_precision(1) }), - self.line("Samples", "", |s| Quantity::from(s.samples_count)), + // self.line("Samples", "", |s| Quantity::from(s.samples_count)), ]; if self.v1.log.len() > 1 { let summary_part2: Vec> = vec![ - self.line("Mean sample size", "op", |s| { - Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()) - .with_precision(0) - }), - self.line("└─", "req", |s| { - Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) - .with_precision(0) - }), + // self.line("Mean sample size", "op", |s| { + // Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()) + // .with_precision(0) + // }), + // self.line("└─", "req", |s| { + // Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) + // .with_precision(0) + // }), self.line("Concurrency", "req", |s| { Quantity::from(s.concurrency).with_precision(0) }), @@ -733,14 +719,18 @@ impl<'a> Display for BenchmarkCmp<'a> { .with_significance(self.cmp_row_throughput()) .with_orientation(1) .into_box(), - self.line("Mean cycle time", "ms", |s| { - Quantity::from(&s.cycle_time_ms).with_precision(3) + // self.line("Mean cycle time", "ms", |s| { + self.line("Cycle latency", "ms", |s| { + // Quantity::from(&s.cycle_time_ms).with_precision(3) + Quantity::from(s.cycle_latency.mean).with_precision(3) }) .with_significance(self.cmp_mean_resp_time()) .with_orientation(-1) .into_box(), - self.line("Mean resp. time", "ms", |s| { - Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3) + // self.line("Mean resp. time", "ms", |s| { + self.line("Request latency", "ms", |s| { + // Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3) + Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3) }) .with_significance(self.cmp_mean_resp_time()) .with_orientation(-1) @@ -752,7 +742,8 @@ impl<'a> Display for BenchmarkCmp<'a> { writeln!(f, "{l}")?; } - if self.v1.request_count > 0 && self.v1.log.len() > 1 { + // if self.v1.request_count > 0 && self.v1.log.len() > 1 { + if self.v1.log.len() > 1 { let resp_time_percentiles = [ Percentile::Min, Percentile::P25, @@ -766,8 +757,9 @@ impl<'a> Display for BenchmarkCmp<'a> { Percentile::P99_99, Percentile::Max, ]; + writeln!(f)?; - writeln!(f, "{}", fmt_section_header("RESPONSE TIMES [ms]"))?; + writeln!(f, "{}", fmt_section_header("CYCLE LATENCY [ms]"))?; if self.v2.is_some() { writeln!(f, "{}", fmt_cmp_header(true))?; } @@ -775,45 +767,13 @@ impl<'a> Display for BenchmarkCmp<'a> { for p in resp_time_percentiles.iter() { let l = self .line(p.name(), "", |s| { - let rt = s - .resp_time_ms - .as_ref() - .map(|rt| rt.percentiles[*p as usize]); + let rt = s.request_latency.as_ref().map(|rt| rt.percentiles.get(*p)); Quantity::from(rt).with_precision(3) }) .with_orientation(-1) .with_significance(self.cmp_resp_time_percentile(*p)); writeln!(f, "{l}")?; } - - writeln!(f)?; - writeln!(f, "{}", fmt_section_header("RESPONSE TIME DISTRIBUTION"))?; - writeln!(f, "{}", style("── Resp. time [ms] ── ────────────────────────────────────────────── Count ────────────────────────────────────────────────").yellow().bold().for_stdout())?; - let zero = Bucket { - percentile: 0.0, - duration_ms: 0.0, - count: 0, - cumulative_count: 0, - }; - let dist = &self.v1.resp_time_ms.as_ref().unwrap().distribution; - let max_count = dist.iter().map(|b| b.count).max().unwrap_or(1); - for (low, high) in ([zero].iter().chain(dist)).tuple_windows() { - writeln!( - f, - "{:8.1} {} {:8.1} {:9} {:6.2}% {}", - style(low.duration_ms).yellow().for_stdout(), - style("...").yellow().for_stdout(), - style(high.duration_ms).yellow().for_stdout(), - high.count, - high.percentile - low.percentile, - style("▪".repeat((82 * high.count / max_count) as usize)) - .dim() - .for_stdout() - )?; - if high.cumulative_count == self.v1.request_count { - break; - } - } } if self.v1.error_count > 0 { diff --git a/src/stats.rs b/src/stats.rs index ed22f83..14b294d 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -4,15 +4,13 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::time::{Instant, SystemTime}; +use crate::latency::{LatencyDistribution, LatencyDistributionRecorder}; +use crate::percentiles::Percentile; +use crate::workload::WorkloadStats; use cpu_time::ProcessTime; use hdrhistogram::Histogram; use serde::{Deserialize, Serialize}; use statrs::distribution::{ContinuousCDF, StudentsT}; -use strum::IntoEnumIterator; -use strum::{EnumCount, EnumIter}; - -use crate::histogram::SerializableHistogram; -use crate::workload::WorkloadStats; /// Controls the maximum order of autocovariance taken into /// account when estimating the long run mean error. Higher values make the estimator @@ -97,14 +95,6 @@ pub fn long_run_err(mean: f64, values: &[f32], weights: &[f32]) -> f64 { (long_run_variance(mean, values, weights) / values.len() as f64).sqrt() } -fn percentiles_ms(hist: &Histogram) -> [f32; Percentile::COUNT] { - let mut percentiles = [0.0; Percentile::COUNT]; - for (i, p) in Percentile::iter().enumerate() { - percentiles[i] = hist.value_at_percentile(p.value()) as f32 / 1000000.0; - } - percentiles -} - /// Holds a mean and its error together. /// Makes it more convenient to compare means and it also reduces the number /// of fields, because we don't have to keep the values and the errors in separate fields. @@ -124,6 +114,18 @@ impl Mean { std_err: not_nan(long_run_err(m, v, weights)), } } + + pub fn from(h: &Histogram, scale: f64, effective_n: u64) -> Mean { + Mean { + n: effective_n, + value: h.mean() * scale, + std_err: if effective_n > 1 { + Some(h.stdev() * scale / (effective_n as f64 - 1.0).sqrt()) + } else { + None + }, + } + } } /// Returns the probability that the difference between two means is due to a chance. @@ -160,21 +162,6 @@ pub fn t_test(mean1: &Mean, mean2: &Mean) -> f64 { } } -fn distribution(hist: &Histogram) -> Vec { - let mut result = Vec::new(); - if !hist.is_empty() { - for x in hist.iter_log(100000, 2.15443469) { - result.push(Bucket { - percentile: x.percentile(), - duration_ms: x.value_iterated_to() as f64 / 1000000.0, - count: x.count_since_last_iteration(), - cumulative_count: x.count_at_value(), - }); - } - } - result -} - /// Converts NaN to None. fn not_nan(x: f64) -> Option { if x.is_nan() { @@ -195,68 +182,6 @@ fn not_nan_f32(x: f32) -> Option { const MAX_KEPT_ERRORS: usize = 10; -#[allow(non_camel_case_types)] -#[derive(Copy, Clone, EnumIter, EnumCount)] -pub enum Percentile { - Min = 0, - P1, - P2, - P5, - P10, - P25, - P50, - P75, - P90, - P95, - P98, - P99, - P99_9, - P99_99, - Max, -} - -impl Percentile { - pub fn value(&self) -> f64 { - match self { - Percentile::Min => 0.0, - Percentile::P1 => 1.0, - Percentile::P2 => 2.0, - Percentile::P5 => 5.0, - Percentile::P10 => 10.0, - Percentile::P25 => 25.0, - Percentile::P50 => 50.0, - Percentile::P75 => 75.0, - Percentile::P90 => 90.0, - Percentile::P95 => 95.0, - Percentile::P98 => 98.0, - Percentile::P99 => 99.0, - Percentile::P99_9 => 99.9, - Percentile::P99_99 => 99.99, - Percentile::Max => 100.0, - } - } - - pub fn name(&self) -> &'static str { - match self { - Percentile::Min => " Min ", - Percentile::P1 => " 1 ", - Percentile::P2 => " 2 ", - Percentile::P5 => " 5 ", - Percentile::P10 => " 10 ", - Percentile::P25 => " 25 ", - Percentile::P50 => " 50 ", - Percentile::P75 => " 75 ", - Percentile::P90 => " 90 ", - Percentile::P95 => " 95 ", - Percentile::P98 => " 98 ", - Percentile::P99 => " 99 ", - Percentile::P99_9 => " 99.9 ", - Percentile::P99_99 => " 99.99", - Percentile::Max => " Max ", - } - } -} - /// Records basic statistics for a sample (a group) of requests #[derive(Serialize, Deserialize, Debug)] pub struct Sample { @@ -274,24 +199,16 @@ pub struct Sample { pub cycle_throughput: f32, pub req_throughput: f32, pub row_throughput: f32, - pub mean_cycle_time_ms: f32, - pub mean_resp_time_ms: f32, - - pub cycle_time_percentiles: [f32; Percentile::COUNT], - pub cycle_time_histograms_ns: SerializableHistogram, - - pub cycle_time_percentiles_by_fn: HashMap, - pub cycle_time_histograms_ns_by_fn: HashMap, - pub resp_time_percentiles: [f32; Percentile::COUNT], - pub resp_time_histogram_ns: SerializableHistogram, + pub cycle_latency: LatencyDistribution, + pub cycle_latency_by_fn: HashMap, + pub request_latency: LatencyDistribution, } impl Sample { pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample { assert!(!stats.is_empty()); - let create_histogram = || SerializableHistogram(Histogram::new(3).unwrap()); let mut cycle_count = 0; let mut cycle_error_count = 0; let mut request_count = 0; @@ -303,9 +220,9 @@ impl Sample { let mut mean_queue_len = 0.0; let mut duration_s = 0.0; - let mut resp_times_ns = create_histogram(); - let mut cycle_times_ns = create_histogram(); - let mut cycle_times_ns_per_fn = HashMap::new(); + let mut request_latency = LatencyDistributionRecorder::default(); + let mut cycle_latency = LatencyDistributionRecorder::default(); + let mut cycle_latency_per_fn = HashMap::::new(); for s in stats { let ss = &s.session_stats; @@ -319,27 +236,18 @@ impl Sample { req_retry_count += ss.req_retry_count; mean_queue_len += ss.mean_queue_length / stats.len() as f32; duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32; - resp_times_ns.0.add(&ss.resp_times_ns).unwrap(); + request_latency.add(&ss.resp_times_ns); for fs in &s.function_stats { cycle_count += fs.call_count; cycle_error_count = fs.error_count; - cycle_times_ns.0.add(&fs.call_times_ns).unwrap(); - cycle_times_ns_per_fn + cycle_latency.add(&fs.cycle_latency); + cycle_latency_per_fn .entry(fs.function.name.clone()) - .or_insert_with(create_histogram) - .0 - .add(&fs.call_times_ns) - .unwrap(); + .or_default() + .add(&fs.cycle_latency); } } - let resp_time_percentiles = percentiles_ms(&resp_times_ns.0); - let cycle_time_percentiles = percentiles_ms(&cycle_times_ns.0); - - let mut cycle_time_percentiles_per_fn = HashMap::new(); - for (fn_name, histogram) in &cycle_times_ns_per_fn { - cycle_time_percentiles_per_fn.insert(fn_name.to_owned(), percentiles_ms(&histogram.0)); - } Sample { time_s: (stats[0].start_time - base_start_time).as_secs_f32(), @@ -353,17 +261,18 @@ impl Sample { req_error_count, row_count, mean_queue_len: not_nan_f32(mean_queue_len).unwrap_or(0.0), + cycle_throughput: cycle_count as f32 / duration_s, req_throughput: request_count as f32 / duration_s, row_throughput: row_count as f32 / duration_s, - mean_cycle_time_ms: cycle_times_ns.0.mean() as f32 / 1000000.0, - mean_resp_time_ms: resp_times_ns.0.mean() as f32 / 1000000.0, - resp_time_percentiles, - resp_time_histogram_ns: resp_times_ns, - cycle_time_percentiles, - cycle_time_percentiles_by_fn: cycle_time_percentiles_per_fn, - cycle_time_histograms_ns: cycle_times_ns, - cycle_time_histograms_ns_by_fn: cycle_times_ns_per_fn, + + cycle_latency: cycle_latency.distribution(), + cycle_latency_by_fn: cycle_latency_per_fn + .into_iter() + .map(|(k, v)| (k, v.distribution())) + .collect(), + + request_latency: request_latency.distribution(), } } } @@ -394,10 +303,6 @@ impl Log { self.samples.last().unwrap() } - fn weights_by_call_count(&self) -> Vec { - self.samples.iter().map(|s| s.cycle_count as f32).collect() - } - fn weights_by_request_count(&self) -> Vec { self.samples .iter() @@ -423,38 +328,6 @@ impl Log { Mean::compute(t.as_slice(), w.as_slice()) } - fn resp_time_ms(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.mean_resp_time_ms).collect(); - let w = self.weights_by_request_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn resp_time_percentile(&self, p: Percentile) -> Mean { - let t: Vec = self - .samples - .iter() - .map(|s| s.resp_time_percentiles[p as usize]) - .collect(); - let w = self.weights_by_request_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn cycle_time_ms(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.mean_cycle_time_ms).collect(); - let w = self.weights_by_call_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn cycle_time_percentile(&self, p: Percentile) -> Mean { - let t: Vec = self - .samples - .iter() - .map(|s| s.cycle_time_percentiles[p as usize]) - .collect(); - let w = self.weights_by_call_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - fn mean_concurrency(&self) -> Mean { let p: Vec = self.samples.iter().map(|s| s.mean_queue_len).collect(); let w = self.weights_by_request_count(); @@ -471,21 +344,6 @@ impl Log { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct Bucket { - pub percentile: f64, - pub duration_ms: f64, - pub count: u64, - pub cumulative_count: u64, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct TimeDistribution { - pub mean: Mean, - pub percentiles: Vec, - pub distribution: Vec, -} - /// Stores the final statistics of the test run. #[derive(Serialize, Deserialize, Debug)] pub struct BenchmarkStats { @@ -509,8 +367,9 @@ pub struct BenchmarkStats { pub cycle_throughput_ratio: Option, pub req_throughput: Mean, pub row_throughput: Mean, - pub cycle_time_ms: TimeDistribution, - pub resp_time_ms: Option, + pub cycle_latency: LatencyDistribution, + pub cycle_latency_by_fn: HashMap, + pub request_latency: Option, pub concurrency: Mean, pub concurrency_ratio: f64, pub log: Vec, @@ -564,13 +423,13 @@ impl BenchmarkCmp<'_> { // Checks if mean response time of two benchmark runs are significantly different. // Returns None if the second benchmark is unset. pub fn cmp_mean_resp_time(&self) -> Option { - self.cmp(|s| s.resp_time_ms.as_ref().map(|r| r.mean)) + self.cmp(|s| s.request_latency.as_ref().map(|r| r.mean)) } // Checks corresponding response time percentiles of two benchmark runs // are statistically different. Returns None if the second benchmark is unset. pub fn cmp_resp_time_percentile(&self, p: Percentile) -> Option { - self.cmp(|s| s.resp_time_ms.as_ref().map(|r| r.percentiles[p as usize])) + self.cmp(|s| s.request_latency.as_ref().map(|r| r.percentiles.get(p))) } } @@ -592,8 +451,9 @@ pub struct Recorder { pub errors: HashSet, pub cycle_error_count: u64, pub row_count: u64, - pub cycle_times_ns: Histogram, - pub resp_times_ns: Histogram, + pub cycle_latency: LatencyDistributionRecorder, + pub cycle_latency_by_fn: HashMap, + pub request_latency: LatencyDistributionRecorder, log: Log, rate_limit: Option, concurrency_limit: NonZeroUsize, @@ -623,8 +483,9 @@ impl Recorder { row_count: 0, errors: HashSet::new(), cycle_error_count: 0, - cycle_times_ns: Histogram::new(3).unwrap(), - resp_times_ns: Histogram::new(3).unwrap(), + cycle_latency: LatencyDistributionRecorder::default(), + cycle_latency_by_fn: HashMap::new(), + request_latency: LatencyDistributionRecorder::default(), } } @@ -633,25 +494,27 @@ impl Recorder { pub fn record(&mut self, samples: &[WorkloadStats]) -> &Sample { assert!(!samples.is_empty()); for s in samples.iter() { - self.resp_times_ns - .add(&s.session_stats.resp_times_ns) - .unwrap(); + self.request_latency.add(&s.session_stats.resp_times_ns); for fs in &s.function_stats { - self.cycle_times_ns.add(&fs.call_times_ns).unwrap(); + self.cycle_latency.add(&fs.cycle_latency); + self.cycle_latency_by_fn + .entry(fs.function.name.clone()) + .or_default() + .add(&fs.cycle_latency); } } - let stats = Sample::new(self.start_instant, samples); - self.cycle_count += stats.cycle_count; - self.cycle_error_count += stats.cycle_error_count; - self.request_count += stats.request_count; - self.request_retry_count += stats.req_retry_count; - self.request_error_count += stats.req_error_count; - self.row_count += stats.row_count; + let sample = Sample::new(self.start_instant, samples); + self.cycle_count += sample.cycle_count; + self.cycle_error_count += sample.cycle_error_count; + self.request_count += sample.request_count; + self.request_retry_count += sample.req_retry_count; + self.request_error_count += sample.req_error_count; + self.row_count += sample.row_count; if self.errors.len() < MAX_KEPT_ERRORS { - self.errors.extend(stats.req_errors.iter().cloned()); + self.errors.extend(sample.req_errors.iter().cloned()); } - self.log.append(stats) + self.log.append(sample) } /// Stops the recording, computes the statistics and returns them as the new object. @@ -659,18 +522,14 @@ impl Recorder { self.end_time = SystemTime::now(); self.end_instant = Instant::now(); self.end_cpu_time = ProcessTime::now(); - self.stats() - } - /// Computes the final statistics based on collected data - /// and turn them into report that can be serialized - fn stats(self) -> BenchmarkStats { let elapsed_time_s = (self.end_instant - self.start_instant).as_secs_f64(); let cpu_time_s = self .end_cpu_time .duration_since(self.start_cpu_time) .as_secs_f64(); let cpu_util = 100.0 * cpu_time_s / elapsed_time_s / num_cpus::get() as f64; + let cycle_throughput = self.log.call_throughput(); let cycle_throughput_ratio = self.rate_limit.map(|r| 100.0 * cycle_throughput.value / r); let req_throughput = self.log.req_throughput(); @@ -678,13 +537,6 @@ impl Recorder { let concurrency = self.log.mean_concurrency(); let concurrency_ratio = 100.0 * concurrency.value / self.concurrency_limit.get() as f64; - let cycle_time_percentiles: Vec = Percentile::iter() - .map(|p| self.log.cycle_time_percentile(p)) - .collect(); - let resp_time_percentiles: Vec = Percentile::iter() - .map(|p| self.log.resp_time_percentile(p)) - .collect(); - BenchmarkStats { start_time: self.start_time.into(), end_time: self.end_time.into(), @@ -708,17 +560,14 @@ impl Recorder { cycle_throughput_ratio, req_throughput, row_throughput, - cycle_time_ms: TimeDistribution { - mean: self.log.cycle_time_ms(), - percentiles: cycle_time_percentiles, - distribution: distribution(&self.cycle_times_ns), - }, - resp_time_ms: if self.request_count > 0 { - Some(TimeDistribution { - mean: self.log.resp_time_ms(), - percentiles: resp_time_percentiles, - distribution: distribution(&self.resp_times_ns), - }) + cycle_latency: self.cycle_latency.distribution_with_errors(), + cycle_latency_by_fn: self + .cycle_latency_by_fn + .into_iter() + .map(|(k, v)| (k, v.distribution_with_errors())) + .collect(), + request_latency: if self.request_count > 0 { + Some(self.request_latency.distribution_with_errors()) } else { None }, diff --git a/src/workload.rs b/src/workload.rs index dded7c2..0e44650 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use hdrhistogram::Histogram; use rand::distributions::{Distribution, WeightedIndex}; use rand::rngs::SmallRng; use rand::{Rng, SeedableRng}; @@ -21,6 +20,7 @@ use serde::{Deserialize, Serialize}; use try_lock::TryLock; use crate::error::LatteError; +use crate::latency::LatencyDistributionRecorder; use crate::{context, CassError, CassErrorKind, Context, SessionStats}; /// Wraps a reference to Session that can be converted to a Rune `Value` @@ -406,7 +406,7 @@ pub struct FnStats { pub function: FnRef, pub call_count: u64, pub error_count: u64, - pub call_times_ns: Histogram, + pub cycle_latency: LatencyDistributionRecorder, } impl FnStats { @@ -415,29 +415,25 @@ impl FnStats { function, call_count: 0, error_count: 0, - call_times_ns: Histogram::new(3).unwrap(), + cycle_latency: LatencyDistributionRecorder::default(), } } pub fn reset(&mut self) { self.call_count = 0; self.error_count = 0; - self.call_times_ns.clear(); + self.cycle_latency.clear(); } pub fn operation_completed(&mut self, duration: Duration) { self.call_count += 1; - self.call_times_ns - .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64) - .unwrap(); + self.cycle_latency.record(duration) } pub fn operation_failed(&mut self, duration: Duration) { self.call_count += 1; self.error_count += 1; - self.call_times_ns - .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64) - .unwrap(); + self.cycle_latency.record(duration); } }