From e2ff3d3060694e138191d7a02b2eeb06b75a1565 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Sat, 10 Aug 2024 17:24:17 +0200 Subject: [PATCH] Compute all statistics without using the log --- src/latency.rs | 24 +- src/main.rs | 3 +- src/stats.rs | 261 +++------------------- src/throughput.rs | 36 +++ src/{autocorrelation.rs => timeseries.rs} | 150 ++++++++----- src/workload.rs | 10 +- 6 files changed, 185 insertions(+), 299 deletions(-) create mode 100644 src/throughput.rs rename src/{autocorrelation.rs => timeseries.rs} (60%) diff --git a/src/latency.rs b/src/latency.rs index a4b18b0..fce1ce3 100644 --- a/src/latency.rs +++ b/src/latency.rs @@ -1,7 +1,7 @@ -use crate::autocorrelation::EffectiveSampleSizeEstimator; use crate::histogram::SerializableHistogram; use crate::percentiles::Percentiles; use crate::stats::Mean; +use crate::timeseries::TimeSeriesStats; use hdrhistogram::Histogram; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -18,7 +18,7 @@ pub struct LatencyDistribution { #[derive(Clone, Debug)] pub struct LatencyDistributionRecorder { histogram_ns: Histogram, - ess_estimator: EffectiveSampleSizeEstimator, + ess_estimator: TimeSeriesStats, } impl LatencyDistributionRecorder { @@ -26,7 +26,7 @@ impl LatencyDistributionRecorder { self.histogram_ns .record(time.as_nanos().clamp(1, u64::MAX as u128) as u64) .unwrap(); - self.ess_estimator.record(time.as_secs_f64()); + self.ess_estimator.record(time.as_secs_f64(), 1.0); } pub fn add(&mut self, other: &LatencyDistributionRecorder) { @@ -41,19 +41,33 @@ impl LatencyDistributionRecorder { pub fn distribution(&self) -> LatencyDistribution { LatencyDistribution { - mean: Mean::from(&self.histogram_ns, 1e-6, 1), + mean: self.mean(1), percentiles: Percentiles::compute(&self.histogram_ns, 1e-6), histogram: SerializableHistogram(self.histogram_ns.clone()), } } + pub fn distribution_with_errors(&self) -> LatencyDistribution { let ess = self.ess_estimator.effective_sample_size(); LatencyDistribution { - mean: Mean::from(&self.histogram_ns, 1e-6, ess), + mean: self.mean(ess), percentiles: Percentiles::compute_with_errors(&self.histogram_ns, 1e-6, ess), histogram: SerializableHistogram(self.histogram_ns.clone()), } } + + fn mean(&self, effective_n: u64) -> Mean { + let scale = 1e-6; + Mean { + n: effective_n, + value: self.histogram_ns.mean() * scale, + std_err: if effective_n > 1 { + Some(self.histogram_ns.stdev() * scale / (effective_n as f64 - 1.0).sqrt()) + } else { + None + }, + } + } } impl Default for LatencyDistributionRecorder { diff --git a/src/main.rs b/src/main.rs index 1325c57..fe7da57 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,7 +39,6 @@ use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder}; use crate::table::{Alignment, Table}; use crate::workload::{FnRef, Program, Workload, WorkloadStats, LOAD_FN}; -mod autocorrelation; mod chunks; mod config; mod context; @@ -54,6 +53,8 @@ mod progress; mod report; mod stats; mod table; +mod throughput; +mod timeseries; mod workload; const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/src/stats.rs b/src/stats.rs index 4ce94c3..7529030 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,102 +1,20 @@ use chrono::{DateTime, Local}; -use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; +use std::ops::Mul; use std::time::{Instant, SystemTime}; use crate::latency::{LatencyDistribution, LatencyDistributionRecorder}; use crate::percentiles::Percentile; +use crate::throughput::ThroughputMeter; +use crate::timeseries::TimeSeriesStats; use crate::workload::WorkloadStats; use cpu_time::ProcessTime; -use hdrhistogram::Histogram; use serde::{Deserialize, Serialize}; use statrs::distribution::{ContinuousCDF, StudentsT}; -/// Controls the maximum order of autocovariance taken into -/// account when estimating the long run mean error. Higher values make the estimator -/// capture more autocorrelation from the signal, but also make the results -/// more random. Lower values increase the bias (underestimation) of error, but offer smoother -/// results for small N and better performance for large N. -/// The value has been established empirically. -/// Probably anything between 0.2 and 0.8 is good enough. -/// Valid range is 0.0 to 1.0. -const BANDWIDTH_COEFF: f64 = 0.5; - -/// Arithmetic weighted mean of values in the vector -pub fn mean(values: &[f32], weights: &[f32]) -> f64 { - let sum_values = values - .iter() - .zip(weights) - .map(|(&v, &w)| (v as f64) * (w as f64)) - .sum::(); - let sum_weights = weights.iter().map(|&v| v as f64).sum::(); - sum_values / sum_weights -} - -/// Estimates the variance of the mean of a time-series. -/// Takes into account the fact that the observations can be dependent on each other -/// (i.e. there is a non-zero amount of auto-correlation in the signal). -/// -/// Contrary to the classic variance estimator, the order of the -/// data points does matter here. If the observations are totally independent from each other, -/// the expected return value of this function is close to the expected sample variance. -pub fn long_run_variance(mean: f64, values: &[f32], weights: &[f32]) -> f64 { - if values.len() <= 1 { - return f64::NAN; - } - let len = values.len() as f64; - - // Compute the variance: - let mut sum_weights = 0.0; - let mut var = 0.0; - for (&v, &w) in values.iter().zip(weights) { - let diff = v as f64 - mean; - let w = w as f64; - var += diff * diff * w; - sum_weights += w; - } - var /= sum_weights; - - // Compute a sum of autocovariances of orders 1 to (cutoff - 1). - // Cutoff (bandwidth) and diminishing weights are needed to reduce random error - // introduced by higher order autocovariance estimates. - let bandwidth = len.powf(BANDWIDTH_COEFF); - let max_lag = min(values.len(), bandwidth.ceil() as usize); - let mut cov_sum = 0.0; - for lag in 1..max_lag { - let weight = 1.0 - lag as f64 / values.len() as f64; - let mut cov = 0.0; - let mut sum_weights = 0.0; - for i in lag..values.len() { - let diff_1 = values[i] as f64 - mean; - let diff_2 = values[i - lag] as f64 - mean; - let w = weights[i] as f64 * weights[i - lag] as f64; - sum_weights += w; - cov += 2.0 * diff_1 * diff_2 * weight * w; - } - cov_sum += cov / sum_weights; - } - - // It is possible that we end up with a negative sum of autocovariances here. - // But we don't want that because we're trying to estimate - // the worst-case error and for small N this situation is likely a random coincidence. - // Additionally, `var + cov` must be at least 0.0. - cov_sum = cov_sum.max(0.0); - - // Correct bias for small n: - let inflation = 1.0 + cov_sum / (var + f64::MIN_POSITIVE); - let bias_correction = (inflation / len).exp(); - bias_correction * (var + cov_sum) -} - -/// Estimates the error of the mean of a time-series. -/// See `long_run_variance`. -pub fn long_run_err(mean: f64, values: &[f32], weights: &[f32]) -> f64 { - (long_run_variance(mean, values, weights) / values.len() as f64).sqrt() -} - /// Holds a mean and its error together. -/// Makes it more convenient to compare means and it also reduces the number +/// Makes it more convenient to compare means, and it also reduces the number /// of fields, because we don't have to keep the values and the errors in separate fields. #[derive(Debug, Copy, Clone, Serialize, Deserialize)] pub struct Mean { @@ -105,25 +23,14 @@ pub struct Mean { pub std_err: Option, } -impl Mean { - pub fn compute(v: &[f32], weights: &[f32]) -> Self { - let m = mean(v, weights); - Mean { - n: v.len() as u64, - value: m, - std_err: not_nan(long_run_err(m, v, weights)), - } - } +impl Mul for Mean { + type Output = Mean; - pub fn from(h: &Histogram, scale: f64, effective_n: u64) -> Mean { + fn mul(self, rhs: f64) -> Self::Output { Mean { - n: effective_n, - value: h.mean() * scale, - std_err: if effective_n > 1 { - Some(h.stdev() * scale / (effective_n as f64 - 1.0).sqrt()) - } else { - None - }, + n: self.n, + value: self.value * rhs, + std_err: self.std_err.map(|e| e * rhs), } } } @@ -238,11 +145,11 @@ impl Sample { for fs in &s.function_stats { cycle_count += fs.call_count; cycle_error_count = fs.error_count; - cycle_latency.add(&fs.cycle_latency); + cycle_latency.add(&fs.call_latency); cycle_latency_per_fn .entry(fs.function.name.clone()) .or_default() - .add(&fs.cycle_latency); + .add(&fs.call_latency); } } @@ -273,64 +180,6 @@ impl Sample { } } -/// Collects the samples and computes aggregate statistics -struct Log { - samples: Vec, -} - -impl Log { - fn new() -> Log { - Log { - samples: Vec::new(), - } - } - - fn append(&mut self, sample: Sample) -> &Sample { - self.samples.push(sample); - self.samples.last().unwrap() - } - - fn weights_by_request_count(&self) -> Vec { - self.samples - .iter() - .map(|s| s.request_count as f32) - .collect() - } - - fn call_throughput(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.cycle_throughput).collect(); - let w: Vec = self.samples.iter().map(|s| s.duration_s).collect(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn req_throughput(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.req_throughput).collect(); - let w: Vec = self.samples.iter().map(|s| s.duration_s).collect(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn row_throughput(&self) -> Mean { - let t: Vec = self.samples.iter().map(|s| s.row_throughput).collect(); - let w: Vec = self.samples.iter().map(|s| s.duration_s).collect(); - Mean::compute(t.as_slice(), w.as_slice()) - } - - fn mean_concurrency(&self) -> Mean { - let p: Vec = self.samples.iter().map(|s| s.mean_queue_len).collect(); - let w = self.weights_by_request_count(); - let m = Mean::compute(p.as_slice(), w.as_slice()); - if m.value.is_nan() { - Mean { - n: 0, - value: 0.0, - std_err: None, - } - } else { - m - } - } -} - /// Stores the final statistics of the test run. #[derive(Serialize, Deserialize, Debug)] pub struct BenchmarkStats { @@ -434,13 +283,15 @@ pub struct Recorder { pub request_count: u64, pub request_retry_count: u64, pub request_error_count: u64, + pub throughput_meter: ThroughputMeter, pub errors: HashSet, pub cycle_error_count: u64, pub row_count: u64, pub cycle_latency: LatencyDistributionRecorder, pub cycle_latency_by_fn: HashMap, pub request_latency: LatencyDistributionRecorder, - log: Log, + pub concurrency_meter: TimeSeriesStats, + log: Vec, rate_limit: Option, concurrency_limit: NonZeroUsize, } @@ -459,7 +310,7 @@ impl Recorder { end_instant: start_instant, start_cpu_time: ProcessTime::now(), end_cpu_time: ProcessTime::now(), - log: Log::new(), + log: Vec::new(), rate_limit, concurrency_limit, cycle_count: 0, @@ -472,6 +323,8 @@ impl Recorder { cycle_latency: LatencyDistributionRecorder::default(), cycle_latency_by_fn: HashMap::new(), request_latency: LatencyDistributionRecorder::default(), + throughput_meter: ThroughputMeter::default(), + concurrency_meter: TimeSeriesStats::default(), } } @@ -483,11 +336,11 @@ impl Recorder { self.request_latency.add(&s.session_stats.resp_times_ns); for fs in &s.function_stats { - self.cycle_latency.add(&fs.cycle_latency); + self.cycle_latency.add(&fs.call_latency); self.cycle_latency_by_fn .entry(fs.function.name.clone()) .or_default() - .add(&fs.cycle_latency); + .add(&fs.call_latency); } } let sample = Sample::new(self.start_instant, samples); @@ -497,10 +350,14 @@ impl Recorder { self.request_retry_count += sample.req_retry_count; self.request_error_count += sample.req_error_count; self.row_count += sample.row_count; + self.throughput_meter.record(sample.cycle_count); + self.concurrency_meter + .record(sample.mean_queue_len as f64, sample.duration_s as f64); if self.errors.len() < MAX_KEPT_ERRORS { self.errors.extend(sample.req_errors.iter().cloned()); } - self.log.append(sample) + self.log.push(sample); + self.log.last().unwrap() } /// Stops the recording, computes the statistics and returns them as the new object. @@ -516,11 +373,12 @@ impl Recorder { .as_secs_f64(); let cpu_util = 100.0 * cpu_time_s / elapsed_time_s / num_cpus::get() as f64; - let cycle_throughput = self.log.call_throughput(); + let cycle_throughput = self.throughput_meter.throughput(); let cycle_throughput_ratio = self.rate_limit.map(|r| 100.0 * cycle_throughput.value / r); - let req_throughput = self.log.req_throughput(); - let row_throughput = self.log.row_throughput(); - let concurrency = self.log.mean_concurrency(); + let req_throughput = + cycle_throughput * (self.request_count as f64 / self.cycle_count as f64); + let row_throughput = cycle_throughput * (self.row_count as f64 / self.cycle_count as f64); + let concurrency = self.concurrency_meter.mean(); let concurrency_ratio = 100.0 * concurrency.value / self.concurrency_limit.get() as f64; BenchmarkStats { @@ -558,72 +416,15 @@ impl Recorder { }, concurrency, concurrency_ratio, - log: self.log.samples, + log: self.log, } } } #[cfg(test)] mod test { - use rand::distributions::Distribution; - use rand::prelude::StdRng; - use rand::SeedableRng; - use statrs::distribution::Normal; - use statrs::statistics::Statistics; - use crate::stats::{t_test, Mean}; - /// Returns a random sample of size `len`. - /// All data points i.i.d with N(`mean`, `std_dev`). - fn random_vector(seed: usize, len: usize, mean: f64, std_dev: f64) -> Vec { - let mut rng = StdRng::seed_from_u64(seed as u64); - let distrib = Normal::new(mean, std_dev).unwrap(); - (0..len).map(|_| distrib.sample(&mut rng) as f32).collect() - } - - /// Introduces a strong dependency between the observations, - /// making it an AR(1) process - fn make_autocorrelated(v: &mut [f32]) { - for i in 1..v.len() { - v[i] = 0.01 * v[i] + 0.99 * v[i - 1]; - } - } - - /// Traditional standard error assuming i.i.d variables - fn reference_err(v: &[f32]) -> f64 { - v.iter().map(|x| *x as f64).std_dev() / (v.len() as f64).sqrt() - } - - #[test] - fn mean_err_no_auto_correlation() { - let run_len = 10000; - let mean = 1.0; - let std_dev = 1.0; - let weights = [1.0; 10000]; - for i in 0..10 { - let v = random_vector(i, run_len, mean, std_dev); - let err = super::long_run_err(mean, &v, &weights); - let ref_err = reference_err(&v); - assert!(err > 0.99 * ref_err); - assert!(err < 1.2 * ref_err); - } - } - - #[test] - fn mean_err_with_auto_correlation() { - let run_len = 10000; - let mean = 1.0; - let std_dev = 1.0; - let weights = [1.0; 10000]; - for i in 0..10 { - let mut v = random_vector(i, run_len, mean, std_dev); - make_autocorrelated(&mut v); - let mean_err = super::long_run_err(mean, &v, &weights); - let ref_err = reference_err(&v); - assert!(mean_err > 6.0 * ref_err); - } - } - #[test] fn t_test_same() { let mean1 = Mean { diff --git a/src/throughput.rs b/src/throughput.rs new file mode 100644 index 0000000..e819679 --- /dev/null +++ b/src/throughput.rs @@ -0,0 +1,36 @@ +use crate::stats::Mean; +use crate::timeseries::TimeSeriesStats; +use std::time::Instant; + +pub struct ThroughputMeter { + last_record_time: Instant, + count: u64, + stats: TimeSeriesStats, +} + +impl Default for ThroughputMeter { + fn default() -> Self { + let now = Instant::now(); + Self { + last_record_time: now, + count: 0, + stats: TimeSeriesStats::default(), + } + } +} + +impl ThroughputMeter { + pub fn record(&mut self, count: u64) { + let now = Instant::now(); + let duration = now.duration_since(self.last_record_time).as_secs_f64(); + let throughput = count as f64 / duration; + self.count += count; + self.stats.record(throughput, duration); + self.last_record_time = now; + } + + /// Returns mean throughput in events per second + pub fn throughput(&self) -> Mean { + self.stats.mean() + } +} diff --git a/src/autocorrelation.rs b/src/timeseries.rs similarity index 60% rename from src/autocorrelation.rs rename to src/timeseries.rs index 9155b9a..2abbddf 100644 --- a/src/autocorrelation.rs +++ b/src/timeseries.rs @@ -1,7 +1,8 @@ +use crate::stats::Mean; use more_asserts::assert_le; use rand_distr::num_traits::Pow; -/// Estimates the effective size of the sample, by taking account for +/// Estimates the mean and effective size of the sample, by taking account for /// autocorrelation between measurements. /// /// In most statistical operations we assume measurements to be independent of each other. @@ -14,7 +15,7 @@ use rand_distr::num_traits::Pow; /// the full covariance matrix, but approximates it by pre-merging data points. /// However, it is fairly fast (O(n log log n) and works in O(log n) memory incrementally. #[derive(Clone, Debug, Default)] -pub struct EffectiveSampleSizeEstimator { +pub struct TimeSeriesStats { n: u64, levels: Vec, } @@ -22,20 +23,20 @@ pub struct EffectiveSampleSizeEstimator { #[derive(Clone, Debug)] struct Level { level: usize, - buf: Vec, - variance: VarianceEstimator, + buf: Vec<(f64, f64)>, + stats: Stats, } /// Estimates the effective sample size by using batch means method. -impl EffectiveSampleSizeEstimator { +impl TimeSeriesStats { /// Adds a single data point - pub fn record(&mut self, x: f64) { + pub fn record(&mut self, x: f64, weight: f64) { self.n += 1; - self.insert(x, 0); + self.insert(x, weight, 0); } /// Merges another estimator data into this one - pub fn add(&mut self, other: &EffectiveSampleSizeEstimator) { + pub fn add(&mut self, other: &TimeSeriesStats) { self.n += other.n; for level in &other.levels { self.add_level(level); @@ -47,20 +48,37 @@ impl EffectiveSampleSizeEstimator { self.levels.clear(); } - fn insert(&mut self, x: f64, level: usize) { + fn insert(&mut self, x: f64, weight: f64, level: usize) { if self.levels.len() == level { self.levels.push(Level::new(level)); } - if let Some(carry) = self.levels[level].record(x) { - self.insert(carry, level + 1); + if let Some((x, w)) = self.levels[level].record(x, weight) { + self.insert(x, w, level + 1); } } fn add_level(&mut self, level: &Level) { if self.levels.len() == level.level { self.levels.push(level.clone()); - } else if let Some(carry) = self.levels[level.level].add(level) { - self.insert(carry, level.level + 1); + } else if let Some((x, w)) = self.levels[level.level].add(level) { + self.insert(x, w, level.level + 1); + } + } + + pub fn mean(&self) -> Mean { + let n = self.effective_sample_size(); + Mean { + n, + value: if n == 0 { + f64::NAN + } else { + self.levels[0].stats.mean() + }, + std_err: if self.n <= 1 { + None + } else { + Some(self.levels[0].stats.variance().sqrt() / (n as f64).sqrt()) + }, } } @@ -77,14 +95,14 @@ impl EffectiveSampleSizeEstimator { // - the batch size must be greater than the autocorrelation time // - the number of batches should be also large enough for the variance // of the mean be accurate - let sample_variance = self.levels[0].variance.value(); + let sample_variance = self.levels[0].stats.variance(); let autocorrelation_time = self .levels .iter() .map(|l| { ( l.batch_len(), - l.batch_len() as f64 * l.variance.value() / sample_variance, + l.batch_len() as f64 * l.stats.variance() / sample_variance, ) }) .take_while(|(batch_len, time)| *time > 0.2 * *batch_len as f64) @@ -101,7 +119,7 @@ impl Level { Level { level, buf: Vec::with_capacity(2), - variance: Default::default(), + stats: Default::default(), } } @@ -109,18 +127,18 @@ impl Level { 1 << self.level } - fn record(&mut self, value: f64) -> Option { - self.variance.record(value); - self.buf.push(value); + fn record(&mut self, value: f64, weight: f64) -> Option<(f64, f64)> { + self.stats.record(value, weight); + self.buf.push((value, weight)); self.merge() } - fn add(&mut self, other: &Level) -> Option { + fn add(&mut self, other: &Level) -> Option<(f64, f64)> { assert_eq!(self.level, other.level); - self.variance.add(&other.variance); + self.stats.add(&other.stats); // If there was more than 1 item recorded by the other level, then we must // drop our queued item, because it is not a neighbour of the other item - if other.variance.n > 1 { + if other.stats.n > 1 { self.buf.clear(); } self.buf.extend(&other.buf); @@ -128,11 +146,14 @@ impl Level { self.merge() } - fn merge(&mut self) -> Option { + fn merge(&mut self) -> Option<(f64, f64)> { if self.buf.len() == 2 { - let merged = (self.buf[0] + self.buf[1]) / 2.0; + let (x1, w1) = self.buf[0]; + let (x2, w2) = self.buf[1]; + let merged_w = w1 + w2; + let merged_x = (x1 * w1 + x2 * w2) / merged_w; self.buf.clear(); - Some(merged) + Some((merged_x, merged_w)) } else { None } @@ -140,47 +161,60 @@ impl Level { } #[derive(Clone, Debug, Default)] -struct VarianceEstimator { +struct Stats { mean: f64, var: f64, + total_weight: f64, n: u64, } -/// Incrementally estimates covariance of two random variables X and Y. +/// Incrementally estimates basic statistics such as mean and variance over a weighted set of data. /// Uses Welford's online algorithm. -impl VarianceEstimator { - pub fn record(&mut self, x: f64) { +impl Stats { + pub fn record(&mut self, x: f64, weight: f64) { + assert!(weight > 0.0, "weight must be greater than 0.0"); self.n += 1; + self.total_weight += weight; let delta1 = x - self.mean; - self.mean += delta1 / self.n as f64; + self.mean += weight * delta1 / self.total_weight; let delta2 = x - self.mean; - self.var += delta1 * delta2; + self.var += weight * delta1 * delta2; } - pub fn add(&mut self, other: &VarianceEstimator) { - let n1 = self.n as f64; - let n2 = other.n as f64; + pub fn add(&mut self, other: &Stats) { + let w1 = self.total_weight; + let w2 = other.total_weight; let m1 = self.mean; let m2 = other.mean; - let new_mean = (m1 * n1 + m2 * n2) / (n1 + n2); + let new_mean = (m1 * w1 + m2 * w2) / (w1 + w2); self.n += other.n; self.mean = new_mean; - self.var = self.var + other.var + (m1 - new_mean).pow(2) * n1 + (m2 - new_mean).pow(2) * n2; + self.var = self.var + other.var + (m1 - new_mean).pow(2) * w1 + (m2 - new_mean).pow(2) * w2; + self.total_weight = w1 + w2; + } + + pub fn mean(&self) -> f64 { + if self.total_weight == 0.0 { + f64::NAN + } else { + self.mean + } } - pub fn value(&self) -> f64 { + pub fn variance(&self) -> f64 { if self.n <= 1 { f64::NAN } else { - self.var / (self.n - 1) as f64 + let n = self.n as f64; + self.var / self.total_weight * n / (n - 1.0) } } } #[cfg(test)] mod test { - use crate::autocorrelation::{EffectiveSampleSizeEstimator, VarianceEstimator}; + use crate::timeseries::{Stats, TimeSeriesStats}; use assert_approx_eq::assert_approx_eq; use more_asserts::{assert_gt, assert_le}; use rand::rngs::SmallRng; @@ -190,10 +224,10 @@ mod test { #[test] fn test_random() { let mut rng = SmallRng::seed_from_u64(4); - let mut estimator = EffectiveSampleSizeEstimator::default(); + let mut estimator = TimeSeriesStats::default(); const N: u64 = 1000; for _ in 0..N { - estimator.record(rng.gen()); + estimator.record(rng.gen(), 1.0); } assert_gt!(estimator.effective_sample_size(), N / 2); assert_le!(estimator.effective_sample_size(), N); @@ -211,11 +245,11 @@ mod test { #[case(10, 10000)] fn test_correlated(#[case] n: u64, #[case] cluster_size: usize) { let mut rng = SmallRng::seed_from_u64(1); - let mut estimator = EffectiveSampleSizeEstimator::default(); + let mut estimator = TimeSeriesStats::default(); for _ in 0..n { let v = rng.gen(); for _ in 0..cluster_size { - estimator.record(v); + estimator.record(v, 1.0); } } assert_gt!(estimator.effective_sample_size(), n / 2); @@ -229,20 +263,20 @@ mod test { let data: Vec<_> = (0..COUNT) .map(|i| 0.001 * i as f64 + rng.gen::()) .collect(); - let mut est = VarianceEstimator::default(); - data.iter().for_each(|x| est.record(*x)); + let mut est = Stats::default(); + data.iter().for_each(|x| est.record(*x, 1.0)); let (sub1, sub2) = data.split_at(COUNT / 3); - let mut sub_est1 = VarianceEstimator::default(); - let mut sub_est2 = VarianceEstimator::default(); - sub1.iter().for_each(|x| sub_est1.record(*x)); - sub2.iter().for_each(|x| sub_est2.record(*x)); + let mut sub_est1 = Stats::default(); + let mut sub_est2 = Stats::default(); + sub1.iter().for_each(|x| sub_est1.record(*x, 1.0)); + sub2.iter().for_each(|x| sub_est2.record(*x, 1.0)); - let mut est2 = VarianceEstimator::default(); + let mut est2 = Stats::default(); est2.add(&sub_est1); est2.add(&sub_est2); - assert_approx_eq!(est.value(), est2.value(), 0.00001); + assert_approx_eq!(est.variance(), est2.variance(), 0.00001); } #[test] @@ -253,16 +287,16 @@ mod test { data.extend((0..COUNT / 2).map(|_| rng.gen::())); data.extend((0..COUNT / 2).map(|_| rng.gen::() + 0.2)); - let mut est = EffectiveSampleSizeEstimator::default(); - data.iter().for_each(|x| est.record(*x)); + let mut est = TimeSeriesStats::default(); + data.iter().for_each(|x| est.record(*x, 1.0)); let (sub1, sub2) = data.split_at(COUNT / 3); - let mut sub_est1 = EffectiveSampleSizeEstimator::default(); - let mut sub_est2 = EffectiveSampleSizeEstimator::default(); - sub1.iter().for_each(|x| sub_est1.record(*x)); - sub2.iter().for_each(|x| sub_est2.record(*x)); + let mut sub_est1 = TimeSeriesStats::default(); + let mut sub_est2 = TimeSeriesStats::default(); + sub1.iter().for_each(|x| sub_est1.record(*x, 1.0)); + sub2.iter().for_each(|x| sub_est2.record(*x, 1.0)); - let mut est2 = EffectiveSampleSizeEstimator::default(); + let mut est2 = TimeSeriesStats::default(); est2.add(&sub_est1); est2.add(&sub_est2); diff --git a/src/workload.rs b/src/workload.rs index 2858899..6f4b434 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -376,7 +376,7 @@ pub struct FnStats { pub function: FnRef, pub call_count: u64, pub error_count: u64, - pub cycle_latency: LatencyDistributionRecorder, + pub call_latency: LatencyDistributionRecorder, } impl FnStats { @@ -385,25 +385,25 @@ impl FnStats { function, call_count: 0, error_count: 0, - cycle_latency: LatencyDistributionRecorder::default(), + call_latency: LatencyDistributionRecorder::default(), } } pub fn reset(&mut self) { self.call_count = 0; self.error_count = 0; - self.cycle_latency.clear(); + self.call_latency.clear(); } pub fn operation_completed(&mut self, duration: Duration) { self.call_count += 1; - self.cycle_latency.record(duration) + self.call_latency.record(duration) } pub fn operation_failed(&mut self, duration: Duration) { self.call_count += 1; self.error_count += 1; - self.cycle_latency.record(duration); + self.call_latency.record(duration); } }