Record function call duration histograms separately in Sample
(cherry picked from commit 819b654)
pkolaczk authored and vponomaryov committed Oct 29, 2024
1 parent 1c144e1 commit b174068
Showing 3 changed files with 132 additions and 61 deletions.
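In short: `Sample` now carries, next to the overall cycle-time histogram, one histogram and one percentile array per workload function, keyed by function name, and `WorkloadStats::function_stats` becomes a `Vec<FnStats>`. A minimal sketch of that per-function aggregation pattern, using the hdrhistogram crate (the function names and durations below are made up for illustration, not taken from Latte):

```rust
use std::collections::HashMap;
use hdrhistogram::Histogram;

fn main() {
    // Hypothetical call samples: (function name, call duration in nanoseconds).
    let calls = [("read", 1_200_000_u64), ("write", 3_400_000), ("read", 900_000)];

    // One overall histogram plus one histogram per function, keyed by name,
    // mirroring the new `cycle_time_histograms_ns` / `cycle_time_histograms_ns_by_fn` fields.
    let mut overall = Histogram::<u64>::new(3).unwrap();
    let mut by_fn: HashMap<String, Histogram<u64>> = HashMap::new();

    for (name, duration_ns) in calls {
        overall.record(duration_ns).unwrap();
        by_fn
            .entry(name.to_owned())
            .or_insert_with(|| Histogram::new(3).unwrap())
            .record(duration_ns)
            .unwrap();
    }

    for (name, hist) in &by_fn {
        println!("{name}: p99 = {:.2} ms", hist.value_at_quantile(0.99) as f64 / 1e6);
    }
    println!("all: mean = {:.2} ms", overall.mean() / 1e6);
}
```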
2 changes: 1 addition & 1 deletion src/main.rs
@@ -468,7 +468,7 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64);
let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64);
log_writer.write_histogram(
&sample.cycle_time_histogram_ns.0,
&sample.cycle_time_histograms_ns.0,
interval_start_time,
interval_duration,
Tag::new(format!("{tag_prefix}cycles").as_str()),
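The only change in main.rs is the field rename; the histogram still ends up in an HDR interval log. For reference, a self-contained sketch of writing one interval histogram with the hdrhistogram crate's interval-log writer (the tag and recorded value are invented; Latte's actual wiring lives in `export_hdr_log` above):

```rust
use std::time::Duration;
use hdrhistogram::Histogram;
use hdrhistogram::serialization::V2Serializer;
use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag};

fn main() {
    let mut cycle_times_ns = Histogram::<u64>::new(3).unwrap();
    cycle_times_ns.record(1_500_000).unwrap(); // one 1.5 ms cycle

    let mut buf = Vec::new();
    let mut serializer = V2Serializer::new();
    let mut log_writer = IntervalLogWriterBuilder::new()
        .add_comment("cycle time histograms")
        .begin_log_with(&mut buf, &mut serializer)
        .unwrap();

    // One interval covering the first second of the run, tagged "cycles".
    log_writer
        .write_histogram(
            &cycle_times_ns,
            Duration::from_secs(0),
            Duration::from_secs(1),
            Tag::new("cycles"),
        )
        .unwrap();
    drop(log_writer);

    println!("{}", String::from_utf8_lossy(&buf));
}
```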
66 changes: 42 additions & 24 deletions src/stats.rs
@@ -1,6 +1,6 @@
use chrono::{DateTime, Local};
use std::cmp::min;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::time::{Instant, SystemTime};

@@ -276,19 +276,24 @@ pub struct Sample {
pub row_throughput: f32,
pub mean_cycle_time_ms: f32,
pub mean_resp_time_ms: f32,

pub cycle_time_percentiles: [f32; Percentile::COUNT],
pub cycle_time_histograms_ns: SerializableHistogram,

pub cycle_time_percentiles_by_fn: HashMap<String, [f32; Percentile::COUNT]>,
pub cycle_time_histograms_ns_by_fn: HashMap<String, SerializableHistogram>,

pub resp_time_percentiles: [f32; Percentile::COUNT],
pub cycle_time_histogram_ns: SerializableHistogram,
pub resp_time_histogram_ns: SerializableHistogram,
}

impl Sample {
pub fn new(base_start_time: Instant, stats: &[WorkloadStats]) -> Sample {
assert!(!stats.is_empty());

let create_histogram = || SerializableHistogram(Histogram::new(3).unwrap());
let mut cycle_count = 0;
let mut cycle_error_count = 0;
let mut cycle_times_ns = Histogram::new(3).unwrap();

let mut request_count = 0;
let mut req_retry_errors = HashSet::new();
let mut req_retry_count = 0;
@@ -297,14 +302,13 @@ impl Sample {
let mut req_error_count = 0;
let mut mean_queue_len = 0.0;
let mut duration_s = 0.0;
let mut resp_times_ns = Histogram::new(3).unwrap();

let mut cycle_time_histogram_ns = Histogram::new(3).unwrap();
let mut resp_time_histogram_ns = Histogram::new(3).unwrap();
let mut resp_times_ns = create_histogram();
let mut cycle_times_ns = create_histogram();
let mut cycle_times_ns_per_fn = HashMap::new();

for s in stats {
let ss = &s.session_stats;
let fs = &s.function_stats;
request_count += ss.req_count;
row_count += ss.row_count;
if errors.len() < MAX_KEPT_ERRORS {
@@ -315,16 +319,27 @@
req_retry_count += ss.req_retry_count;
mean_queue_len += ss.mean_queue_length / stats.len() as f32;
duration_s += (s.end_time - s.start_time).as_secs_f32() / stats.len() as f32;
resp_times_ns.add(&ss.resp_times_ns).unwrap();
resp_time_histogram_ns.add(&ss.resp_times_ns).unwrap();
resp_times_ns.0.add(&ss.resp_times_ns).unwrap();

for fs in &s.function_stats {
cycle_count += fs.call_count;
cycle_error_count = fs.error_count;
cycle_times_ns.0.add(&fs.call_times_ns).unwrap();
cycle_times_ns_per_fn
.entry(fs.function.name.clone())
.or_insert_with(create_histogram)
.0
.add(&fs.call_times_ns)
.unwrap();
}
}
let resp_time_percentiles = percentiles_ms(&resp_times_ns.0);
let cycle_time_percentiles = percentiles_ms(&cycle_times_ns.0);

cycle_count += fs.call_count;
cycle_error_count = fs.error_count;
cycle_times_ns.add(&fs.call_times_ns).unwrap();
cycle_time_histogram_ns.add(&fs.call_times_ns).unwrap();
let mut cycle_time_percentiles_per_fn = HashMap::new();
for (fn_name, histogram) in &cycle_times_ns_per_fn {
cycle_time_percentiles_per_fn.insert(fn_name.to_owned(), percentiles_ms(&histogram.0));
}
let resp_time_percentiles = percentiles_ms(&resp_times_ns);
let call_time_percentiles = percentiles_ms(&cycle_times_ns);

Sample {
time_s: (stats[0].start_time - base_start_time).as_secs_f32(),
@@ -341,12 +356,14 @@
cycle_throughput: cycle_count as f32 / duration_s,
req_throughput: request_count as f32 / duration_s,
row_throughput: row_count as f32 / duration_s,
mean_cycle_time_ms: cycle_times_ns.mean() as f32 / 1000000.0,
cycle_time_histogram_ns: SerializableHistogram(cycle_time_histogram_ns),
cycle_time_percentiles: call_time_percentiles,
mean_resp_time_ms: resp_times_ns.mean() as f32 / 1000000.0,
mean_cycle_time_ms: cycle_times_ns.0.mean() as f32 / 1000000.0,
mean_resp_time_ms: resp_times_ns.0.mean() as f32 / 1000000.0,
resp_time_percentiles,
resp_time_histogram_ns: SerializableHistogram(resp_time_histogram_ns),
resp_time_histogram_ns: resp_times_ns,
cycle_time_percentiles,
cycle_time_percentiles_by_fn: cycle_time_percentiles_per_fn,
cycle_time_histograms_ns: cycle_times_ns,
cycle_time_histograms_ns_by_fn: cycle_times_ns_per_fn,
}
}
}
@@ -619,9 +636,10 @@ impl Recorder {
self.resp_times_ns
.add(&s.session_stats.resp_times_ns)
.unwrap();
self.cycle_times_ns
.add(&s.function_stats.call_times_ns)
.unwrap();

for fs in &s.function_stats {
self.cycle_times_ns.add(&fs.call_times_ns).unwrap();
}
}
let stats = Sample::new(self.start_instant, samples);
self.cycle_count += stats.cycle_count;
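Both `Sample::new` and `Recorder` above fold the per-function histograms back into a single overall cycle-time histogram via `Histogram::add`. A minimal illustration of that merge with the hdrhistogram crate (values invented):

```rust
use hdrhistogram::Histogram;

fn main() {
    // Two per-function histograms, say for a `read` and a `write` function.
    let mut read_times = Histogram::<u64>::new(3).unwrap();
    let mut write_times = Histogram::<u64>::new(3).unwrap();
    read_times.record(800_000).unwrap(); // 0.8 ms in ns
    write_times.record(2_100_000).unwrap(); // 2.1 ms in ns

    // The overall histogram is simply the sum of the per-function ones.
    let mut all_times = Histogram::<u64>::new(3).unwrap();
    all_times.add(&read_times).unwrap();
    all_times.add(&write_times).unwrap();

    assert_eq!(all_times.len(), read_times.len() + write_times.len());
    println!("overall mean: {:.2} ms", all_times.mean() / 1e6);
}
```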
125 changes: 89 additions & 36 deletions src/workload.rs
@@ -1,5 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::mem;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -15,6 +17,7 @@ use rune::compile::{CompileVisitor, MetaError, MetaRef};
use rune::runtime::{AnyObj, Args, RuntimeContext, Shared, VmError, VmResult};
use rune::termcolor::{ColorChoice, StandardStream};
use rune::{vm_try, Any, Diagnostics, Module, Source, Sources, ToValue, Unit, Value, Vm};
use serde::{Deserialize, Serialize};
use try_lock::TryLock;

use crate::error::LatteError;
@@ -69,10 +72,16 @@ impl<'a> ToValue for ContextRefMut<'a> {

/// Stores the name and hash together.
/// Name is used for message formatting, hash is used for fast function lookup.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct FnRef {
name: String,
hash: rune::Hash,
pub name: String,
pub hash: rune::Hash,
}

impl Hash for FnRef {
fn hash<H: Hasher>(&self, state: &mut H) {
self.hash.hash(state);
}
}

impl FnRef {
@@ -394,12 +403,28 @@ impl CompileVisitor for ProgramMetadata {
/// Tracks statistics of the Rune function invoked by the workload
#[derive(Clone, Debug)]
pub struct FnStats {
pub function: FnRef,
pub call_count: u64,
pub error_count: u64,
pub call_times_ns: Histogram<u64>,
}

impl FnStats {
pub fn new(function: FnRef) -> FnStats {
FnStats {
function,
call_count: 0,
error_count: 0,
call_times_ns: Histogram::new(3).unwrap(),
}
}

pub fn reset(&mut self) {
self.call_count = 0;
self.error_count = 0;
self.call_times_ns.clear();
}

pub fn operation_completed(&mut self, duration: Duration) {
self.call_count += 1;
self.call_times_ns
@@ -416,53 +441,85 @@ impl FnStats {
}
}

impl Default for FnStats {
fn default() -> Self {
FnStats {
call_count: 0,
error_count: 0,
call_times_ns: Histogram::new(3).unwrap(),
}
}
}

/// Statistics of operations (function calls) and Cassandra requests.
pub struct WorkloadStats {
pub start_time: Instant,
pub end_time: Instant,
pub function_stats: FnStats,
pub function_stats: Vec<FnStats>,
pub session_stats: SessionStats,
}

/// Mutable part of Workload
pub struct WorkloadState {
pub struct FnStatsCollector {
start_time: Instant,
fn_stats: FnStats,
fn_stats: Vec<FnStats>,
}

impl Default for WorkloadState {
fn default() -> Self {
WorkloadState {
impl FnStatsCollector {
pub fn new(functions: impl IntoIterator<Item = FnRef>) -> FnStatsCollector {
let mut fn_stats = Vec::new();
for f in functions {
fn_stats.push(FnStats::new(f));
}
FnStatsCollector {
start_time: Instant::now(),
fn_stats: Default::default(),
fn_stats,
}
}

pub fn functions(&self) -> impl Iterator<Item = FnRef> + '_ {
self.fn_stats.iter().map(|f| f.function.clone())
}

/// Records the duration of a successful operation
pub fn operation_completed(&mut self, function: &FnRef, duration: Duration) {
self.fn_stats_mut(function).operation_completed(duration);
}

/// Records the duration of a failed operation
pub fn operation_failed(&mut self, function: &FnRef, duration: Duration) {
self.fn_stats_mut(function).operation_failed(duration);
}

/// Finds the stats for given function.
/// The function must exist! Otherwise, it will panic.
fn fn_stats_mut(&mut self, function: &FnRef) -> &mut FnStats {
self.fn_stats
.iter_mut()
.find(|f| f.function.hash == function.hash)
.unwrap()
}

/// Clears any collected stats and sets the start time
pub fn reset(&mut self, start_time: Instant) {
self.fn_stats.iter_mut().for_each(FnStats::reset);
self.start_time = start_time;
}

/// Returns the collected stats and resets this object
pub fn take(&mut self, end_time: Instant) -> FnStatsCollector {
let mut state = FnStatsCollector::new(self.functions());
state.start_time = end_time;
mem::swap(self, &mut state);
state
}
}

pub struct Workload {
context: Context,
program: Program,
router: FunctionRouter,
state: TryLock<WorkloadState>,
state: TryLock<FnStatsCollector>,
}

impl Workload {
pub fn new(context: Context, program: Program, functions: &[(FnRef, f64)]) -> Workload {
let state = FnStatsCollector::new(functions.iter().map(|x| x.0.clone()));
Workload {
context,
program,
router: FunctionRouter::new(functions),
state: TryLock::new(WorkloadState::default()),
state: TryLock::new(state),
}
}

@@ -472,7 +529,9 @@ impl Workload {
// make a deep copy to avoid congestion on Arc ref counts used heavily by Rune
program: self.program.unshare(),
router: self.router.clone(),
state: TryLock::new(WorkloadState::default()),
state: TryLock::new(FnStatsCollector::new(
self.state.try_lock().unwrap().functions(),
)),
})
}

@@ -484,26 +543,24 @@
let start_time = Instant::now();
let mut rng = SmallRng::seed_from_u64(cycle as u64);
let context = SessionRef::new(&self.context);
let result = self
.program
.async_call(self.router.select(&mut rng), (context, cycle))
.await;
let function = self.router.select(&mut rng);
let result = self.program.async_call(function, (context, cycle)).await;
let end_time = Instant::now();
let mut state = self.state.try_lock().unwrap();
let duration = end_time - start_time;
match result {
Ok(_) => {
state.fn_stats.operation_completed(duration);
state.operation_completed(function, duration);
Ok((cycle, end_time))
}
Err(LatteError::Cassandra(CassError(CassErrorKind::Overloaded(_, _)))) => {
// don't stop on overload errors;
// they are being counted by the context stats anyways
state.fn_stats.operation_failed(duration);
state.operation_failed(function, duration);
Ok((cycle, end_time))
}
Err(e) => {
state.fn_stats.operation_failed(duration);
state.operation_failed(function, duration);
Err(e)
}
}
@@ -519,24 +576,20 @@
/// Needed for producing `WorkloadStats` with
/// recorded start and end times of measurement.
pub fn reset(&self, start_time: Instant) {
let mut state = self.state.try_lock().unwrap();
state.fn_stats = FnStats::default();
state.start_time = start_time;
self.state.try_lock().unwrap().reset(start_time);
self.context.reset();
}

/// Returns statistics of the operations invoked by this workload so far.
/// Resets the internal statistic counters.
pub fn take_stats(&self, end_time: Instant) -> WorkloadStats {
let mut state = self.state.try_lock().unwrap();
let state = self.state.try_lock().unwrap().take(end_time);
let result = WorkloadStats {
start_time: state.start_time,
end_time,
function_stats: state.fn_stats.clone(),
session_stats: self.context().take_session_stats(),
};
state.start_time = end_time;
state.fn_stats = FnStats::default();
result
}
}
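The doc comment on `take` above ("Returns the collected stats and resets this object") describes a snapshot-and-reset idiom built on `mem::swap`. A stripped-down sketch of the same idiom, with a hypothetical `Counter` standing in for `FnStatsCollector`:

```rust
use std::mem;

/// A tiny stand-in for `FnStatsCollector`: it accumulates a count and
/// can hand out a snapshot while resetting itself in one step.
#[derive(Debug)]
struct Counter {
    count: u64,
}

impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }

    fn record(&mut self) {
        self.count += 1;
    }

    /// Returns the collected state and leaves a fresh one behind,
    /// mirroring `FnStatsCollector::take`.
    fn take(&mut self) -> Counter {
        let mut fresh = Counter::new();
        mem::swap(self, &mut fresh);
        fresh // now holds the old, populated state
    }
}

fn main() {
    let mut live = Counter::new();
    live.record();
    live.record();

    let snapshot = live.take();
    assert_eq!(snapshot.count, 2);
    assert_eq!(live.count, 0); // collector keeps running from a clean slate
}
```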
