refactor(vm-runner): Improve VM runner / VM playground (#2840)
## What ❔

Various minor improvements to VM runner / VM playground:

- Get batch storage asynchronously, so that it works efficiently with snapshot storage (see the sketch after this list).
- Add metrics / logs to ensure that snapshot storage works as expected.
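
As a rough sketch of the first point (the `BatchSnapshot` and `load_snapshot` names below are hypothetical stand-ins, not types from this PR), batch storage can be loaded on a separate Tokio task so the runner overlaps snapshot assembly with other work:

```rust
use tokio::task::JoinHandle;

// Hypothetical stand-in for the snapshot data assembled from Postgres.
#[derive(Debug)]
struct BatchSnapshot {
    batch_number: u32,
    // ...storage slots, initial write indices, bytecodes, etc.
}

// Hypothetical stand-in for the queries that assemble a batch snapshot.
async fn load_snapshot(batch_number: u32) -> anyhow::Result<BatchSnapshot> {
    Ok(BatchSnapshot { batch_number })
}

// Spawning the load lets the caller continue with other work and `.await`
// the handle only once the storage is actually needed.
fn spawn_snapshot_load(batch_number: u32) -> JoinHandle<anyhow::Result<BatchSnapshot>> {
    tokio::spawn(load_snapshot(batch_number))
}
```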

## Why ❔

Improves usability.

## Checklist

- [x] PR title corresponds to the body of the PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
slowli authored Sep 16, 2024
1 parent c957dd8 commit 9ab7200
Showing 22 changed files with 552 additions and 304 deletions.
1 change: 1 addition & 0 deletions core/lib/state/src/lib.rs
@@ -21,6 +21,7 @@ pub use self::{
shadow_storage::ShadowStorage,
storage_factory::{
BatchDiff, CommonStorage, OwnedStorage, ReadStorageFactory, RocksdbWithMemory,
SnapshotStorage,
},
};

37 changes: 37 additions & 0 deletions core/lib/state/src/storage_factory/metrics.rs
@@ -0,0 +1,37 @@
use std::time::Duration;

use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Histogram, Metrics, Unit};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "stage", rename_all = "snake_case")]
pub(super) enum SnapshotStage {
BatchHeader,
ProtectiveReads,
TouchedSlots,
PreviousValues,
InitialWrites,
Bytecodes,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "kind", rename_all = "snake_case")]
pub(super) enum AccessKind {
ReadValue,
IsWriteInitial,
LoadFactoryDep,
GetEnumerationIndex,
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "state_snapshot")]
pub(super) struct SnapshotMetrics {
/// Latency of loading a batch snapshot split by stage.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub load_latency: Family<SnapshotStage, Histogram<Duration>>,
/// Latency of accessing the fallback storage for a batch snapshot.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub fallback_access_latency: Family<AccessKind, Histogram<Duration>>,
}

#[vise::register]
pub(super) static SNAPSHOT_METRICS: vise::Global<SnapshotMetrics> = vise::Global::new();
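
A usage sketch for the metrics above, mirroring how snapshot loading is instrumented later in this diff: `start()` begins timing a stage, and `observe()` records the histogram sample and returns the elapsed `Duration` for logging:

```rust
let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::BatchHeader].start();
// ...run the Postgres query for the batch header...
let elapsed = latency.observe(); // records the sample, returns the `Duration`
tracing::debug!("Loaded batch header in {elapsed:?}");
```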
core/lib/state/src/storage_factory/mod.rs
@@ -1,7 +1,4 @@
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};
use std::{collections::HashSet, fmt};

use anyhow::Context as _;
use async_trait::async_trait;
@@ -10,64 +7,18 @@ use zksync_dal::{Connection, ConnectionPool, Core, CoreDal};
use zksync_storage::RocksDB;
use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
use zksync_utils::u256_to_h256;
use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot, StorageWithSnapshot};
use zksync_vm_interface::storage::{ReadStorage, StorageSnapshot};

use self::metrics::{SnapshotStage, SNAPSHOT_METRICS};
pub use self::{
rocksdb_with_memory::{BatchDiff, RocksdbWithMemory},
snapshot::SnapshotStorage,
};
use crate::{PostgresStorage, RocksdbStorage, RocksdbStorageBuilder, StateKeeperColumnFamily};

/// Storage with a static lifetime that can be sent to Tokio tasks etc.
pub type OwnedStorage = CommonStorage<'static>;

/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
/// (mostly for testing purposes); the default is [`OwnedStorage`].
#[async_trait]
pub trait ReadStorageFactory<S = OwnedStorage>: Debug + Send + Sync + 'static {
/// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
/// The specific criteria on which one are left up to the implementation.
///
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<S>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let connection = self.connection().await?;
let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
Ok(Some(storage.into()))
}
}

/// DB difference introduced by one batch.
#[derive(Debug, Clone)]
pub struct BatchDiff {
/// Storage slots touched by this batch along with new values there.
pub state_diff: HashMap<H256, H256>,
/// Initial write indices introduced by this batch.
pub enum_index_diff: HashMap<H256, u64>,
/// Factory dependencies introduced by this batch.
pub factory_dep_diff: HashMap<H256, Vec<u8>>,
}

/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
/// `N + K`, where `K` is the number of diffs.
#[derive(Debug)]
pub struct RocksdbWithMemory {
/// RocksDB cache instance caught up to batch `N`.
pub rocksdb: RocksdbStorage,
/// Diffs for batches `N + 1` to `N + K`.
pub batch_diffs: Vec<BatchDiff>,
}
mod metrics;
mod rocksdb_with_memory;
mod snapshot;

/// Union of all [`ReadStorage`] implementations that are returned by [`ReadStorageFactory`], such as
/// Postgres- and RocksDB-backed storages.
@@ -83,7 +34,7 @@ pub enum CommonStorage<'a> {
/// Implementation over a RocksDB cache instance with in-memory DB diffs.
RocksdbWithMemory(RocksdbWithMemory),
/// In-memory storage snapshot with the Postgres storage fallback.
Snapshot(StorageWithSnapshot<PostgresStorage<'a>>),
Snapshot(SnapshotStorage<'a>),
/// Generic implementation. Should be used for testing purposes only since it has performance penalty because
/// of the dynamic dispatch.
Boxed(Box<dyn ReadStorage + Send + 'a>),
@@ -176,6 +127,7 @@ impl CommonStorage<'static> {
connection: &mut Connection<'static, Core>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<StorageSnapshot>> {
let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::BatchHeader].start();
let Some(header) = connection
.blocks_dal()
.get_l1_batch_header(l1_batch_number)
@@ -188,8 +140,10 @@
.into_iter()
.map(u256_to_h256)
.collect();
latency.observe();

// Check protective reads early on.
let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::ProtectiveReads].start();
let protective_reads = connection
.storage_logs_dedup_dal()
.get_protective_reads_for_l1_batch(l1_batch_number)
@@ -199,14 +153,18 @@
return Ok(None);
}
let protective_reads_len = protective_reads.len();
tracing::debug!("Loaded {protective_reads_len} protective reads");
let latency = latency.observe();
tracing::debug!("Loaded {protective_reads_len} protective reads in {latency:?}");

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::TouchedSlots].start();
let touched_slots = connection
.storage_logs_dal()
.get_touched_slots_for_l1_batch(l1_batch_number)
.await?;
tracing::debug!("Loaded {} touched keys", touched_slots.len());
let latency = latency.observe();
tracing::debug!("Loaded {} touched keys in {latency:?}", touched_slots.len());

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::PreviousValues].start();
let all_accessed_keys: Vec<_> = protective_reads
.into_iter()
.map(|key| key.hashed_key())
@@ -216,21 +174,31 @@
.storage_logs_dal()
.get_previous_storage_values(&all_accessed_keys, l1_batch_number)
.await?;
let latency = latency.observe();
tracing::debug!(
"Obtained {} previous values for accessed keys",
"Obtained {} previous values for accessed keys in {latency:?}",
previous_values.len()
);

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::InitialWrites].start();
let initial_write_info = connection
.storage_logs_dal()
.get_l1_batches_and_indices_for_initial_writes(&all_accessed_keys)
.await?;
tracing::debug!("Obtained initial write info for accessed keys");
let latency = latency.observe();
tracing::debug!("Obtained initial write info for accessed keys in {latency:?}");

let latency = SNAPSHOT_METRICS.load_latency[&SnapshotStage::Bytecodes].start();
let bytecodes = connection
.factory_deps_dal()
.get_factory_deps(&bytecode_hashes)
.await;
tracing::debug!("Loaded {} bytecodes used in the batch", bytecodes.len());
let latency = latency.observe();
tracing::debug!(
"Loaded {} bytecodes used in the batch in {latency:?}",
bytecodes.len()
);

let factory_deps = bytecodes
.into_iter()
.map(|(hash_u256, words)| {
@@ -256,54 +224,6 @@
}
}

impl ReadStorage for RocksdbWithMemory {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
let hashed_key = key.hashed_key();
match self
.batch_diffs
.iter()
.rev()
.find_map(|b| b.state_diff.get(&hashed_key))
{
None => self.rocksdb.read_value(key),
Some(value) => *value,
}
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.is_write_initial(key),
Some(_) => false,
}
}

fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
match self
.batch_diffs
.iter()
.find_map(|b| b.factory_dep_diff.get(&hash))
{
None => self.rocksdb.load_factory_dep(hash),
Some(value) => Some(value.clone()),
}
}

fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.get_enumeration_index(key),
Some(value) => Some(*value),
}
}
}

impl ReadStorage for CommonStorage<'_> {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
match self {
@@ -358,8 +278,42 @@ impl From<RocksdbStorage> for CommonStorage<'_> {
}
}

impl<'a> From<StorageWithSnapshot<PostgresStorage<'a>>> for CommonStorage<'a> {
fn from(value: StorageWithSnapshot<PostgresStorage<'a>>) -> Self {
impl<'a> From<SnapshotStorage<'a>> for CommonStorage<'a> {
fn from(value: SnapshotStorage<'a>) -> Self {
Self::Snapshot(value)
}
}

/// Storage with a static lifetime that can be sent to Tokio tasks etc.
pub type OwnedStorage = CommonStorage<'static>;

/// Factory that can produce storage instances on demand. The storage type is encapsulated as a type param
/// (mostly for testing purposes); the default is [`OwnedStorage`].
#[async_trait]
pub trait ReadStorageFactory<S = OwnedStorage>: fmt::Debug + Send + Sync + 'static {
/// Creates a storage instance, e.g. over a Postgres connection or a RocksDB instance.
/// The specific criteria on which one are left up to the implementation.
///
/// Implementations may be cancel-aware and return `Ok(None)` iff `stop_receiver` receives
/// a stop signal; this is the only case in which `Ok(None)` should be returned.
async fn access_storage(
&self,
stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<S>>;
}

/// [`ReadStorageFactory`] producing Postgres-backed storage instances. Hence, it is slower than more advanced
/// alternatives with RocksDB caches and should be used sparingly (e.g., for testing).
#[async_trait]
impl ReadStorageFactory for ConnectionPool<Core> {
async fn access_storage(
&self,
_stop_receiver: &watch::Receiver<bool>,
l1_batch_number: L1BatchNumber,
) -> anyhow::Result<Option<OwnedStorage>> {
let connection = self.connection().await?;
let storage = OwnedStorage::postgres(connection, l1_batch_number).await?;
Ok(Some(storage.into()))
}
}
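
A hedged usage sketch of the cancel-aware contract above (assuming `pool` is a `ConnectionPool<Core>`; the surrounding task setup is illustrative):

```rust
let (_stop_sender, stop_receiver) = tokio::sync::watch::channel(false);
match pool.access_storage(&stop_receiver, L1BatchNumber(1)).await? {
    Some(storage) => {
        // Run the VM over the batch using `storage`.
    }
    None => {
        // Per the trait contract, `None` means a stop signal was received;
        // shut down gracefully instead of treating this as an error.
    }
}
```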
75 changes: 75 additions & 0 deletions core/lib/state/src/storage_factory/rocksdb_with_memory.rs
@@ -0,0 +1,75 @@
use std::collections::HashMap;

use zksync_types::{StorageKey, StorageValue, H256};
use zksync_vm_interface::storage::ReadStorage;

use crate::RocksdbStorage;

/// DB difference introduced by one batch.
#[derive(Debug, Clone)]
pub struct BatchDiff {
/// Storage slots touched by this batch along with new values there.
pub state_diff: HashMap<H256, H256>,
/// Initial write indices introduced by this batch.
pub enum_index_diff: HashMap<H256, u64>,
/// Factory dependencies introduced by this batch.
pub factory_dep_diff: HashMap<H256, Vec<u8>>,
}

/// A RocksDB cache instance with in-memory DB diffs that gives access to DB state at batches `N` to
/// `N + K`, where `K` is the number of diffs.
#[derive(Debug)]
pub struct RocksdbWithMemory {
/// RocksDB cache instance caught up to batch `N`.
pub rocksdb: RocksdbStorage,
/// Diffs for batches `N + 1` to `N + K`.
pub batch_diffs: Vec<BatchDiff>,
}

impl ReadStorage for RocksdbWithMemory {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
let hashed_key = key.hashed_key();
match self
.batch_diffs
.iter()
.rev()
.find_map(|b| b.state_diff.get(&hashed_key))
{
None => self.rocksdb.read_value(key),
Some(value) => *value,
}
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.is_write_initial(key),
Some(_) => false,
}
}

fn load_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
match self
.batch_diffs
.iter()
.find_map(|b| b.factory_dep_diff.get(&hash))
{
None => self.rocksdb.load_factory_dep(hash),
Some(value) => Some(value.clone()),
}
}

fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
match self
.batch_diffs
.iter()
.find_map(|b| b.enum_index_diff.get(&key.hashed_key()))
{
None => self.rocksdb.get_enumeration_index(key),
Some(value) => Some(*value),
}
}
}
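
The lookup order implemented above can be summarized with a toy analogue (the `LayeredMap` type below is hypothetical, not part of this PR): newer batch diffs shadow older ones, and the base store is the final fallback:

```rust
use std::collections::HashMap;

// Toy analogue: `base` plays the role of the RocksDB cache at batch `N`,
// `diffs` the in-memory diffs for batches `N + 1..=N + K`, oldest first.
struct LayeredMap {
    base: HashMap<u32, u32>,
    diffs: Vec<HashMap<u32, u32>>,
}

impl LayeredMap {
    fn read(&self, key: u32) -> Option<u32> {
        self.diffs
            .iter()
            .rev() // newest diff wins, as in `read_value`
            .find_map(|diff| diff.get(&key).copied())
            .or_else(|| self.base.get(&key).copied())
    }
}
```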