Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics exporter to Prometheus with OTLP #1438

Merged
merged 17 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ipa-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ http-body-util = { version = "0.1.1", optional = true }
http-body = { version = "1", optional = true }
iai = { version = "0.1.1", optional = true }
once_cell = "1.18"
opentelemetry = "0.24"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
pin-project = "1.0"
prometheus = "0.13.3"
rand = "0.8"
rand_core = "0.6"
rcgen = { version = "0.11.3", optional = true }
Expand Down
41 changes: 41 additions & 0 deletions ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ pub mod echo {
pub const AXUM_PATH: &str = "/echo";
}

pub mod metrics {
use std::collections::HashMap;

use axum::body::Body;
use hyper::http::uri;
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {
// pub headers: HashMap<String, String>,
}

impl Request {
pub fn new(// headers: HashMap<String, String>,
) -> Self {
Self {
// headers,
}
}
pub fn try_into_http_request(
self,
scheme: uri::Scheme,
authority: uri::Authority,
) -> crate::net::http_serde::OutgoingRequest {
let uri = uri::Uri::builder()
.scheme(scheme)
.authority(authority)
.build()?;

// let req = self
// .headers
// .into_iter()
// .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v));
let req = hyper::Request::get(uri);
Ok(req.body(Body::empty())?)
}
}

pub const AXUM_PATH: &str = "/metrics";
}

pub mod query {
use std::fmt::{Display, Formatter};

Expand Down
14 changes: 8 additions & 6 deletions ipa-core/src/net/server/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use axum::Router;
use crate::net::{http_serde, transport::MpcHttpTransport, ShardHttpTransport};

pub fn mpc_router(transport: MpcHttpTransport) -> Router {
echo::router().nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport)),
)
echo::router()
.nest(
http_serde::query::BASE_AXUM_PATH,
Router::new()
.merge(query::query_router(transport.clone()))
.merge(query::h2h_router(transport)),
)
.merge(query::metric_router())
}

pub fn shard_router(transport: ShardHttpTransport) -> Router {
Expand Down
67 changes: 67 additions & 0 deletions ipa-core/src/net/server/handlers/query/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use axum::{routing::get, Router};
use hyper::StatusCode;
use opentelemetry::KeyValue;

use crate::net::{
http_serde::{self},
Error,
};

use opentelemetry::metrics::MeterProvider;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{self, Encoder, TextEncoder};

/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent
/// to the [`HttpTransport`].
async fn handler(// transport: Extension<MpcHttpTransport>,
// QueryConfigQueryParams(query_config): QueryConfigQueryParams,
) -> Result<Vec<u8>, Error> {
// match transport.dispatch(query_config, BodyStream::empty()).await {
// Ok(resp) => Ok(Json(resp.try_into()?)),
// Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => {
// Err(Error::application(StatusCode::CONFLICT, err))
// }
// Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),
// }

// TODO: Remove this dummy metrics and get metrics for scraper from ipa-metrics::PrometheusMetricsExporter (see ipa-metrics/exporter.rs)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akoshelev I'm still not clear how we can pull the metrics from the logging_handle. IpaHttpClient doesn't seem to have a reference to it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's right, it is not referenced from anywhere. helper.rs does have logging_handle that has a reference to this controller. So we need to plumb it through to deliver Controller to the handlers


// create a new prometheus registry
let registry = prometheus::Registry::new();

// configure OpenTelemetry to use this registry
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

// set up a meter to create instruments
let provider = SdkMeterProvider::builder().with_reader(exporter).build();
let meter = provider.meter("ipa-helper");

// Use two instruments
let counter = meter
.u64_counter("a.counter")
.with_description("Counts things")
.init();
let histogram = meter
.u64_histogram("a.histogram")
.with_description("Records values")
.init();

counter.add(100, &[KeyValue::new("key", "value")]);
histogram.record(100, &[KeyValue::new("key", "value")]);

// Encode data as text or protobuf
let encoder = TextEncoder::new();
let metric_families = registry.gather();
let mut result = Vec::new();
match encoder.encode(&metric_families, &mut result) {
Ok(()) => Ok(result),
Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)),
}
}

pub fn router() -> Router {
Router::new().route(http_serde::metrics::AXUM_PATH, get(handler))
}
6 changes: 6 additions & 0 deletions ipa-core/src/net/server/handlers/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod create;
mod input;
mod kill;
mod metrics;
mod prepare;
mod results;
mod status;
Expand Down Expand Up @@ -59,6 +60,11 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router {
.layer(layer_fn(HelperAuthentication::<_, Shard>::new))
}

/// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper)
pub fn metric_router() -> Router {
Router::new().merge(metrics::router())
}

/// Returns HTTP 401 Unauthorized if the request does not have valid authentication.
///
/// Authentication information is carried via the `ClientIdentity` request extension. The extension
Expand Down
16 changes: 16 additions & 0 deletions ipa-metrics-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "ipa-metrics-prometheus"
version = "0.1.0"
edition = "2021"

[features]
default = []

[dependencies]
ipa-metrics = { path = "../ipa-metrics" }

# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC)
opentelemetry = "0.24"
opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] }
opentelemetry-prometheus = { version = "0.17" }
prometheus = "0.13.3"
79 changes: 79 additions & 0 deletions ipa-metrics-prometheus/src/exporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::io;

use opentelemetry::metrics::{Meter, MeterProvider};
use opentelemetry::KeyValue;
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{self, Encoder, TextEncoder};

use ipa_metrics::MetricsStore;

pub trait PrometheusMetricsExporter {
fn export<W: io::Write>(&mut self, w: &mut W);
}

impl PrometheusMetricsExporter for MetricsStore {
fn export<W: io::Write>(&mut self, w: &mut W) {
// Setup prometheus registry and open-telemetry exporter
let registry = prometheus::Registry::new();

let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
.build()
.unwrap();

let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build();

// Convert the snapshot to otel struct
// TODO : We need to define a proper scope for the metrics
let meter = meter_provider.meter("ipa-helper");

let counters = self.counters();
counters.for_each(|(counter_name, counter_value)| {
let otlp_counter = meter.u64_counter(counter_name.key).init();

let attributes: Vec<KeyValue> = counter_name
.labels()
.map(|l| KeyValue::new(l.name, l.val.to_string()))
.collect();

otlp_counter.add(counter_value, &attributes[..]);
});

let encoder = TextEncoder::new();
let metric_families = registry.gather();
// TODO: Handle error?
encoder.encode(&metric_families, w).unwrap();
}
}

#[cfg(test)]
mod test {

use std::thread;

use ipa_metrics::{counter, install_new_thread, MetricChannelType};

use super::PrometheusMetricsExporter;

#[test]
fn export_to_prometheus() {
let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap();

thread::spawn(move || {
producer.install();
counter!("baz", 4);
counter!("bar", 1);
let _ = producer.drop_handle();
})
.join()
.unwrap();

let mut store = controller.snapshot().unwrap();

let mut buff = Vec::new();
store.export(&mut buff);

let result = String::from_utf8(buff).unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to validate the results somehow. Also consider removing the println.

println!("Export to Prometheus: {result}");
}
}
1 change: 1 addition & 0 deletions ipa-metrics-prometheus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod exporter;
1 change: 0 additions & 1 deletion ipa-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ hashbrown = "0.15"
rustc-hash = "2.0.0"
# logging
tracing = "0.1"

5 changes: 5 additions & 0 deletions ipa-metrics/src/partitioned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use hashbrown::hash_map::Entry;
use rustc_hash::FxBuildHasher;

use crate::{
key::OwnedMetricName,
kind::CounterValue,
store::{CounterHandle, Store},
MetricName,
Expand Down Expand Up @@ -120,6 +121,10 @@ impl PartitionedStore {
self.get_mut(CurrentThreadContext::get()).counter(key)
}

pub fn counters(&mut self) -> impl Iterator<Item = (&OwnedMetricName, CounterValue)> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is &mut required here or &self can work too?

self.get_mut(CurrentThreadContext::get()).counters()
}

#[must_use]
pub fn len(&self) -> usize {
self.inner.len() + self.default_store.len()
Expand Down
Loading