From bb8b0fbeccf4f88c560fe85658e823e830a85c63 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Fri, 20 Oct 2023 14:50:28 +0200 Subject: [PATCH] Refactor onchain settlement updater to use block stream (#1972) # Description The added e2e test adds ~10s in unnecessary total time to the test suite, because the onchain settlement updater runs on its own schedule (which is hardcoded at 10s). My first thought was to make this schedule configurable (and set it to 1s in the test), but it seems more correct for this to also be driven by new blocks incoming. This PR refactors the component to use `CurrentBlockStream` instead of its own polling. # Changes - Use CurrentBlockStream instead of hardcoded polling loop One subtlety is that the SettlementUpdater is not reading the tx_hash from the onchain settlement event, but from the DB instead. This means that the Settlement has to be processed first (and then the competition data may only be updated in the next run since it doesn't yet see the settlement when it runs). This is not an issue in practice (submission data will be written to the DB once the next block arrives). For the For the e2e test, we now add extra `evm_mine` in the `wait_for_condition_block` to simulate the chain advancing. I was thinking to either add a concept of dependencies into the different jobs that run when a new block arrive, or put the settlement updating code on the same codepath that processes the original settlement events, but both seem to be involved refactorings for fairly little benefit imo. ## How to test e2e tests now run faster ## Related Issues Improves test from #1959 --- .../src/on_settlement_event_updater.rs | 49 +++++++++++-------- crates/autopilot/src/run.rs | 3 +- crates/e2e/src/setup/onchain_components.rs | 9 ++++ .../e2e/tests/e2e/colocation_partial_fill.rs | 1 + .../e2e/partially_fillable_observed_score.rs | 1 + 5 files changed, 40 insertions(+), 23 deletions(-) diff --git a/crates/autopilot/src/on_settlement_event_updater.rs b/crates/autopilot/src/on_settlement_event_updater.rs index 6a96c8eb15..cc3c1a306e 100644 --- a/crates/autopilot/src/on_settlement_event_updater.rs +++ b/crates/autopilot/src/on_settlement_event_updater.rs @@ -41,11 +41,14 @@ use { anyhow::{anyhow, Context, Result}, contracts::GPv2Settlement, database::byte_array::ByteArray, - ethrpc::{current_block::CurrentBlockStream, Web3}, + ethrpc::{ + current_block::{into_stream, CurrentBlockStream}, + Web3, + }, + futures::StreamExt, primitive_types::{H160, H256}, shared::{event_handling::MAX_REORG_BLOCK_COUNT, external_prices::ExternalPrices}, sqlx::PgConnection, - std::time::Duration, web3::types::{Transaction, TransactionId}, }; @@ -54,28 +57,40 @@ pub struct OnSettlementEventUpdater { pub contract: GPv2Settlement, pub native_token: H160, pub db: Postgres, - pub current_block: CurrentBlockStream, } impl OnSettlementEventUpdater { - pub async fn run_forever(self) -> ! { + pub async fn run_forever(self, block_stream: CurrentBlockStream) -> ! { + let mut current_block = *block_stream.borrow(); + let mut block_stream = into_stream(block_stream); loop { - match self.update().await { - Ok(true) => (), - Ok(false) => tokio::time::sleep(Duration::from_secs(10)).await, + match self.update(current_block.number).await { + Ok(true) => { + tracing::debug!( + block = current_block.number, + "on settlement event updater ran and processed event" + ); + // Don't wait until next block in case there are more pending events to process. + continue; + } + Ok(false) => { + tracing::debug!( + block = current_block.number, + "on settlement event updater ran without update" + ); + } Err(err) => { tracing::error!(?err, "on settlement event update task failed"); - tokio::time::sleep(Duration::from_secs(10)).await; } } + current_block = block_stream.next().await.expect("blockchains never end"); } } /// Update database for settlement events that have not been processed yet. /// /// Returns whether an update was performed. - async fn update(&self) -> Result { - let current_block = self.current_block.borrow().number; + async fn update(&self, current_block: u64) -> Result { let reorg_safe_block: i64 = current_block .checked_sub(MAX_REORG_BLOCK_COUNT) .context("no reorg safe block")? @@ -282,7 +297,6 @@ mod tests { super::*, database::{auction_prices::AuctionPrice, settlement_observations::Observation}, sqlx::Executor, - std::sync::Arc, }; #[tokio::test] @@ -294,12 +308,6 @@ mod tests { database::clear_DANGER(&db.0).await.unwrap(); let transport = shared::ethrpc::create_env_test_transport(); let web3 = Web3::new(transport); - let current_block = ethrpc::current_block::current_block_stream( - Arc::new(web3.clone()), - Duration::from_secs(1), - ) - .await - .unwrap(); let contract = contracts::GPv2Settlement::deployed(&web3).await.unwrap(); let native_token = contracts::WETH9::deployed(&web3).await.unwrap().address(); @@ -307,11 +315,10 @@ mod tests { web3, db, native_token, - current_block, contract, }; - assert!(!updater.update().await.unwrap()); + assert!(!updater.update(15875900).await.unwrap()); let query = r" INSERT INTO settlements (block_number, log_index, solver, tx_hash, tx_from, tx_nonce) @@ -334,7 +341,7 @@ VALUES (0, '\x056fd409e1d7a124bd7017459dfea2f387b6d5cd', 63477957279334750883433 updater.db.0.execute(query).await.unwrap(); - assert!(updater.update().await.unwrap()); + assert!(updater.update(15875900).await.unwrap()); let query = r" SELECT tx_from, tx_nonce @@ -384,6 +391,6 @@ FROM auction_transaction assert_eq!(observation.effective_gas_price, 19789368758u64.into()); assert_eq!(observation.surplus, 5150444803867862u64.into()); - assert!(!updater.update().await.unwrap()); + assert!(!updater.update(15875900).await.unwrap()); } } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index e592d243e5..36ba724eea 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -575,11 +575,10 @@ pub async fn run(args: Arguments) { contract: settlement_contract, native_token: native_token.address(), db: db.clone(), - current_block: current_block_stream.clone(), }; tokio::task::spawn( on_settlement_event_updater - .run_forever() + .run_forever(current_block_stream.clone()) .instrument(tracing::info_span!("on_settlement_event_updater")), ); diff --git a/crates/e2e/src/setup/onchain_components.rs b/crates/e2e/src/setup/onchain_components.rs index 9a2fdeca8b..0670dda8c4 100644 --- a/crates/e2e/src/setup/onchain_components.rs +++ b/crates/e2e/src/setup/onchain_components.rs @@ -437,6 +437,15 @@ impl OnchainComponents { } } + pub async fn mint_block(&self) { + tracing::info!("mining block"); + self.web3 + .transport() + .execute("evm_mine", vec![]) + .await + .unwrap(); + } + pub fn contracts(&self) -> &Contracts { &self.contracts } diff --git a/crates/e2e/tests/e2e/colocation_partial_fill.rs b/crates/e2e/tests/e2e/colocation_partial_fill.rs index f780f7596c..64ab50f441 100644 --- a/crates/e2e/tests/e2e/colocation_partial_fill.rs +++ b/crates/e2e/tests/e2e/colocation_partial_fill.rs @@ -102,6 +102,7 @@ async fn test(web3: Web3) { onchain.mint_blocks_past_reorg_threshold().await; let settlement_event_processed = || async { + onchain.mint_block().await; let order = services.get_order(&uid).await.unwrap(); if let OrderClass::Limit(LimitOrderClass { executed_surplus_fee, diff --git a/crates/e2e/tests/e2e/partially_fillable_observed_score.rs b/crates/e2e/tests/e2e/partially_fillable_observed_score.rs index e6178af70e..790fc8e6a6 100644 --- a/crates/e2e/tests/e2e/partially_fillable_observed_score.rs +++ b/crates/e2e/tests/e2e/partially_fillable_observed_score.rs @@ -133,6 +133,7 @@ async fn test(web3: Web3) { tracing::info!("waiting for solver competitions to get indexed"); let competitions_indexed = || { futures::stream::iter(&trades).all(|trade| async { + onchain.mint_block().await; services .get_solver_competition(trade.tx_hash.unwrap()) .await