Convert the `KstatSampler` to a non-blocking self-stat queue #8005
- Switch the `KstatSampler`'s queue used to report self-statistics from a `tokio::sync::mpsc` channel to a `tokio::sync::broadcast` channel. The latter is a ring buffer, where slow readers lose the earliest samples when they lag behind. The `mpsc` queue could block the worker entirely if `oximeter` never actually reached the producer to drain that queue.
- Fixes #7983
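To make the difference concrete, here is a minimal std-only sketch of the two behaviors described above. It is a model, not the `tokio` implementation: `RingQueue` and `rejected_by_bounded_channel` are hypothetical names, and `std::sync::mpsc::sync_channel` stands in for a bounded `mpsc` queue whose reader never drains it.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, TrySendError};

/// Hypothetical single-reader model of a lossy ring buffer: pushing never
/// blocks, and the reader learns how many items it missed.
pub struct RingQueue<T> {
    capacity: usize,
    items: VecDeque<T>,
    missed: u64,
}

impl<T> RingQueue<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be nonzero");
        Self { capacity, items: VecDeque::with_capacity(capacity), missed: 0 }
    }

    /// When the buffer is full, evict the oldest item and count it as missed.
    pub fn push(&mut self, item: T) {
        if self.items.len() == self.capacity {
            self.items.pop_front();
            self.missed += 1;
        }
        self.items.push_back(item);
    }

    /// Return the number of items missed since the last drain, plus the
    /// surviving items, oldest first.
    pub fn drain(&mut self) -> (u64, Vec<T>) {
        let missed = std::mem::take(&mut self.missed);
        (missed, self.items.drain(..).collect())
    }
}

/// Offer `n` values to a bounded std channel that nobody drains and return
/// how many were rejected because the queue was full. A blocking `send`
/// would stall at the first of these instead.
pub fn rejected_by_bounded_channel(capacity: usize, n: u32) -> usize {
    // Keep the receiver alive but never read from it.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    (0..n)
        .filter(|&i| matches!(tx.try_send(i), Err(TrySendError::Full(_))))
        .count()
}
```

With a capacity of 2 and five samples, the ring buffer keeps the two newest samples and reports three as missed, while the bounded channel refuses the last three outright.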
@@ -414,93 +424,13 @@ impl KstatSamplerWorker {
let Some((id, interval)) = maybe_id else {
unreachable!();
};
match self.sample_one(id) { |
All this diff is cleaning up this giant `match`. I've broken out the arms into methods, but almost all of this is NFC.
Overall, this seems like the right thing to me! I had a couple minor nitpicks and suggestions, but no real concerns.
match self.add_target(target, details) {
Ok(id) => {
trace!(
self.log,
"added target with timeout";
"id" => ?id,
"details" => ?details,
);
match reply_tx.send(Ok(id)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"id" => ?id,
"error" => ?e,
),
}
Request::RemoveTarget { id, reply_tx } => {
self.targets.remove(&id);
if let Some(remaining_samples) = self.samples.lock().unwrap().remove(&id) {
if !remaining_samples.is_empty() {
warn!(
self.log,
"target removed with queued samples";
"id" => ?id,
"n_samples" => remaining_samples.len(),
);
}
}
match reply_tx.send(Ok(())) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"error" => ?e,
)
}
Some(YieldIdAfter::new(id, details.interval))
}
Err(e) => {
error!(
self.log,
"failed to add target";
"error" => ?e,
);
match reply_tx.send(Err(e)) {
Ok(_) => trace!(self.log, "sent reply"),
Err(e) => error!(
self.log,
"failed to send reply";
"error" => ?e,
),
}
i wondered briefly if we could make this (and the similar code in other match arms) a bit less repetitive by having the `match` on whether the operation succeeds just return the reply, and then send that to `reply_tx` in one place, rather than in two. but, we can't easily do that because we also decide whether or not to return `Some(YieldIdAfter::new(...))` based on whether or not the operation succeeds. Perhaps we could make this a bit less repetitive if we pulled the code for sending a message to `reply_tx` and logging if that fails into a function, but it's not really a big deal either way...
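A rough sketch of that last idea, under stated assumptions: `send_reply` and `handle_add` are hypothetical names, `std::sync::mpsc::Sender` stands in for whatever type `reply_tx` actually is, and `eprintln!` stands in for the structured `error!` log call. The arm still decides separately whether to yield an ID, which is the constraint noted above.

```rust
use std::fmt::Debug;
use std::sync::mpsc::Sender;

/// Hypothetical helper: send a reply and log in one place if the receiver
/// is gone. Returns whether the reply was delivered.
pub fn send_reply<T: Debug>(reply_tx: &Sender<T>, reply: T, context: &str) -> bool {
    match reply_tx.send(reply) {
        Ok(()) => true,
        Err(e) => {
            // Stand-in for the `error!(self.log, ...)` call in the diff.
            eprintln!("failed to send reply; context = {}, error = {:?}", context, e);
            false
        }
    }
}

/// Hypothetical stand-in for one match arm: the reply is sent once, and the
/// return value (a placeholder for `Some(YieldIdAfter::new(...))`) depends
/// only on whether the operation succeeded.
pub fn handle_add(
    result: Result<u64, String>,
    reply_tx: &Sender<Result<u64, String>>,
) -> Option<u64> {
    // Remember the ID before the result is moved into the reply.
    let next = result.as_ref().ok().copied();
    send_reply(reply_tx, result, "add target");
    next
}
```

Computing the return value before sending keeps the success and failure paths from each needing their own `send` and logging code.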
// Safety: We only get here if the `sample_one()`
// method works, which means we have a record of the
// target, and it's not expired.
silly nit, considered pedantic: i realize this code was already here, and it just moved around in this diff, but personally i don't really love the use of a "Safety:" comment on something that isn't an `unsafe` block. if it were me, i might just put this text in the panic message for the `unreachable!` macro instead. feel free to disregard this, it's definitely not important.
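For illustration, a small sketch of what that could look like. The function name `interval_for` and the `BTreeMap<u64, u64>` of target IDs to intervals are assumptions made for the example, not the sampler's real types.

```rust
use std::collections::BTreeMap;

/// Hypothetical lookup that is only called after sampling succeeded, so a
/// missing entry would be a logic bug. The invariant lives in the panic
/// message rather than in a "Safety:" comment.
pub fn interval_for(targets: &BTreeMap<u64, u64>, id: u64) -> u64 {
    let Some(interval) = targets.get(&id) else {
        unreachable!(
            "no record of target {}; sampling only succeeds for tracked, unexpired targets",
            id
        );
    };
    *interval
}
```

If the invariant is ever violated, the explanation then shows up in the panic output instead of only in the source.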
Err(broadcast::error::TryRecvError::Lagged(n_missed)) => {
warn!(
self.log,
"producer missed some samples because they're \
being produced faster than `oximeter` is collecting";
"n_missed" => %n_missed,
);
}
should we be recording missing samples in this case? it would be nice to let Oximeter know we've dropped some stuff...
We could. That would be a new kind of missing sample, one specifically saying that the sampler itself dropped a sample. We currently collect information about samples we've dropped for each target the sampler tracks. That would mean an addition to the `kstat-sampler.toml` table schema file. I'd prefer to do that in another PR, I'll make an issue.
Ah, yeah, if it's a bigger change, let's do that separately. I do think it's worth considering this kind of data loss as a first-class construct in Oximeter, but that seems like its own chunk of work.
> I do think it's worth considering this kind of data loss as a first-class construct in Oximeter, but that seems like its own chunk of work.

`oximeter` does already have the ability to represent missing samples. The issue is that at this point, we don't know which timeseries the missing samples belong to because we don't have the samples. We need a new timeseries, specifically something like `kstat_sampler:self_stat_samples_dropped` or similar, to represent this.
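As a sketch only, the producer side of such a timeseries could be a cumulative counter fed by the `n_missed` value from each `Lagged` error. `SelfStatDrops` is a hypothetical type; turning its total into an actual `oximeter` sample would depend on the schema change discussed above and is not shown.

```rust
/// Hypothetical cumulative count of self-stat samples lost to lag.
#[derive(Debug, Default)]
pub struct SelfStatDrops {
    total: u64,
}

impl SelfStatDrops {
    /// Add the number of samples reported missed by one lag event and
    /// return the new running total. Saturates rather than wrapping.
    pub fn record_lag(&mut self, n_missed: u64) -> u64 {
        self.total = self.total.saturating_add(n_missed);
        self.total
    }

    /// Current cumulative count, suitable for reporting as a counter.
    pub fn total(&self) -> u64 {
        self.total
    }
}
```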
Tracked in #8025
New comments look good, thanks for adding them!