diff --git a/src/main.rs b/src/main.rs
index 54efa56..631a204 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -468,7 +468,7 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
         let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64);
         let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64);
         log_writer.write_histogram(
-            &sample.cycle_time_histogram_ns.0,
+            &sample.cycle_time_histograms_ns.0,
             interval_start_time,
             interval_duration,
             Tag::new(format!("{tag_prefix}cycles").as_str()),
diff --git a/src/stats.rs b/src/stats.rs
index f01d13a..ed22f83 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -1,6 +1,6 @@
 use chrono::{DateTime, Local};
 use std::cmp::min;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::num::NonZeroUsize;
 use std::time::{Instant, SystemTime};
 
@@ -276,19 +276,24 @@ pub struct Sample {
     pub row_throughput: f32,
     pub mean_cycle_time_ms: f32,
     pub mean_resp_time_ms: f32,
+    pub cycle_time_percentiles: [f32; Percentile::COUNT],
+    pub cycle_time_histograms_ns: SerializableHistogram,
+
+    pub cycle_time_percentiles_by_fn: HashMap<String, [f32; Percentile::COUNT]>,
+    pub cycle_time_histograms_ns_by_fn: HashMap<String, SerializableHistogram>,
+
     pub resp_time_percentiles: [f32; Percentile::COUNT],
-    pub cycle_time_histogram_ns: SerializableHistogram,
     pub resp_time_histogram_ns: SerializableHistogram,
 }
 
 impl Sample {
     pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample {
         assert!(!stats.is_empty());
+
+        let create_histogram = || SerializableHistogram(Histogram::new(3).unwrap());
 
         let mut cycle_count = 0;
         let mut cycle_error_count = 0;
-        let mut cycle_times_ns = Histogram::new(3).unwrap();
-
         let mut request_count = 0;
         let mut req_retry_errors = HashSet::new();
         let mut req_retry_count = 0;
@@ -297,14 +302,13 @@ impl Sample {
         let mut req_error_count = 0;
         let mut mean_queue_len = 0.0;
         let mut duration_s = 0.0;
-        let mut resp_times_ns = Histogram::new(3).unwrap();
-        let mut cycle_time_histogram_ns = Histogram::new(3).unwrap();
-        let mut resp_time_histogram_ns = Histogram::new(3).unwrap();
+        let mut resp_times_ns = create_histogram();
+        let mut cycle_times_ns = create_histogram();
+        let mut cycle_times_ns_per_fn = HashMap::new();
 
         for s in stats {
             let ss = &s.session_stats;
-            let fs = &s.function_stats;
             request_count += ss.req_count;
             row_count += ss.row_count;
             if errors.len() < MAX_KEPT_ERRORS {
@@ -315,16 +319,27 @@ impl Sample {
             req_retry_count += ss.req_retry_count;
             mean_queue_len += ss.mean_queue_length / stats.len() as f32;
             duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32;
-            resp_times_ns.add(&ss.resp_times_ns).unwrap();
-            resp_time_histogram_ns.add(&ss.resp_times_ns).unwrap();
+            resp_times_ns.0.add(&ss.resp_times_ns).unwrap();
+
+            for fs in &s.function_stats {
+                cycle_count += fs.call_count;
+                cycle_error_count += fs.error_count;
+                cycle_times_ns.0.add(&fs.call_times_ns).unwrap();
+                cycle_times_ns_per_fn
+                    .entry(fs.function.name.clone())
+                    .or_insert_with(create_histogram)
+                    .0
+                    .add(&fs.call_times_ns)
+                    .unwrap();
+            }
+        }
+        let resp_time_percentiles = percentiles_ms(&resp_times_ns.0);
+        let cycle_time_percentiles = percentiles_ms(&cycle_times_ns.0);
 
-            cycle_count += fs.call_count;
-            cycle_error_count = fs.error_count;
-            cycle_times_ns.add(&fs.call_times_ns).unwrap();
-            cycle_time_histogram_ns.add(&fs.call_times_ns).unwrap();
+        let mut cycle_time_percentiles_per_fn = HashMap::new();
+        for (fn_name, histogram) in &cycle_times_ns_per_fn {
+            cycle_time_percentiles_per_fn.insert(fn_name.to_owned(), percentiles_ms(&histogram.0));
         }
-        let resp_time_percentiles = percentiles_ms(&resp_times_ns);
-        let call_time_percentiles = percentiles_ms(&cycle_times_ns);
 
         Sample {
             time_s: (stats[0].start_time - base_start_time).as_secs_f32(),
@@ -341,12 +356,14 @@ impl Sample {
             cycle_throughput: cycle_count as f32 / duration_s,
             req_throughput: request_count as f32 / duration_s,
             row_throughput: row_count as f32 / duration_s,
-            mean_cycle_time_ms: cycle_times_ns.mean() as f32 / 1000000.0,
-            cycle_time_histogram_ns: SerializableHistogram(cycle_time_histogram_ns),
-            cycle_time_percentiles: call_time_percentiles,
-            mean_resp_time_ms: resp_times_ns.mean() as f32 / 1000000.0,
+            mean_cycle_time_ms: cycle_times_ns.0.mean() as f32 / 1000000.0,
+            mean_resp_time_ms: resp_times_ns.0.mean() as f32 / 1000000.0,
             resp_time_percentiles,
-            resp_time_histogram_ns: SerializableHistogram(resp_time_histogram_ns),
+            resp_time_histogram_ns: resp_times_ns,
+            cycle_time_percentiles,
+            cycle_time_percentiles_by_fn: cycle_time_percentiles_per_fn,
+            cycle_time_histograms_ns: cycle_times_ns,
+            cycle_time_histograms_ns_by_fn: cycle_times_ns_per_fn,
         }
     }
 }
@@ -619,9 +636,10 @@ impl Recorder {
             self.resp_times_ns
                .add(&s.session_stats.resp_times_ns)
                .unwrap();
-            self.cycle_times_ns
-                .add(&s.function_stats.call_times_ns)
-                .unwrap();
+
+            for fs in &s.function_stats {
+                self.cycle_times_ns.add(&fs.call_times_ns).unwrap();
+            }
         }
         let stats = Sample::new(self.start_instant, samples);
         self.cycle_count += stats.cycle_count;
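The stats.rs hunks above fold every function's call times into one overall histogram plus a per-function map keyed by function name, and then derive percentiles from each histogram. The sketch below is not part of the patch; it is a minimal, self-contained illustration of that aggregation pattern, assuming the `hdrhistogram` crate these histograms are built on, with invented function names and sample values.

```rust
use std::collections::HashMap;

use hdrhistogram::Histogram;

/// Groups call-time samples (in nanoseconds) by function name, mirroring the
/// `cycle_times_ns_per_fn` aggregation in `Sample::new`. Input data is made up.
fn histograms_by_fn(samples: &[(&str, u64)]) -> HashMap<String, Histogram<u64>> {
    let mut by_fn: HashMap<String, Histogram<u64>> = HashMap::new();
    for (name, time_ns) in samples {
        by_fn
            .entry((*name).to_owned())
            // 3 significant digits, matching Histogram::new(3) in the patch
            .or_insert_with(|| Histogram::new(3).unwrap())
            .record(*time_ns)
            .unwrap();
    }
    by_fn
}

fn main() {
    let samples = [("read", 1_200_000), ("write", 2_500_000), ("read", 900_000)];
    for (name, hist) in histograms_by_fn(&samples) {
        let p99_ms = hist.value_at_quantile(0.99) as f64 / 1e6;
        println!("{name}: p99 = {p99_ms:.2} ms");
    }
}
```

Keeping one histogram per function lets percentiles be recomputed for each function independently, which is what the new `cycle_time_percentiles_by_fn` field exposes.
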
diff --git a/src/workload.rs b/src/workload.rs
index 080e9d6..dded7c2 100644
--- a/src/workload.rs
+++ b/src/workload.rs
@@ -1,5 +1,7 @@
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
+use std::hash::{Hash, Hasher};
+use std::mem;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
@@ -15,6 +17,7 @@ use rune::compile::{CompileVisitor, MetaError, MetaRef};
 use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError, VmResult};
 use rune::termcolor::{ColorChoice, StandardStream};
 use rune::{vm_try, Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm};
+use serde::{Deserialize, Serialize};
 use try_lock::TryLock;
 
 use crate::error::LatteError;
@@ -69,10 +72,16 @@ impl<'a> ToValue for ContextRefMut<'a> {
 
 /// Stores the name and hash together.
 /// Name is used for message formatting, hash is used for fast function lookup.
-#[derive(Debug, Clone, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
 pub struct FnRef {
-    name: String,
-    hash: rune::Hash,
+    pub name: String,
+    pub hash: rune::Hash,
+}
+
+impl Hash for FnRef {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.hash.hash(state);
+    }
 }
 
 impl FnRef {
@@ -394,12 +403,28 @@ impl CompileVisitor for ProgramMetadata {
 /// Tracks statistics of the Rune function invoked by the workload
 #[derive(Clone, Debug)]
 pub struct FnStats {
+    pub function: FnRef,
     pub call_count: u64,
     pub error_count: u64,
     pub call_times_ns: Histogram<u64>,
 }
 
 impl FnStats {
+    pub fn new(function: FnRef) -> FnStats {
+        FnStats {
+            function,
+            call_count: 0,
+            error_count: 0,
+            call_times_ns: Histogram::new(3).unwrap(),
+        }
+    }
+
+    pub fn reset(&mut self) {
+        self.call_count = 0;
+        self.error_count = 0;
+        self.call_times_ns.clear();
+    }
+
     pub fn operation_completed(&mut self, duration: Duration) {
         self.call_count += 1;
         self.call_times_ns
@@ -416,53 +441,85 @@ impl FnStats {
     }
 }
 
-impl Default for FnStats {
-    fn default() -> Self {
-        FnStats {
-            call_count: 0,
-            error_count: 0,
-            call_times_ns: Histogram::new(3).unwrap(),
-        }
-    }
-}
-
 /// Statistics of operations (function calls) and Cassandra requests.
 pub struct WorkloadStats {
     pub start_time: Instant,
     pub end_time: Instant,
-    pub function_stats: FnStats,
+    pub function_stats: Vec<FnStats>,
     pub session_stats: SessionStats,
 }
 
 /// Mutable part of Workload
-pub struct WorkloadState {
+pub struct FnStatsCollector {
     start_time: Instant,
-    fn_stats: FnStats,
+    fn_stats: Vec<FnStats>,
 }
 
-impl Default for WorkloadState {
-    fn default() -> Self {
-        WorkloadState {
+impl FnStatsCollector {
+    pub fn new(functions: impl IntoIterator<Item = FnRef>) -> FnStatsCollector {
+        let mut fn_stats = Vec::new();
+        for f in functions {
+            fn_stats.push(FnStats::new(f));
+        }
+        FnStatsCollector {
             start_time: Instant::now(),
-            fn_stats: Default::default(),
+            fn_stats,
         }
     }
+
+    pub fn functions(&self) -> impl Iterator<Item = FnRef> + '_ {
+        self.fn_stats.iter().map(|f| f.function.clone())
+    }
+
+    /// Records the duration of a successful operation
+    pub fn operation_completed(&mut self, function: &FnRef, duration: Duration) {
+        self.fn_stats_mut(function).operation_completed(duration);
+    }
+
+    /// Records the duration of a failed operation
+    pub fn operation_failed(&mut self, function: &FnRef, duration: Duration) {
+        self.fn_stats_mut(function).operation_failed(duration);
+    }
+
+    /// Finds the stats for given function.
+    /// The function must exist! Otherwise, it will panic.
+    fn fn_stats_mut(&mut self, function: &FnRef) -> &mut FnStats {
+        self.fn_stats
+            .iter_mut()
+            .find(|f| f.function.hash == function.hash)
+            .unwrap()
+    }
+
+    /// Clears any collected stats and sets the start time
+    pub fn reset(&mut self, start_time: Instant) {
+        self.fn_stats.iter_mut().for_each(FnStats::reset);
+        self.start_time = start_time;
+    }
+
+    /// Returns the collected stats and resets this object
+    pub fn take(&mut self, end_time: Instant) -> FnStatsCollector {
+        let mut state = FnStatsCollector::new(self.functions());
+        state.start_time = end_time;
+        mem::swap(self, &mut state);
+        state
+    }
 }
 
 pub struct Workload {
     context: Context,
     program: Program,
     router: FunctionRouter,
-    state: TryLock<WorkloadState>,
+    state: TryLock<FnStatsCollector>,
 }
 
 impl Workload {
     pub fn new(context: Context, program: Program, functions: &[(FnRef, f64)]) -> Workload {
+        let state = FnStatsCollector::new(functions.iter().map(|x| x.0.clone()));
         Workload {
             context,
             program,
             router: FunctionRouter::new(functions),
-            state: TryLock::new(WorkloadState::default()),
+            state: TryLock::new(state),
         }
     }
 
@@ -472,7 +529,9 @@ impl Workload {
             // make a deep copy to avoid congestion on Arc ref counts used heavily by Rune
             program: self.program.unshare(),
             router: self.router.clone(),
-            state: TryLock::new(WorkloadState::default()),
+            state: TryLock::new(FnStatsCollector::new(
+                self.state.try_lock().unwrap().functions(),
+            )),
         })
     }
 
@@ -484,26 +543,24 @@ impl Workload {
         let start_time = Instant::now();
         let mut rng = SmallRng::seed_from_u64(cycle as u64);
         let context = SessionRef::new(&self.context);
-        let result = self
-            .program
-            .async_call(self.router.select(&mut rng), (context, cycle))
-            .await;
+        let function = self.router.select(&mut rng);
+        let result = self.program.async_call(function, (context, cycle)).await;
         let end_time = Instant::now();
         let mut state = self.state.try_lock().unwrap();
         let duration = end_time - start_time;
         match result {
             Ok(_) => {
-                state.fn_stats.operation_completed(duration);
+                state.operation_completed(function, duration);
                 Ok((cycle, end_time))
             }
             Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_, _)))) => {
                 // don't stop on overload errors;
                 // they are being counted by the context stats anyways
-                state.fn_stats.operation_failed(duration);
+                state.operation_failed(function, duration);
                 Ok((cycle, end_time))
             }
             Err(e) => {
-                state.fn_stats.operation_failed(duration);
+                state.operation_failed(function, duration);
                 Err(e)
             }
         }
@@ -519,24 +576,20 @@ impl Workload {
     /// Needed for producing `WorkloadStats` with
     /// recorded start and end times of measurement.
     pub fn reset(&self, start_time: Instant) {
-        let mut state = self.state.try_lock().unwrap();
-        state.fn_stats = FnStats::default();
-        state.start_time = start_time;
+        self.state.try_lock().unwrap().reset(start_time);
         self.context.reset();
     }
 
     /// Returns statistics of the operations invoked by this workload so far.
     /// Resets the internal statistic counters.
    pub fn take_stats(&self, end_time: Instant) -> WorkloadStats {
-        let mut state = self.state.try_lock().unwrap();
+        let state = self.state.try_lock().unwrap().take(end_time);
         let result = WorkloadStats {
             start_time: state.start_time,
             end_time,
             function_stats: state.fn_stats.clone(),
             session_stats: self.context().take_session_stats(),
         };
-        state.start_time = end_time;
-        state.fn_stats = FnStats::default();
         result
     }
 }
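`FnStatsCollector::take` hands the accumulated per-function stats to the caller by swapping a fresh collector into place, so nothing is cloned while the `TryLock` is held and collection restarts at `end_time`. A stripped-down sketch of that swap pattern follows; apart from `start_time`, the names and fields are invented for illustration.

```rust
use std::mem;
use std::time::Instant;

/// Simplified stand-in for `FnStatsCollector`, showing the swap-based hand-off
/// used by `take` in the patch above.
struct Collector {
    start_time: Instant,
    samples: Vec<u64>,
}

impl Collector {
    fn new(start_time: Instant) -> Collector {
        Collector {
            start_time,
            samples: Vec::new(),
        }
    }

    /// Returns everything collected so far and leaves `self` empty,
    /// restarted at `end_time`, without cloning the collected data.
    fn take(&mut self, end_time: Instant) -> Collector {
        let mut fresh = Collector::new(end_time);
        mem::swap(self, &mut fresh);
        fresh // after the swap, `fresh` owns the old, populated state
    }
}

fn main() {
    let mut collector = Collector::new(Instant::now());
    collector.samples.extend([1_000, 2_000, 3_000]);

    let snapshot = collector.take(Instant::now());
    assert_eq!(snapshot.samples.len(), 3);
    assert!(collector.samples.is_empty());
    println!("snapshot started at {:?}", snapshot.start_time);
}
```

The returned value owns the old state while `self` keeps collecting from `end_time`, which is what the new `take_stats` relies on.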