Metrics exporter to Prometheus with OTLP #1438

Merged
merged 17 commits into from
Dec 18, 2024
Changes from 6 commits
4 changes: 4 additions & 0 deletions ipa-core/Cargo.toml
@@ -131,7 +131,11 @@ http-body-util = { version = "0.1.1", optional = true }
http-body = { version = "1", optional = true }
iai = { version = "0.1.1", optional = true }
once_cell = "1.18"
opentelemetry = "0.24"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
pin-project = "1.0"
prometheus = "0.13.3"
rand = "0.8"
rand_core = "0.6"
rcgen = { version = "0.11.3", optional = true }
41 changes: 41 additions & 0 deletions ipa-core/src/net/http_serde.rs
@@ -68,6 +68,47 @@ pub mod echo {
pub const AXUM_PATH: &str = "/echo";
}

pub mod metrics {
use std::collections::HashMap;

use axum::body::Body;
use hyper::http::uri;
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {
// pub headers: HashMap<String, String>,
}

impl Request {
pub fn new(// headers: HashMap<String, String>,
) -> Self {
Self {
// headers,
}
}
pub fn try_into_http_request(
self,
scheme: uri::Scheme,
authority: uri::Authority,
) -> crate::net::http_serde::OutgoingRequest {
let uri = uri::Uri::builder()
.scheme(scheme)
.authority(authority)
.build()?;

// let req = self
// .headers
// .into_iter()
// .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v));
let req = hyper::Request::get(uri);
Ok(req.body(Body::empty())?)
}
}

pub const AXUM_PATH: &str = "/metrics";
}

pub mod query {
use std::fmt::{Display, Formatter};

14 changes: 8 additions & 6 deletions ipa-core/src/net/server/handlers/mod.rs
@@ -6,12 +6,14 @@ use axum::Router;
use crate::net::{http_serde, transport::MpcHttpTransport, ShardHttpTransport};

pub fn mpc_router(transport: MpcHttpTransport) -> Router {
-    echo::router().nest(
-        http_serde::query::BASE_AXUM_PATH,
-        Router::new()
-            .merge(query::query_router(transport.clone()))
-            .merge(query::h2h_router(transport)),
-    )
+    echo::router()
+        .nest(
+            http_serde::query::BASE_AXUM_PATH,
+            Router::new()
+                .merge(query::query_router(transport.clone()))
+                .merge(query::h2h_router(transport)),
+        )
+        .merge(query::metric_router())
}

pub fn shard_router(transport: ShardHttpTransport) -> Router {
67 changes: 67 additions & 0 deletions ipa-core/src/net/server/handlers/query/metrics.rs
@@ -0,0 +1,67 @@
use axum::{routing::get, Router};
use hyper::StatusCode;
use opentelemetry::KeyValue;

use crate::net::{
http_serde::{self},
Error,
};

use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{self, Encoder, TextEncoder};

/// Serves metrics in the Prometheus text format so that a scraper can collect them.
/// For now it returns dummy metrics; see the TODO below.
async fn handler(// transport: Extension<MpcHttpTransport>,
// QueryConfigQueryParams(query_config): QueryConfigQueryParams,
) -> Result<Vec<u8>, Error> {
// match transport.dispatch(query_config, BodyStream::empty()).await {
// Ok(resp) => Ok(Json(resp.try_into()?)),
// Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => {
// Err(Error::application(StatusCode::CONFLICT, err))
// }
// Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),
// }

// TODO: Remove these dummy metrics and serve the scraper from ipa-metrics::PrometheusMetricsExporter instead (see ipa-metrics/exporter.rs)
Collaborator Author

@akoshelev I'm still not clear how we can pull the metrics from the logging_handle. IpaHttpClient doesn't seem to have a reference to it?

Collaborator

That's right, it is not referenced from anywhere. helper.rs does have a logging_handle that holds a reference to this controller, so we need to plumb it through to deliver the Controller to the handlers.
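
One way to picture the plumbing described above is the minimal sketch below. It uses only the standard library, and every name in it (`Controller`, `HandlerState`, `metrics_handler`) is a hypothetical stand-in rather than the ipa-metrics or axum API. The assumption is that the controller can be wrapped in an `Arc`, stored in state that is created once at startup, and read by the handler on each request instead of building dummy instruments.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the metrics controller; not the ipa-metrics type.
#[derive(Default)]
pub struct Controller {
    counters: Mutex<BTreeMap<String, u64>>,
}

impl Controller {
    /// Adds `value` to the named counter.
    pub fn add(&self, name: &str, value: u64) {
        let mut counters = self.counters.lock().unwrap();
        *counters.entry(name.to_string()).or_insert(0) += value;
    }

    /// Returns a point-in-time copy of all counters.
    pub fn snapshot(&self) -> BTreeMap<String, u64> {
        self.counters.lock().unwrap().clone()
    }
}

/// State built once at startup and handed to every handler (assumed shape).
#[derive(Clone)]
pub struct HandlerState {
    pub controller: Arc<Controller>,
}

/// Stand-in for the `/metrics` handler: reads from the shared controller and
/// renders a simplified Prometheus-style text body.
pub fn metrics_handler(state: &HandlerState) -> Vec<u8> {
    let mut out = String::new();
    for (name, value) in state.controller.snapshot() {
        out.push_str(&format!("# TYPE {name} counter\n{name} {value}\n"));
    }
    out.into_bytes()
}
```

In a real router the state would presumably be attached with the framework's own state or extension mechanism; the sketch only shows the ownership shape, where the code that owns the logging handle and the handler share the same `Arc`.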


// create a new prometheus registry
let registry = prometheus::Registry::new();

// configure OpenTelemetry to use this registry
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

// set up a meter to create instruments
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let meter = provider.meter("ipa-helper");

// Use two instruments
let counter = meter
.u64_counter("a.counter")
.with_description("Counts things")
.init();
let histogram = meter
.u64_histogram("a.histogram")
.with_description("Records values")
.init();

counter.add(100, &[KeyValue::new("key", "value")]);
histogram.record(100, &[KeyValue::new("key", "value")]);

// Encode data as text or protobuf
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
match encoder.encode(&metric_families, &mut result) {
Ok(()) => Ok(result),
Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),
}
}

pub fn router() -> Router {
Router::new().route(http_serde::metrics::AXUM_PATH, get(handler))
}
6 changes: 6 additions & 0 deletions ipa-core/src/net/server/handlers/query/mod.rs
@@ -1,6 +1,7 @@
mod create;
mod input;
mod kill;
mod metrics;
mod prepare;
mod results;
mod status;
@@ -59,6 +60,11 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router {
.layer(layer_fn(HelperAuthentication::<_, Shard>::new))
}

/// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper)
pub fn metric_router() -> Router {
Router::new().merge(metrics::router())
}

/// Returns HTTP 401 Unauthorized if the request does not have valid authentication.
///
/// Authentication information is carried via the `ClientIdentity` request extension. The extension
5 changes: 5 additions & 0 deletions ipa-metrics/Cargo.toml
@@ -13,6 +13,11 @@ partitions = []
crossbeam-channel = "0.5"
# This crate uses raw entry API that is unstable in stdlib
hashbrown = "0.15"
# OpenTelemetry crates: opentelemetry-prometheus 0.17 is built against the OpenTelemetry API and SDK 0.24. (TBC)
opentelemetry = "0.24"
Collaborator

This is great, but can you make a separate crate for it? ipa-metrics is intended to contain only core functionality, and it is easier to maintain in the long term if it does not carry extra dependencies.

opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = "0.13.3"
# Fast non-collision-resistant hashing
rustc-hash = "2.0.0"
# logging
111 changes: 111 additions & 0 deletions ipa-metrics/src/exporter.rs
@@ -0,0 +1,111 @@
use std::io;

use opentelemetry::metrics::{Meter, MeterProvider};
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{self, Encoder, TextEncoder};

use crate::MetricsStore;

pub trait PrometheusMetricsExporter {
fn export<W: io::Write>(&mut self, w: &mut W);
}

impl MetricsStore {
fn to_otlp(&mut self, meter: &Meter) {
let counters = self.counters();

counters.for_each(|(counter_name, counter_value)| {
let otlp_counter = meter.u64_counter(counter_name.key).init();

let attributes: Vec<KeyValue> = counter_name
.labels()
.map(|l| KeyValue::new(l.name, l.val.to_string()))
.collect();

otlp_counter.add(counter_value, &attributes[..]);
});
}
}

impl PrometheusMetricsExporter for MetricsStore {
fn export<W: io::Write>(&mut self, w: &mut W) {
// Setup prometheus registry and open-telemetry exporter
let registry = prometheus::Registry::new();

let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build();

// Convert the snapshot to otel struct
// TODO : We need to define a proper scope for the metrics
let meter = meter_provider.meter("ipa-helper");
self.to_otlp(&meter);

let encoder = TextEncoder::new();
let metric_families = registry.gather();
// TODO: Handle error?
encoder.encode(&metric_families, w).unwrap();
}
}

#[cfg(test)]
mod test {

use std::thread::{self, Scope, ScopedJoinHandle};

use super::PrometheusMetricsExporter;
use crate::{counter, install_new_thread, producer::Producer, MetricChannelType};
struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer);

impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> {
Collaborator

You don't need this complexity: each thread has access to its own metric store through CurrentThreadMetricContext.
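
To illustrate the idea behind this comment, here is a minimal standard-library sketch of per-thread stores. It is not the ipa-metrics implementation: `STORE`, `count`, `take_local`, and `run_and_merge` are hypothetical names, and the assumption is only that each thread records into its own thread-local map, so the test needs no wrapper type to carry a producer into spawned threads.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::thread;

thread_local! {
    // Per-thread counter store; a hypothetical stand-in for a thread-local metric context.
    static STORE: RefCell<BTreeMap<&'static str, u64>> = RefCell::new(BTreeMap::new());
}

/// Increments a counter in the calling thread's own store.
pub fn count(name: &'static str, value: u64) {
    STORE.with(|s| *s.borrow_mut().entry(name).or_insert(0) += value);
}

/// Takes the calling thread's counters, leaving its store empty.
pub fn take_local() -> BTreeMap<&'static str, u64> {
    STORE.with(|s| std::mem::take(&mut *s.borrow_mut()))
}

/// Spawns one thread per increment, lets each record into its own store,
/// then merges the per-thread results.
pub fn run_and_merge(increments: &[(&'static str, u64)]) -> BTreeMap<&'static str, u64> {
    let mut merged: BTreeMap<&'static str, u64> = BTreeMap::new();
    thread::scope(|scope| {
        let handles: Vec<_> = increments
            .iter()
            .map(|&(name, value)| {
                scope.spawn(move || {
                    count(name, value);
                    take_local()
                })
            })
            .collect();
        for handle in handles {
            for (name, value) in handle.join().unwrap() {
                *merged.entry(name).or_insert(0) += value;
            }
        }
    });
    merged
}
```

Calling `run_and_merge(&[("baz", 4), ("bar", 1)])` mirrors the two `counter!` calls in the test above, without a `MeteredScope`-style wrapper.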

fn spawn<F, T>(&self, f: F) -> ScopedJoinHandle<'scope, T>
where
F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope,
{
let producer = self.1.clone();

self.0.spawn(move || {
producer.install();
let r = f();
let _ = producer.drop_handle();

r
})
}
}

trait IntoMetered<'scope, 'env: 'scope> {
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>;
}

impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> {
fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> {
MeteredScope(self, meter)
}
}

#[test]
fn export_to_prometheus() {
let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap();

thread::scope(move |s| {
let s = s.metered(producer);
s.spawn(|| counter!("baz", 4)).join().unwrap();
s.spawn(|| counter!("bar", 1)).join().unwrap();

let mut store = controller
.snapshot()
.expect("Metrics snapshot must be available");

let mut buff = Vec::new();
store.export(&mut buff);

let result = String::from_utf8(buff).unwrap();
println!("{result}");
});
}
}
1 change: 1 addition & 0 deletions ipa-metrics/src/lib.rs
@@ -5,6 +5,7 @@
mod collector;
mod context;
mod controller;
mod exporter;
mod key;
mod kind;
mod label;
5 changes: 5 additions & 0 deletions ipa-metrics/src/partitioned.rs
@@ -20,6 +20,7 @@ use hashbrown::hash_map::Entry;
use rustc_hash::FxBuildHasher;

use crate::{
key::OwnedMetricName,
kind::CounterValue,
store::{CounterHandle, Store},
MetricName,
@@ -120,6 +121,10 @@ impl PartitionedStore {
self.get_mut(CurrentThreadContext::get()).counter(key)
}

pub fn counters(&mut self) -> impl Iterator<Item = (&OwnedMetricName, CounterValue)> {
Collaborator

Is `&mut` required here, or would `&self` work too?
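
As a general illustration of the question, the sketch below shows a read-only counter iterator that borrows `&self`. The `Store` type here is a hypothetical stand-in, not the ipa-metrics one. In the PR the method goes through `get_mut(CurrentThreadContext::get())`, so whether `&self` works there depends on an immutable accessor that this diff does not show.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a counter store; not the ipa-metrics `Store`.
#[derive(Default)]
pub struct Store {
    counters: BTreeMap<String, u64>,
}

impl Store {
    /// Mutation needs exclusive access, hence `&mut self`.
    pub fn add(&mut self, name: &str, value: u64) {
        *self.counters.entry(name.to_string()).or_insert(0) += value;
    }

    /// Read-only iteration only needs a shared borrow, hence `&self`.
    pub fn counters(&self) -> impl Iterator<Item = (&str, u64)> + '_ {
        self.counters
            .iter()
            .map(|(name, value)| (name.as_str(), *value))
    }
}
```

With a shared borrow, callers can iterate more than once, or from several readers at the same time, without needing a mutable binding.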

self.get_mut(CurrentThreadContext::get()).counters()
}

#[must_use]
pub fn len(&self) -> usize {
self.inner.len() + self.default_store.len()