From 3b278a03492cf861440bd0b36d4fd39aeeae1f1a Mon Sep 17 00:00:00 2001 From: Vadim Smirnov <ukint-vs@proton.me> Date: Mon, 15 Jul 2024 17:21:26 +0300 Subject: [PATCH] feat: Add prometheus exporter and metrics service (#98) * feat: Add prometheus endpoint and metrics service * add docker prometheus and grafana * update Cargo.toml * rm monitoring * transition_commitments -> block_commitments --- Cargo.lock | 15 ++ Cargo.toml | 1 + hypercore/cli/Cargo.toml | 5 +- hypercore/cli/src/args.rs | 9 +- hypercore/cli/src/config.rs | 32 ++- hypercore/cli/src/main.rs | 1 + hypercore/cli/src/metrics.rs | 199 ++++++++++++++++++ hypercore/cli/src/params/mod.rs | 4 +- hypercore/cli/src/params/prometheus_params.rs | 67 ++++++ hypercore/cli/src/service.rs | 37 +++- hypercore/observer/src/lib.rs | 2 +- hypercore/observer/src/observer.rs | 34 +++ hypercore/sequencer/Cargo.toml | 1 + hypercore/sequencer/src/lib.rs | 42 +++- hypercore/utils/prometheus/Cargo.toml | 27 +++ hypercore/utils/prometheus/src/lib.rs | 175 +++++++++++++++ hypercore/utils/prometheus/src/sourced.rs | 168 +++++++++++++++ 17 files changed, 810 insertions(+), 9 deletions(-) create mode 100644 hypercore/cli/src/metrics.rs create mode 100644 hypercore/cli/src/params/prometheus_params.rs create mode 100644 hypercore/utils/prometheus/Cargo.toml create mode 100644 hypercore/utils/prometheus/src/lib.rs create mode 100644 hypercore/utils/prometheus/src/sourced.rs diff --git a/Cargo.lock b/Cargo.lock index 9c457c1ac03..3241831ac92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7254,6 +7254,7 @@ dependencies = [ "directories", "env_logger", "futures", + "futures-timer", "gprimitives", "hex", "hypercore-common", @@ -7262,9 +7263,11 @@ dependencies = [ "hypercore-network", "hypercore-observer", "hypercore-processor", + "hypercore-prometheus-endpoint", "hypercore-rpc", "hypercore-sequencer", "hypercore-signer", + "hypercore-utils", "hypercore-validator", "log", "parity-scale-codec", @@ -7400,6 +7403,17 @@ dependencies = [ 
"wasmtime", ] +[[package]] +name = "hypercore-prometheus-endpoint" +version = "1.4.2" +dependencies = [ + "hyper 0.14.27", + "log", + "prometheus", + "thiserror", + "tokio", +] + [[package]] name = "hypercore-rpc" version = "1.4.2" @@ -7462,6 +7476,7 @@ dependencies = [ "hypercore-signer", "log", "parity-scale-codec", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ac2f691a31f..eb9d92c207f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -290,6 +290,7 @@ hypercore-signer = { path = "hypercore/signer", default-features = false } hypercore-sequencer = { path = "hypercore/sequencer", default-features = false } hypercore-ethereum = { path = "hypercore/ethereum", default-features = false } hypercore-runtime-common = { path = "hypercore/runtime/common", default-features = false } +hypercore-prometheus-endpoint = { path = "hypercore/utils/prometheus", default-features = false } hypercore-utils = { path = "hypercore/utils", default-features = false } hypercore-validator = { path = "hypercore/validator", default-features = false } hypercore-rpc = { path = "hypercore/rpc", default-features = false } diff --git a/hypercore/cli/Cargo.toml b/hypercore/cli/Cargo.toml index cb3118e3bff..2eb03efe74d 100644 --- a/hypercore/cli/Cargo.toml +++ b/hypercore/cli/Cargo.toml @@ -23,6 +23,9 @@ hypercore-sequencer.workspace = true hypercore-ethereum.workspace = true hypercore-validator.workspace = true hypercore-common.workspace = true +hypercore-prometheus-endpoint.workspace = true +hypercore-rpc.workspace = true +hypercore-utils.workspace = true gprimitives.workspace = true clap = { workspace = true, features = ["derive"] } @@ -44,6 +47,6 @@ parity-scale-codec = { workspace = true, features = ["std", "derive"] } hex.workspace = true rand.workspace = true tempfile.workspace = true -hypercore-rpc.workspace = true +futures-timer.workspace = true static_init = "1.0.3" diff --git a/hypercore/cli/src/args.rs b/hypercore/cli/src/args.rs index 6b89efa8caa..491c0add4ee 100644 --- 
a/hypercore/cli/src/args.rs +++ b/hypercore/cli/src/args.rs @@ -18,7 +18,10 @@ //! CLI arguments in one place. -use crate::{config, params::NetworkParams}; +use crate::{ + config, + params::{NetworkParams, PrometheusParams}, +}; use anyhow::{anyhow, bail, Result}; use clap::{Parser, Subcommand}; use gprimitives::{ActorId, CodeId}; @@ -91,6 +94,10 @@ pub struct Args { #[clap(flatten)] pub network_params: NetworkParams, + #[allow(missing_docs)] + #[clap(flatten)] + pub prometheus_params: PrometheusParams, + #[command(subcommand)] pub extra_command: Option<ExtraCommands>, } diff --git a/hypercore/cli/src/config.rs b/hypercore/cli/src/config.rs index a5f38f68099..3ebfef03cd8 100644 --- a/hypercore/cli/src/config.rs +++ b/hypercore/cli/src/config.rs @@ -23,10 +23,13 @@ use crate::args::Args; use anyhow::{Context as _, Result}; use directories::ProjectDirs; use hypercore_network::NetworkConfiguration; +use hypercore_prometheus_endpoint::Registry; use hypercore_signer::PublicKey; -use std::path::PathBuf; +use std::{iter, net::SocketAddr, path::PathBuf}; use tempfile::TempDir; +const DEFAULT_PROMETHEUS_PORT: u16 = 9635; + #[static_init::dynamic(drop, lazy)] static mut BASE_PATH_TEMP: Option<TempDir> = None; @@ -44,6 +47,27 @@ pub enum ValidatorConfig { Disabled, } +/// Configuration of the Prometheus endpoint. +#[derive(Debug, Clone)] +pub struct PrometheusConfig { + /// Port to use. + pub port: SocketAddr, + /// A metrics registry to use. Useful for setting the metric prefix. + pub registry: Registry, +} + +impl PrometheusConfig { + /// Create a new config using the default registry. 
+ pub fn new_with_default_registry(port: SocketAddr, chain_id: String) -> Self { + let param = iter::once((String::from("chain"), chain_id)).collect(); + Self { + port, + registry: Registry::new_custom(None, Some(param)) + .expect("this can only fail if the prefix is empty"), + } + } +} + #[derive(Debug)] pub struct Config { /// RPC of the Ethereum endpoint @@ -79,6 +103,9 @@ pub struct Config { // Network configuration pub net_config: NetworkConfiguration, + // Prometheus configuration + pub prometheus_config: Option<PrometheusConfig>, + /// RPC port pub rpc_port: u16, } @@ -133,6 +160,9 @@ impl TryFrom<Args> for Config { .unwrap_or(chain_spec.ethereum_router_address), max_commitment_depth: args.max_commitment_depth.unwrap_or(1000), net_config, + prometheus_config: args + .prometheus_params + .prometheus_config(DEFAULT_PROMETHEUS_PORT, "hypercore-dev".to_string()), database_path: base_path.join("db"), network_path: base_path.join("net"), key_path: base_path.join("key"), diff --git a/hypercore/cli/src/main.rs b/hypercore/cli/src/main.rs index 24373529d06..78de4c4e845 100644 --- a/hypercore/cli/src/main.rs +++ b/hypercore/cli/src/main.rs @@ -19,6 +19,7 @@ mod args; mod chain_spec; mod config; +mod metrics; mod params; mod service; diff --git a/hypercore/cli/src/metrics.rs b/hypercore/cli/src/metrics.rs new file mode 100644 index 00000000000..ffd8c5ae75e --- /dev/null +++ b/hypercore/cli/src/metrics.rs @@ -0,0 +1,199 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use std::time::SystemTime; + +use crate::config::Config; +use futures_timer::Delay; +use hypercore_observer::ObserverStatus; +use hypercore_prometheus_endpoint::{register, Gauge, Opts, PrometheusError, Registry, U64}; +use hypercore_sequencer::SequencerStatus; +use hypercore_utils::metrics::register_globals; +use std::time::{Duration, Instant}; +use tokio::sync::watch; + +struct PrometheusMetrics { + // generic info + eth_block_height: Gauge<U64>, + pending_upload_code: Gauge<U64>, + last_router_state: Gauge<U64>, + aggregated_commitments: Gauge<U64>, + submitted_code_commitments: Gauge<U64>, + submitted_block_commitments: Gauge<U64>, +} + +impl PrometheusMetrics { + fn setup(registry: &Registry, name: &str) -> Result<Self, PrometheusError> { + register( + Gauge::<U64>::with_opts( + Opts::new( + "hypercore_build_info", + "A metric with a constant '1' value labeled by name, version", + ) + .const_label("name", name), + )?, + registry, + )? + .set(1); + + register_globals(registry)?; + + let start_time_since_epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default(); + register( + Gauge::<U64>::new( + "hypercore_process_start_time_seconds", + "Number of seconds between the UNIX epoch and the moment the process started", + )?, + registry, + )? 
+ .set(start_time_since_epoch.as_secs()); + + Ok(Self { + // generic internals + eth_block_height: register( + Gauge::<U64>::new( + "hypercore_eth_block_height", + "Block height info of the ethereum observer", + )?, + registry, + )?, + + pending_upload_code: register( + Gauge::<U64>::new( + "hypercore_pending_upload_code", + "Pending upload code events of the ethereum observer", + )?, + registry, + )?, + + last_router_state: register( + Gauge::<U64>::new( + "hypercore_last_router_state", + "Block height of the latest state of the router contract", + )?, + registry, + )?, + + aggregated_commitments: register( + Gauge::<U64>::new( + "hypercore_aggregated_commitments", + "Number of commitments aggregated in sequencer", + )?, + registry, + )?, + + submitted_code_commitments: register( + Gauge::<U64>::new( + "hypercore_submitted_code_commitments", + "Number of submitted code commitments in sequencer", + )?, + registry, + )?, + + submitted_block_commitments: register( + Gauge::<U64>::new( + "hypercore_submitted_block_commitments", + "Number of submitted block commitments in sequencer", + )?, + registry, + )?, + }) + } +} + +/// A `MetricsService` periodically sends general client and +/// network state to the telemetry as well as (optionally) +/// a Prometheus endpoint. +pub struct MetricsService { + metrics: Option<PrometheusMetrics>, + last_update: Instant, +} + +impl MetricsService { + /// Creates a `MetricsService` that sends metrics + /// to prometheus alongside the telemetry. + pub fn with_prometheus(registry: &Registry, config: &Config) -> Result<Self, PrometheusError> { + PrometheusMetrics::setup(registry, &config.net_config.node_name).map(|p| MetricsService { + metrics: Some(p), + last_update: Instant::now(), + }) + } + + /// Returns a never-ending `Future` that performs the + /// metric and telemetry updates with information from + /// the given sources. 
+ pub async fn run( + mut self, + mut observer_status: watch::Receiver<ObserverStatus>, + mut sequencer_status: Option<watch::Receiver<SequencerStatus>>, + ) { + let mut timer = Delay::new(Duration::from_secs(0)); + let timer_interval = Duration::from_secs(5); + + loop { + // Wait for the next tick of the timer. + (&mut timer).await; + + // Update / Send the metrics. + self.update( + *observer_status.borrow_and_update(), + sequencer_status.as_mut().map(|s| *s.borrow_and_update()), + ); + + // Schedule next tick. + timer.reset(timer_interval); + } + } + + fn update( + &mut self, + observer_status: ObserverStatus, + sequencer_status: Option<SequencerStatus>, + ) { + let now = Instant::now(); + self.last_update = now; + + let eth_number: u64 = observer_status.eth_block_number; + let pending_upload_code: u64 = observer_status.pending_upload_code; + let last_router_state: u64 = observer_status.last_router_state; + + if let Some(metrics) = self.metrics.as_ref() { + metrics.eth_block_height.set(eth_number); + metrics.pending_upload_code.set(pending_upload_code); + metrics.last_router_state.set(last_router_state); + log::trace!("Observer status: {:?}", observer_status); + if let Some(sequencer_status) = sequencer_status { + metrics + .aggregated_commitments + .set(sequencer_status.aggregated_commitments); + metrics + .submitted_code_commitments + .set(sequencer_status.submitted_code_commitments); + metrics + .submitted_block_commitments + .set(sequencer_status.submitted_block_commitments); + log::trace!("Sequencer status: {:?}", sequencer_status); + } + } + + // TODO: Use network status + // Update/send network status information, if any. + } +} diff --git a/hypercore/cli/src/params/mod.rs b/hypercore/cli/src/params/mod.rs index 11543e51662..888304b46db 100644 --- a/hypercore/cli/src/params/mod.rs +++ b/hypercore/cli/src/params/mod.rs @@ -16,6 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. 
If not, see <https://www.gnu.org/licenses/>. -pub mod network_params; +mod network_params; +mod prometheus_params; pub use network_params::*; +pub use prometheus_params::*; diff --git a/hypercore/cli/src/params/prometheus_params.rs b/hypercore/cli/src/params/prometheus_params.rs new file mode 100644 index 00000000000..956ba053b3a --- /dev/null +++ b/hypercore/cli/src/params/prometheus_params.rs @@ -0,0 +1,67 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use crate::config::PrometheusConfig; +use clap::Args; +use serde::Deserialize; +use std::net::{Ipv4Addr, SocketAddr}; + +/// Parameters used to config prometheus. +#[derive(Debug, Clone, Args, Deserialize)] +pub struct PrometheusParams { + /// Specify Prometheus exporter TCP Port. + #[arg(long, value_name = "PORT")] + pub prometheus_port: Option<u16>, + /// Expose Prometheus exporter on all interfaces. + /// + /// Default is local. + #[arg(long)] + pub prometheus_external: bool, + /// Do not expose a Prometheus exporter endpoint. + /// + /// Prometheus metric endpoint is enabled by default. + #[arg(long)] + pub no_prometheus: bool, +} + +impl PrometheusParams { + /// Creates [`PrometheusConfig`]. 
+ pub fn prometheus_config( + &self, + default_listen_port: u16, + chain_id: String, + ) -> Option<PrometheusConfig> { + if self.no_prometheus { + None + } else { + let interface = if self.prometheus_external { + Ipv4Addr::UNSPECIFIED + } else { + Ipv4Addr::LOCALHOST + }; + + Some(PrometheusConfig::new_with_default_registry( + SocketAddr::new( + interface.into(), + self.prometheus_port.unwrap_or(default_listen_port), + ), + chain_id, + )) + } + } +} diff --git a/hypercore/cli/src/service.rs b/hypercore/cli/src/service.rs index 413b3e67459..cf7c76f6fcf 100644 --- a/hypercore/cli/src/service.rs +++ b/hypercore/cli/src/service.rs @@ -18,9 +18,12 @@ //! Main service in hypercore node. -use crate::config::{Config, SequencerConfig, ValidatorConfig}; +use crate::{ + config::{Config, PrometheusConfig, SequencerConfig, ValidatorConfig}, + metrics::MetricsService, +}; use anyhow::{anyhow, Ok, Result}; -use futures::{future, stream::StreamExt}; +use futures::{future, stream::StreamExt, FutureExt}; use gprimitives::H256; use hypercore_common::{events::BlockEvent, BlockCommitment, CodeCommitment, StateTransition}; use hypercore_db::{BlockHeader, BlockMetaStorage, CodeUploadInfo, CodesStorage, Database}; @@ -42,6 +45,7 @@ pub struct Service { signer: hypercore_signer::Signer, sequencer: Option<hypercore_sequencer::Sequencer>, validator: Option<hypercore_validator::Validator>, + metrics_service: Option<MetricsService>, rpc: hypercore_rpc::RpcService, } @@ -108,6 +112,19 @@ impl Service { SequencerConfig::Disabled => None, }; + // Prometheus metrics. + let metrics_service = if let Some(PrometheusConfig { port, registry }) = + config.prometheus_config.clone() + { + // Set static metrics. 
+ let metrics = MetricsService::with_prometheus(&registry, config)?; + tokio::spawn(hypercore_prometheus_endpoint::init_prometheus(port, registry).map(drop)); + + Some(metrics) + } else { + None + }; + let validator = match config.validator { ValidatorConfig::Enabled(ref sign_tx_public) => { Some(hypercore_validator::Validator::new( @@ -132,6 +149,7 @@ impl Service { sequencer, signer, validator, + metrics_service, rpc, }) } @@ -315,11 +333,19 @@ impl Service { mut sequencer, signer: _signer, mut validator, + metrics_service, rpc, } = self; let network_service = network.service().clone(); + if let Some(metrics_service) = metrics_service { + tokio::spawn(metrics_service.run( + observer.get_status_receiver(), + sequencer.as_mut().map(|s| s.get_status_receiver()), + )); + } + let observer_events = observer.events(); futures::pin_mut!(observer_events); @@ -435,7 +461,8 @@ impl Service { #[cfg(test)] mod tests { use super::Service; - use crate::config::Config; + use crate::config::{Config, PrometheusConfig}; + use std::net::{Ipv4Addr, SocketAddr}; #[tokio::test] async fn basics() { @@ -448,6 +475,10 @@ mod tests { key_path: "/tmp/key".into(), network_path: "/tmp/net".into(), net_config: hypercore_network::NetworkConfiguration::new_local(), + prometheus_config: Some(PrometheusConfig::new_with_default_registry( + SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 9635), + "dev".to_string(), + )), sequencer: Default::default(), validator: Default::default(), sender_address: Default::default(), diff --git a/hypercore/observer/src/lib.rs b/hypercore/observer/src/lib.rs index aae7379aa9e..1fb09061464 100644 --- a/hypercore/observer/src/lib.rs +++ b/hypercore/observer/src/lib.rs @@ -25,5 +25,5 @@ mod query; pub use blobs::{BlobReader, ConsensusLayerBlobReader, MockBlobReader}; pub use event::{BlockData, CodeLoadedData, Event}; -pub use observer::Observer; +pub use observer::{Observer, ObserverStatus}; pub use query::Query; diff --git a/hypercore/observer/src/observer.rs 
b/hypercore/observer/src/observer.rs index 0f71108cd28..5d8310d312b 100644 --- a/hypercore/observer/src/observer.rs +++ b/hypercore/observer/src/observer.rs @@ -13,6 +13,7 @@ use hypercore_common::events::BlockEvent; use hypercore_ethereum::event::*; use hypercore_signer::Address as HypercoreAddress; use std::sync::Arc; +use tokio::sync::watch; pub(crate) type ObserverProvider = RootProvider<BoxTransport>; @@ -20,6 +21,15 @@ pub struct Observer { provider: ObserverProvider, router_address: Address, blob_reader: Arc<dyn BlobReader>, + status_sender: watch::Sender<ObserverStatus>, + status: ObserverStatus, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct ObserverStatus { + pub eth_block_number: u64, + pub pending_upload_code: u64, + pub last_router_state: u64, } impl Observer { @@ -28,13 +38,27 @@ impl Observer { router_address: HypercoreAddress, blob_reader: Arc<dyn BlobReader>, ) -> Result<Self> { + let (status_sender, _status_receiver) = watch::channel(ObserverStatus::default()); Ok(Self { provider: ProviderBuilder::new().on_builtin(ethereum_rpc).await?, router_address: Address::new(router_address.0), blob_reader, + status: Default::default(), + status_sender, }) } + pub fn get_status_receiver(&self) -> watch::Receiver<ObserverStatus> { + self.status_sender.subscribe() + } + + fn update_status<F>(&mut self, update_fn: F) + where + F: FnOnce(&mut ObserverStatus), + { + update_fn(&mut self.status); + let _ = self.status_sender.send_replace(self.status); + } pub fn provider(&self) -> &ObserverProvider { &self.provider } @@ -72,12 +96,16 @@ impl Observer { } }; + let mut codes_len = 0; + // Create futures to load codes for event in events.iter() { let BlockEvent::UploadCode(pending_upload_code) = event else { continue }; + codes_len += 1; + let blob_reader = self.blob_reader.clone(); let origin = pending_upload_code.origin; let tx_hash = pending_upload_code.blob_tx(); @@ -95,6 +123,12 @@ impl Observer { }); } + self.update_status(|status| { + 
status.eth_block_number = block_number; + if codes_len > 0 { status.last_router_state = block_number }; + status.pending_upload_code = codes_len as u64; + }); + let block_data = BlockData { block_hash: H256(block_hash.0), parent_hash: H256(parent_hash.0), diff --git a/hypercore/sequencer/Cargo.toml b/hypercore/sequencer/Cargo.toml index 4ba67bdd0f4..83b2454f109 100644 --- a/hypercore/sequencer/Cargo.toml +++ b/hypercore/sequencer/Cargo.toml @@ -22,3 +22,4 @@ parity-scale-codec = { workspace = true, features = ["std", "derive"] } log.workspace = true anyhow.workspace = true futures.workspace = true +tokio.workspace = true diff --git a/hypercore/sequencer/src/lib.rs b/hypercore/sequencer/src/lib.rs index 7546f557f67..48d3bb32f69 100644 --- a/hypercore/sequencer/src/lib.rs +++ b/hypercore/sequencer/src/lib.rs @@ -27,6 +27,7 @@ use hypercore_ethereum::Ethereum; use hypercore_observer::Event; use hypercore_signer::{Address, PublicKey, Signer}; use std::mem; +use tokio::sync::watch; pub use agro::AggregatedCommitments; @@ -36,6 +37,13 @@ pub struct Config { pub router_address: Address, } +#[derive(Debug, Clone, Copy, Default)] +pub struct SequencerStatus { + pub aggregated_commitments: u64, + pub submitted_code_commitments: u64, + pub submitted_block_commitments: u64, +} + #[allow(unused)] pub struct Sequencer { signer: Signer, @@ -44,10 +52,13 @@ pub struct Sequencer { codes_aggregation: Aggregator<CodeCommitment>, blocks_aggregation: Aggregator<BlockCommitment>, ethereum: Ethereum, + status: SequencerStatus, + status_sender: watch::Sender<SequencerStatus>, } impl Sequencer { pub async fn new(config: &Config, signer: Signer) -> Result<Self> { + let (status_sender, _status_receiver) = watch::channel(SequencerStatus::default()); Ok(Self { signer: signer.clone(), ethereum_rpc: config.ethereum_rpc.clone(), @@ -61,11 +72,16 @@ impl Sequencer { config.sign_tx_public.to_address(), ) .await?, + status: SequencerStatus::default(), + status_sender, }) } // This function should 
never block. pub fn process_observer_event(&mut self, event: &Event) -> Result<()> { + self.update_status(|status| { + *status = SequencerStatus::default(); + }); match event { Event::Block(data) => { log::debug!( @@ -97,6 +113,8 @@ impl Sequencer { let mut codes_future = None; let mut blocks_future = None; + let mut code_commitments_len = 0; + let mut block_commitments_len = 0; let codes_aggregation = mem::replace(&mut self.codes_aggregation, Aggregator::new(1)); let blocks_aggregation = mem::replace(&mut self.blocks_aggregation, Aggregator::new(1)); @@ -110,6 +128,7 @@ impl Sequencer { if let Some(code_commitments) = codes_aggregation.find_root() { log::debug!("Achieved consensus on code commitments. Submitting..."); + code_commitments_len = code_commitments.commitments.len() as u64; codes_future = Some(self.submit_codes_commitment(code_commitments)); } else { log::debug!("No consensus on code commitments found. Discarding..."); @@ -124,7 +143,7 @@ impl Sequencer { if let Some(block_commitments) = blocks_aggregation.find_root() { log::debug!("Achieved consensus on transition commitments. Submitting..."); - + block_commitments_len = block_commitments.commitments.len() as u64; blocks_future = Some(self.submit_block_commitments(block_commitments)); } else { log::debug!("No consensus on code commitments found. 
Discarding..."); @@ -142,6 +161,11 @@ impl Sequencer { (None, None) => {} } + self.update_status(|status| { + status.submitted_code_commitments += code_commitments_len; + status.submitted_block_commitments += block_commitments_len; + }); + Ok(()) } @@ -201,6 +225,9 @@ impl Sequencer { origin: Address, commitments: AggregatedCommitments<CodeCommitment>, ) -> Result<()> { + self.update_status(|status| { + status.aggregated_commitments += 1; + }); log::debug!("Received codes commitment from {}", origin); self.codes_aggregation.push(origin, commitments); Ok(()) @@ -219,4 +246,17 @@ impl Sequencer { pub fn address(&self) -> Address { self.key.to_address() } + + pub fn get_status_receiver(&self) -> watch::Receiver<SequencerStatus> { + self.status_sender.subscribe() + } + + fn update_status<F>(&mut self, update_fn: F) + where + F: FnOnce(&mut SequencerStatus), + { + // Mutate the stored status in place so successive `+=` updates accumulate, then broadcast a copy. + update_fn(&mut self.status); + let _ = self.status_sender.send_replace(self.status); + } } diff --git a/hypercore/utils/prometheus/Cargo.toml b/hypercore/utils/prometheus/Cargo.toml new file mode 100644 index 00000000000..7452f5bca0d --- /dev/null +++ b/hypercore/utils/prometheus/Cargo.toml @@ -0,0 +1,27 @@ +[package] +description = "Endpoint to expose Prometheus metrics" +name = "hypercore-prometheus-endpoint" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +hyper = { version = "0.14.16", default-features = false, features = [ + "http1", + "server", + "tcp", +] } +log = { workspace = true, default-features = true } +prometheus = { version = "0.13.0", default-features = false } +thiserror = { workspace = true } +tokio = { version = "1.22.0", features = ["parking_lot"] } + +[dev-dependencies] +hyper = { version = "0.14.16", features = ["client"] } +tokio = { version = "1.22.0", features = 
["rt-multi-thread"] } diff --git a/hypercore/utils/prometheus/src/lib.rs b/hypercore/utils/prometheus/src/lib.rs new file mode 100644 index 00000000000..910b88cdd10 --- /dev/null +++ b/hypercore/utils/prometheus/src/lib.rs @@ -0,0 +1,175 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +use hyper::{ + http::StatusCode, + server::Server, + service::{make_service_fn, service_fn}, + Body, Request, Response, +}; +pub use prometheus::{ + self, + core::{ + AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64, GenericCounter as Counter, + GenericCounterVec as CounterVec, GenericGauge as Gauge, GenericGaugeVec as GaugeVec, + }, + exponential_buckets, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, Opts, + Registry, +}; +use prometheus::{core::Collector, Encoder, TextEncoder}; +use std::net::SocketAddr; + +mod sourced; + +pub use sourced::{MetricSource, SourcedCounter, SourcedGauge, SourcedMetric}; + +pub fn register<T: Clone + Collector + 'static>( + metric: T, + registry: &Registry, +) -> Result<T, PrometheusError> { + registry.register(Box::new(metric.clone()))?; + Ok(metric) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Hyper internal error. 
+ #[error(transparent)] + Hyper(#[from] hyper::Error), + + /// Http request error. + #[error(transparent)] + Http(#[from] hyper::http::Error), + + /// i/o error. + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error("Prometheus port {0} already in use.")] + PortInUse(SocketAddr), +} + +async fn request_metrics(req: Request<Body>, registry: Registry) -> Result<Response<Body>, Error> { + if req.uri().path() == "/metrics" { + let metric_families = registry.gather(); + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", encoder.format_type()) + .body(Body::from(buffer)) + .map_err(Error::Http) + } else { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found.")) + .map_err(Error::Http) + } +} + +/// Initializes the metrics context, and starts an HTTP server +/// to serve metrics. +pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error> { + let listener = tokio::net::TcpListener::bind(&prometheus_addr) + .await + .map_err(|_| Error::PortInUse(prometheus_addr))?; + + init_prometheus_with_listener(listener, registry).await +} + +/// Init prometheus using the given listener. 
+async fn init_prometheus_with_listener( + listener: tokio::net::TcpListener, + registry: Registry, +) -> Result<(), Error> { + let listener = hyper::server::conn::AddrIncoming::from_listener(listener)?; + log::info!( + "〽️ Prometheus exporter started at {}", + listener.local_addr() + ); + + let service = make_service_fn(move |_| { + let registry = registry.clone(); + + async move { + Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| { + request_metrics(req, registry.clone()) + })) + } + }); + + let (signal, on_exit) = tokio::sync::oneshot::channel::<()>(); + let server = Server::builder(listener) + .serve(service) + .with_graceful_shutdown(async { + let _ = on_exit.await; + }); + + let result = server.await.map_err(Into::into); + + // Gracefully shutdown server, otherwise the server does not stop if it has open connections + let _ = signal.send(()); + + result +} + +#[cfg(test)] +mod tests { + use super::*; + use hyper::{Client, Uri}; + + #[test] + fn prometheus_works() { + const METRIC_NAME: &str = "test_test_metric_name_test_test"; + + let runtime = tokio::runtime::Runtime::new().expect("Creates the runtime"); + + let listener = runtime + .block_on(tokio::net::TcpListener::bind("127.0.0.1:0")) + .expect("Creates listener"); + + let local_addr = listener.local_addr().expect("Returns the local addr"); + + let registry = Registry::default(); + register( + prometheus::Counter::new(METRIC_NAME, "yeah").expect("Creates test counter"), + &registry, + ) + .expect("Registers the test metric"); + + runtime.spawn(init_prometheus_with_listener(listener, registry)); + + runtime.block_on(async { + let client = Client::new(); + + let res = client + .get(Uri::try_from(&format!("http://{}/metrics", local_addr)).expect("Parses URI")) + .await + .expect("Requests metrics"); + + let buf = hyper::body::to_bytes(res) + .await + .expect("Converts body to bytes"); + + let body = String::from_utf8(buf.to_vec()).expect("Converts body to String"); + 
assert!(body.contains(&format!("{} 0", METRIC_NAME))); + }); + } +} diff --git a/hypercore/utils/prometheus/src/sourced.rs b/hypercore/utils/prometheus/src/sourced.rs new file mode 100644 index 00000000000..fc5f6020834 --- /dev/null +++ b/hypercore/utils/prometheus/src/sourced.rs @@ -0,0 +1,168 @@ +// This file is part of Gear. +// +// Copyright (C) 2024 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Metrics that are collected from existing sources. + +use prometheus::{ + core::{Collector, Desc, Describer, Number, Opts}, + proto, +}; +use std::{cmp::Ordering, marker::PhantomData}; + +/// A counter whose values are obtained from an existing source. +/// +/// > **Note**: The counter values provided by the source `S` +/// > must be monotonically increasing. Otherwise use a +/// > [`SourcedGauge`] instead. +pub type SourcedCounter<S> = SourcedMetric<Counter, S>; + +/// A gauge whose values are obtained from an existing source. +pub type SourcedGauge<S> = SourcedMetric<Gauge, S>; + +/// The type of a sourced counter. +#[derive(Copy, Clone)] +pub enum Counter {} + +/// The type of a sourced gauge. +#[derive(Copy, Clone)] +pub enum Gauge {} + +/// A metric whose values are obtained from an existing source, +/// instead of being independently recorded. 
+#[derive(Debug, Clone)] +pub struct SourcedMetric<T, S> { + source: S, + desc: Desc, + _type: PhantomData<T>, +} + +/// A source of values for a [`SourcedMetric`]. +pub trait MetricSource: Sync + Send + Clone { + /// The type of the collected values. + type N: Number; + /// Collects the current values of the metrics from the source. + fn collect(&self, set: impl FnMut(&[&str], Self::N)); +} + +impl<T: SourcedType, S: MetricSource> SourcedMetric<T, S> { + /// Creates a new metric that obtains its values from the given source. + pub fn new(opts: &Opts, source: S) -> prometheus::Result<Self> { + let desc = opts.describe()?; + Ok(Self { + source, + desc, + _type: PhantomData, + }) + } +} + +impl<T: SourcedType, S: MetricSource> Collector for SourcedMetric<T, S> { + fn desc(&self) -> Vec<&Desc> { + vec![&self.desc] + } + + fn collect(&self) -> Vec<proto::MetricFamily> { + let mut counters = Vec::new(); + + self.source.collect(|label_values, value| { + let mut m = proto::Metric::default(); + + match T::proto() { + proto::MetricType::COUNTER => { + let mut c = proto::Counter::default(); + c.set_value(value.into_f64()); + m.set_counter(c); + } + proto::MetricType::GAUGE => { + let mut g = proto::Gauge::default(); + g.set_value(value.into_f64()); + m.set_gauge(g); + } + t => { + log::error!("Unsupported sourced metric type: {:?}", t); + } + } + + debug_assert_eq!(self.desc.variable_labels.len(), label_values.len()); + match self.desc.variable_labels.len().cmp(&label_values.len()) { + Ordering::Greater => { + log::warn!( + "Missing label values for sourced metric {}", + self.desc.fq_name + ) + } + Ordering::Less => { + log::warn!( + "Too many label values for sourced metric {}", + self.desc.fq_name + ) + } + Ordering::Equal => {} + } + + m.set_label( + self.desc + .variable_labels + .iter() + .zip(label_values) + .map(|(l_name, l_value)| { + let mut l = proto::LabelPair::default(); + l.set_name(l_name.to_string()); + l.set_value(l_value.to_string()); + l + }) + 
.chain(self.desc.const_label_pairs.iter().cloned()) + .collect::<Vec<_>>(), + ); + + counters.push(m); + }); + + let mut m = proto::MetricFamily::default(); + m.set_name(self.desc.fq_name.clone()); + m.set_help(self.desc.help.clone()); + m.set_field_type(T::proto()); + m.set_metric(counters); + + vec![m] + } +} + +/// Types of metrics that can obtain their values from an existing source. +pub trait SourcedType: private::Sealed + Sync + Send { + #[doc(hidden)] + fn proto() -> proto::MetricType; +} + +impl SourcedType for Counter { + fn proto() -> proto::MetricType { + proto::MetricType::COUNTER + } +} + +impl SourcedType for Gauge { + fn proto() -> proto::MetricType { + proto::MetricType::GAUGE + } +} + +mod private { + pub trait Sealed {} + impl Sealed for super::Counter {} + impl Sealed for super::Gauge {} +}