Skip to content

Commit

Permalink
feat: custom attributes in otel task metrics (#9)
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead authored Oct 9, 2023
1 parent 356602a commit 3f6fcca
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 39 deletions.
1 change: 1 addition & 0 deletions crates/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ prometheus = "0.13"
opentelemetry = { version = "0.19", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = "0.12"
once_cell = "1.17"
smallvec = "1.11"
76 changes: 37 additions & 39 deletions crates/metrics/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use {
},
once_cell::sync::OnceCell,
opentelemetry::metrics::{Counter, Histogram},
smallvec::SmallVec,
std::{ops::Deref, sync::Arc, time::Duration},
};

/// Wrapper for [`OtelTaskMetricsRecorder`], which can be statically
/// initialized.
pub struct TaskMetrics {
prefix: &'static str,
inner: OnceCell<OtelTaskMetricsRecorder<()>>,
inner: OnceCell<OtelTaskMetricsRecorder>,
}

impl TaskMetrics {
Expand Down Expand Up @@ -50,60 +51,57 @@ impl Deref for TaskMetrics {
/// The above metrics are tracked using [`opentelemetry`] metrics API and are
/// prefixed according to the constructor arguments.
#[derive(Clone)]
pub struct OtelTaskMetricsRecorder<N: AsTaskName = ()> {
pub struct OtelTaskMetricsRecorder {
inner: Arc<OtelRecorderInner>,
name: Option<N>,
name: &'static str,
attributes: SmallVec<[otel::KeyValue; 2]>,
}

impl OtelTaskMetricsRecorder<()> {
impl OtelTaskMetricsRecorder {
pub fn new(prefix: &str) -> Self {
Self {
inner: Arc::new(OtelRecorderInner::new(prefix)),
name: None,
name: "unknown",
attributes: SmallVec::new(),
}
}
}

impl<N> OtelTaskMetricsRecorder<N>
where
N: AsTaskName,
{
#[inline]
fn task_name_kv(&self) -> otel::KeyValue {
let name: &'static str = self
.name
.as_ref()
.map(AsTaskName::as_task_name)
.unwrap_or_default();

otel::KeyValue::new("task_name", name)
}
}

impl<N1> OtelTaskMetricsRecorder<N1>
where
N1: AsTaskName,
{
/// Clones the current recording context with a new task name.
pub fn with_name<N2>(&self, name: N2) -> OtelTaskMetricsRecorder<N2>
pub fn with_name<N>(&self, name: N) -> Self
where
N2: AsTaskName,
N: AsTaskName,
{
OtelTaskMetricsRecorder {
Self {
inner: self.inner.clone(),
name: Some(name),
name: name.as_task_name(),
attributes: self.attributes.clone(),
}
}

/// Clones the current recording context with a new set of attributes.
pub fn with_attributes(
&self,
attributes: impl IntoIterator<Item = otel::KeyValue>,
) -> OtelTaskMetricsRecorder {
Self {
inner: self.inner.clone(),
name: self.name,
attributes: attributes.into_iter().collect(),
}
}

fn combine_attributes(&self) -> SmallVec<[otel::KeyValue; 4]> {
let name = [otel::KeyValue::new("task_name", self.name)];
let extra = self.attributes.iter().cloned();
name.into_iter().chain(extra).collect()
}
}

impl<N> TaskMetricsRecorder for OtelTaskMetricsRecorder<N>
where
N: AsTaskName,
{
impl TaskMetricsRecorder for OtelTaskMetricsRecorder {
fn record_task_started(&self) {
self.inner
.tasks_started
.add(&otel::Context::new(), 1, &[self.task_name_kv()]);
.add(&otel::Context::new(), 1, &self.combine_attributes());
}

fn record_task_finished(
Expand All @@ -115,10 +113,10 @@ where
) {
let total_duration_ms = duration_as_millis_f64(total_duration);
let poll_duration_ms = duration_as_millis_f64(poll_duration);
let attrs = [
self.task_name_kv(),
otel::KeyValue::new("completed", completed),
];

let mut attrs = self.combine_attributes();
attrs.push(otel::KeyValue::new("completed", completed));

let ctx = otel::Context::new();

self.inner
Expand Down

0 comments on commit 3f6fcca

Please sign in to comment.