diff --git a/core/lib/dal/.sqlx/query-0feb62586273cb15cea173d37249e2f7423da6bc8d76dbde748cce709c5b6a47.json b/core/lib/dal/.sqlx/query-0feb62586273cb15cea173d37249e2f7423da6bc8d76dbde748cce709c5b6a47.json new file mode 100644 index 000000000000..c9950d3f230d --- /dev/null +++ b/core/lib/dal/.sqlx/query-0feb62586273cb15cea173d37249e2f7423da6bc8d76dbde748cce709c5b6a47.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n COUNT(*)\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL AND has_failed = FALSE\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "0feb62586273cb15cea173d37249e2f7423da6bc8d76dbde748cce709c5b6a47" +} diff --git a/core/lib/dal/.sqlx/query-868bfdc5d8ee5eab395fa690891751dfd285628a75a35b152bccb3c73e9cc057.json b/core/lib/dal/.sqlx/query-35972060482761997ddf9b8bfc0b75c6ca4c93438ad66d37d6456d8517a03354.json similarity index 75% rename from core/lib/dal/.sqlx/query-868bfdc5d8ee5eab395fa690891751dfd285628a75a35b152bccb3c73e9cc057.json rename to core/lib/dal/.sqlx/query-35972060482761997ddf9b8bfc0b75c6ca4c93438ad66d37d6456d8517a03354.json index c2e662ef376e..ff58237b5396 100644 --- a/core/lib/dal/.sqlx/query-868bfdc5d8ee5eab395fa690891751dfd285628a75a35b152bccb3c73e9cc057.json +++ b/core/lib/dal/.sqlx/query-35972060482761997ddf9b8bfc0b75c6ca4c93438ad66d37d6456d8517a03354.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND is_gateway = $2\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $1\n AND is_gateway = $2\n )\n ORDER BY\n id\n ", + "query": "\n SELECT\n *\n FROM\n eth_txs\n WHERE\n from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL\n AND confirmed_eth_tx_history_id IS NULL\n AND has_failed = FALSE\n AND is_gateway = $2\n AND id <= (\n SELECT\n COALESCE(MAX(eth_tx_id), 0)\n FROM\n eth_txs_history\n JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id\n WHERE\n eth_txs_history.sent_at_block IS NOT NULL\n AND eth_txs.from_addr IS NOT DISTINCT FROM $1\n AND is_gateway = $2\n )\n ORDER BY\n id\n ", "describe": { "columns": [ { @@ -109,5 +109,5 @@ true ] }, - "hash": "868bfdc5d8ee5eab395fa690891751dfd285628a75a35b152bccb3c73e9cc057" + "hash": "35972060482761997ddf9b8bfc0b75c6ca4c93438ad66d37d6456d8517a03354" } diff --git a/core/lib/dal/.sqlx/query-8e3e83b7f39d969a20f216dbb10b1730c63af98e40a67dfe9ae8e3197271fc60.json b/core/lib/dal/.sqlx/query-8e3e83b7f39d969a20f216dbb10b1730c63af98e40a67dfe9ae8e3197271fc60.json new file mode 100644 index 000000000000..67027ac94c22 --- /dev/null +++ b/core/lib/dal/.sqlx/query-8e3e83b7f39d969a20f216dbb10b1730c63af98e40a67dfe9ae8e3197271fc60.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM eth_txs\n WHERE has_failed = TRUE\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "8e3e83b7f39d969a20f216dbb10b1730c63af98e40a67dfe9ae8e3197271fc60" +} diff --git a/core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json b/core/lib/dal/.sqlx/query-cd72562d28780140b4991408b2bf78b615362faff8e3f49ca1bda2d4e155f93d.json similarity index 73% rename from core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json rename to core/lib/dal/.sqlx/query-cd72562d28780140b4991408b2bf78b615362faff8e3f49ca1bda2d4e155f93d.json index 88bac1a36022..ecd023024303 100644 --- a/core/lib/dal/.sqlx/query-5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538.json +++ b/core/lib/dal/.sqlx/query-cd72562d28780140b4991408b2bf78b615362faff8e3f49ca1bda2d4e155f93d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n COUNT(*)\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL\n AND is_gateway = FALSE\n ", + "query": "\n SELECT\n COUNT(*)\n FROM\n eth_txs\n WHERE\n confirmed_eth_tx_history_id IS NULL\n AND has_failed = FALSE\n AND is_gateway = FALSE\n ", "describe": { "columns": [ { @@ -16,5 +16,5 @@ null ] }, - "hash": "5b7d2612dd2dd064ea0095b40669754ed7219a77459ef40cd99d7d4d0749e538" + "hash": "cd72562d28780140b4991408b2bf78b615362faff8e3f49ca1bda2d4e155f93d" } diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index 27b37e6a123c..4f2ac5b928bc 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, str::FromStr}; -use anyhow::Context as _; +use anyhow::{anyhow, Context as _}; use sqlx::types::chrono::{DateTime, Utc}; use zksync_db_connection::{connection::Connection, interpolate_query, match_query_as}; use zksync_types::{ @@ -35,6 +35,7 @@ impl EthSenderDal<'_, '_> { WHERE from_addr IS NOT DISTINCT FROM $1 -- can't just use equality as NULL != NULL AND confirmed_eth_tx_history_id IS NULL + AND has_failed = FALSE AND is_gateway = $2 AND id <= ( SELECT @@ -69,6 +70,7 @@ impl EthSenderDal<'_, '_> { eth_txs WHERE confirmed_eth_tx_history_id IS NULL + AND has_failed = FALSE AND is_gateway = FALSE "# ) @@ -634,7 +636,13 @@ impl EthSenderDal<'_, '_> { .context("count field is missing") } - pub async fn clear_failed_transactions(&mut self) -> sqlx::Result<()> { + pub async fn clear_failed_transactions(&mut self) -> anyhow::Result<()> { + if self.count_all_inflight_txs().await.unwrap() != 0 { + return Err(anyhow!( + "There are still some in-flight txs, cannot proceed. \ + Please wait for eth-sender to process all in-flight txs and try again!" + )); + } sqlx::query!( r#" DELETE FROM eth_txs diff --git a/core/lib/eth_client/src/clients/mock.rs b/core/lib/eth_client/src/clients/mock.rs index b33554b6292c..4c636381d43b 100644 --- a/core/lib/eth_client/src/clients/mock.rs +++ b/core/lib/eth_client/src/clients/mock.rs @@ -137,6 +137,7 @@ impl MockSettlementLayerInner { unimplemented!("Getting nonce for custom account is not supported"); } + tracing::info!("Getting nonce for account {address:?} at block {block:?}, pending nonce: {}, current nonce: {}", self.pending_nonce, self.current_nonce); match block { web3::BlockNumber::Number(block_number) => { let mut nonce_range = self.nonces.range(..=block_number.as_u64()); diff --git a/core/node/block_reverter/src/lib.rs b/core/node/block_reverter/src/lib.rs index 3ea7ff9db590..c7397ee475a7 100644 --- a/core/node/block_reverter/src/lib.rs +++ b/core/node/block_reverter/src/lib.rs @@ -606,21 +606,6 @@ impl BlockReverter { /// Clears failed L1 transactions. pub async fn clear_failed_l1_transactions(&self) -> anyhow::Result<()> { tracing::info!("Clearing failed L1 transactions"); - if self - .connection_pool - .connection() - .await? - .eth_sender_dal() - .count_all_inflight_txs() - .await - .unwrap() - != 0 - { - tracing::error!( - "There are still some in-flight txs, cannot proceed. \ - Please wait for eth-sender to process all in-flight txs and try again!" - ); - } self.connection_pool .connection() .await? diff --git a/core/node/eth_sender/src/eth_tx_aggregator.rs b/core/node/eth_sender/src/eth_tx_aggregator.rs index a20bb1b8a2d4..b1227dd27a82 100644 --- a/core/node/eth_sender/src/eth_tx_aggregator.rs +++ b/core/node/eth_sender/src/eth_tx_aggregator.rs @@ -21,7 +21,7 @@ use zksync_types::{ protocol_version::{L1VerifierConfig, PACKED_SEMVER_MINOR_MASK}, pubdata_da::PubdataDA, settlement::SettlementMode, - web3::{contract::Error as Web3ContractError, BlockNumber}, + web3::contract::Error as Web3ContractError, Address, L2ChainId, ProtocolVersionId, SLChainId, H256, U256, }; @@ -49,17 +49,13 @@ pub struct MulticallData { pub struct EthTxAggregator { aggregator: Aggregator, eth_client: Box, + eth_client_blobs: Option>, config: SenderConfig, timelock_contract_address: Address, l1_multicall3_address: Address, pub(super) state_transition_chain_contract: Address, functions: ZkSyncFunctions, rollup_chain_id: L2ChainId, - /// If set to `Some` node is operating in the 4844 mode with two operator - /// addresses at play: the main one and the custom address for sending commit - /// transactions. The `Some` then contains the address of this custom operator - /// address. - custom_commit_sender_addr: Option
, pool: ConnectionPool, settlement_mode: SettlementMode, sl_chain_id: SLChainId, @@ -77,11 +73,11 @@ impl EthTxAggregator { config: SenderConfig, aggregator: Aggregator, eth_client: Box, + eth_client_blobs: Option>, timelock_contract_address: Address, l1_multicall3_address: Address, state_transition_chain_contract: Address, rollup_chain_id: L2ChainId, - custom_commit_sender_addr: Option
, settlement_mode: SettlementMode, ) -> Self { let eth_client = eth_client.for_component("eth_tx_aggregator"); @@ -93,12 +89,12 @@ impl EthTxAggregator { config, aggregator, eth_client, + eth_client_blobs, timelock_contract_address, l1_multicall3_address, state_transition_chain_contract, functions, rollup_chain_id, - custom_commit_sender_addr, pool, settlement_mode, sl_chain_id, @@ -598,11 +594,14 @@ impl EthTxAggregator { ) -> Result { let mut transaction = storage.start_transaction().await.unwrap(); let op_type = aggregated_op.get_action_type(); - // We may be using a custom sender for commit transactions, so use this + // We may be using a custom sender for commit tester.assert_inflight_txs_count_equals(0).await;transactions, so use this // var whatever it actually is: a `None` for single-addr operator or `Some` // for multi-addr operator in 4844 mode. let sender_addr = match (op_type, is_gateway) { - (AggregatedActionType::Commit, false) => self.custom_commit_sender_addr, + (AggregatedActionType::Commit, false) => self + .eth_client_blobs + .as_deref() + .map(BoundEthInterface::sender_account), (_, _) => None, }; let nonce = self.get_next_nonce(&mut transaction, sender_addr).await?; @@ -659,23 +658,14 @@ impl EthTxAggregator { .await .unwrap() .unwrap_or(0); + let client = if from_addr.is_some() { + self.eth_client_blobs.as_deref().unwrap() + } else { + self.eth_client.as_ref() + }; // We can execute some txs using operator account or remove some txs from the database // We have to consider this fact and get the max nonce. - Ok(if from_addr.is_none() { - let base_nonce = self.eth_client.pending_nonce().await.unwrap().as_u64(); - db_nonce.max(base_nonce) - } else { - let base_nonce_custom_commit_sender = (*self.eth_client) - .as_ref() - .nonce_at_for_account( - self.custom_commit_sender_addr - .expect("custom_commit_sender_addr should not be empty"), - BlockNumber::Pending, - ) - .await - .unwrap() - .as_u64(); - db_nonce.max(base_nonce_custom_commit_sender) - }) + let base_nonce = client.pending_nonce().await.unwrap().as_u64(); + Ok(db_nonce.max(base_nonce)) } } diff --git a/core/node/eth_sender/src/tester.rs b/core/node/eth_sender/src/tester.rs index 86a8c477f9fe..c73cb7bc3773 100644 --- a/core/node/eth_sender/src/tester.rs +++ b/core/node/eth_sender/src/tester.rs @@ -5,7 +5,7 @@ use zksync_config::{ ContractsConfig, EthConfig, GasAdjusterConfig, }; use zksync_dal::{Connection, ConnectionPool, Core, CoreDal}; -use zksync_eth_client::{clients::MockSettlementLayer, BaseFees, BoundEthInterface}; +use zksync_eth_client::{clients::MockSettlementLayer, BaseFees}; use zksync_l1_contract_interface::i_executor::methods::{ExecuteBatches, ProveBatches}; use zksync_node_fee_model::l1_gas_price::{GasAdjuster, GasAdjusterClient}; use zksync_node_test_utils::{create_l1_batch, l1_batch_metadata_to_commitment_artifacts}; @@ -119,7 +119,7 @@ pub(crate) struct EthSenderTester { pub gas_adjuster: Arc, pub pubdata_sending_mode: PubdataSendingMode, next_l1_batch_number_to_seal: L1BatchNumber, - next_l1_batch_number_to_commit: L1BatchNumber, + pub next_l1_batch_number_to_commit: L1BatchNumber, next_l1_batch_number_to_prove: L1BatchNumber, next_l1_batch_number_to_execute: L1BatchNumber, tx_sent_in_last_iteration_count: usize, @@ -238,13 +238,6 @@ impl EthSenderTester { let eth_sender = eth_sender_config.sender.clone().unwrap(); - let custom_commit_sender_addr = - if aggregator_operate_4844_mode && commitment_mode == L1BatchCommitmentMode::Rollup { - Some(gateway_blobs.sender_account()) - } else { - None - }; - let aggregator = EthTxAggregator::new( connection_pool.clone(), SenderConfig { @@ -260,12 +253,16 @@ impl EthSenderTester { commitment_mode, ), gateway.clone(), + if aggregator_operate_4844_mode && commitment_mode == L1BatchCommitmentMode::Rollup { + Some(gateway_blobs.clone()) + } else { + None + }, // ZKsync contract address Address::random(), contracts_config.l1_multicall3_addr, STATE_TRANSITION_CONTRACT_ADDRESS, Default::default(), - custom_commit_sender_addr, SettlementMode::SettlesToL1, ) .await; @@ -595,6 +592,14 @@ impl EthSenderTester { ); } + pub async fn clear_failed_txs_failed_attempts(&mut self) -> anyhow::Result<()> { + self.storage() + .await + .eth_sender_dal() + .clear_failed_transactions() + .await + } + pub async fn assert_inflight_txs_count_equals(&mut self, value: usize) { let inflight_count = if !self.is_l2 { //sanity check @@ -626,7 +631,8 @@ impl EthSenderTester { assert_eq!( inflight_count, value, - "Unexpected number of in-flight transactions" + "Unexpected number of in-flight transactions, expected: {}, got: {}", + value, inflight_count ); } } diff --git a/core/node/eth_sender/src/tests.rs b/core/node/eth_sender/src/tests.rs index 9e844a8b8537..aa95d43ab77c 100644 --- a/core/node/eth_sender/src/tests.rs +++ b/core/node/eth_sender/src/tests.rs @@ -391,10 +391,9 @@ async fn three_scenarios(commitment_mode: L1BatchCommitmentMode) -> anyhow::Resu Ok(()) } -#[should_panic(expected = "We can't operate after tx fail")] #[test_casing(2, COMMITMENT_MODES)] #[test_log::test(tokio::test)] -async fn failed_eth_tx(commitment_mode: L1BatchCommitmentMode) { +async fn failed_eth_tx_prevents_sending_new_transactions(commitment_mode: L1BatchCommitmentMode) { let connection_pool = ConnectionPool::::test_pool().await; let mut tester = EthSenderTester::new( connection_pool.clone(), @@ -413,7 +412,97 @@ async fn failed_eth_tx(commitment_mode: L1BatchCommitmentMode) { first_batch.fail_commit_tx(&mut tester).await; + // failed transaction shouldn't cause panic tester.run_eth_sender_tx_manager_iteration().await; + tester.assert_inflight_txs_count_equals(0).await; + + let second_batch = TestL1Batch::sealed(&mut tester).await; + second_batch.save_commit_tx(&mut tester).await; + + tester.run_eth_sender_tx_manager_iteration().await; + // we shouldn't send any new transactions when there are failed transactions in database + tester.assert_just_sent_tx_count_equals(0).await; +} + +#[test_casing(2, COMMITMENT_MODES)] +#[test_log::test(tokio::test)] +async fn failed_eth_tx_doesnt_prevent_resending_txs(commitment_mode: L1BatchCommitmentMode) { + let connection_pool = ConnectionPool::::test_pool().await; + let mut tester = EthSenderTester::new( + connection_pool.clone(), + vec![100; 100], + false, + true, + commitment_mode, + ) + .await; + + let _genesis_batch = TestL1Batch::sealed(&mut tester).await; + let first_batch = TestL1Batch::sealed(&mut tester).await; + let second_batch = TestL1Batch::sealed(&mut tester).await; + + first_batch.save_commit_tx(&mut tester).await; + second_batch.save_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + // sanity check + tester.assert_inflight_txs_count_equals(2).await; + + first_batch.fail_commit_tx(&mut tester).await; + + tester.run_eth_sender_tx_manager_iteration().await; + // the second batch commit tx is not confirmed yet + tester.assert_inflight_txs_count_equals(1).await; + + tester.run_eth_sender_tx_manager_iteration().await; + // it should try to resend the second batch's commit transaction + tester.assert_just_sent_tx_count_equals(1).await; + + second_batch.fail_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + // the second batch commit tx should be marked as failed in database + tester.assert_inflight_txs_count_equals(0).await; +} + +#[test_casing(2, COMMITMENT_MODES)] +#[test_log::test(tokio::test)] +async fn clearing_failed_transactions_resumes_sending_new_transactions( + commitment_mode: L1BatchCommitmentMode, +) { + let connection_pool = ConnectionPool::::test_pool().await; + let mut tester = EthSenderTester::new( + connection_pool.clone(), + vec![100; 100], + false, + true, + commitment_mode, + ) + .await; + + let _genesis_batch = TestL1Batch::sealed(&mut tester).await; + let first_batch = TestL1Batch::sealed(&mut tester).await; + + first_batch.save_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + // sanity check + tester.assert_inflight_txs_count_equals(1).await; + + first_batch.fail_commit_tx(&mut tester).await; + + tester.run_eth_sender_tx_manager_iteration().await; + // sanity check + tester.assert_inflight_txs_count_equals(0).await; + + tester.clear_failed_txs_failed_attempts().await.unwrap(); + // hack to reset the number of sent transactions + tester.next_l1_batch_number_to_commit -= 1; + // try again + first_batch.save_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + tester.assert_inflight_txs_count_equals(1).await; + + first_batch.execute_commit_tx(&mut tester).await; + tester.run_eth_sender_tx_manager_iteration().await; + tester.assert_inflight_txs_count_equals(0).await; } #[test_log::test(tokio::test)] diff --git a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs index a39cf7226966..fd2739a4882d 100644 --- a/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs +++ b/core/node/node_framework/src/implementations/layers/eth_sender/aggregator.rs @@ -1,6 +1,5 @@ use anyhow::Context; use zksync_config::configs::{eth_sender::EthConfig, ContractsConfig}; -use zksync_eth_client::BoundEthInterface; use zksync_eth_sender::{Aggregator, EthTxAggregator}; use zksync_types::{commitment::L1BatchCommitmentMode, settlement::SettlementMode, L2ChainId}; @@ -97,15 +96,11 @@ impl WiringLayer for EthTxAggregatorLayer { let object_store = input.object_store.0; // Create and add tasks. - let eth_client_blobs_addr = eth_client_blobs - .as_deref() - .map(BoundEthInterface::sender_account); - let config = self.eth_sender_config.sender.context("sender")?; let aggregator = Aggregator::new( config.clone(), object_store, - eth_client_blobs_addr.is_some(), + eth_client_blobs.is_some(), self.l1_batch_commit_data_generator_mode, ); @@ -114,11 +109,11 @@ impl WiringLayer for EthTxAggregatorLayer { config.clone(), aggregator, input.eth_client.unwrap().0, + eth_client_blobs, self.contracts_config.validator_timelock_addr, self.contracts_config.l1_multicall3_addr, self.contracts_config.diamond_proxy_addr, self.zksync_network_id, - eth_client_blobs_addr, self.settlement_mode, ) .await;