diff --git a/Cargo.lock b/Cargo.lock index eac6e9771f5a..a5e51346bdf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10777,6 +10777,7 @@ dependencies = [ "anyhow", "assert_matches", "async-trait", + "backon", "chrono", "futures 0.3.30", "once_cell", diff --git a/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json new file mode 100644 index 000000000000..b40bdca666b8 --- /dev/null +++ b/core/lib/dal/.sqlx/query-180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM l1_batches\n WHERE\n number > $1\n AND NOT is_sealed\n RETURNING number\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "180cc8d88563a42423ca1d4b92181f4625ebd593aa4cd2bae79bcc0637387d78" +} diff --git a/core/lib/dal/src/blocks_dal.rs b/core/lib/dal/src/blocks_dal.rs index bf1b48130c40..f71dc68ce757 100644 --- a/core/lib/dal/src/blocks_dal.rs +++ b/core/lib/dal/src/blocks_dal.rs @@ -2058,6 +2058,37 @@ impl BlocksDal<'_, '_> { Ok(()) } + /// Deletes the unsealed L1 batch from the storage. Expects the caller to make sure there are no + /// associated L2 blocks. + /// + /// Accepts `batch_to_keep` as a safety mechanism. + pub async fn delete_unsealed_l1_batch( + &mut self, + batch_to_keep: L1BatchNumber, + ) -> DalResult<()> { + let deleted_row = sqlx::query!( + r#" + DELETE FROM l1_batches + WHERE + number > $1 + AND NOT is_sealed + RETURNING number + "#, + i64::from(batch_to_keep.0) + ) + .instrument("delete_unsealed_l1_batch") + .with_arg("batch_to_keep", &batch_to_keep) + .fetch_optional(self.storage) + .await?; + if let Some(deleted_row) = deleted_row { + tracing::info!( + l1_batch_number = %deleted_row.number, + "Deleted unsealed batch" + ); + } + Ok(()) + } + /// Deletes all L1 batches from the storage so that the specified batch number is the last one left. pub async fn delete_l1_batches(&mut self, last_batch_to_keep: L1BatchNumber) -> DalResult<()> { self.delete_l1_batches_inner(Some(last_batch_to_keep)).await @@ -2184,6 +2215,20 @@ impl BlocksDal<'_, '_> { Ok(Some((L2BlockNumber(min as u32), L2BlockNumber(max as u32)))) } + /// Returns `true` if there exists a non-sealed batch (i.e. there is one+ stored L2 block that isn't assigned + /// to any batch yet). + pub async fn pending_batch_exists(&mut self) -> DalResult { + let count = sqlx::query_scalar!( + "SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL" + ) + .instrument("pending_batch_exists") + .fetch_one(self.storage) + .await? + .unwrap_or(0); + + Ok(count != 0) + } + // methods used for measuring Eth tx stage transition latencies // and emitting metrics base on these measured data pub async fn oldest_uncommitted_batch_timestamp(&mut self) -> DalResult> { diff --git a/core/lib/multivm/src/versions/testonly/bytecode_publishing.rs b/core/lib/multivm/src/versions/testonly/bytecode_publishing.rs index 33af7be8cc6f..346241a96245 100644 --- a/core/lib/multivm/src/versions/testonly/bytecode_publishing.rs +++ b/core/lib/multivm/src/versions/testonly/bytecode_publishing.rs @@ -1,4 +1,4 @@ -use zksync_test_account::{DeployContractsTx, TxType}; +use zksync_test_account::TxType; use super::{read_test_contract, tester::VmTesterBuilder, TestedVm}; use crate::{ @@ -20,8 +20,16 @@ pub(crate) fn test_bytecode_publishing() { let compressed_bytecode = bytecode::compress(counter.clone()).unwrap().compressed; - let DeployContractsTx { tx, .. } = account.get_deploy_tx(&counter, None, TxType::L2); - vm.vm.push_transaction(tx); + let tx = account.get_deploy_tx(&counter, None, TxType::L2).tx; + assert_eq!(tx.execute.factory_deps.len(), 1); // The deployed bytecode is the only dependency + let push_result = vm.vm.push_transaction(tx); + assert_eq!(push_result.compressed_bytecodes.len(), 1); + assert_eq!(push_result.compressed_bytecodes[0].original, counter); + assert_eq!( + push_result.compressed_bytecodes[0].compressed, + compressed_bytecode + ); + let result = vm.vm.execute(VmExecutionMode::OneTx); assert!(!result.result.is_failed(), "Transaction wasn't successful"); diff --git a/core/lib/multivm/src/versions/vm_1_3_2/vm.rs b/core/lib/multivm/src/versions/vm_1_3_2/vm.rs index 89196788a762..31457fc9676a 100644 --- a/core/lib/multivm/src/versions/vm_1_3_2/vm.rs +++ b/core/lib/multivm/src/versions/vm_1_3_2/vm.rs @@ -8,8 +8,9 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, - L2BlockEnv, SystemEnv, TxExecutionMode, VmExecutionMode, VmExecutionResultAndLogs, - VmFactory, VmInterface, VmInterfaceHistoryEnabled, VmMemoryMetrics, + L2BlockEnv, PushTransactionResult, SystemEnv, TxExecutionMode, VmExecutionMode, + VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, + VmMemoryMetrics, }, tracers::old::TracerDispatcher, utils::bytecode, @@ -44,13 +45,17 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - fn push_transaction(&mut self, tx: Transaction) { - crate::vm_1_3_2::vm_with_bootloader::push_transaction_to_bootloader_memory( - &mut self.vm, - &tx, - self.system_env.execution_mode.glue_into(), - None, - ) + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { + let compressed_bytecodes = + crate::vm_1_3_2::vm_with_bootloader::push_transaction_to_bootloader_memory( + &mut self.vm, + &tx, + self.system_env.execution_mode.glue_into(), + None, + ); + PushTransactionResult { + compressed_bytecodes: compressed_bytecodes.into(), + } } fn inspect( diff --git a/core/lib/multivm/src/versions/vm_1_3_2/vm_with_bootloader.rs b/core/lib/multivm/src/versions/vm_1_3_2/vm_with_bootloader.rs index d1acdf7708e8..fd4d483fba5e 100644 --- a/core/lib/multivm/src/versions/vm_1_3_2/vm_with_bootloader.rs +++ b/core/lib/multivm/src/versions/vm_1_3_2/vm_with_bootloader.rs @@ -442,7 +442,7 @@ pub fn get_bootloader_memory( let mut previous_compressed: usize = 0; let mut already_included_txs_size = 0; for (tx_index_in_block, tx) in txs.into_iter().enumerate() { - let compressed_bytecodes = predefined_compressed_bytecodes[tx_index_in_block].clone(); + let compressed_bytecodes = &predefined_compressed_bytecodes[tx_index_in_block]; let mut total_compressed_len_words = 0; for i in compressed_bytecodes.iter() { @@ -475,7 +475,7 @@ pub fn push_transaction_to_bootloader_memory( tx: &Transaction, execution_mode: TxExecutionMode, explicit_compressed_bytecodes: Option>, -) { +) -> Vec { let tx: TransactionData = tx.clone().into(); let block_gas_per_pubdata_byte = vm.block_context.context.block_gas_price_per_pubdata(); let overhead = tx.overhead_gas(block_gas_per_pubdata_byte as u32); @@ -485,7 +485,7 @@ pub fn push_transaction_to_bootloader_memory( execution_mode, overhead, explicit_compressed_bytecodes, - ); + ) } pub fn push_raw_transaction_to_bootloader_memory( @@ -494,7 +494,7 @@ pub fn push_raw_transaction_to_bootloader_memory>, -) { +) -> Vec { let tx_index_in_block = vm.bootloader_state.free_tx_index(); let already_included_txs_size = vm.bootloader_state.free_tx_offset(); @@ -555,7 +555,7 @@ pub fn push_raw_transaction_to_bootloader_memory, + compressed_bytecodes: &[CompressedBytecodeInfo], ) -> Vec<(usize, U256)> { let overhead_gas = tx.overhead_gas(block_gas_per_pubdata); let trusted_gas_limit = tx.trusted_gas_limit(block_gas_per_pubdata); @@ -604,7 +605,7 @@ pub(crate) fn get_bootloader_memory_for_encoded_tx( predefined_overhead: u32, trusted_gas_limit: u32, previous_compressed_bytecode_size: usize, - compressed_bytecodes: Vec, + compressed_bytecodes: &[CompressedBytecodeInfo], ) -> Vec<(usize, U256)> { let mut memory: Vec<(usize, U256)> = Vec::default(); let bootloader_description_offset = @@ -640,8 +641,8 @@ pub(crate) fn get_bootloader_memory_for_encoded_tx( COMPRESSED_BYTECODES_OFFSET + 1 + previous_compressed_bytecode_size; let memory_addition: Vec<_> = compressed_bytecodes - .into_iter() - .flat_map(|x| bytecode::encode_call(&x)) + .iter() + .flat_map(bytecode::encode_call) .collect(); let memory_addition = bytes_to_be_words(memory_addition); diff --git a/core/lib/multivm/src/versions/vm_1_4_1/vm.rs b/core/lib/multivm/src/versions/vm_1_4_1/vm.rs index 4122ee94e66a..1c38958bb318 100644 --- a/core/lib/multivm/src/versions/vm_1_4_1/vm.rs +++ b/core/lib/multivm/src/versions/vm_1_4_1/vm.rs @@ -9,7 +9,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, }, utils::events::extract_l2tol1logs_from_l1_messenger, @@ -81,9 +81,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/versions/vm_1_4_2/vm.rs b/core/lib/multivm/src/versions/vm_1_4_2/vm.rs index fe2015debd2b..ca69a191e26f 100644 --- a/core/lib/multivm/src/versions/vm_1_4_2/vm.rs +++ b/core/lib/multivm/src/versions/vm_1_4_2/vm.rs @@ -11,7 +11,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, }, utils::events::extract_l2tol1logs_from_l1_messenger, @@ -83,9 +83,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/versions/vm_boojum_integration/vm.rs b/core/lib/multivm/src/versions/vm_boojum_integration/vm.rs index ebc0a511d203..bfd055a5cc84 100644 --- a/core/lib/multivm/src/versions/vm_boojum_integration/vm.rs +++ b/core/lib/multivm/src/versions/vm_boojum_integration/vm.rs @@ -9,7 +9,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, }, utils::events::extract_l2tol1logs_from_l1_messenger, @@ -81,9 +81,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult { self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/versions/vm_fast/vm.rs b/core/lib/multivm/src/versions/vm_fast/vm.rs index 39c9b3c56566..88e0b10b5eaf 100644 --- a/core/lib/multivm/src/versions/vm_fast/vm.rs +++ b/core/lib/multivm/src/versions/vm_fast/vm.rs @@ -13,7 +13,7 @@ use zksync_types::{ BYTES_PER_ENUMERATION_INDEX, }, AccountTreeId, StorageKey, StorageLog, StorageLogKind, StorageLogWithPreviousValue, - BOOTLOADER_ADDRESS, H160, H256, KNOWN_CODES_STORAGE_ADDRESS, L1_MESSENGER_ADDRESS, + Transaction, BOOTLOADER_ADDRESS, H160, H256, KNOWN_CODES_STORAGE_ADDRESS, L1_MESSENGER_ADDRESS, L2_BASE_TOKEN_ADDRESS, U256, }; use zksync_utils::{bytecode::hash_bytecode, h256_to_u256, u256_to_h256}; @@ -35,10 +35,10 @@ use crate::{ interface::{ storage::{ImmutableStorageView, ReadStorage, StoragePtr, StorageView}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, Refunds, SystemEnv, - TxRevertReason, VmEvent, VmExecutionLogs, VmExecutionMode, VmExecutionResultAndLogs, - VmExecutionStatistics, VmFactory, VmInterface, VmInterfaceHistoryEnabled, VmRevertReason, - VmTrackingContracts, + ExecutionResult, FinishedL1Batch, Halt, L1BatchEnv, L2BlockEnv, PushTransactionResult, + Refunds, SystemEnv, TxRevertReason, VmEvent, VmExecutionLogs, VmExecutionMode, + VmExecutionResultAndLogs, VmExecutionStatistics, VmFactory, VmInterface, + VmInterfaceHistoryEnabled, VmRevertReason, VmTrackingContracts, }, is_supported_by_fast_vm, utils::events::extract_l2tol1logs_from_l1_messenger, @@ -553,8 +553,14 @@ where impl VmInterface for Vm { type TracerDispatcher = Tr; - fn push_transaction(&mut self, tx: zksync_types::Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { self.push_transaction_inner(tx, 0, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } fn inspect( diff --git a/core/lib/multivm/src/versions/vm_latest/vm.rs b/core/lib/multivm/src/versions/vm_latest/vm.rs index f4cc1580e935..3a36b008e884 100644 --- a/core/lib/multivm/src/versions/vm_latest/vm.rs +++ b/core/lib/multivm/src/versions/vm_latest/vm.rs @@ -13,7 +13,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, VmTrackingContracts, }, @@ -134,9 +134,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/versions/vm_m5/vm.rs b/core/lib/multivm/src/versions/vm_m5/vm.rs index 5a26506f3463..3d57d1cd5439 100644 --- a/core/lib/multivm/src/versions/vm_m5/vm.rs +++ b/core/lib/multivm/src/versions/vm_m5/vm.rs @@ -5,8 +5,9 @@ use crate::{ glue::{history_mode::HistoryMode, GlueInto}, interface::{ storage::StoragePtr, BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, - SystemEnv, TxExecutionMode, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, - VmInterface, VmInterfaceHistoryEnabled, VmMemoryMetrics, + PushTransactionResult, SystemEnv, TxExecutionMode, VmExecutionMode, + VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, + VmMemoryMetrics, }, vm_m5::{ storage::Storage, @@ -60,12 +61,15 @@ impl VmInterface for Vm { /// Tracers are not supported for here we use `()` as a placeholder type TracerDispatcher = (); - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { crate::vm_m5::vm_with_bootloader::push_transaction_to_bootloader_memory( &mut self.vm, &tx, self.system_env.execution_mode.glue_into(), - ) + ); + PushTransactionResult { + compressed_bytecodes: (&[]).into(), // bytecode compression isn't supported + } } fn inspect( diff --git a/core/lib/multivm/src/versions/vm_m6/vm.rs b/core/lib/multivm/src/versions/vm_m6/vm.rs index 1fdc8ae64f80..1ee6aa618220 100644 --- a/core/lib/multivm/src/versions/vm_m6/vm.rs +++ b/core/lib/multivm/src/versions/vm_m6/vm.rs @@ -7,7 +7,7 @@ use crate::{ glue::{history_mode::HistoryMode, GlueInto}, interface::{ storage::StoragePtr, BytecodeCompressionError, BytecodeCompressionResult, FinishedL1Batch, - L1BatchEnv, L2BlockEnv, SystemEnv, TxExecutionMode, VmExecutionMode, + L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, TxExecutionMode, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, VmMemoryMetrics, }, @@ -72,13 +72,17 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - fn push_transaction(&mut self, tx: Transaction) { - crate::vm_m6::vm_with_bootloader::push_transaction_to_bootloader_memory( - &mut self.vm, - &tx, - self.system_env.execution_mode.glue_into(), - None, - ) + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult { + let compressed_bytecodes = + crate::vm_m6::vm_with_bootloader::push_transaction_to_bootloader_memory( + &mut self.vm, + &tx, + self.system_env.execution_mode.glue_into(), + None, + ); + PushTransactionResult { + compressed_bytecodes: compressed_bytecodes.into(), + } } fn inspect( diff --git a/core/lib/multivm/src/versions/vm_m6/vm_with_bootloader.rs b/core/lib/multivm/src/versions/vm_m6/vm_with_bootloader.rs index 7a9fbb73fe49..ae44e721b0d7 100644 --- a/core/lib/multivm/src/versions/vm_m6/vm_with_bootloader.rs +++ b/core/lib/multivm/src/versions/vm_m6/vm_with_bootloader.rs @@ -491,7 +491,7 @@ fn get_bootloader_memory_v1( predefined_refunds[tx_index_in_block], block_gas_price_per_pubdata as u32, previous_compressed, - compressed_bytecodes, + &compressed_bytecodes, ); previous_compressed += total_compressed_len; @@ -536,7 +536,7 @@ fn get_bootloader_memory_v2( predefined_refunds[tx_index_in_block], block_gas_price_per_pubdata as u32, previous_compressed, - compressed_bytecodes, + &compressed_bytecodes, ); previous_compressed += total_compressed_len_words; @@ -554,7 +554,7 @@ pub fn push_transaction_to_bootloader_memory( tx: &Transaction, execution_mode: TxExecutionMode, explicit_compressed_bytecodes: Option>, -) { +) -> Vec { let tx: TransactionData = tx.clone().into(); let block_gas_per_pubdata_byte = vm.block_context.context.block_gas_price_per_pubdata(); let overhead = tx.overhead_gas(block_gas_per_pubdata_byte as u32); @@ -564,7 +564,7 @@ pub fn push_transaction_to_bootloader_memory( execution_mode, overhead, explicit_compressed_bytecodes, - ); + ) } pub fn push_raw_transaction_to_bootloader_memory( @@ -573,7 +573,7 @@ pub fn push_raw_transaction_to_bootloader_memory( execution_mode: TxExecutionMode, predefined_overhead: u32, explicit_compressed_bytecodes: Option>, -) { +) -> Vec { match vm.vm_subversion { MultiVMSubversion::V1 => push_raw_transaction_to_bootloader_memory_v1( vm, @@ -599,7 +599,7 @@ fn push_raw_transaction_to_bootloader_memory_v1( execution_mode: TxExecutionMode, predefined_overhead: u32, explicit_compressed_bytecodes: Option>, -) { +) -> Vec { let tx_index_in_block = vm.bootloader_state.free_tx_index(); let already_included_txs_size = vm.bootloader_state.free_tx_offset(); @@ -651,7 +651,7 @@ fn push_raw_transaction_to_bootloader_memory_v1( predefined_overhead, trusted_ergs_limit, previous_bytecodes, - compressed_bytecodes, + &compressed_bytecodes, ); vm.state.memory.populate_page( @@ -661,6 +661,7 @@ fn push_raw_transaction_to_bootloader_memory_v1( ); vm.bootloader_state.add_tx_data(encoded_tx_size); vm.bootloader_state.add_compressed_bytecode(compressed_len); + compressed_bytecodes } // Bytecode compression bug fixed @@ -670,7 +671,7 @@ fn push_raw_transaction_to_bootloader_memory_v2( execution_mode: TxExecutionMode, predefined_overhead: u32, explicit_compressed_bytecodes: Option>, -) { +) -> Vec { let tx_index_in_block = vm.bootloader_state.free_tx_index(); let already_included_txs_size = vm.bootloader_state.free_tx_offset(); @@ -730,7 +731,7 @@ fn push_raw_transaction_to_bootloader_memory_v2( predefined_overhead, trusted_ergs_limit, previous_bytecodes, - compressed_bytecodes, + &compressed_bytecodes, ); vm.state.memory.populate_page( @@ -741,6 +742,7 @@ fn push_raw_transaction_to_bootloader_memory_v2( vm.bootloader_state.add_tx_data(encoded_tx_size); vm.bootloader_state .add_compressed_bytecode(compressed_bytecodes_encoding_len_words); + compressed_bytecodes } #[allow(clippy::too_many_arguments)] @@ -752,7 +754,7 @@ fn get_bootloader_memory_for_tx( predefined_refund: u32, block_gas_per_pubdata: u32, previous_compressed_bytecode_size: usize, - compressed_bytecodes: Vec, + compressed_bytecodes: &[CompressedBytecodeInfo], ) -> Vec<(usize, U256)> { let overhead_gas = tx.overhead_gas(block_gas_per_pubdata); let trusted_gas_limit = tx.trusted_gas_limit(block_gas_per_pubdata); @@ -779,7 +781,7 @@ pub(crate) fn get_bootloader_memory_for_encoded_tx( predefined_overhead: u32, trusted_gas_limit: u32, previous_compressed_bytecode_size: usize, - compressed_bytecodes: Vec, + compressed_bytecodes: &[CompressedBytecodeInfo], ) -> Vec<(usize, U256)> { let mut memory: Vec<(usize, U256)> = Vec::default(); let bootloader_description_offset = @@ -815,8 +817,8 @@ pub(crate) fn get_bootloader_memory_for_encoded_tx( COMPRESSED_BYTECODES_OFFSET + 1 + previous_compressed_bytecode_size; let memory_addition: Vec<_> = compressed_bytecodes - .into_iter() - .flat_map(|x| bytecode::encode_call(&x)) + .iter() + .flat_map(bytecode::encode_call) .collect(); let memory_addition = bytes_to_be_words(memory_addition); diff --git a/core/lib/multivm/src/versions/vm_refunds_enhancement/vm.rs b/core/lib/multivm/src/versions/vm_refunds_enhancement/vm.rs index d87fd4d104da..2bcd68bec044 100644 --- a/core/lib/multivm/src/versions/vm_refunds_enhancement/vm.rs +++ b/core/lib/multivm/src/versions/vm_refunds_enhancement/vm.rs @@ -6,7 +6,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, }, vm_latest::HistoryEnabled, @@ -74,9 +74,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { - self.push_transaction_with_compression(tx, true) + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { + self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/versions/vm_virtual_blocks/vm.rs b/core/lib/multivm/src/versions/vm_virtual_blocks/vm.rs index 28c09590f2ad..497128c64bd9 100644 --- a/core/lib/multivm/src/versions/vm_virtual_blocks/vm.rs +++ b/core/lib/multivm/src/versions/vm_virtual_blocks/vm.rs @@ -6,7 +6,7 @@ use crate::{ interface::{ storage::{StoragePtr, WriteStorage}, BytecodeCompressionError, BytecodeCompressionResult, CurrentExecutionState, - FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, + FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, }, vm_latest::HistoryEnabled, @@ -74,9 +74,14 @@ impl Vm { impl VmInterface for Vm { type TracerDispatcher = TracerDispatcher; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { - self.push_transaction_with_compression(tx, true) + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { + self.push_transaction_with_compression(tx, true); + PushTransactionResult { + compressed_bytecodes: self + .bootloader_state + .get_last_tx_compressed_bytecodes() + .into(), + } } /// Execute VM with custom tracers. diff --git a/core/lib/multivm/src/vm_instance.rs b/core/lib/multivm/src/vm_instance.rs index 897070345232..43a6c48aa9c5 100644 --- a/core/lib/multivm/src/vm_instance.rs +++ b/core/lib/multivm/src/vm_instance.rs @@ -8,8 +8,8 @@ use crate::{ interface::{ storage::{ImmutableStorageView, ReadStorage, StoragePtr, StorageView}, utils::ShadowVm, - BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, - VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, + BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, + SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceHistoryEnabled, VmMemoryMetrics, }, tracers::TracerDispatcher, @@ -55,8 +55,7 @@ macro_rules! dispatch_legacy_vm { impl VmInterface for LegacyVmInstance { type TracerDispatcher = TracerDispatcher, H>; - /// Push tx into memory for the future execution - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { dispatch_legacy_vm!(self.push_transaction(tx)) } @@ -247,8 +246,8 @@ impl VmInterface for FastVmInsta Tr, ); - fn push_transaction(&mut self, tx: Transaction) { - dispatch_fast_vm!(self.push_transaction(tx)); + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { + dispatch_fast_vm!(self.push_transaction(tx)) } fn inspect( diff --git a/core/lib/vm_interface/src/lib.rs b/core/lib/vm_interface/src/lib.rs index 645e3e7c856e..e0287483067a 100644 --- a/core/lib/vm_interface/src/lib.rs +++ b/core/lib/vm_interface/src/lib.rs @@ -30,10 +30,10 @@ pub use crate::{ outputs::{ BatchTransactionExecutionResult, BootloaderMemory, Call, CallType, CircuitStatistic, CompressedBytecodeInfo, CurrentExecutionState, DeduplicatedWritesMetrics, - ExecutionResult, FinishedL1Batch, L2Block, OneshotTransactionExecutionResult, Refunds, - TransactionExecutionMetrics, TransactionExecutionResult, TxExecutionStatus, VmEvent, - VmExecutionLogs, VmExecutionMetrics, VmExecutionResultAndLogs, VmExecutionStatistics, - VmMemoryMetrics, + ExecutionResult, FinishedL1Batch, L2Block, OneshotTransactionExecutionResult, + PushTransactionResult, Refunds, TransactionExecutionMetrics, + TransactionExecutionResult, TxExecutionStatus, VmEvent, VmExecutionLogs, + VmExecutionMetrics, VmExecutionResultAndLogs, VmExecutionStatistics, VmMemoryMetrics, }, tracer, }, diff --git a/core/lib/vm_interface/src/types/outputs/mod.rs b/core/lib/vm_interface/src/types/outputs/mod.rs index 1fa1cd5d1688..fe25801dd12e 100644 --- a/core/lib/vm_interface/src/types/outputs/mod.rs +++ b/core/lib/vm_interface/src/types/outputs/mod.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + pub use self::{ bytecode::CompressedBytecodeInfo, execution_result::{ @@ -20,3 +22,14 @@ mod execution_state; mod finished_l1batch; mod l2_block; mod statistic; + +/// Result of pushing a transaction to the VM state without executing it. +#[derive(Debug)] +pub struct PushTransactionResult<'a> { + /// Compressed bytecodes for the transaction. If the VM doesn't support bytecode compression, returns + /// an empty slice. + /// + /// Importantly, these bytecodes are not guaranteed to be published by the transaction; + /// e.g., it may run out of gas during publication. + pub compressed_bytecodes: Cow<'a, [CompressedBytecodeInfo]>, +} diff --git a/core/lib/vm_interface/src/utils/dump.rs b/core/lib/vm_interface/src/utils/dump.rs index 288c6445494d..522a455a11ba 100644 --- a/core/lib/vm_interface/src/utils/dump.rs +++ b/core/lib/vm_interface/src/utils/dump.rs @@ -5,9 +5,9 @@ use zksync_types::{block::L2BlockExecutionData, L1BatchNumber, L2BlockNumber, Tr use crate::{ storage::{ReadStorage, StoragePtr, StorageSnapshot, StorageView}, - BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, SystemEnv, VmExecutionMode, - VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceExt, VmInterfaceHistoryEnabled, - VmTrackingContracts, + BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, PushTransactionResult, + SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, VmInterfaceExt, + VmInterfaceHistoryEnabled, VmTrackingContracts, }; fn create_storage_snapshot( @@ -142,9 +142,9 @@ impl DumpingVm { impl VmInterface for DumpingVm { type TracerDispatcher = Vm::TracerDispatcher; - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult { self.record_transaction(tx.clone()); - self.inner.push_transaction(tx); + self.inner.push_transaction(tx) } fn inspect( diff --git a/core/lib/vm_interface/src/utils/shadow.rs b/core/lib/vm_interface/src/utils/shadow.rs index 92eb65a810f7..8cdc899238e4 100644 --- a/core/lib/vm_interface/src/utils/shadow.rs +++ b/core/lib/vm_interface/src/utils/shadow.rs @@ -11,8 +11,8 @@ use super::dump::{DumpingVm, VmDump}; use crate::{ storage::{ReadStorage, StoragePtr, StorageView}, BytecodeCompressionResult, CurrentExecutionState, FinishedL1Batch, L1BatchEnv, L2BlockEnv, - SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, VmInterface, - VmInterfaceHistoryEnabled, VmTrackingContracts, + PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, VmFactory, + VmInterface, VmInterfaceHistoryEnabled, VmTrackingContracts, }; /// Handler for VM divergences. @@ -163,11 +163,30 @@ where ::TracerDispatcher, ); - fn push_transaction(&mut self, tx: Transaction) { + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_> { + let main_result = self.main.push_transaction(tx.clone()); + // Extend lifetime to `'static` so that the result isn't mutably borrowed from the main VM. + // Unfortunately, there's no way to express that this borrow is actually immutable, which would allow not extending the lifetime unless there's a divergence. + let main_result: PushTransactionResult<'static> = PushTransactionResult { + compressed_bytecodes: main_result.compressed_bytecodes.into_owned().into(), + }; + if let Some(shadow) = self.shadow.get_mut() { - shadow.vm.push_transaction(tx.clone()); + let tx_repr = format!("{tx:?}"); // includes little data, so is OK to call proactively + let shadow_result = shadow.vm.push_transaction(tx); + + let mut errors = DivergenceErrors::new(); + errors.check_match( + "bytecodes", + &main_result.compressed_bytecodes, + &shadow_result.compressed_bytecodes, + ); + if let Err(err) = errors.into_result() { + let ctx = format!("pushing transaction {tx_repr}"); + self.report(err.context(ctx)); + } } - self.main.push_transaction(tx); + main_result } fn inspect( diff --git a/core/lib/vm_interface/src/vm.rs b/core/lib/vm_interface/src/vm.rs index 37e33a92b509..3a06d7f80cbe 100644 --- a/core/lib/vm_interface/src/vm.rs +++ b/core/lib/vm_interface/src/vm.rs @@ -15,15 +15,20 @@ use zksync_types::{Transaction, H256}; use crate::{ storage::StoragePtr, BytecodeCompressionResult, FinishedL1Batch, L1BatchEnv, L2BlockEnv, - SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, + PushTransactionResult, SystemEnv, VmExecutionMode, VmExecutionResultAndLogs, }; pub trait VmInterface { /// Lifetime is used to be able to define `Option<&mut _>` as a dispatcher. type TracerDispatcher: Default; - /// Push transaction to bootloader memory. - fn push_transaction(&mut self, tx: Transaction); + /// Pushes a transaction to bootloader memory for future execution with bytecode compression (if it's supported by the VM). + /// + /// # Return value + /// + /// Returns preprocessing results, such as compressed bytecodes. The results may borrow from the VM state, + /// so you may want to inspect results before next operations with the VM, or clone the necessary parts. + fn push_transaction(&mut self, tx: Transaction) -> PushTransactionResult<'_>; /// Executes the next VM step (either next transaction or bootloader or the whole batch) /// with custom tracers. diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 98c0d6b08131..4ebcf5c9a617 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -219,11 +219,10 @@ impl StateKeeper { .wait(IoCursor::for_fetcher(&mut conn.0)) .await? .context("IoCursor::new()")?; - let batch_sealed = ctx - .wait(conn.0.blocks_dal().get_unsealed_l1_batch()) + let pending_batch = ctx + .wait(conn.0.blocks_dal().pending_batch_exists()) .await? - .context("get_unsealed_l1_batch()")? - .is_none(); + .context("pending_batch_exists()")?; let (actions_sender, actions_queue) = ActionQueue::new(); let addr = sync::watch::channel(None).0; let sync_state = SyncState::default(); @@ -259,7 +258,7 @@ impl StateKeeper { last_batch: cursor.l1_batch, last_block: cursor.next_l2_block - 1, last_timestamp: cursor.prev_l2_block_timestamp, - batch_sealed, + batch_sealed: !pending_batch, next_priority_op: PriorityOpId(1), actions_sender, sync_state: sync_state.clone(), diff --git a/core/node/node_sync/Cargo.toml b/core/node/node_sync/Cargo.toml index ccfc8dd8a4e9..9c5b0c000700 100644 --- a/core/node/node_sync/Cargo.toml +++ b/core/node/node_sync/Cargo.toml @@ -43,3 +43,4 @@ zksync_node_test_utils.workspace = true assert_matches.workspace = true once_cell.workspace = true test-casing.workspace = true +backon.workspace = true diff --git a/core/node/node_sync/src/external_io.rs b/core/node/node_sync/src/external_io.rs index 10fb2925015f..5e3a5ce9f46e 100644 --- a/core/node/node_sync/src/external_io.rs +++ b/core/node/node_sync/src/external_io.rs @@ -155,6 +155,14 @@ impl StateKeeperIO for ExternalIO { ) })?; let Some(mut pending_l2_block_header) = pending_l2_block_header else { + tracing::info!( + l1_batch_number = %cursor.l1_batch, + "No pending L2 blocks found; pruning unsealed batch if exists as we need at least one L2 block to initialize" + ); + storage + .blocks_dal() + .delete_unsealed_l1_batch(cursor.l1_batch - 1) + .await?; return Ok((cursor, None)); }; diff --git a/core/node/node_sync/src/fetcher.rs b/core/node/node_sync/src/fetcher.rs index 3f8558ed0ac5..51b9f7c7a060 100644 --- a/core/node/node_sync/src/fetcher.rs +++ b/core/node/node_sync/src/fetcher.rs @@ -114,8 +114,8 @@ impl IoCursorExt for IoCursor { let mut this = Self::new(storage).await?; // It's important to know whether we have opened a new batch already or just sealed the previous one. // Depending on it, we must either insert `OpenBatch` item into the queue, or not. - let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; - if unsealed_batch.is_none() { + let was_new_batch_open = storage.blocks_dal().pending_batch_exists().await?; + if !was_new_batch_open { this.l1_batch -= 1; // Should continue from the last L1 batch present in the storage } Ok(this) @@ -201,35 +201,3 @@ impl IoCursorExt for IoCursor { new_actions } } - -#[cfg(test)] -mod tests { - use zksync_dal::{ConnectionPool, Core, CoreDal}; - use zksync_node_genesis::{insert_genesis_batch, GenesisParams}; - use zksync_state_keeper::io::IoCursor; - use zksync_types::{block::UnsealedL1BatchHeader, L1BatchNumber}; - - use crate::fetcher::IoCursorExt; - - #[tokio::test] - async fn io_cursor_recognizes_empty_unsealed_batch() -> anyhow::Result<()> { - let pool = ConnectionPool::::test_pool().await; - let mut conn = pool.connection().await.unwrap(); - insert_genesis_batch(&mut conn, &GenesisParams::mock()) - .await - .unwrap(); - conn.blocks_dal() - .insert_l1_batch(UnsealedL1BatchHeader { - number: L1BatchNumber(1), - timestamp: 1, - protocol_version: None, - fee_address: Default::default(), - fee_input: Default::default(), - }) - .await?; - - let io_cursor = IoCursor::for_fetcher(&mut conn).await?; - assert_eq!(io_cursor.l1_batch, L1BatchNumber(1)); - Ok(()) - } -} diff --git a/core/node/node_sync/src/sync_action.rs b/core/node/node_sync/src/sync_action.rs index 8cb90d24fe84..e3fd56ae9bb0 100644 --- a/core/node/node_sync/src/sync_action.rs +++ b/core/node/node_sync/src/sync_action.rs @@ -33,6 +33,18 @@ impl ActionQueueSender { Ok(()) } + /// Pushes a single action into the queue without checking validity of the sequence. + /// + /// Useful to simulate situations where only a part of the sequence was executed on the node. + #[cfg(test)] + pub async fn push_action_unchecked(&self, action: SyncAction) -> anyhow::Result<()> { + self.0 + .send(action) + .await + .map_err(|_| anyhow::anyhow!("node action processor stopped"))?; + Ok(()) + } + /// Checks whether the action sequence is valid. /// Returned error is meant to be used as a panic message, since an invalid sequence represents an unrecoverable /// error. This function itself does not panic for the ease of testing. diff --git a/core/node/node_sync/src/tests.rs b/core/node/node_sync/src/tests.rs index 3f5791cdf24c..1ae148709b22 100644 --- a/core/node/node_sync/src/tests.rs +++ b/core/node/node_sync/src/tests.rs @@ -2,6 +2,7 @@ use std::{iter, sync::Arc, time::Duration}; +use backon::{ConstantBuilder, Retryable}; use test_casing::test_casing; use tokio::{sync::watch, task::JoinHandle}; use zksync_contracts::BaseSystemContractsHashes; @@ -18,7 +19,7 @@ use zksync_state_keeper::{ }; use zksync_types::{ api, - block::L2BlockHasher, + block::{L2BlockHasher, UnsealedL1BatchHeader}, fee_model::{BatchFeeInput, PubdataIndependentBatchFeeModelInput}, snapshots::SnapshotRecoveryStatus, Address, L1BatchNumber, L2BlockNumber, L2ChainId, ProtocolVersionId, Transaction, H256, @@ -652,3 +653,101 @@ async fn external_io_with_multiple_l1_batches() { assert_eq!(fictive_l2_block.timestamp, 2); assert_eq!(fictive_l2_block.l2_tx_count, 0); } + +async fn wait_for_batch_to_be_open( + pool: &ConnectionPool, + number: L1BatchNumber, +) -> anyhow::Result { + (|| async { + let mut storage = pool.connection().await.unwrap(); + let unsealed_batch = storage.blocks_dal().get_unsealed_l1_batch().await?; + + if let Some(unsealed_batch) = unsealed_batch { + if unsealed_batch.number == number { + Ok(unsealed_batch) + } else { + Err(anyhow::anyhow!("L1 batch #{number} is not open yet")) + } + } else { + Err(anyhow::anyhow!("No unsealed L1 batch found yet")) + } + }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_millis(200)) + .with_max_times(20), + ) + .await +} + +#[tokio::test] +async fn external_io_empty_unsealed_batch() { + let pool = ConnectionPool::::test_pool().await; + let mut storage = pool.connection().await.unwrap(); + ensure_genesis(&mut storage).await; + drop(storage); + + let open_batch_one = open_l1_batch(1, 1, 1); + let tx = create_l2_transaction(10, 100); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let open_batch_two = open_l1_batch(2, 2, 3); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 2, + virtual_blocks: 0, + }, + number: L2BlockNumber(2), + }; + let actions1 = vec![open_batch_one, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + // Unchecked insert of batch #2 to simulate restart in the middle of processing an action sequence + // In other words batch #2 is inserted completely empty with no blocks/txs present in it + actions_sender + .push_action_unchecked(open_batch_two.clone()) + .await + .unwrap(); + // Wait until the L2 block is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(2)).await; + + // Wait until L1 batch #2 is opened and persisted. + let unsealed_batch = wait_for_batch_to_be_open(&pool, L1BatchNumber(2)) + .await + .unwrap(); + assert_eq!(unsealed_batch.number, L1BatchNumber(2)); + assert_eq!(unsealed_batch.timestamp, 2); + + // Prepare the rest of batch #2 + let tx = create_l2_transaction(20, 200); + let tx_hash = tx.hash(); + let tx = FetchedTransaction::new(tx.into()); + let fictive_l2_block = SyncAction::L2Block { + params: L2BlockParams { + timestamp: 4, + virtual_blocks: 0, + }, + number: L2BlockNumber(4), + }; + let actions1 = vec![open_batch_two, tx.into(), SyncAction::SealL2Block]; + let actions2 = vec![fictive_l2_block, SyncAction::SealBatch]; + + // Restart state keeper + let (actions_sender, action_queue) = ActionQueue::new(); + let client = MockMainNodeClient::default(); + let state_keeper = + StateKeeperHandles::new(pool.clone(), client, action_queue, &[&[tx_hash]]).await; + actions_sender.push_actions(actions1).await.unwrap(); + actions_sender.push_actions(actions2).await.unwrap(); + + let hash_task = tokio::spawn(mock_l1_batch_hash_computation(pool.clone(), 1)); + // Wait until the block #4 is sealed. + state_keeper.wait_for_local_block(L2BlockNumber(4)).await; + hash_task.await.unwrap(); +}