From 100e3f337c33a95a4b73016664dfae2aee326e88 Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Tue, 17 Dec 2024 23:01:27 +0200 Subject: [PATCH 1/3] fix: properly set transaction index (#499) --- crates/core/src/node/eth.rs | 4 ++-- crates/core/src/node/in_memory.rs | 19 +++++++++++-------- e2e-tests-rust/tests/lib.rs | 16 ++++++++++++++++ 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/crates/core/src/node/eth.rs b/crates/core/src/node/eth.rs index ad95d3ee..3d1957d8 100644 --- a/crates/core/src/node/eth.rs +++ b/crates/core/src/node/eth.rs @@ -567,7 +567,7 @@ impl EthNamespaceT for InMemoryNode { let maybe_result = { // try retrieving transaction from memory, and if unavailable subsequently from the fork - reader.tx_results.get(&hash).and_then(|TransactionResult { info, .. }| { + reader.tx_results.get(&hash).and_then(|TransactionResult { info, receipt, .. }| { let input_data = info.tx.common_data.input.clone().or(None)?; let chain_id = info.tx.common_data.extract_chain_id().or(None)?; Some(zksync_types::api::Transaction { @@ -575,7 +575,7 @@ impl EthNamespaceT for InMemoryNode { nonce: U256::from(info.tx.common_data.nonce.0), block_hash: Some(hash), block_number: Some(U64::from(info.miniblock_number)), - transaction_index: Some(U64::from(0)), + transaction_index: Some(receipt.transaction_index), from: Some(info.tx.initiator_account()), to: info.tx.recipient_account(), value: info.tx.execute.value, diff --git a/crates/core/src/node/in_memory.rs b/crates/core/src/node/in_memory.rs index 19e11a16..5d83c3f0 100644 --- a/crates/core/src/node/in_memory.rs +++ b/crates/core/src/node/in_memory.rs @@ -1655,6 +1655,7 @@ impl InMemoryNode { pub fn run_l2_tx( &self, l2_tx: L2Tx, + l2_tx_index: U64, block_ctx: &BlockContext, batch_env: &L1BatchEnv, vm: &mut Vm, @@ -1730,7 +1731,7 @@ impl InMemoryNode { block_number: Some(block_ctx.miniblock.into()), l1_batch_number: Some(U64::from(batch_env.number.0)), transaction_hash: Some(tx_hash), - transaction_index: Some(U64::zero()), + transaction_index: Some(l2_tx_index), log_index: Some(U256::from(log_idx)), transaction_log_index: Some(U256::from(log_idx)), log_type: None, @@ -1745,7 +1746,7 @@ impl InMemoryNode { } let tx_receipt = TransactionReceipt { transaction_hash: tx_hash, - transaction_index: U64::from(0), + transaction_index: l2_tx_index, block_hash: block_ctx.hash, block_number: block_ctx.miniblock.into(), l1_batch_tx_index: None, @@ -1811,6 +1812,7 @@ impl InMemoryNode { // Execute transactions and bootloader let mut executed_tx_hashes = Vec::with_capacity(tx_hashes.len()); + let mut tx_index = U64::from(0); for tx in txs { // Executing a next transaction means that a previous transaction was either rolled back (in which case its snapshot // was already removed), or that we build on top of it (in which case, it can be removed now). @@ -1818,11 +1820,12 @@ impl InMemoryNode { // Save pre-execution VM snapshot. vm.make_snapshot(); let hash = tx.hash(); - if let Err(e) = self.run_l2_tx(tx, &block_ctx, &batch_env, &mut vm) { + if let Err(e) = self.run_l2_tx(tx, tx_index, &block_ctx, &batch_env, &mut vm) { tracing::error!("Error while executing transaction: {e}"); vm.rollback_to_the_latest_snapshot(); } else { executed_tx_hashes.push(hash); + tx_index += U64::from(1); } } vm.execute(InspectExecutionMode::Bootloader); @@ -1839,7 +1842,7 @@ impl InMemoryNode { let mut transactions = Vec::new(); let mut tx_receipts = Vec::new(); let mut debug_calls = Vec::new(); - for tx_hash in &executed_tx_hashes { + for (index, tx_hash) in executed_tx_hashes.iter().enumerate() { let Some(tx_result) = inner.tx_results.get(tx_hash) else { // Skipping halted transaction continue; @@ -1850,7 +1853,7 @@ impl InMemoryNode { let mut transaction = zksync_types::api::Transaction::from(tx_result.info.tx.clone()); transaction.block_hash = Some(block_ctx.hash); transaction.block_number = Some(U64::from(block_ctx.miniblock)); - transaction.transaction_index = Some(Index::zero()); + transaction.transaction_index = Some(index.into()); transaction.l1_batch_number = Some(U64::from(batch_env.number.0)); transaction.l1_batch_tx_index = Some(Index::zero()); if transaction.transaction_type == Some(U64::zero()) @@ -2073,7 +2076,7 @@ mod tests { .unwrap(); let (block_ctx, batch_env, mut vm) = test_vm(&node, system_contracts.clone()); let err = node - .run_l2_tx(tx, &block_ctx, &batch_env, &mut vm) + .run_l2_tx(tx, U64::from(0), &block_ctx, &batch_env, &mut vm) .unwrap_err(); assert_eq!(err.to_string(), "exceeds block gas limit"); } @@ -2094,7 +2097,7 @@ mod tests { .unwrap(); let (block_ctx, batch_env, mut vm) = test_vm(&node, system_contracts.clone()); let err = node - .run_l2_tx(tx, &block_ctx, &batch_env, &mut vm) + .run_l2_tx(tx, U64::from(0), &block_ctx, &batch_env, &mut vm) .unwrap_err(); assert_eq!( @@ -2119,7 +2122,7 @@ mod tests { .unwrap(); let (block_ctx, batch_env, mut vm) = test_vm(&node, system_contracts.clone()); let err = node - .run_l2_tx(tx, &block_ctx, &batch_env, &mut vm) + .run_l2_tx(tx, U64::from(0), &block_ctx, &batch_env, &mut vm) .unwrap_err(); assert_eq!( diff --git a/e2e-tests-rust/tests/lib.rs b/e2e-tests-rust/tests/lib.rs index 42b5ccdc..2949c268 100644 --- a/e2e-tests-rust/tests/lib.rs +++ b/e2e-tests-rust/tests/lib.rs @@ -398,3 +398,19 @@ async fn cli_allow_origin() -> anyhow::Result<()> { Ok(()) } + +#[tokio::test] +async fn transactions_have_index() -> anyhow::Result<()> { + let provider = init_testing_provider(|node| node.no_mine()).await?; + let tx1 = provider.tx().with_rich_from(0).register().await?; + let tx2 = provider.tx().with_rich_from(1).register().await?; + + provider.anvil_mine(Some(U256::from(1)), None).await?; + + let receipt1 = tx1.wait_until_finalized().await?; + let receipt2 = tx2.wait_until_finalized().await?; + + assert_eq!(receipt1.transaction_index(), 0.into()); + assert_eq!(receipt2.transaction_index(), 1.into()); + Ok(()) +} From e279c2100c7ed96685380c05d795be62581fc1c8 Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Wed, 18 Dec 2024 00:14:11 +0200 Subject: [PATCH 2/3] fix: don't print console logs title when no console logs printed (#500) * fix: don't print console logs title when no console logs printed * fix: lint --- crates/core/src/console_log.rs | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/crates/core/src/console_log.rs b/crates/core/src/console_log.rs index d0aac4e5..89cb3807 100644 --- a/crates/core/src/console_log.rs +++ b/crates/core/src/console_log.rs @@ -36,25 +36,33 @@ impl Default for ConsoleLogHandler { impl ConsoleLogHandler { pub fn handle_calls_recursive(&self, calls: &Vec) { - tracing::info!(""); - tracing::info!("==== Console logs: "); - + let mut messages: Vec = vec![]; for call in calls { - self.handle_call_recursive(call); + self.handle_call_recursive(call, &mut messages); + } + + if !messages.is_empty() { + tracing::info!(""); + tracing::info!("==== Console logs: "); + } + for message in messages { + tracing::info!("{}", message.cyan()); } } - pub fn handle_call_recursive(&self, current_call: &Call) { - self.handle_call(current_call); + pub fn handle_call_recursive(&self, current_call: &Call, messages: &mut Vec) { + if let Some(message) = self.handle_call(current_call) { + messages.push(message); + }; for call in ¤t_call.calls { - self.handle_call_recursive(call); + self.handle_call_recursive(call, messages); } } - pub fn handle_call(&self, current_call: &Call) { + pub fn handle_call(&self, current_call: &Call) -> Option { if current_call.to != self.target_contract { - return; + return None; } if current_call.input.len() < 4 { - return; + return None; } let signature = ¤t_call.input[..4]; let message = @@ -67,7 +75,7 @@ impl ConsoleLogHandler { tokens.iter().map(|t| format!("{}", t)).join(" ") }) }); - tracing::info!("{}", message.cyan()); + Some(message) } } From 4f3526899378a453f1fae6701496f40b44db3ad7 Mon Sep 17 00:00:00 2001 From: Roman Petriv Date: Wed, 18 Dec 2024 00:27:06 +0200 Subject: [PATCH 3/3] feat: fees transactions order in mempool (#492) * feat: fees transactions order in mempool * fix: fix locking * fix: lint * fix: fix tests and refactoring * fix: make FIFO default tx order * chore: add tests * chore: rename transactions_order --- crates/cli/src/cli.rs | 9 +- crates/cli/src/main.rs | 2 +- crates/config/src/config.rs | 10 + crates/config/src/types/mod.rs | 2 + crates/config/src/types/transaction_order.rs | 57 +++++ crates/core/src/node/in_memory.rs | 17 +- crates/core/src/node/in_memory_ext.rs | 6 +- crates/core/src/node/pool.rs | 208 +++++++++++++++---- crates/core/src/node/sealer.rs | 19 +- e2e-tests-rust/src/provider/testing.rs | 6 + e2e-tests-rust/tests/lib.rs | 37 ++++ 11 files changed, 307 insertions(+), 66 deletions(-) create mode 100644 crates/config/src/types/transaction_order.rs diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index b0466a45..9be830ca 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -7,7 +7,7 @@ use anvil_zksync_config::types::{ AccountGenerator, CacheConfig, CacheType, Genesis, LogLevel, ShowCalls, ShowGasDetails, ShowStorageLogs, ShowVMDetails, SystemContractsOptions, }; -use anvil_zksync_config::TestNodeConfig; +use anvil_zksync_config::{types::TransactionOrder, TestNodeConfig}; use clap::{arg, command, Parser, Subcommand}; use rand::{rngs::StdRng, SeedableRng}; use std::env; @@ -251,6 +251,10 @@ pub struct Cli { /// Disable CORS. #[arg(long, default_missing_value = "true", num_args(0..=1), conflicts_with = "allow_origin", help_heading = "Server options")] pub no_cors: Option, + + /// Transaction ordering in the mempool. + #[arg(long, default_value = "fifo")] + pub order: TransactionOrder, } #[derive(Debug, Subcommand, Clone)] @@ -391,7 +395,8 @@ impl Cli { .with_block_time(self.block_time) .with_no_mining(self.no_mining) .with_allow_origin(self.allow_origin) - .with_no_cors(self.no_cors); + .with_no_cors(self.no_cors) + .with_transaction_order(self.order); if self.emulate_evm && self.dev_system_contracts != Some(SystemContractsOptions::Local) { return Err(eyre::eyre!( diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 17938cee..452c4356 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -276,7 +276,7 @@ async fn main() -> anyhow::Result<()> { let time = TimestampManager::default(); let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), config.transaction_order); let sealing_mode = if config.no_mining { BlockSealerMode::noop() } else if let Some(block_time) = config.block_time { diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index e0291d43..fd399239 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -115,6 +115,8 @@ pub struct TestNodeConfig { pub allow_origin: String, /// Disable CORS if true pub no_cors: bool, + /// How transactions are sorted in the mempool + pub transaction_order: TransactionOrder, } impl Default for TestNodeConfig { @@ -175,6 +177,7 @@ impl Default for TestNodeConfig { no_mining: false, max_transactions: 1000, + transaction_order: TransactionOrder::Fifo, // Server configuration allow_origin: "*".to_string(), @@ -878,6 +881,13 @@ impl TestNodeConfig { self } + // Set transactions order in the mempool + #[must_use] + pub fn with_transaction_order(mut self, transaction_order: TransactionOrder) -> Self { + self.transaction_order = transaction_order; + self + } + // Set allow_origin CORS header #[must_use] pub fn with_allow_origin(mut self, allow_origin: String) -> Self { diff --git a/crates/config/src/types/mod.rs b/crates/config/src/types/mod.rs index 2c216f4d..671cf270 100644 --- a/crates/config/src/types/mod.rs +++ b/crates/config/src/types/mod.rs @@ -3,6 +3,7 @@ mod cache; mod genesis; mod log; mod show_details; +mod transaction_order; pub use account_generator::AccountGenerator; pub use cache::{CacheConfig, CacheType}; @@ -11,6 +12,7 @@ pub use genesis::Genesis; pub use log::LogLevel; use serde::Deserialize; pub use show_details::{ShowCalls, ShowGasDetails, ShowStorageLogs, ShowVMDetails}; +pub use transaction_order::{TransactionOrder, TransactionPriority}; #[derive(Deserialize, Default, Debug, Copy, Clone, PartialEq, ValueEnum)] pub enum SystemContractsOptions { diff --git a/crates/config/src/types/transaction_order.rs b/crates/config/src/types/transaction_order.rs new file mode 100644 index 00000000..1ab7c05a --- /dev/null +++ b/crates/config/src/types/transaction_order.rs @@ -0,0 +1,57 @@ +use std::fmt; +use std::str::FromStr; +use zksync_types::{l2::L2Tx, U256}; + +/// Metric value for the priority of a transaction. +/// +/// The `TransactionPriority` determines the ordering of two transactions. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct TransactionPriority(pub U256); + +/// Modes that determine the transaction ordering of the mempool +/// +/// This type controls the transaction order via the priority metric of a transaction +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TransactionOrder { + /// Keep the pool transactions sorted in the order they arrive. + /// + /// This will essentially assign every transaction the exact priority so the order is + /// determined by their internal submission number + Fifo, + /// This means that it prioritizes transactions based on the fees paid to the miner. + #[default] + Fees, +} + +impl TransactionOrder { + /// Returns the priority of the transactions + pub fn priority(&self, tx: &L2Tx) -> TransactionPriority { + match self { + Self::Fifo => TransactionPriority::default(), + Self::Fees => TransactionPriority(tx.common_data.fee.max_fee_per_gas), + } + } +} + +impl FromStr for TransactionOrder { + type Err = String; + + fn from_str(s: &str) -> Result { + let s = s.to_lowercase(); + let order = match s.as_str() { + "fees" => Self::Fees, + "fifo" => Self::Fifo, + _ => return Err(format!("Unknown TransactionOrder: `{s}`")), + }; + Ok(order) + } +} + +impl fmt::Display for TransactionOrder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + TransactionOrder::Fifo => f.write_str("fifo"), + TransactionOrder::Fees => f.write_str("fees"), + } + } +} diff --git a/crates/core/src/node/in_memory.rs b/crates/core/src/node/in_memory.rs index 5d83c3f0..aee9f8f3 100644 --- a/crates/core/src/node/in_memory.rs +++ b/crates/core/src/node/in_memory.rs @@ -25,7 +25,7 @@ use anvil_zksync_config::constants::{ }; use anvil_zksync_config::types::{ CacheConfig, Genesis, ShowCalls, ShowGasDetails, ShowStorageLogs, ShowVMDetails, - SystemContractsOptions, + SystemContractsOptions, TransactionOrder, }; use anvil_zksync_config::TestNodeConfig; use colored::Colorize; @@ -1108,7 +1108,7 @@ fn contract_address_from_tx_result(execution_result: &VmExecutionResultAndLogs) impl Default for InMemoryNode { fn default() -> Self { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); let tx_listener = pool.add_tx_listener(); InMemoryNode::new( None, @@ -1161,7 +1161,7 @@ impl InMemoryNode { // TODO: Refactor InMemoryNode with a builder pattern pub fn default_fork(fork: Option) -> Self { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); let tx_listener = pool.add_tx_listener(); Self::new( fork, @@ -1263,7 +1263,10 @@ impl InMemoryNode { tracing::debug!(count = txs.len(), "applying transactions"); // Create a temporary tx pool (i.e. state is not shared with the node mempool). - let pool = TxPool::new(self.impersonation.clone()); + let pool = TxPool::new( + self.impersonation.clone(), + self.read_inner()?.config.transaction_order, + ); pool.add_txs(txs); // Lock time so that the produced blocks are guaranteed to be sequential in time. @@ -2035,7 +2038,7 @@ mod tests { DEFAULT_ESTIMATE_GAS_SCALE_FACTOR, DEFAULT_FAIR_PUBDATA_PRICE, DEFAULT_L2_GAS_PRICE, TEST_NODE_NETWORK_ID, }; - use anvil_zksync_config::types::SystemContractsOptions; + use anvil_zksync_config::types::{SystemContractsOptions, TransactionOrder}; use anvil_zksync_config::TestNodeConfig; use ethabi::{Token, Uint}; use zksync_types::{utils::deployed_address_create, K256PrivateKey, Nonce}; @@ -2160,7 +2163,7 @@ mod tests { raw_storage: external_storage.inner.read().unwrap().raw_storage.clone(), }; let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let node = InMemoryNode::new( Some(ForkDetails { @@ -2200,7 +2203,7 @@ mod tests { #[tokio::test] async fn test_transact_returns_data_in_built_in_without_security_mode() { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let node = InMemoryNode::new( None, diff --git a/crates/core/src/node/in_memory_ext.rs b/crates/core/src/node/in_memory_ext.rs index a80984e3..0503ffc1 100644 --- a/crates/core/src/node/in_memory_ext.rs +++ b/crates/core/src/node/in_memory_ext.rs @@ -441,7 +441,8 @@ impl InMemoryNode { } pub fn remove_pool_transactions(&self, address: Address) -> Result<()> { - self.pool.drop_transactions_by_sender(address); + self.pool + .drop_transactions(|tx| tx.transaction.common_data.initiator_address == address); Ok(()) } @@ -499,6 +500,7 @@ mod tests { use crate::node::time::{ReadTime, TimestampManager}; use crate::node::InMemoryNode; use crate::node::{BlockSealer, ImpersonationManager, InMemoryNodeInner, Snapshot, TxPool}; + use anvil_zksync_config::types::TransactionOrder; use std::str::FromStr; use std::sync::{Arc, RwLock}; use zksync_multivm::interface::storage::ReadStorage; @@ -626,7 +628,7 @@ mod tests { rich_accounts: Default::default(), previous_states: Default::default(), }; - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); let sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let node = InMemoryNode { diff --git a/crates/core/src/node/pool.rs b/crates/core/src/node/pool.rs index 074adef1..51231be3 100644 --- a/crates/core/src/node/pool.rs +++ b/crates/core/src/node/pool.rs @@ -1,59 +1,96 @@ use crate::node::impersonate::ImpersonationManager; +use anvil_zksync_config::types::{TransactionOrder, TransactionPriority}; use futures::channel::mpsc::{channel, Receiver, Sender}; -use itertools::Itertools; -use std::sync::{Arc, Mutex, RwLock}; +use std::cmp::Ordering; +use std::collections::BTreeSet; +use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; use zksync_types::l2::L2Tx; -use zksync_types::{Address, H256}; +use zksync_types::H256; #[derive(Clone)] pub struct TxPool { - inner: Arc>>, + inner: Arc>>, + /// Transaction ordering in the mempool. + transaction_order: Arc>, + /// Used to preserve transactions submission order in the pool + submission_number: Arc>, /// Listeners for new transactions' hashes tx_listeners: Arc>>>, pub(crate) impersonation: ImpersonationManager, } impl TxPool { - pub fn new(impersonation: ImpersonationManager) -> Self { + pub fn new(impersonation: ImpersonationManager, transaction_order: TransactionOrder) -> Self { Self { - inner: Arc::new(RwLock::new(Vec::new())), + inner: Arc::new(RwLock::new(BTreeSet::new())), + submission_number: Arc::new(Mutex::new(0)), tx_listeners: Arc::new(Mutex::new(Vec::new())), impersonation, + transaction_order: Arc::new(RwLock::new(transaction_order)), } } + fn lock_submission_number(&self) -> MutexGuard<'_, u64> { + self.submission_number + .lock() + .expect("submission_number lock is poisoned") + } + + fn read_transaction_order(&self) -> RwLockReadGuard<'_, TransactionOrder> { + self.transaction_order + .read() + .expect("transaction_order lock is poisoned") + } + pub fn add_tx(&self, tx: L2Tx) { - let mut guard = self.inner.write().expect("TxPool lock is poisoned"); let hash = tx.hash(); - guard.push(tx); + let priority = self.read_transaction_order().priority(&tx); + let mut submission_number = self.lock_submission_number(); + *submission_number = submission_number.wrapping_add(1); + + let mut guard = self.inner.write().expect("TxPool lock is poisoned"); + guard.insert(PoolTransaction { + transaction: tx, + submission_number: *submission_number, + priority, + }); self.notify_listeners(hash); } - pub fn add_txs(&self, txs: impl IntoIterator) { + pub fn add_txs(&self, txs: Vec) { + let transaction_order = self.read_transaction_order(); + let mut submission_number = self.lock_submission_number(); + let mut guard = self.inner.write().expect("TxPool lock is poisoned"); for tx in txs { let hash = tx.hash(); - guard.push(tx); + let priority = transaction_order.priority(&tx); + *submission_number = submission_number.wrapping_add(1); + guard.insert(PoolTransaction { + transaction: tx, + submission_number: *submission_number, + priority, + }); self.notify_listeners(hash); } } /// Removes a single transaction from the pool pub fn drop_transaction(&self, hash: H256) -> Option { - let mut guard = self.inner.write().expect("TxPool lock is poisoned"); - let (position, _) = guard.iter_mut().find_position(|tx| tx.hash() == hash)?; - Some(guard.remove(position)) + let dropped = self.drop_transactions(|tx| tx.transaction.hash() == hash); + dropped.first().cloned() } - /// Remove transactions by sender - pub fn drop_transactions_by_sender(&self, sender: Address) -> Vec { + /// Remove transactions matching the specified condition + pub fn drop_transactions(&self, f: F) -> Vec + where + F: Fn(&PoolTransaction) -> bool, + { let mut guard = self.inner.write().expect("TxPool lock is poisoned"); let txs = std::mem::take(&mut *guard); - let (sender_txs, other_txs) = txs - .into_iter() - .partition(|tx| tx.common_data.initiator_address == sender); + let (matching_txs, other_txs) = txs.into_iter().partition(f); *guard = other_txs; - sender_txs + matching_txs.into_iter().map(|tx| tx.transaction).collect() } /// Removes all transactions from the pool @@ -70,27 +107,38 @@ impl TxPool { return None; } let mut guard = self.inner.write().expect("TxPool lock is poisoned"); - let mut iter = guard.iter(); - let Some(head_tx) = iter.next() else { + let Some(head_tx) = guard.pop_last() else { // Pool is empty return None; }; - let (impersonating, tx_count) = self.impersonation.inspect(|state| { + let mut taken_txs = vec![]; + let impersonating = self.impersonation.inspect(|state| { // First tx's impersonation status decides what all other txs' impersonation status is // expected to be. - let impersonating = state.is_impersonating(&head_tx.common_data.initiator_address); - let tail_txs = iter - // Guaranteed to be non-zero - .take(n - 1) - .take_while(|tx| { - impersonating == state.is_impersonating(&tx.common_data.initiator_address) - }); - // The amount of transactions that can be taken from the pool; `+1` accounts for `head_tx`. - (impersonating, tail_txs.count() + 1) + let impersonating = + state.is_impersonating(&head_tx.transaction.common_data.initiator_address); + taken_txs.insert(0, head_tx.transaction); + let mut taken_txs_number = 1; + + while taken_txs_number < n { + let Some(next_tx) = guard.last() else { + break; + }; + if impersonating + != state.is_impersonating(&next_tx.transaction.common_data.initiator_address) + { + break; + } + taken_txs.insert(taken_txs_number, guard.pop_last().unwrap().transaction); + taken_txs_number += 1; + } + impersonating }); - let txs = guard.drain(0..tx_count).collect(); - Some(TxBatch { impersonating, txs }) + Some(TxBatch { + impersonating, + txs: taken_txs, + }) } /// Adds a new transaction listener to the pool that gets notified about every new transaction. @@ -158,18 +206,53 @@ pub struct TxBatch { pub txs: Vec, } +/// A reference to a transaction in the pool +#[derive(Clone, Debug)] +pub struct PoolTransaction { + /// actual transaction + pub transaction: L2Tx, + /// Used to internally compare the transaction in the pool + pub submission_number: u64, + /// priority of the transaction + pub priority: TransactionPriority, +} + +impl Eq for PoolTransaction {} + +impl PartialEq for PoolTransaction { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for PoolTransaction { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PoolTransaction { + fn cmp(&self, other: &Self) -> Ordering { + self.priority + .cmp(&other.priority) + .then_with(|| other.submission_number.cmp(&self.submission_number)) + } +} + #[cfg(test)] mod tests { use crate::node::impersonate::ImpersonationState; use crate::node::pool::TxBatch; use crate::node::{ImpersonationManager, TxPool}; use crate::testing; + use anvil_zksync_config::types::TransactionOrder; use test_case::test_case; + use zksync_types::{l2::L2Tx, U256}; #[test] fn take_from_empty() { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); assert_eq!(pool.take_uniform(1), None); } @@ -177,7 +260,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_zero(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); pool.populate_impersonate([imp]); assert_eq!(pool.take_uniform(0), None); @@ -187,7 +270,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_exactly_one(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, ..] = pool.populate_impersonate([imp, false]); assert_eq!( @@ -203,7 +286,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_exactly_two(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, false]); assert_eq!( @@ -219,7 +302,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_one_eligible(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, ..] = pool.populate_impersonate([imp, !imp, !imp, !imp]); assert_eq!( @@ -237,7 +320,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_two_when_third_is_not_uniform(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp]); assert_eq!( @@ -255,7 +338,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_interrupted_by_non_uniformness(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, ..] = pool.populate_impersonate([imp, imp, !imp, imp]); assert_eq!( @@ -271,7 +354,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_multiple(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, !imp, !imp, imp]); assert_eq!( @@ -301,7 +384,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn pool_clones_share_state(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let txs = { let pool_clone = pool.clone(); @@ -320,7 +403,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_multiple_from_clones(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, tx2, tx3] = { let pool_clone = pool.clone(); @@ -356,7 +439,7 @@ mod tests { #[test_case(true ; "is impersonated")] fn take_respects_impersonation_change(imp: bool) { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation); + let pool = TxPool::new(impersonation, TransactionOrder::Fifo); let [tx0, tx1, tx2, tx3] = pool.populate_impersonate([imp, imp, !imp, imp]); assert_eq!( @@ -388,7 +471,7 @@ mod tests { #[tokio::test] async fn take_uses_consistent_impersonation() { let impersonation = ImpersonationManager::default(); - let pool = TxPool::new(impersonation.clone()); + let pool = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); for _ in 0..4096 { let tx = testing::TransactionBuilder::new().build(); @@ -415,4 +498,39 @@ mod tests { // a change in impersonation state partway through iterating the transactions. assert_eq!(tx_batch.txs.len(), 4096); } + + #[tokio::test] + async fn take_uses_transaction_order() { + let impersonation = ImpersonationManager::default(); + let pool_fifo = TxPool::new(impersonation.clone(), TransactionOrder::Fifo); + let pool_fees = TxPool::new(impersonation.clone(), TransactionOrder::Fees); + + let txs: Vec = [1, 2, 3] + .iter() + .map(|index| { + let tx = testing::TransactionBuilder::new() + .set_max_fee_per_gas(U256::from(50_000_000 + index)) + .build(); + pool_fifo.add_tx(tx.clone()); + pool_fees.add_tx(tx.clone()); + tx + }) + .collect(); + + assert_eq!( + pool_fifo.take_uniform(3), + Some(TxBatch { + impersonating: false, + txs: vec![txs[0].clone(), txs[1].clone(), txs[2].clone()] + }) + ); + + assert_eq!( + pool_fees.take_uniform(3), + Some(TxBatch { + impersonating: false, + txs: vec![txs[2].clone(), txs[1].clone(), txs[0].clone()] + }) + ); + } } diff --git a/crates/core/src/node/sealer.rs b/crates/core/src/node/sealer.rs index 9d3e379f..8e9b4c92 100644 --- a/crates/core/src/node/sealer.rs +++ b/crates/core/src/node/sealer.rs @@ -154,6 +154,7 @@ mod tests { use crate::node::pool::TxBatch; use crate::node::sealer::BlockSealerMode; use crate::node::{BlockSealer, ImpersonationManager, TxPool}; + use anvil_zksync_config::types::TransactionOrder; use std::ptr; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::time::Duration; @@ -175,7 +176,7 @@ mod tests { #[test] fn immediate_empty() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let waker = &WAKER_NOOP; @@ -186,7 +187,7 @@ mod tests { #[test] fn immediate_one_tx() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let waker = &WAKER_NOOP; @@ -206,7 +207,7 @@ mod tests { #[test] fn immediate_several_txs() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let waker = &WAKER_NOOP; @@ -226,7 +227,7 @@ mod tests { #[test] fn immediate_respect_max_txs() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(3, pool.add_tx_listener())); let waker = &WAKER_NOOP; @@ -247,7 +248,7 @@ mod tests { #[test] fn immediate_gradual_txs() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::immediate(1000, pool.add_tx_listener())); let waker = &WAKER_NOOP; @@ -285,7 +286,7 @@ mod tests { #[tokio::test] async fn fixed_time_very_long() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::fixed_time( 1000, Duration::from_secs(10000), @@ -298,7 +299,7 @@ mod tests { #[tokio::test] async fn fixed_time_seal_empty() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::fixed_time( 1000, Duration::from_millis(100), @@ -335,7 +336,7 @@ mod tests { #[tokio::test] async fn fixed_time_seal_with_txs() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::fixed_time( 1000, Duration::from_millis(100), @@ -359,7 +360,7 @@ mod tests { #[tokio::test] async fn fixed_time_respect_max_txs() { - let pool = TxPool::new(ImpersonationManager::default()); + let pool = TxPool::new(ImpersonationManager::default(), TransactionOrder::Fifo); let mut block_sealer = BlockSealer::new(BlockSealerMode::fixed_time(3, Duration::from_millis(100))); let waker = &WAKER_NOOP; diff --git a/e2e-tests-rust/src/provider/testing.rs b/e2e-tests-rust/src/provider/testing.rs index 897d60e4..ae1ad52a 100644 --- a/e2e-tests-rust/src/provider/testing.rs +++ b/e2e-tests-rust/src/provider/testing.rs @@ -469,6 +469,12 @@ where self } + /// Builder-pattern method for setting max fee per gas. + pub fn with_max_fee_per_gas(mut self, max_fee_per_gas: u128) -> Self { + self.inner = self.inner.with_max_fee_per_gas(max_fee_per_gas); + self + } + /// Submits transaction to the node. /// /// This does not wait for the transaction to be confirmed, but returns a [`PendingTransactionFinalizable`] diff --git a/e2e-tests-rust/tests/lib.rs b/e2e-tests-rust/tests/lib.rs index 2949c268..2dafad94 100644 --- a/e2e-tests-rust/tests/lib.rs +++ b/e2e-tests-rust/tests/lib.rs @@ -5,6 +5,7 @@ use anvil_zksync_e2e_tests::{ init_testing_provider, init_testing_provider_with_http_headers, AnvilZKsyncApi, ReceiptExt, ZksyncWalletProviderExt, DEFAULT_TX_VALUE, }; use alloy::{ + network::primitives::BlockTransactionsKind, primitives::U256, signers::local::PrivateKeySigner, }; @@ -399,6 +400,42 @@ async fn cli_allow_origin() -> anyhow::Result<()> { Ok(()) } +#[tokio::test] +async fn pool_txs_order_fifo() -> anyhow::Result<()> { + let provider_fifo = init_testing_provider(|node| node.no_mine()).await?; + + let pending_tx0 = provider_fifo.tx().with_rich_from(0).with_max_fee_per_gas(50_000_000).register().await?; + let pending_tx1 = provider_fifo.tx().with_rich_from(1).with_max_fee_per_gas(100_000_000).register().await?; + let pending_tx2 = provider_fifo.tx().with_rich_from(2).with_max_fee_per_gas(150_000_000).register().await?; + + provider_fifo.anvil_mine(Some(U256::from(1)), None).await?; + + let block = provider_fifo.get_block(1.into(), BlockTransactionsKind::Hashes).await?.unwrap(); + let tx_hashes = block.transactions.as_hashes().unwrap(); + assert_eq!(&tx_hashes[0], pending_tx0.tx_hash()); + assert_eq!(&tx_hashes[1], pending_tx1.tx_hash()); + assert_eq!(&tx_hashes[2], pending_tx2.tx_hash()); + Ok(()) +} + +#[tokio::test] +async fn pool_txs_order_fees() -> anyhow::Result<()> { + let provider_fees = init_testing_provider(|node| node.no_mine().arg("--order=fees")).await?; + + let pending_tx0 = provider_fees.tx().with_rich_from(0).with_max_fee_per_gas(50_000_000).register().await?; + let pending_tx1 = provider_fees.tx().with_rich_from(1).with_max_fee_per_gas(100_000_000).register().await?; + let pending_tx2 = provider_fees.tx().with_rich_from(2).with_max_fee_per_gas(150_000_000).register().await?; + + provider_fees.anvil_mine(Some(U256::from(1)), None).await?; + + let block = provider_fees.get_block(1.into(), BlockTransactionsKind::Hashes).await?.unwrap(); + let tx_hashes = block.transactions.as_hashes().unwrap(); + assert_eq!(&tx_hashes[0], pending_tx2.tx_hash()); + assert_eq!(&tx_hashes[1], pending_tx1.tx_hash()); + assert_eq!(&tx_hashes[2], pending_tx0.tx_hash()); + Ok(()) +} + #[tokio::test] async fn transactions_have_index() -> anyhow::Result<()> { let provider = init_testing_provider(|node| node.no_mine()).await?;