From d0078df02529cae841a8d9469a4caae26ecd9697 Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Thu, 5 Dec 2024 09:49:30 +0200 Subject: [PATCH] =?UTF-8?q?refactor(api):=20Add=20more=20components=20to?= =?UTF-8?q?=20healthcheck=20=E2=80=93=20follow-ups=20(#3337)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Various minor follow-ups after https://github.com/matter-labs/zksync-era/pull/3193: - Rework app-level health details. - Fix `execution_time` unit of measurement for the database health check details. - Rework the database health check: do not hold a DB connection all the time; make it reactive. ## Why ❔ Makes the dependency graph lighter; simplifies maintenance. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`. --- Cargo.lock | 15 +-- Cargo.toml | 1 - core/bin/external_node/Cargo.toml | 1 - .../external_node/src/metrics/framework.rs | 3 - core/lib/bin_metadata/Cargo.toml | 18 --- core/lib/dal/src/system_dal.rs | 8 +- core/lib/health_check/Cargo.toml | 1 - core/lib/health_check/src/binary.rs | 21 ---- core/lib/health_check/src/lib.rs | 19 ++- core/lib/health_check/src/tests.rs | 1 + core/node/node_framework/Cargo.toml | 1 - .../layers/healtcheck_server.rs | 15 ++- .../src/implementations/layers/postgres.rs | 117 +++++++++--------- core/node/shared_metrics/Cargo.toml | 4 +- .../shared_metrics}/build.rs | 0 core/node/shared_metrics/src/lib.rs | 9 +- .../shared_metrics/src/metadata.rs} | 30 +++-- 17 files changed, 111 insertions(+), 153 deletions(-) delete mode 100644 core/lib/bin_metadata/Cargo.toml delete mode 100644 core/lib/health_check/src/binary.rs rename core/{lib/bin_metadata => node/shared_metrics}/build.rs (100%) rename core/{lib/bin_metadata/src/lib.rs => node/shared_metrics/src/metadata.rs} (72%) diff --git 
a/Cargo.lock b/Cargo.lock index f9e754902f61..bb4399d1ae78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11157,16 +11157,6 @@ dependencies = [ "zksync_pairing", ] -[[package]] -name = "zksync_bin_metadata" -version = "0.1.0" -dependencies = [ - "rustc_version 0.4.1", - "serde", - "tracing", - "vise", -] - [[package]] name = "zksync_block_reverter" version = "0.1.0" @@ -11848,7 +11838,6 @@ dependencies = [ "zksync_object_store", "zksync_protobuf_config", "zksync_reorg_detector", - "zksync_shared_metrics", "zksync_snapshots_applier", "zksync_state", "zksync_state_keeper", @@ -11938,7 +11927,6 @@ dependencies = [ "tokio", "tracing", "vise", - "zksync_bin_metadata", ] [[package]] @@ -12275,7 +12263,6 @@ dependencies = [ "tracing", "trybuild", "zksync_base_token_adjuster", - "zksync_bin_metadata", "zksync_block_reverter", "zksync_circuit_breaker", "zksync_commitment_generator", @@ -12622,10 +12609,10 @@ dependencies = [ name = "zksync_shared_metrics" version = "0.1.0" dependencies = [ + "rustc_version 0.4.1", "serde", "tracing", "vise", - "zksync_bin_metadata", "zksync_dal", "zksync_types", ] diff --git a/Cargo.toml b/Cargo.toml index 60099874cf39..80a1e4104265 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,7 +79,6 @@ members = [ # Test infrastructure "core/tests/loadnext", "core/tests/vm-benchmark", - "core/lib/bin_metadata", ] resolver = "2" diff --git a/core/bin/external_node/Cargo.toml b/core/bin/external_node/Cargo.toml index a69fdf263794..91bdcefa2ec0 100644 --- a/core/bin/external_node/Cargo.toml +++ b/core/bin/external_node/Cargo.toml @@ -29,7 +29,6 @@ zksync_health_check.workspace = true zksync_web3_decl.workspace = true zksync_types.workspace = true zksync_block_reverter.workspace = true -zksync_shared_metrics.workspace = true zksync_node_genesis.workspace = true zksync_node_fee_model.workspace = true zksync_node_db_pruner.workspace = true diff --git a/core/bin/external_node/src/metrics/framework.rs b/core/bin/external_node/src/metrics/framework.rs index 
228af8aa0417..81c9e57d9b9a 100644 --- a/core/bin/external_node/src/metrics/framework.rs +++ b/core/bin/external_node/src/metrics/framework.rs @@ -5,7 +5,6 @@ use zksync_node_framework::{ implementations::resources::pools::{MasterPool, PoolResource}, FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; -use zksync_shared_metrics::{GIT_METRICS, RUST_METRICS}; use zksync_types::{L1ChainId, L2ChainId, SLChainId}; use super::EN_METRICS; @@ -39,8 +38,6 @@ impl WiringLayer for ExternalNodeMetricsLayer { } async fn wire(self, input: Self::Input) -> Result { - RUST_METRICS.initialize(); - GIT_METRICS.initialize(); EN_METRICS.observe_config( self.l1_chain_id, self.sl_chain_id, diff --git a/core/lib/bin_metadata/Cargo.toml b/core/lib/bin_metadata/Cargo.toml deleted file mode 100644 index e529ecfb49a7..000000000000 --- a/core/lib/bin_metadata/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "zksync_bin_metadata" -version.workspace = true -edition.workspace = true -authors.workspace = true -homepage.workspace = true -repository.workspace = true -license.workspace = true -keywords.workspace = true -categories.workspace = true - -[dependencies] -serde.workspace = true -vise.workspace = true -tracing.workspace = true - -[build-dependencies] -rustc_version.workspace = true diff --git a/core/lib/dal/src/system_dal.rs b/core/lib/dal/src/system_dal.rs index 6f2e64b1c1c5..f4de4faf8eb3 100644 --- a/core/lib/dal/src/system_dal.rs +++ b/core/lib/dal/src/system_dal.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, time::Duration}; -use chrono::DateTime; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt}; @@ -14,11 +14,11 @@ pub(crate) struct TableSize { pub total_size: u64, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DatabaseMigration { pub version: i64, pub description: String, 
- pub installed_on: DateTime, + pub installed_on: DateTime, pub success: bool, pub checksum: String, pub execution_time: Duration, @@ -118,7 +118,7 @@ impl SystemDal<'_, '_> { installed_on: row.installed_on, success: row.success, checksum: hex::encode(row.checksum), - execution_time: Duration::from_millis(u64::try_from(row.execution_time).unwrap_or(0)), + execution_time: Duration::from_nanos(u64::try_from(row.execution_time).unwrap_or(0)), }) } } diff --git a/core/lib/health_check/Cargo.toml b/core/lib/health_check/Cargo.toml index 0e823c848ce5..6f1d863d8cec 100644 --- a/core/lib/health_check/Cargo.toml +++ b/core/lib/health_check/Cargo.toml @@ -20,7 +20,6 @@ serde_json.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["sync", "time"] } tracing.workspace = true -zksync_bin_metadata.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/core/lib/health_check/src/binary.rs b/core/lib/health_check/src/binary.rs deleted file mode 100644 index b14ed2ed9392..000000000000 --- a/core/lib/health_check/src/binary.rs +++ /dev/null @@ -1,21 +0,0 @@ -use async_trait::async_trait; -use zksync_bin_metadata::BinMetadata; - -use crate::{CheckHealth, Health, HealthStatus}; - -impl From<&BinMetadata> for Health { - fn from(details: &BinMetadata) -> Self { - Self::from(HealthStatus::Ready).with_details(details) - } -} - -#[async_trait] -impl CheckHealth for BinMetadata { - fn name(&self) -> &'static str { - "metadata" - } - - async fn check_health(&self) -> Health { - self.into() - } -} diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs index 7dcdb47aa2f9..76b1c4d8b0ff 100644 --- a/core/lib/health_check/src/lib.rs +++ b/core/lib/health_check/src/lib.rs @@ -11,12 +11,9 @@ pub use async_trait::async_trait; use futures::future; use serde::Serialize; use tokio::sync::watch; -use zksync_bin_metadata::BIN_METADATA; -use self::metrics::{CheckResult, METRICS}; -use crate::metrics::AppHealthCheckConfig; 
+use crate::metrics::{AppHealthCheckConfig, CheckResult, METRICS}; -mod binary; mod metrics; #[cfg(test)] @@ -114,6 +111,8 @@ pub struct AppHealthCheck { #[derive(Debug, Clone)] struct AppHealthCheckInner { + /// Application-level health details. + app_details: Option, components: Vec>, slow_time_limit: Duration, hard_time_limit: Duration, @@ -136,6 +135,7 @@ impl AppHealthCheck { let inner = AppHealthCheckInner { components: Vec::default(), + app_details: None, slow_time_limit, hard_time_limit, }; @@ -181,6 +181,13 @@ impl AppHealthCheck { } } + /// Sets app-level health details. They can include build info etc. + pub fn set_details(&self, details: impl Serialize) { + let details = serde_json::to_value(details).expect("failed serializing app details"); + let mut inner = self.inner.lock().expect("`AppHealthCheck` is poisoned"); + inner.app_details = Some(details); + } + /// Inserts health check for a component. /// /// # Errors @@ -220,6 +227,7 @@ impl AppHealthCheck { // Clone `inner` so that we don't hold a lock for them across a wait point. 
let AppHealthCheckInner { components, + app_details, slow_time_limit, hard_time_limit, } = self @@ -238,7 +246,8 @@ impl AppHealthCheck { .map(|health| health.status) .max_by_key(|status| status.priority_for_aggregation()) .unwrap_or(HealthStatus::Ready); - let inner = Health::with_details(aggregated_status.into(), BIN_METADATA); + let mut inner = Health::from(aggregated_status); + inner.details = app_details.clone(); let health = AppHealth { inner, components }; if !health.inner.status.is_healthy() { diff --git a/core/lib/health_check/src/tests.rs b/core/lib/health_check/src/tests.rs index 14c610e9fd83..76863db05415 100644 --- a/core/lib/health_check/src/tests.rs +++ b/core/lib/health_check/src/tests.rs @@ -82,6 +82,7 @@ async fn aggregating_health_checks() { let (first_check, first_updater) = ReactiveHealthCheck::new("first"); let (second_check, second_updater) = ReactiveHealthCheck::new("second"); let inner = AppHealthCheckInner { + app_details: None, components: vec![Arc::new(first_check), Arc::new(second_check)], slow_time_limit: AppHealthCheck::DEFAULT_SLOW_TIME_LIMIT, hard_time_limit: AppHealthCheck::DEFAULT_HARD_TIME_LIMIT, diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml index 6334495885f3..eec9b8ef4b7a 100644 --- a/core/node/node_framework/Cargo.toml +++ b/core/node/node_framework/Cargo.toml @@ -41,7 +41,6 @@ zksync_vm_executor.workspace = true zksync_state_keeper.workspace = true zksync_consistency_checker.workspace = true zksync_metadata_calculator.workspace = true -zksync_bin_metadata.workspace = true zksync_node_sync.workspace = true zksync_node_api_server.workspace = true zksync_node_consensus.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs index 83a74c63cb45..3a4e3ca11569 100644 --- a/core/node/node_framework/src/implementations/layers/healtcheck_server.rs +++ 
b/core/node/node_framework/src/implementations/layers/healtcheck_server.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use zksync_config::configs::api::HealthCheckConfig; use zksync_health_check::AppHealthCheck; use zksync_node_api_server::healthcheck::HealthCheckHandle; +use zksync_shared_metrics::metadata::{GitMetadata, RustMetadata, GIT_METRICS, RUST_METRICS}; +use zksync_web3_decl::jsonrpsee::core::Serialize; use crate::{ implementations::resources::healthcheck::AppHealthCheckResource, @@ -12,6 +14,13 @@ use crate::{ FromContext, IntoContext, }; +/// Full metadata of the compiled binary. +#[derive(Debug, Serialize)] +pub struct BinMetadata { + pub rust: &'static RustMetadata, + pub git: &'static GitMetadata, +} + /// Wiring layer for health check server /// /// Expects other layers to insert different components' health checks @@ -73,8 +82,12 @@ impl Task for HealthCheckTask { } async fn run(mut self: Box, mut stop_receiver: StopReceiver) -> anyhow::Result<()> { + self.app_health_check.set_details(BinMetadata { + rust: RUST_METRICS.initialize(), + git: GIT_METRICS.initialize(), + }); let handle = - HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check.clone()); + HealthCheckHandle::spawn_server(self.config.bind_addr(), self.app_health_check); stop_receiver.0.changed().await?; handle.stop().await; diff --git a/core/node/node_framework/src/implementations/layers/postgres.rs b/core/node/node_framework/src/implementations/layers/postgres.rs index 8a81b8709895..bf602f1de631 100644 --- a/core/node/node_framework/src/implementations/layers/postgres.rs +++ b/core/node/node_framework/src/implementations/layers/postgres.rs @@ -1,11 +1,15 @@ -use std::time::Duration; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; -use serde::{Deserialize, Serialize}; -use tokio::sync::watch; +use async_trait::async_trait; +use serde::Serialize; +use tokio::sync::RwLock; use zksync_dal::{ metrics::PostgresMetrics, system_dal::DatabaseMigration, ConnectionPool, 
Core, CoreDal, }; -use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; +use zksync_health_check::{CheckHealth, Health, HealthStatus}; use crate::{ implementations::resources::{ @@ -38,8 +42,6 @@ pub struct Input { pub struct Output { #[context(task)] pub metrics_task: PostgresMetricsScrapingTask, - #[context(task)] - pub health_task: DatabaseHealthTask, } #[async_trait::async_trait] @@ -58,16 +60,15 @@ impl WiringLayer for PostgresLayer { }; let app_health = input.app_health.0; - let health_task = DatabaseHealthTask::new(pool); - app_health - .insert_component(health_task.health_check()) + .insert_custom_component(Arc::new(DatabaseHealthCheck { + polling_interval: TASK_EXECUTION_INTERVAL, + pool, + cached: RwLock::default(), + })) .map_err(WiringError::internal)?; - Ok(Output { - metrics_task, - health_task, - }) + Ok(Output { metrics_task }) } } @@ -99,7 +100,7 @@ impl Task for PostgresMetricsScrapingTask { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] pub struct DatabaseInfo { last_migration: DatabaseMigration, } @@ -111,62 +112,60 @@ impl From for Health { } #[derive(Debug)] -pub struct DatabaseHealthTask { +struct DatabaseHealthCheck { polling_interval: Duration, - connection_pool: ConnectionPool, - updater: HealthUpdater, + pool: ConnectionPool, + cached: RwLock>, } -impl DatabaseHealthTask { - fn new(connection_pool: ConnectionPool) -> Self { - Self { - polling_interval: TASK_EXECUTION_INTERVAL, - connection_pool, - updater: ReactiveHealthCheck::new("database").1, - } +impl DatabaseHealthCheck { + async fn update(&self) -> anyhow::Result { + let mut conn = self.pool.connection_tagged("postgres_healthcheck").await?; + let last_migration = conn.system_dal().get_last_migration().await?; + Ok(DatabaseInfo { last_migration }) } - async fn run(self, mut stop_receiver: watch::Receiver) -> anyhow::Result<()> - where - Self: Sized, - { - let timeout = self.polling_interval; - let mut conn = self - 
.connection_pool - .connection_tagged("postgres_healthcheck") - .await?; - - tracing::info!("Starting database healthcheck with frequency: {timeout:?}",); - - while !*stop_receiver.borrow_and_update() { - let last_migration = conn.system_dal().get_last_migration().await?; - self.updater.update(DatabaseInfo { last_migration }.into()); - - // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this. - tokio::time::timeout(timeout, stop_receiver.changed()) - .await - .ok(); + fn validate_cache(&self, cache: Option<&(DatabaseInfo, Instant)>) -> Option { + let now = Instant::now(); + if let Some((cached, cached_at)) = cache { + let elapsed = now + .checked_duration_since(*cached_at) + .unwrap_or(Duration::ZERO); + (elapsed <= self.polling_interval).then(|| cached.clone()) + } else { + None } - tracing::info!("Stop signal received; database healthcheck is shut down"); - Ok(()) - } - - pub fn health_check(&self) -> ReactiveHealthCheck { - self.updater.subscribe() } } -#[async_trait::async_trait] -impl Task for DatabaseHealthTask { - fn kind(&self) -> TaskKind { - TaskKind::UnconstrainedTask +#[async_trait] +impl CheckHealth for DatabaseHealthCheck { + fn name(&self) -> &'static str { + "database" } - fn id(&self) -> TaskId { - "database_health".into() - } + // If the DB malfunctions, this method would time out, which would lead to the health check marked as failed. + async fn check_health(&self) -> Health { + let cached = self.cached.read().await.clone(); + if let Some(cache) = self.validate_cache(cached.as_ref()) { + return cache.into(); + } - async fn run(self: Box, stop_receiver: StopReceiver) -> anyhow::Result<()> { - (*self).run(stop_receiver.0).await + let mut cached_lock = self.cached.write().await; + // The cached value may have been updated by another task. 
+ if let Some(cache) = self.validate_cache(cached_lock.as_ref()) { + return cache.into(); + } + + match self.update().await { + Ok(info) => { + *cached_lock = Some((info.clone(), Instant::now())); + info.into() + } + Err(err) => { + tracing::warn!("Error updating database health: {err:#}"); + cached.map_or_else(|| HealthStatus::Affected.into(), |(info, _)| info.into()) + } + } } } diff --git a/core/node/shared_metrics/Cargo.toml b/core/node/shared_metrics/Cargo.toml index 23c669b4f963..618888ffddc0 100644 --- a/core/node/shared_metrics/Cargo.toml +++ b/core/node/shared_metrics/Cargo.toml @@ -16,4 +16,6 @@ vise.workspace = true tracing.workspace = true zksync_types.workspace = true zksync_dal.workspace = true -zksync_bin_metadata.workspace = true + +[build-dependencies] +rustc_version.workspace = true diff --git a/core/lib/bin_metadata/build.rs b/core/node/shared_metrics/build.rs similarity index 100% rename from core/lib/bin_metadata/build.rs rename to core/node/shared_metrics/build.rs diff --git a/core/node/shared_metrics/src/lib.rs b/core/node/shared_metrics/src/lib.rs index e37764c5a6d7..001293a72bc2 100644 --- a/core/node/shared_metrics/src/lib.rs +++ b/core/node/shared_metrics/src/lib.rs @@ -5,10 +5,11 @@ use std::{fmt, time::Duration}; use vise::{ Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit, }; -use zksync_bin_metadata::{GitMetrics, RustMetrics}; use zksync_dal::transactions_dal::L2TxSubmissionResult; use zksync_types::aggregated_operations::AggregatedActionType; +pub mod metadata; + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "stage", rename_all = "snake_case")] pub enum SnapshotRecoveryStage { @@ -195,9 +196,3 @@ pub struct ExternalNodeMetrics { #[vise::register] pub static EN_METRICS: vise::Global = vise::Global::new(); - -#[vise::register] -pub static RUST_METRICS: vise::Global = vise::Global::new(); - -#[vise::register] -pub static GIT_METRICS: 
vise::Global<GitMetrics> = vise::Global::new(); diff --git a/core/lib/bin_metadata/src/lib.rs b/core/node/shared_metrics/src/metadata.rs similarity index 72% rename from core/lib/bin_metadata/src/lib.rs rename to core/node/shared_metrics/src/metadata.rs index d8a5221e4775..bc7e52ae1e97 100644 --- a/core/lib/bin_metadata/src/lib.rs +++ b/core/node/shared_metrics/src/metadata.rs @@ -3,24 +3,12 @@ use vise::{EncodeLabelSet, Info, Metrics}; use self::values::{GIT_METADATA, RUST_METADATA}; -pub mod values { +mod values { use super::{GitMetadata, RustMetadata}; include!(concat!(env!("OUT_DIR"), "/metadata_values.rs")); } -pub const BIN_METADATA: BinMetadata = BinMetadata { - rust: RUST_METADATA, - git: GIT_METADATA, -}; - -/// Metadata of the compiled binary. -#[derive(Debug, Serialize)] -pub struct BinMetadata { - pub rust: RustMetadata, - pub git: GitMetadata, -} - /// Rust metadata of the compiled binary. #[derive(Debug, EncodeLabelSet, Serialize)] pub struct RustMetadata { @@ -47,22 +35,32 @@ pub struct RustMetrics { } impl RustMetrics { - pub fn initialize(&self) { + pub fn initialize(&self) -> &RustMetadata { tracing::info!("Rust metadata for this binary: {RUST_METADATA:?}"); self.info.set(RUST_METADATA).ok(); + // `unwrap` is safe due to setting the value above + self.info.get().unwrap() } } #[derive(Debug, Metrics)] -#[metrics(prefix = "git_info")] +#[metrics(prefix = "git")] pub struct GitMetrics { /// General information about the compiled binary. info: Info<GitMetadata>, } impl GitMetrics { - pub fn initialize(&self) { + pub fn initialize(&self) -> &GitMetadata { tracing::info!("Git metadata for this binary: {GIT_METADATA:?}"); self.info.set(GIT_METADATA).ok(); + // `unwrap` is safe due to setting the value above + self.info.get().unwrap() } } + +#[vise::register] +pub static RUST_METRICS: vise::Global<RustMetrics> = vise::Global::new(); + +#[vise::register] +pub static GIT_METRICS: vise::Global<GitMetrics> = vise::Global::new();