From e291ebcfddd047a75c4b6ac9586127d5e3f292ff Mon Sep 17 00:00:00 2001 From: Lech <88630083+Artemka374@users.noreply.github.com> Date: Fri, 8 Mar 2024 20:02:58 +0200 Subject: [PATCH] rename traits, types, fix some imports --- Cargo.lock | 3 ++ core/bin/contract-verifier/src/verifier.rs | 6 +-- core/bin/snapshots_creator/src/creator.rs | 6 +-- core/bin/snapshots_creator/src/tests.rs | 8 +-- core/lib/dal/src/blocks_dal.rs | 2 +- core/lib/dal/src/blocks_web3_dal.rs | 2 +- core/lib/dal/src/consensus_dal.rs | 2 +- core/lib/dal/src/contract_verification_dal.rs | 2 +- core/lib/dal/src/eth_sender_dal.rs | 2 +- core/lib/dal/src/events_dal.rs | 2 +- core/lib/dal/src/events_web3_dal.rs | 2 +- core/lib/dal/src/factory_deps_dal.rs | 2 +- core/lib/dal/src/lib.rs | 11 ++-- core/lib/dal/src/metrics.rs | 52 ------------------- core/lib/dal/src/proof_generation_dal.rs | 2 +- core/lib/dal/src/protocol_versions_dal.rs | 2 +- .../lib/dal/src/protocol_versions_web3_dal.rs | 2 +- core/lib/dal/src/snapshot_recovery_dal.rs | 2 +- core/lib/dal/src/snapshots_dal.rs | 2 +- core/lib/dal/src/storage_dal.rs | 2 +- core/lib/dal/src/storage_logs_dal.rs | 2 +- core/lib/dal/src/storage_logs_dedup_dal.rs | 2 +- core/lib/dal/src/storage_web3_dal.rs | 2 +- core/lib/dal/src/tokens_dal.rs | 2 +- core/lib/dal/src/tokens_web3_dal.rs | 2 +- core/lib/dal/src/transactions_dal.rs | 2 +- core/lib/dal/src/transactions_web3_dal.rs | 2 +- core/lib/db_connection/Cargo.toml | 3 ++ core/lib/db_connection/src/connection.rs | 7 +-- core/lib/db_connection/src/instrument.rs | 24 +++++---- core/lib/db_connection/src/metrics.rs | 51 ++++++++++++++++-- core/lib/db_connection/src/processor.rs | 45 ++++++++-------- core/lib/snapshots_applier/src/lib.rs | 10 ++-- core/lib/state/src/postgres/mod.rs | 10 ++-- core/lib/state/src/rocksdb/mod.rs | 14 ++--- core/lib/state/src/rocksdb/recovery.rs | 12 ++--- core/lib/state/src/test_utils.rs | 10 ++-- core/lib/vm_utils/src/lib.rs | 4 +- core/lib/vm_utils/src/storage.rs | 16 +++--- .../src/api_server/execution_sandbox/apply.rs | 10 ++-- .../src/api_server/execution_sandbox/mod.rs | 12 +++-- .../api_server/execution_sandbox/validate.rs | 4 +- .../src/api_server/tx_sender/mod.rs | 4 +- .../src/api_server/web3/namespaces/zks.rs | 6 +-- .../zksync_core/src/api_server/web3/state.rs | 8 +-- .../src/api_server/web3/tests/mod.rs | 10 ++-- .../zksync_core/src/consensus/storage/mod.rs | 2 +- .../src/consistency_checker/mod.rs | 4 +- .../src/consistency_checker/tests/mod.rs | 4 +- .../zksync_core/src/eth_sender/aggregator.rs | 18 +++---- .../src/eth_sender/eth_tx_aggregator.rs | 10 ++-- .../src/eth_sender/eth_tx_manager.rs | 28 +++++----- .../lib/zksync_core/src/eth_sender/metrics.rs | 4 +- .../src/eth_sender/publish_criterion.rs | 14 ++--- core/lib/zksync_core/src/eth_sender/tests.rs | 4 +- .../event_processors/governance_upgrades.rs | 4 +- .../src/eth_watch/event_processors/mod.rs | 4 +- .../event_processors/priority_ops.rs | 4 +- .../eth_watch/event_processors/upgrades.rs | 4 +- core/lib/zksync_core/src/eth_watch/mod.rs | 9 ++-- core/lib/zksync_core/src/eth_watch/tests.rs | 4 +- core/lib/zksync_core/src/genesis.rs | 16 +++--- .../src/metadata_calculator/helpers.rs | 4 +- .../src/metadata_calculator/recovery/mod.rs | 4 +- .../src/metadata_calculator/tests.rs | 10 ++-- .../src/metadata_calculator/updater.rs | 8 +-- .../zksync_core/src/reorg_detector/tests.rs | 6 +-- .../src/state_keeper/io/common/mod.rs | 6 +-- .../state_keeper/io/fee_address_migration.rs | 6 +-- .../src/state_keeper/io/seal_logic.rs | 8 +-- .../src/state_keeper/mempool_actor.rs | 4 +- .../lib/zksync_core/src/state_keeper/types.rs | 7 ++- .../sync_layer/batch_status_updater/mod.rs | 4 +- .../lib/zksync_core/src/sync_layer/fetcher.rs | 6 ++- .../lib/zksync_core/src/sync_layer/genesis.rs | 4 +- core/lib/zksync_core/src/sync_layer/tests.rs | 4 +- core/lib/zksync_core/src/utils/mod.rs | 6 +-- core/lib/zksync_core/src/utils/testonly.rs | 4 +- prover/Cargo.lock | 5 ++ prover/prover_dal/Cargo.toml | 3 ++ .../src/fri_gpu_prover_queue_dal.rs | 2 + .../src/fri_proof_compressor_dal.rs | 1 + .../src/fri_protocol_versions_dal.rs | 1 + prover/prover_dal/src/fri_prover_dal.rs | 2 +- .../fri_scheduler_dependency_tracker_dal.rs | 1 + .../src/fri_witness_generator_dal.rs | 1 + prover/prover_dal/src/lib.rs | 8 +-- 87 files changed, 331 insertions(+), 305 deletions(-) delete mode 100644 core/lib/dal/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 5f2acf18c0ae..472544b6bc70 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8351,12 +8351,15 @@ dependencies = [ "anyhow", "assert_matches", "rand 0.8.5", + "serde", + "serde_json", "sqlx", "tokio", "tracing", "url", "vise", "zksync_health_check", + "zksync_types", ] [[package]] diff --git a/core/bin/contract-verifier/src/verifier.rs b/core/bin/contract-verifier/src/verifier.rs index ab9451240f78..9adbf0ce3d2a 100644 --- a/core/bin/contract-verifier/src/verifier.rs +++ b/core/bin/contract-verifier/src/verifier.rs @@ -12,7 +12,7 @@ use lazy_static::lazy_static; use regex::Regex; use tokio::time; use zksync_config::ContractVerifierConfig; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_env_config::FromEnv; use zksync_queued_job_processor::{async_trait, JobProcessor}; use zksync_types::{ @@ -54,7 +54,7 @@ impl ContractVerifier { } async fn verify( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, mut request: VerificationRequest, config: ContractVerifierConfig, ) -> Result { @@ -429,7 +429,7 @@ impl ContractVerifier { } async fn process_result( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, request_id: usize, verification_result: Result, ) { diff --git a/core/bin/snapshots_creator/src/creator.rs b/core/bin/snapshots_creator/src/creator.rs index 0e716b5c99e1..505ff24dca47 100644 --- a/core/bin/snapshots_creator/src/creator.rs +++ b/core/bin/snapshots_creator/src/creator.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::Context as _; use tokio::sync::Semaphore; use zksync_config::SnapshotsCreatorConfig; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_object_store::ObjectStore; use zksync_types::{ snapshots::{ @@ -67,7 +67,7 @@ pub(crate) struct SnapshotCreator { } impl SnapshotCreator { - async fn connect_to_replica(&self) -> anyhow::Result> { + async fn connect_to_replica(&self) -> anyhow::Result> { self.replica_pool .access_storage_tagged("snapshots_creator") .await @@ -192,7 +192,7 @@ impl SnapshotCreator { config: &SnapshotsCreatorConfig, min_chunk_count: u64, latest_snapshot: Option<&SnapshotMetadata>, - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result> { // We subtract 1 so that after restore, EN node has at least one L1 batch to fetch let sealed_l1_batch_number = conn.blocks_dal().get_sealed_l1_batch_number().await?; diff --git a/core/bin/snapshots_creator/src/tests.rs b/core/bin/snapshots_creator/src/tests.rs index b12002e945e6..67ca41071685 100644 --- a/core/bin/snapshots_creator/src/tests.rs +++ b/core/bin/snapshots_creator/src/tests.rs @@ -10,7 +10,7 @@ use std::{ }; use rand::{thread_rng, Rng}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_object_store::ObjectStore; use zksync_types::{ block::{L1BatchHeader, MiniblockHeader}, @@ -132,7 +132,7 @@ struct ExpectedOutputs { } async fn create_miniblock( - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, miniblock_number: MiniblockNumber, block_logs: Vec, ) { @@ -162,7 +162,7 @@ async fn create_miniblock( } async fn create_l1_batch( - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, logs_for_initial_writes: &[StorageLog], ) { @@ -186,7 +186,7 @@ async fn create_l1_batch( async fn prepare_postgres( rng: &mut impl Rng, - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, block_count: u32, ) -> ExpectedOutputs { conn.protocol_versions_dal() diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index 3493d490abb4..919195edbfae 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::Context as _; use bigdecimal::{BigDecimal, FromPrimitive, ToPrimitive}; use zksync_db_connection::{ - instrument::InstrumentExt, match_query_as, processor::StorageInteraction, + instrument::InstrumentExt, match_query_as, processor::StorageProcessor, }; use zksync_types::{ aggregated_operations::AggregatedActionType, diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index a59f26954124..fd6b211cab4e 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -1,5 +1,5 @@ use zksync_db_connection::{ - instrument::InstrumentExt, match_query_as, processor::StorageInteraction, + instrument::InstrumentExt, match_query_as, processor::StorageProcessor, }; use zksync_system_constants::EMPTY_UNCLES_HASH; use zksync_types::{ diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 54267e527334..5321173d175a 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; use zksync_consensus_roles::validator; use zksync_consensus_storage::ReplicaState; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::MiniblockNumber; pub use crate::models::consensus::Payload; diff --git a/core/lib/dal/src/contract_verification_dal.rs b/core/lib/dal/src/contract_verification_dal.rs index d821b0c277d7..0bc3777d5bd5 100644 --- a/core/lib/dal/src/contract_verification_dal.rs +++ b/core/lib/dal/src/contract_verification_dal.rs @@ -5,7 +5,7 @@ use std::{ use anyhow::Context as _; use sqlx::postgres::types::PgInterval; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ contract_verification_api::{ DeployContractCalldata, VerificationIncomingRequest, VerificationInfo, VerificationRequest, diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index 1097e518890e..80cfc61928a2 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -2,7 +2,7 @@ use std::{convert::TryFrom, str::FromStr}; use anyhow::Context as _; use sqlx::types::chrono::{DateTime, Utc}; -use zksync_db_connection::{match_query_as, processor::StorageInteraction}; +use zksync_db_connection::{match_query_as, processor::StorageProcessor}; use zksync_types::{ aggregated_operations::AggregatedActionType, eth_sender::{EthTx, EthTxBlobSidecar, TxHistory, TxHistoryToSend}, diff --git a/core/lib/dal/src/events_dal.rs b/core/lib/dal/src/events_dal.rs index 122c43d1b854..8b70e87674f1 100644 --- a/core/lib/dal/src/events_dal.rs +++ b/core/lib/dal/src/events_dal.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, fmt}; use sqlx::types::chrono::Utc; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_system_constants::L1_MESSENGER_ADDRESS; use zksync_types::{ api, diff --git a/core/lib/dal/src/events_web3_dal.rs b/core/lib/dal/src/events_web3_dal.rs index ee3ee3779d95..033cc49922e5 100644 --- a/core/lib/dal/src/events_web3_dal.rs +++ b/core/lib/dal/src/events_web3_dal.rs @@ -3,7 +3,7 @@ use sqlx::{ query::{Query, QueryAs}, Postgres, Row, }; -use zksync_db_connection::instrument::InstrumentExt; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ api::{GetLogsFilter, Log}, Address, MiniblockNumber, H256, diff --git a/core/lib/dal/src/factory_deps_dal.rs b/core/lib/dal/src/factory_deps_dal.rs index 69c20321ad6d..a260d30ad0a4 100644 --- a/core/lib/dal/src/factory_deps_dal.rs +++ b/core/lib/dal/src/factory_deps_dal.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; use anyhow::Context as _; use zksync_contracts::{BaseSystemContracts, SystemContractCode}; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{MiniblockNumber, H256, U256}; use zksync_utils::{bytes_to_be_words, bytes_to_chunks}; diff --git a/core/lib/dal/src/lib.rs b/core/lib/dal/src/lib.rs index f05a9943e38d..9a9c538f7a7a 100644 --- a/core/lib/dal/src/lib.rs +++ b/core/lib/dal/src/lib.rs @@ -1,12 +1,10 @@ //! Data access layer (DAL) for zkSync Era. -use std::time::Instant; - use sqlx::{pool::PoolConnection, PgConnection, Postgres}; pub use sqlx::{types::BigDecimal, Error as SqlxError}; pub use zksync_db_connection::connection::ConnectionPool; use zksync_db_connection::processor::{ - StorageInteraction, StorageKind, StorageProcessor, StorageProcessorTags, TracedConnections, + BasicStorageProcessor, StorageKind, StorageProcessor, StorageProcessorTags, TracedConnections, }; use crate::{ @@ -33,7 +31,6 @@ pub mod eth_sender_dal; pub mod events_dal; pub mod events_web3_dal; pub mod factory_deps_dal; -mod metrics; mod models; pub mod proof_generation_dal; pub mod protocol_versions_dal; @@ -58,13 +55,13 @@ mod tests; pub struct Server(()); -pub struct ServerProcessor<'a>(StorageProcessor<'a>); +pub struct ServerProcessor<'a>(BasicStorageProcessor<'a>); impl StorageKind for Server { type Processor<'a> = ServerProcessor<'a>; } -impl<'a> StorageInteraction for ServerProcessor<'a> { +impl<'a> StorageProcessor for ServerProcessor<'a> { async fn start_transaction(&mut self) -> sqlx::Result> { self.0.start_transaction() } @@ -83,7 +80,7 @@ impl<'a> StorageInteraction for ServerProcessor<'a> { tags: Option, traced_connections: Option<&'a TracedConnections>, ) -> Self { - Self(StorageProcessor::from_pool( + Self(BasicStorageProcessor::from_pool( connection, tags, traced_connections, diff --git a/core/lib/dal/src/metrics.rs b/core/lib/dal/src/metrics.rs deleted file mode 100644 index 45eeb9448dbb..000000000000 --- a/core/lib/dal/src/metrics.rs +++ /dev/null @@ -1,52 +0,0 @@ -//! Metrics for the data access layer. - -use std::{thread, time::Duration}; - -use anyhow::Context as _; -use vise::{ - Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LabeledFamily, - LatencyObserver, Metrics, Unit, -}; - -use crate::ConnectionPool; - -/// Request-related DB metrics. -#[derive(Debug, Metrics)] -#[metrics(prefix = "sql")] -pub(crate) struct RequestMetrics { - /// Latency of a DB request. - #[metrics(buckets = Buckets::LATENCIES, labels = ["method"])] - pub request: LabeledFamily<&'static str, Histogram>, - /// Counter of slow DB requests. - #[metrics(labels = ["method"])] - pub request_slow: LabeledFamily<&'static str, Counter>, - /// Counter of errored DB requests. - #[metrics(labels = ["method"])] - pub request_error: LabeledFamily<&'static str, Counter>, -} - -#[vise::register] -pub(crate) static REQUEST_METRICS: vise::Global = vise::Global::new(); - -/// Reporter of latency for DAL methods consisting of multiple DB queries. If there's a single query, -/// use `.instrument().report_latency()` on it instead. -/// -/// Should be created at the start of the relevant method and dropped when the latency needs to be reported. -#[derive(Debug)] -pub(crate) struct MethodLatency(Option>); - -impl MethodLatency { - pub fn new(name: &'static str) -> Self { - Self(Some(REQUEST_METRICS.request[&name].start())) - } -} - -impl Drop for MethodLatency { - fn drop(&mut self) { - if !thread::panicking() { - let observer = self.0.take().unwrap(); - // `unwrap()` is safe; the observer is only taken out on drop - observer.observe(); - } - } -} diff --git a/core/lib/dal/src/proof_generation_dal.rs b/core/lib/dal/src/proof_generation_dal.rs index b27039e17e4e..7c9c7a308adc 100644 --- a/core/lib/dal/src/proof_generation_dal.rs +++ b/core/lib/dal/src/proof_generation_dal.rs @@ -1,7 +1,7 @@ use std::time::Duration; use strum::{Display, EnumString}; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::L1BatchNumber; use crate::{time_utils::pg_interval_from_duration, ServerProcessor, SqlxError}; diff --git a/core/lib/dal/src/protocol_versions_dal.rs b/core/lib/dal/src/protocol_versions_dal.rs index 6e2945c869db..bd461907527b 100644 --- a/core/lib/dal/src/protocol_versions_dal.rs +++ b/core/lib/dal/src/protocol_versions_dal.rs @@ -2,7 +2,7 @@ use std::convert::TryInto; use anyhow::Context as _; use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes}; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ protocol_version::{L1VerifierConfig, ProtocolUpgradeTx, ProtocolVersion, VerifierParams}, Address, ProtocolVersionId, H256, diff --git a/core/lib/dal/src/protocol_versions_web3_dal.rs b/core/lib/dal/src/protocol_versions_web3_dal.rs index df23ce6c20a5..659ca01a25b5 100644 --- a/core/lib/dal/src/protocol_versions_web3_dal.rs +++ b/core/lib/dal/src/protocol_versions_web3_dal.rs @@ -1,4 +1,4 @@ -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::api::ProtocolVersion; use crate::{models::storage_protocol_version::StorageProtocolVersion, ServerProcessor}; diff --git a/core/lib/dal/src/snapshot_recovery_dal.rs b/core/lib/dal/src/snapshot_recovery_dal.rs index 314642833810..ac572acca0e3 100644 --- a/core/lib/dal/src/snapshot_recovery_dal.rs +++ b/core/lib/dal/src/snapshot_recovery_dal.rs @@ -1,4 +1,4 @@ -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ snapshots::SnapshotRecoveryStatus, L1BatchNumber, MiniblockNumber, ProtocolVersionId, H256, }; diff --git a/core/lib/dal/src/snapshots_dal.rs b/core/lib/dal/src/snapshots_dal.rs index 6f59c9f5978c..98d0aad5d000 100644 --- a/core/lib/dal/src/snapshots_dal.rs +++ b/core/lib/dal/src/snapshots_dal.rs @@ -1,4 +1,4 @@ -use zksync_db_connection::{instrument::InstrumentExt, processor::StorageInteraction}; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ snapshots::{AllSnapshots, SnapshotMetadata}, L1BatchNumber, diff --git a/core/lib/dal/src/storage_dal.rs b/core/lib/dal/src/storage_dal.rs index 0fc8e61d95cf..bc4562debf17 100644 --- a/core/lib/dal/src/storage_dal.rs +++ b/core/lib/dal/src/storage_dal.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use itertools::Itertools; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{StorageKey, StorageLog, StorageValue, H256}; use crate::ServerProcessor; diff --git a/core/lib/dal/src/storage_logs_dal.rs b/core/lib/dal/src/storage_logs_dal.rs index 4d1ded760039..ba141b28acba 100644 --- a/core/lib/dal/src/storage_logs_dal.rs +++ b/core/lib/dal/src/storage_logs_dal.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, ops, time::Instant}; use sqlx::{types::chrono::Utc, Row}; -use zksync_db_connection::{instrument::InstrumentExt, processor::StorageInteraction}; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ get_code_key, snapshots::SnapshotStorageLog, AccountTreeId, Address, L1BatchNumber, MiniblockNumber, StorageKey, StorageLog, FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH, H160, H256, diff --git a/core/lib/dal/src/storage_logs_dedup_dal.rs b/core/lib/dal/src/storage_logs_dedup_dal.rs index 12b8d3aa0a36..e79fb1588904 100644 --- a/core/lib/dal/src/storage_logs_dedup_dal.rs +++ b/core/lib/dal/src/storage_logs_dedup_dal.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use sqlx::types::chrono::Utc; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ snapshots::SnapshotStorageLog, zk_evm_types::LogQuery, AccountTreeId, Address, L1BatchNumber, StorageKey, H256, diff --git a/core/lib/dal/src/storage_web3_dal.rs b/core/lib/dal/src/storage_web3_dal.rs index 7b84ee41a7e4..ff77359d1a74 100644 --- a/core/lib/dal/src/storage_web3_dal.rs +++ b/core/lib/dal/src/storage_web3_dal.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, ops}; -use zksync_db_connection::{instrument::InstrumentExt, processor::StorageInteraction}; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ get_code_key, get_nonce_key, utils::{decompose_full_nonce, storage_key_for_standard_token_balance}, diff --git a/core/lib/dal/src/tokens_dal.rs b/core/lib/dal/src/tokens_dal.rs index 1c1d38030989..12df6d5c3b6e 100644 --- a/core/lib/dal/src/tokens_dal.rs +++ b/core/lib/dal/src/tokens_dal.rs @@ -1,5 +1,5 @@ use sqlx::types::chrono::Utc; -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{tokens::TokenInfo, Address, MiniblockNumber}; use crate::ServerProcessor; diff --git a/core/lib/dal/src/tokens_web3_dal.rs b/core/lib/dal/src/tokens_web3_dal.rs index 6a2adc7dfc7e..3ffd8782887a 100644 --- a/core/lib/dal/src/tokens_web3_dal.rs +++ b/core/lib/dal/src/tokens_web3_dal.rs @@ -1,4 +1,4 @@ -use zksync_db_connection::processor::StorageInteraction; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ tokens::{TokenInfo, TokenMetadata}, Address, MiniblockNumber, diff --git a/core/lib/dal/src/transactions_dal.rs b/core/lib/dal/src/transactions_dal.rs index 62682c21e9d1..f50ca2eb94b4 100644 --- a/core/lib/dal/src/transactions_dal.rs +++ b/core/lib/dal/src/transactions_dal.rs @@ -4,7 +4,7 @@ use anyhow::Context as _; use bigdecimal::BigDecimal; use itertools::Itertools; use sqlx::{error, types::chrono::NaiveDateTime}; -use zksync_db_connection::{instrument::InstrumentExt, processor::StorageInteraction}; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ block::MiniblockExecutionData, fee::TransactionExecutionMetrics, diff --git a/core/lib/dal/src/transactions_web3_dal.rs b/core/lib/dal/src/transactions_web3_dal.rs index 127c5f24d4d8..affe2b7f3976 100644 --- a/core/lib/dal/src/transactions_web3_dal.rs +++ b/core/lib/dal/src/transactions_web3_dal.rs @@ -1,6 +1,6 @@ use sqlx::types::chrono::NaiveDateTime; use zksync_db_connection::{ - instrument::InstrumentExt, match_query_as, processor::StorageInteraction, + instrument::InstrumentExt, match_query_as, processor::StorageProcessor, }; use zksync_types::{ api, api::TransactionReceipt, Address, L2ChainId, MiniblockNumber, Transaction, diff --git a/core/lib/db_connection/Cargo.toml b/core/lib/db_connection/Cargo.toml index 1a0ed542163a..800ef40af3bb 100644 --- a/core/lib/db_connection/Cargo.toml +++ b/core/lib/db_connection/Cargo.toml @@ -11,7 +11,10 @@ categories = ["cryptography"] [dependencies] zksync_health_check = { path = "../health_check" } +zksync_types = { path = "../types" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" sqlx = { version = "0.7.3", default-features = false, features = [ "runtime-tokio", "tls-native-tls", diff --git a/core/lib/db_connection/src/connection.rs b/core/lib/db_connection/src/connection.rs index 2cb52e139373..8ee9935e0893 100644 --- a/core/lib/db_connection/src/connection.rs +++ b/core/lib/db_connection/src/connection.rs @@ -20,7 +20,8 @@ use sqlx::{ use crate::{ metrics::{PostgresMetrics, CONNECTION_METRICS}, processor::{ - StorageInteraction, StorageKind, StorageProcessor, StorageProcessorTags, TracedConnections, + BasicStorageProcessor, StorageKind, StorageProcessor, StorageProcessorTags, + TracedConnections, }, }; @@ -273,14 +274,14 @@ impl ConnectionPool { /// /// Test pools trace their active connections. If acquiring a connection fails (e.g., with a timeout), /// the returned error will contain information on all active connections. - pub async fn test_pool() -> ConnectionPool { + pub async fn test_pool() -> ConnectionPool { const DEFAULT_CONNECTIONS: u32 = 50; // Expected to be enough for any unit test. Self::constrained_test_pool(DEFAULT_CONNECTIONS).await } /// Same as [`Self::test_pool()`], but with a configurable number of connections. This is useful to test /// behavior of components that rely on singleton / constrained pools in production. - pub async fn constrained_test_pool(connections: u32) -> ConnectionPool { + pub async fn constrained_test_pool(connections: u32) -> ConnectionPool { assert!(connections > 0, "Number of connections must be positive"); let mut builder = TestTemplate::empty() .expect("failed creating test template") diff --git a/core/lib/db_connection/src/instrument.rs b/core/lib/db_connection/src/instrument.rs index 6504338c790d..1eb21b67f269 100644 --- a/core/lib/db_connection/src/instrument.rs +++ b/core/lib/db_connection/src/instrument.rs @@ -21,8 +21,9 @@ use sqlx::{ use tokio::time::Instant; use crate::{ - connection::{ConnectionPool, StorageProcessor, StorageProcessorTags}, + connection::ConnectionPool, metrics::REQUEST_METRICS, + processor::{StorageProcessor, StorageProcessorTags}, }; type ThreadSafeDebug<'a> = dyn fmt::Debug + Send + Sync + 'a; @@ -219,15 +220,18 @@ where A: 'q + IntoArguments<'q, Postgres>, { /// Executes an SQL statement using this query. - pub async fn execute(self, storage: &mut StorageProcessor<'_>) -> sqlx::Result { + pub async fn execute( + self, + storage: &mut SP, + ) -> sqlx::Result { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.execute(conn)).await } /// Fetches an optional row using this query. - pub async fn fetch_optional( + pub async fn fetch_optional( self, - storage: &mut StorageProcessor<'_>, + storage: &mut SP, ) -> Result, sqlx::Error> { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.fetch_optional(conn)).await @@ -240,7 +244,7 @@ where O: Send + Unpin + for<'r> FromRow<'r, PgRow>, { /// Fetches all rows using this query and collects them into a `Vec`. - pub async fn fetch_all(self, storage: &mut StorageProcessor<'_>) -> sqlx::Result> { + pub async fn fetch_all(self, storage: &mut SP) -> sqlx::Result> { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.fetch_all(conn)).await } @@ -253,22 +257,22 @@ where A: 'q + Send + IntoArguments<'q, Postgres>, { /// Fetches an optional row using this query. - pub async fn fetch_optional( + pub async fn fetch_optional( self, - storage: &mut StorageProcessor<'_>, + storage: &mut SP, ) -> sqlx::Result> { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.fetch_optional(conn)).await } /// Fetches a single row using this query. - pub async fn fetch_one(self, storage: &mut StorageProcessor<'_>) -> sqlx::Result { + pub async fn fetch_one(self, storage: &mut SP) -> sqlx::Result { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.fetch_one(conn)).await } /// Fetches all rows using this query and collects them into a `Vec`. - pub async fn fetch_all(self, storage: &mut StorageProcessor<'_>) -> sqlx::Result> { + pub async fn fetch_all(self, storage: &mut SP) -> sqlx::Result> { let (conn, tags) = storage.conn_and_tags(); self.data.fetch(tags, self.query.fetch_all(conn)).await } @@ -279,7 +283,7 @@ mod tests { use zksync_types::{MiniblockNumber, H256}; use super::*; - use crate::ConnectionPool; + use crate::connection::ConnectionPool; #[tokio::test] async fn instrumenting_erroneous_query() { diff --git a/core/lib/db_connection/src/metrics.rs b/core/lib/db_connection/src/metrics.rs index 235f9214d547..b631a1a12ce3 100644 --- a/core/lib/db_connection/src/metrics.rs +++ b/core/lib/db_connection/src/metrics.rs @@ -1,10 +1,50 @@ -use std::time::Duration; +use std::{thread, time::Duration}; +use anyhow::Context as _; use vise::{ Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LabeledFamily, - Metrics, + LatencyObserver, Metrics, Unit, }; +use crate::{connection::ConnectionPool, processor::StorageKind}; + +/// Request-related DB metrics. +#[derive(Debug, Metrics)] +#[metrics(prefix = "sql")] +pub(crate) struct RequestMetrics { + /// Latency of a DB request. + #[metrics(buckets = Buckets::LATENCIES, labels = ["method"])] + pub request: LabeledFamily<&'static str, Histogram>, + /// Counter of slow DB requests. + #[metrics(labels = ["method"])] + pub request_slow: LabeledFamily<&'static str, Counter>, + /// Counter of errored DB requests. + #[metrics(labels = ["method"])] + pub request_error: LabeledFamily<&'static str, Counter>, +} +#[vise::register] +pub(crate) static REQUEST_METRICS: vise::Global = vise::Global::new(); +/// Reporter of latency for DAL methods consisting of multiple DB queries. If there's a single query, +/// use `.instrument().report_latency()` on it instead. +/// +/// Should be created at the start of the relevant method and dropped when the latency needs to be reported. +#[derive(Debug)] +pub(crate) struct MethodLatency(Option>); +impl MethodLatency { + pub fn new(name: &'static str) -> Self { + Self(Some(REQUEST_METRICS.request[&name].start())) + } +} +impl Drop for MethodLatency { + fn drop(&mut self) { + if !thread::panicking() { + let observer = self.0.take().unwrap(); + // `unwrap()` is safe; the observer is only taken out on drop + observer.observe(); + } + } +} + /// Kind of a connection error. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)] #[metrics(label = "kind", rename_all = "snake_case")] @@ -75,7 +115,10 @@ pub(crate) struct PostgresMetrics { static POSTGRES_METRICS: vise::Global = vise::Global::new(); impl PostgresMetrics { - pub(crate) async fn run_scraping(pool: ConnectionPool, scrape_interval: Duration) { + pub(crate) async fn run_scraping( + pool: ConnectionPool, + scrape_interval: Duration, + ) { let scrape_timeout = Duration::from_secs(1).min(scrape_interval / 2); loop { match tokio::time::timeout(scrape_timeout, Self::scrape(&pool)).await { @@ -91,7 +134,7 @@ impl PostgresMetrics { } } - async fn scrape(pool: &ConnectionPool) -> anyhow::Result<()> { + async fn scrape(pool: &ConnectionPool) -> anyhow::Result<()> { let mut storage = pool .access_storage_tagged("postgres_metrics") .await diff --git a/core/lib/db_connection/src/processor.rs b/core/lib/db_connection/src/processor.rs index df3e0fcb6fe7..a40aeda625c6 100644 --- a/core/lib/db_connection/src/processor.rs +++ b/core/lib/db_connection/src/processor.rs @@ -10,6 +10,9 @@ use std::{ }; use sqlx::{pool::PoolConnection, types::chrono, Connection, PgConnection, Postgres, Transaction}; +use zksync_health_check::async_trait; + +use crate::{connection::ConnectionPool, metrics::CONNECTION_METRICS}; /// Tags that can be associated with a connection. #[derive(Debug, Clone, Copy, PartialEq)] @@ -107,19 +110,19 @@ impl fmt::Debug for PooledStorageProcessor<'_> { impl Drop for PooledStorageProcessor<'_> { fn drop(&mut self) { - // if let Some(tags) = &self.tags { - // let lifetime = self.created_at.elapsed(); - // CONNECTION_METRICS.lifetime[&tags.requester].observe(lifetime); - // - // if lifetime > ConnectionPool::global_config().long_connection_threshold() { - // let file = tags.location.file(); - // let line = tags.location.line(); - // tracing::info!( - // "Long-living connection for `{}` created at {file}:{line}: {lifetime:?}", - // tags.requester - // ); - // } - // } + if let Some(tags) = &self.tags { + let lifetime = self.created_at.elapsed(); + CONNECTION_METRICS.lifetime[&tags.requester].observe(lifetime); + + if lifetime > ConnectionPool::global_config().long_connection_threshold() { + let file = tags.location.file(); + let line = tags.location.line(); + tracing::info!( + "Long-living connection for `{}` created at {file}:{line}: {lifetime:?}", + tags.requester + ); + } + } if let Some((connections, id)) = self.traced { connections.mark_as_dropped(id); } @@ -139,13 +142,13 @@ enum StorageProcessorInner<'a> { /// It holds down the connection (either direct or pooled) to the database /// and provide methods to obtain different storage schema. #[derive(Debug)] -pub struct StorageProcessor<'a> { +pub struct BasicStorageProcessor<'a> { inner: StorageProcessorInner<'a>, } -// todo: rename -pub trait StorageInteraction { - async fn start_transaction(&mut self) -> sqlx::Result>; +#[async_trait] +pub trait StorageProcessor { + async fn start_transaction(&mut self) -> sqlx::Result>; /// Checks if the `StorageProcessor` is currently within database transaction. fn in_transaction(&self) -> bool; @@ -166,14 +169,14 @@ pub trait StorageInteraction { fn conn_and_tags(&mut self) -> (&mut PgConnection, Option<&StorageProcessorTags>); } -impl<'a> StorageInteraction for StorageProcessor<'a> { - async fn start_transaction(&mut self) -> sqlx::Result> { +impl<'a> StorageProcessor for BasicStorageProcessor<'a> { + async fn start_transaction(&mut self) -> sqlx::Result> { let (conn, tags) = self.conn_and_tags(); let inner = StorageProcessorInner::Transaction { transaction: conn.begin().await?, tags, }; - Ok(StorageProcessor { inner }) + Ok(BasicStorageProcessor { inner }) } /// Checks if the `StorageProcessor` is currently within database transaction. @@ -227,7 +230,7 @@ impl<'a> StorageInteraction for StorageProcessor<'a> { } pub trait StorageKind { - type Processor<'a>: 'a + Send + From> + StorageInteraction; + type Processor<'a>: 'a + Send + From> + StorageProcessor; } #[cfg(test)] diff --git a/core/lib/snapshots_applier/src/lib.rs b/core/lib/snapshots_applier/src/lib.rs index b0abbfc504c0..24fb39915bbe 100644 --- a/core/lib/snapshots_applier/src/lib.rs +++ b/core/lib/snapshots_applier/src/lib.rs @@ -6,7 +6,7 @@ use anyhow::Context as _; use async_trait::async_trait; use serde::Serialize; use tokio::sync::Semaphore; -use zksync_dal::{ConnectionPool, SqlxError, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool, SqlxError}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_object_store::{ObjectStore, ObjectStoreError}; use zksync_types::{ @@ -246,7 +246,7 @@ struct SnapshotsApplier<'a> { impl<'a> SnapshotsApplier<'a> { /// Recovers [`SnapshotRecoveryStatus`] from the storage and the main node. async fn prepare_applied_snapshot_status( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, main_node_client: &dyn SnapshotsApplierMainNodeClient, ) -> Result<(SnapshotRecoveryStatus, bool), SnapshotsApplierError> { let latency = @@ -428,7 +428,7 @@ impl<'a> SnapshotsApplier<'a> { async fn recover_factory_deps( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> Result<(), SnapshotsApplierError> { let latency = METRICS.initial_stage_duration[&InitialStage::ApplyFactoryDeps].start(); @@ -472,7 +472,7 @@ impl<'a> SnapshotsApplier<'a> { &self, chunk_id: u64, storage_logs: &[SnapshotStorageLog], - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> Result<(), SnapshotsApplierError> { storage .storage_logs_dedup_dal() @@ -490,7 +490,7 @@ impl<'a> SnapshotsApplier<'a> { &self, chunk_id: u64, storage_logs: &[SnapshotStorageLog], - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> Result<(), SnapshotsApplierError> { storage .storage_logs_dal() diff --git a/core/lib/state/src/postgres/mod.rs b/core/lib/state/src/postgres/mod.rs index 91e59b68fea5..22231c7d042e 100644 --- a/core/lib/state/src/postgres/mod.rs +++ b/core/lib/state/src/postgres/mod.rs @@ -5,7 +5,7 @@ use std::{ use anyhow::Context as _; use tokio::{runtime::Handle, sync::mpsc}; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{L1BatchNumber, MiniblockNumber, StorageKey, StorageValue, H256}; use self::metrics::{Method, ValuesUpdateStage, CACHE_METRICS, STORAGE_METRICS}; @@ -146,7 +146,7 @@ impl ValuesCache { from_miniblock: MiniblockNumber, to_miniblock: MiniblockNumber, rt_handle: &Handle, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, ) { const MAX_MINIBLOCKS_LAG: u32 = 5; @@ -331,7 +331,7 @@ impl PostgresStorageCaches { #[derive(Debug)] pub struct PostgresStorage<'a> { rt_handle: Handle, - connection: StorageProcessor<'a>, + connection: BasicStorageProcessor<'a>, miniblock_number: MiniblockNumber, l1_batch_number_for_miniblock: L1BatchNumber, pending_l1_batch_number: L1BatchNumber, @@ -347,7 +347,7 @@ impl<'a> PostgresStorage<'a> { /// Panics on Postgres errors. pub fn new( rt_handle: Handle, - connection: StorageProcessor<'a>, + connection: BasicStorageProcessor<'a>, block_number: MiniblockNumber, consider_new_l1_batch: bool, ) -> Self { @@ -369,7 +369,7 @@ impl<'a> PostgresStorage<'a> { /// Propagates Postgres errors. pub async fn new_async( rt_handle: Handle, - mut connection: StorageProcessor<'a>, + mut connection: BasicStorageProcessor<'a>, block_number: MiniblockNumber, consider_new_l1_batch: bool, ) -> anyhow::Result> { diff --git a/core/lib/state/src/rocksdb/mod.rs b/core/lib/state/src/rocksdb/mod.rs index e2b5bb66c4d0..758a6b80ef23 100644 --- a/core/lib/state/src/rocksdb/mod.rs +++ b/core/lib/state/src/rocksdb/mod.rs @@ -30,7 +30,7 @@ use std::{ use anyhow::Context as _; use itertools::{Either, Itertools}; use tokio::sync::watch; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_storage::{db::NamedColumnFamily, RocksDB}; use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256, U256}; use zksync_utils::{h256_to_u256, u256_to_h256}; @@ -165,7 +165,7 @@ impl RocksbStorageBuilder { /// in Postgres. pub async fn synchronize( self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, stop_receiver: &watch::Receiver, ) -> anyhow::Result> { let mut inner = self.0; @@ -183,7 +183,7 @@ impl RocksbStorageBuilder { /// Propagates RocksDB and Postgres errors. pub async fn rollback( mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result<()> { self.0.rollback(storage, last_l1_batch_to_keep).await @@ -230,7 +230,7 @@ impl RocksdbStorage { async fn update_from_postgres( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, stop_receiver: &watch::Receiver, ) -> Result<(), RocksdbSyncError> { let mut current_l1_batch_number = self @@ -316,7 +316,7 @@ impl RocksdbStorage { async fn apply_storage_logs( &mut self, storage_logs: HashMap, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<()> { let db = self.db.clone(); let processed_logs = @@ -357,7 +357,7 @@ impl RocksdbStorage { async fn save_missing_enum_indices( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<()> { let (true, Some(start_from)) = ( self.enum_index_migration_chunk_size > 0, @@ -481,7 +481,7 @@ impl RocksdbStorage { async fn rollback( &mut self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, last_l1_batch_to_keep: L1BatchNumber, ) -> anyhow::Result<()> { tracing::info!("Rolling back state keeper storage to L1 batch #{last_l1_batch_to_keep}..."); diff --git a/core/lib/state/src/rocksdb/recovery.rs b/core/lib/state/src/rocksdb/recovery.rs index dae4ae144be0..2f48dda24022 100644 --- a/core/lib/state/src/rocksdb/recovery.rs +++ b/core/lib/state/src/rocksdb/recovery.rs @@ -4,7 +4,7 @@ use std::ops; use anyhow::Context as _; use tokio::sync::watch; -use zksync_dal::{storage_logs_dal::StorageRecoveryLogEntry, StorageProcessor}; +use zksync_dal::{storage_logs_dal::StorageRecoveryLogEntry, BasicStorageProcessor}; use zksync_types::{ snapshots::{uniform_hashed_keys_chunk, SnapshotRecoveryStatus}, L1BatchNumber, MiniblockNumber, H256, @@ -30,7 +30,7 @@ impl RocksdbStorage { /// Returns the next L1 batch that should be fed to the storage. pub(super) async fn ensure_ready( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, desired_log_chunk_size: u64, stop_receiver: &watch::Receiver, ) -> Result { @@ -65,7 +65,7 @@ impl RocksdbStorage { /// (it would be considered complete even if it failed in the middle). async fn recover_from_snapshot( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, snapshot_recovery: &SnapshotRecoveryStatus, desired_log_chunk_size: u64, stop_receiver: &watch::Receiver, @@ -140,7 +140,7 @@ impl RocksdbStorage { async fn recover_factory_deps( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, snapshot_recovery: &SnapshotRecoveryStatus, ) -> anyhow::Result<()> { // We don't expect that many factory deps; that's why we recover factory deps in any case. @@ -169,7 +169,7 @@ impl RocksdbStorage { } async fn load_key_chunks( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, snapshot_recovery: &SnapshotRecoveryStatus, desired_log_chunk_size: u64, ) -> anyhow::Result> { @@ -219,7 +219,7 @@ impl RocksdbStorage { async fn recover_logs_chunk( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, snapshot_miniblock: MiniblockNumber, key_chunk: ops::RangeInclusive, ) -> anyhow::Result<()> { diff --git a/core/lib/state/src/test_utils.rs b/core/lib/state/src/test_utils.rs index 77c9e9029ea8..68f5a2e8ef28 100644 --- a/core/lib/state/src/test_utils.rs +++ b/core/lib/state/src/test_utils.rs @@ -2,7 +2,7 @@ use std::ops; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ block::{L1BatchHeader, MiniblockHeader}, snapshots::SnapshotRecoveryStatus, @@ -10,7 +10,7 @@ use zksync_types::{ StorageKey, StorageLog, H256, }; -pub(crate) async fn prepare_postgres(conn: &mut StorageProcessor<'_>) { +pub(crate) async fn prepare_postgres(conn: &mut BasicStorageProcessor<'_>) { if conn.blocks_dal().is_genesis_needed().await.unwrap() { conn.protocol_versions_dal() .save_protocol_version_with_tx(ProtocolVersion::default()) @@ -68,7 +68,7 @@ pub(crate) fn gen_storage_logs(indices: ops::Range) -> Vec { #[allow(clippy::default_trait_access)] // ^ `BaseSystemContractsHashes::default()` would require a new direct dependency pub(crate) async fn create_miniblock( - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, miniblock_number: MiniblockNumber, block_logs: Vec, ) { @@ -100,7 +100,7 @@ pub(crate) async fn create_miniblock( #[allow(clippy::default_trait_access)] // ^ `BaseSystemContractsHashes::default()` would require a new direct dependency pub(crate) async fn create_l1_batch( - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, logs_for_initial_writes: &[StorageLog], ) { @@ -123,7 +123,7 @@ pub(crate) async fn create_l1_batch( } pub(crate) async fn prepare_postgres_for_snapshot_recovery( - conn: &mut StorageProcessor<'_>, + conn: &mut BasicStorageProcessor<'_>, ) -> (SnapshotRecoveryStatus, Vec) { conn.protocol_versions_dal() .save_protocol_version_with_tx(ProtocolVersion::default()) diff --git a/core/lib/vm_utils/src/lib.rs b/core/lib/vm_utils/src/lib.rs index 5a661e433fbd..528898afb245 100644 --- a/core/lib/vm_utils/src/lib.rs +++ b/core/lib/vm_utils/src/lib.rs @@ -7,7 +7,7 @@ use multivm::{ VmInstance, }; use tokio::runtime::Handle; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_state::{PostgresStorage, StoragePtr, StorageView, WriteStorage}; use zksync_types::{L1BatchNumber, L2ChainId, Transaction}; @@ -21,7 +21,7 @@ pub type VmAndStorage<'a> = ( pub fn create_vm( rt_handle: Handle, l1_batch_number: L1BatchNumber, - mut connection: StorageProcessor<'_>, + mut connection: BasicStorageProcessor<'_>, l2_chain_id: L2ChainId, ) -> anyhow::Result { let l1_batch_params_provider = rt_handle diff --git a/core/lib/vm_utils/src/storage.rs b/core/lib/vm_utils/src/storage.rs index 3c6d8d0221ba..4920fb3652d5 100644 --- a/core/lib/vm_utils/src/storage.rs +++ b/core/lib/vm_utils/src/storage.rs @@ -7,7 +7,7 @@ use multivm::{ zk_evm_latest::ethereum_types::H256, }; use zksync_contracts::BaseSystemContracts; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ block::MiniblockHeader, fee_model::BatchFeeInput, snapshots::SnapshotRecoveryStatus, Address, L1BatchNumber, L2ChainId, MiniblockNumber, ProtocolVersionId, ZKPORTER_IS_AVAILABLE, @@ -89,7 +89,7 @@ pub struct L1BatchParamsProvider { } impl L1BatchParamsProvider { - pub async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result { + pub async fn new(storage: &mut BasicStorageProcessor<'_>) -> anyhow::Result { let snapshot = storage .snapshot_recovery_dal() .get_applied_snapshot_status() @@ -101,7 +101,7 @@ impl L1BatchParamsProvider { /// if necessary. pub async fn wait_for_l1_batch_params( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, number: L1BatchNumber, ) -> anyhow::Result<(H256, u64)> { let first_l1_batch = if let Some(snapshot) = &self.snapshot { @@ -122,7 +122,7 @@ impl L1BatchParamsProvider { } async fn wait_for_l1_batch_params_unchecked( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, number: L1BatchNumber, ) -> anyhow::Result<(H256, u64)> { // If the state root is not known yet, this duration will be used to back off in the while loops @@ -148,7 +148,7 @@ impl L1BatchParamsProvider { pub async fn load_l1_batch_protocol_version( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { if let Some(snapshot) = &self.snapshot { @@ -172,7 +172,7 @@ impl L1BatchParamsProvider { /// Returns a header of the first miniblock in the specified L1 batch regardless of whether the batch is sealed or not. pub async fn load_first_miniblock_in_batch( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { let miniblock_number = self @@ -196,7 +196,7 @@ impl L1BatchParamsProvider { #[doc(hidden)] // public for testing purposes pub async fn load_number_of_first_miniblock_in_batch( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, ) -> anyhow::Result> { if l1_batch_number == L1BatchNumber(0) { @@ -232,7 +232,7 @@ impl L1BatchParamsProvider { /// Loads VM-related L1 batch parameters for the specified batch. pub async fn load_l1_batch_params( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, first_miniblock_in_batch: &FirstMiniblockInBatch, validation_computational_gas_limit: u32, chain_id: L2ChainId, diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs index 147c8743bdb9..82c55c8a3a54 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/apply.rs @@ -16,7 +16,7 @@ use multivm::{ VmInstance, }; use tokio::runtime::Handle; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_state::{PostgresStorage, ReadStorage, StoragePtr, StorageView, WriteStorage}; use zksync_system_constants::{ SYSTEM_CONTEXT_ADDRESS, SYSTEM_CONTEXT_CURRENT_L2_BLOCK_INFO_POSITION, @@ -51,7 +51,7 @@ struct Sandbox<'a> { impl<'a> Sandbox<'a> { async fn new( - mut connection: StorageProcessor<'a>, + mut connection: BasicStorageProcessor<'a>, shared_args: TxSharedArgs, execution_args: &'a TxExecutionArgs, block_args: BlockArgs, @@ -108,7 +108,7 @@ impl<'a> Sandbox<'a> { } async fn load_l2_block_info( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, is_pending_block: bool, resolved_block_info: &ResolvedBlockInfo, ) -> anyhow::Result<(L2BlockEnv, Option)> { @@ -353,7 +353,7 @@ struct StoredL2BlockInfo { impl StoredL2BlockInfo { /// If `miniblock_hash` is `None`, it needs to be fetched from the storage. async fn new( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, miniblock_number: MiniblockNumber, miniblock_hash: Option, ) -> anyhow::Result { @@ -427,7 +427,7 @@ impl BlockArgs { async fn resolve_block_info( &self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result { let (state_l2_block_number, vm_l1_batch_number, l1_batch_timestamp); diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs index 4e40cacae958..6aeabff45a7e 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/mod.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context as _; use tokio::runtime::Handle; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_state::{PostgresStorage, PostgresStorageCaches, ReadStorage, StorageView}; use zksync_system_constants::PUBLISH_BYTECODE_OVERHEAD; use zksync_types::{ @@ -149,7 +149,7 @@ impl VmConcurrencyLimiter { } async fn get_pending_state( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<(api::BlockId, MiniblockNumber)> { let block_id = api::BlockId::Number(api::BlockNumber::Pending); let resolved_block_number = connection @@ -243,7 +243,7 @@ pub(crate) struct BlockStartInfo { } impl BlockStartInfo { - pub async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result { + pub async fn new(storage: &mut BasicStorageProcessor<'_>) -> anyhow::Result { let snapshot_recovery = storage .snapshot_recovery_dal() .get_applied_snapshot_status() @@ -296,7 +296,9 @@ pub(crate) struct BlockArgs { } impl BlockArgs { - pub(crate) async fn pending(connection: &mut StorageProcessor<'_>) -> anyhow::Result { + pub(crate) async fn pending( + connection: &mut BasicStorageProcessor<'_>, + ) -> anyhow::Result { let (block_id, resolved_block_number) = get_pending_state(connection).await?; Ok(Self { block_id, @@ -307,7 +309,7 @@ impl BlockArgs { /// Loads block information from DB. pub async fn new( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, block_id: api::BlockId, start_info: BlockStartInfo, ) -> Result { diff --git a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs index 462977a8d36c..faee37016a75 100644 --- a/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs +++ b/core/lib/zksync_core/src/api_server/execution_sandbox/validate.rs @@ -10,7 +10,7 @@ use multivm::{ vm_latest::HistoryDisabled, MultiVMTracer, }; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{l2::L2Tx, Transaction, TRUSTED_ADDRESS_SLOTS, TRUSTED_TOKEN_SLOTS}; use super::{ @@ -117,7 +117,7 @@ impl TransactionExecutor { /// trusted to change between validation and execution in general case, but /// sometimes we can safely rely on them to not change often. async fn get_validation_params( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, tx: &L2Tx, computational_gas_limit: u32, ) -> anyhow::Result { diff --git a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs index cdaf6561445e..c47df39f145a 100644 --- a/core/lib/zksync_core/src/api_server/tx_sender/mod.rs +++ b/core/lib/zksync_core/src/api_server/tx_sender/mod.rs @@ -10,7 +10,7 @@ use multivm::{ }; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::StateKeeperConfig}; use zksync_contracts::BaseSystemContracts; -use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; +use zksync_dal::{transactions_dal::L2TxSubmissionResult, BasicStorageProcessor, ConnectionPool}; use zksync_state::PostgresStorageCaches; use zksync_types::{ fee::{Fee, TransactionExecutionMetrics}, @@ -269,7 +269,7 @@ impl TxSender { self.0.storage_caches.clone() } - async fn acquire_replica_connection(&self) -> anyhow::Result> { + async fn acquire_replica_connection(&self) -> anyhow::Result> { self.0 .replica_connection_pool .access_storage_tagged("api") diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs index c662b78e240c..69a06b827b91 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, convert::TryInto}; use anyhow::Context as _; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_mini_merkle_tree::MiniMerkleTree; use zksync_system_constants::DEFAULT_L2_TX_GAS_PER_PUBDATA_BYTE; use zksync_types::{ @@ -45,7 +45,7 @@ impl ZksNamespace { &self.state.current_method } - async fn access_storage(&self) -> Result, Web3Error> { + async fn access_storage(&self) -> Result, Web3Error> { Ok(self .state .connection_pool @@ -275,7 +275,7 @@ impl ZksNamespace { async fn get_l2_to_l1_log_proof_inner( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, index_in_filtered_logs: usize, log_filter: impl Fn(&L2ToL1Log) -> bool, diff --git a/core/lib/zksync_core/src/api_server/web3/state.rs b/core/lib/zksync_core/src/api_server/web3/state.rs index 808f4049f5f7..0e53e4823ed9 100644 --- a/core/lib/zksync_core/src/api_server/web3/state.rs +++ b/core/lib/zksync_core/src/api_server/web3/state.rs @@ -12,7 +12,7 @@ use lru::LruCache; use tokio::sync::{watch, Mutex}; use vise::GaugeGuard; use zksync_config::configs::{api::Web3JsonRpcConfig, chain::NetworkConfig, ContractsConfig}; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{ api, l2::L2Tx, transaction_request::CallRequest, Address, L1BatchNumber, L1ChainId, L2ChainId, MiniblockNumber, H256, U256, U64, @@ -239,7 +239,7 @@ impl RpcState { /// Resolves the specified block ID to a block number, which is guaranteed to be present in the node storage. pub(crate) async fn resolve_block( &self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, block: api::BlockId, ) -> Result { self.start_info.ensure_not_pruned(block)?; @@ -260,7 +260,7 @@ impl RpcState { /// non-existing blocks. pub(crate) async fn resolve_block_unchecked( &self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, block: api::BlockId, ) -> Result, Web3Error> { self.start_info.ensure_not_pruned(block)?; @@ -279,7 +279,7 @@ impl RpcState { pub(crate) async fn resolve_block_args( &self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, block: api::BlockId, ) -> Result { BlockArgs::new(connection, block, self.start_info) diff --git a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs index be1579bb3dc1..00ace2f4a7bf 100644 --- a/core/lib/zksync_core/src/api_server/web3/tests/mod.rs +++ b/core/lib/zksync_core/src/api_server/web3/tests/mod.rs @@ -15,7 +15,7 @@ use zksync_config::configs::{ chain::{NetworkConfig, StateKeeperConfig}, ContractsConfig, }; -use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, StorageProcessor}; +use zksync_dal::{transactions_dal::L2TxSubmissionResult, BasicStorageProcessor, ConnectionPool}; use zksync_health_check::CheckHealth; use zksync_types::{ api, @@ -233,7 +233,7 @@ impl StorageInitialization { async fn prepare_storage( &self, network_config: &NetworkConfig, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<()> { match self { Self::Genesis => { @@ -340,7 +340,7 @@ fn execute_l2_transaction(transaction: L2Tx) -> TransactionExecutionResult { /// Stores miniblock #1 with a single transaction and returns the miniblock header + transaction hash. async fn store_miniblock( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, number: MiniblockNumber, transaction_results: &[TransactionExecutionResult], ) -> anyhow::Result { @@ -366,7 +366,7 @@ async fn store_miniblock( } async fn seal_l1_batch( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, number: L1BatchNumber, ) -> anyhow::Result<()> { let header = create_l1_batch(number.0); @@ -391,7 +391,7 @@ async fn seal_l1_batch( } async fn store_events( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, miniblock_number: u32, start_idx: u32, ) -> anyhow::Result<(IncludedTxLocation, Vec)> { diff --git a/core/lib/zksync_core/src/consensus/storage/mod.rs b/core/lib/zksync_core/src/consensus/storage/mod.rs index ab5a92637d78..db36c5d17f18 100644 --- a/core/lib/zksync_core/src/consensus/storage/mod.rs +++ b/core/lib/zksync_core/src/consensus/storage/mod.rs @@ -17,7 +17,7 @@ use crate::{ }; /// Context-aware `zksync_dal::StorageProcessor` wrapper. -pub(super) struct Connection<'a>(pub(super) zksync_dal::StorageProcessor<'a>); +pub(super) struct Connection<'a>(pub(super) zksync_dal::BasicStorageProcessor<'a>); impl<'a> Connection<'a> { /// Wrapper for `start_transaction()`. diff --git a/core/lib/zksync_core/src/consistency_checker/mod.rs b/core/lib/zksync_core/src/consistency_checker/mod.rs index a72a558370af..970a06bace08 100644 --- a/core/lib/zksync_core/src/consistency_checker/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/mod.rs @@ -4,7 +4,7 @@ use anyhow::Context as _; use serde::Serialize; use tokio::sync::watch; use zksync_contracts::PRE_BOOJUM_COMMIT_FUNCTION; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_eth_client::{clients::QueryClient, Error as L1ClientError, EthInterface}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_l1_contract_interface::{ @@ -136,7 +136,7 @@ impl LocalL1BatchCommitData { /// Returns `Ok(None)` if Postgres doesn't contain all data necessary to check L1 commitment /// for the specified batch. async fn new( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, batch_number: L1BatchNumber, ) -> anyhow::Result> { let Some(storage_l1_batch) = storage diff --git a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs index b11771ec68ef..995f864b4876 100644 --- a/core/lib/zksync_core/src/consistency_checker/tests/mod.rs +++ b/core/lib/zksync_core/src/consistency_checker/tests/mod.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, slice}; use assert_matches::assert_matches; use test_casing::{test_casing, Product}; use tokio::sync::mpsc; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_eth_client::{clients::MockEthereum, Options}; use zksync_l1_contract_interface::i_executor::structures::StoredBatchInfo; use zksync_types::{ @@ -209,7 +209,7 @@ enum SaveAction<'a> { impl SaveAction<'_> { async fn apply( self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, commit_tx_hash_by_l1_batch: &HashMap, ) { match self { diff --git a/core/lib/zksync_core/src/eth_sender/aggregator.rs b/core/lib/zksync_core/src/eth_sender/aggregator.rs index 4dd1fec06621..19ed6efe97be 100644 --- a/core/lib/zksync_core/src/eth_sender/aggregator.rs +++ b/core/lib/zksync_core/src/eth_sender/aggregator.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use zksync_config::configs::eth_sender::{ProofLoadingMode, ProofSendingMode, SenderConfig}; use zksync_contracts::BaseSystemContractsHashes; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_l1_contract_interface::i_executor::methods::{ CommitBatches, ExecuteBatches, ProveBatches, }; @@ -108,7 +108,7 @@ impl Aggregator { pub async fn get_next_ready_operation( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, base_system_contracts_hashes: BaseSystemContractsHashes, protocol_version_id: ProtocolVersionId, l1_verifier_config: L1VerifierConfig, @@ -156,7 +156,7 @@ impl Aggregator { async fn get_execute_operations( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, limit: usize, last_sealed_l1_batch: L1BatchNumber, ) -> Option { @@ -182,7 +182,7 @@ impl Aggregator { async fn get_commit_operation( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, limit: usize, last_sealed_batch: L1BatchNumber, base_system_contracts_hashes: BaseSystemContractsHashes, @@ -243,7 +243,7 @@ impl Aggregator { } async fn load_dummy_proof_operations( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, limit: usize, is_4844_mode: bool, ) -> Vec { @@ -287,7 +287,7 @@ impl Aggregator { } async fn load_real_proof_operation( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_verifier_config: L1VerifierConfig, proof_loading_mode: &ProofLoadingMode, blob_store: &dyn ObjectStore, @@ -381,7 +381,7 @@ impl Aggregator { async fn prepare_dummy_proof_operation( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ready_for_proof_l1_batches: Vec, last_sealed_l1_batch: L1BatchNumber, ) -> Option { @@ -410,7 +410,7 @@ impl Aggregator { async fn get_proof_operation( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, limit: usize, last_sealed_l1_batch: L1BatchNumber, l1_verifier_config: L1VerifierConfig, @@ -473,7 +473,7 @@ impl Aggregator { } async fn extract_ready_subrange( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, publish_criteria: &mut [Box], unpublished_l1_batches: Vec, last_sealed_l1_batch: L1BatchNumber, diff --git a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs index c55d820cb2bc..4c6691f510ce 100644 --- a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs +++ b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs @@ -3,7 +3,7 @@ use std::{convert::TryInto, sync::Arc}; use tokio::sync::watch; use zksync_config::configs::eth_sender::SenderConfig; use zksync_contracts::BaseSystemContractsHashes; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_eth_client::{BoundEthInterface, CallFunctionArgs}; use zksync_l1_contract_interface::{ i_executor::commit::kzg::{KzgInfo, ZK_SYNC_BYTES_PER_BLOB}, @@ -342,7 +342,7 @@ impl EthTxAggregator { #[tracing::instrument(skip(self, storage))] async fn loop_iteration( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> Result<(), ETHSenderError> { let MulticallData { base_system_contracts_hashes, @@ -385,7 +385,7 @@ impl EthTxAggregator { } async fn report_eth_tx_saving( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, aggregated_op: AggregatedOperation, tx: &EthTx, ) { @@ -519,7 +519,7 @@ impl EthTxAggregator { pub(super) async fn save_eth_tx( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, aggregated_op: &AggregatedOperation, contracts_are_pre_shared_bridge: bool, ) -> Result { @@ -569,7 +569,7 @@ impl EthTxAggregator { async fn get_next_nonce( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, from_addr: Option
, ) -> Result { let db_nonce = storage diff --git a/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs b/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs index b738e995acc6..d581d6e75b61 100644 --- a/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs +++ b/core/lib/zksync_core/src/eth_sender/eth_tx_manager.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use anyhow::Context as _; use tokio::sync::watch; use zksync_config::configs::eth_sender::SenderConfig; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_eth_client::{ encode_blob_tx_with_sidecar, BoundEthInterface, Error, EthInterface, ExecutedTxStatus, Options, RawTransactionBytes, SignedCallResult, @@ -87,7 +87,7 @@ impl EthTxManager { async fn check_all_sending_attempts( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, op: &EthTx, ) -> Option { // Checking history items, starting from most recently sent. @@ -115,7 +115,7 @@ impl EthTxManager { async fn calculate_fee( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx: &EthTx, time_in_mempool: u32, ) -> Result { @@ -181,7 +181,7 @@ impl EthTxManager { async fn increase_priority_fee( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, eth_tx_id: u32, base_fee_per_gas: u64, ) -> Result { @@ -217,7 +217,7 @@ impl EthTxManager { pub(crate) async fn send_eth_tx( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx: &EthTx, time_in_mempool: u32, current_block: L1BlockNumber, @@ -285,7 +285,7 @@ impl EthTxManager { async fn send_raw_transaction( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx_history_id: u32, raw_tx: RawTransactionBytes, current_block: L1BlockNumber, @@ -404,7 +404,7 @@ impl EthTxManager { // returns the one that has to be resent (if there is one). pub(super) async fn monitor_inflight_transactions( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_block_numbers: L1BlockNumbers, ) -> Result, ETHSenderError> { METRICS.track_block_numbers(&l1_block_numbers); @@ -442,7 +442,7 @@ impl EthTxManager { async fn monitor_inflight_transactions_inner( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_block_numbers: L1BlockNumbers, operator_nonce: OperatorNonce, operator_address: Option
, @@ -569,7 +569,7 @@ impl EthTxManager { async fn send_unsent_txs( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_block_numbers: L1BlockNumbers, ) { for tx in storage.eth_sender_dal().get_unsent_txs().await.unwrap() { @@ -611,7 +611,7 @@ impl EthTxManager { async fn apply_tx_status( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx: &EthTx, tx_status: ExecutedTxStatus, finalized_block: L1BlockNumber, @@ -634,7 +634,7 @@ impl EthTxManager { pub async fn fail_tx( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx: &EthTx, tx_status: ExecutedTxStatus, ) { @@ -662,7 +662,7 @@ impl EthTxManager { pub async fn confirm_tx( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, tx: &EthTx, tx_status: ExecutedTxStatus, ) { @@ -752,7 +752,7 @@ impl EthTxManager { async fn send_new_eth_txs( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, current_block: L1BlockNumber, ) { let number_inflight_txs = storage @@ -783,7 +783,7 @@ impl EthTxManager { #[tracing::instrument(skip(self, storage))] async fn loop_iteration( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, previous_block: L1BlockNumber, ) -> Result { let l1_block_numbers = self.get_l1_block_numbers().await?; diff --git a/core/lib/zksync_core/src/eth_sender/metrics.rs b/core/lib/zksync_core/src/eth_sender/metrics.rs index 2b5f4b902ad2..466a0f8a9e3e 100644 --- a/core/lib/zksync_core/src/eth_sender/metrics.rs +++ b/core/lib/zksync_core/src/eth_sender/metrics.rs @@ -3,7 +3,7 @@ use std::{fmt, time::Duration}; use vise::{Buckets, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{aggregated_operations::AggregatedActionType, eth_sender::EthTx}; use zksync_utils::time::seconds_since_epoch; @@ -113,7 +113,7 @@ impl EthSenderMetrics { pub async fn track_eth_tx_metrics( &self, - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, l1_stage: BlockL1Stage, tx: &EthTx, ) { diff --git a/core/lib/zksync_core/src/eth_sender/publish_criterion.rs b/core/lib/zksync_core/src/eth_sender/publish_criterion.rs index 434a3d4b1d3d..155eb630f31e 100644 --- a/core/lib/zksync_core/src/eth_sender/publish_criterion.rs +++ b/core/lib/zksync_core/src/eth_sender/publish_criterion.rs @@ -2,7 +2,7 @@ use std::fmt; use async_trait::async_trait; use chrono::Utc; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_l1_contract_interface::{i_executor::structures::CommitBatchInfo, Tokenizable}; use zksync_types::{ aggregated_operations::AggregatedActionType, commitment::L1BatchWithMetadata, ethabi, @@ -21,7 +21,7 @@ pub trait L1BatchPublishCriterion: fmt::Debug + Send + Sync { /// Otherwise, returns the number of the last L1 batch that needs to be published. async fn last_l1_batch_to_publish( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, consecutive_l1_batches: &[L1BatchWithMetadata], last_sealed_l1_batch: L1BatchNumber, ) -> Option; @@ -42,7 +42,7 @@ impl L1BatchPublishCriterion for NumberCriterion { async fn last_l1_batch_to_publish( &mut self, - _storage: &mut StorageProcessor<'_>, + _storage: &mut BasicStorageProcessor<'_>, consecutive_l1_batches: &[L1BatchWithMetadata], _last_sealed_l1_batch: L1BatchNumber, ) -> Option { @@ -88,7 +88,7 @@ impl L1BatchPublishCriterion for TimestampDeadlineCriterion { async fn last_l1_batch_to_publish( &mut self, - _storage: &mut StorageProcessor<'_>, + _storage: &mut BasicStorageProcessor<'_>, consecutive_l1_batches: &[L1BatchWithMetadata], last_sealed_l1_batch: L1BatchNumber, ) -> Option { @@ -133,7 +133,7 @@ impl GasCriterion { async fn get_gas_amount( &self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, batch_number: L1BatchNumber, ) -> u32 { storage @@ -152,7 +152,7 @@ impl L1BatchPublishCriterion for GasCriterion { async fn last_l1_batch_to_publish( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, consecutive_l1_batches: &[L1BatchWithMetadata], _last_sealed_l1_batch: L1BatchNumber, ) -> Option { @@ -210,7 +210,7 @@ impl L1BatchPublishCriterion for DataSizeCriterion { async fn last_l1_batch_to_publish( &mut self, - _storage: &mut StorageProcessor<'_>, + _storage: &mut BasicStorageProcessor<'_>, consecutive_l1_batches: &[L1BatchWithMetadata], _last_sealed_l1_batch: L1BatchNumber, ) -> Option { diff --git a/core/lib/zksync_core/src/eth_sender/tests.rs b/core/lib/zksync_core/src/eth_sender/tests.rs index 3a7f4c25c663..1a24cdccb91c 100644 --- a/core/lib/zksync_core/src/eth_sender/tests.rs +++ b/core/lib/zksync_core/src/eth_sender/tests.rs @@ -7,7 +7,7 @@ use zksync_config::{ configs::eth_sender::{ProofSendingMode, PubdataSendingMode, SenderConfig}, ContractsConfig, ETHSenderConfig, GasAdjusterConfig, }; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_eth_client::{clients::MockEthereum, EthInterface}; use zksync_l1_contract_interface::i_executor::methods::{ CommitBatches, ExecuteBatches, ProveBatches, @@ -137,7 +137,7 @@ impl EthSenderTester { } } - async fn storage(&self) -> StorageProcessor<'_> { + async fn storage(&self) -> BasicStorageProcessor<'_> { self.conn.access_storage().await.unwrap() } diff --git a/core/lib/zksync_core/src/eth_watch/event_processors/governance_upgrades.rs b/core/lib/zksync_core/src/eth_watch/event_processors/governance_upgrades.rs index 7066838fee88..8ccea4113832 100644 --- a/core/lib/zksync_core/src/eth_watch/event_processors/governance_upgrades.rs +++ b/core/lib/zksync_core/src/eth_watch/event_processors/governance_upgrades.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, time::Instant}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ ethabi::Contract, protocol_version::GovernanceOperation, web3::types::Log, Address, ProtocolUpgrade, ProtocolVersionId, H256, @@ -41,7 +41,7 @@ impl GovernanceUpgradesEventProcessor { impl EventProcessor for GovernanceUpgradesEventProcessor { async fn process_events( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, client: &dyn EthClient, events: Vec, ) -> Result<(), Error> { diff --git a/core/lib/zksync_core/src/eth_watch/event_processors/mod.rs b/core/lib/zksync_core/src/eth_watch/event_processors/mod.rs index 0a068033f2bd..748324b98c46 100644 --- a/core/lib/zksync_core/src/eth_watch/event_processors/mod.rs +++ b/core/lib/zksync_core/src/eth_watch/event_processors/mod.rs @@ -1,6 +1,6 @@ use std::fmt; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{web3::types::Log, H256}; use crate::eth_watch::client::{Error, EthClient}; @@ -14,7 +14,7 @@ pub trait EventProcessor: 'static + fmt::Debug + Send + Sync { /// Processes given events async fn process_events( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, client: &dyn EthClient, events: Vec, ) -> Result<(), Error>; diff --git a/core/lib/zksync_core/src/eth_watch/event_processors/priority_ops.rs b/core/lib/zksync_core/src/eth_watch/event_processors/priority_ops.rs index ad24eba1791b..7a7ca163a286 100644 --- a/core/lib/zksync_core/src/eth_watch/event_processors/priority_ops.rs +++ b/core/lib/zksync_core/src/eth_watch/event_processors/priority_ops.rs @@ -1,7 +1,7 @@ use std::convert::TryFrom; use zksync_contracts::zksync_contract; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{l1::L1Tx, web3::types::Log, PriorityOpId, H256}; use crate::{ @@ -36,7 +36,7 @@ impl PriorityOpsEventProcessor { impl EventProcessor for PriorityOpsEventProcessor { async fn process_events( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, _client: &dyn EthClient, events: Vec, ) -> Result<(), Error> { diff --git a/core/lib/zksync_core/src/eth_watch/event_processors/upgrades.rs b/core/lib/zksync_core/src/eth_watch/event_processors/upgrades.rs index 393dad5afcda..510b11f02625 100644 --- a/core/lib/zksync_core/src/eth_watch/event_processors/upgrades.rs +++ b/core/lib/zksync_core/src/eth_watch/event_processors/upgrades.rs @@ -1,6 +1,6 @@ use std::convert::TryFrom; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{web3::types::Log, ProtocolUpgrade, ProtocolVersionId, H256}; use crate::eth_watch::{ @@ -32,7 +32,7 @@ impl UpgradesEventProcessor { impl EventProcessor for UpgradesEventProcessor { async fn process_events( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, client: &dyn EthClient, events: Vec, ) -> Result<(), Error> { diff --git a/core/lib/zksync_core/src/eth_watch/mod.rs b/core/lib/zksync_core/src/eth_watch/mod.rs index 0f23650359ef..c623b9269deb 100644 --- a/core/lib/zksync_core/src/eth_watch/mod.rs +++ b/core/lib/zksync_core/src/eth_watch/mod.rs @@ -8,7 +8,7 @@ use std::{sync::Arc, time::Duration}; use tokio::{sync::watch, task::JoinHandle}; use zksync_config::ETHWatchConfig; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_eth_client::EthInterface; use zksync_system_constants::PRIORITY_EXPIRATION; use zksync_types::{ @@ -98,7 +98,7 @@ impl EthWatch { async fn initialize_state( client: &dyn EthClient, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> EthWatchState { let next_expected_priority_id: PriorityOpId = storage .transactions_dal() @@ -162,7 +162,10 @@ impl EthWatch { } #[tracing::instrument(skip(self, storage))] - async fn loop_iteration(&mut self, storage: &mut StorageProcessor<'_>) -> Result<(), Error> { + async fn loop_iteration( + &mut self, + storage: &mut BasicStorageProcessor<'_>, + ) -> Result<(), Error> { let stage_latency = METRICS.poll_eth_node[&PollStage::Request].start(); let to_block = self.client.finalized_block_number().await?; if to_block <= self.last_processed_ethereum_block { diff --git a/core/lib/zksync_core/src/eth_watch/tests.rs b/core/lib/zksync_core/src/eth_watch/tests.rs index 8d9965ef6f23..e2878c63ef96 100644 --- a/core/lib/zksync_core/src/eth_watch/tests.rs +++ b/core/lib/zksync_core/src/eth_watch/tests.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tokio::sync::RwLock; use zksync_contracts::{governance_contract, zksync_contract}; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{ ethabi::{encode, Hash, Token}, l1::{L1Tx, OpProcessingType, PriorityQueueType}, @@ -523,7 +523,7 @@ async fn test_overlapping_batches() { assert_eq!(tx.common_data.serial_id.0, 4); } -async fn get_all_db_txs(storage: &mut StorageProcessor<'_>) -> Vec { +async fn get_all_db_txs(storage: &mut BasicStorageProcessor<'_>) -> Vec { storage.transactions_dal().reset_mempool().await.unwrap(); storage .transactions_dal() diff --git a/core/lib/zksync_core/src/genesis.rs b/core/lib/zksync_core/src/genesis.rs index 6a70a796cd48..459660e5b525 100644 --- a/core/lib/zksync_core/src/genesis.rs +++ b/core/lib/zksync_core/src/genesis.rs @@ -9,7 +9,7 @@ use multivm::{ zk_evm_latest::aux_structures::{LogQuery as MultiVmLogQuery, Timestamp as MultiVMTimestamp}, }; use zksync_contracts::{BaseSystemContracts, SET_CHAIN_ID_EVENT}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_eth_client::{clients::QueryClient, EthInterface}; use zksync_merkle_tree::domain::ZkSyncTree; use zksync_system_constants::PRIORITY_EXPIRATION; @@ -59,7 +59,7 @@ impl GenesisParams { } pub async fn ensure_genesis_state( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, zksync_chain_id: L2ChainId, genesis_params: &GenesisParams, ) -> anyhow::Result { @@ -157,7 +157,7 @@ pub async fn ensure_genesis_state( // The code of the bootloader should not be deployed anywhere anywhere in the kernel space (i.e. addresses below 2^16) // because in this case we will have to worry about protecting it. async fn insert_base_system_contracts_to_factory_deps( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, contracts: &BaseSystemContracts, ) -> anyhow::Result<()> { let factory_deps = [&contracts.bootloader, &contracts.default_aa] @@ -173,7 +173,7 @@ async fn insert_base_system_contracts_to_factory_deps( } async fn insert_system_contracts( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, contracts: &[DeployedContract], chain_id: L2ChainId, ) -> anyhow::Result<()> { @@ -288,7 +288,7 @@ async fn insert_system_contracts( #[allow(clippy::too_many_arguments)] pub(crate) async fn create_genesis_l1_batch( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, first_validator_address: Address, chain_id: L2ChainId, protocol_version: ProtocolVersionId, @@ -367,7 +367,7 @@ pub(crate) async fn create_genesis_l1_batch( Ok(()) } -async fn add_eth_token(transaction: &mut StorageProcessor<'_>) -> anyhow::Result<()> { +async fn add_eth_token(transaction: &mut BasicStorageProcessor<'_>) -> anyhow::Result<()> { assert!(transaction.in_transaction()); // sanity check let eth_token = TokenInfo { l1_address: ETHEREUM_ADDRESS, @@ -393,7 +393,7 @@ async fn add_eth_token(transaction: &mut StorageProcessor<'_>) -> anyhow::Result } async fn save_genesis_l1_batch_metadata( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, commitment: L1BatchCommitment, genesis_root_hash: H256, rollup_last_leaf_index: u64, @@ -428,7 +428,7 @@ pub(crate) async fn save_set_chain_id_tx( eth_client_url: &str, diamond_proxy_address: Address, state_transition_manager_address: Address, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<()> { let eth_client = QueryClient::new(eth_client_url)?; let to = eth_client.block_number("fetch_chain_id_tx").await?.as_u64(); diff --git a/core/lib/zksync_core/src/metadata_calculator/helpers.rs b/core/lib/zksync_core/src/metadata_calculator/helpers.rs index c3c6a8e324d2..54a762266b88 100644 --- a/core/lib/zksync_core/src/metadata_calculator/helpers.rs +++ b/core/lib/zksync_core/src/metadata_calculator/helpers.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::sync::watch; use zksync_config::configs::database::MerkleTreeMode; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_health_check::{Health, HealthStatus}; use zksync_merkle_tree::{ domain::{TreeMetadata, ZkSyncTree, ZkSyncTreeReader}, @@ -398,7 +398,7 @@ pub(crate) struct L1BatchWithLogs { impl L1BatchWithLogs { pub async fn new( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, ) -> Option { tracing::debug!("Loading storage logs data for L1 batch #{l1_batch_number}"); diff --git a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs index f6b6f74fb2b2..2375b9b772e2 100644 --- a/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs +++ b/core/lib/zksync_core/src/metadata_calculator/recovery/mod.rs @@ -34,7 +34,7 @@ use anyhow::Context as _; use async_trait::async_trait; use futures::future; use tokio::sync::{watch, Mutex, Semaphore}; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_health_check::HealthUpdater; use zksync_merkle_tree::TreeEntry; use zksync_types::{ @@ -268,7 +268,7 @@ impl AsyncTreeRecovery { /// Filters out `key_chunks` for which recovery was successfully performed. async fn filter_chunks( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, snapshot_miniblock: MiniblockNumber, key_chunks: &[ops::RangeInclusive], ) -> anyhow::Result>> { diff --git a/core/lib/zksync_core/src/metadata_calculator/tests.rs b/core/lib/zksync_core/src/metadata_calculator/tests.rs index ece4c4e66fce..649f4cf9daca 100644 --- a/core/lib/zksync_core/src/metadata_calculator/tests.rs +++ b/core/lib/zksync_core/src/metadata_calculator/tests.rs @@ -10,7 +10,7 @@ use zksync_config::configs::{ chain::OperationsManagerConfig, database::{MerkleTreeConfig, MerkleTreeMode}, }; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_health_check::{CheckHealth, HealthStatus}; use zksync_merkle_tree::domain::ZkSyncTree; use zksync_object_store::{ObjectStore, ObjectStoreFactory}; @@ -469,7 +469,7 @@ pub(crate) async fn reset_db_state(pool: &ConnectionPool, num_batches: usize) { } pub(super) async fn extend_db_state( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, new_logs: impl IntoIterator>, ) { let mut storage = storage.start_transaction().await.unwrap(); @@ -484,7 +484,7 @@ pub(super) async fn extend_db_state( } pub(super) async fn extend_db_state_from_l1_batch( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, next_l1_batch: L1BatchNumber, new_logs: impl IntoIterator>, ) { @@ -522,7 +522,7 @@ pub(super) async fn extend_db_state_from_l1_batch( } async fn insert_initial_writes_for_batch( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, ) { let written_non_zero_slots: Vec<_> = connection @@ -592,7 +592,7 @@ pub(crate) fn gen_storage_logs( } async fn remove_l1_batches( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, last_l1_batch_to_keep: L1BatchNumber, ) -> Vec { let sealed_l1_batch_number = storage diff --git a/core/lib/zksync_core/src/metadata_calculator/updater.rs b/core/lib/zksync_core/src/metadata_calculator/updater.rs index 081fa2458899..2234e939c6ff 100644 --- a/core/lib/zksync_core/src/metadata_calculator/updater.rs +++ b/core/lib/zksync_core/src/metadata_calculator/updater.rs @@ -5,7 +5,7 @@ use std::{ops, sync::Arc, time::Instant}; use anyhow::Context as _; use futures::{future, FutureExt}; use tokio::sync::watch; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_health_check::HealthUpdater; use zksync_merkle_tree::domain::TreeMetadata; use zksync_object_store::ObjectStore; @@ -86,7 +86,7 @@ impl TreeUpdater { /// is slow for whatever reason. async fn process_multiple_batches( &mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_numbers: ops::RangeInclusive, ) -> L1BatchNumber { let start = Instant::now(); @@ -167,7 +167,7 @@ impl TreeUpdater { async fn step( &mut self, - mut storage: StorageProcessor<'_>, + mut storage: BasicStorageProcessor<'_>, next_l1_batch_to_seal: &mut L1BatchNumber, ) { let Some(last_sealed_l1_batch) = storage @@ -312,7 +312,7 @@ impl TreeUpdater { } async fn check_initial_writes_consistency( - connection: &mut StorageProcessor<'_>, + connection: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, tree_initial_writes: &[InitialStorageWrite], ) { diff --git a/core/lib/zksync_core/src/reorg_detector/tests.rs b/core/lib/zksync_core/src/reorg_detector/tests.rs index 3e306d134c4a..3f9bf51a26ea 100644 --- a/core/lib/zksync_core/src/reorg_detector/tests.rs +++ b/core/lib/zksync_core/src/reorg_detector/tests.rs @@ -8,7 +8,7 @@ use std::{ use assert_matches::assert_matches; use test_casing::{test_casing, Product}; use tokio::sync::mpsc; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ block::{MiniblockHasher, MiniblockHeader}, L2ChainId, ProtocolVersion, @@ -21,7 +21,7 @@ use crate::{ utils::testonly::{create_l1_batch, create_miniblock}, }; -async fn store_miniblock(storage: &mut StorageProcessor<'_>, number: u32, hash: H256) { +async fn store_miniblock(storage: &mut BasicStorageProcessor<'_>, number: u32, hash: H256) { let header = MiniblockHeader { hash, ..create_miniblock(number) @@ -33,7 +33,7 @@ async fn store_miniblock(storage: &mut StorageProcessor<'_>, number: u32, hash: .unwrap(); } -async fn seal_l1_batch(storage: &mut StorageProcessor<'_>, number: u32, hash: H256) { +async fn seal_l1_batch(storage: &mut BasicStorageProcessor<'_>, number: u32, hash: H256) { let header = create_l1_batch(number); storage .blocks_dal() diff --git a/core/lib/zksync_core/src/state_keeper/io/common/mod.rs b/core/lib/zksync_core/src/state_keeper/io/common/mod.rs index 64001d7d502d..edcd5df6a126 100644 --- a/core/lib/zksync_core/src/state_keeper/io/common/mod.rs +++ b/core/lib/zksync_core/src/state_keeper/io/common/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; use anyhow::Context; use multivm::interface::{L1BatchEnv, SystemEnv}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{L1BatchNumber, MiniblockNumber, H256}; use super::PendingBatchData; @@ -30,7 +30,7 @@ pub(crate) struct IoCursor { impl IoCursor { /// Loads the cursor from Postgres. - pub async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result { + pub async fn new(storage: &mut BasicStorageProcessor<'_>) -> anyhow::Result { let last_sealed_l1_batch_number = storage .blocks_dal() .get_sealed_l1_batch_number() @@ -88,7 +88,7 @@ impl IoCursor { /// /// Propagates DB errors. Also returns an error if environment doesn't correspond to a pending L1 batch. pub(crate) async fn load_pending_batch( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, system_env: SystemEnv, l1_batch_env: L1BatchEnv, ) -> anyhow::Result { diff --git a/core/lib/zksync_core/src/state_keeper/io/fee_address_migration.rs b/core/lib/zksync_core/src/state_keeper/io/fee_address_migration.rs index 84fdc8e44cff..a6a81e7dcf84 100644 --- a/core/lib/zksync_core/src/state_keeper/io/fee_address_migration.rs +++ b/core/lib/zksync_core/src/state_keeper/io/fee_address_migration.rs @@ -6,12 +6,12 @@ use std::time::{Duration, Instant}; use anyhow::Context as _; use tokio::sync::watch; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::MiniblockNumber; /// Runs the migration for pending miniblocks. pub(crate) async fn migrate_pending_miniblocks( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result<()> { let started_at = Instant::now(); tracing::info!("Started migrating `fee_account_address` for pending miniblocks"); @@ -153,7 +153,7 @@ async fn migrate_miniblocks_inner( #[allow(deprecated)] async fn is_fee_address_migrated( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, miniblock: MiniblockNumber, ) -> anyhow::Result { storage diff --git a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs index 85f7d3024c66..c1d27115915f 100644 --- a/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs +++ b/core/lib/zksync_core/src/state_keeper/io/seal_logic.rs @@ -9,7 +9,7 @@ use multivm::{ interface::{FinishedL1Batch, L1BatchEnv}, utils::get_max_gas_per_pubdata_byte, }; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ block::{unpack_block_info, L1BatchHeader, MiniblockHeader}, event::{extract_added_tokens, extract_long_l2_to_l1_messages}, @@ -49,7 +49,7 @@ impl UpdatesManager { #[must_use = "fictive miniblock must be used to update I/O params"] pub(crate) async fn seal_l1_batch( mut self, - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, current_miniblock_number: MiniblockNumber, l1_batch_env: &L1BatchEnv, finished_batch: FinishedL1Batch, @@ -268,7 +268,7 @@ impl UpdatesManager { } impl MiniblockSealCommand { - pub async fn seal(&self, storage: &mut StorageProcessor<'_>) { + pub async fn seal(&self, storage: &mut BasicStorageProcessor<'_>) { self.seal_inner(storage, false).await; } @@ -281,7 +281,7 @@ impl MiniblockSealCommand { /// one for sending fees to the operator). /// /// `l2_erc20_bridge_addr` is required to extract the information on newly added tokens. - async fn seal_inner(&self, storage: &mut StorageProcessor<'_>, is_fictive: bool) { + async fn seal_inner(&self, storage: &mut BasicStorageProcessor<'_>, is_fictive: bool) { self.assert_valid_miniblock(is_fictive); let mut transaction = storage.start_transaction().await.unwrap(); diff --git a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs index 1e864b063475..1b3acf01f0d2 100644 --- a/core/lib/zksync_core/src/state_keeper/mempool_actor.rs +++ b/core/lib/zksync_core/src/state_keeper/mempool_actor.rs @@ -6,7 +6,7 @@ use multivm::utils::derive_base_fee_and_gas_per_pubdata; use tokio::sync::mpsc; use tokio::sync::watch; use zksync_config::configs::chain::MempoolConfig; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_mempool::L2TxFilter; #[cfg(test)] use zksync_types::H256; @@ -131,7 +131,7 @@ impl MempoolFetcher { /// Loads nonces for all distinct `transactions` initiators from the storage. async fn get_transaction_nonces( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, transactions: &[Transaction], ) -> anyhow::Result> { let (nonce_keys, address_by_nonce_key): (Vec<_>, HashMap<_, _>) = transactions diff --git a/core/lib/zksync_core/src/state_keeper/types.rs b/core/lib/zksync_core/src/state_keeper/types.rs index 34ed66895cb0..1f596ff47785 100644 --- a/core/lib/zksync_core/src/state_keeper/types.rs +++ b/core/lib/zksync_core/src/state_keeper/types.rs @@ -4,7 +4,7 @@ use std::{ }; use multivm::interface::VmExecutionResultAndLogs; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_mempool::{L2TxFilter, MempoolInfo, MempoolStore}; use zksync_types::{ block::BlockGasCount, tx::ExecutionMetrics, Address, Nonce, PriorityOpId, Transaction, @@ -17,7 +17,10 @@ use crate::gas_tracker::{gas_count_from_metrics, gas_count_from_tx_and_metrics}; pub struct MempoolGuard(Arc>); impl MempoolGuard { - pub async fn from_storage(storage_processor: &mut StorageProcessor<'_>, capacity: u64) -> Self { + pub async fn from_storage( + storage_processor: &mut BasicStorageProcessor<'_>, + capacity: u64, + ) -> Self { let next_priority_id = storage_processor .transactions_dal() .next_priority_id() diff --git a/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs index a6a1835a9f18..31ef0e8e4663 100644 --- a/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs +++ b/core/lib/zksync_core/src/sync_layer/batch_status_updater/mod.rs @@ -9,7 +9,7 @@ use serde::Serialize; #[cfg(test)] use tokio::sync::mpsc; use tokio::sync::watch; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_health_check::{Health, HealthStatus, HealthUpdater, ReactiveHealthCheck}; use zksync_types::{ aggregated_operations::AggregatedActionType, api, L1BatchNumber, MiniblockNumber, H256, @@ -126,7 +126,7 @@ struct UpdaterCursor { } impl UpdaterCursor { - async fn new(storage: &mut StorageProcessor<'_>) -> anyhow::Result { + async fn new(storage: &mut BasicStorageProcessor<'_>) -> anyhow::Result { let first_l1_batch_number = projected_first_l1_batch(storage).await?; // Use the snapshot L1 batch, or the genesis batch if we are not using a snapshot. Technically, the snapshot L1 batch // is not necessarily proven / executed yet, but since it and earlier batches are not stored, it serves diff --git a/core/lib/zksync_core/src/sync_layer/fetcher.rs b/core/lib/zksync_core/src/sync_layer/fetcher.rs index ddaa1516e61e..1ab597377af9 100644 --- a/core/lib/zksync_core/src/sync_layer/fetcher.rs +++ b/core/lib/zksync_core/src/sync_layer/fetcher.rs @@ -1,5 +1,5 @@ use anyhow::Context as _; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ api::en::SyncBlock, block::MiniblockHasher, Address, L1BatchNumber, MiniblockNumber, ProtocolVersionId, H256, @@ -74,7 +74,9 @@ impl TryFrom for FetchedBlock { impl IoCursor { /// Loads this cursor from storage and modifies it to account for the pending L1 batch if necessary. - pub(crate) async fn for_fetcher(storage: &mut StorageProcessor<'_>) -> anyhow::Result { + pub(crate) async fn for_fetcher( + storage: &mut BasicStorageProcessor<'_>, + ) -> anyhow::Result { let mut this = Self::new(storage).await?; // It's important to know whether we have opened a new batch already or just sealed the previous one. // Depending on it, we must either insert `OpenBatch` item into the queue, or not. diff --git a/core/lib/zksync_core/src/sync_layer/genesis.rs b/core/lib/zksync_core/src/sync_layer/genesis.rs index 021d828c9667..d0df2f9c7ecb 100644 --- a/core/lib/zksync_core/src/sync_layer/genesis.rs +++ b/core/lib/zksync_core/src/sync_layer/genesis.rs @@ -1,6 +1,6 @@ use anyhow::Context as _; use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes, SystemContractCode}; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_types::{ block::DeployedContract, protocol_version::L1VerifierConfig, system_contracts::get_system_smart_contracts, AccountTreeId, Address, L1BatchNumber, L2ChainId, @@ -11,7 +11,7 @@ use super::client::MainNodeClient; use crate::genesis::{ensure_genesis_state, GenesisParams}; pub async fn perform_genesis_if_needed( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, zksync_chain_id: L2ChainId, client: &dyn MainNodeClient, ) -> anyhow::Result<()> { diff --git a/core/lib/zksync_core/src/sync_layer/tests.rs b/core/lib/zksync_core/src/sync_layer/tests.rs index f1bfb70a622e..d81ffb4f8458 100644 --- a/core/lib/zksync_core/src/sync_layer/tests.rs +++ b/core/lib/zksync_core/src/sync_layer/tests.rs @@ -9,7 +9,7 @@ use std::{ use test_casing::test_casing; use tokio::{sync::watch, task::JoinHandle}; use zksync_contracts::BaseSystemContractsHashes; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{ api, block::MiniblockHasher, @@ -125,7 +125,7 @@ impl StateKeeperHandles { } } -async fn ensure_genesis(storage: &mut StorageProcessor<'_>) { +async fn ensure_genesis(storage: &mut BasicStorageProcessor<'_>) { if storage.blocks_dal().is_genesis_needed().await.unwrap() { ensure_genesis_state(storage, L2ChainId::default(), &GenesisParams::mock()) .await diff --git a/core/lib/zksync_core/src/utils/mod.rs b/core/lib/zksync_core/src/utils/mod.rs index ad2e4bd05412..5b88f0434a1d 100644 --- a/core/lib/zksync_core/src/utils/mod.rs +++ b/core/lib/zksync_core/src/utils/mod.rs @@ -9,7 +9,7 @@ use std::{ use anyhow::Context as _; use async_trait::async_trait; use tokio::sync::watch; -use zksync_dal::{ConnectionPool, StorageProcessor}; +use zksync_dal::{BasicStorageProcessor, ConnectionPool}; use zksync_types::{L1BatchNumber, ProtocolVersionId}; #[cfg(test)] @@ -120,7 +120,7 @@ pub(crate) async fn wait_for_l1_batch_with_metadata( /// Returns the projected number of the first locally available L1 batch. The L1 batch is **not** /// guaranteed to be present in the storage! pub(crate) async fn projected_first_l1_batch( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result { let snapshot_recovery = storage .snapshot_recovery_dal() @@ -133,7 +133,7 @@ pub(crate) async fn projected_first_l1_batch( /// Obtains a protocol version projected to be applied for the next miniblock. This is either the version used by the last /// sealed miniblock, or (if there are no miniblocks), one referenced in the snapshot recovery record. pub(crate) async fn pending_protocol_version( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, ) -> anyhow::Result { static WARNED_ABOUT_NO_VERSION: AtomicBool = AtomicBool::new(false); diff --git a/core/lib/zksync_core/src/utils/testonly.rs b/core/lib/zksync_core/src/utils/testonly.rs index 978b342a4b8a..752d00b6311a 100644 --- a/core/lib/zksync_core/src/utils/testonly.rs +++ b/core/lib/zksync_core/src/utils/testonly.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use multivm::utils::get_max_gas_per_pubdata_byte; use zksync_contracts::BaseSystemContractsHashes; -use zksync_dal::StorageProcessor; +use zksync_dal::BasicStorageProcessor; use zksync_merkle_tree::{domain::ZkSyncTree, TreeInstruction}; use zksync_system_constants::ZKPORTER_IS_AVAILABLE; use zksync_types::{ @@ -150,7 +150,7 @@ pub(crate) fn execute_l2_transaction(transaction: L2Tx) -> TransactionExecutionR /// Prepares a recovery snapshot without performing genesis. pub(crate) async fn prepare_recovery_snapshot( - storage: &mut StorageProcessor<'_>, + storage: &mut BasicStorageProcessor<'_>, l1_batch_number: L1BatchNumber, miniblock_number: MiniblockNumber, snapshot_logs: &[StorageLog], diff --git a/prover/Cargo.lock b/prover/Cargo.lock index f1b5e9ba7a94..b6def6898f3e 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -4024,7 +4024,9 @@ name = "prover_dal" version = "0.1.0" dependencies = [ "sqlx", + "strum", "zksync_db_connection", + "zksync_types", ] [[package]] @@ -7095,12 +7097,15 @@ version = "0.1.0" dependencies = [ "anyhow", "rand 0.8.5", + "serde", + "serde_json", "sqlx", "tokio", "tracing", "url", "vise", "zksync_health_check", + "zksync_types", ] [[package]] diff --git a/prover/prover_dal/Cargo.toml b/prover/prover_dal/Cargo.toml index 0aea1bb600df..132d1d54226c 100644 --- a/prover/prover_dal/Cargo.toml +++ b/prover/prover_dal/Cargo.toml @@ -7,6 +7,9 @@ edition = "2021" [dependencies] zksync_db_connection = { path = "../../core/lib/db_connection" } +zksync_types = { path = "../../core/lib/types" } + +strum = { version = "0.24", features = ["derive"] } sqlx = { version = "0.7.3", default-features = false, features = [ "runtime-tokio", "tls-native-tls", diff --git a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs index 1193371df2bb..ca05a98b2764 100644 --- a/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs +++ b/prover/prover_dal/src/fri_gpu_prover_queue_dal.rs @@ -1,5 +1,7 @@ use std::time::Duration; +use zksync_db_connection::processor::StorageProcessor; + use crate::{ fri_prover_dal::types::{GpuProverInstanceStatus, SocketAddress}, time_utils::pg_interval_from_duration, diff --git a/prover/prover_dal/src/fri_proof_compressor_dal.rs b/prover/prover_dal/src/fri_proof_compressor_dal.rs index 433857e80f0f..97a7c6b24b4f 100644 --- a/prover/prover_dal/src/fri_proof_compressor_dal.rs +++ b/prover/prover_dal/src/fri_proof_compressor_dal.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, str::FromStr, time::Duration}; use sqlx::Row; use strum::{Display, EnumString}; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::L1BatchNumber; use crate::{ diff --git a/prover/prover_dal/src/fri_protocol_versions_dal.rs b/prover/prover_dal/src/fri_protocol_versions_dal.rs index 5357232eb4e2..f74c05c9174d 100644 --- a/prover/prover_dal/src/fri_protocol_versions_dal.rs +++ b/prover/prover_dal/src/fri_protocol_versions_dal.rs @@ -1,5 +1,6 @@ use std::convert::TryFrom; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::protocol_version::{FriProtocolVersionId, L1VerifierConfig}; use crate::ProverProcessor; diff --git a/prover/prover_dal/src/fri_prover_dal.rs b/prover/prover_dal/src/fri_prover_dal.rs index 0e6b42322f11..1d4292ea9d2b 100644 --- a/prover/prover_dal/src/fri_prover_dal.rs +++ b/prover/prover_dal/src/fri_prover_dal.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, convert::TryFrom, time::Duration}; -use zksync_db_connection::instrument::InstrumentExt; +use zksync_db_connection::{instrument::InstrumentExt, processor::StorageProcessor}; use zksync_types::{ basic_fri_types::{AggregationRound, CircuitIdRoundTuple}, protocol_version::FriProtocolVersionId, diff --git a/prover/prover_dal/src/fri_scheduler_dependency_tracker_dal.rs b/prover/prover_dal/src/fri_scheduler_dependency_tracker_dal.rs index 222b227ed5ca..c999cddd6a2d 100644 --- a/prover/prover_dal/src/fri_scheduler_dependency_tracker_dal.rs +++ b/prover/prover_dal/src/fri_scheduler_dependency_tracker_dal.rs @@ -1,3 +1,4 @@ +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{basic_fri_types::FinalProofIds, L1BatchNumber}; use crate::{fri_prover_dal::types, ProverProcessor}; diff --git a/prover/prover_dal/src/fri_witness_generator_dal.rs b/prover/prover_dal/src/fri_witness_generator_dal.rs index 75f08264df57..5caf3ead2534 100644 --- a/prover/prover_dal/src/fri_witness_generator_dal.rs +++ b/prover/prover_dal/src/fri_witness_generator_dal.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, convert::TryFrom, time::Duration}; use sqlx::Row; +use zksync_db_connection::processor::StorageProcessor; use zksync_types::{ basic_fri_types::{AggregationRound, Eip4844Blobs}, protocol_version::FriProtocolVersionId, diff --git a/prover/prover_dal/src/lib.rs b/prover/prover_dal/src/lib.rs index c1dd917bacb7..e57483e81b7a 100644 --- a/prover/prover_dal/src/lib.rs +++ b/prover/prover_dal/src/lib.rs @@ -1,6 +1,6 @@ use sqlx::{pool::PoolConnection, PgConnection, Postgres}; use zksync_db_connection::processor::{ - StorageInteraction, StorageKind, StorageProcessor, StorageProcessorTags, TracedConnections, + BasicStorageProcessor, StorageKind, StorageProcessor, StorageProcessorTags, TracedConnections, }; use crate::{ @@ -20,13 +20,13 @@ pub mod fri_witness_generator_dal; pub struct Prover(()); -pub struct ProverProcessor<'a>(StorageProcessor<'a>); +pub struct ProverProcessor<'a>(BasicStorageProcessor<'a>); impl StorageKind for Prover { type Processor<'a> = ProverProcessor<'a>; } -impl<'a> StorageInteraction for ProverProcessor<'a> { +impl<'a> StorageProcessor for ProverProcessor<'a> { async fn start_transaction(&mut self) -> sqlx::Result> { self.0.start_transaction() } @@ -45,7 +45,7 @@ impl<'a> StorageInteraction for ProverProcessor<'a> { tags: Option, traced_connections: Option<&TracedConnections>, ) -> Self { - Self(StorageProcessor::from_pool( + Self(BasicStorageProcessor::from_pool( connection, tags, traced_connections,