From 9ab72005f0d349d607cd28a704893d8fdd4423bd Mon Sep 17 00:00:00 2001 From: Alex Ostrovski Date: Mon, 16 Sep 2024 11:27:23 +0300 Subject: [PATCH] refactor(vm-runner): Improve VM runner / VM playground (#2840) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Various minor improvements to VM runner / VM playground: - Get batch storage asynchronously, so that it works efficiently with snapshot storage. - Add metrics / logs to ensure that snapshot storage works as expected. ## Why ❔ Improves usability. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [x] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- core/lib/state/src/lib.rs | 1 + core/lib/state/src/storage_factory/metrics.rs | 37 ++++ .../mod.rs} | 184 +++++++----------- .../storage_factory/rocksdb_with_memory.rs | 75 +++++++ .../lib/state/src/storage_factory/snapshot.rs | 49 +++++ core/lib/vm_executor/src/batch/factory.rs | 39 +++- core/lib/vm_executor/src/oneshot/metrics.rs | 77 +------- core/lib/vm_executor/src/oneshot/mod.rs | 2 +- core/lib/vm_executor/src/shared.rs | 81 +++++++- core/lib/vm_interface/src/storage/mod.rs | 2 +- core/lib/vm_interface/src/storage/view.rs | 71 ++++--- core/node/vm_runner/src/impls/bwip.rs | 8 +- core/node/vm_runner/src/impls/playground.rs | 7 +- .../vm_runner/src/impls/protective_reads.rs | 6 +- core/node/vm_runner/src/io.rs | 51 ++++- core/node/vm_runner/src/metrics.rs | 26 ++- core/node/vm_runner/src/output_handler.rs | 6 +- core/node/vm_runner/src/process.rs | 108 +++++----- core/node/vm_runner/src/storage.rs | 8 +- core/node/vm_runner/src/tests/mod.rs | 4 +- core/node/vm_runner/src/tests/process.rs | 4 +- .../vm_runner/src/tests/storage_writer.rs | 10 +- 22 files changed, 552 insertions(+), 304 deletions(-) create mode 100644 core/lib/state/src/storage_factory/metrics.rs rename core/lib/state/src/{storage_factory.rs => storage_factory/mod.rs} (80%) create mode 100644 core/lib/state/src/storage_factory/rocksdb_with_memory.rs create mode 100644 core/lib/state/src/storage_factory/snapshot.rs diff --git a/core/lib/state/src/lib.rs b/core/lib/state/src/lib.rs index 205579552..fa0659935 100644 --- a/core/lib/state/src/lib.rs +++ b/core/lib/state/src/lib.rs @@ -21,6 +21,7 @@ pub use self::{ shadow_storage::ShadowStorage, storage_factory::{ BatchDiff, CommonStorage, OwnedStorage, ReadStorageFactory, RocksdbWithMemory, + SnapshotStorage, }, }; diff --git a/core/lib/state/src/storage_factory/metrics.rs b/core/lib/state/src/storage_factory/metrics.rs new file mode 100644 index 000000000..822db9082 --- /dev/null +++ b/core/lib/state/src/storage_factory/metrics.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "stage", rename_all = "snake_case")] +pub(super) enum SnapshotStage { + BatchHeader, + ProtectiveReads, + TouchedSlots, + PreviousValues, + InitialWrites, + Bytecodes, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "kind", rename_all = "snake_case")] +pub(super) enum AccessKind { + ReadValue, + IsWriteInitial, + LoadFactoryDep, + GetEnumerationIndex, +} + +#[derive(Debug, Metrics)] +#[metrics(prefix = "state_snapshot")] +pub(super) 
struct SnapshotMetrics {
+    /// Latency of loading a batch snapshot split by stage.
+    #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
+    pub load_latency: Family<SnapshotStage, Histogram<Duration>>,
+    /// Latency of accessing the fallback storage for a batch snapshot.
+    #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
+    pub fallback_access_latency: Family<AccessKind, Histogram<Duration>>,
+}
+
+#[vise::register]
+pub(super) static SNAPSHOT_METRICS: vise::Global<SnapshotMetrics> = vise::Global::new();
diff --git a/core/lib/state/src/storage_factory.rs b/core/lib/state/src/storage_factory/mod.rs
similarity index 80%
rename from core/lib/state/src/storage_factory.rs
rename to core/lib/state/src/storage_factory/mod.rs
index 2ef9b249a..0b514f8f9 100644
--- a/core/lib/state/src/storage_factory.rs
+++ b/core/lib/state/src/storage_factory/mod.rs
@@ -1,7 +1,4 @@
-use std::{
-    collections::{HashMap, HashSet},
-    fmt::Debug,
-};
+use std::{collections::HashSet, fmt};
 
 use anyhow::Context as _;
 use async_trait::async_trait;
@@ -10,64 +7,18 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
 use zksync_storage::RocksDB;
 use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
 use zksync_utils::u256_to_h256;
-use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot, StorageWithSnapshot};
+use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot};
 
+use self::metrics::{SnapshotStage, SNAPSHOT_METRICS};
+pub use self::{
+    rocksdb_with_memory::{BatchDiff, RocksdbWithMemory},
+    snapshot::SnapshotStorage,
+};
 use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};
 
-/// Storage with a static lifetime that can be sent to Tokio tasks etc.
-pub type OwnedStorage = CommonStorage<'static>;
-
-/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
-/// (mostly for testing purposes); the default is [`OwnedStorage`].
-#[async_trait]
-pub trait ReadStorageFactory<S = OwnedStorage>: Debug + Send + Sync + 'static {
-    /// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
-    /// The specific criteria on which one are left up to the implementation.
-    ///
-    /// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
-    /// a stop signal; this is the only case in which `Ok(None)` should be returned.
-    async fn access_storage(
-        &self,
-        stop_receiver: &watch::Receiver<bool>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<S>>;
-}
-
-/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
-/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
-#[async_trait]
-impl ReadStorageFactory for ConnectionPool<Core> {
-    async fn access_storage(
-        &self,
-        _stop_receiver: &watch::Receiver<bool>,
-        l1_batch_number: L1BatchNumber,
-    ) -> anyhow::Result<Option<OwnedStorage>> {
-        let connection = self.connection().await?;
-        let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
-        Ok(Some(storage.into()))
-    }
-}
-
-/// DB difference introduced by one batch.
-#[derive(Debug, Clone)]
-pub struct BatchDiff {
-    /// Storage slots touched by this batch along with new values there.
-    pub state_diff: HashMap<H256, H256>,
-    /// Initial write indices introduced by this batch.
-    pub enum_index_diff: HashMap<H256, u64>,
-    /// Factory dependencies introduced by this batch.
-    pub factory_dep_diff: HashMap<H256, Vec<u8>>,
-}
-
-/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
-/// `N + K`, where `K` is the number of diffs.
-#[derive(Debug)] -pub struct RocksdbWithMemory { - /// RocksDB cache instance caught up to batch `N`. - pub rocksdb: RocksdbStorage, - /// Diffs for batches `N + 1` to `N + K`. - pub batch_diffs: Vec, -} +mod metrics; +mod rocksdb_with_memory; +mod snapshot; /// Union of all [`ReadStorage`] implementations that are returned by [`ReadStorageFactory`], such as /// Postgres- and RocksDB-backed storages. @@ -83,7 +34,7 @@ pub enum CommonStorage<'a> { /// Implementation over a RocksDB cache instance with in-memory DB diffs. RocksdbWithMemory(RocksdbWithMemory), /// In-memory storage snapshot with the Postgres storage fallback. - Snapshot(StorageWithSnapshot>), + Snapshot(SnapshotStorage<'a>), /// Generic implementation. Should be used for testing purposes only since it has performance penalty because /// of the dynamic dispatch. Boxed(Box), @@ -176,6 +127,7 @@ impl CommonStorage<'static> { connection: &mut Connection<'static, Core>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::BatchHeader].start(); let Some(header) = connection .blocks_dal() .get_l1_batch_header(l1_batch_number) @@ -188,8 +140,10 @@ impl CommonStorage<'static> { .into_iter() .map(u256_to_h256) .collect(); + latency.observe(); // Check protective reads early on. + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::ProtectiveReads].start(); let protective_reads = connection .storage_logs_dedup_dal() .get_protective_reads_for_l1_batch(l1_batch_number) @@ -199,14 +153,18 @@ impl CommonStorage<'static> { return Ok(None); } let protective_reads_len = protective_reads.len(); - tracing::debug!("Loaded {protective_reads_len} protective reads"); + let latency = latency.observe(); + tracing::debug!("Loaded {protective_reads_len} protective reads in {latency:?}"); + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::TouchedSlots].start(); let touched_slots = connection .storage_logs_dal() .get_touched_slots_for_l1_batch(l1_batch_number) .await?; - tracing::debug!("Loaded {} touched keys", touched_slots.len()); + let latency = latency.observe(); + tracing::debug!("Loaded {} touched keys in {latency:?}", touched_slots.len()); + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::PreviousValues].start(); let all_accessed_keys: Vec<_> = protective_reads .into_iter() .map(|key| key.hashed_key()) @@ -216,21 +174,31 @@ impl CommonStorage<'static> { .storage_logs_dal() .get_previous_storage_values(&all_accessed_keys, l1_batch_number) .await?; + let latency = latency.observe(); tracing::debug!( - "Obtained {} previous values for accessed keys", + "Obtained {} previous values for accessed keys in {latency:?}", previous_values.len() ); + + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::InitialWrites].start(); let initial_write_info = connection .storage_logs_dal() .get_l1_batches_and_indices_for_initial_writes(&all_accessed_keys) .await?; - tracing::debug!("Obtained initial write info for accessed keys"); + let latency = latency.observe(); + tracing::debug!("Obtained initial write info for accessed keys in {latency:?}"); + let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::Bytecodes].start(); let bytecodes = connection .factory_deps_dal() .get_factory_deps(&bytecode_hashes) .await; - tracing::debug!("Loaded {} bytecodes used in the batch", bytecodes.len()); + let latency = latency.observe(); + tracing::debug!( + "Loaded {} bytecodes used in the batch in {latency:?}", + bytecodes.len() + ); + let factory_deps = bytecodes 
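             // Repack factory deps loaded from Postgres into the snapshot format
             // (`U256` hashes become `H256` via `u256_to_h256`).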
             .into_iter()
             .map(|(hash_u256, words)| {
@@ -256,54 +224,6 @@
     }
 }
 
-impl ReadStorage for RocksdbWithMemory {
-    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
-        let hashed_key = key.hashed_key();
-        match self
-            .batch_diffs
-            .iter()
-            .rev()
-            .find_map(|b| b.state_diff.get(&hashed_key))
-        {
-            None => self.rocksdb.read_value(key),
-            Some(value) => *value,
-        }
-    }
-
-    fn is_write_initial(&mut self, key: &StorageKey) -> bool {
-        match self
-            .batch_diffs
-            .iter()
-            .find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
-        {
-            None => self.rocksdb.is_write_initial(key),
-            Some(_) => false,
-        }
-    }
-
-    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
-        match self
-            .batch_diffs
-            .iter()
-            .find_map(|b| b.factory_dep_diff.get(&hash))
-        {
-            None => self.rocksdb.load_factory_dep(hash),
-            Some(value) => Some(value.clone()),
-        }
-    }
-
-    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
-        match self
-            .batch_diffs
-            .iter()
-            .find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
-        {
-            None => self.rocksdb.get_enumeration_index(key),
-            Some(value) => Some(*value),
-        }
-    }
-}
-
 impl ReadStorage for CommonStorage<'_> {
     fn read_value(&mut self, key: &StorageKey) -> StorageValue {
         match self {
@@ -358,8 +278,42 @@ impl From<RocksdbWithMemory> for CommonStorage<'_> {
     }
 }
 
-impl<'a> From<StorageWithSnapshot<PostgresStorage<'a>>> for CommonStorage<'a> {
-    fn from(value: StorageWithSnapshot<PostgresStorage<'a>>) -> Self {
+impl<'a> From<SnapshotStorage<'a>> for CommonStorage<'a> {
+    fn from(value: SnapshotStorage<'a>) -> Self {
         Self::Snapshot(value)
     }
 }
+
+/// Storage with a static lifetime that can be sent to Tokio tasks etc.
+pub type OwnedStorage = CommonStorage<'static>;
+
+/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
+/// (mostly for testing purposes); the default is [`OwnedStorage`].
+#[async_trait]
+pub trait ReadStorageFactory<S = OwnedStorage>: fmt::Debug + Send + Sync + 'static {
+    /// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
+    /// The specific criteria for choosing one are left up to the implementation.
+    ///
+    /// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
+    /// a stop signal; this is the only case in which `Ok(None)` should be returned.
+    async fn access_storage(
+        &self,
+        stop_receiver: &watch::Receiver<bool>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<S>>;
+}
+
+/// [`ReadStorageFactory`] producing Postgres-backed storage instances. It is slower than more advanced
+/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
+#[async_trait]
+impl ReadStorageFactory for ConnectionPool<Core> {
+    async fn access_storage(
+        &self,
+        _stop_receiver: &watch::Receiver<bool>,
+        l1_batch_number: L1BatchNumber,
+    ) -> anyhow::Result<Option<OwnedStorage>> {
+        let connection = self.connection().await?;
+        let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
+        Ok(Some(storage.into()))
+    }
+}
diff --git a/core/lib/state/src/storage_factory/rocksdb_with_memory.rs b/core/lib/state/src/storage_factory/rocksdb_with_memory.rs
new file mode 100644
index 000000000..411460dad
--- /dev/null
+++ b/core/lib/state/src/storage_factory/rocksdb_with_memory.rs
@@ -0,0 +1,75 @@
+use std::collections::HashMap;
+
+use zksync_types::{StorageKey, StorageValue, H256};
+use zksync_vm_interface::storage::ReadStorage;
+
+use crate::RocksdbStorage;
+
+/// DB difference introduced by one batch.
+#[derive(Debug, Clone)]
+pub struct BatchDiff {
+    /// Storage slots touched by this batch along with new values there.
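+    /// Keys are hashed storage keys (cf. `StorageKey::hashed_key()`), matching the lookups
+    /// in the [`ReadStorage`] impl below.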
+    pub state_diff: HashMap<H256, H256>,
+    /// Initial write indices introduced by this batch.
+    pub enum_index_diff: HashMap<H256, u64>,
+    /// Factory dependencies introduced by this batch.
+    pub factory_dep_diff: HashMap<H256, Vec<u8>>,
+}
+
+/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
+/// `N + K`, where `K` is the number of diffs.
+#[derive(Debug)]
+pub struct RocksdbWithMemory {
+    /// RocksDB cache instance caught up to batch `N`.
+    pub rocksdb: RocksdbStorage,
+    /// Diffs for batches `N + 1` to `N + K`.
+    pub batch_diffs: Vec<BatchDiff>,
+}
+
+impl ReadStorage for RocksdbWithMemory {
+    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
+        let hashed_key = key.hashed_key();
+        match self
+            .batch_diffs
+            .iter()
+            .rev()
+            .find_map(|b| b.state_diff.get(&hashed_key))
+        {
+            None => self.rocksdb.read_value(key),
+            Some(value) => *value,
+        }
+    }
+
+    fn is_write_initial(&mut self, key: &StorageKey) -> bool {
+        match self
+            .batch_diffs
+            .iter()
+            .find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
+        {
+            None => self.rocksdb.is_write_initial(key),
+            Some(_) => false,
+        }
+    }
+
+    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
+        match self
+            .batch_diffs
+            .iter()
+            .find_map(|b| b.factory_dep_diff.get(&hash))
+        {
+            None => self.rocksdb.load_factory_dep(hash),
+            Some(value) => Some(value.clone()),
+        }
+    }
+
+    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
+        match self
+            .batch_diffs
+            .iter()
+            .find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
+        {
+            None => self.rocksdb.get_enumeration_index(key),
+            Some(value) => Some(*value),
+        }
+    }
+}
diff --git a/core/lib/state/src/storage_factory/snapshot.rs b/core/lib/state/src/storage_factory/snapshot.rs
new file mode 100644
index 000000000..05a79125d
--- /dev/null
+++ b/core/lib/state/src/storage_factory/snapshot.rs
@@ -0,0 +1,49 @@
+use zksync_types::{StorageKey, StorageValue, H256};
+use zksync_vm_interface::storage::StorageWithSnapshot;
+
+use super::metrics::{AccessKind, SNAPSHOT_METRICS};
+use crate::{interface::ReadStorage, PostgresStorage};
+
+/// Wrapper around [`PostgresStorage`] used to track frequency of fallback access.
+#[derive(Debug)]
+pub struct FallbackStorage<'a>(PostgresStorage<'a>);
+
+impl<'a> From<PostgresStorage<'a>> for FallbackStorage<'a> {
+    fn from(storage: PostgresStorage<'a>) -> Self {
+        Self(storage)
+    }
+}
+
+impl ReadStorage for FallbackStorage<'_> {
+    fn read_value(&mut self, key: &StorageKey) -> StorageValue {
+        let latency = SNAPSHOT_METRICS.fallback_access_latency[&AccessKind::ReadValue].start();
+        let output = self.0.read_value(key);
+        latency.observe();
+        output
+    }
+
+    fn is_write_initial(&mut self, key: &StorageKey) -> bool {
+        let latency = SNAPSHOT_METRICS.fallback_access_latency[&AccessKind::IsWriteInitial].start();
+        let output = self.0.is_write_initial(key);
+        latency.observe();
+        output
+    }
+
+    fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
+        let latency = SNAPSHOT_METRICS.fallback_access_latency[&AccessKind::LoadFactoryDep].start();
+        let output = self.0.load_factory_dep(hash);
+        latency.observe();
+        output
+    }
+
+    fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
+        let latency =
+            SNAPSHOT_METRICS.fallback_access_latency[&AccessKind::GetEnumerationIndex].start();
+        let output = self.0.get_enumeration_index(key);
+        latency.observe();
+        output
+    }
+}
+
+/// Snapshot-backed storage used for batch processing.
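+///
+/// Reads are served from the in-memory [`StorageSnapshot`] when possible; misses fall back to
+/// Postgres via [`FallbackStorage`], with fallback latencies reported to the snapshot metrics.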
+pub type SnapshotStorage<'a> = StorageWithSnapshot>; diff --git a/core/lib/vm_executor/src/batch/factory.rs b/core/lib/vm_executor/src/batch/factory.rs index 68a3769ee..d6f7555b7 100644 --- a/core/lib/vm_executor/src/batch/factory.rs +++ b/core/lib/vm_executor/src/batch/factory.rs @@ -1,4 +1,4 @@ -use std::{marker::PhantomData, rc::Rc, sync::Arc}; +use std::{marker::PhantomData, rc::Rc, sync::Arc, time::Duration}; use anyhow::Context as _; use once_cell::sync::OnceCell; @@ -6,7 +6,7 @@ use tokio::sync::mpsc; use zksync_multivm::{ interface::{ executor::{BatchExecutor, BatchExecutorFactory}, - storage::{ReadStorage, StorageView}, + storage::{ReadStorage, StorageView, StorageViewStats}, BatchTransactionExecutionResult, ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, SystemEnv, VmInterface, VmInterfaceHistoryEnabled, }, @@ -20,7 +20,7 @@ use super::{ executor::{Command, MainBatchExecutor}, metrics::{TxExecutionStage, BATCH_TIP_METRICS, EXECUTOR_METRICS, KEEPER_METRICS}, }; -use crate::shared::InteractionType; +use crate::shared::{InteractionType, STORAGE_METRICS}; /// The default implementation of [`BatchExecutorFactory`]. /// Creates real batch executors which maintain the VM (as opposed to the test factories which don't use the VM). @@ -35,6 +35,7 @@ pub struct MainBatchExecutorFactory { /// regardless of its configuration, this flag should be set to `true`. optional_bytecode_compression: bool, fast_vm_mode: FastVmMode, + observe_storage_metrics: bool, } impl MainBatchExecutorFactory { @@ -43,9 +44,11 @@ impl MainBatchExecutorFactory { save_call_traces, optional_bytecode_compression, fast_vm_mode: FastVmMode::Old, + observe_storage_metrics: false, } } + /// Sets the fast VM mode used by this executor. pub fn set_fast_vm_mode(&mut self, fast_vm_mode: FastVmMode) { if !matches!(fast_vm_mode, FastVmMode::Old) { tracing::warn!( @@ -54,6 +57,13 @@ impl MainBatchExecutorFactory { } self.fast_vm_mode = fast_vm_mode; } + + /// Enables storage metrics reporting for this executor. Storage metrics will be reported for each transaction. + // The reason this isn't on by default is that storage metrics don't distinguish between "batch-executed" and "oneshot-executed" transactions; + // this optimally needs some improvements in `vise` (ability to add labels for groups of metrics). 
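+    // As of this PR, only the VM playground opts in (see `impls/playground.rs` below).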
+ pub fn observe_storage_metrics(&mut self) { + self.observe_storage_metrics = true; + } } impl BatchExecutorFactory for MainBatchExecutorFactory { @@ -70,6 +80,7 @@ impl BatchExecutorFactory for MainBatchExecu save_call_traces: self.save_call_traces, optional_bytecode_compression: self.optional_bytecode_compression, fast_vm_mode: self.fast_vm_mode, + observe_storage_metrics: self.observe_storage_metrics, commands: commands_receiver, _storage: PhantomData, }; @@ -91,6 +102,7 @@ struct CommandReceiver { save_call_traces: bool, optional_bytecode_compression: bool, fast_vm_mode: FastVmMode, + observe_storage_metrics: bool, commands: mpsc::Receiver, _storage: PhantomData, } @@ -112,14 +124,22 @@ impl CommandReceiver { self.fast_vm_mode, ); let mut batch_finished = false; + let mut prev_storage_stats = StorageViewStats::default(); while let Some(cmd) = self.commands.blocking_recv() { match cmd { Command::ExecuteTx(tx, resp) => { let tx_hash = tx.hash(); - let result = self.execute_tx(*tx, &mut vm).with_context(|| { + let (result, latency) = self.execute_tx(*tx, &mut vm).with_context(|| { format!("fatal error executing transaction {tx_hash:?}") })?; + + if self.observe_storage_metrics { + let storage_stats = storage_view.borrow().stats(); + let stats_diff = storage_stats.saturating_sub(&prev_storage_stats); + STORAGE_METRICS.observe(&format!("Tx {tx_hash:?}"), latency, &stats_diff); + prev_storage_stats = storage_stats; + } if resp.send(result).is_err() { break; } @@ -152,11 +172,11 @@ impl CommandReceiver { .context("storage view leaked")? .into_inner(); if batch_finished { - let metrics = storage_view.metrics(); + let stats = storage_view.stats(); EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::GetValue] - .observe(metrics.time_spent_on_get_value); + .observe(stats.time_spent_on_get_value); EXECUTOR_METRICS.batch_storage_interaction_duration[&InteractionType::SetValue] - .observe(metrics.time_spent_on_set_value); + .observe(stats.time_spent_on_set_value); } else { // State keeper can exit because of stop signal, so it's OK to exit mid-batch. tracing::info!("State keeper exited with an unfinished L1 batch"); @@ -168,7 +188,7 @@ impl CommandReceiver { &self, transaction: Transaction, vm: &mut VmInstance, - ) -> anyhow::Result { + ) -> anyhow::Result<(BatchTransactionExecutionResult, Duration)> { // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot // was already removed), or that we build on top of it (in which case, it can be removed now). vm.pop_snapshot_no_rollback(); @@ -182,9 +202,8 @@ impl CommandReceiver { } else { self.execute_tx_in_vm(&transaction, vm)? 
}; - latency.observe(); - Ok(result) + Ok((result, latency.observe())) } fn rollback_last_tx(&self, vm: &mut VmInstance) { diff --git a/core/lib/vm_executor/src/oneshot/metrics.rs b/core/lib/vm_executor/src/oneshot/metrics.rs index 8a89ce0a9..475463300 100644 --- a/core/lib/vm_executor/src/oneshot/metrics.rs +++ b/core/lib/vm_executor/src/oneshot/metrics.rs @@ -1,9 +1,9 @@ use std::time::Duration; use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics}; -use zksync_multivm::interface::{storage::StorageViewMetrics, VmMemoryMetrics}; +use zksync_multivm::interface::{storage::StorageViewStats, VmMemoryMetrics}; -use crate::shared::InteractionType; +use crate::shared::STORAGE_METRICS; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "type", rename_all = "snake_case")] @@ -46,29 +46,11 @@ struct RuntimeContextMemoryMetrics { #[vise::register] static MEMORY_METRICS: vise::Global = vise::Global::new(); -const INTERACTION_AMOUNT_BUCKETS: Buckets = Buckets::exponential(10.0..=10_000_000.0, 10.0); - -#[derive(Debug, Metrics)] -#[metrics(prefix = "runtime_context_storage_interaction")] -struct RuntimeContextStorageMetrics { - #[metrics(buckets = INTERACTION_AMOUNT_BUCKETS)] - amount: Family>, - #[metrics(buckets = Buckets::LATENCIES)] - duration: Family>, - #[metrics(buckets = Buckets::LATENCIES)] - duration_per_unit: Family>, - #[metrics(buckets = Buckets::ZERO_TO_ONE)] - ratio: Histogram, -} - -#[vise::register] -static STORAGE_METRICS: vise::Global = vise::Global::new(); - pub(super) fn report_vm_memory_metrics( tx_id: &str, memory_metrics: &VmMemoryMetrics, vm_execution_took: Duration, - storage_metrics: StorageViewMetrics, + storage_metrics: &StorageViewStats, ) { MEMORY_METRICS.event_sink_size[&SizeType::Inner].observe(memory_metrics.event_sink_inner); MEMORY_METRICS.event_sink_size[&SizeType::History].observe(memory_metrics.event_sink_history); @@ -88,56 +70,5 @@ pub(super) fn report_vm_memory_metrics( .full .observe(memory_metrics.full_size() + storage_metrics.cache_size); - let total_storage_invocations = storage_metrics.get_value_storage_invocations - + storage_metrics.set_value_storage_invocations; - let total_time_spent_in_storage = - storage_metrics.time_spent_on_get_value + storage_metrics.time_spent_on_set_value; - - STORAGE_METRICS.amount[&InteractionType::Missed] - .observe(storage_metrics.storage_invocations_missed); - STORAGE_METRICS.amount[&InteractionType::GetValue] - .observe(storage_metrics.get_value_storage_invocations); - STORAGE_METRICS.amount[&InteractionType::SetValue] - .observe(storage_metrics.set_value_storage_invocations); - STORAGE_METRICS.amount[&InteractionType::Total].observe(total_storage_invocations); - - STORAGE_METRICS.duration[&InteractionType::Missed] - .observe(storage_metrics.time_spent_on_storage_missed); - STORAGE_METRICS.duration[&InteractionType::GetValue] - .observe(storage_metrics.time_spent_on_get_value); - STORAGE_METRICS.duration[&InteractionType::SetValue] - .observe(storage_metrics.time_spent_on_set_value); - STORAGE_METRICS.duration[&InteractionType::Total].observe(total_time_spent_in_storage); - - if total_storage_invocations > 0 { - STORAGE_METRICS.duration_per_unit[&InteractionType::Total] - .observe(total_time_spent_in_storage.div_f64(total_storage_invocations as f64)); - } - if storage_metrics.storage_invocations_missed > 0 { - let duration_per_unit = storage_metrics - .time_spent_on_storage_missed - .div_f64(storage_metrics.storage_invocations_missed as f64); - 
STORAGE_METRICS.duration_per_unit[&InteractionType::Missed].observe(duration_per_unit); - } - - STORAGE_METRICS - .ratio - .observe(total_time_spent_in_storage.as_secs_f64() / vm_execution_took.as_secs_f64()); - - const STORAGE_INVOCATIONS_DEBUG_THRESHOLD: usize = 1_000; - - if total_storage_invocations > STORAGE_INVOCATIONS_DEBUG_THRESHOLD { - tracing::info!( - "Tx {tx_id} resulted in {total_storage_invocations} storage_invocations, {} new_storage_invocations, \ - {} get_value_storage_invocations, {} set_value_storage_invocations, \ - vm execution took {vm_execution_took:?}, storage interaction took {total_time_spent_in_storage:?} \ - (missed: {:?} get: {:?} set: {:?})", - storage_metrics.storage_invocations_missed, - storage_metrics.get_value_storage_invocations, - storage_metrics.set_value_storage_invocations, - storage_metrics.time_spent_on_storage_missed, - storage_metrics.time_spent_on_get_value, - storage_metrics.time_spent_on_set_value, - ); - } + STORAGE_METRICS.observe(&format!("Tx {tx_id}"), vm_execution_took, storage_metrics); } diff --git a/core/lib/vm_executor/src/oneshot/mod.rs b/core/lib/vm_executor/src/oneshot/mod.rs index cac8edfdf..1838381d2 100644 --- a/core/lib/vm_executor/src/oneshot/mod.rs +++ b/core/lib/vm_executor/src/oneshot/mod.rs @@ -284,7 +284,7 @@ impl VmSandbox { &tx_id, &memory_metrics, vm_execution_took, - self.storage_view.as_ref().borrow_mut().metrics(), + &self.storage_view.borrow().stats(), ); result } diff --git a/core/lib/vm_executor/src/shared.rs b/core/lib/vm_executor/src/shared.rs index 420005be0..8ac4dce2e 100644 --- a/core/lib/vm_executor/src/shared.rs +++ b/core/lib/vm_executor/src/shared.rs @@ -1,6 +1,9 @@ //! Functionality shared among different types of executors. -use vise::{EncodeLabelSet, EncodeLabelValue}; +use std::time::Duration; + +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics}; +use zksync_multivm::interface::storage::StorageViewStats; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "interaction", rename_all = "snake_case")] @@ -10,3 +13,79 @@ pub(crate) enum InteractionType { SetValue, Total, } + +const INTERACTION_AMOUNT_BUCKETS: Buckets = Buckets::exponential(10.0..=10_000_000.0, 10.0); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "runtime_context_storage_interaction")] +pub(crate) struct RuntimeContextStorageMetrics { + #[metrics(buckets = INTERACTION_AMOUNT_BUCKETS)] + amount: Family>, + #[metrics(buckets = Buckets::LATENCIES)] + duration: Family>, + #[metrics(buckets = Buckets::LATENCIES)] + duration_per_unit: Family>, + #[metrics(buckets = Buckets::ZERO_TO_ONE)] + ratio: Histogram, +} + +impl RuntimeContextStorageMetrics { + pub fn observe( + &self, + op: &str, + total_vm_latency: Duration, + storage_metrics: &StorageViewStats, + ) { + const STORAGE_INVOCATIONS_DEBUG_THRESHOLD: usize = 1_000; + + let total_storage_invocations = storage_metrics.get_value_storage_invocations + + storage_metrics.set_value_storage_invocations; + let total_time_spent_in_storage = + storage_metrics.time_spent_on_get_value + storage_metrics.time_spent_on_set_value; + + self.amount[&InteractionType::Missed].observe(storage_metrics.storage_invocations_missed); + self.amount[&InteractionType::GetValue] + .observe(storage_metrics.get_value_storage_invocations); + self.amount[&InteractionType::SetValue] + .observe(storage_metrics.set_value_storage_invocations); + self.amount[&InteractionType::Total].observe(total_storage_invocations); + + 
self.duration[&InteractionType::Missed] + .observe(storage_metrics.time_spent_on_storage_missed); + self.duration[&InteractionType::GetValue].observe(storage_metrics.time_spent_on_get_value); + self.duration[&InteractionType::SetValue].observe(storage_metrics.time_spent_on_set_value); + self.duration[&InteractionType::Total].observe(total_time_spent_in_storage); + + if total_storage_invocations > 0 { + self.duration_per_unit[&InteractionType::Total] + .observe(total_time_spent_in_storage.div_f64(total_storage_invocations as f64)); + } + if storage_metrics.storage_invocations_missed > 0 { + let duration_per_unit = storage_metrics + .time_spent_on_storage_missed + .div_f64(storage_metrics.storage_invocations_missed as f64); + self.duration_per_unit[&InteractionType::Missed].observe(duration_per_unit); + } + + self.ratio + .observe(total_time_spent_in_storage.as_secs_f64() / total_vm_latency.as_secs_f64()); + + if total_storage_invocations > STORAGE_INVOCATIONS_DEBUG_THRESHOLD { + tracing::info!( + "{op} resulted in {total_storage_invocations} storage_invocations, {} new_storage_invocations, \ + {} get_value_storage_invocations, {} set_value_storage_invocations, \ + vm execution took {total_vm_latency:?}, storage interaction took {total_time_spent_in_storage:?} \ + (missed: {:?} get: {:?} set: {:?})", + storage_metrics.storage_invocations_missed, + storage_metrics.get_value_storage_invocations, + storage_metrics.set_value_storage_invocations, + storage_metrics.time_spent_on_storage_missed, + storage_metrics.time_spent_on_get_value, + storage_metrics.time_spent_on_set_value, + ); + } + } +} + +#[vise::register] +pub(crate) static STORAGE_METRICS: vise::Global = vise::Global::new(); diff --git a/core/lib/vm_interface/src/storage/mod.rs b/core/lib/vm_interface/src/storage/mod.rs index 9b92ef8b7..6cdcd33db 100644 --- a/core/lib/vm_interface/src/storage/mod.rs +++ b/core/lib/vm_interface/src/storage/mod.rs @@ -6,7 +6,7 @@ pub use self::{ // Note, that `test_infra` of the bootloader tests relies on this value to be exposed in_memory::{InMemoryStorage, IN_MEMORY_STORAGE_DEFAULT_NETWORK_ID}, snapshot::{StorageSnapshot, StorageWithSnapshot}, - view::{ImmutableStorageView, StorageView, StorageViewCache, StorageViewMetrics}, + view::{ImmutableStorageView, StorageView, StorageViewCache, StorageViewStats}, }; mod in_memory; diff --git a/core/lib/vm_interface/src/storage/view.rs b/core/lib/vm_interface/src/storage/view.rs index 101f5c82f..ec9267609 100644 --- a/core/lib/vm_interface/src/storage/view.rs +++ b/core/lib/vm_interface/src/storage/view.rs @@ -10,9 +10,9 @@ use zksync_types::{StorageKey, StorageValue, H256}; use super::{ReadStorage, StoragePtr, WriteStorage}; -/// Metrics for [`StorageView`]. +/// Statistics for [`StorageView`]. #[derive(Debug, Default, Clone, Copy)] -pub struct StorageViewMetrics { +pub struct StorageViewStats { /// Estimated byte size of the cache used by the `StorageView`. pub cache_size: usize, /// Number of read / write ops for which the value was read from the underlying storage. @@ -29,6 +29,33 @@ pub struct StorageViewMetrics { pub time_spent_on_set_value: Duration, } +impl StorageViewStats { + /// Subtracts two sets of statistics. This can be used to measure increment between these stats and older stats for the same VM. 
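+    ///
+    /// # Example
+    ///
+    /// A sketch of per-transaction measurement, mirroring how the batch executor diffs stats
+    /// in this PR:
+    ///
+    /// ```ignore
+    /// let prev_stats = storage_view.stats();
+    /// // ... execute a transaction against the VM ...
+    /// let stats = storage_view.stats();
+    /// let tx_stats = stats.saturating_sub(&prev_stats);
+    /// ```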
+ pub fn saturating_sub(&self, older: &Self) -> Self { + Self { + cache_size: self.cache_size.saturating_sub(older.cache_size), + storage_invocations_missed: self + .storage_invocations_missed + .saturating_sub(older.storage_invocations_missed), + get_value_storage_invocations: self + .get_value_storage_invocations + .saturating_sub(older.get_value_storage_invocations), + set_value_storage_invocations: self + .set_value_storage_invocations + .saturating_sub(older.set_value_storage_invocations), + time_spent_on_storage_missed: self + .time_spent_on_storage_missed + .saturating_sub(older.time_spent_on_storage_missed), + time_spent_on_get_value: self + .time_spent_on_get_value + .saturating_sub(older.time_spent_on_get_value), + time_spent_on_set_value: self + .time_spent_on_set_value + .saturating_sub(older.time_spent_on_set_value), + } + } +} + /// `StorageView` is a buffer for `StorageLog`s between storage and transaction execution code. /// In order to commit transactions logs should be submitted to the underlying storage /// after a transaction is executed. @@ -46,7 +73,7 @@ pub struct StorageView { // Used for caching and to get the list/count of modified keys modified_storage_keys: HashMap, cache: StorageViewCache, - metrics: StorageViewMetrics, + stats: StorageViewStats, } /// `StorageViewCache` is a struct for caching storage reads and `contains_key()` checks. @@ -112,7 +139,7 @@ impl StorageView { read_storage_keys: HashMap::new(), initial_writes: HashMap::new(), }, - metrics: StorageViewMetrics::default(), + stats: StorageViewStats::default(), } } @@ -126,8 +153,8 @@ impl StorageView { cached_value.copied().unwrap_or_else(|| { let value = self.storage_handle.read_value(key); self.cache.read_storage_keys.insert(*key, value); - self.metrics.time_spent_on_storage_missed += started_at.elapsed(); - self.metrics.storage_invocations_missed += 1; + self.stats.time_spent_on_storage_missed += started_at.elapsed(); + self.stats.storage_invocations_missed += 1; value }) } @@ -138,11 +165,11 @@ impl StorageView { + self.cache.read_storage_keys.len() * mem::size_of::<(StorageKey, StorageValue)>() } - /// Returns the current metrics. - pub fn metrics(&self) -> StorageViewMetrics { - StorageViewMetrics { + /// Returns the current storage access stats. 
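+    ///
+    /// The returned `cache_size` is recomputed from the current cache contents on each call.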
+ pub fn stats(&self) -> StorageViewStats { + StorageViewStats { cache_size: self.cache_size(), - ..self.metrics + ..self.stats } } @@ -155,7 +182,7 @@ impl StorageView { impl ReadStorage for StorageView { fn read_value(&mut self, key: &StorageKey) -> StorageValue { let started_at = Instant::now(); - self.metrics.get_value_storage_invocations += 1; + self.stats.get_value_storage_invocations += 1; let value = self.get_value_no_log(key); tracing::trace!( @@ -166,7 +193,7 @@ impl ReadStorage for StorageView { key.key() ); - self.metrics.time_spent_on_get_value += started_at.elapsed(); + self.stats.time_spent_on_get_value += started_at.elapsed(); value } @@ -198,7 +225,7 @@ impl WriteStorage for StorageView { fn set_value(&mut self, key: StorageKey, value: StorageValue) -> StorageValue { let started_at = Instant::now(); - self.metrics.set_value_storage_invocations += 1; + self.stats.set_value_storage_invocations += 1; let original = self.get_value_no_log(&key); tracing::trace!( @@ -210,7 +237,7 @@ impl WriteStorage for StorageView { key.key() ); self.modified_storage_keys.insert(key, value); - self.metrics.time_spent_on_set_value += started_at.elapsed(); + self.stats.time_spent_on_set_value += started_at.elapsed(); original } @@ -220,7 +247,7 @@ impl WriteStorage for StorageView { } fn missed_storage_invocations(&self) -> usize { - self.metrics.storage_invocations_missed + self.stats.storage_invocations_missed } } @@ -245,8 +272,8 @@ impl ReadStorage for ImmutableStorageView { cached_value.copied().unwrap_or_else(|| { let value = this.storage_handle.read_value(key); this.cache.read_storage_keys.insert(*key, value); - this.metrics.time_spent_on_storage_missed += started_at.elapsed(); - this.metrics.storage_invocations_missed += 1; + this.stats.time_spent_on_storage_missed += started_at.elapsed(); + this.stats.storage_invocations_missed += 1; value }) } @@ -289,7 +316,7 @@ mod test { assert_eq!(storage_view.read_value(&key), value); assert!(storage_view.is_write_initial(&key)); // key was inserted during the view lifetime - assert_eq!(storage_view.metrics().storage_invocations_missed, 1); + assert_eq!(storage_view.stats().storage_invocations_missed, 1); // ^ We should only read a value at `key` once, and then used the cached value. 
raw_storage.set_value(key, value); @@ -307,10 +334,10 @@ mod test { assert_eq!(storage_view.read_value(&new_key), new_value); assert!(storage_view.is_write_initial(&new_key)); - let metrics = storage_view.metrics(); - assert_eq!(metrics.storage_invocations_missed, 2); - assert_eq!(metrics.get_value_storage_invocations, 3); - assert_eq!(metrics.set_value_storage_invocations, 2); + let stats = storage_view.stats(); + assert_eq!(stats.storage_invocations_missed, 2); + assert_eq!(stats.get_value_storage_invocations, 3); + assert_eq!(stats.set_value_storage_invocations, 2); } #[test] diff --git a/core/node/vm_runner/src/impls/bwip.rs b/core/node/vm_runner/src/impls/bwip.rs index f23f63533..6c2933635 100644 --- a/core/node/vm_runner/src/impls/bwip.rs +++ b/core/node/vm_runner/src/impls/bwip.rs @@ -52,9 +52,9 @@ impl BasicWitnessInputProducer { ConcurrentOutputHandlerFactory::new(pool.clone(), io.clone(), output_handler_factory); let vm_runner = VmRunner::new( pool, - Box::new(io), + Arc::new(io), Arc::new(loader), - Box::new(output_handler_factory), + Arc::new(output_handler_factory), batch_executor_factory, ); Ok(( @@ -168,7 +168,7 @@ impl OutputHandler for BasicWitnessInputProducerOutputHandler { )] async fn handle_l1_batch(self: Box, output: Arc) -> anyhow::Result<()> { let l1_batch_number = self.l1_batch_number; - let mut connection = self.pool.connection().await?; + let mut connection = self.pool.connection_tagged("bwip").await?; tracing::info!(%l1_batch_number, "Started saving VM run data"); @@ -381,7 +381,7 @@ struct BasicWitnessInputProducerOutputHandlerFactory { #[async_trait] impl OutputHandlerFactory for BasicWitnessInputProducerOutputHandlerFactory { async fn create_handler( - &mut self, + &self, system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { diff --git a/core/node/vm_runner/src/impls/playground.rs b/core/node/vm_runner/src/impls/playground.rs index 091fa15fc..dc21d5a32 100644 --- a/core/node/vm_runner/src/impls/playground.rs +++ b/core/node/vm_runner/src/impls/playground.rs @@ -129,6 +129,7 @@ impl VmPlayground { let mut batch_executor_factory = MainBatchExecutorFactory::new(false, false); batch_executor_factory.set_fast_vm_mode(vm_mode); + batch_executor_factory.observe_storage_metrics(); let io = VmPlaygroundIo { cursor_file_path, @@ -246,9 +247,9 @@ impl VmPlayground { }; let vm_runner = VmRunner::new( self.pool, - Box::new(self.io), + Arc::new(self.io), loader, - Box::new(self.output_handler_factory), + Arc::new(self.output_handler_factory), Box::new(self.batch_executor_factory), ); vm_runner.run(&stop_receiver).await @@ -412,7 +413,7 @@ impl OutputHandler for VmPlaygroundOutputHandler { #[async_trait] impl OutputHandlerFactory for VmPlaygroundOutputHandler { async fn create_handler( - &mut self, + &self, _system_env: SystemEnv, _l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { diff --git a/core/node/vm_runner/src/impls/protective_reads.rs b/core/node/vm_runner/src/impls/protective_reads.rs index b620675b7..b1aff9fe3 100644 --- a/core/node/vm_runner/src/impls/protective_reads.rs +++ b/core/node/vm_runner/src/impls/protective_reads.rs @@ -41,9 +41,9 @@ impl ProtectiveReadsWriter { let batch_processor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( pool, - Box::new(io), + Arc::new(io), Arc::new(loader), - Box::new(output_handler_factory), + Arc::new(output_handler_factory), Box::new(batch_processor), ); Ok(( @@ -219,7 +219,7 @@ struct ProtectiveReadsOutputHandlerFactory { #[async_trait] impl OutputHandlerFactory for 
ProtectiveReadsOutputHandlerFactory { async fn create_handler( - &mut self, + &self, _system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { diff --git a/core/node/vm_runner/src/io.rs b/core/node/vm_runner/src/io.rs index 2e118f6cf..6d758f816 100644 --- a/core/node/vm_runner/src/io.rs +++ b/core/node/vm_runner/src/io.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use zksync_dal::{Connection, Core}; @@ -31,8 +31,9 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static { conn: &mut Connection<'_, Core>, ) -> anyhow::Result; - /// Marks the specified batch as being in progress. Must be called before a batch can be marked - /// as completed. + /// Marks the specified batch as being in progress. Will be called at least once before a batch can be marked + /// as completed; can be called multiple times in case of a crash. The order in which this method is called + /// is not specified; i.e., it is **not** guaranteed to be called sequentially. /// /// # Errors /// @@ -44,7 +45,8 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static { ) -> anyhow::Result<()>; /// Marks the specified batch as the latest completed batch. All earlier batches are considered - /// to be completed too. No guarantees about later batches. + /// to be completed too. No guarantees about later batches. This method is guaranteed to be called + /// with monotonically increasing batch numbers. /// /// # Errors /// @@ -55,3 +57,44 @@ pub trait VmRunnerIo: Debug + Send + Sync + 'static { l1_batch_number: L1BatchNumber, ) -> anyhow::Result<()>; } + +#[async_trait] +impl VmRunnerIo for Arc { + fn name(&self) -> &'static str { + (**self).name() + } + + async fn latest_processed_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + (**self).latest_processed_batch(conn).await + } + + async fn last_ready_to_be_loaded_batch( + &self, + conn: &mut Connection<'_, Core>, + ) -> anyhow::Result { + (**self).last_ready_to_be_loaded_batch(conn).await + } + + async fn mark_l1_batch_as_processing( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + (**self) + .mark_l1_batch_as_processing(conn, l1_batch_number) + .await + } + + async fn mark_l1_batch_as_completed( + &self, + conn: &mut Connection<'_, Core>, + l1_batch_number: L1BatchNumber, + ) -> anyhow::Result<()> { + (**self) + .mark_l1_batch_as_completed(conn, l1_batch_number) + .await + } +} diff --git a/core/node/vm_runner/src/metrics.rs b/core/node/vm_runner/src/metrics.rs index 4252ad5f0..cc588fd02 100644 --- a/core/node/vm_runner/src/metrics.rs +++ b/core/node/vm_runner/src/metrics.rs @@ -2,7 +2,28 @@ use std::time::Duration; -use vise::{Buckets, Gauge, Histogram, Metrics}; +use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics, Unit}; +use zksync_state::OwnedStorage; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] +#[metrics(label = "storage", rename_all = "snake_case")] +pub(super) enum StorageKind { + Postgres, + Snapshot, + Rocksdb, + Unknown, +} + +impl StorageKind { + pub fn new(storage: &OwnedStorage) -> Self { + match storage { + OwnedStorage::Rocksdb(_) | OwnedStorage::RocksdbWithMemory(_) => Self::Rocksdb, + OwnedStorage::Postgres(_) => Self::Postgres, + OwnedStorage::Snapshot(_) => Self::Snapshot, + OwnedStorage::Boxed(_) => Self::Unknown, + } + } +} #[derive(Debug, Metrics)] #[metrics(prefix = "vm_runner")] @@ -16,6 +37,9 @@ pub(super) struct 
VmRunnerMetrics { /// Total latency of loading an L1 batch (RocksDB mode only). #[metrics(buckets = Buckets::LATENCIES)] pub storage_load_time: Histogram, + /// Latency of loading data and storage for a batch, grouped by the storage kind. + #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)] + pub data_and_storage_latency: Family>, /// Total latency of running VM on an L1 batch. #[metrics(buckets = Buckets::LATENCIES)] pub run_vm_time: Histogram, diff --git a/core/node/vm_runner/src/output_handler.rs b/core/node/vm_runner/src/output_handler.rs index 25eae5e36..7a8d1e41e 100644 --- a/core/node/vm_runner/src/output_handler.rs +++ b/core/node/vm_runner/src/output_handler.rs @@ -61,7 +61,7 @@ pub trait OutputHandler: fmt::Debug + Send { /// simultaneously. Implementing this trait signifies that this property is held for the data the /// implementation is responsible for. #[async_trait] -pub trait OutputHandlerFactory: fmt::Debug + Send { +pub trait OutputHandlerFactory: fmt::Debug + Send + Sync { /// Creates a [`StateKeeperOutputHandler`] implementation for the provided L1 batch. Only /// supposed to be used for the L1 batch data it was created against. Using it for anything else /// will lead to errors. @@ -70,7 +70,7 @@ pub trait OutputHandlerFactory: fmt::Debug + Send { /// /// Propagates DB errors. async fn create_handler( - &mut self, + &self, system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result>; @@ -139,7 +139,7 @@ impl OutputHandlerFactory for ConcurrentOutputHandlerFactory { async fn create_handler( - &mut self, + &self, system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { diff --git a/core/node/vm_runner/src/process.rs b/core/node/vm_runner/src/process.rs index e2a678ccd..4f7ac1f97 100644 --- a/core/node/vm_runner/src/process.rs +++ b/core/node/vm_runner/src/process.rs @@ -1,20 +1,26 @@ -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::Context; -use tokio::{sync::watch, task::JoinHandle}; +use tokio::{ + sync::{watch, Mutex}, + task::JoinHandle, +}; use zksync_dal::{ConnectionPool, Core}; use zksync_state::OwnedStorage; -use zksync_types::{block::L2BlockExecutionData, L1BatchNumber}; -use zksync_vm_interface::{ - executor::{BatchExecutor, BatchExecutorFactory}, - L2BlockEnv, -}; +use zksync_types::L1BatchNumber; +use zksync_vm_interface::{executor::BatchExecutorFactory, L2BlockEnv}; use crate::{ - metrics::METRICS, output_handler::OutputHandler, storage::StorageLoader, L1BatchOutput, - L2BlockOutput, OutputHandlerFactory, VmRunnerIo, + metrics::{StorageKind, METRICS}, + storage::StorageLoader, + L1BatchOutput, L2BlockOutput, OutputHandlerFactory, VmRunnerIo, }; +const SLEEP_INTERVAL: Duration = Duration::from_millis(50); + /// VM runner represents a logic layer of L1 batch / L2 block processing flow akin to that of state /// keeper. The difference is that VM runner is designed to be run on batches/blocks that have /// already been processed by state keeper but still require some extra handling as regulated by @@ -26,13 +32,13 @@ use crate::{ /// /// You can think of VM runner as a concurrent processor of a continuous stream of newly committed /// batches/blocks. 
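 ///
 /// Concurrency note: `run()` spawns a separate Tokio task per batch via `process_batch()`,
 /// which is why the components below are shared (`Arc`) rather than uniquely owned.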
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct VmRunner {
     pool: ConnectionPool<Core>,
-    io: Box<dyn VmRunnerIo>,
+    io: Arc<dyn VmRunnerIo>,
     loader: Arc<dyn StorageLoader>,
-    output_handler_factory: Box<dyn OutputHandlerFactory>,
+    output_handler_factory: Arc<dyn OutputHandlerFactory>,
-    batch_executor_factory: Box<dyn BatchExecutorFactory<OwnedStorage>>,
+    batch_executor_factory: Arc<Mutex<Box<dyn BatchExecutorFactory<OwnedStorage>>>>,
 }
 
 impl VmRunner {
@@ -44,9 +50,9 @@ impl VmRunner {
     /// an underlying implementation of [`OutputHandlerFactory`].
     pub fn new(
         pool: ConnectionPool<Core>,
-        io: Box<dyn VmRunnerIo>,
+        io: Arc<dyn VmRunnerIo>,
         loader: Arc<dyn StorageLoader>,
-        output_handler_factory: Box<dyn OutputHandlerFactory>,
+        output_handler_factory: Arc<dyn OutputHandlerFactory>,
         batch_executor_factory: Box<dyn BatchExecutorFactory<OwnedStorage>>,
     ) -> Self {
         Self {
@@ -54,17 +60,42 @@ impl VmRunner {
             pool,
             io,
             loader,
             output_handler_factory,
-            batch_executor_factory,
+            batch_executor_factory: Arc::new(Mutex::new(batch_executor_factory)),
         }
     }
 
-    async fn process_batch(
-        mut batch_executor: Box<dyn BatchExecutor<OwnedStorage>>,
-        l2_blocks: Vec<L2BlockExecutionData>,
-        mut output_handler: Box<dyn OutputHandler>,
-    ) -> anyhow::Result<()> {
+    async fn process_batch(self, number: L1BatchNumber) -> anyhow::Result<()> {
+        let stage_started_at = Instant::now();
+        let (batch_data, storage) = loop {
+            match self.loader.load_batch(number).await? {
+                Some(data_and_storage) => break data_and_storage,
+                None => {
+                    // Next batch has not been loaded yet
+                    tokio::time::sleep(SLEEP_INTERVAL).await;
+                }
+            }
+        };
+        let kind = StorageKind::new(&storage);
+        METRICS.data_and_storage_latency[&kind].observe(stage_started_at.elapsed());
+
+        let mut batch_executor = self.batch_executor_factory.lock().await.init_batch(
+            storage,
+            batch_data.l1_batch_env.clone(),
+            batch_data.system_env.clone(),
+        );
+        let mut output_handler = self
+            .output_handler_factory
+            .create_handler(batch_data.system_env, batch_data.l1_batch_env)
+            .await?;
+        self.io
+            .mark_l1_batch_as_processing(
+                &mut self.pool.connection_tagged("vm_runner").await?,
+                number,
+            )
+            .await?;
+
         let latency = METRICS.run_vm_time.start();
-        for (i, l2_block) in l2_blocks.into_iter().enumerate() {
+        for (i, l2_block) in batch_data.l2_blocks.into_iter().enumerate() {
             let block_env = L2BlockEnv::from_l2_block_data(&l2_block);
             if i > 0 {
                 // First L2 block in every batch is already preloaded
@@ -112,14 +143,12 @@ impl VmRunner {
 
     /// Consumes VM runner to execute a loop that continuously pulls data from [`VmRunnerIo`] and
     /// processes it.
-    pub async fn run(mut self, stop_receiver: &watch::Receiver<bool>) -> anyhow::Result<()> {
-        const SLEEP_INTERVAL: Duration = Duration::from_millis(50);
-
+    pub async fn run(self, stop_receiver: &watch::Receiver<bool>) -> anyhow::Result<()> {
        // Join handles for asynchronous tasks that are being run in the background
         let mut task_handles: Vec<(L1BatchNumber, JoinHandle<anyhow::Result<()>>)> = Vec::new();
         let mut next_batch = self
             .io
-            .latest_processed_batch(&mut self.pool.connection().await?)
+            .latest_processed_batch(&mut self.pool.connection_tagged("vm_runner").await?)
             .await?
             + 1;
         loop {
@@ -148,7 +177,7 @@ impl VmRunner {
 
             let last_ready_batch = self
                 .io
-                .last_ready_to_be_loaded_batch(&mut self.pool.connection().await?)
+                .last_ready_to_be_loaded_batch(&mut self.pool.connection_tagged("vm_runner").await?)
                 .await?;
             METRICS.last_ready_batch.set(last_ready_batch.0.into());
             if next_batch > last_ready_batch {
@@ -156,31 +185,8 @@ impl VmRunner {
                 tokio::time::sleep(SLEEP_INTERVAL).await;
                 continue;
             }
-            let Some((batch_data, storage)) = self.loader.load_batch(next_batch).await?
else { - // Next batch has not been loaded yet - tokio::time::sleep(SLEEP_INTERVAL).await; - continue; - }; - let batch_executor = self.batch_executor_factory.init_batch( - storage, - batch_data.l1_batch_env.clone(), - batch_data.system_env.clone(), - ); - let output_handler = self - .output_handler_factory - .create_handler(batch_data.system_env, batch_data.l1_batch_env) - .await?; - - self.io - .mark_l1_batch_as_processing(&mut self.pool.connection().await?, next_batch) - .await?; - let handle = tokio::task::spawn(Self::process_batch( - batch_executor, - batch_data.l2_blocks, - output_handler, - )); + let handle = tokio::spawn(self.clone().process_batch(next_batch)); task_handles.push((next_batch, handle)); - next_batch += 1; } } diff --git a/core/node/vm_runner/src/storage.rs b/core/node/vm_runner/src/storage.rs index baee42600..cd746e4e1 100644 --- a/core/node/vm_runner/src/storage.rs +++ b/core/node/vm_runner/src/storage.rs @@ -49,7 +49,7 @@ pub(crate) struct PostgresLoader { impl PostgresLoader { pub async fn new(pool: ConnectionPool, chain_id: L2ChainId) -> anyhow::Result { let mut l1_batch_params_provider = L1BatchParamsProvider::new(); - let mut conn = pool.connection().await?; + let mut conn = pool.connection_tagged("vm_runner").await?; l1_batch_params_provider.initialize(&mut conn).await?; Ok(Self { pool, @@ -72,7 +72,7 @@ impl StorageLoader for PostgresLoader { &self, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { - let mut conn = self.pool.connection().await?; + let mut conn = self.pool.connection_tagged("vm_runner").await?; let Some(data) = load_batch_execute_data( &mut conn, l1_batch_number, @@ -86,7 +86,7 @@ impl StorageLoader for PostgresLoader { if let Some(snapshot) = OwnedStorage::snapshot(&mut conn, l1_batch_number).await? 
{ let postgres = OwnedStorage::postgres(conn, l1_batch_number - 1).await?; - let storage = snapshot.with_fallback(postgres, self.shadow_snapshots); + let storage = snapshot.with_fallback(postgres.into(), self.shadow_snapshots); let storage = OwnedStorage::from(storage); return Ok(Some((data, storage))); } @@ -94,7 +94,7 @@ impl StorageLoader for PostgresLoader { tracing::info!( "Incomplete data to create storage snapshot for batch; will use sequential storage" ); - let conn = self.pool.connection().await?; + let conn = self.pool.connection_tagged("vm_runner").await?; let storage = OwnedStorage::postgres(conn, l1_batch_number - 1).await?; Ok(Some((data, storage.into()))) } diff --git a/core/node/vm_runner/src/tests/mod.rs b/core/node/vm_runner/src/tests/mod.rs index 530016408..9fe9e99e9 100644 --- a/core/node/vm_runner/src/tests/mod.rs +++ b/core/node/vm_runner/src/tests/mod.rs @@ -39,7 +39,7 @@ struct IoMock { } #[async_trait] -impl VmRunnerIo for Arc> { +impl VmRunnerIo for RwLock { fn name(&self) -> &'static str { "io_mock" } @@ -153,7 +153,7 @@ struct TestOutputFactory { #[async_trait] impl OutputHandlerFactory for TestOutputFactory { async fn create_handler( - &mut self, + &self, _system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { diff --git a/core/node/vm_runner/src/tests/process.rs b/core/node/vm_runner/src/tests/process.rs index fec3fd2ba..115410ce8 100644 --- a/core/node/vm_runner/src/tests/process.rs +++ b/core/node/vm_runner/src/tests/process.rs @@ -57,9 +57,9 @@ async fn process_batches((batch_count, window): (u32, u32)) -> anyhow::Result<() let batch_executor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( connection_pool, - Box::new(io.clone()), + io.clone(), storage, - Box::new(output_factory), + Arc::new(output_factory), Box::new(batch_executor), ); tokio::task::spawn(async move { vm_runner.run(&stop_receiver).await.unwrap() }); diff --git a/core/node/vm_runner/src/tests/storage_writer.rs b/core/node/vm_runner/src/tests/storage_writer.rs index 76d086712..c377cf95b 100644 --- a/core/node/vm_runner/src/tests/storage_writer.rs +++ b/core/node/vm_runner/src/tests/storage_writer.rs @@ -57,6 +57,8 @@ impl VmRunnerIo for StorageWriterIo { l1_batch_number: L1BatchNumber, ) -> anyhow::Result<()> { assert_eq!(l1_batch_number, self.batch() + 1); + // ^ The assertion works because of `last_ready_to_be_loaded_batch()` implementation; it wouldn't hold if we allowed + // to process multiple batches concurrently. Ok(()) } @@ -147,7 +149,7 @@ impl OutputHandler for StorageWriterIo { #[async_trait] impl OutputHandlerFactory for StorageWriterIo { async fn create_handler( - &mut self, + &self, _system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result> { @@ -167,7 +169,7 @@ pub(super) async fn write_storage_logs(pool: ConnectionPool, insert_protec .unwrap() .expect("No L1 batches in storage"); drop(conn); - let io = Box::new(StorageWriterIo { + let io = Arc::new(StorageWriterIo { last_processed_batch: Arc::new(watch::channel(L1BatchNumber(0)).0), last_processed_block: L2BlockNumber(0), pool: pool.clone(), @@ -240,9 +242,9 @@ async fn storage_writer_works(insert_protective_reads: bool) { let batch_executor = MainBatchExecutorFactory::new(false, false); let vm_runner = VmRunner::new( pool, - Box::new(io.clone()), + io.clone(), loader, - Box::new(output_factory), + Arc::new(output_factory), Box::new(batch_executor), );
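
A quick wiring sketch for reviewers, adapted from the `tests/process.rs` changes above. Here `connection_pool`, `io`, `storage`, and `output_factory` are test-harness stand-ins (any `VmRunnerIo`, storage loader, and `OutputHandlerFactory` implementations), not library APIs; the point is the `Arc`-based component shapes introduced by this PR:

```rust
use std::sync::Arc;

// `connection_pool: ConnectionPool<Core>`; `io`, `storage`, and `output_factory` are
// assumed implementations provided by the surrounding test module.
let batch_executor = MainBatchExecutorFactory::new(false, false);
let vm_runner = VmRunner::new(
    connection_pool,
    io.clone(),               // was `Box<dyn VmRunnerIo>` before this PR
    storage,                  // `Arc`'d storage loader (unchanged)
    Arc::new(output_factory), // was `Box<dyn OutputHandlerFactory>`
    Box::new(batch_executor), // boxed factory; wrapped into `Arc<Mutex<_>>` internally
);
// Batches are now processed on spawned tasks, so the runner itself is `Clone`.
tokio::task::spawn(async move { vm_runner.run(&stop_receiver).await.unwrap() });
```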