Skip to content

Commit f5b92b7

Browse files
authored
Convert the KstatSampler to a non-blocking self-stat queue (#8005)
- Switch the `KstatSampler`'s queue used to report self-statistics from a `tokio::sync::mpsc` channel to a `tokio::sync::broadcast` channel. The latter is a ring buffer, in which slow readers lose the earliest samples when they lag behind. With the `mpsc` queue, the worker could block entirely if `oximeter` never reached the producer to drain that queue.
- Fixes #7983
1 parent c531755 commit f5b92b7

File tree

2 files changed

+395
-235
lines changed

2 files changed

+395
-235
lines changed

oximeter/instruments/src/kstat/link.rs

+83
Original file line numberDiff line numberDiff line change
@@ -655,4 +655,87 @@ mod tests {
655655
let times = sampler.creation_times().await;
656656
assert!(times.contains_key(&path));
657657
}
658+
659+
#[tokio::test]
660+
async fn overflowing_self_stat_queue_does_not_block_sampler() {
661+
let log = test_logger();
662+
let mut sampler = KstatSampler::with_sample_limit(&log, 1).unwrap();
663+
664+
// We'll create an actual link, so that we can generate valid samples
665+
// and overflow the per-target queue. This will ensure we continually
666+
// push "overflow" samples onto the self-stat queue.
667+
let link = TestEtherstub::new();
668+
info!(log, "created test etherstub"; "name" => &link.name);
669+
let target = SledDataLinkTarget {
670+
rack_id: RACK_ID,
671+
sled_id: SLED_ID,
672+
sled_serial: SLED_SERIAL.into(),
673+
link_name: link.name.clone().into(),
674+
kind: KIND.into(),
675+
sled_model: SLED_MODEL.into(),
676+
sled_revision: SLED_REVISION,
677+
zone_name: ZONE_NAME.into(),
678+
};
679+
let dl = SledDataLink::new(target, true);
680+
let collection_interval = Duration::from_millis(10);
681+
let details = CollectionDetails::never(collection_interval);
682+
let _id = sampler.add_target(dl, details).await.unwrap();
683+
684+
// Pause time long enough for the sampler to have produced a bunch of
685+
// self-stats.
686+
tokio::time::pause();
687+
const MAX_DURATION: Duration = Duration::from_millis(5000 * 10);
688+
const STEP_DURATION: Duration = Duration::from_millis(1);
689+
let now = Instant::now();
690+
while now.elapsed() < MAX_DURATION {
691+
tokio::time::advance(STEP_DURATION).await;
692+
}
693+
694+
// Collect and sum all the sample counters.
695+
let mut sample_counts = sampler.sample_counts().unwrap();
696+
let first_overflow_sample_count = sample_counts;
697+
while let Some(counts) = sampler.sample_counts() {
698+
sample_counts += counts;
699+
}
700+
println!("{sample_counts:?}");
701+
assert_eq!(sample_counts.total, sample_counts.overflow + 1);
702+
703+
// We should have one real sample, and then the entire self-stat queue
704+
// filled with "overflow" samples. Collect the samples first, which
705+
// we'll verify below.
706+
let samples: Vec<_> = sampler.produce().unwrap().collect();
707+
assert_eq!(samples.len(), 4096 + 1);
708+
709+
// The _first_ overflow sample in the collected samples should not be
710+
// the same as the first sample count we got on our test queue. In other
711+
// words, we should have dropped the first few overflow sample counts as
712+
// we started to lag behind on the broadcast queue.
713+
//
714+
// In the previous implementation, which used an mpsc queue, that queue
715+
// would block the worker when full. That means the first overflow
716+
// sample on the actual producer queue would be the same as the sample
717+
// count, i.e., the queue still holds the first chunk of overflow
718+
// samples, rather than evicting the older ones as the current
719+
// implementation does.
720+
let oximeter::Datum::CumulativeU64(count) =
721+
&samples[1].measurement.datum()
722+
else {
723+
unreachable!();
724+
};
725+
assert!(count.value() > first_overflow_sample_count.overflow as u64);
726+
727+
// The final sample on the queue should report the cumulative number of
728+
// overflow samples too.
729+
let oximeter::Datum::CumulativeU64(count) =
730+
samples.last().unwrap().measurement.datum()
731+
else {
732+
unreachable!();
733+
};
734+
assert_eq!(count.value(), sample_counts.overflow as u64);
735+
736+
// And we should have recorded many more than the queue size, proving
737+
// that we dropped some counts as we lagged the broadcast queue, but we
738+
// kept the final samples.
739+
assert!(count.value() > 4096);
740+
}
658741
}

0 commit comments

Comments
 (0)