From b57aa09d212539adf6e12418b9e42fdaa297a2d1 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Thu, 7 Nov 2024 15:20:41 +0800 Subject: [PATCH 01/12] Add /metrics route for metric backend scraper --- ipa-core/src/net/http_serde.rs | 42 +++++++++++++++++++ ipa-core/src/net/server/handlers/mod.rs | 1 + .../src/net/server/handlers/query/metrics.rs | 23 ++++++++++ ipa-core/src/net/server/handlers/query/mod.rs | 7 ++++ 4 files changed, 73 insertions(+) create mode 100644 ipa-core/src/net/server/handlers/query/metrics.rs diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index 1965c15ce..3ca976a53 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -68,6 +68,48 @@ pub mod echo { pub const AXUM_PATH: &str = "/echo"; } +pub mod metrics { + use std::collections::HashMap; + + use axum::body::Body; + use hyper::http::uri; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub struct Request { + // pub headers: HashMap, + } + + impl Request { + pub fn new( + // headers: HashMap, + ) -> Self { + Self { + // headers, + } + } + pub fn try_into_http_request( + self, + scheme: uri::Scheme, + authority: uri::Authority, + ) -> crate::net::http_serde::OutgoingRequest { + let uri = uri::Uri::builder() + .scheme(scheme) + .authority(authority) + .build()?; + + // let req = self + // .headers + // .into_iter() + // .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v)); + let req = hyper::Request::get(uri); + Ok(req.body(Body::empty())?) + } + } + + pub const AXUM_PATH: &str = "/metrics"; +} + pub mod query { use std::fmt::{Display, Formatter}; diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index dc99ebff5..709a55ee7 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -12,6 +12,7 @@ pub fn mpc_router(transport: MpcHttpTransport) -> Router { .merge(query::query_router(transport.clone())) .merge(query::h2h_router(transport)), ) + .merge(query::metric_router()) } pub fn shard_router(transport: ShardHttpTransport) -> Router { diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs new file mode 100644 index 000000000..88b306bc7 --- /dev/null +++ b/ipa-core/src/net/server/handlers/query/metrics.rs @@ -0,0 +1,23 @@ +use axum::{routing::get, Router}; + +use crate::net::http_serde::{self}; + +/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent +/// to the [`HttpTransport`]. +async fn handler( + // transport: Extension, + // QueryConfigQueryParams(query_config): QueryConfigQueryParams, +) -> &'static str { + // match transport.dispatch(query_config, BodyStream::empty()).await { + // Ok(resp) => Ok(Json(resp.try_into()?)), + // Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => { + // Err(Error::application(StatusCode::CONFLICT, err)) + // } + // Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), + // } + "hello world" +} + +pub fn router() -> Router { + Router::new().route(http_serde::metrics::AXUM_PATH, get(handler)) +} \ No newline at end of file diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 5f43b9ef5..c4a644dfb 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -5,6 +5,7 @@ mod prepare; mod results; mod status; mod step; +mod metrics; use std::marker::PhantomData; @@ -59,6 +60,12 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router { .layer(layer_fn(HelperAuthentication::<_, Shard>::new)) } +/// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper) +pub fn metric_router() -> Router { + Router::new() + .merge(metrics::router()) +} + /// Returns HTTP 401 Unauthorized if the request does not have valid authentication. /// /// Authentication information is carried via the `ClientIdentity` request extension. The extension From 92329b55fb113b129cf80d59d93e5d046a935537 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Mon, 11 Nov 2024 15:34:51 +0800 Subject: [PATCH 02/12] /metrics endpoint returns prometheus sample metrics with otel --- ipa-core/Cargo.toml | 4 ++ .../src/net/server/handlers/query/metrics.rs | 46 +++++++++++++++++-- 2 files changed, 47 insertions(+), 3 deletions(-) diff --git a/ipa-core/Cargo.toml b/ipa-core/Cargo.toml index 017f67ab6..c79640a11 100644 --- a/ipa-core/Cargo.toml +++ b/ipa-core/Cargo.toml @@ -131,7 +131,11 @@ http-body-util = { version = "0.1.1", optional = true } http-body = { version = "1", optional = true } iai = { version = "0.1.1", optional = true } once_cell = "1.18" +opentelemetry = "0.24" +opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } +opentelemetry-prometheus = { version = "0.17" } pin-project = "1.0" +prometheus = "0.13.3" rand = "0.8" rand_core = "0.6" rcgen = { version = "0.11.3", optional = true } diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs index 88b306bc7..6fd1f765b 100644 --- a/ipa-core/src/net/server/handlers/query/metrics.rs +++ b/ipa-core/src/net/server/handlers/query/metrics.rs @@ -1,13 +1,21 @@ use axum::{routing::get, Router}; +use opentelemetry::KeyValue; -use crate::net::http_serde::{self}; +use crate::net::{ + http_serde::{self}, + Error, +}; + +use prometheus::{self, TextEncoder}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry::metrics::MeterProvider; /// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent /// to the [`HttpTransport`]. async fn handler( // transport: Extension, // QueryConfigQueryParams(query_config): QueryConfigQueryParams, -) -> &'static str { +) -> Result { // match transport.dispatch(query_config, BodyStream::empty()).await { // Ok(resp) => Ok(Json(resp.try_into()?)), // Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => { @@ -15,7 +23,39 @@ async fn handler( // } // Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), // } - "hello world" + + // create a new prometheus registry + let registry = prometheus::Registry::new(); + + // configure OpenTelemetry to use this registry + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build().unwrap(); + + // set up a meter to create instruments + let provider = SdkMeterProvider::builder().with_reader(exporter).build(); + let meter = provider.meter("ipa-helper"); + + // Use two instruments + let counter = meter + .u64_counter("a.counter") + .with_description("Counts things") + .init(); + let histogram = meter + .u64_histogram("a.histogram") + .with_description("Records values") + .init(); + + counter.add(100, &[KeyValue::new("key", "value")]); + histogram.record(100, &[KeyValue::new("key", "value")]); + + // Encode data as text or protobuf + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + let mut result = String::new(); + encoder.encode_utf8(&metric_families, &mut result).unwrap(); + + Ok(result) } pub fn router() -> Router { From 68a42b61193b384e880e765d87a48075659e3e6c Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Tue, 12 Nov 2024 16:37:50 +0800 Subject: [PATCH 03/12] Added metrics exporter for prometheus --- .../src/net/server/handlers/query/metrics.rs | 14 ++-- ipa-metrics/Cargo.toml | 5 ++ ipa-metrics/src/exporter.rs | 65 +++++++++++++++++++ ipa-metrics/src/lib.rs | 1 + 4 files changed, 79 insertions(+), 6 deletions(-) create mode 100644 ipa-metrics/src/exporter.rs diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs index 6fd1f765b..77ba7d7b6 100644 --- a/ipa-core/src/net/server/handlers/query/metrics.rs +++ b/ipa-core/src/net/server/handlers/query/metrics.rs @@ -1,4 +1,5 @@ use axum::{routing::get, Router}; +use hyper::StatusCode; use opentelemetry::KeyValue; use crate::net::{ @@ -6,7 +7,7 @@ use crate::net::{ Error, }; -use prometheus::{self, TextEncoder}; +use prometheus::{self, Encoder, TextEncoder}; use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry::metrics::MeterProvider; @@ -15,7 +16,7 @@ use opentelemetry::metrics::MeterProvider; async fn handler( // transport: Extension, // QueryConfigQueryParams(query_config): QueryConfigQueryParams, -) -> Result { +) -> Result, Error> { // match transport.dispatch(query_config, BodyStream::empty()).await { // Ok(resp) => Ok(Json(resp.try_into()?)), // Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => { @@ -52,10 +53,11 @@ async fn handler( // Encode data as text or protobuf let encoder = TextEncoder::new(); let metric_families = registry.gather(); - let mut result = String::new(); - encoder.encode_utf8(&metric_families, &mut result).unwrap(); - - Ok(result) + let mut result = Vec::new(); + match encoder.encode(&metric_families, &mut result) { + Ok(()) => Ok(result), + Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), + } } pub fn router() -> Router { diff --git a/ipa-metrics/Cargo.toml b/ipa-metrics/Cargo.toml index ebaeb9473..6483d4637 100644 --- a/ipa-metrics/Cargo.toml +++ b/ipa-metrics/Cargo.toml @@ -13,6 +13,11 @@ partitions = [] crossbeam-channel = "0.5" # This crate uses raw entry API that is unstable in stdlib hashbrown = "0.15" +# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC) +opentelemetry = "0.24" +opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } +opentelemetry-prometheus = { version = "0.17" } +prometheus = "0.13.3" # Fast non-collision-resistant hashing rustc-hash = "2.0.0" # logging diff --git a/ipa-metrics/src/exporter.rs b/ipa-metrics/src/exporter.rs new file mode 100644 index 000000000..3a7e18e9f --- /dev/null +++ b/ipa-metrics/src/exporter.rs @@ -0,0 +1,65 @@ +use std::borrow::Cow; +use prometheus::{self, Encoder, TextEncoder}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry::KeyValue; +use opentelemetry::metrics::MeterProvider; + +pub trait MetricsExporter { + fn export (&self) -> Result, String>; +} + +pub struct PrometheusMetricsExporter { + scope: Cow<'static, str>, + registry: prometheus::Registry, + meter_provider: SdkMeterProvider, +} + +impl PrometheusMetricsExporter { + fn new(scope: impl Into>) -> PrometheusMetricsExporter { + // create a new prometheus registry + let registry = prometheus::Registry::new(); + + // configure OpenTelemetry to use this registry + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build().unwrap(); + + // set up a meter to create instruments + let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); + + PrometheusMetricsExporter { + scope: scope.into(), + registry, + meter_provider, + } + } +} + +impl MetricsExporter for PrometheusMetricsExporter { + fn export(&self) -> Result, String> { + // Get snapshot from controller : how? + // Convert the snapshot to otel struct + + let meter = self.meter_provider.meter(self.scope.clone()); + // This is basically a dummy metrics + let counter = meter + .u64_counter("a.counter") + .with_description("Counts things") + .init(); + let histogram = meter + .u64_histogram("a.histogram") + .with_description("Records values") + .init(); + + counter.add(100, &[KeyValue::new("key", "value")]); + histogram.record(100, &[KeyValue::new("key", "value")]); + + let encoder = TextEncoder::new(); + let metric_families = self.registry.gather(); + let mut buffer = Vec::new(); + match encoder.encode(&metric_families, &mut buffer) { + Ok(()) => Ok(buffer), + Err(e) => Err(format!("Failed to encode Prometheus metric: {e:?}")), + } + } +} \ No newline at end of file diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 2449d41a3..1e2a08a73 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -5,6 +5,7 @@ mod collector; mod context; mod controller; +mod exporter; mod key; mod kind; mod label; From a8235debd30d971be4b441ea1e080b2bce321c44 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Thu, 14 Nov 2024 15:26:10 +0800 Subject: [PATCH 04/12] Add conversion from ipa-metric counter to OTLP --- ipa-core/src/net/http_serde.rs | 9 +- ipa-core/src/net/server/handlers/mod.rs | 15 +- .../src/net/server/handlers/query/metrics.rs | 14 +- ipa-core/src/net/server/handlers/query/mod.rs | 5 +- ipa-metrics/src/exporter.rs | 134 ++++++++++++------ ipa-metrics/src/partitioned.rs | 5 + 6 files changed, 117 insertions(+), 65 deletions(-) diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index 3ca976a53..e1c132b51 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -81,8 +81,7 @@ pub mod metrics { } impl Request { - pub fn new( - // headers: HashMap, + pub fn new(// headers: HashMap, ) -> Self { Self { // headers, @@ -99,9 +98,9 @@ pub mod metrics { .build()?; // let req = self - // .headers - // .into_iter() - // .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v)); + // .headers + // .into_iter() + // .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v)); let req = hyper::Request::get(uri); Ok(req.body(Body::empty())?) } diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index 709a55ee7..5db0f20ae 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -6,13 +6,14 @@ use axum::Router; use crate::net::{http_serde, transport::MpcHttpTransport, ShardHttpTransport}; pub fn mpc_router(transport: MpcHttpTransport) -> Router { - echo::router().nest( - http_serde::query::BASE_AXUM_PATH, - Router::new() - .merge(query::query_router(transport.clone())) - .merge(query::h2h_router(transport)), - ) - .merge(query::metric_router()) + echo::router() + .nest( + http_serde::query::BASE_AXUM_PATH, + Router::new() + .merge(query::query_router(transport.clone())) + .merge(query::h2h_router(transport)), + ) + .merge(query::metric_router()) } pub fn shard_router(transport: ShardHttpTransport) -> Router { diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs index 77ba7d7b6..0e9a19b5f 100644 --- a/ipa-core/src/net/server/handlers/query/metrics.rs +++ b/ipa-core/src/net/server/handlers/query/metrics.rs @@ -7,14 +7,13 @@ use crate::net::{ Error, }; -use prometheus::{self, Encoder, TextEncoder}; -use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry::metrics::MeterProvider; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use prometheus::{self, Encoder, TextEncoder}; /// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent /// to the [`HttpTransport`]. -async fn handler( - // transport: Extension, +async fn handler(// transport: Extension, // QueryConfigQueryParams(query_config): QueryConfigQueryParams, ) -> Result, Error> { // match transport.dispatch(query_config, BodyStream::empty()).await { @@ -25,13 +24,16 @@ async fn handler( // Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), // } + // TODO: Remove this dummy metrics and get metrics for scraper from ipa-metrics::PrometheusMetricsExporter (see ipa-metrics/exporter.rs) + // create a new prometheus registry let registry = prometheus::Registry::new(); // configure OpenTelemetry to use this registry let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) - .build().unwrap(); + .build() + .unwrap(); // set up a meter to create instruments let provider = SdkMeterProvider::builder().with_reader(exporter).build(); @@ -62,4 +64,4 @@ async fn handler( pub fn router() -> Router { Router::new().route(http_serde::metrics::AXUM_PATH, get(handler)) -} \ No newline at end of file +} diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index fec828448..4dd352da5 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -1,11 +1,11 @@ mod create; mod input; mod kill; +mod metrics; mod prepare; mod results; mod status; mod step; -mod metrics; use std::marker::PhantomData; @@ -62,8 +62,7 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router { /// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper) pub fn metric_router() -> Router { - Router::new() - .merge(metrics::router()) + Router::new().merge(metrics::router()) } /// Returns HTTP 401 Unauthorized if the request does not have valid authentication. diff --git a/ipa-metrics/src/exporter.rs b/ipa-metrics/src/exporter.rs index 3a7e18e9f..bbe426130 100644 --- a/ipa-metrics/src/exporter.rs +++ b/ipa-metrics/src/exporter.rs @@ -1,65 +1,111 @@ -use std::borrow::Cow; -use prometheus::{self, Encoder, TextEncoder}; -use opentelemetry_sdk::metrics::SdkMeterProvider; +use std::io; + +use opentelemetry::metrics::{Meter, MeterProvider}; use opentelemetry::KeyValue; -use opentelemetry::metrics::MeterProvider; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use prometheus::{self, Encoder, TextEncoder}; + +use crate::MetricsStore; -pub trait MetricsExporter { - fn export (&self) -> Result, String>; +pub trait PrometheusMetricsExporter { + fn export(&mut self, w: &mut W); } -pub struct PrometheusMetricsExporter { - scope: Cow<'static, str>, - registry: prometheus::Registry, - meter_provider: SdkMeterProvider, +impl MetricsStore { + fn to_otlp(&mut self, meter: &Meter) { + let counters = self.counters(); + + counters.for_each(|(counter_name, counter_value)| { + let otlp_counter = meter.u64_counter(counter_name.key).init(); + + let attributes: Vec = counter_name + .labels() + .map(|l| KeyValue::new(l.name, l.val.to_string())) + .collect(); + + otlp_counter.add(counter_value, &attributes[..]); + }); + } } -impl PrometheusMetricsExporter { - fn new(scope: impl Into>) -> PrometheusMetricsExporter { - // create a new prometheus registry +impl PrometheusMetricsExporter for MetricsStore { + fn export(&mut self, w: &mut W) { + // Setup prometheus registry and open-telemetry exporter let registry = prometheus::Registry::new(); - // configure OpenTelemetry to use this registry let exporter = opentelemetry_prometheus::exporter() .with_registry(registry.clone()) - .build().unwrap(); + .build() + .unwrap(); - // set up a meter to create instruments let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); - PrometheusMetricsExporter { - scope: scope.into(), - registry, - meter_provider, - } + // Convert the snapshot to otel struct + // TODO : We need to define a proper scope for the metrics + let meter = meter_provider.meter("ipa-helper"); + self.to_otlp(&meter); + + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + // TODO: Handle error? + encoder.encode(&metric_families, w).unwrap(); } } -impl MetricsExporter for PrometheusMetricsExporter { - fn export(&self) -> Result, String> { - // Get snapshot from controller : how? - // Convert the snapshot to otel struct +mod test { - let meter = self.meter_provider.meter(self.scope.clone()); - // This is basically a dummy metrics - let counter = meter - .u64_counter("a.counter") - .with_description("Counts things") - .init(); - let histogram = meter - .u64_histogram("a.histogram") - .with_description("Records values") - .init(); + use std::thread::{self, Scope, ScopedJoinHandle}; - counter.add(100, &[KeyValue::new("key", "value")]); - histogram.record(100, &[KeyValue::new("key", "value")]); + use super::PrometheusMetricsExporter; + use crate::{counter, install_new_thread, producer::Producer, MetricChannelType}; + struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); - let encoder = TextEncoder::new(); - let metric_families = self.registry.gather(); - let mut buffer = Vec::new(); - match encoder.encode(&metric_families, &mut buffer) { - Ok(()) => Ok(buffer), - Err(e) => Err(format!("Failed to encode Prometheus metric: {e:?}")), + impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> { + fn spawn(&self, f: F) -> ScopedJoinHandle<'scope, T> + where + F: FnOnce() -> T + Send + 'scope, + T: Send + 'scope, + { + let producer = self.1.clone(); + + self.0.spawn(move || { + producer.install(); + let r = f(); + let _ = producer.drop_handle(); + + r + }) + } + } + + trait IntoMetered<'scope, 'env: 'scope> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>; + } + + impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> { + fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> { + MeteredScope(self, meter) } } -} \ No newline at end of file + + #[test] + fn export_to_prometheus() { + let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap(); + + thread::scope(move |s| { + let s = s.metered(producer); + s.spawn(|| counter!("baz", 4)).join().unwrap(); + s.spawn(|| counter!("bar", 1)).join().unwrap(); + + let mut store = controller + .snapshot() + .expect("Metrics snapshot must be available"); + + let mut buff = Vec::new(); + store.export(&mut buff); + + let result = String::from_utf8(buff).unwrap(); + println!("{result}"); + }); + } +} diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 0f71d0e28..065915cbe 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -20,6 +20,7 @@ use hashbrown::hash_map::Entry; use rustc_hash::FxBuildHasher; use crate::{ + key::OwnedMetricName, kind::CounterValue, store::{CounterHandle, Store}, MetricName, @@ -120,6 +121,10 @@ impl PartitionedStore { self.get_mut(CurrentThreadContext::get()).counter(key) } + pub fn counters(&mut self) -> impl Iterator { + self.get_mut(CurrentThreadContext::get()).counters() + } + #[must_use] pub fn len(&self) -> usize { self.inner.len() + self.default_store.len() From dcbbf517948ee031b2d437e3a2ae9c81f8819f9f Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Mon, 18 Nov 2024 14:30:15 +0800 Subject: [PATCH 05/12] Simplified test case --- ipa-metrics/src/exporter.rs | 59 ++++++++++--------------------------- 1 file changed, 16 insertions(+), 43 deletions(-) diff --git a/ipa-metrics/src/exporter.rs b/ipa-metrics/src/exporter.rs index bbe426130..c0930fc2e 100644 --- a/ipa-metrics/src/exporter.rs +++ b/ipa-metrics/src/exporter.rs @@ -52,60 +52,33 @@ impl PrometheusMetricsExporter for MetricsStore { } } +#[cfg(test)] mod test { - use std::thread::{self, Scope, ScopedJoinHandle}; + use std::thread; use super::PrometheusMetricsExporter; - use crate::{counter, install_new_thread, producer::Producer, MetricChannelType}; - struct MeteredScope<'scope, 'env: 'scope>(&'scope Scope<'scope, 'env>, Producer); - - impl<'scope, 'env: 'scope> MeteredScope<'scope, 'env> { - fn spawn(&self, f: F) -> ScopedJoinHandle<'scope, T> - where - F: FnOnce() -> T + Send + 'scope, - T: Send + 'scope, - { - let producer = self.1.clone(); - - self.0.spawn(move || { - producer.install(); - let r = f(); - let _ = producer.drop_handle(); - - r - }) - } - } - - trait IntoMetered<'scope, 'env: 'scope> { - fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env>; - } - - impl<'scope, 'env: 'scope> IntoMetered<'scope, 'env> for Scope<'scope, 'env> { - fn metered(&'scope self, meter: Producer) -> MeteredScope<'scope, 'env> { - MeteredScope(self, meter) - } - } + use crate::{counter, install_new_thread, MetricChannelType}; #[test] fn export_to_prometheus() { let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap(); - thread::scope(move |s| { - let s = s.metered(producer); - s.spawn(|| counter!("baz", 4)).join().unwrap(); - s.spawn(|| counter!("bar", 1)).join().unwrap(); + thread::spawn(move || { + producer.install(); + counter!("baz", 4); + counter!("bar", 1); + let _ = producer.drop_handle(); + }) + .join() + .unwrap(); - let mut store = controller - .snapshot() - .expect("Metrics snapshot must be available"); + let mut store = controller.snapshot().unwrap(); - let mut buff = Vec::new(); - store.export(&mut buff); + let mut buff = Vec::new(); + store.export(&mut buff); - let result = String::from_utf8(buff).unwrap(); - println!("{result}"); - }); + let result = String::from_utf8(buff).unwrap(); + println!("Export to Prometheus: {result}"); } } From 35fbb2e2221dfdd40dc588d1c01613e2a601d576 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Mon, 18 Nov 2024 14:59:17 +0800 Subject: [PATCH 06/12] Move prometheus exporter to its own crate --- ipa-metrics-prometheus/Cargo.toml | 16 +++++++++ .../src/exporter.rs | 35 ++++++++----------- ipa-metrics-prometheus/src/lib.rs | 1 + ipa-metrics/Cargo.toml | 6 ---- ipa-metrics/src/lib.rs | 1 - 5 files changed, 32 insertions(+), 27 deletions(-) create mode 100644 ipa-metrics-prometheus/Cargo.toml rename {ipa-metrics => ipa-metrics-prometheus}/src/exporter.rs (92%) create mode 100644 ipa-metrics-prometheus/src/lib.rs diff --git a/ipa-metrics-prometheus/Cargo.toml b/ipa-metrics-prometheus/Cargo.toml new file mode 100644 index 000000000..36ce2be11 --- /dev/null +++ b/ipa-metrics-prometheus/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ipa-metrics-prometheus" +version = "0.1.0" +edition = "2021" + +[features] +default = [] + +[dependencies] +ipa-metrics = { path = "../ipa-metrics" } + +# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC) +opentelemetry = "0.24" +opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } +opentelemetry-prometheus = { version = "0.17" } +prometheus = "0.13.3" diff --git a/ipa-metrics/src/exporter.rs b/ipa-metrics-prometheus/src/exporter.rs similarity index 92% rename from ipa-metrics/src/exporter.rs rename to ipa-metrics-prometheus/src/exporter.rs index c0930fc2e..fd4c6ab9f 100644 --- a/ipa-metrics/src/exporter.rs +++ b/ipa-metrics-prometheus/src/exporter.rs @@ -5,29 +5,12 @@ use opentelemetry::KeyValue; use opentelemetry_sdk::metrics::SdkMeterProvider; use prometheus::{self, Encoder, TextEncoder}; -use crate::MetricsStore; +use ipa_metrics::MetricsStore; pub trait PrometheusMetricsExporter { fn export(&mut self, w: &mut W); } -impl MetricsStore { - fn to_otlp(&mut self, meter: &Meter) { - let counters = self.counters(); - - counters.for_each(|(counter_name, counter_value)| { - let otlp_counter = meter.u64_counter(counter_name.key).init(); - - let attributes: Vec = counter_name - .labels() - .map(|l| KeyValue::new(l.name, l.val.to_string())) - .collect(); - - otlp_counter.add(counter_value, &attributes[..]); - }); - } -} - impl PrometheusMetricsExporter for MetricsStore { fn export(&mut self, w: &mut W) { // Setup prometheus registry and open-telemetry exporter @@ -43,7 +26,18 @@ impl PrometheusMetricsExporter for MetricsStore { // Convert the snapshot to otel struct // TODO : We need to define a proper scope for the metrics let meter = meter_provider.meter("ipa-helper"); - self.to_otlp(&meter); + + let counters = self.counters(); + counters.for_each(|(counter_name, counter_value)| { + let otlp_counter = meter.u64_counter(counter_name.key).init(); + + let attributes: Vec = counter_name + .labels() + .map(|l| KeyValue::new(l.name, l.val.to_string())) + .collect(); + + otlp_counter.add(counter_value, &attributes[..]); + }); let encoder = TextEncoder::new(); let metric_families = registry.gather(); @@ -57,8 +51,9 @@ mod test { use std::thread; + use ipa_metrics::{counter, install_new_thread, MetricChannelType}; + use super::PrometheusMetricsExporter; - use crate::{counter, install_new_thread, MetricChannelType}; #[test] fn export_to_prometheus() { diff --git a/ipa-metrics-prometheus/src/lib.rs b/ipa-metrics-prometheus/src/lib.rs new file mode 100644 index 000000000..8f7123801 --- /dev/null +++ b/ipa-metrics-prometheus/src/lib.rs @@ -0,0 +1 @@ +mod exporter; \ No newline at end of file diff --git a/ipa-metrics/Cargo.toml b/ipa-metrics/Cargo.toml index 6483d4637..cc9ec52f0 100644 --- a/ipa-metrics/Cargo.toml +++ b/ipa-metrics/Cargo.toml @@ -13,13 +13,7 @@ partitions = [] crossbeam-channel = "0.5" # This crate uses raw entry API that is unstable in stdlib hashbrown = "0.15" -# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC) -opentelemetry = "0.24" -opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } -opentelemetry-prometheus = { version = "0.17" } -prometheus = "0.13.3" # Fast non-collision-resistant hashing rustc-hash = "2.0.0" # logging tracing = "0.1" - diff --git a/ipa-metrics/src/lib.rs b/ipa-metrics/src/lib.rs index 1e2a08a73..2449d41a3 100644 --- a/ipa-metrics/src/lib.rs +++ b/ipa-metrics/src/lib.rs @@ -5,7 +5,6 @@ mod collector; mod context; mod controller; -mod exporter; mod key; mod kind; mod label; From 92c3a28281b84300e72f684c0b5b39b828a82c16 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Tue, 26 Nov 2024 14:20:00 +0800 Subject: [PATCH 07/12] Wiring in logging_handle to the handler --- ipa-core/Cargo.toml | 6 +- ipa-core/src/app.rs | 9 +++ ipa-core/src/bin/helper.rs | 2 +- ipa-core/src/cli/metric_collector.rs | 18 ++++- ipa-core/src/helpers/transport/handler.rs | 6 ++ .../helpers/transport/in_memory/transport.rs | 3 +- ipa-core/src/helpers/transport/mod.rs | 20 +++++ ipa-core/src/helpers/transport/routing.rs | 1 + ipa-core/src/net/http_serde.rs | 33 +-------- ipa-core/src/net/server/handlers/mod.rs | 4 +- .../src/net/server/handlers/query/metrics.rs | 73 +++++-------------- ipa-core/src/net/server/handlers/query/mod.rs | 4 +- ipa-core/src/net/transport.rs | 3 +- ipa-core/src/test_fixture/app.rs | 12 ++- ipa-metrics-prometheus/src/exporter.rs | 6 +- ipa-metrics-prometheus/src/lib.rs | 4 +- 16 files changed, 96 insertions(+), 108 deletions(-) diff --git a/ipa-core/Cargo.toml b/ipa-core/Cargo.toml index d72754ddf..f384ae592 100644 --- a/ipa-core/Cargo.toml +++ b/ipa-core/Cargo.toml @@ -90,6 +90,7 @@ ipa-metrics = { path = "../ipa-metrics" } ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" } ipa-step = { version = "*", path = "../ipa-step" } ipa-step-derive = { version = "*", path = "../ipa-step-derive" } +ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" } aes = "0.8.3" async-trait = "0.1.79" @@ -131,11 +132,7 @@ http-body-util = { version = "0.1.1", optional = true } http-body = { version = "1", optional = true } iai = { version = "0.1.1", optional = true } once_cell = "1.18" -opentelemetry = "0.24" -opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } -opentelemetry-prometheus = { version = "0.17" } pin-project = "1.0" -prometheus = "0.13.3" rand = "0.8" rand_core = "0.6" rcgen = { version = "0.11.3", optional = true } @@ -181,6 +178,7 @@ rustls = { version = "0.23" } tempfile = "3" ipa-metrics-tracing = { path = "../ipa-metrics-tracing" } ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] } +ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" } [lib] diff --git a/ipa-core/src/app.rs b/ipa-core/src/app.rs index 6b1032f72..7354e4b43 100644 --- a/ipa-core/src/app.rs +++ b/ipa-core/src/app.rs @@ -3,6 +3,7 @@ use std::sync::Weak; use async_trait::async_trait; use crate::{ + cli::LoggingHandle, executor::IpaRuntime, helpers::{ query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, @@ -65,6 +66,7 @@ struct Inner { /// the flamegraph mpc_transport: MpcTransportImpl, shard_transport: ShardTransportImpl, + logging_handle: LoggingHandle, } impl Setup { @@ -96,11 +98,13 @@ impl Setup { self, mpc_transport: MpcTransportImpl, shard_transport: ShardTransportImpl, + logging_handle: LoggingHandle, ) -> HelperApp { let app = Arc::new(Inner { query_processor: self.query_processor, mpc_transport, shard_transport, + logging_handle, }); self.mpc_handler .set_handler(Arc::downgrade(&app) as Weak>); @@ -278,6 +282,11 @@ impl RequestHandler for Inner { let query_id = ext_query_id(&req)?; HelperResponse::from(qp.kill(query_id)?) } + RouteId::Metrics => { + let logging_handler = &self.logging_handle; + let metrics_handle = &logging_handler.metrics_handle; + HelperResponse::from(metrics_handle.scrape_metrics()) + } }) } } diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 8641f3547..b728a0fc6 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -264,7 +264,7 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B Some(shard_handler), ); - let _app = setup.connect(transport.clone(), shard_transport.clone()); + let _app = setup.connect(transport.clone(), shard_transport.clone(), logging_handle); let listener = create_listener(args.server_socket_fd)?; let shard_listener = create_listener(args.shard_server_socket_fd)?; diff --git a/ipa-core/src/cli/metric_collector.rs b/ipa-core/src/cli/metric_collector.rs index 8f9a374b4..5b8f07b27 100644 --- a/ipa-core/src/cli/metric_collector.rs +++ b/ipa-core/src/cli/metric_collector.rs @@ -3,13 +3,14 @@ use std::{io, thread, thread::JoinHandle}; use ipa_metrics::{ MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer, }; +use ipa_metrics_prometheus::PrometheusMetricsExporter; use tokio::runtime::Builder; /// Holds a reference to metrics controller and producer pub struct CollectorHandle { thread_handle: JoinHandle<()>, /// This will be used once we start consuming metrics - _controller: MetricsCollectorController, + controller: MetricsCollectorController, producer: MetricsProducer, } @@ -26,7 +27,7 @@ pub fn install_collector() -> io::Result { Ok(CollectorHandle { thread_handle: handle, - _controller: controller, + controller, producer, }) } @@ -53,4 +54,17 @@ impl CollectorHandle { .on_thread_stop(flush_fn) .on_thread_park(flush_fn) } + + /// Export the metrics to be consumed by metrics scraper, e.g. Prometheus + /// + /// # Panics + /// If metrics is not initialized + #[must_use] + pub fn scrape_metrics(&self) -> Vec { + let mut store = self.controller.snapshot().expect("Metrics must be set up"); + let mut buff = Vec::new(); + store.export(&mut buff); + + buff + } } diff --git a/ipa-core/src/helpers/transport/handler.rs b/ipa-core/src/helpers/transport/handler.rs index a87e47ab5..9a1f1b457 100644 --- a/ipa-core/src/helpers/transport/handler.rs +++ b/ipa-core/src/helpers/transport/handler.rs @@ -149,6 +149,12 @@ impl> From for HelperResponse { } } +impl From> for HelperResponse { + fn from(value: Vec) -> Self { + Self { body: value } + } +} + /// Union of error types returned by API operations. #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index a2a1abea6..504921eb5 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -116,7 +116,8 @@ impl InMemoryTransport { | RouteId::QueryInput | RouteId::QueryStatus | RouteId::CompleteQuery - | RouteId::KillQuery => { + | RouteId::KillQuery + | RouteId::Metrics => { handler .as_ref() .expect("Handler is set") diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 6b8341966..00021c62c 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -229,6 +229,26 @@ where fn extra(&self) -> Self::Params; } +impl RouteParams for RouteId { + type Params = &'static str; + + fn resource_identifier(&self) -> RouteId { + *self + } + + fn query_id(&self) -> NoQueryId { + NoQueryId + } + + fn gate(&self) -> NoStep { + NoStep + } + + fn extra(&self) -> Self::Params { + "" + } +} + impl RouteParams for (QueryId, Gate) { type Params = &'static str; diff --git a/ipa-core/src/helpers/transport/routing.rs b/ipa-core/src/helpers/transport/routing.rs index e851865ce..f27bccf5b 100644 --- a/ipa-core/src/helpers/transport/routing.rs +++ b/ipa-core/src/helpers/transport/routing.rs @@ -17,6 +17,7 @@ pub enum RouteId { QueryStatus, CompleteQuery, KillQuery, + Metrics, } /// The header/metadata of the incoming request. diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index c55dbe27c..47be7c27d 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -69,42 +69,11 @@ pub mod echo { } pub mod metrics { - use std::collections::HashMap; - use axum::body::Body; - use hyper::http::uri; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] - pub struct Request { - // pub headers: HashMap, - } - - impl Request { - pub fn new(// headers: HashMap, - ) -> Self { - Self { - // headers, - } - } - pub fn try_into_http_request( - self, - scheme: uri::Scheme, - authority: uri::Authority, - ) -> crate::net::http_serde::OutgoingRequest { - let uri = uri::Uri::builder() - .scheme(scheme) - .authority(authority) - .build()?; - - // let req = self - // .headers - // .into_iter() - // .fold(hyper::Request::get(uri), |req, (k, v)| req.header(k, v)); - let req = hyper::Request::get(uri); - Ok(req.body(Body::empty())?) - } - } + pub struct Request {} pub const AXUM_PATH: &str = "/metrics"; } diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index 5db0f20ae..3a30cb52d 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -11,9 +11,9 @@ pub fn mpc_router(transport: MpcHttpTransport) -> Router { http_serde::query::BASE_AXUM_PATH, Router::new() .merge(query::query_router(transport.clone())) - .merge(query::h2h_router(transport)), + .merge(query::h2h_router(transport.clone())), ) - .merge(query::metric_router()) + .merge(query::metric_router(transport)) } pub fn shard_router(transport: ShardHttpTransport) -> Router { diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs index 0e9a19b5f..455011a3b 100644 --- a/ipa-core/src/net/server/handlers/query/metrics.rs +++ b/ipa-core/src/net/server/handlers/query/metrics.rs @@ -1,67 +1,28 @@ -use axum::{routing::get, Router}; +use axum::{routing::get, Extension, Router}; use hyper::StatusCode; -use opentelemetry::KeyValue; -use crate::net::{ - http_serde::{self}, - Error, +use crate::{ + helpers::{routing::RouteId, BodyStream}, + net::{ + http_serde::{self}, + Error, MpcHttpTransport, + }, }; -use opentelemetry::metrics::MeterProvider; -use opentelemetry_sdk::metrics::SdkMeterProvider; -use prometheus::{self, Encoder, TextEncoder}; - /// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent /// to the [`HttpTransport`]. -async fn handler(// transport: Extension, - // QueryConfigQueryParams(query_config): QueryConfigQueryParams, -) -> Result, Error> { - // match transport.dispatch(query_config, BodyStream::empty()).await { - // Ok(resp) => Ok(Json(resp.try_into()?)), - // Err(err @ ApiError::NewQuery(NewQueryError::State { .. })) => { - // Err(Error::application(StatusCode::CONFLICT, err)) - // } - // Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), - // } - - // TODO: Remove this dummy metrics and get metrics for scraper from ipa-metrics::PrometheusMetricsExporter (see ipa-metrics/exporter.rs) - - // create a new prometheus registry - let registry = prometheus::Registry::new(); - - // configure OpenTelemetry to use this registry - let exporter = opentelemetry_prometheus::exporter() - .with_registry(registry.clone()) - .build() - .unwrap(); - - // set up a meter to create instruments - let provider = SdkMeterProvider::builder().with_reader(exporter).build(); - let meter = provider.meter("ipa-helper"); - - // Use two instruments - let counter = meter - .u64_counter("a.counter") - .with_description("Counts things") - .init(); - let histogram = meter - .u64_histogram("a.histogram") - .with_description("Records values") - .init(); - - counter.add(100, &[KeyValue::new("key", "value")]); - histogram.record(100, &[KeyValue::new("key", "value")]); - - // Encode data as text or protobuf - let encoder = TextEncoder::new(); - let metric_families = registry.gather(); - let mut result = Vec::new(); - match encoder.encode(&metric_families, &mut result) { - Ok(()) => Ok(result), +async fn handler(transport: Extension) -> Result, Error> { + match transport + .dispatch(RouteId::Metrics, BodyStream::empty()) + .await + { + Ok(resp) => Ok(resp.into_body()), Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), } } -pub fn router() -> Router { - Router::new().route(http_serde::metrics::AXUM_PATH, get(handler)) +pub fn router(transport: MpcHttpTransport) -> Router { + Router::new() + .route(http_serde::metrics::AXUM_PATH, get(handler)) + .layer(Extension(transport)) } diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 00ae72e71..a53b0696e 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -65,8 +65,8 @@ pub fn s2s_router(transport: ShardHttpTransport) -> Router { } /// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper) -pub fn metric_router() -> Router { - Router::new().merge(metrics::router()) +pub fn metric_router(transport: MpcHttpTransport) -> Router { + Router::new().merge(metrics::router(transport)) } /// Returns HTTP 401 Unauthorized if the request does not have valid authentication. diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index de632c48e..2d4a00cd8 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -114,7 +114,8 @@ impl HttpTransport { evt @ (RouteId::QueryInput | RouteId::ReceiveQuery | RouteId::QueryStatus - | RouteId::KillQuery) => { + | RouteId::KillQuery + | RouteId::Metrics) => { unimplemented!( "attempting to send client-specific request {evt:?} to another helper" ) diff --git a/ipa-core/src/test_fixture/app.rs b/ipa-core/src/test_fixture/app.rs index 1cf01cb38..d43620588 100644 --- a/ipa-core/src/test_fixture/app.rs +++ b/ipa-core/src/test_fixture/app.rs @@ -6,6 +6,7 @@ use typenum::Unsigned; use crate::{ app::AppConfig, + cli::{install_collector, LoggingHandle}, ff::Serializable, helpers::{ query::{QueryConfig, QueryInput}, @@ -68,8 +69,15 @@ impl Default for TestApp { let mpc_network = InMemoryMpcNetwork::new(handlers.map(Some)); let shard_network = InMemoryShardNetwork::with_shards(1); - let drivers = zip3(mpc_network.transports().each_ref(), setup) - .map(|(t, s)| s.connect(Clone::clone(t), shard_network.transport(t.identity(), 0))); + let drivers = zip3(mpc_network.transports().each_ref(), setup).map(|(t, s)| { + let metrics_handle = install_collector().unwrap(); + let logging_handle = LoggingHandle { metrics_handle }; + s.connect( + Clone::clone(t), + shard_network.transport(t.identity(), 0), + logging_handle, + ) + }); Self { drivers, diff --git a/ipa-metrics-prometheus/src/exporter.rs b/ipa-metrics-prometheus/src/exporter.rs index fd4c6ab9f..596f8a298 100644 --- a/ipa-metrics-prometheus/src/exporter.rs +++ b/ipa-metrics-prometheus/src/exporter.rs @@ -1,12 +1,10 @@ use std::io; -use opentelemetry::metrics::{Meter, MeterProvider}; -use opentelemetry::KeyValue; +use ipa_metrics::MetricsStore; +use opentelemetry::{metrics::MeterProvider, KeyValue}; use opentelemetry_sdk::metrics::SdkMeterProvider; use prometheus::{self, Encoder, TextEncoder}; -use ipa_metrics::MetricsStore; - pub trait PrometheusMetricsExporter { fn export(&mut self, w: &mut W); } diff --git a/ipa-metrics-prometheus/src/lib.rs b/ipa-metrics-prometheus/src/lib.rs index 8f7123801..b4cc164fc 100644 --- a/ipa-metrics-prometheus/src/lib.rs +++ b/ipa-metrics-prometheus/src/lib.rs @@ -1 +1,3 @@ -mod exporter; \ No newline at end of file +mod exporter; + +pub use exporter::PrometheusMetricsExporter; From 5d7c689276c43b25b65499ead679034f7800792f Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Mon, 9 Dec 2024 12:16:41 +0800 Subject: [PATCH 08/12] Move metrics out of query module, add test cases --- ipa-core/src/net/http_serde.rs | 16 +++++ ipa-core/src/net/server/handlers/metrics.rs | 62 +++++++++++++++++++ ipa-core/src/net/server/handlers/mod.rs | 5 +- .../src/net/server/handlers/query/metrics.rs | 28 --------- ipa-core/src/net/server/handlers/query/mod.rs | 6 -- ipa-metrics-prometheus/src/exporter.rs | 11 +++- ipa-metrics/src/partitioned.rs | 11 +++- 7 files changed, 100 insertions(+), 39 deletions(-) create mode 100644 ipa-core/src/net/server/handlers/metrics.rs delete mode 100644 ipa-core/src/net/server/handlers/query/metrics.rs diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index 6d99498e0..d93112520 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -70,11 +70,27 @@ pub mod echo { pub mod metrics { + use axum::{body::Body, http::uri}; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Request {} + impl Request { + pub fn try_into_http_request( + self, + scheme: uri::Scheme, + authority: uri::Authority, + ) -> crate::net::http_serde::OutgoingRequest { + let uri = uri::Builder::new() + .scheme(scheme) + .authority(authority) + .path_and_query(String::from(AXUM_PATH)) + .build()?; + Ok(hyper::Request::get(uri).body(Body::empty())?) + } + } + pub const AXUM_PATH: &str = "/metrics"; } diff --git a/ipa-core/src/net/server/handlers/metrics.rs b/ipa-core/src/net/server/handlers/metrics.rs new file mode 100644 index 000000000..832350105 --- /dev/null +++ b/ipa-core/src/net/server/handlers/metrics.rs @@ -0,0 +1,62 @@ +use axum::{routing::get, Extension, Router}; +use hyper::StatusCode; + +use crate::{ + helpers::{routing::RouteId, BodyStream}, + net::{ + http_serde::{self}, + Error, MpcHttpTransport, + }, +}; + +/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent +/// to the [`HttpTransport`]. +async fn handler(transport: Extension) -> Result, Error> { + match transport + .dispatch(RouteId::Metrics, BodyStream::empty()) + .await + { + Ok(resp) => Ok(resp.into_body()), + Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), + } +} + +pub fn router(transport: MpcHttpTransport) -> Router { + Router::new() + .route(http_serde::metrics::AXUM_PATH, get(handler)) + .layer(Extension(transport)) +} + +#[cfg(all(test, unit_test))] +mod tests { + use axum::{body::Body, http::uri::{Authority, Scheme}}; + use bytes::Buf; + use http_body_util::BodyExt; + use hyper::{Request, StatusCode}; + use serde_json::{json, Value}; + use tower::ServiceExt; + + use crate::{helpers::{make_owned_handler, routing::Addr, HelperIdentity, HelperResponse}, net::server::handlers::query::test_helpers::assert_success_with}; + + use super::*; + + #[tokio::test] + async fn happy_case() { + let handler = make_owned_handler( + move |addr: Addr, _data: BodyStream| async move { + println!("{:?}", addr.route); + let RouteId::Metrics = addr.route else { + panic!("unexpected call"); + }; + Ok(HelperResponse::from(Vec::new())) + }, + ); + let req = http_serde::metrics::Request { }; + let req = req + .try_into_http_request(Scheme::HTTP, Authority::from_static("localhost")) + .unwrap(); + + assert_success_with(req, handler).await; + + } +} diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index c8ab75875..a375b759b 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -1,5 +1,6 @@ mod echo; mod query; +mod metrics; use axum::Router; @@ -9,7 +10,9 @@ use crate::{ }; pub fn mpc_router(transport: MpcHttpTransport) -> Router { - echo::router().nest( + echo::router() + .merge(metrics::router(transport.clone())) + .nest( http_serde::query::BASE_AXUM_PATH, Router::new() .merge(query::query_router(transport.clone())) diff --git a/ipa-core/src/net/server/handlers/query/metrics.rs b/ipa-core/src/net/server/handlers/query/metrics.rs deleted file mode 100644 index 455011a3b..000000000 --- a/ipa-core/src/net/server/handlers/query/metrics.rs +++ /dev/null @@ -1,28 +0,0 @@ -use axum::{routing::get, Extension, Router}; -use hyper::StatusCode; - -use crate::{ - helpers::{routing::RouteId, BodyStream}, - net::{ - http_serde::{self}, - Error, MpcHttpTransport, - }, -}; - -/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent -/// to the [`HttpTransport`]. -async fn handler(transport: Extension) -> Result, Error> { - match transport - .dispatch(RouteId::Metrics, BodyStream::empty()) - .await - { - Ok(resp) => Ok(resp.into_body()), - Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), - } -} - -pub fn router(transport: MpcHttpTransport) -> Router { - Router::new() - .route(http_serde::metrics::AXUM_PATH, get(handler)) - .layer(Extension(transport)) -} diff --git a/ipa-core/src/net/server/handlers/query/mod.rs b/ipa-core/src/net/server/handlers/query/mod.rs index 12e36ae9c..8a9881bb7 100644 --- a/ipa-core/src/net/server/handlers/query/mod.rs +++ b/ipa-core/src/net/server/handlers/query/mod.rs @@ -1,7 +1,6 @@ mod create; mod input; mod kill; -mod metrics; mod prepare; mod results; mod status; @@ -67,11 +66,6 @@ pub fn s2s_router(transport: Arc>) -> Router { .layer(layer_fn(HelperAuthentication::<_, Shard>::new)) } -/// Construct router for exporting metrics to metrics backend (e.g. Prometheus scraper) -pub fn metric_router(transport: MpcHttpTransport) -> Router { - Router::new().merge(metrics::router(transport)) -} - /// Returns HTTP 401 Unauthorized if the request does not have valid authentication. /// /// Authentication information is carried via the `ClientIdentity` request extension. The extension diff --git a/ipa-metrics-prometheus/src/exporter.rs b/ipa-metrics-prometheus/src/exporter.rs index 596f8a298..fee9026e8 100644 --- a/ipa-metrics-prometheus/src/exporter.rs +++ b/ipa-metrics-prometheus/src/exporter.rs @@ -39,7 +39,6 @@ impl PrometheusMetricsExporter for MetricsStore { let encoder = TextEncoder::new(); let metric_families = registry.gather(); - // TODO: Handle error? encoder.encode(&metric_families, w).unwrap(); } } @@ -71,7 +70,15 @@ mod test { let mut buff = Vec::new(); store.export(&mut buff); + + let expected_result = "# TYPE bar_total counter +bar_total{otel_scope_name=\"ipa-helper\"} 1 +# TYPE baz_total counter +baz_total{otel_scope_name=\"ipa-helper\"} 4 +# HELP target_info Target metadata +# TYPE target_info gauge +target_info{service_name=\"unknown_service\",telemetry_sdk_language=\"rust\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"0.24.1\"} 1\n"; let result = String::from_utf8(buff).unwrap(); - println!("Export to Prometheus: {result}"); + assert_eq!(result, expected_result); } } diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 065915cbe..70fd4df0e 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -121,8 +121,15 @@ impl PartitionedStore { self.get_mut(CurrentThreadContext::get()).counter(key) } - pub fn counters(&mut self) -> impl Iterator { - self.get_mut(CurrentThreadContext::get()).counters() + pub fn counters(&self) -> impl Iterator { + if let Some(partition) = CurrentThreadContext::get() { + return match self.inner.get(&partition) { + Some(store) => store.counters(), + None => self.default_store.counters() + } + } else { + self.default_store.counters() + } } #[must_use] From 76c66528cac3a95b0a85c5d688bdc9b7d34ce22f Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Tue, 10 Dec 2024 09:46:04 +0800 Subject: [PATCH 09/12] cargo fmt --- ipa-core/src/net/http_serde.rs | 16 ------------ ipa-core/src/net/server/handlers/metrics.rs | 28 ++++++++++----------- ipa-core/src/net/server/handlers/mod.rs | 16 ++++++------ ipa-core/src/net/transport.rs | 5 +++- ipa-metrics-prometheus/src/exporter.rs | 1 - ipa-metrics/src/partitioned.rs | 9 +++---- 6 files changed, 30 insertions(+), 45 deletions(-) diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index d93112520..6d99498e0 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -70,27 +70,11 @@ pub mod echo { pub mod metrics { - use axum::{body::Body, http::uri}; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Request {} - impl Request { - pub fn try_into_http_request( - self, - scheme: uri::Scheme, - authority: uri::Authority, - ) -> crate::net::http_serde::OutgoingRequest { - let uri = uri::Builder::new() - .scheme(scheme) - .authority(authority) - .path_and_query(String::from(AXUM_PATH)) - .build()?; - Ok(hyper::Request::get(uri).body(Body::empty())?) - } - } - pub const AXUM_PATH: &str = "/metrics"; } diff --git a/ipa-core/src/net/server/handlers/metrics.rs b/ipa-core/src/net/server/handlers/metrics.rs index 832350105..1c7d859b6 100644 --- a/ipa-core/src/net/server/handlers/metrics.rs +++ b/ipa-core/src/net/server/handlers/metrics.rs @@ -29,34 +29,34 @@ pub fn router(transport: MpcHttpTransport) -> Router { #[cfg(all(test, unit_test))] mod tests { - use axum::{body::Body, http::uri::{Authority, Scheme}}; - use bytes::Buf; - use http_body_util::BodyExt; - use hyper::{Request, StatusCode}; - use serde_json::{json, Value}; - use tower::ServiceExt; - - use crate::{helpers::{make_owned_handler, routing::Addr, HelperIdentity, HelperResponse}, net::server::handlers::query::test_helpers::assert_success_with}; + use axum::{ + body::Body, + http::uri::{self, Authority, Scheme}, + }; use super::*; + use crate::{ + helpers::{make_owned_handler, routing::Addr, HelperIdentity, HelperResponse}, + net::server::handlers::query::test_helpers::assert_success_with, + }; #[tokio::test] async fn happy_case() { let handler = make_owned_handler( move |addr: Addr, _data: BodyStream| async move { - println!("{:?}", addr.route); let RouteId::Metrics = addr.route else { panic!("unexpected call"); }; Ok(HelperResponse::from(Vec::new())) }, ); - let req = http_serde::metrics::Request { }; - let req = req - .try_into_http_request(Scheme::HTTP, Authority::from_static("localhost")) + let uri = uri::Builder::new() + .scheme(Scheme::HTTP) + .authority(Authority::from_static("localhost")) + .path_and_query(String::from("/metrics")) + .build() .unwrap(); - + let req = hyper::Request::get(uri).body(Body::empty()).unwrap(); assert_success_with(req, handler).await; - } } diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index a375b759b..2571a2be0 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -1,6 +1,6 @@ mod echo; -mod query; mod metrics; +mod query; use axum::Router; @@ -11,13 +11,13 @@ use crate::{ pub fn mpc_router(transport: MpcHttpTransport) -> Router { echo::router() - .merge(metrics::router(transport.clone())) - .nest( - http_serde::query::BASE_AXUM_PATH, - Router::new() - .merge(query::query_router(transport.clone())) - .merge(query::h2h_router(transport.inner_transport)), - ) + .merge(metrics::router(transport.clone())) + .nest( + http_serde::query::BASE_AXUM_PATH, + Router::new() + .merge(query::query_router(transport.clone())) + .merge(query::h2h_router(transport.inner_transport)), + ) } pub fn shard_router(transport: Arc>) -> Router { diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index bc0c4c41e..fda51ce70 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -115,7 +115,10 @@ impl HttpTransport { let req = serde_json::from_str(route.extra().borrow())?; self.clients[client_ix].status_match(req).await } - evt @ (RouteId::QueryInput | RouteId::ReceiveQuery | RouteId::KillQuery | RouteId::Metrics) => { + evt @ (RouteId::QueryInput + | RouteId::ReceiveQuery + | RouteId::KillQuery + | RouteId::Metrics) => { unimplemented!( "attempting to send client-specific request {evt:?} to another helper" ) diff --git a/ipa-metrics-prometheus/src/exporter.rs b/ipa-metrics-prometheus/src/exporter.rs index fee9026e8..8ba17f905 100644 --- a/ipa-metrics-prometheus/src/exporter.rs +++ b/ipa-metrics-prometheus/src/exporter.rs @@ -70,7 +70,6 @@ mod test { let mut buff = Vec::new(); store.export(&mut buff); - let expected_result = "# TYPE bar_total counter bar_total{otel_scope_name=\"ipa-helper\"} 1 # TYPE baz_total counter diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 70fd4df0e..3b2419fae 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -124,12 +124,11 @@ impl PartitionedStore { pub fn counters(&self) -> impl Iterator { if let Some(partition) = CurrentThreadContext::get() { return match self.inner.get(&partition) { - Some(store) => store.counters(), - None => self.default_store.counters() - } - } else { - self.default_store.counters() + Some(store) => store.counters(), + None => self.default_store.counters(), + }; } + self.default_store.counters() } #[must_use] From 7e1cb2bb7893298edbad9024c04a20d7ad278989 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Wed, 11 Dec 2024 13:17:00 +0800 Subject: [PATCH 10/12] Add logging_handle to TestApp --- ipa-core/src/net/test.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ipa-core/src/net/test.rs b/ipa-core/src/net/test.rs index 00441e6bf..4497b1a35 100644 --- a/ipa-core/src/net/test.rs +++ b/ipa-core/src/net/test.rs @@ -263,7 +263,11 @@ impl TestApp { shard_server.start_on(&IpaRuntime::current(), self.shard_server.socket.take(), ()), ) .await; - setup.connect(transport, shard_transport) + + let metrics_handle = install_collector().unwrap(); + let logging_handle = LoggingHandle { metrics_handle }; + + setup.connect(transport, shard_transport, logging_handle) } } From dd2a174edf78ab9eb083e180f3cbe148cb860a80 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Wed, 11 Dec 2024 13:52:33 +0800 Subject: [PATCH 11/12] imports --- ipa-core/src/net/test.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ipa-core/src/net/test.rs b/ipa-core/src/net/test.rs index 4497b1a35..6698f1301 100644 --- a/ipa-core/src/net/test.rs +++ b/ipa-core/src/net/test.rs @@ -22,6 +22,9 @@ use hyper::StatusCode; use once_cell::sync::Lazy; use rustls_pki_types::CertificateDer; +#[cfg(all(test, web_test, descriptive_gate))] +use crate::cli::{install_collector, LoggingHandle}; + use super::{ConnectionFlavor, HttpTransport, Shard}; use crate::{ config::{ From 822c659461914fe8beb2e75be19a481ab5dddb94 Mon Sep 17 00:00:00 2001 From: Shinta Liem Date: Wed, 11 Dec 2024 14:06:24 +0800 Subject: [PATCH 12/12] cargo fmt --- ipa-core/src/net/test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ipa-core/src/net/test.rs b/ipa-core/src/net/test.rs index 6698f1301..12330f67a 100644 --- a/ipa-core/src/net/test.rs +++ b/ipa-core/src/net/test.rs @@ -22,10 +22,9 @@ use hyper::StatusCode; use once_cell::sync::Lazy; use rustls_pki_types::CertificateDer; +use super::{ConnectionFlavor, HttpTransport, Shard}; #[cfg(all(test, web_test, descriptive_gate))] use crate::cli::{install_collector, LoggingHandle}; - -use super::{ConnectionFlavor, HttpTransport, Shard}; use crate::{ config::{ ClientConfig, HpkeClientConfig, HpkeServerConfig, NetworkConfig, PeerConfig, ServerConfig,