-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics exporter to Prometheus with OTLP #1438
Changes from 6 commits
b57aa09
92329b5
397c390
68a42b6
a8235de
3d986b4
dcbbf51
35fbb2e
ec5e5e4
92c3a28
d49a6da
8076f04
5d7c689
76c6652
7e1cb2b
dd2a174
822c659
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
use axum::{routing::get, Router}; | ||
use hyper::StatusCode; | ||
use opentelemetry::KeyValue; | ||
|
||
use crate::net::{ | ||
http_serde::{self}, | ||
Error, | ||
}; | ||
|
||
use opentelemetry::metrics::MeterProvider; | ||
use opentelemetry_sdk::metrics::SdkMeterProvider; | ||
use prometheus::{self, Encoder, TextEncoder}; | ||
|
||
/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent | ||
/// to the [`HttpTransport`]. | ||
async fn handler(// transport: Extension<MpcHttpTransport>, | ||
// QueryConfigQueryParams(query_config): QueryConfigQueryParams, | ||
) -> Result<Vec<u8>, Error> { | ||
// match transport.dispatch(query_config, BodyStream::empty()).await { | ||
// Ok(resp) => Ok(Json(resp.try_into()?)), | ||
// Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => { | ||
// Err(Error::application(StatusCode::CONFLICT, err)) | ||
// } | ||
// Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), | ||
// } | ||
|
||
// TODO: Remove this dummy metrics and get metrics for scraper from ipa-metrics::PrometheusMetricsExporter (see ipa-metrics/exporter.rs) | ||
|
||
// create a new prometheus registry | ||
let registry = prometheus::Registry::new(); | ||
|
||
// configure OpenTelemetry to use this registry | ||
let exporter = opentelemetry_prometheus::exporter() | ||
.with_registry(registry.clone()) | ||
.build() | ||
.unwrap(); | ||
|
||
// set up a meter to create instruments | ||
let provider = SdkMeterProvider::builder().with_reader(exporter).build(); | ||
let meter = provider.meter("ipa-helper"); | ||
|
||
// Use two instruments | ||
let counter = meter | ||
.u64_counter("a.counter") | ||
.with_description("Counts things") | ||
.init(); | ||
let histogram = meter | ||
.u64_histogram("a.histogram") | ||
.with_description("Records values") | ||
.init(); | ||
|
||
counter.add(100, &[KeyValue::new("key", "value")]); | ||
histogram.record(100, &[KeyValue::new("key", "value")]); | ||
|
||
// Encode data as text or protobuf | ||
let encoder = TextEncoder::new(); | ||
let metric_families = registry.gather(); | ||
let mut result = Vec::new(); | ||
match encoder.encode(&metric_families, &mut result) { | ||
Ok(()) => Ok(result), | ||
Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), | ||
} | ||
} | ||
|
||
pub fn router() -> Router { | ||
Router::new().route(http_serde::metrics::AXUM_PATH, get(handler)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,11 @@ partitions = [] | |
crossbeam-channel = "0.5" | ||
# This crate uses raw entry API that is unstable in stdlib | ||
hashbrown = "0.15" | ||
# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC) | ||
opentelemetry = "0.24" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is great, but can you make a separate crate for it? |
||
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } | ||
opentelemetry-prometheus = { version = "0.17" } | ||
prometheus = "0.13.3" | ||
# Fast non-collision-resistant hashing | ||
rustc-hash = "2.0.0" | ||
# logging | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
use std::io; | ||
|
||
use opentelemetry::metrics::{Meter, MeterProvider}; | ||
use opentelemetry::KeyValue; | ||
use opentelemetry_sdk::metrics::SdkMeterProvider; | ||
use prometheus::{self, Encoder, TextEncoder}; | ||
|
||
use crate::MetricsStore; | ||
|
||
pub trait PrometheusMetricsExporter { | ||
fn export<W: io::Write>(&mut self, w: &mut W); | ||
} | ||
|
||
impl MetricsStore { | ||
fn to_otlp(&mut self, meter: &Meter) { | ||
let counters = self.counters(); | ||
|
||
counters.for_each(|(counter_name, counter_value)| { | ||
let otlp_counter = meter.u64_counter(counter_name.key).init(); | ||
|
||
let attributes: Vec<KeyValue> = counter_name | ||
.labels() | ||
.map(|l| KeyValue::new(l.name, l.val.to_string())) | ||
.collect(); | ||
|
||
otlp_counter.add(counter_value, &attributes[..]); | ||
}); | ||
} | ||
} | ||
|
||
impl PrometheusMetricsExporter for MetricsStore { | ||
fn export<W: io::Write>(&mut self, w: &mut W) { | ||
// Setup prometheus registry and open-telemetry exporter | ||
let registry = prometheus::Registry::new(); | ||
|
||
let exporter = opentelemetry_prometheus::exporter() | ||
.with_registry(registry.clone()) | ||
.build() | ||
.unwrap(); | ||
|
||
let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); | ||
|
||
// Convert the snapshot to otel struct | ||
// TODO : We need to define a proper scope for the metrics | ||
let meter = meter_provider.meter("ipa-helper"); | ||
self.to_otlp(&meter); | ||
|
||
let encoder = TextEncoder::new(); | ||
let metric_families = registry.gather(); | ||
// TODO: Handle error? | ||
encoder.encode(&metric_families, w).unwrap(); | ||
} | ||
} | ||
|
||
mod test { | ||
|
||
use std::thread::{self, Scope, ScopedJoinHandle}; | ||
|
||
use super::PrometheusMetricsExporter; | ||
use crate::{counter, install_new_thread, producer::Producer, MetricChannelType}; | ||
struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); | ||
|
||
impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you don't need this complexity, each thread has access to its own metric store throuh |
||
fn spawn<F, T>(&self, f: F) -> ScopedJoinHandle<'scope, T> | ||
where | ||
F: FnOnce() -> T + Send + 'scope, | ||
T: Send + 'scope, | ||
{ | ||
let producer = self.1.clone(); | ||
|
||
self.0.spawn(move || { | ||
producer.install(); | ||
let r = f(); | ||
let _ = producer.drop_handle(); | ||
|
||
r | ||
}) | ||
} | ||
} | ||
|
||
trait IntoMetered<'scope, 'env: 'scope> { | ||
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>; | ||
} | ||
|
||
impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> { | ||
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> { | ||
MeteredScope(self, meter) | ||
} | ||
} | ||
|
||
#[test] | ||
fn export_to_prometheus() { | ||
let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap(); | ||
|
||
thread::scope(move |s| { | ||
let s = s.metered(producer); | ||
s.spawn(|| counter!("baz", 4)).join().unwrap(); | ||
s.spawn(|| counter!("bar", 1)).join().unwrap(); | ||
|
||
let mut store = controller | ||
.snapshot() | ||
.expect("Metrics snapshot must be available"); | ||
|
||
let mut buff = Vec::new(); | ||
store.export(&mut buff); | ||
|
||
let result = String::from_utf8(buff).unwrap(); | ||
println!("{result}"); | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ | |
mod collector; | ||
mod context; | ||
mod controller; | ||
mod exporter; | ||
mod key; | ||
mod kind; | ||
mod label; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ use hashbrown::hash_map::Entry; | |
use rustc_hash::FxBuildHasher; | ||
|
||
use crate::{ | ||
key::OwnedMetricName, | ||
kind::CounterValue, | ||
store::{CounterHandle, Store}, | ||
MetricName, | ||
|
@@ -120,6 +121,10 @@ impl PartitionedStore { | |
self.get_mut(CurrentThreadContext::get()).counter(key) | ||
} | ||
|
||
pub fn counters(&mut self) -> impl Iterator<Item = (&OwnedMetricName, CounterValue)> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is |
||
self.get_mut(CurrentThreadContext::get()).counters() | ||
} | ||
|
||
#[must_use] | ||
pub fn len(&self) -> usize { | ||
self.inner.len() + self.default_store.len() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@akoshelev I'm still not clear how we can pull the metrics from the logging_handle. IpaHttpClient doesn't seem to have a reference to it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's right, it is not referenced from anywhere.
helper.rs
does have logging_handle that has a reference to this controller. So we need to plumb it through to deliverController
to the handlers