From 70870dc833dc22bfc36a12302b123e5d4b68dc9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Wed, 7 Aug 2024 13:35:58 +0200 Subject: [PATCH 1/3] Record function call duration histograms separately in Sample --- src/main.rs | 2 +- src/stats.rs | 66 +++++++++++++++---------- src/workload.rs | 125 ++++++++++++++++++++++++++++++++++-------------- 3 files changed, 132 insertions(+), 61 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8a85219..f5b127b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -459,7 +459,7 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> { let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64); let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64); log_writer.write_histogram( - &sample.cycle_time_histogram_ns.0, + &sample.cycle_time_histograms_ns.0, interval_start_time, interval_duration, Tag::new(format!("{tag_prefix}cycles").as_str()), diff --git a/src/stats.rs b/src/stats.rs index 6c5a3ff..8a07d20 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Local}; use std::cmp::min; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::time::{Instant, SystemTime}; @@ -275,19 +275,24 @@ pub struct Sample { pub row_throughput: f32, pub mean_cycle_time_ms: f32, pub mean_resp_time_ms: f32, + pub cycle_time_percentiles: [f32; Percentile::COUNT], + pub cycle_time_histograms_ns: SerializableHistogram, + + pub cycle_time_percentiles_by_fn: HashMap, + pub cycle_time_histograms_ns_by_fn: HashMap, + pub resp_time_percentiles: [f32; Percentile::COUNT], - pub cycle_time_histogram_ns: SerializableHistogram, pub resp_time_histogram_ns: SerializableHistogram, } impl Sample { pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample { assert!(!stats.is_empty()); + + let create_histogram = || SerializableHistogram(Histogram::new(3).unwrap()); let mut cycle_count = 0; let mut cycle_error_count = 0; - let mut cycle_times_ns = Histogram::new(3).unwrap(); - let mut request_count = 0; let mut req_retry_count = 0; let mut row_count = 0; @@ -295,14 +300,13 @@ impl Sample { let mut req_error_count = 0; let mut mean_queue_len = 0.0; let mut duration_s = 0.0; - let mut resp_times_ns = Histogram::new(3).unwrap(); - let mut cycle_time_histogram_ns = Histogram::new(3).unwrap(); - let mut resp_time_histogram_ns = Histogram::new(3).unwrap(); + let mut resp_times_ns = create_histogram(); + let mut cycle_times_ns = create_histogram(); + let mut cycle_times_ns_per_fn = HashMap::new(); for s in stats { let ss = &s.session_stats; - let fs = &s.function_stats; request_count += ss.req_count; row_count += ss.row_count; if errors.len() < MAX_KEPT_ERRORS { @@ -312,16 +316,27 @@ impl Sample { req_retry_count += ss.req_retry_count; mean_queue_len += ss.mean_queue_length / stats.len() as f32; duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32; - resp_times_ns.add(&ss.resp_times_ns).unwrap(); - resp_time_histogram_ns.add(&ss.resp_times_ns).unwrap(); + resp_times_ns.0.add(&ss.resp_times_ns).unwrap(); + + for fs in &s.function_stats { + cycle_count += fs.call_count; + cycle_error_count = fs.error_count; + cycle_times_ns.0.add(&fs.call_times_ns).unwrap(); + cycle_times_ns_per_fn + .entry(fs.function.name.clone()) + .or_insert_with(create_histogram) + .0 + .add(&fs.call_times_ns) + .unwrap(); + } + } + let resp_time_percentiles = percentiles_ms(&resp_times_ns.0); + let cycle_time_percentiles = 
percentiles_ms(&cycle_times_ns.0); - cycle_count += fs.call_count; - cycle_error_count = fs.error_count; - cycle_times_ns.add(&fs.call_times_ns).unwrap(); - cycle_time_histogram_ns.add(&fs.call_times_ns).unwrap(); + let mut cycle_time_percentiles_per_fn = HashMap::new(); + for (fn_name, histogram) in &cycle_times_ns_per_fn { + cycle_time_percentiles_per_fn.insert(fn_name.to_owned(), percentiles_ms(&histogram.0)); } - let resp_time_percentiles = percentiles_ms(&resp_times_ns); - let call_time_percentiles = percentiles_ms(&cycle_times_ns); Sample { time_s: (stats[0].start_time - base_start_time).as_secs_f32(), @@ -337,12 +352,14 @@ impl Sample { cycle_throughput: cycle_count as f32 / duration_s, req_throughput: request_count as f32 / duration_s, row_throughput: row_count as f32 / duration_s, - mean_cycle_time_ms: cycle_times_ns.mean() as f32 / 1000000.0, - cycle_time_histogram_ns: SerializableHistogram(cycle_time_histogram_ns), - cycle_time_percentiles: call_time_percentiles, - mean_resp_time_ms: resp_times_ns.mean() as f32 / 1000000.0, + mean_cycle_time_ms: cycle_times_ns.0.mean() as f32 / 1000000.0, + mean_resp_time_ms: resp_times_ns.0.mean() as f32 / 1000000.0, resp_time_percentiles, - resp_time_histogram_ns: SerializableHistogram(resp_time_histogram_ns), + resp_time_histogram_ns: resp_times_ns, + cycle_time_percentiles, + cycle_time_percentiles_by_fn: cycle_time_percentiles_per_fn, + cycle_time_histograms_ns: cycle_times_ns, + cycle_time_histograms_ns_by_fn: cycle_times_ns_per_fn, } } } @@ -605,9 +622,10 @@ impl Recorder { self.resp_times_ns .add(&s.session_stats.resp_times_ns) .unwrap(); - self.cycle_times_ns - .add(&s.function_stats.call_times_ns) - .unwrap(); + + for fs in &s.function_stats { + self.cycle_times_ns.add(&fs.call_times_ns).unwrap(); + } } let stats = Sample::new(self.start_instant, samples); self.cycle_count += stats.cycle_count; diff --git a/src/workload.rs b/src/workload.rs index c69a2db..95d20e2 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -1,5 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; +use std::hash::{Hash, Hasher}; +use std::mem; use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -15,6 +17,7 @@ use rune::compile::{CompileVisitor, MetaError, MetaRef}; use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError, VmResult}; use rune::termcolor::{ColorChoice, StandardStream}; use rune::{vm_try, Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm}; +use serde::{Deserialize, Serialize}; use try_lock::TryLock; use crate::error::LatteError; @@ -69,10 +72,16 @@ impl<'a> ToValue for ContextRefMut<'a> { /// Stores the name and hash together. /// Name is used for message formatting, hash is used for fast function lookup. 
-#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct FnRef {
-    name: String,
-    hash: rune::Hash,
+    pub name: String,
+    pub hash: rune::Hash,
+}
+
+impl Hash for FnRef {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.hash.hash(state);
+    }
 }
 
 impl FnRef {
@@ -364,12 +373,28 @@ impl CompileVisitor for ProgramMetadata {
 /// Tracks statistics of the Rune function invoked by the workload
 #[derive(Clone, Debug)]
 pub struct FnStats {
+    pub function: FnRef,
     pub call_count: u64,
     pub error_count: u64,
     pub call_times_ns: Histogram<u64>,
 }
 
 impl FnStats {
+    pub fn new(function: FnRef) -> FnStats {
+        FnStats {
+            function,
+            call_count: 0,
+            error_count: 0,
+            call_times_ns: Histogram::new(3).unwrap(),
+        }
+    }
+
+    pub fn reset(&mut self) {
+        self.call_count = 0;
+        self.error_count = 0;
+        self.call_times_ns.clear();
+    }
+
     pub fn operation_completed(&mut self, duration: Duration) {
         self.call_count += 1;
         self.call_times_ns
@@ -386,53 +411,85 @@ impl FnStats {
     }
 }
 
-impl Default for FnStats {
-    fn default() -> Self {
-        FnStats {
-            call_count: 0,
-            error_count: 0,
-            call_times_ns: Histogram::new(3).unwrap(),
-        }
-    }
-}
-
 /// Statistics of operations (function calls) and Cassandra requests.
 pub struct WorkloadStats {
     pub start_time: Instant,
     pub end_time: Instant,
-    pub function_stats: FnStats,
+    pub function_stats: Vec<FnStats>,
     pub session_stats: SessionStats,
 }
 
 /// Mutable part of Workload
-pub struct WorkloadState {
+pub struct FnStatsCollector {
     start_time: Instant,
-    fn_stats: FnStats,
+    fn_stats: Vec<FnStats>,
 }
 
-impl Default for WorkloadState {
-    fn default() -> Self {
-        WorkloadState {
+impl FnStatsCollector {
+    pub fn new(functions: impl IntoIterator<Item = FnRef>) -> FnStatsCollector {
+        let mut fn_stats = Vec::new();
+        for f in functions {
+            fn_stats.push(FnStats::new(f));
+        }
+        FnStatsCollector {
             start_time: Instant::now(),
-            fn_stats: Default::default(),
+            fn_stats,
         }
     }
+
+    pub fn functions(&self) -> impl Iterator<Item = FnRef> + '_ {
+        self.fn_stats.iter().map(|f| f.function.clone())
+    }
+
+    /// Records the duration of a successful operation
+    pub fn operation_completed(&mut self, function: &FnRef, duration: Duration) {
+        self.fn_stats_mut(function).operation_completed(duration);
+    }
+
+    /// Records the duration of a failed operation
+    pub fn operation_failed(&mut self, function: &FnRef, duration: Duration) {
+        self.fn_stats_mut(function).operation_failed(duration);
+    }
+
+    /// Finds the stats for the given function.
+    /// The function must exist! Otherwise, it will panic.
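+    /// (Implemented as a linear scan over the per-function vector; with the
+    /// handful of functions a typical workload registers, this is assumed to
+    /// be no slower than a hash lookup.)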
+ fn fn_stats_mut(&mut self, function: &FnRef) -> &mut FnStats { + self.fn_stats + .iter_mut() + .find(|f| f.function.hash == function.hash) + .unwrap() + } + + /// Clears any collected stats and sets the start time + pub fn reset(&mut self, start_time: Instant) { + self.fn_stats.iter_mut().for_each(FnStats::reset); + self.start_time = start_time; + } + + /// Returns the collected stats and resets this object + pub fn take(&mut self, end_time: Instant) -> FnStatsCollector { + let mut state = FnStatsCollector::new(self.functions()); + state.start_time = end_time; + mem::swap(self, &mut state); + state + } } pub struct Workload { context: Context, program: Program, router: FunctionRouter, - state: TryLock, + state: TryLock, } impl Workload { pub fn new(context: Context, program: Program, functions: &[(FnRef, f64)]) -> Workload { + let state = FnStatsCollector::new(functions.iter().map(|x| x.0.clone())); Workload { context, program, router: FunctionRouter::new(functions), - state: TryLock::new(WorkloadState::default()), + state: TryLock::new(state), } } @@ -442,7 +499,9 @@ impl Workload { // make a deep copy to avoid congestion on Arc ref counts used heavily by Rune program: self.program.unshare(), router: self.router.clone(), - state: TryLock::new(WorkloadState::default()), + state: TryLock::new(FnStatsCollector::new( + self.state.try_lock().unwrap().functions(), + )), }) } @@ -454,26 +513,24 @@ impl Workload { let start_time = Instant::now(); let mut rng = SmallRng::seed_from_u64(cycle as u64); let context = SessionRef::new(&self.context); - let result = self - .program - .async_call(self.router.select(&mut rng), (context, cycle)) - .await; + let function = self.router.select(&mut rng); + let result = self.program.async_call(function, (context, cycle)).await; let end_time = Instant::now(); let mut state = self.state.try_lock().unwrap(); let duration = end_time - start_time; match result { Ok(_) => { - state.fn_stats.operation_completed(duration); + state.operation_completed(function, duration); Ok((cycle, end_time)) } Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_, _)))) => { // don't stop on overload errors; // they are being counted by the context stats anyways - state.fn_stats.operation_failed(duration); + state.operation_failed(function, duration); Ok((cycle, end_time)) } Err(e) => { - state.fn_stats.operation_failed(duration); + state.operation_failed(function, duration); Err(e) } } @@ -489,24 +546,20 @@ impl Workload { /// Needed for producing `WorkloadStats` with /// recorded start and end times of measurement. pub fn reset(&self, start_time: Instant) { - let mut state = self.state.try_lock().unwrap(); - state.fn_stats = FnStats::default(); - state.start_time = start_time; + self.state.try_lock().unwrap().reset(start_time); self.context.reset(); } /// Returns statistics of the operations invoked by this workload so far. /// Resets the internal statistic counters. 
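     /// (Internally this swaps the whole `FnStatsCollector` out via `take`,
     /// defined above, so recording continues on a fresh collector.)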
pub fn take_stats(&self, end_time: Instant) -> WorkloadStats { - let mut state = self.state.try_lock().unwrap(); + let state = self.state.try_lock().unwrap().take(end_time); let result = WorkloadStats { start_time: state.start_time, end_time, function_stats: state.fn_stats.clone(), session_stats: self.context().take_session_stats(), }; - state.start_time = end_time; - state.fn_stats = FnStats::default(); result } } From 05e72cce64e7b0d276e1b9b571cc7e02ac57df54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 8 Aug 2024 09:21:30 +0200 Subject: [PATCH 2/3] Compute percentile errors from histograms --- Cargo.lock | 130 ++++++++++++++++++ Cargo.toml | 4 + src/autocorrelation.rs | 275 ++++++++++++++++++++++++++++++++++++++ src/bootstrap.rs | 66 ---------- src/context.rs | 9 +- src/latency.rs | 66 ++++++++++ src/main.rs | 7 +- src/percentiles.rs | 195 +++++++++++++++++++++++++++ src/plot.rs | 2 +- src/report.rs | 145 +++++++------------- src/stats.rs | 291 ++++++++++------------------------------- src/workload.rs | 16 +-- 12 files changed, 801 insertions(+), 405 deletions(-) create mode 100644 src/autocorrelation.rs delete mode 100644 src/bootstrap.rs create mode 100644 src/latency.rs create mode 100644 src/percentiles.rs diff --git a/Cargo.lock b/Cargo.lock index d65c8e5..2c8c4ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" +[[package]] +name = "assert_approx_eq" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" + [[package]] name = "async-trait" version = "0.1.77" @@ -499,6 +505,12 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "err-derive" version = "0.3.1" @@ -615,6 +627,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -660,6 +678,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "hashbrown" version = "0.14.3" @@ -753,6 +777,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "indexmap" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name 
= "itertools" version = "0.11.0" @@ -811,6 +845,7 @@ name = "latte-cli" version = "0.28.0" dependencies = [ "anyhow", + "assert_approx_eq", "base64 0.22.1", "chrono", "clap", @@ -824,15 +859,18 @@ dependencies = [ "jemallocator", "lazy_static", "metrohash", + "more-asserts", "num_cpus", "openssl", "parse_duration", "pin-project", "plotters", "rand", + "rand_distr", "regex", "rmp", "rmp-serde", + "rstest", "rune", "rust-embed", "scylla", @@ -954,6 +992,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "more-asserts" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fafa6961cabd9c63bcd77a45d7e3b7f3b552b70417831fb0f56db717e72407e" + [[package]] name = "musli" version = "0.0.42" @@ -1368,6 +1412,15 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1538,6 +1591,12 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "relative-path" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" + [[package]] name = "rmp" version = "0.8.14" @@ -1560,6 +1619,36 @@ dependencies = [ "serde", ] +[[package]] +name = "rstest" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b423f0e62bdd61734b67cd21ff50871dfaeb9cc74f869dcd6af974fbcb19936" +dependencies = [ + "futures", + "futures-timer", + "rstest_macros", + "rustc_version", +] + +[[package]] +name = "rstest_macros" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e1711e7d14f74b12a58411c542185ef7fb7f2e7f8ee6e2940a883628522b42" +dependencies = [ + "cfg-if", + "glob", + "proc-macro-crate", + "proc-macro2", + "quote", + "regex", + "relative-path", + "rustc_version", + "syn 2.0.50", + "unicode-ident", +] + [[package]] name = "rune" version = "0.13.4" @@ -1672,6 +1761,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -1776,6 +1874,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5145b2263bec888224d054d1c820ceffa7d4a23723a2a822f970fcf1c5b64770" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.204" @@ -2099,6 +2203,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" + +[[package]] +name = "toml_edit" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" +dependencies = [ + "indexmap", + "toml_datetime", + "winnow", +] + [[package]] name = "tracing" version = "0.1.40" @@ -2507,6 +2628,15 @@ version = "0.52.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0770833d60a970638e989b3fa9fd2bb1aaadcf88963d1659fd7d9990196ed2d6" +[[package]] +name = "winnow" +version = "0.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" +dependencies = [ + "memchr", +] + [[package]] name = "zerocopy" version = "0.7.34" diff --git a/Cargo.toml b/Cargo.toml index 660c498..b955e23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,14 @@ itertools = "0.13" jemallocator = "0.5" lazy_static = "1.4.0" metrohash = "1.0" +more-asserts = "0.3" num_cpus = "1.13.0" openssl = "0.10.38" parse_duration = "2.1.1" pin-project = "1.1" plotters = { version = "0.3", default-features = false, features = ["line_series", "svg_backend", "full_palette"]} rand = { version = "0.8", default-features = false, features = ["small_rng", "std"] } +rand_distr = "0.4" regex = "1.5" rune = "0.13" rust-embed = "8" @@ -57,6 +59,8 @@ uuid = { version = "1.1", features = ["v4"] } walkdir = "2" [dev-dependencies] +assert_approx_eq = "1" +rstest = "0.22" tokio = { version = "1", features = ["rt", "test-util", "macros"] } [profile.release] diff --git a/src/autocorrelation.rs b/src/autocorrelation.rs new file mode 100644 index 0000000..9155b9a --- /dev/null +++ b/src/autocorrelation.rs @@ -0,0 +1,275 @@ +use more_asserts::assert_le; +use rand_distr::num_traits::Pow; + +/// Estimates the effective size of the sample, by taking account for +/// autocorrelation between measurements. +/// +/// In most statistical operations we assume measurements to be independent of each other. +/// However, in reality time series data are often autocorrelated, which means measurements +/// in near proximity affect each other. A single performance fluctuation of the measured system +/// can affect multiple measurements one after another. This effect decreases the effective +/// sample size and increases measurement uncertainty. +/// +/// The algorithm used for computing autocorrelation matrix is quite inaccurate as it doesn't compute +/// the full covariance matrix, but approximates it by pre-merging data points. +/// However, it is fairly fast (O(n log log n) and works in O(log n) memory incrementally. +#[derive(Clone, Debug, Default)] +pub struct EffectiveSampleSizeEstimator { + n: u64, + levels: Vec, +} + +#[derive(Clone, Debug)] +struct Level { + level: usize, + buf: Vec, + variance: VarianceEstimator, +} + +/// Estimates the effective sample size by using batch means method. 
+impl EffectiveSampleSizeEstimator {
+    /// Adds a single data point
+    pub fn record(&mut self, x: f64) {
+        self.n += 1;
+        self.insert(x, 0);
+    }
+
+    /// Merges another estimator's data into this one
+    pub fn add(&mut self, other: &EffectiveSampleSizeEstimator) {
+        self.n += other.n;
+        for level in &other.levels {
+            self.add_level(level);
+        }
+    }
+
+    pub fn clear(&mut self) {
+        self.n = 0;
+        self.levels.clear();
+    }
+
+    fn insert(&mut self, x: f64, level: usize) {
+        if self.levels.len() == level {
+            self.levels.push(Level::new(level));
+        }
+        if let Some(carry) = self.levels[level].record(x) {
+            self.insert(carry, level + 1);
+        }
+    }
+
+    fn add_level(&mut self, level: &Level) {
+        if self.levels.len() == level.level {
+            self.levels.push(level.clone());
+        } else if let Some(carry) = self.levels[level.level].add(level) {
+            self.insert(carry, level.level + 1);
+        }
+    }
+
+    pub fn effective_sample_size(&self) -> u64 {
+        if self.n <= 1 {
+            return self.n;
+        }
+
+        assert!(!self.levels.is_empty());
+
+        // Autocorrelation time can be estimated as:
+        // size_of_the_batch * variance_of_the_batch_mean / variance_of_the_whole_sample
+        // We can technically compute it from any level but there are some constraints:
+        // - the batch size must be greater than the autocorrelation time
+        // - the number of batches should also be large enough for the variance
+        //   of the mean to be accurate
+        let sample_variance = self.levels[0].variance.value();
+        let autocorrelation_time = self
+            .levels
+            .iter()
+            .map(|l| {
+                (
+                    l.batch_len(),
+                    l.batch_len() as f64 * l.variance.value() / sample_variance,
+                )
+            })
+            .take_while(|(batch_len, time)| *time > 0.2 * *batch_len as f64)
+            .map(|(_, time)| time)
+            .reduce(f64::max)
+            .unwrap();
+
+        (self.n as f64 / autocorrelation_time) as u64
+    }
+}
+
+impl Level {
+    fn new(level: usize) -> Self {
+        Level {
+            level,
+            buf: Vec::with_capacity(2),
+            variance: Default::default(),
+        }
+    }
+
+    fn batch_len(&self) -> usize {
+        1 << self.level
+    }
+
+    fn record(&mut self, value: f64) -> Option<f64> {
+        self.variance.record(value);
+        self.buf.push(value);
+        self.merge()
+    }
+
+    fn add(&mut self, other: &Level) -> Option<f64> {
+        assert_eq!(self.level, other.level);
+        self.variance.add(&other.variance);
+        // If there was more than 1 item recorded by the other level, then we must
+        // drop our queued item, because it is not a neighbour of the other item
+        if other.variance.n > 1 {
+            self.buf.clear();
+        }
+        self.buf.extend(&other.buf);
+        assert_le!(self.buf.len(), 2);
+        self.merge()
+    }
+
+    fn merge(&mut self) -> Option<f64> {
+        if self.buf.len() == 2 {
+            let merged = (self.buf[0] + self.buf[1]) / 2.0;
+            self.buf.clear();
+            Some(merged)
+        } else {
+            None
+        }
+    }
+}
+
+#[derive(Clone, Debug, Default)]
+struct VarianceEstimator {
+    mean: f64,
+    var: f64,
+    n: u64,
+}
+
+/// Incrementally estimates the variance of a random variable X.
+/// Uses Welford's online algorithm.
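+///
+/// Two estimators merge with the standard parallel-variance formula: for the
+/// combined mean m = (m1*n1 + m2*n2) / (n1 + n2), the summed squared
+/// deviations combine as var = var1 + var2 + (m1 - m)^2 * n1 + (m2 - m)^2 * n2.
+/// (`var` holds the raw sum of squared deviations; it is normalized to the
+/// sample variance only in `value`.)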
+impl VarianceEstimator { + pub fn record(&mut self, x: f64) { + self.n += 1; + let delta1 = x - self.mean; + self.mean += delta1 / self.n as f64; + let delta2 = x - self.mean; + self.var += delta1 * delta2; + } + + pub fn add(&mut self, other: &VarianceEstimator) { + let n1 = self.n as f64; + let n2 = other.n as f64; + let m1 = self.mean; + let m2 = other.mean; + let new_mean = (m1 * n1 + m2 * n2) / (n1 + n2); + + self.n += other.n; + self.mean = new_mean; + self.var = self.var + other.var + (m1 - new_mean).pow(2) * n1 + (m2 - new_mean).pow(2) * n2; + } + + pub fn value(&self) -> f64 { + if self.n <= 1 { + f64::NAN + } else { + self.var / (self.n - 1) as f64 + } + } +} + +#[cfg(test)] +mod test { + use crate::autocorrelation::{EffectiveSampleSizeEstimator, VarianceEstimator}; + use assert_approx_eq::assert_approx_eq; + use more_asserts::{assert_gt, assert_le}; + use rand::rngs::SmallRng; + use rand::{Rng, SeedableRng}; + use rstest::rstest; + + #[test] + fn test_random() { + let mut rng = SmallRng::seed_from_u64(4); + let mut estimator = EffectiveSampleSizeEstimator::default(); + const N: u64 = 1000; + for _ in 0..N { + estimator.record(rng.gen()); + } + assert_gt!(estimator.effective_sample_size(), N / 2); + assert_le!(estimator.effective_sample_size(), N); + } + + #[rstest] + #[case(100, 2)] + #[case(100, 4)] + #[case(100, 10)] + #[case(100, 16)] + #[case(100, 50)] + #[case(100, 100)] + #[case(100, 256)] + #[case(10, 1000)] + #[case(10, 10000)] + fn test_correlated(#[case] n: u64, #[case] cluster_size: usize) { + let mut rng = SmallRng::seed_from_u64(1); + let mut estimator = EffectiveSampleSizeEstimator::default(); + for _ in 0..n { + let v = rng.gen(); + for _ in 0..cluster_size { + estimator.record(v); + } + } + assert_gt!(estimator.effective_sample_size(), n / 2); + assert_le!(estimator.effective_sample_size(), n * 2); + } + + #[test] + fn test_merge_variances() { + const COUNT: usize = 1000; + let mut rng = SmallRng::seed_from_u64(1); + let data: Vec<_> = (0..COUNT) + .map(|i| 0.001 * i as f64 + rng.gen::()) + .collect(); + let mut est = VarianceEstimator::default(); + data.iter().for_each(|x| est.record(*x)); + + let (sub1, sub2) = data.split_at(COUNT / 3); + let mut sub_est1 = VarianceEstimator::default(); + let mut sub_est2 = VarianceEstimator::default(); + sub1.iter().for_each(|x| sub_est1.record(*x)); + sub2.iter().for_each(|x| sub_est2.record(*x)); + + let mut est2 = VarianceEstimator::default(); + est2.add(&sub_est1); + est2.add(&sub_est2); + + assert_approx_eq!(est.value(), est2.value(), 0.00001); + } + + #[test] + fn test_merge_ess() { + const COUNT: usize = 10000; + let mut rng = SmallRng::seed_from_u64(1); + let mut data = Vec::new(); + data.extend((0..COUNT / 2).map(|_| rng.gen::())); + data.extend((0..COUNT / 2).map(|_| rng.gen::() + 0.2)); + + let mut est = EffectiveSampleSizeEstimator::default(); + data.iter().for_each(|x| est.record(*x)); + + let (sub1, sub2) = data.split_at(COUNT / 3); + let mut sub_est1 = EffectiveSampleSizeEstimator::default(); + let mut sub_est2 = EffectiveSampleSizeEstimator::default(); + sub1.iter().for_each(|x| sub_est1.record(*x)); + sub2.iter().for_each(|x| sub_est2.record(*x)); + + let mut est2 = EffectiveSampleSizeEstimator::default(); + est2.add(&sub_est1); + est2.add(&sub_est2); + + assert_approx_eq!( + est.effective_sample_size() as f64, + est2.effective_sample_size() as f64, + est.effective_sample_size() as f64 * 0.1 + ); + } +} diff --git a/src/bootstrap.rs b/src/bootstrap.rs deleted file mode 100644 index 4ddd812..0000000 --- 
a/src/bootstrap.rs +++ /dev/null @@ -1,66 +0,0 @@ -use hdrhistogram::Histogram; -use rand::distributions::weighted::alias_method::WeightedIndex; -use rand::distributions::Distribution; -use rand::rngs::ThreadRng; -use rand::thread_rng; - -/// Bootstraps a random population sample based on given distribution. -/// See: https://en.wikipedia.org/wiki/Bootstrapping_(statistics) -pub struct Bootstrap { - values: Vec, - index: WeightedIndex, - rng: ThreadRng, -} - -impl Bootstrap { - pub fn new(histogram: &Histogram) -> Bootstrap { - let mut values = Vec::new(); - let mut weights = Vec::new(); - for v in histogram.iter_recorded() { - values.push(histogram.median_equivalent(v.value_iterated_to())); - weights.push(v.count_since_last_iteration()); - } - Bootstrap { - values, - index: WeightedIndex::new(weights).unwrap(), - rng: thread_rng(), - } - } - - pub fn sample(&mut self) -> u64 { - self.values[self.index.sample(&mut self.rng)] - } -} - -#[cfg(test)] -mod test { - use super::*; - use statrs::statistics::Mean; - use statrs::statistics::Statistics; - - #[test] - fn mean() { - let mut h1 = Histogram::::new(3).unwrap(); - for i in 10..100001 { - h1.record(i); - } - //h1.record(100000); - - let mean = h1.mean(); - println!("Mean: {}", mean); - println!("Stderr: {}", h1.stdev() / (h1.len() as f64).sqrt()); - - let mut bootstrap = Bootstrap::new(&h1); - let mut means = Vec::new(); - for j in 0..10 { - let mut h = Vec::with_capacity(100000); - for i in 0..100000 { - h.push(bootstrap.sample() as f64); - } - means.push(h.mean()); - } - - println!("Means mean: {}", means.iter().mean()); - println!("Means stddev: {}", means.iter().std_dev()); - } -} diff --git a/src/context.rs b/src/context.rs index cd607e3..e5111e0 100644 --- a/src/context.rs +++ b/src/context.rs @@ -11,7 +11,6 @@ use std::str::FromStr; use std::sync::Arc; use chrono::Utc; -use hdrhistogram::Histogram; use itertools::Itertools; use metrohash::{MetroHash128, MetroHash64}; use openssl::error::ErrorStack; @@ -42,6 +41,7 @@ use try_lock::TryLock; use uuid::{Variant, Version}; use crate::config::{ConnectionConf, RetryDelay}; +use crate::latency::LatencyDistributionRecorder; use crate::LatteError; fn ssl_context(conf: &&ConnectionConf) -> Result, CassError> { @@ -309,7 +309,7 @@ pub struct SessionStats { pub row_count: u64, pub queue_length: u64, pub mean_queue_length: f32, - pub resp_times_ns: Histogram, + pub resp_times_ns: LatencyDistributionRecorder, } impl SessionStats { @@ -333,8 +333,7 @@ impl SessionStats { retries: u64, ) { self.queue_length -= 1; - let duration_ns = duration.as_nanos().clamp(1, u64::MAX as u128) as u64; - self.resp_times_ns.record(duration_ns).unwrap(); + self.resp_times_ns.record(duration); self.req_count += 1; self.req_retry_count += retries; match rs { @@ -371,7 +370,7 @@ impl Default for SessionStats { row_count: 0, queue_length: 0, mean_queue_length: 0.0, - resp_times_ns: Histogram::new(3).unwrap(), + resp_times_ns: LatencyDistributionRecorder::default(), } } } diff --git a/src/latency.rs b/src/latency.rs new file mode 100644 index 0000000..a4b18b0 --- /dev/null +++ b/src/latency.rs @@ -0,0 +1,66 @@ +use crate::autocorrelation::EffectiveSampleSizeEstimator; +use crate::histogram::SerializableHistogram; +use crate::percentiles::Percentiles; +use crate::stats::Mean; +use hdrhistogram::Histogram; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +/// Captures latency mean and percentiles, with uncertainty estimates. 
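+///
+/// A minimal usage sketch (hypothetical values; `record` and `distribution`
+/// are the recorder methods defined below):
+/// ```ignore
+/// let mut rec = LatencyDistributionRecorder::default();
+/// rec.record(Duration::from_millis(12));
+/// let dist: LatencyDistribution = rec.distribution();
+/// ```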
+#[derive(Serialize, Deserialize, Debug)] +pub struct LatencyDistribution { + pub mean: Mean, + pub percentiles: Percentiles, + pub histogram: SerializableHistogram, +} + +/// Builds TimeDistribution from a stream of durations. +#[derive(Clone, Debug)] +pub struct LatencyDistributionRecorder { + histogram_ns: Histogram, + ess_estimator: EffectiveSampleSizeEstimator, +} + +impl LatencyDistributionRecorder { + pub fn record(&mut self, time: Duration) { + self.histogram_ns + .record(time.as_nanos().clamp(1, u64::MAX as u128) as u64) + .unwrap(); + self.ess_estimator.record(time.as_secs_f64()); + } + + pub fn add(&mut self, other: &LatencyDistributionRecorder) { + self.histogram_ns.add(&other.histogram_ns).unwrap(); + self.ess_estimator.add(&other.ess_estimator); + } + + pub fn clear(&mut self) { + self.histogram_ns.clear(); + self.ess_estimator.clear(); + } + + pub fn distribution(&self) -> LatencyDistribution { + LatencyDistribution { + mean: Mean::from(&self.histogram_ns, 1e-6, 1), + percentiles: Percentiles::compute(&self.histogram_ns, 1e-6), + histogram: SerializableHistogram(self.histogram_ns.clone()), + } + } + pub fn distribution_with_errors(&self) -> LatencyDistribution { + let ess = self.ess_estimator.effective_sample_size(); + LatencyDistribution { + mean: Mean::from(&self.histogram_ns, 1e-6, ess), + percentiles: Percentiles::compute_with_errors(&self.histogram_ns, 1e-6, ess), + histogram: SerializableHistogram(self.histogram_ns.clone()), + } + } +} + +impl Default for LatencyDistributionRecorder { + fn default() -> Self { + Self { + histogram_ns: Histogram::new(3).unwrap(), + ess_estimator: Default::default(), + } + } +} diff --git a/src/main.rs b/src/main.rs index f5b127b..1325c57 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,6 +39,7 @@ use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder}; use crate::table::{Alignment, Table}; use crate::workload::{FnRef, Program, Workload, WorkloadStats, LOAD_FN}; +mod autocorrelation; mod chunks; mod config; mod context; @@ -46,6 +47,8 @@ mod cycle; mod error; mod exec; mod histogram; +mod latency; +mod percentiles; mod plot; mod progress; mod report; @@ -459,13 +462,13 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> { let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64); let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64); log_writer.write_histogram( - &sample.cycle_time_histograms_ns.0, + &sample.cycle_latency.histogram.0, interval_start_time, interval_duration, Tag::new(format!("{tag_prefix}cycles").as_str()), )?; log_writer.write_histogram( - &sample.resp_time_histogram_ns.0, + &sample.request_latency.histogram.0, interval_start_time, interval_duration, Tag::new(format!("{tag_prefix}requests").as_str()), diff --git a/src/percentiles.rs b/src/percentiles.rs new file mode 100644 index 0000000..571da30 --- /dev/null +++ b/src/percentiles.rs @@ -0,0 +1,195 @@ +use crate::stats::Mean; +use hdrhistogram::Histogram; +use rand::rngs::SmallRng; +use rand::{thread_rng, Rng, SeedableRng}; +use serde::{Deserialize, Serialize}; +use statrs::statistics::Statistics; +use strum::{EnumCount, EnumIter, IntoEnumIterator}; + +#[allow(non_camel_case_types)] +#[derive(Copy, Clone, EnumIter, EnumCount)] +pub enum Percentile { + Min = 0, + P1, + P2, + P5, + P10, + P25, + P50, + P75, + P90, + P95, + P98, + P99, + P99_9, + P99_99, + Max, +} + +impl Percentile { + pub fn value(&self) -> f64 { + match self { + Percentile::Min => 0.0, + Percentile::P1 => 1.0, + Percentile::P2 => 2.0, + 
Percentile::P5 => 5.0, + Percentile::P10 => 10.0, + Percentile::P25 => 25.0, + Percentile::P50 => 50.0, + Percentile::P75 => 75.0, + Percentile::P90 => 90.0, + Percentile::P95 => 95.0, + Percentile::P98 => 98.0, + Percentile::P99 => 99.0, + Percentile::P99_9 => 99.9, + Percentile::P99_99 => 99.99, + Percentile::Max => 100.0, + } + } + + pub fn name(&self) -> &'static str { + match self { + Percentile::Min => " Min ", + Percentile::P1 => " 1 ", + Percentile::P2 => " 2 ", + Percentile::P5 => " 5 ", + Percentile::P10 => " 10 ", + Percentile::P25 => " 25 ", + Percentile::P50 => " 50 ", + Percentile::P75 => " 75 ", + Percentile::P90 => " 90 ", + Percentile::P95 => " 95 ", + Percentile::P98 => " 98 ", + Percentile::P99 => " 99 ", + Percentile::P99_9 => " 99.9 ", + Percentile::P99_99 => " 99.99", + Percentile::Max => " Max ", + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Percentiles([Mean; Percentile::COUNT]); + +impl Percentiles { + const POPULATION_SIZE: usize = 100; + + /// Computes distribution percentiles without errors. + /// Fast. + pub fn compute(histogram: &Histogram, scale: f64) -> Percentiles { + let mut result = Vec::with_capacity(Percentile::COUNT); + for p in Percentile::iter() { + result.push(Mean { + n: Self::POPULATION_SIZE as u64, + value: histogram.value_at_percentile(p.value()) as f64 * scale, + std_err: None, + }); + } + assert_eq!(result.len(), Percentile::COUNT); + Percentiles(result.try_into().unwrap()) + } + + /// Computes distribution percentiles with errors based on a HDR histogram. + /// Caution: this is slow. Don't use it when benchmark is running! + /// Errors are estimated by bootstrapping a larger population of histograms from the + /// distribution determined by the original histogram and computing the standard error. + pub fn compute_with_errors( + histogram: &Histogram, + scale: f64, + effective_sample_size: u64, + ) -> Percentiles { + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + + let mut samples: Vec<[f64; Percentile::COUNT]> = Vec::with_capacity(Self::POPULATION_SIZE); + for _ in 0..Self::POPULATION_SIZE { + samples.push(percentiles( + &bootstrap(&mut rng, histogram, effective_sample_size), + scale, + )) + } + + let mut result = Vec::with_capacity(Percentile::COUNT); + for p in Percentile::iter() { + let std_err = samples.iter().map(|s| s[p as usize]).std_dev(); + result.push(Mean { + n: Self::POPULATION_SIZE as u64, + value: histogram.value_at_percentile(p.value()) as f64 * scale, + std_err: Some(std_err), + }); + } + + assert_eq!(result.len(), Percentile::COUNT); + Percentiles(result.try_into().unwrap()) + } + + pub fn get(&self, percentile: Percentile) -> Mean { + self.0[percentile as usize] + } +} + +/// Creates a new random histogram using another histogram as the distribution. 
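+///
+/// Each recorded bucket holding c of the n total samples is treated as an
+/// event with probability p = c / n, and its count in the resampled histogram
+/// is drawn from Binomial(effective_n, p). For example (numbers invented for
+/// illustration), a bucket with 50 of 1000 samples (p = 0.05) resampled at
+/// effective_n = 200 gets a count of 10 on average.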
+fn bootstrap(rng: &mut impl Rng, histogram: &Histogram, effective_n: u64) -> Histogram { + let n = histogram.len(); + if n <= 1 { + return histogram.clone(); + } + let mut result = + Histogram::new_with_bounds(histogram.low(), histogram.high(), histogram.sigfig()).unwrap(); + + for bucket in histogram.iter_recorded() { + let p = bucket.count_at_value() as f64 / n as f64; + assert!(p > 0.0, "Probability must be greater than 0.0"); + let b = rand_distr::Binomial::new(effective_n, p).unwrap(); + let count: u64 = rng.sample(b); + result.record_n(bucket.value_iterated_to(), count).unwrap() + } + result +} + +fn percentiles(hist: &Histogram, scale: f64) -> [f64; Percentile::COUNT] { + let mut percentiles = [0.0; Percentile::COUNT]; + for (i, p) in Percentile::iter().enumerate() { + percentiles[i] = hist.value_at_percentile(p.value()) as f64 * scale; + } + percentiles +} + +#[cfg(test)] +mod test { + use crate::percentiles::{Percentile, Percentiles}; + use assert_approx_eq::assert_approx_eq; + use hdrhistogram::Histogram; + use rand::{thread_rng, Rng}; + use statrs::distribution::Uniform; + + #[test] + fn test_zero_error() { + let mut histogram = Histogram::::new(3).unwrap(); + for _ in 0..100000 { + histogram.record(1000).unwrap(); + } + + let percentiles = Percentiles::compute_with_errors(&histogram, 1e-6, histogram.len()); + let median = percentiles.get(Percentile::P50); + assert_approx_eq!(median.value, 0.001, 0.00001); + assert_approx_eq!(median.std_err.unwrap(), 0.000, 1e-15); + } + + #[test] + fn test_min_max_error() { + let mut histogram = Histogram::::new(3).unwrap(); + let d = Uniform::new(0.0, 1000.0).unwrap(); + const N: usize = 100000; + for _ in 0..N { + histogram + .record(thread_rng().sample(d).round() as u64) + .unwrap(); + } + + let percentiles = Percentiles::compute_with_errors(&histogram, 1e-6, histogram.len()); + let min = percentiles.get(Percentile::Min); + let max = percentiles.get(Percentile::Max); + assert!(min.std_err.unwrap() < max.value / N as f64); + assert!(max.std_err.unwrap() < max.value / N as f64); + } +} diff --git a/src/plot.rs b/src/plot.rs index 5306fee..9252480 100644 --- a/src/plot.rs +++ b/src/plot.rs @@ -260,7 +260,7 @@ fn resp_time_series(report: &Report, color_index: usize, percentiles: &[f64]) -> for (i, p) in percentiles.iter().enumerate() { let time = s.time_s; let resp_time_ms = - s.resp_time_histogram_ns.0.value_at_percentile(*p) as f32 / 1_000_000.0; + s.request_latency.histogram.0.value_at_percentile(*p) as f32 / 1_000_000.0; series[i].data.push((time, resp_time_ms)); } } diff --git a/src/report.rs b/src/report.rs index b422827..3c64c5b 100644 --- a/src/report.rs +++ b/src/report.rs @@ -1,7 +1,6 @@ use crate::config::{RunCommand, WeightedFunction}; -use crate::stats::{ - BenchmarkCmp, BenchmarkStats, Bucket, Mean, Percentile, Sample, Significance, TimeDistribution, -}; +use crate::percentiles::Percentile; +use crate::stats::{BenchmarkCmp, BenchmarkStats, Mean, Sample, Significance}; use crate::table::Row; use chrono::{DateTime, Local, TimeZone}; use console::{pad_str, style, Alignment}; @@ -9,7 +8,6 @@ use core::fmt; use err_derive::*; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use statrs::statistics::Statistics; use std::collections::BTreeSet; use std::fmt::{Display, Formatter}; use std::io::{BufReader, BufWriter}; @@ -85,14 +83,14 @@ impl Report { throughput: self.result.cycle_throughput.value, latency_p50: self .result - .resp_time_ms + .request_latency .as_ref() - .map(|t| t.percentiles[Percentile::P50 as usize].value), 
+ .map(|t| t.percentiles.get(Percentile::P50).value), latency_p99: self .result - .resp_time_ms + .request_latency .as_ref() - .map(|t| t.percentiles[Percentile::P99 as usize].value), + .map(|t| t.percentiles.get(Percentile::P99).value), } } } @@ -161,18 +159,6 @@ impl From> for Quantity { } } -impl From<&TimeDistribution> for Quantity { - fn from(td: &TimeDistribution) -> Self { - Quantity::from(td.mean) - } -} - -impl From<&Option> for Quantity { - fn from(td: &Option) -> Self { - Quantity::from(td.as_ref().map(|td| td.mean)) - } -} - impl Display for Quantity { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match (&self.value, self.precision) { @@ -611,14 +597,14 @@ impl Display for Sample { self.cycle_count, self.cycle_error_count, self.cycle_throughput, - self.cycle_time_percentiles[Percentile::Min as usize], - self.cycle_time_percentiles[Percentile::P25 as usize], - self.cycle_time_percentiles[Percentile::P50 as usize], - self.cycle_time_percentiles[Percentile::P75 as usize], - self.cycle_time_percentiles[Percentile::P90 as usize], - self.cycle_time_percentiles[Percentile::P99 as usize], - self.cycle_time_percentiles[Percentile::P99_9 as usize], - self.cycle_time_percentiles[Percentile::Max as usize] + self.cycle_latency.percentiles.get(Percentile::Min).value, + self.cycle_latency.percentiles.get(Percentile::P25).value, + self.cycle_latency.percentiles.get(Percentile::P50).value, + self.cycle_latency.percentiles.get(Percentile::P75).value, + self.cycle_latency.percentiles.get(Percentile::P90).value, + self.cycle_latency.percentiles.get(Percentile::P99).value, + self.cycle_latency.percentiles.get(Percentile::P99_9).value, + self.cycle_latency.percentiles.get(Percentile::Max).value ) } } @@ -676,14 +662,6 @@ impl<'a> Display for BenchmarkCmp<'a> { self.line("└─", "row/req", |s| { Quantity::from(s.row_count_per_req).with_precision(1) }), - self.line("Samples", "", |s| Quantity::from(s.log.len())), - self.line("Mean sample size", "op", |s| { - Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean()).with_precision(0) - }), - self.line("└─", "req", |s| { - Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean()) - .with_precision(0) - }), self.line("Concurrency", "req", |s| { Quantity::from(s.concurrency).with_precision(0) }), @@ -708,14 +686,14 @@ impl<'a> Display for BenchmarkCmp<'a> { .with_significance(self.cmp_row_throughput()) .with_orientation(1) .into_box(), - self.line("Mean cycle time", "ms", |s| { - Quantity::from(&s.cycle_time_ms).with_precision(3) + self.line("Cycle latency", "ms", |s| { + Quantity::from(s.cycle_latency.mean).with_precision(3) }) .with_significance(self.cmp_mean_resp_time()) .with_orientation(-1) .into_box(), - self.line("Mean resp. 
time", "ms", |s| { - Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3) + self.line("Request latency", "ms", |s| { + Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3) }) .with_significance(self.cmp_mean_resp_time()) .with_orientation(-1) @@ -727,68 +705,35 @@ impl<'a> Display for BenchmarkCmp<'a> { } writeln!(f)?; - if self.v1.request_count > 0 { - let resp_time_percentiles = [ - Percentile::Min, - Percentile::P25, - Percentile::P50, - Percentile::P75, - Percentile::P90, - Percentile::P95, - Percentile::P98, - Percentile::P99, - Percentile::P99_9, - Percentile::P99_99, - Percentile::Max, - ]; - writeln!(f)?; - writeln!(f, "{}", fmt_section_header("RESPONSE TIMES [ms]"))?; - if self.v2.is_some() { - writeln!(f, "{}", fmt_cmp_header(true))?; - } + let resp_time_percentiles = [ + Percentile::Min, + Percentile::P25, + Percentile::P50, + Percentile::P75, + Percentile::P90, + Percentile::P95, + Percentile::P98, + Percentile::P99, + Percentile::P99_9, + Percentile::P99_99, + Percentile::Max, + ]; - for p in resp_time_percentiles.iter() { - let l = self - .line(p.name(), "", |s| { - let rt = s - .resp_time_ms - .as_ref() - .map(|rt| rt.percentiles[*p as usize]); - Quantity::from(rt).with_precision(3) - }) - .with_orientation(-1) - .with_significance(self.cmp_resp_time_percentile(*p)); - writeln!(f, "{l}")?; - } + writeln!(f)?; + writeln!(f, "{}", fmt_section_header("CYCLE LATENCY [ms]"))?; + if self.v2.is_some() { + writeln!(f, "{}", fmt_cmp_header(true))?; + } - writeln!(f)?; - writeln!(f, "{}", fmt_section_header("RESPONSE TIME DISTRIBUTION"))?; - writeln!(f, "{}", style("── Resp. time [ms] ── ────────────────────────────────────────────── Count ────────────────────────────────────────────────").yellow().bold().for_stdout())?; - let zero = Bucket { - percentile: 0.0, - duration_ms: 0.0, - count: 0, - cumulative_count: 0, - }; - let dist = &self.v1.resp_time_ms.as_ref().unwrap().distribution; - let max_count = dist.iter().map(|b| b.count).max().unwrap_or(1); - for (low, high) in ([zero].iter().chain(dist)).tuple_windows() { - writeln!( - f, - "{:8.1} {} {:8.1} {:9} {:6.2}% {}", - style(low.duration_ms).yellow().for_stdout(), - style("...").yellow().for_stdout(), - style(high.duration_ms).yellow().for_stdout(), - high.count, - high.percentile - low.percentile, - style("▪".repeat((82 * high.count / max_count) as usize)) - .dim() - .for_stdout() - )?; - if high.cumulative_count == self.v1.request_count { - break; - } - } + for p in resp_time_percentiles.iter() { + let l = self + .line(p.name(), "", |s| { + let rt = s.request_latency.as_ref().map(|rt| rt.percentiles.get(*p)); + Quantity::from(rt).with_precision(3) + }) + .with_orientation(-1) + .with_significance(self.cmp_resp_time_percentile(*p)); + writeln!(f, "{l}")?; } if self.v1.error_count > 0 { diff --git a/src/stats.rs b/src/stats.rs index 8a07d20..4ce94c3 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -4,15 +4,13 @@ use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::time::{Instant, SystemTime}; +use crate::latency::{LatencyDistribution, LatencyDistributionRecorder}; +use crate::percentiles::Percentile; +use crate::workload::WorkloadStats; use cpu_time::ProcessTime; use hdrhistogram::Histogram; use serde::{Deserialize, Serialize}; use statrs::distribution::{ContinuousCDF, StudentsT}; -use strum::IntoEnumIterator; -use strum::{EnumCount, EnumIter}; - -use crate::histogram::SerializableHistogram; -use crate::workload::WorkloadStats; /// Controls the maximum order 
of autocovariance taken into /// account when estimating the long run mean error. Higher values make the estimator @@ -97,14 +95,6 @@ pub fn long_run_err(mean: f64, values: &[f32], weights: &[f32]) -> f64 { (long_run_variance(mean, values, weights) / values.len() as f64).sqrt() } -fn percentiles_ms(hist: &Histogram) -> [f32; Percentile::COUNT] { - let mut percentiles = [0.0; Percentile::COUNT]; - for (i, p) in Percentile::iter().enumerate() { - percentiles[i] = hist.value_at_percentile(p.value()) as f32 / 1000000.0; - } - percentiles -} - /// Holds a mean and its error together. /// Makes it more convenient to compare means and it also reduces the number /// of fields, because we don't have to keep the values and the errors in separate fields. @@ -124,6 +114,18 @@ impl Mean { std_err: not_nan(long_run_err(m, v, weights)), } } + + pub fn from(h: &Histogram, scale: f64, effective_n: u64) -> Mean { + Mean { + n: effective_n, + value: h.mean() * scale, + std_err: if effective_n > 1 { + Some(h.stdev() * scale / (effective_n as f64 - 1.0).sqrt()) + } else { + None + }, + } + } } /// Returns the probability that the difference between two means is due to a chance. @@ -160,21 +162,6 @@ pub fn t_test(mean1: &Mean, mean2: &Mean) -> f64 { } } -fn distribution(hist: &Histogram) -> Vec { - let mut result = Vec::new(); - if !hist.is_empty() { - for x in hist.iter_log(100000, 2.15443469) { - result.push(Bucket { - percentile: x.percentile(), - duration_ms: x.value_iterated_to() as f64 / 1000000.0, - count: x.count_since_last_iteration(), - cumulative_count: x.count_at_value(), - }); - } - } - result -} - /// Converts NaN to None. fn not_nan(x: f64) -> Option { if x.is_nan() { @@ -195,68 +182,6 @@ fn not_nan_f32(x: f32) -> Option { const MAX_KEPT_ERRORS: usize = 10; -#[allow(non_camel_case_types)] -#[derive(Copy, Clone, EnumIter, EnumCount)] -pub enum Percentile { - Min = 0, - P1, - P2, - P5, - P10, - P25, - P50, - P75, - P90, - P95, - P98, - P99, - P99_9, - P99_99, - Max, -} - -impl Percentile { - pub fn value(&self) -> f64 { - match self { - Percentile::Min => 0.0, - Percentile::P1 => 1.0, - Percentile::P2 => 2.0, - Percentile::P5 => 5.0, - Percentile::P10 => 10.0, - Percentile::P25 => 25.0, - Percentile::P50 => 50.0, - Percentile::P75 => 75.0, - Percentile::P90 => 90.0, - Percentile::P95 => 95.0, - Percentile::P98 => 98.0, - Percentile::P99 => 99.0, - Percentile::P99_9 => 99.9, - Percentile::P99_99 => 99.99, - Percentile::Max => 100.0, - } - } - - pub fn name(&self) -> &'static str { - match self { - Percentile::Min => " Min ", - Percentile::P1 => " 1 ", - Percentile::P2 => " 2 ", - Percentile::P5 => " 5 ", - Percentile::P10 => " 10 ", - Percentile::P25 => " 25 ", - Percentile::P50 => " 50 ", - Percentile::P75 => " 75 ", - Percentile::P90 => " 90 ", - Percentile::P95 => " 95 ", - Percentile::P98 => " 98 ", - Percentile::P99 => " 99 ", - Percentile::P99_9 => " 99.9 ", - Percentile::P99_99 => " 99.99", - Percentile::Max => " Max ", - } - } -} - /// Records basic statistics for a sample (a group) of requests #[derive(Serialize, Deserialize, Debug)] pub struct Sample { @@ -273,24 +198,16 @@ pub struct Sample { pub cycle_throughput: f32, pub req_throughput: f32, pub row_throughput: f32, - pub mean_cycle_time_ms: f32, - pub mean_resp_time_ms: f32, - - pub cycle_time_percentiles: [f32; Percentile::COUNT], - pub cycle_time_histograms_ns: SerializableHistogram, - - pub cycle_time_percentiles_by_fn: HashMap, - pub cycle_time_histograms_ns_by_fn: HashMap, - pub resp_time_percentiles: [f32; Percentile::COUNT], - 
pub resp_time_histogram_ns: SerializableHistogram, + pub cycle_latency: LatencyDistribution, + pub cycle_latency_by_fn: HashMap, + pub request_latency: LatencyDistribution, } impl Sample { pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample { assert!(!stats.is_empty()); - let create_histogram = || SerializableHistogram(Histogram::new(3).unwrap()); let mut cycle_count = 0; let mut cycle_error_count = 0; let mut request_count = 0; @@ -301,9 +218,9 @@ impl Sample { let mut mean_queue_len = 0.0; let mut duration_s = 0.0; - let mut resp_times_ns = create_histogram(); - let mut cycle_times_ns = create_histogram(); - let mut cycle_times_ns_per_fn = HashMap::new(); + let mut request_latency = LatencyDistributionRecorder::default(); + let mut cycle_latency = LatencyDistributionRecorder::default(); + let mut cycle_latency_per_fn = HashMap::::new(); for s in stats { let ss = &s.session_stats; @@ -316,27 +233,18 @@ impl Sample { req_retry_count += ss.req_retry_count; mean_queue_len += ss.mean_queue_length / stats.len() as f32; duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32; - resp_times_ns.0.add(&ss.resp_times_ns).unwrap(); + request_latency.add(&ss.resp_times_ns); for fs in &s.function_stats { cycle_count += fs.call_count; cycle_error_count = fs.error_count; - cycle_times_ns.0.add(&fs.call_times_ns).unwrap(); - cycle_times_ns_per_fn + cycle_latency.add(&fs.cycle_latency); + cycle_latency_per_fn .entry(fs.function.name.clone()) - .or_insert_with(create_histogram) - .0 - .add(&fs.call_times_ns) - .unwrap(); + .or_default() + .add(&fs.cycle_latency); } } - let resp_time_percentiles = percentiles_ms(&resp_times_ns.0); - let cycle_time_percentiles = percentiles_ms(&cycle_times_ns.0); - - let mut cycle_time_percentiles_per_fn = HashMap::new(); - for (fn_name, histogram) in &cycle_times_ns_per_fn { - cycle_time_percentiles_per_fn.insert(fn_name.to_owned(), percentiles_ms(&histogram.0)); - } Sample { time_s: (stats[0].start_time - base_start_time).as_secs_f32(), @@ -349,17 +257,18 @@ impl Sample { req_error_count, row_count, mean_queue_len: not_nan_f32(mean_queue_len).unwrap_or(0.0), + cycle_throughput: cycle_count as f32 / duration_s, req_throughput: request_count as f32 / duration_s, row_throughput: row_count as f32 / duration_s, - mean_cycle_time_ms: cycle_times_ns.0.mean() as f32 / 1000000.0, - mean_resp_time_ms: resp_times_ns.0.mean() as f32 / 1000000.0, - resp_time_percentiles, - resp_time_histogram_ns: resp_times_ns, - cycle_time_percentiles, - cycle_time_percentiles_by_fn: cycle_time_percentiles_per_fn, - cycle_time_histograms_ns: cycle_times_ns, - cycle_time_histograms_ns_by_fn: cycle_times_ns_per_fn, + + cycle_latency: cycle_latency.distribution(), + cycle_latency_by_fn: cycle_latency_per_fn + .into_iter() + .map(|(k, v)| (k, v.distribution())) + .collect(), + + request_latency: request_latency.distribution(), } } } @@ -381,10 +290,6 @@ impl Log { self.samples.last().unwrap() } - fn weights_by_call_count(&self) -> Vec { - self.samples.iter().map(|s| s.cycle_count as f32).collect() - } - fn weights_by_request_count(&self) -> Vec { self.samples .iter() @@ -410,38 +315,6 @@ impl Log { Mean::compute(t.as_slice(), w.as_slice()) } - fn resp_time_ms(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.mean_resp_time_ms).collect(); - let w = self.weights_by_request_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn resp_time_percentile(&self, p: Percentile) -> Mean { - let t: Vec = self - .samples - .iter() - .map(|s| 
s.resp_time_percentiles[p as usize]) - .collect(); - let w = self.weights_by_request_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn cycle_time_ms(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.mean_cycle_time_ms).collect(); - let w = self.weights_by_call_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn cycle_time_percentile(&self, p: Percentile) -> Mean { - let t: Vec = self - .samples - .iter() - .map(|s| s.cycle_time_percentiles[p as usize]) - .collect(); - let w = self.weights_by_call_count(); - Mean::compute(t.as_slice(), w.as_slice()) - } - fn mean_concurrency(&self) -> Mean { let p: Vec = self.samples.iter().map(|s| s.mean_queue_len).collect(); let w = self.weights_by_request_count(); @@ -458,21 +331,6 @@ impl Log { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct Bucket { - pub percentile: f64, - pub duration_ms: f64, - pub count: u64, - pub cumulative_count: u64, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct TimeDistribution { - pub mean: Mean, - pub percentiles: Vec, - pub distribution: Vec, -} - /// Stores the final statistics of the test run. #[derive(Serialize, Deserialize, Debug)] pub struct BenchmarkStats { @@ -495,8 +353,9 @@ pub struct BenchmarkStats { pub cycle_throughput_ratio: Option, pub req_throughput: Mean, pub row_throughput: Mean, - pub cycle_time_ms: TimeDistribution, - pub resp_time_ms: Option, + pub cycle_latency: LatencyDistribution, + pub cycle_latency_by_fn: HashMap, + pub request_latency: Option, pub concurrency: Mean, pub concurrency_ratio: f64, pub log: Vec, @@ -550,13 +409,13 @@ impl BenchmarkCmp<'_> { // Checks if mean response time of two benchmark runs are significantly different. // Returns None if the second benchmark is unset. pub fn cmp_mean_resp_time(&self) -> Option { - self.cmp(|s| s.resp_time_ms.as_ref().map(|r| r.mean)) + self.cmp(|s| s.request_latency.as_ref().map(|r| r.mean)) } // Checks corresponding response time percentiles of two benchmark runs // are statistically different. Returns None if the second benchmark is unset. 
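     // (The comparison goes through `self.cmp`, which presumably feeds both
     // `Mean` values, including their standard errors, to the `t_test` helper
     // defined earlier in this file.)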
     pub fn cmp_resp_time_percentile(&self, p: Percentile) -> Option<Significance> {
-        self.cmp(|s| s.resp_time_ms.as_ref().map(|r| r.percentiles[p as usize]))
+        self.cmp(|s| s.request_latency.as_ref().map(|r| r.percentiles.get(p)))
     }
 }

@@ -578,8 +437,9 @@ pub struct Recorder {
     pub errors: HashSet<String>,
     pub cycle_error_count: u64,
     pub row_count: u64,
-    pub cycle_times_ns: Histogram<u64>,
-    pub resp_times_ns: Histogram<u64>,
+    pub cycle_latency: LatencyDistributionRecorder,
+    pub cycle_latency_by_fn: HashMap<String, LatencyDistributionRecorder>,
+    pub request_latency: LatencyDistributionRecorder,
     log: Log,
     rate_limit: Option<f64>,
     concurrency_limit: NonZeroUsize,
@@ -609,8 +469,9 @@ impl Recorder {
             row_count: 0,
             errors: HashSet::new(),
             cycle_error_count: 0,
-            cycle_times_ns: Histogram::new(3).unwrap(),
-            resp_times_ns: Histogram::new(3).unwrap(),
+            cycle_latency: LatencyDistributionRecorder::default(),
+            cycle_latency_by_fn: HashMap::new(),
+            request_latency: LatencyDistributionRecorder::default(),
         }
     }

@@ -619,25 +480,27 @@ impl Recorder {
     pub fn record(&mut self, samples: &[WorkloadStats]) -> &Sample {
         assert!(!samples.is_empty());
         for s in samples.iter() {
-            self.resp_times_ns
-                .add(&s.session_stats.resp_times_ns)
-                .unwrap();
+            self.request_latency.add(&s.session_stats.resp_times_ns);

             for fs in &s.function_stats {
-                self.cycle_times_ns.add(&fs.call_times_ns).unwrap();
+                self.cycle_latency.add(&fs.cycle_latency);
+                self.cycle_latency_by_fn
+                    .entry(fs.function.name.clone())
+                    .or_default()
+                    .add(&fs.cycle_latency);
             }
         }
-        let stats = Sample::new(self.start_instant, samples);
-        self.cycle_count += stats.cycle_count;
-        self.cycle_error_count += stats.cycle_error_count;
-        self.request_count += stats.request_count;
-        self.request_retry_count += stats.req_retry_count;
-        self.request_error_count += stats.req_error_count;
-        self.row_count += stats.row_count;
+        let sample = Sample::new(self.start_instant, samples);
+        self.cycle_count += sample.cycle_count;
+        self.cycle_error_count += sample.cycle_error_count;
+        self.request_count += sample.request_count;
+        self.request_retry_count += sample.req_retry_count;
+        self.request_error_count += sample.req_error_count;
+        self.row_count += sample.row_count;
         if self.errors.len() < MAX_KEPT_ERRORS {
-            self.errors.extend(stats.req_errors.iter().cloned());
+            self.errors.extend(sample.req_errors.iter().cloned());
         }
-        self.log.append(stats)
+        self.log.append(sample)
     }

     /// Stops the recording, computes the statistics and returns them as the new object.
@@ -645,18 +508,14 @@ impl Recorder {
         self.end_time = SystemTime::now();
         self.end_instant = Instant::now();
         self.end_cpu_time = ProcessTime::now();
-        self.stats()
-    }
-
-    /// Computes the final statistics based on collected data
-    /// and turn them into report that can be serialized
-    fn stats(self) -> BenchmarkStats {
         let elapsed_time_s = (self.end_instant - self.start_instant).as_secs_f64();
         let cpu_time_s = self
             .end_cpu_time
             .duration_since(self.start_cpu_time)
             .as_secs_f64();
         let cpu_util = 100.0 * cpu_time_s / elapsed_time_s / num_cpus::get() as f64;
+
         let cycle_throughput = self.log.call_throughput();
         let cycle_throughput_ratio = self.rate_limit.map(|r| 100.0 * cycle_throughput.value / r);
         let req_throughput = self.log.req_throughput();
@@ -664,13 +523,6 @@ impl Recorder {
         let concurrency = self.log.mean_concurrency();
         let concurrency_ratio = 100.0 * concurrency.value / self.concurrency_limit.get() as f64;

-        let cycle_time_percentiles: Vec<Mean> = Percentile::iter()
-            .map(|p| self.log.cycle_time_percentile(p))
-            .collect();
-        let resp_time_percentiles: Vec<Mean> = Percentile::iter()
-            .map(|p| self.log.resp_time_percentile(p))
-            .collect();
-
         BenchmarkStats {
             start_time: self.start_time.into(),
             end_time: self.end_time.into(),
@@ -693,17 +545,14 @@ impl Recorder {
             cycle_throughput_ratio,
             req_throughput,
             row_throughput,
-            cycle_time_ms: TimeDistribution {
-                mean: self.log.cycle_time_ms(),
-                percentiles: cycle_time_percentiles,
-                distribution: distribution(&self.cycle_times_ns),
-            },
-            resp_time_ms: if self.request_count > 0 {
-                Some(TimeDistribution {
-                    mean: self.log.resp_time_ms(),
-                    percentiles: resp_time_percentiles,
-                    distribution: distribution(&self.resp_times_ns),
-                })
+            cycle_latency: self.cycle_latency.distribution_with_errors(),
+            cycle_latency_by_fn: self
+                .cycle_latency_by_fn
+                .into_iter()
+                .map(|(k, v)| (k, v.distribution_with_errors()))
+                .collect(),
+            request_latency: if self.request_count > 0 {
+                Some(self.request_latency.distribution_with_errors())
             } else {
                 None
             },
diff --git a/src/workload.rs b/src/workload.rs
index 95d20e2..2858899 100644
--- a/src/workload.rs
+++ b/src/workload.rs
@@ -7,7 +7,6 @@ use std::sync::Arc;
 use std::time::Duration;
 use std::time::Instant;

-use hdrhistogram::Histogram;
 use rand::distributions::{Distribution, WeightedIndex};
 use rand::rngs::SmallRng;
 use rand::{Rng, SeedableRng};
@@ -21,6 +20,7 @@ use serde::{Deserialize, Serialize};
 use try_lock::TryLock;

 use crate::error::LatteError;
+use crate::latency::LatencyDistributionRecorder;
 use crate::{context, CassError, CassErrorKind, Context, SessionStats};

 /// Wraps a reference to Session that can be converted to a Rune `Value`
@@ -376,7 +376,7 @@ pub struct FnStats {
     pub function: FnRef,
     pub call_count: u64,
     pub error_count: u64,
-    pub call_times_ns: Histogram<u64>,
+    pub cycle_latency: LatencyDistributionRecorder,
 }

 impl FnStats {
@@ -385,29 +385,25 @@ impl FnStats {
             function,
             call_count: 0,
             error_count: 0,
-            call_times_ns: Histogram::new(3).unwrap(),
+            cycle_latency: LatencyDistributionRecorder::default(),
         }
     }

     pub fn reset(&mut self) {
         self.call_count = 0;
         self.error_count = 0;
-        self.call_times_ns.clear();
+        self.cycle_latency.clear();
     }

     pub fn operation_completed(&mut self, duration: Duration) {
         self.call_count += 1;
-        self.call_times_ns
-            .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64)
-            .unwrap();
+        self.cycle_latency.record(duration)
     }

     pub fn operation_failed(&mut self, duration: Duration) {
         self.call_count += 1;
         self.error_count += 1;
-        self.call_times_ns
-            .record(duration.as_nanos().clamp(1, u64::MAX as u128) as u64)
-            .unwrap();
+        self.cycle_latency.record(duration);
     }
 }

From ea1a693d4eafc26de643b1a2f1b00f46a410a66d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?=
Date: Fri, 9 Aug 2024 21:16:41 +0200
Subject: [PATCH 3/3] Report latency distributions for each function
 separately

---
 src/report.rs | 37 +++++++++++++++++++++++--------------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/src/report.rs b/src/report.rs
index 3c64c5b..74c5668 100644
--- a/src/report.rs
+++ b/src/report.rs
@@ -719,21 +719,30 @@ impl<'a> Display for BenchmarkCmp<'a> {
             Percentile::Max,
         ];

-        writeln!(f)?;
-        writeln!(f, "{}", fmt_section_header("CYCLE LATENCY [ms]"))?;
-        if self.v2.is_some() {
-            writeln!(f, "{}", fmt_cmp_header(true))?;
-        }
+        for fn_name in self.v1.cycle_latency_by_fn.keys() {
+            writeln!(f)?;
+            writeln!(
+                f,
+                "{}",
+                fmt_section_header(format!("CYCLE LATENCY for {fn_name} [ms] ").as_str())
+            )?;
+            if self.v2.is_some() {
+                writeln!(f, "{}", fmt_cmp_header(true))?;
+            }

-        for p in resp_time_percentiles.iter() {
-            let l = self
-                .line(p.name(), "", |s| {
-                    let rt = s.request_latency.as_ref().map(|rt| rt.percentiles.get(*p));
-                    Quantity::from(rt).with_precision(3)
-                })
-                .with_orientation(-1)
-                .with_significance(self.cmp_resp_time_percentile(*p));
-            writeln!(f, "{l}")?;
+            for p in resp_time_percentiles.iter() {
+                let l = self
+                    .line(p.name(), "", |s| {
+                        let rt = s
+                            .cycle_latency_by_fn
+                            .get(fn_name)
+                            .map(|l| l.percentiles.get(*p));
+                        Quantity::from(rt).with_precision(3)
+                    })
+                    .with_orientation(-1)
+                    .with_significance(self.cmp_resp_time_percentile(*p));
+                writeln!(f, "{l}")?;
+            }
         }

         if self.v1.error_count > 0 {
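
Note: every hunk above leans on the LatencyDistributionRecorder / LatencyDistribution pair
from src/latency.rs, which this series introduces in an earlier commit that is not shown
in these diffs. The sketch below is a hypothetical reconstruction inferred only from the
call sites visible here (record, add, clear, distribution, distribution_with_errors). Two
assumptions are worth flagging: the real module presumably tracks error counts as well
(hence distribution_with_errors) and produces a full Percentiles set with error bounds,
and SessionStats::resp_times_ns is assumed to have been converted to a recorder too, so
that request_latency.add(&ss.resp_times_ns) merges it directly.

    // Hypothetical sketch of src/latency.rs; not taken from the actual series.
    use std::time::Duration;

    use hdrhistogram::Histogram;

    #[derive(Clone)]
    pub struct LatencyDistributionRecorder {
        histogram_ns: Histogram<u64>,
    }

    impl Default for LatencyDistributionRecorder {
        fn default() -> Self {
            // 3 significant digits, matching the histograms this series replaces
            Self {
                histogram_ns: Histogram::new(3).unwrap(),
            }
        }
    }

    impl LatencyDistributionRecorder {
        /// Records one cycle or request duration, clamped to at least 1 ns.
        pub fn record(&mut self, time: Duration) {
            self.histogram_ns
                .record(time.as_nanos().clamp(1, u64::MAX as u128) as u64)
                .unwrap();
        }

        /// Merges another recorder into this one; this is what lets the
        /// per-function recorders be folded into the aggregate cycle recorder.
        pub fn add(&mut self, other: &Self) {
            self.histogram_ns.add(&other.histogram_ns).unwrap();
        }

        pub fn clear(&mut self) {
            self.histogram_ns.reset();
        }

        /// Takes an immutable, serializable snapshot. The real type presumably
        /// carries a Mean and a full Percentiles set; two fixed percentiles
        /// stand in for them in this sketch.
        pub fn distribution(&self) -> LatencyDistribution {
            LatencyDistribution {
                mean_ms: self.histogram_ns.mean() / 1e6,
                p50_ms: self.histogram_ns.value_at_quantile(0.50) as f64 / 1e6,
                p99_ms: self.histogram_ns.value_at_quantile(0.99) as f64 / 1e6,
            }
        }

        /// The real distribution_with_errors() presumably folds in error
        /// counts; that bookkeeping is omitted from this sketch.
        pub fn distribution_with_errors(&self) -> LatencyDistribution {
            self.distribution()
        }
    }

    #[derive(Debug)]
    pub struct LatencyDistribution {
        pub mean_ms: f64,
        pub p50_ms: f64,
        pub p99_ms: f64,
    }

With this shape, merging recorders is a single histogram add, which is what keeps the
per-function bookkeeping cheap: Sample::new and Recorder::record can fold any number of
per-function recorders into the aggregate without re-touching raw samples, and or_default()
suffices to create a fresh recorder the first time a function name is seen.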