Convert the KstatSampler to a non-blocking self-stat queue #8005

Merged
merged 5 commits into from
Apr 23, 2025

Conversation

bnaecker
Collaborator

  • Switch the queue that the KstatSampler uses to report self-statistics from a tokio::sync::mpsc channel to a tokio::sync::broadcast channel. The broadcast channel is a ring buffer: a slow reader that lags behind loses the earliest samples, and the sender is never held up. With the mpsc queue, the worker could block entirely once the queue filled, if oximeter never actually reached the producer to drain it.
  • Fixes Instance failed to start, channel at capacity (32) #7983
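
To make the difference between the two queue behaviors concrete, here is a minimal, standard-library-only sketch. It does not use tokio and is not how `tokio::sync::broadcast` is implemented: `RingQueue` and `sends_before_full` are hypothetical names introduced only for illustration, and `std::sync::mpsc::sync_channel` stands in for a bounded queue that nobody drains.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical ring buffer modeling drop-oldest behavior.
/// This is an illustration only, NOT tokio's broadcast implementation.
pub struct RingQueue<T> {
    buf: VecDeque<T>,
    capacity: usize,
    /// Items overwritten since the reader last drained the queue.
    missed: u64,
}

impl<T> RingQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be nonzero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, missed: 0 }
    }

    /// Never blocks: when full, evict the oldest item and count it as missed.
    pub fn push(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.missed += 1;
        }
        self.buf.push_back(item);
    }

    /// Return everything still buffered, plus the number of items the
    /// reader missed since the previous drain.
    pub fn drain(&mut self) -> (Vec<T>, u64) {
        let missed = std::mem::take(&mut self.missed);
        (self.buf.drain(..).collect(), missed)
    }
}

/// Fill a bounded channel that nobody drains and report how many sends fit.
pub fn sends_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` stays alive until the end of the function but never receives.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // A blocking `send` would stall the producer at this point.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

In this model a bounded queue of capacity 2 accepts two items and then refuses (or, with a blocking send, stalls), while the ring buffer always accepts a push and tells the reader how many items it lost.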

@bnaecker bnaecker requested review from hawkw and gjcolombo April 18, 2025 17:52
@@ -414,93 +424,13 @@ impl KstatSamplerWorker {
let Some((id, interval)) = maybe_id else {
unreachable!();
};
match self.sample_one(id) {
Collaborator Author

All of this diff is cleaning up this giant match. I've broken the arms out into methods, but almost all of it is NFC (no functional change).

Member

@hawkw hawkw left a comment

Overall, this seems like the right thing to me! I had a couple minor nitpicks and suggestions, but no real concerns.

Comment on lines +461 to 493
match self.add_target(target, details) {
Ok(id) => {
trace!(
self.log,
"added target with timeout";
"id" => ?id,
"details" => ?details,
);
match reply_tx.send(Ok(id)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"id" => ?id,
"error" => ?e,
),
}
Request::RemoveTarget { id, reply_tx } => {
self.targets.remove(&id);
if let Some(remaining_samples) = self.samples.lock().unwrap().remove(&id) {
if !remaining_samples.is_empty() {
warn!(
self.log,
"target removed with queued samples";
"id" => ?id,
"n_samples" => remaining_samples.len(),
);
}
}
match reply_tx.send(Ok(())) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"error" => ?e,
)
}
Some(YieldIdAfter::new(id, details.interval))
}
Err(e) => {
error!(
self.log,
"failed to add target";
"error" => ?e,
);
match reply_tx.send(Err(e)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"error" => ?e,
),
}
Member

i wondered briefly if we could make this (and the similar code in other match arms) a bit less repetitive by having the match on whether the operation succeeds just return the reply, and then send that to reply_tx in one place, rather than in two. but, we can't easily do that because we also decide whether or not to return Some(YieldIdAfter::new(...)) based on whether or not the operation succeeds. Perhaps we could make this a bit less repetitive if we pulled the code for sending a message to reply_tx and logging if that fails into a function, but it's not really a big deal either way...
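
One way the suggested helper might look is sketched below. This is an illustration under stated assumptions, not the code in the PR: `std::sync::mpsc::Sender` stands in for whatever type `reply_tx` actually is, `eprintln!` stands in for the `trace!`/`error!` logging macros, and `send_reply` and `handle_add` are hypothetical names.

```rust
use std::sync::mpsc::Sender;

/// Send `reply` on `reply_tx` and log the outcome in one place.
/// Returns `true` if the requester was still listening.
/// (`Sender` and `eprintln!` are stand-ins; see the note above.)
pub fn send_reply<T>(reply_tx: &Sender<T>, reply: T) -> bool {
    match reply_tx.send(reply) {
        Ok(()) => {
            eprintln!("TRACE sent reply");
            true
        }
        Err(e) => {
            eprintln!("ERROR failed to send reply: {e:?}");
            false
        }
    }
}

/// Hypothetical handler shaped like the add-target arm: each branch still
/// decides its own return value, but the send-and-log code is shared.
pub fn handle_add(
    result: Result<u64, String>,
    reply_tx: &Sender<Result<u64, String>>,
) -> Option<u64> {
    match result {
        Ok(id) => {
            send_reply(reply_tx, Ok(id));
            // Success: the caller gets an id to schedule.
            Some(id)
        }
        Err(e) => {
            send_reply(reply_tx, Err(e));
            // Failure: nothing to schedule.
            None
        }
    }
}
```

The match on the operation's result stays, because the return value depends on it, but each arm shrinks to a single call plus its return value.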

Comment on lines +636 to +638
// Safety: We only get here if the `sample_one()`
// method works, which means we have a record of the
// target, and it's not expired.
Member

silly nit, considered pedantic: i realize this code was already here, and it just moved around in this diff, but personally i don't really love the use of a "Safety:" comment on something that isn't an unsafe block. if it were me, i might just put this text in the panic message for the unreachable! macro instead. feel free to disregard this, it's definitely not important.
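
As a rough illustration of that suggestion, the invariant can live in the panic message itself. The function and map below are hypothetical stand-ins, not the sampler's real types; only the shape of the `let ... else { unreachable!(...) }` is the point.

```rust
use std::collections::HashMap;

/// Look up the sampling interval for a target that was just sampled.
/// `targets` and `interval_for` are illustrative names, not from the PR.
pub fn interval_for(targets: &HashMap<u64, u32>, id: u64) -> u32 {
    let Some(interval) = targets.get(&id) else {
        // The invariant is stated where it will be seen if it is ever
        // violated, instead of in a "Safety:" comment.
        unreachable!(
            "sampling succeeded for target {}, so it must still be tracked \
             and not expired",
            id
        );
    };
    *interval
}
```

If the invariant ever breaks, the explanation shows up in the panic output rather than only in the source.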

Comment on lines +1314 to +1321
Err(broadcast::error::TryRecvError::Lagged(n_missed)) => {
warn!(
self.log,
"producer missed some samples because they're \
being produced faster than `oximeter` is collecting";
"n_missed" => %n_missed,
);
}
Member

should we be recording missing samples in this case? it would be nice to let Oximeter know we've dropped some stuff...

Collaborator Author

We could. That would be a new kind of missing sample, one specifically saying that the sampler itself dropped a sample. We currently collect information about samples we've dropped for each target the sampler tracks. That would mean an addition to the kstat-sampler.toml table schema file. I'd prefer to do that in another PR, I'll make an issue.

Member

Ah, yeah, if it's a bigger change, let's do that separately. I do think it's worth considering this kind of data loss as a first-class construct in Oximeter, but that seems like its own chunk of work.

Collaborator Author

> I do think it's worth considering this kind of data loss as a first-class construct in Oximeter, but that seems like its own chunk of work.

oximeter does already have the ability to represent missing samples. The issue is that at this point, we don't know which timeseries the missing samples belong to, because we don't have the samples themselves. We need a new timeseries, something like kstat_sampler:self_stat_samples_dropped, to represent this.

Collaborator Author

Tracked in #8025
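
For readers following the thread, here is a small sketch of the kind of bookkeeping the follow-up could involve: keeping a cumulative count of self-stat samples lost to lag, so that it could later back a timeseries along the lines of kstat_sampler:self_stat_samples_dropped. Everything here is an assumption for illustration. `Recv` is a local enum mirroring the outcomes of a non-blocking broadcast receive (an item, or the `Lagged(n)`, `Empty`, and `Closed` error cases), and `DropCounter` is a hypothetical name; neither is taken from the PR or from #8025.

```rust
/// Local stand-in for the outcomes of a non-blocking receive.
/// Defined here so the sketch needs no external crates.
pub enum Recv<T> {
    Item(T),
    /// The reader fell behind and this many items were overwritten.
    Lagged(u64),
    Empty,
    Closed,
}

/// Hypothetical cumulative counter of dropped self-stat samples.
#[derive(Default)]
pub struct DropCounter {
    total_dropped: u64,
}

impl DropCounter {
    /// Collect items until the queue reports empty or closed, adding any
    /// lag reports to the running total instead of only logging them.
    pub fn drain<T>(&mut self, events: impl IntoIterator<Item = Recv<T>>) -> Vec<T> {
        let mut out = Vec::new();
        for event in events {
            match event {
                Recv::Item(item) => out.push(item),
                Recv::Lagged(n) => {
                    self.total_dropped = self.total_dropped.saturating_add(n);
                }
                Recv::Empty | Recv::Closed => break,
            }
        }
        out
    }

    /// Total number of samples dropped so far.
    pub fn total_dropped(&self) -> u64 {
        self.total_dropped
    }
}
```

The count is cumulative and attached to the sampler rather than to any target, which matches the point above that the lost samples cannot be attributed to a specific timeseries.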

Member

@hawkw hawkw left a comment

New comments look good, thanks for adding them!

@bnaecker bnaecker merged commit f5b92b7 into main Apr 23, 2025
16 checks passed
@bnaecker bnaecker deleted the non-blocking-self-stat-queue branch April 23, 2025 00:20
Successfully merging this pull request may close these issues.

Instance failed to start, channel at capacity (32)