diff --git a/ipa-core/Cargo.toml b/ipa-core/Cargo.toml index 496699e34..a1c922453 100644 --- a/ipa-core/Cargo.toml +++ b/ipa-core/Cargo.toml @@ -88,6 +88,7 @@ ipa-metrics = { path = "../ipa-metrics" } ipa-metrics-tracing = { optional = true, path = "../ipa-metrics-tracing" } ipa-step = { version = "*", path = "../ipa-step" } ipa-step-derive = { version = "*", path = "../ipa-step-derive" } +ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" } aes = "0.8.3" async-trait = "0.1.79" @@ -176,6 +177,7 @@ rustls = { version = "0.23" } tempfile = "3" ipa-metrics-tracing = { path = "../ipa-metrics-tracing" } ipa-metrics = { path = "../ipa-metrics", features = ["partitions"] } +ipa-metrics-prometheus = { path = "../ipa-metrics-prometheus" } [lib] diff --git a/ipa-core/src/app.rs b/ipa-core/src/app.rs index 0d61e9bb6..b20a71c65 100644 --- a/ipa-core/src/app.rs +++ b/ipa-core/src/app.rs @@ -3,6 +3,7 @@ use std::sync::Weak; use async_trait::async_trait; use crate::{ + cli::LoggingHandle, executor::IpaRuntime, helpers::{ query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, @@ -65,6 +66,7 @@ struct Inner { /// the flamegraph mpc_transport: MpcTransportImpl, shard_transport: ShardTransportImpl, + logging_handle: LoggingHandle, } impl Setup { @@ -96,11 +98,13 @@ impl Setup { self, mpc_transport: MpcTransportImpl, shard_transport: ShardTransportImpl, + logging_handle: LoggingHandle, ) -> HelperApp { let app = Arc::new(Inner { query_processor: self.query_processor, mpc_transport, shard_transport, + logging_handle, }); self.mpc_handler .set_handler(Arc::downgrade(&app) as Weak>); @@ -277,6 +281,11 @@ impl RequestHandler for Inner { let query_id = ext_query_id(&req)?; HelperResponse::from(qp.kill(query_id)?) } + RouteId::Metrics => { + let logging_handler = &self.logging_handle; + let metrics_handle = &logging_handler.metrics_handle; + HelperResponse::from(metrics_handle.scrape_metrics()) + } }) } } diff --git a/ipa-core/src/bin/helper.rs b/ipa-core/src/bin/helper.rs index 85b0c7110..e6396ff97 100644 --- a/ipa-core/src/bin/helper.rs +++ b/ipa-core/src/bin/helper.rs @@ -271,7 +271,7 @@ async fn server(args: ServerArgs, logging_handle: LoggingHandle) -> Result<(), B Some(shard_handler), ); - let _app = setup.connect(transport.clone(), shard_transport.clone()); + let _app = setup.connect(transport.clone(), shard_transport.clone(), logging_handle); let listener = create_listener(args.server_socket_fd)?; let shard_listener = create_listener(args.shard_server_socket_fd)?; diff --git a/ipa-core/src/cli/metric_collector.rs b/ipa-core/src/cli/metric_collector.rs index 8f9a374b4..5b8f07b27 100644 --- a/ipa-core/src/cli/metric_collector.rs +++ b/ipa-core/src/cli/metric_collector.rs @@ -3,13 +3,14 @@ use std::{io, thread, thread::JoinHandle}; use ipa_metrics::{ MetricChannelType, MetricsCollectorController, MetricsCurrentThreadContext, MetricsProducer, }; +use ipa_metrics_prometheus::PrometheusMetricsExporter; use tokio::runtime::Builder; /// Holds a reference to metrics controller and producer pub struct CollectorHandle { thread_handle: JoinHandle<()>, /// This will be used once we start consuming metrics - _controller: MetricsCollectorController, + controller: MetricsCollectorController, producer: MetricsProducer, } @@ -26,7 +27,7 @@ pub fn install_collector() -> io::Result { Ok(CollectorHandle { thread_handle: handle, - _controller: controller, + controller, producer, }) } @@ -53,4 +54,17 @@ impl CollectorHandle { .on_thread_stop(flush_fn) .on_thread_park(flush_fn) } + + /// Export the metrics to be consumed by metrics scraper, e.g. Prometheus + /// + /// # Panics + /// If metrics is not initialized + #[must_use] + pub fn scrape_metrics(&self) -> Vec { + let mut store = self.controller.snapshot().expect("Metrics must be set up"); + let mut buff = Vec::new(); + store.export(&mut buff); + + buff + } } diff --git a/ipa-core/src/helpers/transport/handler.rs b/ipa-core/src/helpers/transport/handler.rs index a87e47ab5..9a1f1b457 100644 --- a/ipa-core/src/helpers/transport/handler.rs +++ b/ipa-core/src/helpers/transport/handler.rs @@ -149,6 +149,12 @@ impl> From for HelperResponse { } } +impl From> for HelperResponse { + fn from(value: Vec) -> Self { + Self { body: value } + } +} + /// Union of error types returned by API operations. #[derive(thiserror::Error, Debug)] pub enum Error { diff --git a/ipa-core/src/helpers/transport/in_memory/transport.rs b/ipa-core/src/helpers/transport/in_memory/transport.rs index a2a1abea6..504921eb5 100644 --- a/ipa-core/src/helpers/transport/in_memory/transport.rs +++ b/ipa-core/src/helpers/transport/in_memory/transport.rs @@ -116,7 +116,8 @@ impl InMemoryTransport { | RouteId::QueryInput | RouteId::QueryStatus | RouteId::CompleteQuery - | RouteId::KillQuery => { + | RouteId::KillQuery + | RouteId::Metrics => { handler .as_ref() .expect("Handler is set") diff --git a/ipa-core/src/helpers/transport/mod.rs b/ipa-core/src/helpers/transport/mod.rs index 6b8341966..00021c62c 100644 --- a/ipa-core/src/helpers/transport/mod.rs +++ b/ipa-core/src/helpers/transport/mod.rs @@ -229,6 +229,26 @@ where fn extra(&self) -> Self::Params; } +impl RouteParams for RouteId { + type Params = &'static str; + + fn resource_identifier(&self) -> RouteId { + *self + } + + fn query_id(&self) -> NoQueryId { + NoQueryId + } + + fn gate(&self) -> NoStep { + NoStep + } + + fn extra(&self) -> Self::Params { + "" + } +} + impl RouteParams for (QueryId, Gate) { type Params = &'static str; diff --git a/ipa-core/src/helpers/transport/routing.rs b/ipa-core/src/helpers/transport/routing.rs index c935704d4..138e496f9 100644 --- a/ipa-core/src/helpers/transport/routing.rs +++ b/ipa-core/src/helpers/transport/routing.rs @@ -24,6 +24,7 @@ pub enum RouteId { QueryStatus, CompleteQuery, KillQuery, + Metrics, } /// The header/metadata of the incoming request. diff --git a/ipa-core/src/net/http_serde.rs b/ipa-core/src/net/http_serde.rs index c7c3eb1a4..6d99498e0 100644 --- a/ipa-core/src/net/http_serde.rs +++ b/ipa-core/src/net/http_serde.rs @@ -68,6 +68,16 @@ pub mod echo { pub const AXUM_PATH: &str = "/echo"; } +pub mod metrics { + + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] + pub struct Request {} + + pub const AXUM_PATH: &str = "/metrics"; +} + pub mod query { use std::fmt::{Display, Formatter}; diff --git a/ipa-core/src/net/server/handlers/metrics.rs b/ipa-core/src/net/server/handlers/metrics.rs new file mode 100644 index 000000000..1c7d859b6 --- /dev/null +++ b/ipa-core/src/net/server/handlers/metrics.rs @@ -0,0 +1,62 @@ +use axum::{routing::get, Extension, Router}; +use hyper::StatusCode; + +use crate::{ + helpers::{routing::RouteId, BodyStream}, + net::{ + http_serde::{self}, + Error, MpcHttpTransport, + }, +}; + +/// Takes details from the HTTP request and creates a `[TransportCommand]::CreateQuery` that is sent +/// to the [`HttpTransport`]. +async fn handler(transport: Extension) -> Result, Error> { + match transport + .dispatch(RouteId::Metrics, BodyStream::empty()) + .await + { + Ok(resp) => Ok(resp.into_body()), + Err(err) => Err(Error::application(StatusCode::INTERNAL_SERVER_ERROR, err)), + } +} + +pub fn router(transport: MpcHttpTransport) -> Router { + Router::new() + .route(http_serde::metrics::AXUM_PATH, get(handler)) + .layer(Extension(transport)) +} + +#[cfg(all(test, unit_test))] +mod tests { + use axum::{ + body::Body, + http::uri::{self, Authority, Scheme}, + }; + + use super::*; + use crate::{ + helpers::{make_owned_handler, routing::Addr, HelperIdentity, HelperResponse}, + net::server::handlers::query::test_helpers::assert_success_with, + }; + + #[tokio::test] + async fn happy_case() { + let handler = make_owned_handler( + move |addr: Addr, _data: BodyStream| async move { + let RouteId::Metrics = addr.route else { + panic!("unexpected call"); + }; + Ok(HelperResponse::from(Vec::new())) + }, + ); + let uri = uri::Builder::new() + .scheme(Scheme::HTTP) + .authority(Authority::from_static("localhost")) + .path_and_query(String::from("/metrics")) + .build() + .unwrap(); + let req = hyper::Request::get(uri).body(Body::empty()).unwrap(); + assert_success_with(req, handler).await; + } +} diff --git a/ipa-core/src/net/server/handlers/mod.rs b/ipa-core/src/net/server/handlers/mod.rs index c8ab75875..2571a2be0 100644 --- a/ipa-core/src/net/server/handlers/mod.rs +++ b/ipa-core/src/net/server/handlers/mod.rs @@ -1,4 +1,5 @@ mod echo; +mod metrics; mod query; use axum::Router; @@ -9,12 +10,14 @@ use crate::{ }; pub fn mpc_router(transport: MpcHttpTransport) -> Router { - echo::router().nest( - http_serde::query::BASE_AXUM_PATH, - Router::new() - .merge(query::query_router(transport.clone())) - .merge(query::h2h_router(transport.inner_transport)), - ) + echo::router() + .merge(metrics::router(transport.clone())) + .nest( + http_serde::query::BASE_AXUM_PATH, + Router::new() + .merge(query::query_router(transport.clone())) + .merge(query::h2h_router(transport.inner_transport)), + ) } pub fn shard_router(transport: Arc>) -> Router { diff --git a/ipa-core/src/net/test.rs b/ipa-core/src/net/test.rs index 00441e6bf..12330f67a 100644 --- a/ipa-core/src/net/test.rs +++ b/ipa-core/src/net/test.rs @@ -23,6 +23,8 @@ use once_cell::sync::Lazy; use rustls_pki_types::CertificateDer; use super::{ConnectionFlavor, HttpTransport, Shard}; +#[cfg(all(test, web_test, descriptive_gate))] +use crate::cli::{install_collector, LoggingHandle}; use crate::{ config::{ ClientConfig, HpkeClientConfig, HpkeServerConfig, NetworkConfig, PeerConfig, ServerConfig, @@ -263,7 +265,11 @@ impl TestApp { shard_server.start_on(&IpaRuntime::current(), self.shard_server.socket.take(), ()), ) .await; - setup.connect(transport, shard_transport) + + let metrics_handle = install_collector().unwrap(); + let logging_handle = LoggingHandle { metrics_handle }; + + setup.connect(transport, shard_transport, logging_handle) } } diff --git a/ipa-core/src/net/transport.rs b/ipa-core/src/net/transport.rs index d8fbfc4b5..fda51ce70 100644 --- a/ipa-core/src/net/transport.rs +++ b/ipa-core/src/net/transport.rs @@ -115,7 +115,10 @@ impl HttpTransport { let req = serde_json::from_str(route.extra().borrow())?; self.clients[client_ix].status_match(req).await } - evt @ (RouteId::QueryInput | RouteId::ReceiveQuery | RouteId::KillQuery) => { + evt @ (RouteId::QueryInput + | RouteId::ReceiveQuery + | RouteId::KillQuery + | RouteId::Metrics) => { unimplemented!( "attempting to send client-specific request {evt:?} to another helper" ) diff --git a/ipa-core/src/test_fixture/app.rs b/ipa-core/src/test_fixture/app.rs index 1866cef74..1888b83e6 100644 --- a/ipa-core/src/test_fixture/app.rs +++ b/ipa-core/src/test_fixture/app.rs @@ -6,6 +6,7 @@ use typenum::Unsigned; use crate::{ app::AppConfig, + cli::{install_collector, LoggingHandle}, ff::Serializable, helpers::{ query::{QueryConfig, QueryInput}, @@ -68,8 +69,15 @@ impl Default for TestApp { let mpc_network = InMemoryMpcNetwork::new(handlers.map(Some)); let shard_network = InMemoryShardNetwork::with_shards(1); - let drivers = zip3(mpc_network.transports().each_ref(), setup) - .map(|(t, s)| s.connect(Clone::clone(t), shard_network.transport(t.identity(), 0))); + let drivers = zip3(mpc_network.transports().each_ref(), setup).map(|(t, s)| { + let metrics_handle = install_collector().unwrap(); + let logging_handle = LoggingHandle { metrics_handle }; + s.connect( + Clone::clone(t), + shard_network.transport(t.identity(), 0), + logging_handle, + ) + }); Self { drivers, diff --git a/ipa-metrics-prometheus/Cargo.toml b/ipa-metrics-prometheus/Cargo.toml new file mode 100644 index 000000000..36ce2be11 --- /dev/null +++ b/ipa-metrics-prometheus/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ipa-metrics-prometheus" +version = "0.1.0" +edition = "2021" + +[features] +default = [] + +[dependencies] +ipa-metrics = { path = "../ipa-metrics" } + +# Open telemetry crates: opentelemetry-prometheus crate implementation is based on Opentelemetry API and SDK 0.23. (TBC) +opentelemetry = "0.24" +opentelemetry_sdk = { version = "0.24", features = ["metrics", "rt-tokio"] } +opentelemetry-prometheus = { version = "0.17" } +prometheus = "0.13.3" diff --git a/ipa-metrics-prometheus/src/exporter.rs b/ipa-metrics-prometheus/src/exporter.rs new file mode 100644 index 000000000..8ba17f905 --- /dev/null +++ b/ipa-metrics-prometheus/src/exporter.rs @@ -0,0 +1,83 @@ +use std::io; + +use ipa_metrics::MetricsStore; +use opentelemetry::{metrics::MeterProvider, KeyValue}; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use prometheus::{self, Encoder, TextEncoder}; + +pub trait PrometheusMetricsExporter { + fn export(&mut self, w: &mut W); +} + +impl PrometheusMetricsExporter for MetricsStore { + fn export(&mut self, w: &mut W) { + // Setup prometheus registry and open-telemetry exporter + let registry = prometheus::Registry::new(); + + let exporter = opentelemetry_prometheus::exporter() + .with_registry(registry.clone()) + .build() + .unwrap(); + + let meter_provider = SdkMeterProvider::builder().with_reader(exporter).build(); + + // Convert the snapshot to otel struct + // TODO : We need to define a proper scope for the metrics + let meter = meter_provider.meter("ipa-helper"); + + let counters = self.counters(); + counters.for_each(|(counter_name, counter_value)| { + let otlp_counter = meter.u64_counter(counter_name.key).init(); + + let attributes: Vec = counter_name + .labels() + .map(|l| KeyValue::new(l.name, l.val.to_string())) + .collect(); + + otlp_counter.add(counter_value, &attributes[..]); + }); + + let encoder = TextEncoder::new(); + let metric_families = registry.gather(); + encoder.encode(&metric_families, w).unwrap(); + } +} + +#[cfg(test)] +mod test { + + use std::thread; + + use ipa_metrics::{counter, install_new_thread, MetricChannelType}; + + use super::PrometheusMetricsExporter; + + #[test] + fn export_to_prometheus() { + let (producer, controller, _) = install_new_thread(MetricChannelType::Rendezvous).unwrap(); + + thread::spawn(move || { + producer.install(); + counter!("baz", 4); + counter!("bar", 1); + let _ = producer.drop_handle(); + }) + .join() + .unwrap(); + + let mut store = controller.snapshot().unwrap(); + + let mut buff = Vec::new(); + store.export(&mut buff); + + let expected_result = "# TYPE bar_total counter +bar_total{otel_scope_name=\"ipa-helper\"} 1 +# TYPE baz_total counter +baz_total{otel_scope_name=\"ipa-helper\"} 4 +# HELP target_info Target metadata +# TYPE target_info gauge +target_info{service_name=\"unknown_service\",telemetry_sdk_language=\"rust\",telemetry_sdk_name=\"opentelemetry\",telemetry_sdk_version=\"0.24.1\"} 1\n"; + let result = String::from_utf8(buff).unwrap(); + assert_eq!(result, expected_result); + } +} diff --git a/ipa-metrics-prometheus/src/lib.rs b/ipa-metrics-prometheus/src/lib.rs new file mode 100644 index 000000000..b4cc164fc --- /dev/null +++ b/ipa-metrics-prometheus/src/lib.rs @@ -0,0 +1,3 @@ +mod exporter; + +pub use exporter::PrometheusMetricsExporter; diff --git a/ipa-metrics/Cargo.toml b/ipa-metrics/Cargo.toml index ebaeb9473..cc9ec52f0 100644 --- a/ipa-metrics/Cargo.toml +++ b/ipa-metrics/Cargo.toml @@ -17,4 +17,3 @@ hashbrown = "0.15" rustc-hash = "2.0.0" # logging tracing = "0.1" - diff --git a/ipa-metrics/src/partitioned.rs b/ipa-metrics/src/partitioned.rs index 0f71d0e28..3b2419fae 100644 --- a/ipa-metrics/src/partitioned.rs +++ b/ipa-metrics/src/partitioned.rs @@ -20,6 +20,7 @@ use hashbrown::hash_map::Entry; use rustc_hash::FxBuildHasher; use crate::{ + key::OwnedMetricName, kind::CounterValue, store::{CounterHandle, Store}, MetricName, @@ -120,6 +121,16 @@ impl PartitionedStore { self.get_mut(CurrentThreadContext::get()).counter(key) } + pub fn counters(&self) -> impl Iterator { + if let Some(partition) = CurrentThreadContext::get() { + return match self.inner.get(&partition) { + Some(store) => store.counters(), + None => self.default_store.counters(), + }; + } + self.default_store.counters() + } + #[must_use] pub fn len(&self) -> usize { self.inner.len() + self.default_store.len()