diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2333a07..5f41a4e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -87,7 +87,6 @@ jobs: uses: reviewdog/action-misspell@v1 with: github_token: ${{ secrets.github_token }} - locale: "US" cocogitto: name: cocogitto diff --git a/Cargo.toml b/Cargo.toml index 04f0b92..7a903d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ geoblock = ["geoip/middleware"] geoip = ["dep:geoip"] http = [] metrics = ["dep:metrics", "future/metrics", "alloc/metrics", "http/metrics"] +future_metrics = ["dep:future_metrics"] profiler = ["alloc/profiler"] rate_limit = ["dep:rate_limit"] @@ -47,6 +48,7 @@ future = { path = "./crates/future", optional = true } geoip = { path = "./crates/geoip", optional = true } http = { path = "./crates/http", optional = true } metrics = { path = "./crates/metrics", optional = true } +future_metrics = { path = "./crates/future_metrics", optional = true } rate_limit = { path = "./crates/rate_limit", optional = true } [dev-dependencies] diff --git a/crates/future_metrics/Cargo.toml b/crates/future_metrics/Cargo.toml new file mode 100644 index 0000000..710a22b --- /dev/null +++ b/crates/future_metrics/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "future_metrics" +version = "0.1.0" +edition = "2021" + +[dependencies] +pin-project = "1" +metrics = "0.23" diff --git a/crates/future_metrics/src/lib.rs b/crates/future_metrics/src/lib.rs new file mode 100644 index 0000000..a2db759 --- /dev/null +++ b/crates/future_metrics/src/lib.rs @@ -0,0 +1,215 @@ +use { + metrics::{Counter, Gauge, Histogram, Key, Label, Level, Metadata}, + std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, + }, +}; + +/// Target specified in [`metrics::Metadata`] for all metrics produced by this +/// crate. +pub const METADATA_TARGET: &str = "future_metrics"; + +/// Metric names used by this crate. +pub mod metric_name { + pub const FUTURE_DURATION: &str = "future_duration"; + pub const FUTURE_CANCELLED_DURATION: &str = "future_cancelled_duration"; + + pub const FUTURES_CREATED: &str = "futures_created_count"; + pub const FUTURES_STARTED: &str = "futures_started_count"; + pub const FUTURES_FINISHED: &str = "futures_finished_count"; + pub const FUTURES_CANCELLED: &str = "futures_cancelled_count"; + + pub const FUTURE_POLL_DURATION: &str = "future_poll_duration"; + pub const FUTURE_POLL_DURATION_MAX: &str = "future_poll_duration_max"; + pub const FUTURE_POLLS: &str = "future_polls_count"; +} + +/// Creates a new label identifying a future by its name. +pub const fn future_name(s: &'static str) -> Label { + Label::from_static_parts("future_name", s) +} + +pub trait FutureExt: Sized { + /// Consumes the future, returning a new future that records the executiion + /// metrics of the inner future. + /// + /// It is expected that you provide at least one label identifying the + /// future being metered. + /// Consider using [`future_name`] label, or the [`FutureExt::with_metrics`] + /// shortcut. + fn with_labeled_metrics(self, labels: &'static [Label]) -> Metered { + Metered::new(self, labels) + } + + /// A shortcut for [`FutureExt::with_labeled_metrics`] using a single label + /// only (presumably [`future_name`]). + fn with_metrics(self, label: &'static Label) -> Metered { + self.with_labeled_metrics(std::slice::from_ref(label)) + } +} + +impl FutureExt for F where F: Future {} + +#[pin_project::pin_project] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Metered { + #[pin] + future: F, + state: State, +} + +struct State { + started_at: Option, + is_finished: bool, + + poll_duration_sum: Duration, + poll_duration_max: Duration, + polls_count: usize, + + metrics: Metrics, +} + +impl Metered { + fn new(future: F, metric_labels: &'static [Label]) -> Self { + let metrics = Metrics::new(metric_labels); + + metrics.created.increment(1); + + Self { + future, + state: State { + started_at: None, + is_finished: false, + poll_duration_sum: Duration::from_secs(0), + poll_duration_max: Duration::from_secs(0), + polls_count: 0, + metrics, + }, + } + } +} + +impl Future for Metered { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut this = self.project(); + let state = &mut this.state; + + if state.started_at.is_none() { + state.started_at = Some(Instant::now()); + state.metrics.started.increment(1); + } + + let poll_started_at = Instant::now(); + let result = this.future.poll(cx); + let poll_duration = poll_started_at.elapsed(); + + state.poll_duration_sum += poll_duration; + state.poll_duration_max = state.poll_duration_max.max(poll_duration); + state.polls_count += 1; + + if result.is_ready() && !state.is_finished { + state.is_finished = true; + + state.metrics.finished.increment(1); + + if let Some(started_at) = state.started_at { + state.metrics.duration.record(started_at.elapsed()) + } + } + + result + } +} + +impl Drop for State { + fn drop(&mut self) { + if !self.is_finished { + self.metrics.cancelled.increment(1); + + if let Some(started_at) = self.started_at { + self.metrics.cancelled_duration.record(started_at.elapsed()) + } + } + + self.metrics + .poll_duration + .record(duration_as_millis_f64(self.poll_duration_sum)); + + self.metrics + .poll_duration_max + .set(duration_as_millis_f64(self.poll_duration_max)); + + self.metrics.polls.increment(self.polls_count as u64); + } +} + +struct Metrics { + duration: Histogram, + cancelled_duration: Histogram, + + created: Counter, + started: Counter, + finished: Counter, + cancelled: Counter, + + poll_duration: Histogram, + poll_duration_max: Gauge, + polls: Counter, +} + +impl Metrics { + fn new(labels: &'static [Label]) -> Self { + metrics::with_recorder(|r| { + let metadata = Metadata::new(METADATA_TARGET, Level::INFO, None); + + Self { + duration: r.register_histogram( + &Key::from_static_parts(metric_name::FUTURE_DURATION, labels), + &metadata, + ), + cancelled_duration: r.register_histogram( + &Key::from_static_parts(metric_name::FUTURE_CANCELLED_DURATION, labels), + &metadata, + ), + created: r.register_counter( + &Key::from_static_parts(metric_name::FUTURES_CREATED, labels), + &metadata, + ), + started: r.register_counter( + &Key::from_static_parts(metric_name::FUTURES_STARTED, labels), + &metadata, + ), + finished: r.register_counter( + &Key::from_static_parts(metric_name::FUTURES_FINISHED, labels), + &metadata, + ), + cancelled: r.register_counter( + &Key::from_static_parts(metric_name::FUTURES_CANCELLED, labels), + &metadata, + ), + poll_duration: r.register_histogram( + &Key::from_static_parts(metric_name::FUTURE_POLL_DURATION, labels), + &metadata, + ), + poll_duration_max: r.register_gauge( + &Key::from_static_parts(metric_name::FUTURE_POLL_DURATION_MAX, labels), + &metadata, + ), + polls: r.register_counter( + &Key::from_static_parts(metric_name::FUTURE_POLLS, labels), + &metadata, + ), + } + }) + } +} + +#[inline] +pub fn duration_as_millis_f64(val: Duration) -> f64 { + val.as_secs_f64() * 1000.0 +} diff --git a/src/lib.rs b/src/lib.rs index a16c71a..a9204a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,8 @@ pub use analytics; pub use collections; #[cfg(feature = "future")] pub use future; +#[cfg(feature = "future_metrics")] +pub use future_metrics; #[cfg(feature = "geoip")] pub use geoip; #[cfg(feature = "http")]