diff --git a/ethers-middleware/src/gas_escalator/mod.rs b/ethers-middleware/src/gas_escalator/mod.rs index 71e089790..ce5ce6d81 100644 --- a/ethers-middleware/src/gas_escalator/mod.rs +++ b/ethers-middleware/src/gas_escalator/mod.rs @@ -13,7 +13,7 @@ use thiserror::Error; use tracing::{self, instrument}; use tracing_futures::Instrument; -use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, U256}; +use ethers_core::types::{transaction::eip2718::TypedTransaction, BlockId, TxHash, H256, U256}; use ethers_providers::{interval, FromErr, Middleware, PendingTransaction, StreamExt}; #[cfg(not(target_arch = "wasm32"))] @@ -246,7 +246,7 @@ where let esc = EscalationTask { inner, escalator, frequency, txs }; - spawn(esc.escalate().instrument(tracing::debug_span!("gas_escalation"))); + spawn(esc.monitor().instrument(tracing::debug_span!("gas_escalation"))); Self { inner: this } } @@ -267,13 +267,111 @@ impl EscalationTask { Self { inner, escalator, frequency, txs } } - async fn escalate(mut self) -> Result<(), GasEscalatorError> + async fn broadcast_tx_if_escalated( + &self, + old_monitored_tx: MonitoredTransaction, + new_tx: TypedTransaction, + ) -> Result, GasEscalatorError> + where + M: Middleware, + E: GasEscalator, + { + // gas price wasn't escalated + // keep monitoring the old tx + if old_monitored_tx.inner.eq(&new_tx) { + return Ok(Some((old_monitored_tx.hash, old_monitored_tx.creation_time))); + } + + // send a replacement tx with the escalated gas price + match self.inner.send_transaction(new_tx.clone(), old_monitored_tx.block).await { + Ok(new_tx_hash) => { + let new_tx_hash = *new_tx_hash; + tracing::debug!( + old_tx = ?old_monitored_tx, + new_tx = ?new_tx, + "escalated gas price" + ); + // Return the new tx hash to monitor and the time it was created. + // The latter is used to know when to escalate the gas price again + Ok(Some((new_tx_hash, Instant::now()))) + } + Err(err) => { + if err.to_string().contains("nonce too low") { + // may happen if we try to broadcast a new, gas-escalated tx when the original tx + // already landed onchain, meaning we no longer need to escalate it + tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been included onchain. Dropping it from escalator"); + Ok(None) + } else { + tracing::error!( + err = %err, + "Unexpected error. Killing escalator backend." + ); + Err(GasEscalatorError::MiddlewareError(err)) + } + } + } + } + + async fn escalate_stuck_txs(&self) -> Result<(), GasEscalatorError> + where + M: Middleware, + E: GasEscalator, + { + // We take monitored txs out of the mutex, and add them back if they weren't included yet + let monitored_txs: Vec<_> = { + let mut txs = self.txs.lock().await; + std::mem::take(&mut (*txs)) + // Lock scope ends + }; + + if !monitored_txs.is_empty() { + tracing::trace!(?monitored_txs, "In the escalator watcher loop. Monitoring txs"); + } + let mut new_txs_to_monitor = vec![]; + for monitored_tx in monitored_txs { + let receipt = self + .inner + .get_transaction_receipt(monitored_tx.hash) + .await + .map_err(GasEscalatorError::MiddlewareError)?; + + tracing::trace!(tx_hash = ?monitored_tx.hash, "checking if exists"); + + if receipt.is_some() { + // tx was already included, can drop from escalator + continue; + } + let Some(new_tx) = monitored_tx.escalate_gas_price(self.escalator.clone()) else { + tracing::error!(tx=?monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); + continue; + }; + + let maybe_tx_to_monitor = + self.broadcast_tx_if_escalated(monitored_tx.clone(), new_tx.clone()).await?; + + if let Some((new_txhash, new_creation_time)) = maybe_tx_to_monitor { + new_txs_to_monitor.push(MonitoredTransaction { + hash: new_txhash, + inner: new_tx, + creation_time: new_creation_time, + block: monitored_tx.block, + }); + } + } + // we add the new txs to monitor back to the list + // we don't replace here, as the vec in the mutex may contain + // items! + self.txs.lock().await.extend(new_txs_to_monitor); + Ok(()) + } + + async fn monitor(self) -> Result<(), GasEscalatorError> where M: Middleware, E: GasEscalator, { // the escalation frequency is either on a per-block basis, or on a duration basis - let watcher: WatcherFuture = match self.frequency { + let escalation_frequency_watcher: WatcherFuture = match self.frequency { Frequency::PerBlock => Box::pin( self.inner .watch_blocks() @@ -284,90 +382,9 @@ impl EscalationTask { Frequency::Duration(ms) => Box::pin(interval(std::time::Duration::from_millis(ms))), }; - let mut watcher = watcher.fuse(); - - while watcher.next().await.is_some() { - // We take the contents of the mutex, and then add them back in - // later. - let mut txs: Vec<_> = { - let mut txs = self.txs.lock().await; - std::mem::take(&mut (*txs)) - // Lock scope ends - }; - - let len = txs.len(); - if len > 0 { - tracing::debug!(?txs, "In the escalator watcher loop. Monitoring txs"); - } - // Pop all transactions and re-insert those that have not been included yet - for _ in 0..len { - // this must never panic as we're explicitly within bounds - let old_monitored_tx = txs.pop().expect("should have element in vector"); - - let receipt = self - .inner - .get_transaction_receipt(old_monitored_tx.hash) - .await - .map_err(GasEscalatorError::MiddlewareError)?; - - tracing::trace!(tx_hash = ?old_monitored_tx.hash, "checking if exists"); - - if receipt.is_none() { - let Some(new_tx) = old_monitored_tx.escalate_gas_price(self.escalator.clone()) - else { - tracing::error!(tx=?old_monitored_tx.hash, "gas price is not set for transaction, dropping from escalator"); - continue; - }; - - // gas price wasn't escalated - let (new_txhash, new_creation_time) = if old_monitored_tx.inner.eq(&new_tx) { - (old_monitored_tx.hash, old_monitored_tx.creation_time) - } else { - // the tx hash will be different so we need to update it - match self - .inner - .send_transaction(new_tx.clone(), old_monitored_tx.block) - .await - { - Ok(new_tx_hash) => { - let new_tx_hash = *new_tx_hash; - tracing::debug!( - old_tx = ?old_monitored_tx, - new_tx = ?new_tx, - "escalated gas price" - ); - (new_tx_hash, Instant::now()) - } - Err(err) => { - if err.to_string().contains("nonce too low") { - // may happen if we try to broadcast a higher - // gas price tx when one of the previous ones - // was already mined (meaning we also do not - // push it back to the pending txs vector) - tracing::warn!(err = %err, ?old_monitored_tx.hash, ?new_tx, "Nonce error when escalating gas price. Tx may have already been mined."); - continue; - } else { - tracing::error!( - err = %err, - "Killing escalator backend" - ); - return Err(GasEscalatorError::MiddlewareError(err)); - } - } - } - }; - txs.push(MonitoredTransaction { - hash: new_txhash, - inner: new_tx, - creation_time: new_creation_time, - block: old_monitored_tx.block, - }); - } - } - // after this big ugly loop, we dump everything back in - // we don't replace here, as the vec in the mutex may contain - // items! - self.txs.lock().await.extend(txs); + let mut escalation_frequency_watcher = escalation_frequency_watcher.fuse(); + while escalation_frequency_watcher.next().await.is_some() { + self.escalate_stuck_txs().await?; } tracing::error!("timing future has gone away"); Ok(()) diff --git a/ethers-middleware/tests/gas_escalator.rs b/ethers-middleware/tests/gas_escalator.rs index 3431bf85a..3cbea38ee 100644 --- a/ethers-middleware/tests/gas_escalator.rs +++ b/ethers-middleware/tests/gas_escalator.rs @@ -33,6 +33,7 @@ async fn gas_escalator_legacy_works() { let escalator = GeometricGasPrice::new(1.1, 2u64, Some(2_000_000_000_000u64)); let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::Duration(300)); + // TODO: get this to work // set the gas price to 10 gwei, so we need to escalate twice // this works but the tx still goes through regardless of its gas price for some reason // reqwest::Client::new() @@ -80,6 +81,7 @@ async fn gas_escalator_1559_works() { let escalator = GeometricGasPrice::new(1.1, 2u64, Some(2_000_000_000_000u64)); let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::Duration(300)); + // TODO: get this to work // set the gas price to 10 gwei, so we need to escalate twice // this works but the tx still goes through regardless of its gas price for some reason // reqwest::Client::new()