From 193c855fc593f53b63d3eab9784ee2dae255205c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mat=C3=ADas=20Ignacio=20Gonz=C3=A1lez?=
Date: Mon, 25 Nov 2024 11:32:53 -0300
Subject: [PATCH 1/2] chore: Upgrade rustls to fix security vulnerability
 (#3331)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

Upgrade rustls

## Why ❔

```
error[vulnerability]: rustls network-reachable panic in `Acceptor::accept`
    ┌─ /github/workspace/Cargo.lock:601:1
    │
601 │ rustls 0.23.16 registry+https://github.com/rust-lang/crates.io-index
    │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ security vulnerability detected
    │
    ├ ID: RUSTSEC-2024-0399
    ├ Advisory: https://rustsec.org/advisories/RUSTSEC-2024-0399
    ├ A bug introduced in rustls 0.23.13 leads to a panic if the received
      TLS ClientHello is fragmented. Only servers that use
      `rustls::server::Acceptor::accept()` are affected. Servers that use
      `tokio-rustls`'s `LazyConfigAcceptor` API are affected. Servers that
      use `tokio-rustls`'s `TlsAcceptor` API are not affected. Servers
      that use `rustls-ffi`'s `rustls_acceptor_accept` API are affected.
    ├ Announcement: https://github.com/rustls/rustls/issues/2227
    ├ Solution: Upgrade to >=0.23.18 (try `cargo update -p rustls`)
```
---
 Cargo.lock | 32 ++++++++++++++++----------------
 deny.toml  |  1 -
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 3df58a9634e2..25bd07547b4b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1040,7 +1040,7 @@ dependencies = [
  "bitflags 2.6.0",
  "cexpr",
  "clang-sys",
- "itertools 0.12.1",
+ "itertools 0.10.5",
  "lazy_static",
  "lazycell",
  "log",
@@ -4156,7 +4156,7 @@ dependencies = [
  "hyper 1.5.0",
  "hyper-util",
  "log",
- "rustls 0.23.16",
+ "rustls 0.23.18",
  "rustls-native-certs 0.8.0",
  "rustls-pki-types",
  "tokio",
@@ -4568,7 +4568,7 @@ dependencies = [
  "http 1.1.0",
  "jsonrpsee-core 0.23.2",
  "pin-project",
- "rustls 0.23.16",
+ "rustls 0.23.18",
  "rustls-pki-types",
  "rustls-platform-verifier",
  "soketto 0.8.0",
@@ -4667,7 +4667,7 @@ dependencies = [
  "hyper-util",
  "jsonrpsee-core 0.23.2",
  "jsonrpsee-types 0.23.2",
- "rustls 0.23.16",
+ "rustls 0.23.18",
  "rustls-platform-verifier",
  "serde",
  "serde_json",
@@ -4920,7 +4920,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.52.6",
+ "windows-targets 0.48.5",
 ]

 [[package]]
@@ -6588,7 +6588,7 @@ checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
 dependencies = [
  "bytes",
  "heck 0.5.0",
- "itertools 0.12.1",
+ "itertools 0.10.5",
  "log",
  "multimap",
  "once_cell",
@@ -6608,7 +6608,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
 dependencies = [
  "anyhow",
- "itertools 0.12.1",
+ "itertools 0.10.5",
  "proc-macro2 1.0.89",
  "quote 1.0.37",
  "syn 2.0.85",
@@ -6798,7 +6798,7 @@ dependencies = [
  "quinn-proto",
  "quinn-udp",
  "rustc-hash 2.0.0",
- "rustls 0.23.16",
+ "rustls 0.23.18",
  "socket2",
  "thiserror",
  "tokio",
@@ -6815,7 +6815,7 @@ dependencies = [
  "rand 0.8.5",
  "ring",
  "rustc-hash 2.0.0",
- "rustls 0.23.16",
"rustls 0.23.16", + "rustls 0.23.18", "slab", "thiserror", "tinyvec", @@ -7118,7 +7118,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.16", + "rustls 0.23.18", "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", "rustls-pki-types", @@ -7434,9 +7434,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.16" +version = "0.23.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" +checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" dependencies = [ "aws-lc-rs", "log", @@ -7521,7 +7521,7 @@ dependencies = [ "jni", "log", "once_cell", - "rustls 0.23.16", + "rustls 0.23.18", "rustls-native-certs 0.7.3", "rustls-platform-verifier-android", "rustls-webpki 0.102.8", @@ -9620,7 +9620,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.16", + "rustls 0.23.18", "rustls-pki-types", "tokio", ] @@ -11494,7 +11494,7 @@ dependencies = [ "octocrab", "regex", "reqwest 0.12.9", - "rustls 0.23.16", + "rustls 0.23.18", "semver 1.0.23", "serde", "serde_json", @@ -12965,7 +12965,7 @@ dependencies = [ "pin-project-lite", "rand 0.8.5", "rlp", - "rustls 0.23.16", + "rustls 0.23.18", "serde", "serde_json", "test-casing", diff --git a/deny.toml b/deny.toml index 13ce6504107f..d72f3823761f 100644 --- a/deny.toml +++ b/deny.toml @@ -12,7 +12,6 @@ ignore = [ "RUSTSEC-2020-0168", # mach dependency being unmaintained, dependency in consensus, we should consider moving to mach2 fork "RUSTSEC-2024-0370", # `cs_derive` needs to be updated to not rely on `proc-macro-error` # all below caused by StructOpt which we still use and we should move to clap v4 instead - "RUSTSEC-2024-0375", "RUSTSEC-2021-0145", "RUSTSEC-2021-0139", "RUSTSEC-2024-0388", # `derivative` is unmaintained, crypto dependenicies (boojum, circuit_encodings and others) rely on it From 440fe8d8afdf0fc2768692a1b40b0910873e2faf Mon Sep 17 00:00:00 2001 From: Manuel Mauro Date: Tue, 26 Nov 2024 10:31:01 +0100 Subject: [PATCH 2/2] feat(zksync_cli): Health checkpoint improvements (#3193) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Add three new components to node's healthcheck: - General (i.e., version, last migration) - State Keeper - Eth Sender ## Why ❔ ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`. 
--------- Co-authored-by: Danil --- Cargo.lock | 21 ++- Cargo.toml | 2 + .../external_node/src/metrics/framework.rs | 3 +- core/bin/external_node/src/node_builder.rs | 8 +- core/bin/zksync_server/src/node_builder.rs | 10 +- core/lib/bin_metadata/Cargo.toml | 18 ++ core/lib/bin_metadata/build.rs | 75 ++++++++ core/lib/bin_metadata/src/lib.rs | 68 +++++++ ...3613cd34138b59ef3b9271fd0bfdaddd086f8.json | 50 +++++ core/lib/dal/src/eth_sender_dal.rs | 3 +- core/lib/dal/src/system_dal.rs | 35 ++++ core/lib/health_check/Cargo.toml | 1 + core/lib/health_check/src/binary.rs | 21 +++ core/lib/health_check/src/lib.rs | 5 +- core/lib/types/src/aggregated_operations.rs | 4 +- core/node/consensus/src/testonly.rs | 6 +- core/node/eth_sender/Cargo.toml | 2 + core/node/eth_sender/src/eth_tx_aggregator.rs | 19 ++ core/node/eth_sender/src/eth_tx_manager.rs | 20 ++ core/node/eth_sender/src/health.rs | 68 +++++++ core/node/eth_sender/src/lib.rs | 1 + .../house_keeper/src/blocks_state_reporter.rs | 5 +- core/node/node_framework/Cargo.toml | 3 + .../layers/eth_sender/aggregator.rs | 9 + .../layers/eth_sender/manager.rs | 9 + .../implementations/layers/house_keeper.rs | 2 +- .../src/implementations/layers/mod.rs | 2 +- .../src/implementations/layers/postgres.rs | 172 ++++++++++++++++++ .../layers/postgres_metrics.rs | 75 -------- .../layers/state_keeper/mod.rs | 51 +++--- core/node/node_sync/src/tests.rs | 3 +- core/node/shared_metrics/Cargo.toml | 5 +- core/node/shared_metrics/build.rs | 46 ----- core/node/shared_metrics/src/lib.rs | 9 +- core/node/shared_metrics/src/rustc.rs | 36 ---- core/node/state_keeper/Cargo.toml | 2 + core/node/state_keeper/src/health.rs | 30 +++ core/node/state_keeper/src/keeper.rs | 86 ++++++--- core/node/state_keeper/src/lib.rs | 1 + .../src/testonly/test_batch_executor.rs | 3 +- core/node/state_keeper/src/tests/mod.rs | 3 +- core/node/state_keeper/src/utils.rs | 5 + 42 files changed, 756 insertions(+), 241 deletions(-) create mode 100644 core/lib/bin_metadata/Cargo.toml create mode 100644 core/lib/bin_metadata/build.rs create mode 100644 core/lib/bin_metadata/src/lib.rs create mode 100644 core/lib/dal/.sqlx/query-3566423188a5d6bed7150f327d83613cd34138b59ef3b9271fd0bfdaddd086f8.json create mode 100644 core/lib/health_check/src/binary.rs create mode 100644 core/node/eth_sender/src/health.rs create mode 100644 core/node/node_framework/src/implementations/layers/postgres.rs delete mode 100644 core/node/node_framework/src/implementations/layers/postgres_metrics.rs delete mode 100644 core/node/shared_metrics/build.rs delete mode 100644 core/node/shared_metrics/src/rustc.rs create mode 100644 core/node/state_keeper/src/health.rs diff --git a/Cargo.lock b/Cargo.lock index 25bd07547b4b..e8f33f35c13d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11157,6 +11157,16 @@ dependencies = [ "zksync_pairing", ] +[[package]] +name = "zksync_bin_metadata" +version = "0.1.0" +dependencies = [ + "rustc_version 0.4.1", + "serde", + "tracing", + "vise", +] + [[package]] name = "zksync_block_reverter" version = "0.1.0" @@ -11735,6 +11745,7 @@ dependencies = [ "async-trait", "chrono", "once_cell", + "serde", "test-casing", "test-log", "thiserror", @@ -11745,6 +11756,7 @@ dependencies = [ "zksync_contracts", "zksync_dal", "zksync_eth_client", + "zksync_health_check", "zksync_l1_contract_interface", "zksync_node_fee_model", "zksync_node_test_utils", @@ -11921,6 +11933,7 @@ dependencies = [ "tokio", "tracing", "vise", + "zksync_bin_metadata", ] [[package]] @@ -12250,11 +12263,13 @@ dependencies = [ "futures 0.3.31", 
"pin-project-lite", "semver 1.0.23", + "serde", "thiserror", "tokio", "tracing", "trybuild", "zksync_base_token_adjuster", + "zksync_bin_metadata", "zksync_block_reverter", "zksync_circuit_breaker", "zksync_commitment_generator", @@ -12288,6 +12303,7 @@ dependencies = [ "zksync_proof_data_handler", "zksync_queued_job_processor", "zksync_reorg_detector", + "zksync_shared_metrics", "zksync_state", "zksync_state_keeper", "zksync_storage", @@ -12600,9 +12616,10 @@ dependencies = [ name = "zksync_shared_metrics" version = "0.1.0" dependencies = [ - "rustc_version 0.4.1", + "serde", "tracing", "vise", + "zksync_bin_metadata", "zksync_dal", "zksync_types", ] @@ -12683,6 +12700,7 @@ dependencies = [ "itertools 0.10.5", "once_cell", "rand 0.8.5", + "serde", "tempfile", "test-casing", "thiserror", @@ -12694,6 +12712,7 @@ dependencies = [ "zksync_contracts", "zksync_dal", "zksync_eth_client", + "zksync_health_check", "zksync_mempool", "zksync_multivm", "zksync_node_fee_model", diff --git a/Cargo.toml b/Cargo.toml index 44a00196fb76..88173e07af4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ members = [ # Test infrastructure "core/tests/loadnext", "core/tests/vm-benchmark", + "core/lib/bin_metadata", ] resolver = "2" @@ -277,6 +278,7 @@ zksync_health_check = { version = "0.1.0", path = "core/lib/health_check" } zksync_l1_contract_interface = { version = "0.1.0", path = "core/lib/l1_contract_interface" } zksync_mempool = { version = "0.1.0", path = "core/lib/mempool" } zksync_merkle_tree = { version = "0.1.0", path = "core/lib/merkle_tree" } +zksync_bin_metadata = { version = "0.1.0", path = "core/lib/bin_metadata" } zksync_mini_merkle_tree = { version = "0.1.0", path = "core/lib/mini_merkle_tree" } zksync_object_store = { version = "0.1.0", path = "core/lib/object_store" } zksync_protobuf_config = { version = "0.1.0", path = "core/lib/protobuf_config" } diff --git a/core/bin/external_node/src/metrics/framework.rs b/core/bin/external_node/src/metrics/framework.rs index fc9d4fe51345..228af8aa0417 100644 --- a/core/bin/external_node/src/metrics/framework.rs +++ b/core/bin/external_node/src/metrics/framework.rs @@ -5,7 +5,7 @@ use zksync_node_framework::{ implementations::resources::pools::{MasterPool, PoolResource}, FromContext, IntoContext, StopReceiver, Task, TaskId, WiringError, WiringLayer, }; -use zksync_shared_metrics::rustc::RUST_METRICS; +use zksync_shared_metrics::{GIT_METRICS, RUST_METRICS}; use zksync_types::{L1ChainId, L2ChainId, SLChainId}; use super::EN_METRICS; @@ -40,6 +40,7 @@ impl WiringLayer for ExternalNodeMetricsLayer { async fn wire(self, input: Self::Input) -> Result { RUST_METRICS.initialize(); + GIT_METRICS.initialize(); EN_METRICS.observe_config( self.l1_chain_id, self.sl_chain_id, diff --git a/core/bin/external_node/src/node_builder.rs b/core/bin/external_node/src/node_builder.rs index 5c70fd436781..c5d9228e9930 100644 --- a/core/bin/external_node/src/node_builder.rs +++ b/core/bin/external_node/src/node_builder.rs @@ -33,7 +33,7 @@ use zksync_node_framework::{ NodeStorageInitializerLayer, }, pools_layer::PoolsLayerBuilder, - postgres_metrics::PostgresMetricsLayer, + postgres::PostgresLayer, prometheus_exporter::PrometheusExporterLayer, pruning::PruningLayer, query_eth_client::QueryEthClientLayer, @@ -125,8 +125,8 @@ impl ExternalNodeBuilder { Ok(self) } - fn add_postgres_metrics_layer(mut self) -> anyhow::Result { - self.node.add_layer(PostgresMetricsLayer); + fn add_postgres_layer(mut self) -> anyhow::Result { + self.node.add_layer(PostgresLayer); Ok(self) } @@ 
         // so until we have a dedicated component for "auxiliary" tasks,
         // it's responsible for things like metrics.
         self = self
-            .add_postgres_metrics_layer()?
+            .add_postgres_layer()?
             .add_external_node_metrics_layer()?;
         // We assign the storage initialization to the core, as it's considered to be
         // the "main" component.
diff --git a/core/bin/zksync_server/src/node_builder.rs b/core/bin/zksync_server/src/node_builder.rs
index 794c847a24d5..7e7d219856ca 100644
--- a/core/bin/zksync_server/src/node_builder.rs
+++ b/core/bin/zksync_server/src/node_builder.rs
@@ -48,7 +48,7 @@ use zksync_node_framework::{
         object_store::ObjectStoreLayer,
         pk_signing_eth_client::PKSigningEthClientLayer,
         pools_layer::PoolsLayerBuilder,
-        postgres_metrics::PostgresMetricsLayer,
+        postgres::PostgresLayer,
         prometheus_exporter::PrometheusExporterLayer,
         proof_data_handler::ProofDataHandlerLayer,
         query_eth_client::QueryEthClientLayer,
@@ -138,8 +138,8 @@ impl MainNodeBuilder {
         Ok(self)
     }

-    fn add_postgres_metrics_layer(mut self) -> anyhow::Result<Self> {
-        self.node.add_layer(PostgresMetricsLayer);
+    fn add_postgres_layer(mut self) -> anyhow::Result<Self> {
+        self.node.add_layer(PostgresLayer);
         Ok(self)
     }

@@ -760,9 +760,7 @@ impl MainNodeBuilder {
                 self = self.add_eth_tx_manager_layer()?;
             }
             Component::Housekeeper => {
-                self = self
-                    .add_house_keeper_layer()?
-                    .add_postgres_metrics_layer()?;
+                self = self.add_house_keeper_layer()?.add_postgres_layer()?;
             }
             Component::ProofDataHandler => {
                 self = self.add_proof_data_handler_layer()?;
diff --git a/core/lib/bin_metadata/Cargo.toml b/core/lib/bin_metadata/Cargo.toml
new file mode 100644
index 000000000000..e529ecfb49a7
--- /dev/null
+++ b/core/lib/bin_metadata/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "zksync_bin_metadata"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+homepage.workspace = true
+repository.workspace = true
+license.workspace = true
+keywords.workspace = true
+categories.workspace = true
+
+[dependencies]
+serde.workspace = true
+vise.workspace = true
+tracing.workspace = true
+
+[build-dependencies]
+rustc_version.workspace = true
diff --git a/core/lib/bin_metadata/build.rs b/core/lib/bin_metadata/build.rs
new file mode 100644
index 000000000000..ff339ae5589c
--- /dev/null
+++ b/core/lib/bin_metadata/build.rs
@@ -0,0 +1,75 @@
+//! Build script for the `zksync_bin_metadata` crate: captures rustc and git metadata at compile time.
+
+use std::{
+    env, fs,
+    io::{self, Write},
+    path::Path,
+    process::Command,
+};
+
+use rustc_version::{Channel, LlvmVersion};
+
+fn print_binary_meta(out: &mut impl Write) -> io::Result<()> {
+    let rustc_meta = rustc_version::version_meta().expect("Failed obtaining rustc metadata");
+
+    writeln!(
+        out,
+        "pub const RUST_METADATA: RustMetadata = RustMetadata {{ \
+         version: {semver:?}, \
+         commit_hash: {commit_hash:?}, \
+         commit_date: {commit_date:?}, \
+         channel: {channel:?}, \
+         host: {host:?}, \
+         llvm: {llvm:?}, \
+         }};
+
+         pub const GIT_METADATA: GitMetadata = GitMetadata {{ \
+         branch: {git_branch:?}, \
+         revision: {git_revision:?} \
+         }};",
+        semver = rustc_meta.semver.to_string(),
+        commit_hash = rustc_meta.commit_hash,
+        commit_date = rustc_meta.commit_date,
+        channel = match rustc_meta.channel {
+            Channel::Dev => "dev",
+            Channel::Beta => "beta",
+            Channel::Nightly => "nightly",
+            Channel::Stable => "stable",
+        },
+        host = rustc_meta.host,
+        llvm = rustc_meta.llvm_version.as_ref().map(LlvmVersion::to_string),
+        git_branch = git_branch(),
+        git_revision = git_revision()
+    )
+}
+
+/// Returns the current git branch, if it can be determined.
+pub fn git_branch() -> Option<String> {
+    run_cmd_opt("git", &["rev-parse", "--abbrev-ref", "HEAD"])
+}
+
+/// Returns the short hash of the current git commit, if it can be determined.
+pub fn git_revision() -> Option<String> {
+    run_cmd_opt("git", &["rev-parse", "--short", "HEAD"])
+}
+
+fn run_cmd_opt(cmd: &str, args: &[&str]) -> Option<String> {
+    let output = Command::new(cmd).args(args).output().ok()?;
+    if output.status.success() {
+        String::from_utf8(output.stdout)
+            .ok()
+            .map(|s| s.trim().to_string())
+    } else {
+        None
+    }
+}
+
+fn main() {
+    let out_dir = env::var("OUT_DIR").expect("`OUT_DIR` env var not set for build script");
+    let metadata_module_path = Path::new(&out_dir).join("metadata_values.rs");
+    let metadata_module =
+        fs::File::create(metadata_module_path).expect("cannot create metadata module");
+    let mut metadata_module = io::BufWriter::new(metadata_module);
+
+    print_binary_meta(&mut metadata_module).expect("failed printing binary metadata");
+}
diff --git a/core/lib/bin_metadata/src/lib.rs b/core/lib/bin_metadata/src/lib.rs
new file mode 100644
index 000000000000..d8a5221e4775
--- /dev/null
+++ b/core/lib/bin_metadata/src/lib.rs
@@ -0,0 +1,68 @@
+use serde::Serialize;
+use vise::{EncodeLabelSet, Info, Metrics};
+
+use self::values::{GIT_METADATA, RUST_METADATA};
+
+pub mod values {
+    use super::{GitMetadata, RustMetadata};
+
+    include!(concat!(env!("OUT_DIR"), "/metadata_values.rs"));
+}
+
+pub const BIN_METADATA: BinMetadata = BinMetadata {
+    rust: RUST_METADATA,
+    git: GIT_METADATA,
+};
+
+/// Metadata of the compiled binary.
+#[derive(Debug, Serialize)]
+pub struct BinMetadata {
+    pub rust: RustMetadata,
+    pub git: GitMetadata,
+}
+
+/// Rust metadata of the compiled binary.
+#[derive(Debug, EncodeLabelSet, Serialize)]
+pub struct RustMetadata {
+    pub version: &'static str,
+    pub commit_hash: Option<&'static str>,
+    pub commit_date: Option<&'static str>,
+    pub channel: &'static str,
+    pub host: &'static str,
+    pub llvm: Option<&'static str>,
+}
+
+/// Git metadata of the compiled binary.
+#[derive(Debug, EncodeLabelSet, Serialize)]
+pub struct GitMetadata {
+    pub branch: Option<&'static str>,
+    pub revision: Option<&'static str>,
+}
+
+#[derive(Debug, Metrics)]
+#[metrics(prefix = "rust")]
+pub struct RustMetrics {
+    /// General information about the compiled binary.
+    info: Info<RustMetadata>,
+}
+
+impl RustMetrics {
+    pub fn initialize(&self) {
+        tracing::info!("Rust metadata for this binary: {RUST_METADATA:?}");
+        self.info.set(RUST_METADATA).ok();
+    }
+}
+
+#[derive(Debug, Metrics)]
+#[metrics(prefix = "git_info")]
+pub struct GitMetrics {
+    /// General information about the compiled binary.
+    info: Info<GitMetadata>,
+}
+
+impl GitMetrics {
+    pub fn initialize(&self) {
+        tracing::info!("Git metadata for this binary: {GIT_METADATA:?}");
+        self.info.set(GIT_METADATA).ok();
+    }
+}
diff --git a/core/lib/dal/.sqlx/query-3566423188a5d6bed7150f327d83613cd34138b59ef3b9271fd0bfdaddd086f8.json b/core/lib/dal/.sqlx/query-3566423188a5d6bed7150f327d83613cd34138b59ef3b9271fd0bfdaddd086f8.json
new file mode 100644
index 000000000000..123afc6060a6
--- /dev/null
+++ b/core/lib/dal/.sqlx/query-3566423188a5d6bed7150f327d83613cd34138b59ef3b9271fd0bfdaddd086f8.json
@@ -0,0 +1,50 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n        SELECT *\n        FROM _sqlx_migrations\n        ORDER BY _sqlx_migrations.version DESC\n        LIMIT 1\n        ",
+  "describe": {
+    "columns": [
+      {
+        "ordinal": 0,
+        "name": "version",
+        "type_info": "Int8"
+      },
+      {
+        "ordinal": 1,
+        "name": "description",
+        "type_info": "Text"
+      },
+      {
+        "ordinal": 2,
+        "name": "installed_on",
+        "type_info": "Timestamptz"
+      },
+      {
+        "ordinal": 3,
+        "name": "success",
+        "type_info": "Bool"
+      },
+      {
+        "ordinal": 4,
+        "name": "checksum",
+        "type_info": "Bytea"
+      },
+      {
+        "ordinal": 5,
+        "name": "execution_time",
+        "type_info": "Int8"
+      }
+    ],
+    "parameters": {
+      "Left": []
+    },
+    "nullable": [
+      false,
+      false,
+      false,
+      false,
+      false,
+      false
+    ]
+  },
+  "hash": "3566423188a5d6bed7150f327d83613cd34138b59ef3b9271fd0bfdaddd086f8"
+}
diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs
index 4ce76547ac9b..4efead269d1b 100644
--- a/core/lib/dal/src/eth_sender_dal.rs
+++ b/core/lib/dal/src/eth_sender_dal.rs
@@ -669,7 +669,7 @@ impl EthSenderDal<'_, '_> {
         Ok(())
     }

-    pub async fn get_number_of_failed_transactions(&mut self) -> anyhow::Result<i64> {
+    pub async fn get_number_of_failed_transactions(&mut self) -> anyhow::Result<u64> {
         sqlx::query!(
             r#"
             SELECT
@@ -683,6 +683,7 @@ impl EthSenderDal<'_, '_> {
         .fetch_one(self.storage.conn())
         .await?
         .count
+        .map(|c| c as u64)
         .context("count field is missing")
     }

diff --git a/core/lib/dal/src/system_dal.rs b/core/lib/dal/src/system_dal.rs
index 105665fa2ec6..6f2e64b1c1c5 100644
--- a/core/lib/dal/src/system_dal.rs
+++ b/core/lib/dal/src/system_dal.rs
@@ -1,5 +1,7 @@
 use std::{collections::HashMap, time::Duration};

+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
 use zksync_db_connection::{connection::Connection, error::DalResult, instrument::InstrumentExt};

 use crate::Core;
@@ -12,6 +14,16 @@ pub(crate) struct TableSize {
     pub total_size: u64,
 }

+#[derive(Debug, Serialize, Deserialize)]
+pub struct DatabaseMigration {
+    pub version: i64,
+    pub description: String,
+    pub installed_on: DateTime<Utc>,
+    pub success: bool,
+    pub checksum: String,
+    pub execution_time: Duration,
+}
+
 #[derive(Debug)]
 pub struct SystemDal<'a, 'c> {
     pub(crate) storage: &'a mut Connection<'c, Core>,
@@ -86,4 +98,27 @@ impl SystemDal<'_, '_> {
         });
         Ok(table_sizes.collect())
     }
+
+    pub async fn get_last_migration(&mut self) -> DalResult<DatabaseMigration> {
+        let row = sqlx::query!(
+            r#"
+            SELECT *
+            FROM _sqlx_migrations
+            ORDER BY _sqlx_migrations.version DESC
+            LIMIT 1
+            "#
+        )
+        .instrument("get_last_migration")
+        .fetch_one(self.storage)
+        .await?;
+
+        Ok(DatabaseMigration {
+            version: row.version,
+            description: row.description,
+            installed_on: row.installed_on,
+            success: row.success,
+            checksum: hex::encode(row.checksum),
+            execution_time: Duration::from_millis(u64::try_from(row.execution_time).unwrap_or(0)),
+        })
+    }
 }
diff --git a/core/lib/health_check/Cargo.toml b/core/lib/health_check/Cargo.toml
index 6f1d863d8cec..0e823c848ce5 100644
--- a/core/lib/health_check/Cargo.toml
+++ b/core/lib/health_check/Cargo.toml
@@ -20,6 +20,7 @@ serde_json.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = ["sync", "time"] }
 tracing.workspace = true
+zksync_bin_metadata.workspace = true

 [dev-dependencies]
 assert_matches.workspace = true
diff --git a/core/lib/health_check/src/binary.rs b/core/lib/health_check/src/binary.rs
new file mode 100644
index 000000000000..b14ed2ed9392
--- /dev/null
+++ b/core/lib/health_check/src/binary.rs
@@ -0,0 +1,21 @@
+use async_trait::async_trait;
+use zksync_bin_metadata::BinMetadata;
+
+use crate::{CheckHealth, Health, HealthStatus};
+
+impl From<&BinMetadata> for Health {
+    fn from(details: &BinMetadata) -> Self {
+        Self::from(HealthStatus::Ready).with_details(details)
+    }
+}
+
+#[async_trait]
+impl CheckHealth for BinMetadata {
+    fn name(&self) -> &'static str {
+        "metadata"
+    }
+
+    async fn check_health(&self) -> Health {
+        self.into()
+    }
+}
diff --git a/core/lib/health_check/src/lib.rs b/core/lib/health_check/src/lib.rs
index e4e8ba3c9a58..7dcdb47aa2f9 100644
--- a/core/lib/health_check/src/lib.rs
+++ b/core/lib/health_check/src/lib.rs
@@ -11,11 +11,14 @@ pub use async_trait::async_trait;
 use futures::future;
 use serde::Serialize;
 use tokio::sync::watch;
+use zksync_bin_metadata::BIN_METADATA;

 use self::metrics::{CheckResult, METRICS};
 use crate::metrics::AppHealthCheckConfig;

+mod binary;
 mod metrics;
+
 #[cfg(test)]
 mod tests;

@@ -235,7 +238,7 @@ impl AppHealthCheck {
             .map(|health| health.status)
             .max_by_key(|status| status.priority_for_aggregation())
             .unwrap_or(HealthStatus::Ready);
-        let inner = aggregated_status.into();
+        let inner = Health::with_details(aggregated_status.into(), BIN_METADATA);

         let health = AppHealth { inner, components };
         if !health.inner.status.is_healthy() {
diff --git a/core/lib/types/src/aggregated_operations.rs b/core/lib/types/src/aggregated_operations.rs
index dadfad265cb2..44b730c929a3 100644
--- a/core/lib/types/src/aggregated_operations.rs
+++ b/core/lib/types/src/aggregated_operations.rs
@@ -1,6 +1,8 @@
 use std::{fmt, str::FromStr};

-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub enum AggregatedActionType {
     Commit,
     PublishProofOnchain,
diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs
index 225a38aee760..479ca1f244cc 100644
--- a/core/node/consensus/src/testonly.rs
+++ b/core/node/consensus/src/testonly.rs
@@ -584,7 +584,6 @@ impl StateKeeperRunner {
             let stop_recv = stop_recv.clone();
             async {
                 ZkSyncStateKeeper::new(
-                    stop_recv,
                     Box::new(io),
                     Box::new(executor_factory),
                     OutputHandler::new(Box::new(persistence.with_tx_insertion()))
@@ -592,7 +591,7 @@ impl StateKeeperRunner {
                     Arc::new(NoopSealer),
                     Arc::new(async_cache),
                 )
-                .run()
+                .run(stop_recv)
                 .await
                 .context("ZkSyncStateKeeper::run()")?;
                 Ok(())
@@ -665,7 +664,6 @@ impl StateKeeperRunner {
             let stop_recv = stop_recv.clone();
             async {
                 ZkSyncStateKeeper::new(
-                    stop_recv,
                     Box::new(io),
                     Box::new(MockBatchExecutor),
                     OutputHandler::new(Box::new(persistence.with_tx_insertion()))
@@ -674,7 +672,7 @@ impl StateKeeperRunner {
                     Arc::new(NoopSealer),
                     Arc::new(MockReadStorageFactory),
                 )
-                .run()
+                .run(stop_recv)
                 .await
                 .context("ZkSyncStateKeeper::run()")?;
                 Ok(())
diff --git a/core/node/eth_sender/Cargo.toml b/core/node/eth_sender/Cargo.toml
index a33536baa986..90b5727d9500 100644
--- a/core/node/eth_sender/Cargo.toml
+++ b/core/node/eth_sender/Cargo.toml
@@ -11,12 +11,14 @@ keywords.workspace = true
 categories.workspace = true

 [dependencies]
+serde.workspace = true
 vise.workspace = true
 zksync_types.workspace = true
 zksync_dal.workspace = true
 zksync_config.workspace = true
 zksync_contracts.workspace = true
 zksync_eth_client.workspace = true
+zksync_health_check.workspace = true
 zksync_l1_contract_interface.workspace = true
 zksync_object_store.workspace = true
 zksync_prover_interface.workspace = true
diff --git a/core/node/eth_sender/src/eth_tx_aggregator.rs b/core/node/eth_sender/src/eth_tx_aggregator.rs
index ac9ed4aaaadb..f0d7201d1503 100644
--- a/core/node/eth_sender/src/eth_tx_aggregator.rs
+++ b/core/node/eth_sender/src/eth_tx_aggregator.rs
@@ -3,6 +3,7 @@ use zksync_config::configs::eth_sender::SenderConfig;
 use zksync_contracts::BaseSystemContractsHashes;
 use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_eth_client::{BoundEthInterface, CallFunctionArgs};
+use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
 use zksync_l1_contract_interface::{
     i_executor::{
         commit::kzg::{KzgInfo, ZK_SYNC_BYTES_PER_BLOB},
@@ -27,6 +28,7 @@ use zksync_types::{

 use super::aggregated_operations::AggregatedOperation;
 use crate::{
+    health::{EthTxAggregatorHealthDetails, EthTxDetails},
     metrics::{PubdataKind, METRICS},
     utils::agg_l1_batch_base_cost,
     zksync_functions::ZkSyncFunctions,
@@ -65,6 +67,7 @@ pub struct EthTxAggregator {
     pool: ConnectionPool<Core>,
     settlement_mode: SettlementMode,
     sl_chain_id: SLChainId,
+    health_updater: HealthUpdater,
 }

 struct TxData {
@@ -119,10 +122,14 @@ impl EthTxAggregator {
             pool,
             settlement_mode,
             sl_chain_id,
+            health_updater: ReactiveHealthCheck::new("eth_tx_aggregator").1,
         }
     }

     pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        self.health_updater
+            .update(Health::from(HealthStatus::Ready));
+
         let pool = self.pool.clone();
         loop {
             let mut storage = pool.connection_tagged("eth_sender").await.unwrap();
@@ -431,6 +438,13 @@ impl EthTxAggregator {
             )
             .await?;
             Self::report_eth_tx_saving(storage, &agg_op, &tx).await;
+
+            self.health_updater.update(
+                EthTxAggregatorHealthDetails {
+                    last_saved_tx: EthTxDetails::new(&tx, None),
+                }
+                .into(),
+            );
         }
         Ok(())
     }
@@ -670,4 +684,9 @@ impl EthTxAggregator {
             )
         })
     }
+
+    /// Returns the health check for eth tx aggregator.
+    pub fn health_check(&self) -> ReactiveHealthCheck {
+        self.health_updater.subscribe()
+    }
 }
diff --git a/core/node/eth_sender/src/eth_tx_manager.rs b/core/node/eth_sender/src/eth_tx_manager.rs
index 6992bea1007c..f411e9b3ae4b 100644
--- a/core/node/eth_sender/src/eth_tx_manager.rs
+++ b/core/node/eth_sender/src/eth_tx_manager.rs
@@ -9,6 +9,7 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_eth_client::{
     encode_blob_tx_with_sidecar, BoundEthInterface, ExecutedTxStatus, RawTransactionBytes,
 };
+use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
 use zksync_node_fee_model::l1_gas_price::TxParamsProvider;
 use zksync_shared_metrics::BlockL1Stage;
 use zksync_types::{eth_sender::EthTx, Address, L1BlockNumber, H256, U256};
@@ -19,6 +20,7 @@ use crate::{
         AbstractL1Interface, L1BlockNumbers, OperatorNonce, OperatorType, RealL1Interface,
     },
     eth_fees_oracle::{EthFees, EthFeesOracle, GasAdjusterFeesOracle},
+    health::{EthTxDetails, EthTxManagerHealthDetails},
     metrics::TransactionType,
 };

@@ -33,6 +35,7 @@ pub struct EthTxManager {
     config: SenderConfig,
     fees_oracle: Box<dyn EthFeesOracle>,
     pool: ConnectionPool<Core>,
+    health_updater: HealthUpdater,
 }

 impl EthTxManager {
@@ -67,6 +70,7 @@ impl EthTxManager {
             config,
             fees_oracle: Box::new(fees_oracle),
             pool,
+            health_updater: ReactiveHealthCheck::new("eth_tx_manager").1,
         }
     }

@@ -417,6 +421,14 @@ impl EthTxManager {
     ) {
         let receipt_block_number = tx_status.receipt.block_number.unwrap().as_u32();
         if receipt_block_number <= finalized_block.0 {
+            self.health_updater.update(
+                EthTxManagerHealthDetails {
+                    last_mined_tx: EthTxDetails::new(tx, Some((&tx_status).into())),
+                    finalized_block,
+                }
+                .into(),
+            );
+
             if tx_status.success {
                 self.confirm_tx(storage, tx, tx_status).await;
             } else {
@@ -522,6 +534,9 @@ impl EthTxManager {
     }

     pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        self.health_updater
+            .update(Health::from(HealthStatus::Ready));
+
         let pool = self.pool.clone();

         loop {
@@ -682,4 +697,9 @@ impl EthTxManager {
             }
         }
     }
+
+    /// Returns the health check for eth tx manager.
+    pub fn health_check(&self) -> ReactiveHealthCheck {
+        self.health_updater.subscribe()
+    }
 }
diff --git a/core/node/eth_sender/src/health.rs b/core/node/eth_sender/src/health.rs
new file mode 100644
index 000000000000..1aff80dae6d2
--- /dev/null
+++ b/core/node/eth_sender/src/health.rs
@@ -0,0 +1,68 @@
+use serde::{Deserialize, Serialize};
+use zksync_eth_client::ExecutedTxStatus;
+use zksync_health_check::{Health, HealthStatus};
+use zksync_types::{
+    aggregated_operations::AggregatedActionType, eth_sender::EthTx, web3::TransactionReceipt,
+    L1BlockNumber, Nonce, H256,
+};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TxStatus {
+    pub tx_hash: H256,
+    pub success: bool,
+    pub receipt: TransactionReceipt,
+}
+
+impl From<&ExecutedTxStatus> for TxStatus {
+    fn from(status: &ExecutedTxStatus) -> Self {
+        Self {
+            tx_hash: status.tx_hash,
+            success: status.success,
+            receipt: status.receipt.clone(),
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct EthTxAggregatorHealthDetails {
+    pub last_saved_tx: EthTxDetails,
+}
+
+impl From<EthTxAggregatorHealthDetails> for Health {
+    fn from(details: EthTxAggregatorHealthDetails) -> Self {
+        Self::from(HealthStatus::Ready).with_details(details)
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct EthTxDetails {
+    pub nonce: Nonce,
+    pub tx_type: AggregatedActionType,
+    pub created_at_timestamp: u64,
+    pub predicted_gas_cost: u64,
+    pub status: Option<TxStatus>,
+}
+
+impl EthTxDetails {
+    pub fn new(tx: &EthTx, status: Option<TxStatus>) -> Self {
+        Self {
+            nonce: tx.nonce,
+            tx_type: tx.tx_type,
+            created_at_timestamp: tx.created_at_timestamp,
+            predicted_gas_cost: tx.predicted_gas_cost,
+            status,
+        }
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct EthTxManagerHealthDetails {
+    pub last_mined_tx: EthTxDetails,
+    pub finalized_block: L1BlockNumber,
+}
+
+impl From<EthTxManagerHealthDetails> for Health {
+    fn from(details: EthTxManagerHealthDetails) -> Self {
+        Self::from(HealthStatus::Ready).with_details(details)
+    }
+}
diff --git a/core/node/eth_sender/src/lib.rs b/core/node/eth_sender/src/lib.rs
index 747ece93b811..fc6076ec640b 100644
--- a/core/node/eth_sender/src/lib.rs
+++ b/core/node/eth_sender/src/lib.rs
@@ -3,6 +3,7 @@ mod aggregator;
 mod error;
 mod eth_tx_aggregator;
 mod eth_tx_manager;
+mod health;
 mod metrics;
 mod publish_criterion;
 mod utils;
diff --git a/core/node/house_keeper/src/blocks_state_reporter.rs b/core/node/house_keeper/src/blocks_state_reporter.rs
index 6f85aa0fbb09..abd2c6e8802d 100644
--- a/core/node/house_keeper/src/blocks_state_reporter.rs
+++ b/core/node/house_keeper/src/blocks_state_reporter.rs
@@ -22,7 +22,10 @@ impl L1BatchMetricsReporter {

     async fn report_metrics(&self) -> anyhow::Result<()> {
         let mut block_metrics = vec![];
-        let mut conn = self.connection_pool.connection().await?;
+        let mut conn = self
+            .connection_pool
+            .connection_tagged("house_keeper")
+            .await?;
         let last_l1_batch = conn.blocks_dal().get_sealed_l1_batch_number().await?;
         if let Some(number) = last_l1_batch {
             block_metrics.push((number, BlockStage::Sealed));
diff --git a/core/node/node_framework/Cargo.toml b/core/node/node_framework/Cargo.toml
index d85f3dc7c8e9..6334495885f3 100644
--- a/core/node/node_framework/Cargo.toml
+++ b/core/node/node_framework/Cargo.toml
@@ -41,6 +41,7 @@ zksync_vm_executor.workspace = true
 zksync_state_keeper.workspace = true
 zksync_consistency_checker.workspace = true
 zksync_metadata_calculator.workspace = true
+zksync_bin_metadata.workspace = true
 zksync_node_sync.workspace = true
 zksync_node_api_server.workspace = true
zksync_node_consensus.workspace = true @@ -54,6 +55,7 @@ zksync_node_storage_init.workspace = true zksync_external_price_api.workspace = true zksync_external_proof_integration_api.workspace = true zksync_logs_bloom_backfill.workspace = true +zksync_shared_metrics.workspace = true pin-project-lite.workspace = true tracing.workspace = true @@ -61,6 +63,7 @@ thiserror.workspace = true async-trait.workspace = true futures.workspace = true anyhow.workspace = true +serde.workspace = true tokio = { workspace = true, features = ["rt"] } ctrlc.workspace = true semver.workspace = true diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs index 310580aeb3a3..235158544c54 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs @@ -9,6 +9,7 @@ use crate::{ implementations::resources::{ circuit_breakers::CircuitBreakersResource, eth_interface::{BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource}, + healthcheck::AppHealthCheckResource, object_store::ObjectStoreResource, pools::{MasterPool, PoolResource, ReplicaPool}, }, @@ -55,6 +56,8 @@ pub struct Input { pub object_store: ObjectStoreResource, #[context(default)] pub circuit_breakers: CircuitBreakersResource, + #[context(default)] + pub app_health: AppHealthCheckResource, } #[derive(Debug, IntoContext)] @@ -133,6 +136,12 @@ impl WiringLayer for EthTxAggregatorLayer { .insert(Box::new(FailedL1TransactionChecker { pool: replica_pool })) .await; + input + .app_health + .0 + .insert_component(eth_tx_aggregator.health_check()) + .map_err(WiringError::internal)?; + Ok(Output { eth_tx_aggregator }) } } diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs index 5462fa575f94..e9ce4cc19e1a 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/manager.rs @@ -8,6 +8,7 @@ use crate::{ circuit_breakers::CircuitBreakersResource, eth_interface::{BoundEthInterfaceForBlobsResource, BoundEthInterfaceResource}, gas_adjuster::GasAdjusterResource, + healthcheck::AppHealthCheckResource, pools::{MasterPool, PoolResource, ReplicaPool}, }, service::StopReceiver, @@ -48,6 +49,8 @@ pub struct Input { pub gas_adjuster: GasAdjusterResource, #[context(default)] pub circuit_breakers: CircuitBreakersResource, + #[context(default)] + pub app_health: AppHealthCheckResource, } #[derive(Debug, IntoContext)] @@ -114,6 +117,12 @@ impl WiringLayer for EthTxManagerLayer { .insert(Box::new(FailedL1TransactionChecker { pool: replica_pool })) .await; + input + .app_health + .0 + .insert_component(eth_tx_manager.health_check()) + .map_err(WiringError::internal)?; + Ok(Output { eth_tx_manager }) } } diff --git a/core/node/node_framework/src/implementations/layers/house_keeper.rs b/core/node/node_framework/src/implementations/layers/house_keeper.rs index 1e2bc568d50f..af59a73554ac 100644 --- a/core/node/node_framework/src/implementations/layers/house_keeper.rs +++ b/core/node/node_framework/src/implementations/layers/house_keeper.rs @@ -56,7 +56,7 @@ impl WiringLayer for HouseKeeperLayer { let l1_batch_metrics_reporter = L1BatchMetricsReporter::new( self.house_keeper_config .l1_batch_metrics_reporting_interval_ms, - replica_pool.clone(), + replica_pool, 
         );

         Ok(Output {
diff --git a/core/node/node_framework/src/implementations/layers/mod.rs b/core/node/node_framework/src/implementations/layers/mod.rs
index 11a62c9333b2..28a6f65600ab 100644
--- a/core/node/node_framework/src/implementations/layers/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/mod.rs
@@ -24,7 +24,7 @@ pub mod node_storage_init;
 pub mod object_store;
 pub mod pk_signing_eth_client;
 pub mod pools_layer;
-pub mod postgres_metrics;
+pub mod postgres;
 pub mod prometheus_exporter;
 pub mod proof_data_handler;
 pub mod pruning;
diff --git a/core/node/node_framework/src/implementations/layers/postgres.rs b/core/node/node_framework/src/implementations/layers/postgres.rs
new file mode 100644
index 000000000000..8a81b8709895
--- /dev/null
+++ b/core/node/node_framework/src/implementations/layers/postgres.rs
@@ -0,0 +1,172 @@
+use std::time::Duration;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::watch;
+use zksync_dal::{
+    metrics::PostgresMetrics, system_dal::DatabaseMigration, ConnectionPool, Core, CoreDal,
+};
+use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck};
+
+use crate::{
+    implementations::resources::{
+        healthcheck::AppHealthCheckResource,
+        pools::{PoolResource, ReplicaPool},
+    },
+    service::StopReceiver,
+    task::{Task, TaskId, TaskKind},
+    wiring_layer::{WiringError, WiringLayer},
+    FromContext, IntoContext,
+};
+
+/// Execution interval for Postgres metrics and healthcheck tasks
+const TASK_EXECUTION_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Wiring layer for the Postgres metrics exporter and healthcheck.
+#[derive(Debug)]
+pub struct PostgresLayer;
+
+#[derive(Debug, FromContext)]
+#[context(crate = crate)]
+pub struct Input {
+    pub replica_pool: PoolResource<ReplicaPool>,
+    #[context(default)]
+    pub app_health: AppHealthCheckResource,
+}
+
+#[derive(Debug, IntoContext)]
+#[context(crate = crate)]
+pub struct Output {
+    #[context(task)]
+    pub metrics_task: PostgresMetricsScrapingTask,
+    #[context(task)]
+    pub health_task: DatabaseHealthTask,
+}
+
+#[async_trait::async_trait]
+impl WiringLayer for PostgresLayer {
+    type Input = Input;
+    type Output = Output;
+
+    fn layer_name(&self) -> &'static str {
+        "postgres_layer"
+    }
+
+    async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
+        let pool = input.replica_pool.get().await?;
+        let metrics_task = PostgresMetricsScrapingTask {
+            pool_for_metrics: pool.clone(),
+        };
+
+        let app_health = input.app_health.0;
+        let health_task = DatabaseHealthTask::new(pool);
+
+        app_health
+            .insert_component(health_task.health_check())
+            .map_err(WiringError::internal)?;
+
+        Ok(Output {
+            metrics_task,
+            health_task,
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct PostgresMetricsScrapingTask {
+    pool_for_metrics: ConnectionPool<Core>,
+}
+
+#[async_trait::async_trait]
+impl Task for PostgresMetricsScrapingTask {
+    fn kind(&self) -> TaskKind {
+        TaskKind::UnconstrainedTask
+    }
+
+    fn id(&self) -> TaskId {
+        "postgres_metrics_scraping".into()
+    }
+
+    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
+        tokio::select! {
+            () = PostgresMetrics::run_scraping(self.pool_for_metrics, TASK_EXECUTION_INTERVAL) => {
+                tracing::warn!("Postgres metrics scraping unexpectedly stopped");
+            }
+            _ = stop_receiver.0.changed() => {
+                tracing::info!("Stop signal received, Postgres metrics scraping is shutting down");
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct DatabaseInfo {
+    last_migration: DatabaseMigration,
+}
+
+impl From<DatabaseInfo> for Health {
+    fn from(details: DatabaseInfo) -> Self {
+        Self::from(HealthStatus::Ready).with_details(details)
+    }
+}
+
+#[derive(Debug)]
+pub struct DatabaseHealthTask {
+    polling_interval: Duration,
+    connection_pool: ConnectionPool<Core>,
+    updater: HealthUpdater,
+}
+
+impl DatabaseHealthTask {
+    fn new(connection_pool: ConnectionPool<Core>) -> Self {
+        Self {
+            polling_interval: TASK_EXECUTION_INTERVAL,
+            connection_pool,
+            updater: ReactiveHealthCheck::new("database").1,
+        }
+    }
+
+    async fn run(self, mut stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()>
+    where
+        Self: Sized,
+    {
+        let timeout = self.polling_interval;
+        let mut conn = self
+            .connection_pool
+            .connection_tagged("postgres_healthcheck")
+            .await?;
+
+        tracing::info!("Starting database healthcheck with frequency: {timeout:?}",);
+
+        while !*stop_receiver.borrow_and_update() {
+            let last_migration = conn.system_dal().get_last_migration().await?;
+            self.updater.update(DatabaseInfo { last_migration }.into());
+
+            // Error here corresponds to a timeout w/o `stop_receiver` changed; we're OK with this.
+            tokio::time::timeout(timeout, stop_receiver.changed())
+                .await
+                .ok();
+        }
+        tracing::info!("Stop signal received; database healthcheck is shut down");
+        Ok(())
+    }
+
+    pub fn health_check(&self) -> ReactiveHealthCheck {
+        self.updater.subscribe()
+    }
+}
+
+#[async_trait::async_trait]
+impl Task for DatabaseHealthTask {
+    fn kind(&self) -> TaskKind {
+        TaskKind::UnconstrainedTask
+    }
+
+    fn id(&self) -> TaskId {
+        "database_health".into()
+    }
+
+    async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
+        (*self).run(stop_receiver.0).await
+    }
+}
diff --git a/core/node/node_framework/src/implementations/layers/postgres_metrics.rs b/core/node/node_framework/src/implementations/layers/postgres_metrics.rs
deleted file mode 100644
index 238bee578678..000000000000
--- a/core/node/node_framework/src/implementations/layers/postgres_metrics.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use std::time::Duration;
-
-use zksync_dal::{metrics::PostgresMetrics, ConnectionPool, Core};
-
-use crate::{
-    implementations::resources::pools::{PoolResource, ReplicaPool},
-    service::StopReceiver,
-    task::{Task, TaskId, TaskKind},
-    wiring_layer::{WiringError, WiringLayer},
-    FromContext, IntoContext,
-};
-
-const SCRAPE_INTERVAL: Duration = Duration::from_secs(60);
-
-/// Wiring layer for the Postgres metrics exporter.
-#[derive(Debug)]
-pub struct PostgresMetricsLayer;
-
-#[derive(Debug, FromContext)]
-#[context(crate = crate)]
-pub struct Input {
-    pub replica_pool: PoolResource<ReplicaPool>,
-}
-
-#[derive(Debug, IntoContext)]
-#[context(crate = crate)]
-pub struct Output {
-    #[context(task)]
-    pub task: PostgresMetricsScrapingTask,
-}
-
-#[async_trait::async_trait]
-impl WiringLayer for PostgresMetricsLayer {
-    type Input = Input;
-    type Output = Output;
-
-    fn layer_name(&self) -> &'static str {
-        "postgres_metrics_layer"
-    }
-
-    async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
-        let pool_for_metrics = input.replica_pool.get_singleton().await?;
-        let task = PostgresMetricsScrapingTask { pool_for_metrics };
-
-        Ok(Output { task })
-    }
-}
-
-#[derive(Debug)]
-pub struct PostgresMetricsScrapingTask {
-    pool_for_metrics: ConnectionPool<Core>,
-}
-
-#[async_trait::async_trait]
-impl Task for PostgresMetricsScrapingTask {
-    fn kind(&self) -> TaskKind {
-        TaskKind::UnconstrainedTask
-    }
-
-    fn id(&self) -> TaskId {
-        "postgres_metrics_scraping".into()
-    }
-
-    async fn run(self: Box<Self>, mut stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        tokio::select! {
-            () = PostgresMetrics::run_scraping(self.pool_for_metrics, SCRAPE_INTERVAL) => {
-                tracing::warn!("Postgres metrics scraping unexpectedly stopped");
-            }
-            _ = stop_receiver.0.changed() => {
-                tracing::info!("Stop signal received, Postgres metrics scraping is shutting down");
-            }
-        }
-        Ok(())
-    }
-}
diff --git a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
index 55defd095be8..6f21a321eb1a 100644
--- a/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
+++ b/core/node/node_framework/src/implementations/layers/state_keeper/mod.rs
@@ -1,17 +1,15 @@
 use std::sync::Arc;

 use anyhow::Context;
+use zksync_health_check::ReactiveHealthCheck;
+use zksync_state::AsyncCatchupTask;
 pub use zksync_state::RocksdbStorageOptions;
-use zksync_state::{AsyncCatchupTask, OwnedStorage, ReadStorageFactory};
-use zksync_state_keeper::{
-    seal_criteria::ConditionalSealer, AsyncRocksdbCache, OutputHandler, StateKeeperIO,
-    ZkSyncStateKeeper,
-};
+use zksync_state_keeper::{AsyncRocksdbCache, ZkSyncStateKeeper};
 use zksync_storage::RocksDB;
-use zksync_vm_executor::interface::BatchExecutorFactory;

 use crate::{
     implementations::resources::{
+        healthcheck::AppHealthCheckResource,
         pools::{MasterPool, PoolResource},
         state_keeper::{
             BatchExecutorResource, ConditionalSealerResource, OutputHandlerResource,
@@ -44,6 +42,8 @@ pub struct Input {
     pub output_handler: OutputHandlerResource,
     pub conditional_sealer: ConditionalSealerResource,
     pub master_pool: PoolResource<MasterPool>,
+    #[context(default)]
+    pub app_health: AppHealthCheckResource,
 }

 #[derive(Debug, IntoContext)]
@@ -99,13 +99,21 @@ impl WiringLayer for StateKeeperLayer {
             self.rocksdb_options,
         );

-        let state_keeper = StateKeeperTask {
+        let state_keeper = ZkSyncStateKeeper::new(
             io,
-            executor_factory: batch_executor_base,
+            batch_executor_base,
             output_handler,
             sealer,
-            storage_factory: Arc::new(storage_factory),
-        };
+            Arc::new(storage_factory),
+        );
+
+        let state_keeper = StateKeeperTask { state_keeper };
+
+        input
+            .app_health
+            .0
+            .insert_component(state_keeper.health_check())
+            .map_err(WiringError::internal)?;

         let rocksdb_termination_hook = ShutdownHook::new("rocksdb_terminaton", async {
             // Wait for all the instances of RocksDB to be destroyed.
@@ -123,11 +131,14 @@ impl WiringLayer for StateKeeperLayer {

 #[derive(Debug)]
 pub struct StateKeeperTask {
-    io: Box<dyn StateKeeperIO>,
-    executor_factory: Box<dyn BatchExecutorFactory<OwnedStorage>>,
-    output_handler: OutputHandler,
-    sealer: Arc<dyn ConditionalSealer>,
-    storage_factory: Arc<dyn ReadStorageFactory>,
+    state_keeper: ZkSyncStateKeeper,
+}
+
+impl StateKeeperTask {
+    /// Returns the health check for state keeper.
+    pub fn health_check(&self) -> ReactiveHealthCheck {
+        self.state_keeper.health_check()
+    }
 }

 #[async_trait::async_trait]
@@ -137,15 +148,7 @@ impl Task for StateKeeperTask {
     }

     async fn run(self: Box<Self>, stop_receiver: StopReceiver) -> anyhow::Result<()> {
-        let state_keeper = ZkSyncStateKeeper::new(
-            stop_receiver.0,
-            self.io,
-            self.executor_factory,
-            self.output_handler,
-            self.sealer,
-            self.storage_factory,
-        );
-        state_keeper.run().await
+        self.state_keeper.run(stop_receiver.0).await
     }
 }

diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs
index 172a00e8c14c..21058144f778 100644
--- a/core/node/node_sync/src/tests.rs
+++ b/core/node/node_sync/src/tests.rs
@@ -132,7 +132,6 @@ impl StateKeeperHandles {
         }

         let state_keeper = ZkSyncStateKeeper::new(
-            stop_receiver,
             Box::new(io),
             Box::new(batch_executor),
             output_handler,
@@ -143,7 +142,7 @@ impl StateKeeperHandles {
         Self {
             stop_sender,
             sync_state,
-            task: tokio::spawn(state_keeper.run()),
+            task: tokio::spawn(state_keeper.run(stop_receiver)),
         }
     }

diff --git a/core/node/shared_metrics/Cargo.toml b/core/node/shared_metrics/Cargo.toml
index f30a2ba35334..23c669b4f963 100644
--- a/core/node/shared_metrics/Cargo.toml
+++ b/core/node/shared_metrics/Cargo.toml
@@ -11,10 +11,9 @@ keywords.workspace = true
 categories.workspace = true

 [dependencies]
+serde.workspace = true
 vise.workspace = true
 tracing.workspace = true
 zksync_types.workspace = true
 zksync_dal.workspace = true
-
-[build-dependencies]
-rustc_version.workspace = true
+zksync_bin_metadata.workspace = true
diff --git a/core/node/shared_metrics/build.rs b/core/node/shared_metrics/build.rs
deleted file mode 100644
index d37fef0b1b0c..000000000000
--- a/core/node/shared_metrics/build.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-//! Build script for the external node binary.
-
-use std::{
-    env, fs,
-    io::{self, Write},
-    path::Path,
-};
-
-use rustc_version::{Channel, LlvmVersion};
-
-fn print_rust_meta(out: &mut impl Write, meta: &rustc_version::VersionMeta) -> io::Result<()> {
-    writeln!(
-        out,
-        "pub(crate) const RUSTC_METADATA: RustcMetadata = RustcMetadata {{ \
-         version: {semver:?}, \
-         commit_hash: {commit_hash:?}, \
-         commit_date: {commit_date:?}, \
-         channel: {channel:?}, \
-         host: {host:?}, \
-         llvm: {llvm:?} \
-         }};",
-        semver = meta.semver.to_string(),
-        commit_hash = meta.commit_hash,
-        commit_date = meta.commit_date,
-        channel = match meta.channel {
-            Channel::Dev => "dev",
-            Channel::Beta => "beta",
-            Channel::Nightly => "nightly",
-            Channel::Stable => "stable",
-        },
-        host = meta.host,
-        llvm = meta.llvm_version.as_ref().map(LlvmVersion::to_string),
-    )
-}
-
-fn main() {
-    let out_dir = env::var("OUT_DIR").expect("`OUT_DIR` env var not set for build script");
-    let rustc_meta = rustc_version::version_meta().expect("Failed obtaining rustc metadata");
-
-    let metadata_module_path = Path::new(&out_dir).join("metadata_values.rs");
-    let metadata_module =
-        fs::File::create(metadata_module_path).expect("cannot create metadata module");
-    let mut metadata_module = io::BufWriter::new(metadata_module);
-
-    print_rust_meta(&mut metadata_module, &rustc_meta).expect("failed printing rustc metadata");
-}
diff --git a/core/node/shared_metrics/src/lib.rs b/core/node/shared_metrics/src/lib.rs
index 2c41ec9293a0..e37764c5a6d7 100644
--- a/core/node/shared_metrics/src/lib.rs
+++ b/core/node/shared_metrics/src/lib.rs
@@ -5,11 +5,10 @@ use std::{fmt, time::Duration};
 use vise::{
     Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit,
 };
+use zksync_bin_metadata::{GitMetrics, RustMetrics};
 use zksync_dal::transactions_dal::L2TxSubmissionResult;
 use zksync_types::aggregated_operations::AggregatedActionType;

-pub mod rustc;
-
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
 #[metrics(label = "stage", rename_all = "snake_case")]
 pub enum SnapshotRecoveryStage {
@@ -196,3 +195,9 @@ pub struct ExternalNodeMetrics {

 #[vise::register]
 pub static EN_METRICS: vise::Global<ExternalNodeMetrics> = vise::Global::new();
+
+#[vise::register]
+pub static RUST_METRICS: vise::Global<RustMetrics> = vise::Global::new();
+
+#[vise::register]
+pub static GIT_METRICS: vise::Global<GitMetrics> = vise::Global::new();
diff --git a/core/node/shared_metrics/src/rustc.rs b/core/node/shared_metrics/src/rustc.rs
deleted file mode 100644
index 11165dbf51b0..000000000000
--- a/core/node/shared_metrics/src/rustc.rs
+++ /dev/null
@@ -1,36 +0,0 @@
-use vise::{EncodeLabelSet, Info, Metrics};
-
-mod values {
-    use super::RustcMetadata;
-    include!(concat!(env!("OUT_DIR"), "/metadata_values.rs"));
-}
-
-use values::RUSTC_METADATA;
-
-/// Metadata of Rust compiler used to compile the crate.
-#[derive(Debug, EncodeLabelSet)]
-pub struct RustcMetadata {
-    pub version: &'static str,
-    pub commit_hash: Option<&'static str>,
-    pub commit_date: Option<&'static str>,
-    pub channel: &'static str,
-    pub host: &'static str,
-    pub llvm: Option<&'static str>,
-}
-
-#[derive(Debug, Metrics)]
-#[metrics(prefix = "rust")]
-pub struct RustMetrics {
-    /// General information about the Rust compiler.
-    info: Info<RustcMetadata>,
-}
-
-impl RustMetrics {
-    pub fn initialize(&self) {
-        tracing::info!("Metadata for rustc that this binary was compiled with: {RUSTC_METADATA:?}");
-        self.info.set(RUSTC_METADATA).ok();
-    }
-}
-
-#[vise::register]
-pub static RUST_METRICS: vise::Global<RustMetrics> = vise::Global::new();
diff --git a/core/node/state_keeper/Cargo.toml b/core/node/state_keeper/Cargo.toml
index 49d4209a4c4f..979a11dcbb4a 100644
--- a/core/node/state_keeper/Cargo.toml
+++ b/core/node/state_keeper/Cargo.toml
@@ -16,6 +16,7 @@ vise.workspace = true
 zksync_multivm.workspace = true
 zksync_types.workspace = true
 zksync_dal.workspace = true
+zksync_health_check.workspace = true
 zksync_state.workspace = true
 zksync_storage.workspace = true
 zksync_mempool.workspace = true
@@ -38,6 +39,7 @@ tracing.workspace = true
 futures.workspace = true
 once_cell.workspace = true
 itertools.workspace = true
+serde.workspace = true
 hex.workspace = true

 [dev-dependencies]
diff --git a/core/node/state_keeper/src/health.rs b/core/node/state_keeper/src/health.rs
new file mode 100644
index 000000000000..4fc86263e439
--- /dev/null
+++ b/core/node/state_keeper/src/health.rs
@@ -0,0 +1,30 @@
+use serde::{Deserialize, Serialize};
+use zksync_health_check::{Health, HealthStatus};
+use zksync_types::{L1BatchNumber, L2BlockNumber, H256};
+
+use crate::io::IoCursor;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct StateKeeperHealthDetails {
+    pub next_l2_block: L2BlockNumber,
+    pub prev_l2_block_hash: H256,
+    pub prev_l2_block_timestamp: u64,
+    pub l1_batch: L1BatchNumber,
+}
+
+impl From<StateKeeperHealthDetails> for Health {
+    fn from(details: StateKeeperHealthDetails) -> Self {
+        Self::from(HealthStatus::Ready).with_details(details)
+    }
+}
+
+impl From<&IoCursor> for StateKeeperHealthDetails {
+    fn from(details: &IoCursor) -> Self {
+        Self {
+            next_l2_block: details.next_l2_block,
+            prev_l2_block_hash: details.prev_l2_block_hash,
+            prev_l2_block_timestamp: details.prev_l2_block_timestamp,
+            l1_batch: details.l1_batch,
+        }
+    }
+}
diff --git a/core/node/state_keeper/src/keeper.rs b/core/node/state_keeper/src/keeper.rs
index fe37ee8d8dd6..7fcc53ad2d26 100644
--- a/core/node/state_keeper/src/keeper.rs
+++ b/core/node/state_keeper/src/keeper.rs
@@ -7,6 +7,7 @@
 use anyhow::Context as _;
 use tokio::sync::watch;
 use tracing::{info_span, Instrument};
+use zksync_health_check::{HealthUpdater, ReactiveHealthCheck};
 use zksync_multivm::{
     interface::{
         executor::{BatchExecutor, BatchExecutorFactory},
@@ -24,12 +25,13 @@ use zksync_types::{

 use crate::{
     executor::TxExecutionResult,
+    health::StateKeeperHealthDetails,
     io::{IoCursor, L1BatchParams, L2BlockParams, OutputHandler, PendingBatchData, StateKeeperIO},
     metrics::{AGGREGATION_METRICS, KEEPER_METRICS, L1_BATCH_METRICS},
     seal_criteria::{ConditionalSealer, SealData, SealResolution, UnexecutableReason},
     types::ExecutionMetricsForCriteria,
     updates::UpdatesManager,
-    utils::gas_count_from_writes,
+    utils::{gas_count_from_writes, is_canceled},
 };

 /// Amount of time to block on waiting for some resource. The exact value is not really important,
@@ -65,17 +67,16 @@ impl Error {
 /// a sequence of executed L2 blocks and batches.
 #[derive(Debug)]
 pub struct ZkSyncStateKeeper {
-    stop_receiver: watch::Receiver<bool>,
     io: Box<dyn StateKeeperIO>,
     output_handler: OutputHandler,
     batch_executor: Box<dyn BatchExecutorFactory<OwnedStorage>>,
     sealer: Arc<dyn ConditionalSealer>,
     storage_factory: Arc<dyn ReadStorageFactory>,
+    health_updater: HealthUpdater,
 }

 impl ZkSyncStateKeeper {
     pub fn new(
-        stop_receiver: watch::Receiver<bool>,
         sequencer: Box<dyn StateKeeperIO>,
         batch_executor: Box<dyn BatchExecutorFactory<OwnedStorage>>,
         output_handler: OutputHandler,
@@ -83,17 +84,17 @@ impl ZkSyncStateKeeper {
         storage_factory: Arc<dyn ReadStorageFactory>,
     ) -> Self {
         Self {
-            stop_receiver,
             io: sequencer,
             batch_executor,
             output_handler,
             sealer,
             storage_factory,
+            health_updater: ReactiveHealthCheck::new("state_keeper").1,
         }
     }

-    pub async fn run(mut self) -> anyhow::Result<()> {
-        match self.run_inner().await {
+    pub async fn run(mut self, stop_receiver: watch::Receiver<bool>) -> anyhow::Result<()> {
+        match self.run_inner(stop_receiver).await {
             Ok(_) => unreachable!(),
             Err(Error::Fatal(err)) => Err(err).context("state_keeper failed"),
             Err(Error::Canceled) => {
@@ -104,9 +105,14 @@ impl ZkSyncStateKeeper {
     }

     /// Fallible version of `run` routine that allows to easily exit upon cancellation.
-    async fn run_inner(&mut self) -> Result<Infallible, Error> {
+    async fn run_inner(
+        &mut self,
+        mut stop_receiver: watch::Receiver<bool>,
+    ) -> Result<Infallible, Error> {
         let (cursor, pending_batch_params) = self.io.initialize().await?;
         self.output_handler.initialize(&cursor).await?;
+        self.health_updater
+            .update(StateKeeperHealthDetails::from(&cursor).into());
         tracing::info!(
             "Starting state keeper. Next l1 batch to seal: {}, next L2 block to seal: {}",
             cursor.l1_batch,
@@ -135,7 +141,7 @@ impl ZkSyncStateKeeper {
             None => {
                 tracing::info!("There is no open pending batch, starting a new empty batch");
                 let (system_env, l1_batch_env, pubdata_params) = self
-                    .wait_for_new_batch_env(&cursor)
+                    .wait_for_new_batch_env(&cursor, &mut stop_receiver)
                     .await
                     .map_err(|e| e.context("wait_for_new_batch_params()"))?;
                 PendingBatchData {
@@ -154,22 +160,29 @@ impl ZkSyncStateKeeper {
             .await?;

         let mut batch_executor = self
-            .create_batch_executor(l1_batch_env.clone(), system_env.clone(), pubdata_params)
+            .create_batch_executor(
+                l1_batch_env.clone(),
+                system_env.clone(),
+                pubdata_params,
+                &stop_receiver,
+            )
             .await?;
         self.restore_state(
             &mut *batch_executor,
             &mut updates_manager,
             pending_l2_blocks,
+            &stop_receiver,
         )
         .await?;

         let mut l1_batch_seal_delta: Option<Instant> = None;
-        while !self.is_canceled() {
+        while !is_canceled(&stop_receiver) {
             // This function will run until the batch can be sealed.
             self.process_l1_batch(
                 &mut *batch_executor,
                 &mut updates_manager,
                 protocol_upgrade_tx,
+                &stop_receiver,
             )
             .await?;

@@ -178,8 +191,9 @@ impl ZkSyncStateKeeper {
                 self.seal_l2_block(&updates_manager).await?;
                 // We've sealed the L2 block that we had, but we still need to set up the timestamp
                 // for the fictive L2 block.
-                let new_l2_block_params =
-                    self.wait_for_new_l2_block_params(&updates_manager).await?;
+                let new_l2_block_params = self
+                    .wait_for_new_l2_block_params(&updates_manager, &stop_receiver)
+                    .await?;
                 Self::start_next_l2_block(
                     new_l2_block_params,
                     &mut updates_manager,
@@ -204,11 +218,17 @@ impl ZkSyncStateKeeper {

             // Start the new batch.
             next_cursor.l1_batch += 1;
-            (system_env, l1_batch_env, pubdata_params) =
-                self.wait_for_new_batch_env(&next_cursor).await?;
+            (system_env, l1_batch_env, pubdata_params) = self
+                .wait_for_new_batch_env(&next_cursor, &mut stop_receiver)
+                .await?;
             updates_manager = UpdatesManager::new(&l1_batch_env, &system_env, pubdata_params);
             batch_executor = self
-                .create_batch_executor(l1_batch_env.clone(), system_env.clone(), pubdata_params)
+                .create_batch_executor(
+                    l1_batch_env.clone(),
+                    system_env.clone(),
+                    pubdata_params,
+                    &stop_receiver,
+                )
                 .await?;
 
             let version_changed = system_env.version != sealed_batch_protocol_version;
@@ -226,10 +246,11 @@ impl ZkSyncStateKeeper {
         l1_batch_env: L1BatchEnv,
         system_env: SystemEnv,
         pubdata_params: PubdataParams,
+        stop_receiver: &watch::Receiver<bool>,
     ) -> Result<Box<dyn BatchExecutor<OwnedStorage>>, Error> {
         let storage = self
             .storage_factory
-            .access_storage(&self.stop_receiver, l1_batch_env.number - 1)
+            .access_storage(stop_receiver, l1_batch_env.number - 1)
             .await
             .context("failed creating VM storage")?
             .ok_or(Error::Canceled)?;
@@ -287,10 +308,6 @@ impl ZkSyncStateKeeper {
         Ok(protocol_upgrade_tx)
     }
 
-    fn is_canceled(&self) -> bool {
-        *self.stop_receiver.borrow()
-    }
-
     async fn load_upgrade_tx(
         &mut self,
         protocol_version: ProtocolVersionId,
@@ -310,8 +327,9 @@ impl ZkSyncStateKeeper {
     async fn wait_for_new_batch_params(
         &mut self,
         cursor: &IoCursor,
+        stop_receiver: &watch::Receiver<bool>,
     ) -> Result<L1BatchParams, Error> {
-        while !self.is_canceled() {
+        while !is_canceled(stop_receiver) {
             if let Some(params) = self
                 .io
                 .wait_for_new_batch_params(cursor, POLL_WAIT_DURATION)
@@ -332,10 +350,13 @@ impl ZkSyncStateKeeper {
     async fn wait_for_new_batch_env(
         &mut self,
         cursor: &IoCursor,
+        stop_receiver: &mut watch::Receiver<bool>,
     ) -> Result<(SystemEnv, L1BatchEnv, PubdataParams), Error> {
         // `io.wait_for_new_batch_params(..)` is not cancel-safe; once we get new batch params, we must hold onto them
         // until we get the rest of parameters from I/O or receive a stop signal.
-        let params = self.wait_for_new_batch_params(cursor).await?;
+        let params = self
+            .wait_for_new_batch_params(cursor, stop_receiver)
+            .await?;
         let contracts = self
             .io
             .load_base_system_contracts(params.protocol_version, cursor)
@@ -353,7 +374,7 @@ impl ZkSyncStateKeeper {
                 let previous_batch_hash =
                     hash_result.context("cannot load state hash for previous L1 batch")?;
                 Ok(params.into_env(self.io.chain_id(), contracts, cursor, previous_batch_hash))
             }
-            _ = self.stop_receiver.changed() => Err(Error::Canceled),
+            _ = stop_receiver.changed() => Err(Error::Canceled),
         }
     }
@@ -367,16 +388,20 @@ impl ZkSyncStateKeeper {
     async fn wait_for_new_l2_block_params(
         &mut self,
         updates: &UpdatesManager,
+        stop_receiver: &watch::Receiver<bool>,
     ) -> Result<L2BlockParams, Error> {
         let latency = KEEPER_METRICS.wait_for_l2_block_params.start();
         let cursor = updates.io_cursor();
-        while !self.is_canceled() {
+        while !is_canceled(stop_receiver) {
             if let Some(params) = self
                 .io
                 .wait_for_new_l2_block_params(&cursor, POLL_WAIT_DURATION)
                 .await
                 .context("error waiting for new L2 block params")?
             {
+                self.health_updater
+                    .update(StateKeeperHealthDetails::from(&cursor).into());
+
                 latency.observe();
                 return Ok(params);
             }
@@ -439,6 +464,7 @@ impl ZkSyncStateKeeper {
         batch_executor: &mut dyn BatchExecutor<OwnedStorage>,
         updates_manager: &mut UpdatesManager,
         l2_blocks_to_reexecute: Vec<L2BlockExecutionData>,
+        stop_receiver: &watch::Receiver<bool>,
     ) -> Result<(), Error> {
         if l2_blocks_to_reexecute.is_empty() {
             return Ok(());
@@ -530,7 +556,7 @@ impl ZkSyncStateKeeper {
 
         // We've processed all the L2 blocks, and right now we're initializing the next *actual* L2 block.
         let new_l2_block_params = self
-            .wait_for_new_l2_block_params(updates_manager)
+            .wait_for_new_l2_block_params(updates_manager, stop_receiver)
            .await
             .map_err(|e| e.context("wait_for_new_l2_block_params"))?;
         Self::start_next_l2_block(new_l2_block_params, updates_manager, batch_executor).await?;
@@ -547,13 +573,14 @@ impl ZkSyncStateKeeper {
         batch_executor: &mut dyn BatchExecutor<OwnedStorage>,
         updates_manager: &mut UpdatesManager,
         protocol_upgrade_tx: Option<ProtocolUpgradeTx>,
+        stop_receiver: &watch::Receiver<bool>,
     ) -> Result<(), Error> {
         if let Some(protocol_upgrade_tx) = protocol_upgrade_tx {
             self.process_upgrade_tx(batch_executor, updates_manager, protocol_upgrade_tx)
                 .await?;
         }
 
-        while !self.is_canceled() {
+        while !is_canceled(stop_receiver) {
             let full_latency = KEEPER_METRICS.process_l1_batch_loop_iteration.start();
 
             if self
@@ -576,7 +603,7 @@ impl ZkSyncStateKeeper {
                 self.seal_l2_block(updates_manager).await?;
 
                 let new_l2_block_params = self
-                    .wait_for_new_l2_block_params(updates_manager)
+                    .wait_for_new_l2_block_params(updates_manager, stop_receiver)
                     .await
                     .map_err(|e| e.context("wait_for_new_l2_block_params"))?;
                 tracing::debug!(
@@ -874,4 +901,9 @@ impl ZkSyncStateKeeper {
         latency.observe();
         Ok((resolution, exec_result))
     }
+
+    /// Returns the health check for state keeper.
+    pub fn health_check(&self) -> ReactiveHealthCheck {
+        self.health_updater.subscribe()
+    }
 }
diff --git a/core/node/state_keeper/src/lib.rs b/core/node/state_keeper/src/lib.rs
index c12e4163fdd4..323580ab0434 100644
--- a/core/node/state_keeper/src/lib.rs
+++ b/core/node/state_keeper/src/lib.rs
@@ -12,6 +12,7 @@ pub use self::{
 };
 
 pub mod executor;
+mod health;
 pub mod io;
 mod keeper;
 mod mempool_actor;
diff --git a/core/node/state_keeper/src/testonly/test_batch_executor.rs b/core/node/state_keeper/src/testonly/test_batch_executor.rs
index 5625add021bf..9a675c7e97e8 100644
--- a/core/node/state_keeper/src/testonly/test_batch_executor.rs
+++ b/core/node/state_keeper/src/testonly/test_batch_executor.rs
@@ -204,14 +204,13 @@ impl TestScenario {
         let (stop_sender, stop_receiver) = watch::channel(false);
         let (io, output_handler) = TestIO::new(stop_sender, self);
         let state_keeper = ZkSyncStateKeeper::new(
-            stop_receiver,
             Box::new(io),
             Box::new(batch_executor),
             output_handler,
             Arc::new(sealer),
             Arc::new(MockReadStorageFactory),
         );
-        let sk_thread = tokio::spawn(state_keeper.run());
+        let sk_thread = tokio::spawn(state_keeper.run(stop_receiver));
 
         // We must assume that *theoretically* state keeper may ignore the stop signal from IO once scenario is
         // completed, so we spawn it in a separate thread to not get test stuck.
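[Review note] A minimal sketch of how the new health plumbing behaves end to end, using only the API surface visible in this diff (`ReactiveHealthCheck::new` returning a `(check, updater)` pair, `HealthUpdater::update`, and the `From` impls in `health.rs`). The initial `NotReady` status and the `CheckHealth`/`status()` names are assumptions about `zksync_health_check` defaults, not part of this patch:

```rust
use zksync_health_check::{CheckHealth, Health, HealthStatus, ReactiveHealthCheck};

#[tokio::main]
async fn main() {
    // Same construction as in `ZkSyncStateKeeper::new`, which keeps the updater
    // (`.1`) and hands out subscriptions via `health_check()`.
    let (check, updater) = ReactiveHealthCheck::new("state_keeper");

    // Assumption: before the first update, the component reports `NotReady`.
    assert_eq!(check.check_health().await.status(), HealthStatus::NotReady);

    // Mirrors what `run_inner` does once the IO cursor is known; the real code
    // publishes `StateKeeperHealthDetails::from(&cursor).into()` instead.
    updater.update(Health::from(HealthStatus::Ready));
    assert_eq!(check.check_health().await.status(), HealthStatus::Ready);
}
```

Keeping the `HealthUpdater` private to the state keeper and exposing only `health_check()` subscriptions means callers (e.g. the node framework) can register the check without being able to spoof status updates.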
diff --git a/core/node/state_keeper/src/tests/mod.rs b/core/node/state_keeper/src/tests/mod.rs
index ca078354c896..d47f8f45df77 100644
--- a/core/node/state_keeper/src/tests/mod.rs
+++ b/core/node/state_keeper/src/tests/mod.rs
@@ -417,14 +417,13 @@ async fn load_upgrade_tx() {
     let sealer = SequencerSealer::default();
     let scenario = TestScenario::new();
     let batch_executor = TestBatchExecutorBuilder::new(&scenario);
-    let (stop_sender, stop_receiver) = watch::channel(false);
+    let (stop_sender, _stop_receiver) = watch::channel(false);
     let (mut io, output_handler) = TestIO::new(stop_sender, scenario);
     io.add_upgrade_tx(ProtocolVersionId::latest(), random_upgrade_tx(1));
     io.add_upgrade_tx(ProtocolVersionId::next(), random_upgrade_tx(2));
 
     let mut sk = ZkSyncStateKeeper::new(
-        stop_receiver,
         Box::new(io),
         Box::new(batch_executor),
         output_handler,
diff --git a/core/node/state_keeper/src/utils.rs b/core/node/state_keeper/src/utils.rs
index 320dd49583ed..af4616fe11dc 100644
--- a/core/node/state_keeper/src/utils.rs
+++ b/core/node/state_keeper/src/utils.rs
@@ -1,5 +1,6 @@
 use std::time::{SystemTime, UNIX_EPOCH};
 
+use tokio::sync::watch;
 use zksync_multivm::interface::{DeduplicatedWritesMetrics, VmExecutionMetrics};
 use zksync_types::{
     aggregated_operations::AggregatedActionType, block::BlockGasCount, ExecuteTransactionCommon,
@@ -89,6 +90,10 @@ pub(super) fn gas_count_from_writes(
     }
 }
 
+pub(super) fn is_canceled(stop_receiver: &watch::Receiver<bool>) -> bool {
+    *stop_receiver.borrow()
+}
+
 // TODO (SMA-1206): use seconds instead of milliseconds.
 pub(super) fn millis_since_epoch() -> u128 {
     SystemTime::now()
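[Review note] For context on the cancellation change: `stop_receiver` is no longer a struct field but is threaded through `run`, and the free function `is_canceled` above replaces the former `ZkSyncStateKeeper::is_canceled` method. Below is a self-contained sketch of the underlying `tokio::sync::watch` pattern; only `is_canceled` is copied from this diff, the worker loop is illustrative:

```rust
use tokio::sync::watch;

fn is_canceled(stop_receiver: &watch::Receiver<bool>) -> bool {
    *stop_receiver.borrow()
}

#[tokio::main]
async fn main() {
    // `false` is the initial value, as in the tests above (`watch::channel(false)`).
    let (stop_sender, mut stop_receiver) = watch::channel(false);

    let worker = tokio::spawn(async move {
        // Poll-style check, like the keeper's `while !is_canceled(..)` loops.
        while !is_canceled(&stop_receiver) {
            // Do a unit of work here, then wait for the flag to change.
            // `changed()` is the same primitive `wait_for_new_batch_env`
            // selects on; it errors out once the sender is dropped.
            if stop_receiver.changed().await.is_err() {
                break;
            }
        }
    });

    // Flipping the flag wakes the worker, which observes `true` and exits.
    stop_sender.send(true).unwrap();
    worker.await.unwrap();
}
```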