Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/prometheus/client_rust in…
Browse files Browse the repository at this point in the history
…to document-protobuf
  • Loading branch information
mxinden committed Jul 15, 2024
2 parents 762a155 + a914fc9 commit 5957f56
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 47 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.22.3] - unreleased

### Added

- Added `encode_registry` and `encode_eof` functions to `text` module.
See [PR 205].

[PR 205]: https://github.com/prometheus/client_rust/pull/205

- Support all platforms with 32 bit atomics lacking 64 bit atomics.
See [PR 203].

[PR 203]: https://github.com/prometheus/client_rust/pull/203

## [0.22.2]

### Added
Expand Down
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "prometheus-client"
version = "0.22.2"
version = "0.22.3"
authors = ["Max Inden <[email protected]>"]
edition = "2021"
description = "Open Metrics client library allowing users to natively instrument applications."
Expand Down Expand Up @@ -35,7 +35,9 @@ rand = "0.8.4"
tide = "0.16"
actix-web = "4"
tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "signal"] }
hyper = { version = "0.14.16", features = ["server", "http1", "tcp"] }
hyper = { version = "1.3.1", features = ["server", "http1"] }
hyper-util = { version = "0.1.3", features = ["tokio"] }
http-body-util = "0.1.1"

[build-dependencies]
prost-build = { version = "0.11.0", optional = true }
Expand Down
2 changes: 2 additions & 0 deletions examples/actix-web.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Mutex;

use actix_web::middleware::Compress;
use actix_web::{web, App, HttpResponse, HttpServer, Responder, Result};
use prometheus_client::encoding::text::encode;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};
Expand Down Expand Up @@ -61,6 +62,7 @@ async fn main() -> std::io::Result<()> {

HttpServer::new(move || {
App::new()
.wrap(Compress::default())
.app_data(metrics.clone())
.app_data(state.clone())
.service(web::resource("/metrics").route(web::get().to(metrics_handler)))
Expand Down
62 changes: 42 additions & 20 deletions examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use http_body_util::{combinators, BodyExt, Full};
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
body::{Bytes, Incoming},
server::conn::http1,
service::service_fn,
Request, Response,
};
use hyper_util::rt::TokioIo;
use prometheus_client::{encoding::text::encode, metrics::counter::Counter, registry::Registry};
use std::{
future::Future,
Expand All @@ -10,7 +14,11 @@ use std::{
pin::Pin,
sync::Arc,
};
use tokio::signal::unix::{signal, SignalKind};
use tokio::{
net::TcpListener,
pin,
signal::unix::{signal, SignalKind},
};

#[tokio::main]
async fn main() {
Expand All @@ -31,39 +39,48 @@ async fn main() {

/// Start a HTTP server to report metrics.
pub async fn start_metrics_server(metrics_addr: SocketAddr, registry: Registry) {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();

eprintln!("Starting metrics server on {metrics_addr}");

let registry = Arc::new(registry);
Server::bind(&metrics_addr)
.serve(make_service_fn(move |_conn| {
let registry = registry.clone();
async move {
let handler = make_handler(registry);
Ok::<_, io::Error>(service_fn(handler))

let tcp_listener = TcpListener::bind(metrics_addr).await.unwrap();
let server = http1::Builder::new();
while let Ok((stream, _)) = tcp_listener.accept().await {
let mut shutdown_stream = signal(SignalKind::terminate()).unwrap();
let io = TokioIo::new(stream);
let server_clone = server.clone();
let registry_clone = registry.clone();
tokio::task::spawn(async move {
let conn = server_clone.serve_connection(io, service_fn(make_handler(registry_clone)));
pin!(conn);
tokio::select! {
_ = conn.as_mut() => {}
_ = shutdown_stream.recv() => {
conn.as_mut().graceful_shutdown();
}
}
}))
.with_graceful_shutdown(async move {
shutdown_stream.recv().await;
})
.await
.unwrap();
});
}
}

/// Boxed HTTP body for responses
type BoxBody = combinators::BoxBody<Bytes, hyper::Error>;

/// This function returns a HTTP handler (i.e. another function)
pub fn make_handler(
registry: Arc<Registry>,
) -> impl Fn(Request<Body>) -> Pin<Box<dyn Future<Output = io::Result<Response<Body>>> + Send>> {
) -> impl Fn(Request<Incoming>) -> Pin<Box<dyn Future<Output = io::Result<Response<BoxBody>>> + Send>>
{
// This closure accepts a request and responds with the OpenMetrics encoding of our metrics.
move |_req: Request<Body>| {
move |_req: Request<Incoming>| {
let reg = registry.clone();

Box::pin(async move {
let mut buf = String::new();
encode(&mut buf, &reg.clone())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
.map(|_| {
let body = Body::from(buf);
let body = full(Bytes::from(buf));
Response::builder()
.header(
hyper::header::CONTENT_TYPE,
Expand All @@ -75,3 +92,8 @@ pub fn make_handler(
})
}
}

/// helper function to build a full boxed body
pub fn full(body: Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}
204 changes: 194 additions & 10 deletions src/encoding/text.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Open Metrics text format implementation.
//!
//! ```
//! # use prometheus_client::encoding::text::encode;
//! # use prometheus_client::encoding::text::{encode, encode_registry, encode_eof};
//! # use prometheus_client::metrics::counter::Counter;
//! # use prometheus_client::registry::Registry;
//! #
Expand All @@ -15,13 +15,26 @@
//! # );
//! # counter.inc();
//! let mut buffer = String::new();
//!
//! // Encode the complete OpenMetrics exposition into the message buffer
//! encode(&mut buffer, &registry).unwrap();
//! let expected_msg = "# HELP my_counter This is my counter.\n".to_owned() +
//! "# TYPE my_counter counter\n" +
//! "my_counter_total 1\n" +
//! "# EOF\n";
//! assert_eq!(expected_msg, buffer);
//! buffer.clear();
//!
//! // Encode just the registry into the message buffer
//! encode_registry(&mut buffer, &registry).unwrap();
//! let expected_reg = "# HELP my_counter This is my counter.\n".to_owned() +
//! "# TYPE my_counter counter\n" +
//! "my_counter_total 1\n";
//! assert_eq!(expected_reg, buffer);
//!
//! let expected = "# HELP my_counter This is my counter.\n".to_owned() +
//! "# TYPE my_counter counter\n" +
//! "my_counter_total 1\n" +
//! "# EOF\n";
//! assert_eq!(expected, buffer);
//! // Encode EOF marker into message buffer to complete the OpenMetrics exposition
//! encode_eof(&mut buffer).unwrap();
//! assert_eq!(expected_msg, buffer);
//! ```
use crate::encoding::{EncodeExemplarValue, EncodeLabelSet};
Expand All @@ -33,15 +46,140 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;

/// Encode both the metrics registered with the provided [`Registry`] and the
/// EOF marker into the provided [`Write`]r using the OpenMetrics text format.
///
/// Note: This function encodes the **complete** OpenMetrics exposition.
///
/// Use [`encode_registry`] or [`encode_eof`] if partial encoding is needed.
///
/// See [OpenMetrics exposition format](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#text-format)
/// for additional details.
///
/// # Examples
///
/// ```no_run
/// # use prometheus_client::encoding::text::encode;
/// # use prometheus_client::metrics::counter::Counter;
/// # use prometheus_client::metrics::gauge::Gauge;
/// # use prometheus_client::registry::Registry;
/// #
/// // Initialize registry with metric families
/// let mut registry = Registry::default();
/// let counter: Counter = Counter::default();
/// registry.register(
/// "my_counter",
/// "This is my counter",
/// counter.clone(),
/// );
/// let gauge: Gauge = Gauge::default();
/// registry.register(
/// "my_gauge",
/// "This is my gauge",
/// gauge.clone(),
/// );
///
/// // Encode the complete OpenMetrics exposition into the buffer
/// let mut buffer = String::new();
/// encode(&mut buffer, &registry)?;
/// # Ok::<(), std::fmt::Error>(())
/// ```
pub fn encode<W>(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error>
where
W: Write,
{
encode_registry(writer, registry)?;
encode_eof(writer)
}

/// Encode the metrics registered with the provided [`Registry`] into the
/// provided [`Write`]r using the OpenMetrics text format.
pub fn encode<W>(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error>
///
/// Note: The OpenMetrics exposition requires that a complete message must end
/// with an EOF marker.
///
/// This function may be called repeatedly for the HTTP scrape response until
/// [`encode_eof`] signals the end of the response.
///
/// This may also be used to compose a partial message with metrics assembled
/// from multiple registries.
///
/// # Examples
///
/// ```no_run
/// # use prometheus_client::encoding::text::encode_registry;
/// # use prometheus_client::metrics::counter::Counter;
/// # use prometheus_client::metrics::gauge::Gauge;
/// # use prometheus_client::registry::Registry;
/// #
/// // Initialize registry with a counter
/// let mut reg_counter = Registry::default();
/// let counter: Counter = Counter::default();
/// reg_counter.register(
/// "my_counter",
/// "This is my counter",
/// counter.clone(),
/// );
///
/// // Encode the counter registry into the buffer
/// let mut buffer = String::new();
/// encode_registry(&mut buffer, &reg_counter)?;
///
/// // Initialize another registry but with a gauge
/// let mut reg_gauge = Registry::default();
/// let gauge: Gauge = Gauge::default();
/// reg_gauge.register(
/// "my_gauge",
/// "This is my gauge",
/// gauge.clone(),
/// );
///
/// // Encode the gauge registry into the buffer
/// encode_registry(&mut buffer, &reg_gauge)?;
/// # Ok::<(), std::fmt::Error>(())
/// ```
pub fn encode_registry<W>(writer: &mut W, registry: &Registry) -> Result<(), std::fmt::Error>
where
W: Write,
{
registry.encode(&mut DescriptorEncoder::new(writer).into())?;
writer.write_str("# EOF\n")?;
Ok(())
registry.encode(&mut DescriptorEncoder::new(writer).into())
}

/// Encode the EOF marker into the provided [`Write`]r using the OpenMetrics
/// text format.
///
/// Note: This function is used to mark/signal the end of the exposition.
///
/// # Examples
///
/// ```no_run
/// # use prometheus_client::encoding::text::{encode_registry, encode_eof};
/// # use prometheus_client::metrics::counter::Counter;
/// # use prometheus_client::metrics::gauge::Gauge;
/// # use prometheus_client::registry::Registry;
/// #
/// // Initialize registry with a counter
/// let mut registry = Registry::default();
/// let counter: Counter = Counter::default();
/// registry.register(
/// "my_counter",
/// "This is my counter",
/// counter.clone(),
/// );
///
/// // Encode registry into the buffer
/// let mut buffer = String::new();
/// encode_registry(&mut buffer, &registry)?;
///
/// // Encode EOF marker to complete the message
/// encode_eof(&mut buffer)?;
/// # Ok::<(), std::fmt::Error>(())
/// ```
pub fn encode_eof<W>(writer: &mut W) -> Result<(), std::fmt::Error>
where
W: Write,
{
writer.write_str("# EOF\n")
}

pub(crate) struct DescriptorEncoder<'a> {
Expand Down Expand Up @@ -915,6 +1053,52 @@ mod tests {
parse_with_python_client(encoded);
}

#[test]
fn encode_registry_eof() {
let mut orders_registry = Registry::default();

let total_orders: Counter<u64> = Default::default();
orders_registry.register("orders", "Total orders received", total_orders.clone());
total_orders.inc();

let processing_times = Histogram::new(exponential_buckets(1.0, 2.0, 10));
orders_registry.register_with_unit(
"processing_times",
"Order times",
Unit::Seconds,
processing_times.clone(),
);
processing_times.observe(2.4);

let mut user_auth_registry = Registry::default();

let successful_logins: Counter<u64> = Default::default();
user_auth_registry.register(
"successful_logins",
"Total successful logins",
successful_logins.clone(),
);
successful_logins.inc();

let failed_logins: Counter<u64> = Default::default();
user_auth_registry.register(
"failed_logins",
"Total failed logins",
failed_logins.clone(),
);

let mut response = String::new();

encode_registry(&mut response, &orders_registry).unwrap();
assert_eq!(&response[response.len() - 20..], "bucket{le=\"+Inf\"} 1\n");

encode_registry(&mut response, &user_auth_registry).unwrap();
assert_eq!(&response[response.len() - 20..], "iled_logins_total 0\n");

encode_eof(&mut response).unwrap();
assert_eq!(&response[response.len() - 20..], "ogins_total 0\n# EOF\n");
}

fn parse_with_python_client(input: String) {
pyo3::prepare_freethreaded_python();

Expand Down
Loading

0 comments on commit 5957f56

Please sign in to comment.