diff --git a/core/lib/circuit_breaker/src/l1_txs.rs b/core/lib/circuit_breaker/src/l1_txs.rs index e2d54ffee94b..5279106637e7 100644 --- a/core/lib/circuit_breaker/src/l1_txs.rs +++ b/core/lib/circuit_breaker/src/l1_txs.rs @@ -17,6 +17,7 @@ impl CircuitBreaker for FailedL1TransactionChecker { .eth_sender_dal() .get_number_of_failed_transactions() .await + .unwrap() > 0 { return Err(CircuitBreakerError::FailedL1Transaction); diff --git a/core/lib/dal/src/blocks_web3_dal.rs b/core/lib/dal/src/blocks_web3_dal.rs index 58f6730570ed..e42a645966ff 100644 --- a/core/lib/dal/src/blocks_web3_dal.rs +++ b/core/lib/dal/src/blocks_web3_dal.rs @@ -30,7 +30,7 @@ pub struct BlocksWeb3Dal<'a, 'c> { } impl BlocksWeb3Dal<'_, '_> { - pub async fn get_sealed_miniblock_number(&mut self) -> Result { + pub async fn get_sealed_miniblock_number(&mut self) -> sqlx::Result { let number = sqlx::query!("SELECT MAX(number) as \"number\" FROM miniblocks") .instrument("get_sealed_block_number") .report_latency() @@ -41,7 +41,7 @@ impl BlocksWeb3Dal<'_, '_> { Ok(MiniblockNumber(number as u32)) } - pub async fn get_sealed_l1_batch_number(&mut self) -> Result { + pub async fn get_sealed_l1_batch_number(&mut self) -> sqlx::Result { let number = sqlx::query!("SELECT MAX(number) as \"number\" FROM l1_batches") .instrument("get_sealed_block_number") .report_latency() @@ -57,7 +57,7 @@ impl BlocksWeb3Dal<'_, '_> { block_id: api::BlockId, include_full_transactions: bool, chain_id: L2ChainId, - ) -> Result>, sqlx::Error> { + ) -> sqlx::Result>> { let transactions_sql = if include_full_transactions { web3_transaction_select_sql() } else { @@ -147,7 +147,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_block_tx_count( &mut self, block_id: api::BlockId, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let query = format!( "SELECT number, l1_tx_count + l2_tx_count AS tx_count FROM miniblocks WHERE {}", web3_block_where_sql(block_id, 1) @@ -166,7 +166,7 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, from_block: MiniblockNumber, limit: usize, - ) -> Result<(Vec, Option), sqlx::Error> { + ) -> sqlx::Result<(Vec, Option)> { let rows = sqlx::query!( "SELECT number, hash FROM miniblocks \ WHERE number > $1 \ @@ -187,7 +187,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_block_headers_after( &mut self, from_block: MiniblockNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let rows = sqlx::query!( "SELECT hash, number, timestamp \ FROM miniblocks \ @@ -224,7 +224,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn resolve_block_id( &mut self, block_id: api::BlockId, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let query_string = match block_id { api::BlockId::Hash(_) => "SELECT number FROM miniblocks WHERE hash = $1".to_owned(), api::BlockId::Number(api::BlockNumber::Number(_)) => { @@ -253,7 +253,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_expected_l1_batch_timestamp( &mut self, l1_batch_number: L1BatchNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let first_miniblock_of_batch = if l1_batch_number.0 == 0 { MiniblockNumber(0) } else { @@ -279,7 +279,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_miniblock_hash( &mut self, block_number: MiniblockNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let hash = sqlx::query!( "SELECT hash FROM miniblocks WHERE number = $1", block_number.0 as i64 @@ -293,7 +293,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_l2_to_l1_logs( &mut self, block_number: L1BatchNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let raw_logs = sqlx::query!( "SELECT l2_to_l1_logs FROM l1_batches WHERE number = $1", block_number.0 as i64 @@ -312,7 +312,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_l1_batch_number_of_miniblock( &mut self, miniblock_number: MiniblockNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let number: Option = sqlx::query!( "SELECT l1_batch_number FROM miniblocks WHERE number = $1", miniblock_number.0 as i64 @@ -327,7 +327,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_miniblock_range_of_l1_batch( &mut self, l1_batch_number: L1BatchNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let row = sqlx::query!( "SELECT MIN(miniblocks.number) as \"min?\", MAX(miniblocks.number) as \"max?\" \ FROM miniblocks \ @@ -349,7 +349,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_l1_batch_info_for_tx( &mut self, tx_hash: H256, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let row = sqlx::query!( "SELECT l1_batch_number, l1_batch_tx_index \ FROM transactions \ @@ -369,19 +369,21 @@ impl BlocksWeb3Dal<'_, '_> { Ok(result) } - pub async fn get_trace_for_miniblock(&mut self, block_number: MiniblockNumber) -> Vec { - sqlx::query_as!( + pub async fn get_trace_for_miniblock( + &mut self, + block_number: MiniblockNumber, + ) -> sqlx::Result> { + Ok(sqlx::query_as!( CallTrace, "SELECT * FROM call_traces WHERE tx_hash IN \ (SELECT hash FROM transactions WHERE miniblock_number = $1)", block_number.0 as i64 ) .fetch_all(self.storage.conn()) - .await - .unwrap() + .await? .into_iter() .map(Call::from) - .collect() + .collect()) } /// Returns `base_fee_per_gas` for miniblock range [min(newest_block - block_count + 1, 0), newest_block] @@ -390,7 +392,7 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, newest_block: MiniblockNumber, block_count: u64, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { let result: Vec<_> = sqlx::query!( "SELECT base_fee_per_gas FROM miniblocks \ WHERE number <= $1 \ @@ -411,7 +413,7 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, block_number: MiniblockNumber, current_operator_address: Address, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { { let storage_block_details = sqlx::query_as!( StorageBlockDetails, @@ -458,7 +460,7 @@ impl BlocksWeb3Dal<'_, '_> { pub async fn get_l1_batch_details( &mut self, l1_batch_number: L1BatchNumber, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { { let l1_batch_details: Option = sqlx::query_as!( StorageL1BatchDetails, @@ -500,7 +502,7 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, migration_start_l1_batch_number: u64, from_virtual_block_number: u64, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { // Since virtual blocks are numerated from `migration_start_l1_batch_number` number and not from 0 // we have to subtract (migration_start_l1_batch_number - 1) from the `from` virtual block // to find miniblock using query below @@ -541,7 +543,7 @@ impl BlocksWeb3Dal<'_, '_> { &mut self, migration_start_l1_batch_number: u64, to_virtual_block_number: u64, - ) -> Result, sqlx::Error> { + ) -> sqlx::Result> { // Since virtual blocks are numerated from `migration_start_l1_batch_number` number and not from 0 // we have to subtract (migration_start_l1_batch_number - 1) from the `to` virtual block // to find miniblock using query below diff --git a/core/lib/dal/src/contract_verification_dal.rs b/core/lib/dal/src/contract_verification_dal.rs index ecef368fbc74..7cbbf226b506 100644 --- a/core/lib/dal/src/contract_verification_dal.rs +++ b/core/lib/dal/src/contract_verification_dal.rs @@ -12,7 +12,6 @@ use zksync_types::{ use sqlx::postgres::types::PgInterval; use crate::models::storage_verification_request::StorageVerificationRequest; -use crate::SqlxError; use crate::StorageProcessor; #[derive(Debug)] @@ -40,59 +39,55 @@ impl Display for Compiler { } impl ContractVerificationDal<'_, '_> { - pub async fn get_count_of_queued_verification_requests(&mut self) -> Result { - { - sqlx::query!( - r#" - SELECT COUNT(*) as "count!" - FROM contract_verification_requests - WHERE status = 'queued' - "# - ) - .fetch_one(self.storage.conn()) - .await - .map(|row| row.count as usize) - } + pub async fn get_count_of_queued_verification_requests(&mut self) -> sqlx::Result { + sqlx::query!( + r#" + SELECT COUNT(*) as "count!" + FROM contract_verification_requests + WHERE status = 'queued' + "# + ) + .fetch_one(self.storage.conn()) + .await + .map(|row| row.count as usize) } pub async fn add_contract_verification_request( &mut self, query: VerificationIncomingRequest, - ) -> Result { - { - sqlx::query!( - " - INSERT INTO contract_verification_requests ( - contract_address, - source_code, - contract_name, - zk_compiler_version, - compiler_version, - optimization_used, - optimizer_mode, - constructor_arguments, - is_system, - status, - created_at, - updated_at - ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'queued', now(), now()) - RETURNING id - ", - query.contract_address.as_bytes(), - serde_json::to_string(&query.source_code_data).unwrap(), - query.contract_name, - query.compiler_versions.zk_compiler_version(), - query.compiler_versions.compiler_version(), - query.optimization_used, - query.optimizer_mode, - query.constructor_arguments.0, - query.is_system, + ) -> sqlx::Result { + sqlx::query!( + " + INSERT INTO contract_verification_requests ( + contract_address, + source_code, + contract_name, + zk_compiler_version, + compiler_version, + optimization_used, + optimizer_mode, + constructor_arguments, + is_system, + status, + created_at, + updated_at ) - .fetch_one(self.storage.conn()) - .await - .map(|row| row.id as usize) - } + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'queued', now(), now()) + RETURNING id + ", + query.contract_address.as_bytes(), + serde_json::to_string(&query.source_code_data).unwrap(), + query.contract_name, + query.compiler_versions.zk_compiler_version(), + query.compiler_versions.compiler_version(), + query.optimization_used, + query.optimizer_mode, + query.constructor_arguments.0, + query.is_system, + ) + .fetch_one(self.storage.conn()) + .await + .map(|row| row.id as usize) } /// Returns the next verification request for processing. @@ -102,77 +97,73 @@ impl ContractVerificationDal<'_, '_> { pub async fn get_next_queued_verification_request( &mut self, processing_timeout: Duration, - ) -> Result, SqlxError> { - { - let processing_timeout = PgInterval { - months: 0, - days: 0, - microseconds: processing_timeout.as_micros() as i64, - }; - let result = sqlx::query_as!( - StorageVerificationRequest, - "UPDATE contract_verification_requests - SET status = 'in_progress', attempts = attempts + 1, - updated_at = now(), processing_started_at = now() - WHERE id = ( - SELECT id FROM contract_verification_requests - WHERE status = 'queued' OR (status = 'in_progress' AND processing_started_at < now() - $1::interval) - ORDER BY created_at - LIMIT 1 - FOR UPDATE - SKIP LOCKED - ) - RETURNING id, contract_address, source_code, contract_name, zk_compiler_version, compiler_version, optimization_used, - optimizer_mode, constructor_arguments, is_system - ", - &processing_timeout + ) -> sqlx::Result> { + let processing_timeout = PgInterval { + months: 0, + days: 0, + microseconds: processing_timeout.as_micros() as i64, + }; + let result = sqlx::query_as!( + StorageVerificationRequest, + "UPDATE contract_verification_requests + SET status = 'in_progress', attempts = attempts + 1, + updated_at = now(), processing_started_at = now() + WHERE id = ( + SELECT id FROM contract_verification_requests + WHERE status = 'queued' OR (status = 'in_progress' AND processing_started_at < now() - $1::interval) + ORDER BY created_at + LIMIT 1 + FOR UPDATE + SKIP LOCKED ) - .fetch_optional(self.storage.conn()) - .await? - .map(Into::into); - Ok(result) - } + RETURNING id, contract_address, source_code, contract_name, zk_compiler_version, compiler_version, optimization_used, + optimizer_mode, constructor_arguments, is_system + ", + &processing_timeout + ) + .fetch_optional(self.storage.conn()) + .await? + .map(Into::into); + Ok(result) } /// Updates the verification request status and inserts the verification info upon successful verification. pub async fn save_verification_info( &mut self, verification_info: VerificationInfo, - ) -> Result<(), SqlxError> { - { - let mut transaction = self.storage.start_transaction().await.unwrap(); - - sqlx::query!( - " - UPDATE contract_verification_requests - SET status = 'successful', updated_at = now() - WHERE id = $1 - ", - verification_info.request.id as i64, - ) - .execute(transaction.conn()) - .await?; - - let address = verification_info.request.req.contract_address; - let verification_info_json = serde_json::to_value(verification_info) - .expect("Failed to serialize verification info into serde_json"); - sqlx::query!( - " - INSERT INTO contracts_verification_info - (address, verification_info) - VALUES ($1, $2) - ON CONFLICT (address) - DO UPDATE SET verification_info = $2 - ", - address.as_bytes(), - &verification_info_json - ) - .execute(transaction.conn()) - .await?; + ) -> sqlx::Result<()> { + let mut transaction = self.storage.start_transaction().await.unwrap(); + + sqlx::query!( + " + UPDATE contract_verification_requests + SET status = 'successful', updated_at = now() + WHERE id = $1 + ", + verification_info.request.id as i64, + ) + .execute(transaction.conn()) + .await?; - transaction.commit().await.unwrap(); - Ok(()) - } + let address = verification_info.request.req.contract_address; + let verification_info_json = serde_json::to_value(verification_info) + .expect("Failed to serialize verification info into serde_json"); + sqlx::query!( + " + INSERT INTO contracts_verification_info + (address, verification_info) + VALUES ($1, $2) + ON CONFLICT (address) + DO UPDATE SET verification_info = $2 + ", + address.as_bytes(), + &verification_info_json + ) + .execute(transaction.conn()) + .await?; + + transaction.commit().await.unwrap(); + Ok(()) } pub async fn save_verification_error( @@ -181,159 +172,146 @@ impl ContractVerificationDal<'_, '_> { error: String, compilation_errors: serde_json::Value, panic_message: Option, - ) -> Result<(), SqlxError> { - { - sqlx::query!( - " - UPDATE contract_verification_requests - SET status = 'failed', updated_at = now(), error = $2, compilation_errors = $3, panic_message = $4 - WHERE id = $1 - ", - id as i64, - error.as_str(), - &compilation_errors, - panic_message - ) - .execute(self.storage.conn()) - .await?; - Ok(()) - } + ) -> sqlx::Result<()> { + sqlx::query!( + " + UPDATE contract_verification_requests + SET status = 'failed', updated_at = now(), error = $2, compilation_errors = $3, panic_message = $4 + WHERE id = $1 + ", + id as i64, + error.as_str(), + &compilation_errors, + panic_message + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } pub async fn get_verification_request_status( &mut self, id: usize, - ) -> Result, SqlxError> { - { - let result = sqlx::query!( - " - SELECT status, error, compilation_errors FROM contract_verification_requests - WHERE id = $1 - ", - id as i64, - ) - .fetch_optional(self.storage.conn()) - .await? - .map(|row| VerificationRequestStatus { - status: row.status, - error: row.error, - compilation_errors: row - .compilation_errors - .and_then(|errors: serde_json::Value| { - let string_array: Vec = errors - .as_array() - .unwrap() - .iter() - .map(|value| value.as_str().unwrap().to_string()) - .collect(); - if string_array.is_empty() { - None - } else { - Some(string_array) - } - }), - }); - Ok(result) - } + ) -> sqlx::Result> { + let result = sqlx::query!( + " + SELECT status, error, compilation_errors FROM contract_verification_requests + WHERE id = $1 + ", + id as i64, + ) + .fetch_optional(self.storage.conn()) + .await? + .map(|row| VerificationRequestStatus { + status: row.status, + error: row.error, + compilation_errors: row + .compilation_errors + .and_then(|errors: serde_json::Value| { + let string_array: Vec = errors + .as_array() + .unwrap() + .iter() + .map(|value| value.as_str().unwrap().to_string()) + .collect(); + if string_array.is_empty() { + None + } else { + Some(string_array) + } + }), + }); + Ok(result) } /// Returns bytecode and calldata from the contract and the transaction that created it. pub async fn get_contract_info_for_verification( &mut self, address: Address, - ) -> Result, DeployContractCalldata)>, SqlxError> { - { - let hashed_key = get_code_key(&address).hashed_key(); - let result = sqlx::query!( - r#" - SELECT factory_deps.bytecode, transactions.data as "data?", transactions.contract_address as "contract_address?" - FROM ( - SELECT * FROM storage_logs - WHERE storage_logs.hashed_key = $1 - ORDER BY miniblock_number DESC, operation_number DESC - LIMIT 1 - ) storage_logs - JOIN factory_deps ON factory_deps.bytecode_hash = storage_logs.value - LEFT JOIN transactions ON transactions.hash = storage_logs.tx_hash - WHERE storage_logs.value != $2 - "#, - hashed_key.as_bytes(), - FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH.as_bytes() - ) - .fetch_optional(self.storage.conn()) - .await? - .map(|row| { - let calldata = match row.contract_address { - Some(contract_address) - if contract_address == CONTRACT_DEPLOYER_ADDRESS.0.to_vec() => - { - // `row.contract_address` and `row.data` are either both `None` or both `Some(_)`. - // In this arm it's checked that `row.contract_address` is `Some(_)`, so it's safe to unwrap `row.data`. - let data: serde_json::Value = row.data.unwrap(); - let calldata_str: String = - serde_json::from_value(data.get("calldata").unwrap().clone()).unwrap(); - let calldata = hex::decode(&calldata_str[2..]).unwrap(); - DeployContractCalldata::Deploy(calldata) - } - _ => DeployContractCalldata::Ignore, - }; - (row.bytecode, calldata) - }); - Ok(result) - } + ) -> sqlx::Result, DeployContractCalldata)>> { + let hashed_key = get_code_key(&address).hashed_key(); + let result = sqlx::query!( + r#" + SELECT factory_deps.bytecode, transactions.data as "data?", transactions.contract_address as "contract_address?" + FROM ( + SELECT * FROM storage_logs + WHERE storage_logs.hashed_key = $1 + ORDER BY miniblock_number DESC, operation_number DESC + LIMIT 1 + ) storage_logs + JOIN factory_deps ON factory_deps.bytecode_hash = storage_logs.value + LEFT JOIN transactions ON transactions.hash = storage_logs.tx_hash + WHERE storage_logs.value != $2 + "#, + hashed_key.as_bytes(), + FAILED_CONTRACT_DEPLOYMENT_BYTECODE_HASH.as_bytes() + ) + .fetch_optional(self.storage.conn()) + .await? + .map(|row| { + let calldata = match row.contract_address { + Some(contract_address) + if contract_address == CONTRACT_DEPLOYER_ADDRESS.0.to_vec() => + { + // `row.contract_address` and `row.data` are either both `None` or both `Some(_)`. + // In this arm it's checked that `row.contract_address` is `Some(_)`, so it's safe to unwrap `row.data`. + let data: serde_json::Value = row.data.unwrap(); + let calldata_str: String = + serde_json::from_value(data.get("calldata").unwrap().clone()).unwrap(); + let calldata = hex::decode(&calldata_str[2..]).unwrap(); + DeployContractCalldata::Deploy(calldata) + } + _ => DeployContractCalldata::Ignore, + }; + (row.bytecode, calldata) + }); + Ok(result) } /// Returns true if the contract has a stored contracts_verification_info. pub async fn is_contract_verified(&mut self, address: Address) -> bool { - { - let count = sqlx::query!( - r#" - SELECT COUNT(*) as "count!" - FROM contracts_verification_info - WHERE address = $1 - "#, - address.as_bytes() - ) - .fetch_one(self.storage.conn()) - .await - .unwrap() - .count; - count > 0 - } + let count = sqlx::query!( + r#" + SELECT COUNT(*) as "count!" + FROM contracts_verification_info + WHERE address = $1 + "#, + address.as_bytes() + ) + .fetch_one(self.storage.conn()) + .await + .unwrap() + .count; + count > 0 } - async fn get_compiler_versions( - &mut self, - compiler: Compiler, - ) -> Result, SqlxError> { - { - let compiler = format!("{compiler}"); - let versions: Vec<_> = sqlx::query!( - "SELECT version FROM compiler_versions WHERE compiler = $1 ORDER by version", - &compiler - ) - .fetch_all(self.storage.conn()) - .await? - .into_iter() - .map(|row| row.version) - .collect(); - Ok(versions) - } + async fn get_compiler_versions(&mut self, compiler: Compiler) -> sqlx::Result> { + let compiler = format!("{compiler}"); + let versions: Vec<_> = sqlx::query!( + "SELECT version FROM compiler_versions WHERE compiler = $1 ORDER by version", + &compiler + ) + .fetch_all(self.storage.conn()) + .await? + .into_iter() + .map(|row| row.version) + .collect(); + Ok(versions) } - pub async fn get_zksolc_versions(&mut self) -> Result, SqlxError> { + pub async fn get_zksolc_versions(&mut self) -> sqlx::Result> { self.get_compiler_versions(Compiler::ZkSolc).await } - pub async fn get_solc_versions(&mut self) -> Result, SqlxError> { + pub async fn get_solc_versions(&mut self) -> sqlx::Result> { self.get_compiler_versions(Compiler::Solc).await } - pub async fn get_zkvyper_versions(&mut self) -> Result, SqlxError> { + pub async fn get_zkvyper_versions(&mut self) -> sqlx::Result> { self.get_compiler_versions(Compiler::ZkVyper).await } - pub async fn get_vyper_versions(&mut self) -> Result, SqlxError> { + pub async fn get_vyper_versions(&mut self) -> sqlx::Result> { self.get_compiler_versions(Compiler::Vyper).await } @@ -341,78 +319,72 @@ impl ContractVerificationDal<'_, '_> { &mut self, compiler: Compiler, versions: Vec, - ) -> Result<(), SqlxError> { - { - let mut transaction = self.storage.start_transaction().await.unwrap(); - let compiler = format!("{compiler}"); - - sqlx::query!( - "DELETE FROM compiler_versions WHERE compiler = $1", - &compiler - ) - .execute(transaction.conn()) - .await?; - - sqlx::query!( - " - INSERT INTO compiler_versions (version, compiler, created_at, updated_at) - SELECT u.version, $2, now(), now() - FROM UNNEST($1::text[]) - AS u(version) - ON CONFLICT (version, compiler) DO NOTHING", - &versions, - &compiler, - ) - .execute(transaction.conn()) - .await?; + ) -> sqlx::Result<()> { + let mut transaction = self.storage.start_transaction().await.unwrap(); + let compiler = format!("{compiler}"); - transaction.commit().await.unwrap(); - Ok(()) - } + sqlx::query!( + "DELETE FROM compiler_versions WHERE compiler = $1", + &compiler + ) + .execute(transaction.conn()) + .await?; + + sqlx::query!( + " + INSERT INTO compiler_versions (version, compiler, created_at, updated_at) + SELECT u.version, $2, now(), now() + FROM UNNEST($1::text[]) + AS u(version) + ON CONFLICT (version, compiler) DO NOTHING", + &versions, + &compiler, + ) + .execute(transaction.conn()) + .await?; + + transaction.commit().await.unwrap(); + Ok(()) } - pub async fn set_zksolc_versions(&mut self, versions: Vec) -> Result<(), SqlxError> { + pub async fn set_zksolc_versions(&mut self, versions: Vec) -> sqlx::Result<()> { self.set_compiler_versions(Compiler::ZkSolc, versions).await } - pub async fn set_solc_versions(&mut self, versions: Vec) -> Result<(), SqlxError> { + pub async fn set_solc_versions(&mut self, versions: Vec) -> sqlx::Result<()> { self.set_compiler_versions(Compiler::Solc, versions).await } - pub async fn set_zkvyper_versions(&mut self, versions: Vec) -> Result<(), SqlxError> { + pub async fn set_zkvyper_versions(&mut self, versions: Vec) -> sqlx::Result<()> { self.set_compiler_versions(Compiler::ZkVyper, versions) .await } - pub async fn set_vyper_versions(&mut self, versions: Vec) -> Result<(), SqlxError> { + pub async fn set_vyper_versions(&mut self, versions: Vec) -> sqlx::Result<()> { self.set_compiler_versions(Compiler::Vyper, versions).await } - pub async fn get_all_successful_requests( - &mut self, - ) -> Result, SqlxError> { - { - let result = sqlx::query_as!( - StorageVerificationRequest, - "SELECT id, contract_address, source_code, contract_name, zk_compiler_version, compiler_version, optimization_used, - optimizer_mode, constructor_arguments, is_system - FROM contract_verification_requests - WHERE status = 'successful' - ORDER BY id", - ) - .fetch_all(self.storage.conn()) - .await? - .into_iter() - .map(Into::into) - .collect(); - Ok(result) - } + pub async fn get_all_successful_requests(&mut self) -> sqlx::Result> { + let result = sqlx::query_as!( + StorageVerificationRequest, + "SELECT id, contract_address, source_code, contract_name, zk_compiler_version, compiler_version, optimization_used, + optimizer_mode, constructor_arguments, is_system + FROM contract_verification_requests + WHERE status = 'successful' + ORDER BY id", + ) + .fetch_all(self.storage.conn()) + .await? + .into_iter() + .map(Into::into) + .collect(); + Ok(result) } pub async fn get_contract_verification_info( &mut self, address: Address, - ) -> Result, SqlxError> { + ) -> sqlx::Result> { let row = sqlx::query!( "SELECT verification_info FROM contracts_verification_info WHERE address = $1", address.as_bytes(), diff --git a/core/lib/dal/src/eth_sender_dal.rs b/core/lib/dal/src/eth_sender_dal.rs index 14e35430e050..413ae647beee 100644 --- a/core/lib/dal/src/eth_sender_dal.rs +++ b/core/lib/dal/src/eth_sender_dal.rs @@ -2,6 +2,7 @@ use crate::models::storage_eth_tx::{ L1BatchEthSenderStats, StorageEthTx, StorageTxHistory, StorageTxHistoryToSend, }; use crate::StorageProcessor; +use anyhow::Context as _; use sqlx::{ types::chrono::{DateTime, Utc}, Row, @@ -18,122 +19,108 @@ pub struct EthSenderDal<'a, 'c> { } impl EthSenderDal<'_, '_> { - pub async fn get_inflight_txs(&mut self) -> Vec { - { - let txs = sqlx::query_as!( - StorageEthTx, - "SELECT * FROM eth_txs WHERE confirmed_eth_tx_history_id IS NULL - AND id <= (SELECT COALESCE(MAX(eth_tx_id), 0) FROM eth_txs_history WHERE sent_at_block IS NOT NULL) - ORDER BY id" - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - txs.into_iter().map(|tx| tx.into()).collect() - } + pub async fn get_inflight_txs(&mut self) -> sqlx::Result> { + let txs = sqlx::query_as!( + StorageEthTx, + "SELECT * FROM eth_txs WHERE confirmed_eth_tx_history_id IS NULL + AND id <= (SELECT COALESCE(MAX(eth_tx_id), 0) FROM eth_txs_history WHERE sent_at_block IS NOT NULL) + ORDER BY id" + ) + .fetch_all(self.storage.conn()) + .await?; + Ok(txs.into_iter().map(|tx| tx.into()).collect()) } - pub async fn get_eth_l1_batches(&mut self) -> L1BatchEthSenderStats { - { - let mut stats = L1BatchEthSenderStats::default(); - for tx_type in ["execute_tx", "commit_tx", "prove_tx"] { - let mut records= sqlx::query(&format!( - "SELECT number as number, true as confirmed FROM l1_batches - INNER JOIN eth_txs_history ON (l1_batches.eth_{}_id = eth_txs_history.eth_tx_id) - WHERE eth_txs_history.confirmed_at IS NOT NULL - ORDER BY number DESC - LIMIT 1", - tx_type - )) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - - records.extend(sqlx::query(&format!( - "SELECT number as number, false as confirmed FROM l1_batches + pub async fn get_eth_l1_batches(&mut self) -> sqlx::Result { + let mut stats = L1BatchEthSenderStats::default(); + for tx_type in ["execute_tx", "commit_tx", "prove_tx"] { + let mut records= sqlx::query(&format!( + "SELECT number as number, true as confirmed FROM l1_batches INNER JOIN eth_txs_history ON (l1_batches.eth_{}_id = eth_txs_history.eth_tx_id) + WHERE eth_txs_history.confirmed_at IS NOT NULL ORDER BY number DESC LIMIT 1", tx_type )) .fetch_all(self.storage.conn()) - .await - .unwrap()); - - for record in records { - let batch_number = L1BatchNumber(record.get::("number") as u32); - let aggregation_action = match tx_type { - "execute_tx" => AggregatedActionType::Execute, - "commit_tx" => AggregatedActionType::Commit, - "prove_tx" => AggregatedActionType::PublishProofOnchain, - _ => unreachable!(), - }; - if record.get::("confirmed") { - stats.mined.push((aggregation_action, batch_number)); - } else { - stats.saved.push((aggregation_action, batch_number)); - } + .await?; + + records.extend( + sqlx::query(&format!( + "SELECT number as number, false as confirmed FROM l1_batches + INNER JOIN eth_txs_history ON (l1_batches.eth_{}_id = eth_txs_history.eth_tx_id) + ORDER BY number DESC + LIMIT 1", + tx_type + )) + .fetch_all(self.storage.conn()) + .await?, + ); + + for record in records { + let batch_number = L1BatchNumber(record.get::("number") as u32); + let aggregation_action = match tx_type { + "execute_tx" => AggregatedActionType::Execute, + "commit_tx" => AggregatedActionType::Commit, + "prove_tx" => AggregatedActionType::PublishProofOnchain, + _ => unreachable!(), + }; + if record.get::("confirmed") { + stats.mined.push((aggregation_action, batch_number)); + } else { + stats.saved.push((aggregation_action, batch_number)); } } - stats } + Ok(stats) } - pub async fn get_eth_tx(&mut self, eth_tx_id: u32) -> Option { - { - sqlx::query_as!( - StorageEthTx, - "SELECT * FROM eth_txs WHERE id = $1", - eth_tx_id as i32 - ) - .fetch_optional(self.storage.conn()) - .await - .unwrap() - .map(Into::into) - } + pub async fn get_eth_tx(&mut self, eth_tx_id: u32) -> sqlx::Result> { + Ok(sqlx::query_as!( + StorageEthTx, + "SELECT * FROM eth_txs WHERE id = $1", + eth_tx_id as i32 + ) + .fetch_optional(self.storage.conn()) + .await? + .map(Into::into)) } - pub async fn get_new_eth_txs(&mut self, limit: u64) -> Vec { - { - let txs = sqlx::query_as!( - StorageEthTx, - r#"SELECT * FROM eth_txs - WHERE id > (SELECT COALESCE(MAX(eth_tx_id), 0) FROM eth_txs_history) - ORDER BY id - LIMIT $1 - "#, - limit as i64 - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - txs.into_iter().map(|tx| tx.into()).collect() - } + pub async fn get_new_eth_txs(&mut self, limit: u64) -> sqlx::Result> { + let txs = sqlx::query_as!( + StorageEthTx, + r#"SELECT * FROM eth_txs + WHERE id > (SELECT COALESCE(MAX(eth_tx_id), 0) FROM eth_txs_history) + ORDER BY id + LIMIT $1 + "#, + limit as i64 + ) + .fetch_all(self.storage.conn()) + .await?; + Ok(txs.into_iter().map(|tx| tx.into()).collect()) } - pub async fn get_unsent_txs(&mut self) -> Vec { - { - let txs = sqlx::query_as!( - StorageTxHistoryToSend, - r#" - SELECT - eth_txs_history.id, - eth_txs_history.eth_tx_id, - eth_txs_history.tx_hash, - eth_txs_history.base_fee_per_gas, - eth_txs_history.priority_fee_per_gas, - eth_txs_history.signed_raw_tx, - eth_txs.nonce - FROM eth_txs_history - JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id - WHERE eth_txs_history.sent_at_block IS NULL AND eth_txs.confirmed_eth_tx_history_id IS NULL - ORDER BY eth_txs_history.id DESC"#, - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - txs.into_iter().map(|tx| tx.into()).collect() - } + pub async fn get_unsent_txs(&mut self) -> sqlx::Result> { + let txs = sqlx::query_as!( + StorageTxHistoryToSend, + r#" + SELECT + eth_txs_history.id, + eth_txs_history.eth_tx_id, + eth_txs_history.tx_hash, + eth_txs_history.base_fee_per_gas, + eth_txs_history.priority_fee_per_gas, + eth_txs_history.signed_raw_tx, + eth_txs.nonce + FROM eth_txs_history + JOIN eth_txs ON eth_txs.id = eth_txs_history.eth_tx_id + WHERE eth_txs_history.sent_at_block IS NULL AND eth_txs.confirmed_eth_tx_history_id IS NULL + ORDER BY eth_txs_history.id DESC"#, + ) + .fetch_all(self.storage.conn()) + .await?; + Ok(txs.into_iter().map(|tx| tx.into()).collect()) } pub async fn save_eth_tx( @@ -143,10 +130,9 @@ impl EthSenderDal<'_, '_> { tx_type: AggregatedActionType, contract_address: Address, predicted_gas_cost: u32, - ) -> EthTx { - { - let address = format!("{:#x}", contract_address); - let eth_tx = sqlx::query_as!( + ) -> sqlx::Result { + let address = format!("{:#x}", contract_address); + let eth_tx = sqlx::query_as!( StorageEthTx, "INSERT INTO eth_txs (raw_tx, nonce, tx_type, contract_address, predicted_gas_cost, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, now(), now()) @@ -158,10 +144,8 @@ impl EthSenderDal<'_, '_> { predicted_gas_cost as i64 ) .fetch_one(self.storage.conn()) - .await - .unwrap(); - eth_tx.into() - } + .await?; + Ok(eth_tx.into()) } pub async fn insert_tx_history( @@ -171,109 +155,108 @@ impl EthSenderDal<'_, '_> { priority_fee_per_gas: u64, tx_hash: H256, raw_signed_tx: Vec, - ) -> Option { - { - let priority_fee_per_gas = - i64::try_from(priority_fee_per_gas).expect("Can't convert U256 to i64"); - let base_fee_per_gas = - i64::try_from(base_fee_per_gas).expect("Can't convert U256 to i64"); - let tx_hash = format!("{:#x}", tx_hash); - - sqlx::query!( - "INSERT INTO eth_txs_history - (eth_tx_id, base_fee_per_gas, priority_fee_per_gas, tx_hash, signed_raw_tx, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, now(), now()) - ON CONFLICT (tx_hash) DO NOTHING - RETURNING id", - eth_tx_id as u32, - base_fee_per_gas, - priority_fee_per_gas, - tx_hash, - raw_signed_tx - ) - .fetch_optional(self.storage.conn()) - .await - .unwrap() - .map(|row| row.id as u32) - } + ) -> sqlx::Result> { + let priority_fee_per_gas = + i64::try_from(priority_fee_per_gas).expect("Can't convert U256 to i64"); + let base_fee_per_gas = i64::try_from(base_fee_per_gas).expect("Can't convert U256 to i64"); + let tx_hash = format!("{:#x}", tx_hash); + + Ok(sqlx::query!( + "INSERT INTO eth_txs_history + (eth_tx_id, base_fee_per_gas, priority_fee_per_gas, tx_hash, signed_raw_tx, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, now(), now()) + ON CONFLICT (tx_hash) DO NOTHING + RETURNING id", + eth_tx_id as u32, + base_fee_per_gas, + priority_fee_per_gas, + tx_hash, + raw_signed_tx + ) + .fetch_optional(self.storage.conn()) + .await? + .map(|row| row.id as u32)) } - pub async fn set_sent_at_block(&mut self, eth_txs_history_id: u32, sent_at_block: u32) { - { - sqlx::query!( - "UPDATE eth_txs_history SET sent_at_block = $2, sent_at = now() - WHERE id = $1 AND sent_at_block IS NULL", - eth_txs_history_id as i32, - sent_at_block as i32 - ) - .execute(self.storage.conn()) - .await - .unwrap(); - } + pub async fn set_sent_at_block( + &mut self, + eth_txs_history_id: u32, + sent_at_block: u32, + ) -> sqlx::Result<()> { + sqlx::query!( + "UPDATE eth_txs_history SET sent_at_block = $2, sent_at = now() + WHERE id = $1 AND sent_at_block IS NULL", + eth_txs_history_id as i32, + sent_at_block as i32 + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } - pub async fn remove_tx_history(&mut self, eth_txs_history_id: u32) { - { - sqlx::query!( - "DELETE FROM eth_txs_history - WHERE id = $1", - eth_txs_history_id as i64 - ) - .execute(self.storage.conn()) - .await - .unwrap(); - } + pub async fn remove_tx_history(&mut self, eth_txs_history_id: u32) -> sqlx::Result<()> { + sqlx::query!( + "DELETE FROM eth_txs_history + WHERE id = $1", + eth_txs_history_id as i64 + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } - pub async fn confirm_tx(&mut self, tx_hash: H256, gas_used: U256) { - { - let mut transaction = self.storage.start_transaction().await.unwrap(); - let gas_used = i64::try_from(gas_used).expect("Can't convert U256 to i64"); - let tx_hash = format!("{:#x}", tx_hash); - let ids = sqlx::query!( - "UPDATE eth_txs_history - SET updated_at = now(), confirmed_at = now() - WHERE tx_hash = $1 - RETURNING id, eth_tx_id", - tx_hash, - ) - .fetch_one(transaction.conn()) + pub async fn confirm_tx(&mut self, tx_hash: H256, gas_used: U256) -> anyhow::Result<()> { + let mut transaction = self + .storage + .start_transaction() .await - .unwrap(); - - sqlx::query!( - "UPDATE eth_txs - SET gas_used = $1, confirmed_eth_tx_history_id = $2 - WHERE id = $3", - gas_used, - ids.id, - ids.eth_tx_id - ) - .execute(transaction.conn()) - .await - .unwrap(); + .context("start_transaction()")?; + let gas_used = i64::try_from(gas_used) + .map_err(|err| anyhow::anyhow!("Can't convert U256 to i64: {err}"))?; + let tx_hash = format!("{:#x}", tx_hash); + let ids = sqlx::query!( + "UPDATE eth_txs_history + SET updated_at = now(), confirmed_at = now() + WHERE tx_hash = $1 + RETURNING id, eth_tx_id", + tx_hash, + ) + .fetch_one(transaction.conn()) + .await?; + + sqlx::query!( + "UPDATE eth_txs + SET gas_used = $1, confirmed_eth_tx_history_id = $2 + WHERE id = $3", + gas_used, + ids.id, + ids.eth_tx_id + ) + .execute(transaction.conn()) + .await?; - transaction.commit().await.unwrap(); - } + transaction.commit().await?; + Ok(()) } - pub async fn get_confirmed_tx_hash_by_eth_tx_id(&mut self, eth_tx_id: u32) -> Option { - { - let tx_hash = sqlx::query!( - "SELECT tx_hash FROM eth_txs_history - WHERE eth_tx_id = $1 AND confirmed_at IS NOT NULL", - eth_tx_id as i64 - ) - .fetch_optional(self.storage.conn()) - .await - .unwrap(); - - tx_hash.map(|tx_hash| { - let tx_hash = tx_hash.tx_hash; - let tx_hash = tx_hash.trim_start_matches("0x"); - H256::from_str(tx_hash).unwrap() - }) - } + pub async fn get_confirmed_tx_hash_by_eth_tx_id( + &mut self, + eth_tx_id: u32, + ) -> anyhow::Result> { + let tx_hash = sqlx::query!( + "SELECT tx_hash FROM eth_txs_history + WHERE eth_tx_id = $1 AND confirmed_at IS NOT NULL", + eth_tx_id as i64 + ) + .fetch_optional(self.storage.conn()) + .await?; + + let Some(tx_hash) = tx_hash else { + return Ok(None); + }; + let tx_hash = tx_hash.tx_hash; + let tx_hash = tx_hash.trim_start_matches("0x"); + Ok(Some(H256::from_str(tx_hash).context("invalid tx_hash")?)) } /// This method inserts a fake transaction into the database that would make the corresponding L1 batch @@ -292,162 +275,150 @@ impl EthSenderDal<'_, '_> { tx_type: AggregatedActionType, tx_hash: H256, confirmed_at: DateTime, - ) { - { - let mut transaction = self.storage.start_transaction().await.unwrap(); - let tx_hash = format!("{:#x}", tx_hash); + ) -> anyhow::Result<()> { + let mut transaction = self + .storage + .start_transaction() + .await + .context("start_transaction")?; + let tx_hash = format!("{:#x}", tx_hash); + + let eth_tx_id = sqlx::query_scalar!( + "SELECT eth_txs.id FROM eth_txs_history JOIN eth_txs + ON eth_txs.confirmed_eth_tx_history_id = eth_txs_history.id + WHERE eth_txs_history.tx_hash = $1", + tx_hash + ) + .fetch_optional(transaction.conn()) + .await?; + // Check if the transaction with the corresponding hash already exists. + let eth_tx_id = if let Some(eth_tx_id) = eth_tx_id { + eth_tx_id + } else { + // No such transaction in the database yet, we have to insert it. + + // Insert general tx descriptor. let eth_tx_id = sqlx::query_scalar!( - "SELECT eth_txs.id FROM eth_txs_history JOIN eth_txs - ON eth_txs.confirmed_eth_tx_history_id = eth_txs_history.id - WHERE eth_txs_history.tx_hash = $1", - tx_hash + "INSERT INTO eth_txs (raw_tx, nonce, tx_type, contract_address, predicted_gas_cost, created_at, updated_at) + VALUES ('\\x00', 0, $1, '', 0, now(), now()) + RETURNING id", + tx_type.to_string() ) - .fetch_optional(transaction.conn()) - .await - .unwrap(); + .fetch_one(transaction.conn()) + .await?; - // Check if the transaction with the corresponding hash already exists. - let eth_tx_id = if let Some(eth_tx_id) = eth_tx_id { - eth_tx_id - } else { - // No such transaction in the database yet, we have to insert it. - - // Insert general tx descriptor. - let eth_tx_id = sqlx::query_scalar!( - "INSERT INTO eth_txs (raw_tx, nonce, tx_type, contract_address, predicted_gas_cost, created_at, updated_at) - VALUES ('\\x00', 0, $1, '', 0, now(), now()) - RETURNING id", - tx_type.to_string() - ) - .fetch_one(transaction.conn()) - .await - .unwrap(); - - // Insert a "sent transaction". - let eth_history_id = sqlx::query_scalar!( - "INSERT INTO eth_txs_history - (eth_tx_id, base_fee_per_gas, priority_fee_per_gas, tx_hash, signed_raw_tx, created_at, updated_at, confirmed_at) - VALUES ($1, 0, 0, $2, '\\x00', now(), now(), $3) - RETURNING id", - eth_tx_id, - tx_hash, - confirmed_at.naive_utc() - ) - .fetch_one(transaction.conn()) - .await - .unwrap(); - - // Mark general entry as confirmed. - sqlx::query!( - "UPDATE eth_txs - SET confirmed_eth_tx_history_id = $1 - WHERE id = $2", - eth_history_id, - eth_tx_id - ) - .execute(transaction.conn()) - .await - .unwrap(); + // Insert a "sent transaction". + let eth_history_id = sqlx::query_scalar!( + "INSERT INTO eth_txs_history + (eth_tx_id, base_fee_per_gas, priority_fee_per_gas, tx_hash, signed_raw_tx, created_at, updated_at, confirmed_at) + VALUES ($1, 0, 0, $2, '\\x00', now(), now(), $3) + RETURNING id", + eth_tx_id, + tx_hash, + confirmed_at.naive_utc() + ) + .fetch_one(transaction.conn()) + .await?; + // Mark general entry as confirmed. + sqlx::query!( + "UPDATE eth_txs + SET confirmed_eth_tx_history_id = $1 + WHERE id = $2", + eth_history_id, eth_tx_id - }; + ) + .execute(transaction.conn()) + .await?; - // Tie the ETH tx to the L1 batch. - super::BlocksDal { - storage: &mut transaction, - } - .set_eth_tx_id(l1_batch..=l1_batch, eth_tx_id as u32, tx_type) - .await - .unwrap(); + eth_tx_id + }; - transaction.commit().await.unwrap(); + // Tie the ETH tx to the L1 batch. + super::BlocksDal { + storage: &mut transaction, } + .set_eth_tx_id(l1_batch..=l1_batch, eth_tx_id as u32, tx_type) + .await + .context("set_eth_tx_id()")?; + + transaction.commit().await.context("commit()") } - pub async fn get_tx_history_to_check(&mut self, eth_tx_id: u32) -> Vec { - { - let tx_history = sqlx::query_as!( - StorageTxHistory, - "SELECT * FROM eth_txs_history WHERE eth_tx_id = $1 ORDER BY created_at DESC", - eth_tx_id as i32 - ) - .fetch_all(self.storage.conn()) - .await - .unwrap(); - tx_history.into_iter().map(|tx| tx.into()).collect() - } + pub async fn get_tx_history_to_check( + &mut self, + eth_tx_id: u32, + ) -> sqlx::Result> { + let tx_history = sqlx::query_as!( + StorageTxHistory, + "SELECT * FROM eth_txs_history WHERE eth_tx_id = $1 ORDER BY created_at DESC", + eth_tx_id as i32 + ) + .fetch_all(self.storage.conn()) + .await?; + Ok(tx_history.into_iter().map(|tx| tx.into()).collect()) } - pub async fn get_block_number_on_first_sent_attempt(&mut self, eth_tx_id: u32) -> Option { - { - let sent_at_block = sqlx::query_scalar!( + pub async fn get_block_number_on_first_sent_attempt( + &mut self, + eth_tx_id: u32, + ) -> sqlx::Result> { + let sent_at_block = sqlx::query_scalar!( "SELECT sent_at_block FROM eth_txs_history WHERE eth_tx_id = $1 AND sent_at_block IS NOT NULL ORDER BY created_at ASC LIMIT 1", eth_tx_id as i32 ) - .fetch_optional(self.storage.conn()) - .await - .unwrap(); - sent_at_block.flatten().map(|block| block as u32) - } + .fetch_optional(self.storage.conn()) + .await?; + Ok(sent_at_block.flatten().map(|block| block as u32)) } - pub async fn get_last_sent_eth_tx(&mut self, eth_tx_id: u32) -> Option { - { - let history_item = sqlx::query_as!( + pub async fn get_last_sent_eth_tx( + &mut self, + eth_tx_id: u32, + ) -> sqlx::Result> { + let history_item = sqlx::query_as!( StorageTxHistory, "SELECT * FROM eth_txs_history WHERE eth_tx_id = $1 ORDER BY created_at DESC LIMIT 1", eth_tx_id as i32 ) - .fetch_optional(self.storage.conn()) - .await - .unwrap(); - history_item.map(|tx| tx.into()) - } + .fetch_optional(self.storage.conn()) + .await?; + Ok(history_item.map(|tx| tx.into())) } - pub async fn get_next_nonce(&mut self) -> Option { - { - let row = sqlx::query!("SELECT nonce FROM eth_txs ORDER BY id DESC LIMIT 1") - .fetch_optional(self.storage.conn()) - .await - .unwrap(); - row.map(|row| row.nonce as u64 + 1) - } + pub async fn get_next_nonce(&mut self) -> sqlx::Result> { + let row = sqlx::query!("SELECT nonce FROM eth_txs ORDER BY id DESC LIMIT 1") + .fetch_optional(self.storage.conn()) + .await?; + Ok(row.map(|row| row.nonce as u64 + 1)) } - pub async fn mark_failed_transaction(&mut self, eth_tx_id: u32) { - { - sqlx::query!( - "UPDATE eth_txs SET has_failed = TRUE WHERE id = $1", - eth_tx_id as i32 - ) - .execute(self.storage.conn()) - .await - .unwrap(); - } + pub async fn mark_failed_transaction(&mut self, eth_tx_id: u32) -> sqlx::Result<()> { + sqlx::query!( + "UPDATE eth_txs SET has_failed = TRUE WHERE id = $1", + eth_tx_id as i32 + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } - pub async fn get_number_of_failed_transactions(&mut self) -> i64 { - { - sqlx::query!("SELECT COUNT(*) FROM eth_txs WHERE has_failed = TRUE") - .fetch_one(self.storage.conn()) - .await - .unwrap() - .count - .unwrap() - } + pub async fn get_number_of_failed_transactions(&mut self) -> anyhow::Result { + sqlx::query!("SELECT COUNT(*) FROM eth_txs WHERE has_failed = TRUE") + .fetch_one(self.storage.conn()) + .await? + .count + .context("count field is missing") } - pub async fn clear_failed_transactions(&mut self) { - { - sqlx::query!( - "DELETE FROM eth_txs WHERE id >= - (SELECT MIN(id) FROM eth_txs WHERE has_failed = TRUE)" - ) - .execute(self.storage.conn()) - .await - .unwrap(); - } + pub async fn clear_failed_transactions(&mut self) -> sqlx::Result<()> { + sqlx::query!( + "DELETE FROM eth_txs WHERE id >= + (SELECT MIN(id) FROM eth_txs WHERE has_failed = TRUE)" + ) + .execute(self.storage.conn()) + .await?; + Ok(()) } } diff --git a/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs b/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs index 5be21325f588..d59c25ddbe9e 100644 --- a/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs +++ b/core/lib/zksync_core/src/api_server/web3/namespaces/debug.rs @@ -79,7 +79,8 @@ impl DebugNamespace { let call_trace = connection .blocks_web3_dal() .get_trace_for_miniblock(block_number) - .await; + .await + .unwrap(); let call_trace = call_trace .into_iter() .map(|call_trace| { diff --git a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs index 9e55de888a15..ca8314b434c2 100644 --- a/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs +++ b/core/lib/zksync_core/src/eth_sender/eth_tx_aggregator.rs @@ -516,7 +516,12 @@ impl EthTxAggregator { &self, storage: &mut StorageProcessor<'_>, ) -> Result { - let db_nonce = storage.eth_sender_dal().get_next_nonce().await.unwrap_or(0); + let db_nonce = storage + .eth_sender_dal() + .get_next_nonce() + .await + .unwrap() + .unwrap_or(0); // Between server starts we can execute some txs using operator account or remove some txs from the database // At the start we have to consider this fact and get the max nonce. Ok(db_nonce.max(self.base_nonce))