Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: custom attributes in otel task metrics #9

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ prometheus = "0.13"
opentelemetry = { version = "0.19", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.12"
once_cell = "1.17"
smallvec = "1.11"
76 changes: 37 additions & 39 deletions crates/metrics/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use {
},
once_cell::sync::OnceCell,
opentelemetry::metrics::{Counter, Histogram},
smallvec::SmallVec,
std::{ops::Deref, sync::Arc, time::Duration},
};

/// Wrapper for [`OtelTaskMetricsRecorder`], which can be statically
/// initialized.
pub struct TaskMetrics {
prefix: &'static str,
inner: OnceCell<OtelTaskMetricsRecorder<()>>,
inner: OnceCell<OtelTaskMetricsRecorder>,
}

impl TaskMetrics {
Expand Down Expand Up @@ -50,60 +51,57 @@ impl Deref for TaskMetrics {
/// The above metrics are tracked using [`opentelemetry`] metrics API and are
/// prefixed according to the constructor arguments.
#[derive(Clone)]
pub struct OtelTaskMetricsRecorder<N: AsTaskName = ()> {
pub struct OtelTaskMetricsRecorder {
inner: Arc<OtelRecorderInner>,
name: Option<N>,
name: &'static str,
attributes: SmallVec<[otel::KeyValue; 2]>,
}

impl OtelTaskMetricsRecorder<()> {
impl OtelTaskMetricsRecorder {
pub fn new(prefix: &str) -> Self {
Self {
inner: Arc::new(OtelRecorderInner::new(prefix)),
name: None,
name: "unknown",
attributes: SmallVec::new(),
}
}
}

impl<N> OtelTaskMetricsRecorder<N>
where
N: AsTaskName,
{
#[inline]
fn task_name_kv(&self) -> otel::KeyValue {
let name: &'static str = self
.name
.as_ref()
.map(AsTaskName::as_task_name)
.unwrap_or_default();

otel::KeyValue::new("task_name", name)
}
}

impl<N1> OtelTaskMetricsRecorder<N1>
where
N1: AsTaskName,
{
/// Clones the current recording context with a new task name.
pub fn with_name<N2>(&self, name: N2) -> OtelTaskMetricsRecorder<N2>
pub fn with_name<N>(&self, name: N) -> Self
where
N2: AsTaskName,
N: AsTaskName,
{
OtelTaskMetricsRecorder {
Self {
inner: self.inner.clone(),
name: Some(name),
name: name.as_task_name(),
attributes: self.attributes.clone(),
}
}

/// Clones the current recording context with a new set of attributes.
pub fn with_attributes(
&self,
attributes: impl IntoIterator<Item = otel::KeyValue>,
) -> OtelTaskMetricsRecorder {
Self {
inner: self.inner.clone(),
name: self.name,
attributes: attributes.into_iter().collect(),
}
}

fn combine_attributes(&self) -> SmallVec<[otel::KeyValue; 4]> {
let name = [otel::KeyValue::new("task_name", self.name)];
let extra = self.attributes.iter().cloned();
name.into_iter().chain(extra).collect()
}
}

impl<N> TaskMetricsRecorder for OtelTaskMetricsRecorder<N>
where
N: AsTaskName,
{
impl TaskMetricsRecorder for OtelTaskMetricsRecorder {
fn record_task_started(&self) {
self.inner
.tasks_started
.add(&otel::Context::new(), 1, &[self.task_name_kv()]);
.add(&otel::Context::new(), 1, &self.combine_attributes());
}

fn record_task_finished(
Expand All @@ -115,10 +113,10 @@ where
) {
let total_duration_ms = duration_as_millis_f64(total_duration);
let poll_duration_ms = duration_as_millis_f64(poll_duration);
let attrs = [
self.task_name_kv(),
otel::KeyValue::new("completed", completed),
];

let mut attrs = self.combine_attributes();
attrs.push(otel::KeyValue::new("completed", completed));

let ctx = otel::Context::new();

self.inner
Expand Down