Measure metrics latency #2323

Open · wants to merge 1 commit into base: main
5 changes: 5 additions & 0 deletions stress/Cargo.toml
@@ -19,6 +19,11 @@ name = "metrics_histogram"
path = "src/metrics_histogram.rs"
doc = false

[[bin]] # Bin to measure metric update latency under various conditions
name = "metrics_latency"
path = "src/metrics_latency.rs"
doc = false

[[bin]] # Bin to run the metrics overflow stress tests
name = "metrics_overflow"
path = "src/metrics_overflow.rs"
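As a usage note for the new target: a minimal way to run it, assuming the same workflow as the existing stress bins (i.e. invoked from within the stress crate), would be:

    cargo run --release --bin metrics_latency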
229 changes: 229 additions & 0 deletions stress/src/metrics_latency.rs
@@ -0,0 +1,229 @@
use std::{
collections::{BTreeMap, HashMap},
sync::{Arc, Mutex, Weak},
time::{Duration, Instant},
};

use opentelemetry::{metrics::MeterProvider, KeyValue};
use opentelemetry_sdk::{
metrics::{
data::{self, ResourceMetrics},
reader::MetricReader,
InstrumentKind, ManualReader, MetricError, Pipeline, SdkMeterProvider, Temporality,
},
Resource,
};

// copy/paste from opentelemetry-sdk/benches/metric.rs
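// Wraps a MetricReader in an Arc so the same ManualReader can be registered with the
// SdkMeterProvider and also be used directly in this bin to collect the recorded data.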
#[derive(Clone, Debug)]
pub struct SharedReader(Arc<dyn MetricReader>);

impl SharedReader {
pub fn new<R>(reader: R) -> Self
where
R: MetricReader,
{
Self(Arc::new(reader))
}
}

impl MetricReader for SharedReader {
fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
self.0.register_pipeline(pipeline)
}

fn collect(&self, rm: &mut ResourceMetrics) -> Result<(), MetricError> {
self.0.collect(rm)
}

fn force_flush(&self) -> Result<(), MetricError> {
self.0.force_flush()
}

fn shutdown(&self) -> Result<(), MetricError> {
self.0.shutdown()
}

fn temporality(&self, kind: InstrumentKind) -> Temporality {
self.0.temporality(kind)
}
}

const ITERATIONS_PER_THREAD: usize = 10000;

fn main() {
let available_threads: usize = std::thread::available_parallelism().map_or(1, |p| p.get());

measure_update_latency(
&format!("half threads = {available_threads}/2, lots of inserts"),
available_threads / 2,
|_i, j| format!("{}", j % 2000),
);
measure_update_latency(
&format!("half threads = {available_threads}/2, updates only"),
available_threads / 2,
|_i, _j| String::new(),
);
measure_update_latency(
&format!("twice threads = {available_threads}*2, lots of inserts"),
available_threads * 2,
|_i, j| format!("{}", j % 2000),
);
measure_update_latency(
&format!("twice threads = {available_threads}*2, updates only"),
available_threads * 2,
|_i, _j| String::new(),
);
}

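// Spawns `threads_count` worker threads, each recording ITERATIONS_PER_THREAD measurements
// and bucketing the observed per-call latency (in nanoseconds) into a per-thread map;
// `keyname` controls whether measurements reuse one attribute set (updates only) or keep
// introducing new attribute keys (inserts). The main thread collects the delta in parallel,
// then the avg/p50/p95/p99 latencies are printed.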
fn measure_update_latency(msg: &str, threads_count: usize, keyname: fn(usize, usize) -> String) {
let reader = SharedReader::new(
ManualReader::builder()
.with_temporality(Temporality::Delta)
.build(),
);
let provider = SdkMeterProvider::builder()
.with_reader(reader.clone())
.build();
    let counter = provider.meter("test").u64_counter("hello").build();
let mut threads = Vec::new();
let mut stats = Vec::new();
stats.resize_with(threads_count, || {
Arc::new(Mutex::new(HashMap::<u64, u64>::new()))
});
    // run multiple threads and measure how long it takes to update the metric
for thread_idx in 0..threads_count {
        let counter = counter.clone();
let stat = stats[thread_idx].clone();
threads.push(std::thread::spawn(move || {
let mut stat = stat.lock().unwrap();
for iter_idx in 0..ITERATIONS_PER_THREAD {
let kv = KeyValue::new(keyname(thread_idx, iter_idx), 1);
let start = Instant::now();
            counter.add(1, &[kv]);
let curr = stat.entry(start.elapsed().as_nanos() as u64).or_default();
*curr += 1;
}
}));
}
    // collect metrics at short intervals while the worker threads run
let mut total_count = 0;
while threads.iter().any(|t| !t.is_finished()) {
        // collect aggressively so we can measure inserts properly,
        // but don't collect in a tight loop, as we'd then see no difference between
        // updates and inserts due to the extremely high contention
        std::thread::sleep(Duration::from_micros(500));

Review comment (Member): trying to understand what this is simulating? Metric collection interval is by default 30 sec or 60 sec in the spec. What is the scenario for collecting every 500 microsec?

total_count += collect_and_return_count(&reader);
}
threads.into_iter().for_each(|t| {
t.join().unwrap();
});
total_count += collect_and_return_count(&reader);

let total_measurements = (threads_count * ITERATIONS_PER_THREAD) as u64;
assert_eq!(total_count, total_measurements);

let stats = stats
.into_iter()
.map(|s| Arc::into_inner(s).unwrap().into_inner().unwrap())
.flat_map(|s| s.into_iter())
.fold(BTreeMap::<u64, u64>::default(), |mut acc, (time, count)| {
*acc.entry(time).or_default() += count;
acc
});

let sum = stats.iter().fold(0, |mut acc, (&time, &count)| {
acc += time * count;
acc
});

println!("{msg}");
println!("avg {}", format_time(sum / total_measurements as u64));
println!(
"p50 {}",
format_time(get_percentile_value(total_measurements, &stats, 50))
);
println!(
"p95 {}",
format_time(get_percentile_value(total_measurements, &stats, 95))
);
println!(
"p99 {}",
format_time(get_percentile_value(total_measurements, &stats, 99))
);
}

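// Collects the current delta from the reader and returns how many measurements it
// contains (the sum of all u64 counter data point values).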
fn collect_and_return_count(reader: &SharedReader) -> u64 {
let mut rm = ResourceMetrics {
resource: Resource::empty(),
scope_metrics: Vec::new(),
};
reader.collect(&mut rm).unwrap();
rm.scope_metrics
.into_iter()
.flat_map(|sm| sm.metrics.into_iter())
.flat_map(|m| {
m.data
.as_any()
.downcast_ref::<data::Sum<u64>>()
.unwrap()
.data_points
.clone()
.into_iter()
})
.map(|dp| dp.value)
.sum()
}

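// Returns the requested percentile from the latency histogram (a map of nanosecond
// bucket -> count), linearly interpolating between adjacent buckets.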
fn get_percentile_value(
total_measurements: u64,
stats: &BTreeMap<u64, u64>,
percentile: u64,
) -> u64 {
assert!(percentile > 0 && percentile < 100);
let break_point = ((total_measurements as f64 * percentile as f64) / 100.0) as u64;
let mut iter = stats.iter().peekable();
let mut sum = 0;
while let Some(left) = iter.next() {
sum += left.1;
if let Some(&right) = iter.peek() {
let next_sum = sum + right.1;
if next_sum > break_point {
// interpolate
let diff = (next_sum - sum) as f32;
let ratio = (break_point - sum) as f32 / diff;
let time_diff = (right.0 - left.0) as f32;
return *left.0 + (time_diff * ratio) as u64;
}
}
}
0
}

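// Formats a latency given in nanoseconds using the most readable unit (ns/μs/ms)
// with four significant digits.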
fn format_time(nanos: u64) -> String {
let nanos = nanos as f64;
let (val, symbol) = if nanos > 1000000.0 {
(nanos / 1000000.0, "ms")
} else if nanos > 1000.0 {
(nanos / 1000.0, "μs")
} else {
(nanos, "ns")
};
if val > 100.0 {
format!("{val:>5.1}{symbol}")
} else if val > 10.0 {
format!("{val:>5.2}{symbol}")
} else {
format!("{val:>5.3}{symbol}")
}
}

#[test]
fn format_number() {
assert_eq!("12.00ns", format_time(12));
assert_eq!("123.0ns", format_time(123));
assert_eq!("1.234μs", format_time(1234));
assert_eq!("12.35μs", format_time(12349));
assert_eq!("123.4μs", format_time(123400));
assert_eq!("1.235ms", format_time(1234900));
assert_eq!("12.34ms", format_time(12340000));
}