Skip to content

Commit

Permalink
Refactor onchain settlement updater to use block stream (#1972)
Browse files Browse the repository at this point in the history
# Description
The added e2e test adds ~10s in unnecessary total time to the test
suite, because the onchain settlement updater runs on its own schedule
(which is hardcoded at 10s). My first thought was to make this schedule
configurable (and set it to 1s in the test), but it seems more correct
for this to also be driven by new blocks incoming.

This PR refactors the component to use `CurrentBlockStream` instead of
its own polling.

# Changes

- Use CurrentBlockStream instead of hardcoded polling loop

One subtlety is that the SettlementUpdater is not reading the tx_hash
from the onchain settlement event, but from the DB instead. This means
that the Settlement has to be processed first (and then the competition
data may only be updated in the next run since it doesn't yet see the
settlement when it runs). This is not an issue in practice (submission
data will be written to the DB once the next block arrives). For the For
the e2e test, we now add extra `evm_mine` in the
`wait_for_condition_block` to simulate the chain advancing.

I was thinking to either add a concept of dependencies into the
different jobs that run when a new block arrive, or put the settlement
updating code on the same codepath that processes the original
settlement events, but both seem to be involved refactorings for fairly
little benefit imo.

## How to test
e2e tests now run faster

## Related Issues

Improves test from #1959
  • Loading branch information
fleupold authored Oct 20, 2023
1 parent 1044860 commit bb8b0fb
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 23 deletions.
49 changes: 28 additions & 21 deletions crates/autopilot/src/on_settlement_event_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ use {
anyhow::{anyhow, Context, Result},
contracts::GPv2Settlement,
database::byte_array::ByteArray,
ethrpc::{current_block::CurrentBlockStream, Web3},
ethrpc::{
current_block::{into_stream, CurrentBlockStream},
Web3,
},
futures::StreamExt,
primitive_types::{H160, H256},
shared::{event_handling::MAX_REORG_BLOCK_COUNT, external_prices::ExternalPrices},
sqlx::PgConnection,
std::time::Duration,
web3::types::{Transaction, TransactionId},
};

Expand All @@ -54,28 +57,40 @@ pub struct OnSettlementEventUpdater {
pub contract: GPv2Settlement,
pub native_token: H160,
pub db: Postgres,
pub current_block: CurrentBlockStream,
}

impl OnSettlementEventUpdater {
pub async fn run_forever(self) -> ! {
pub async fn run_forever(self, block_stream: CurrentBlockStream) -> ! {
let mut current_block = *block_stream.borrow();
let mut block_stream = into_stream(block_stream);
loop {
match self.update().await {
Ok(true) => (),
Ok(false) => tokio::time::sleep(Duration::from_secs(10)).await,
match self.update(current_block.number).await {
Ok(true) => {
tracing::debug!(
block = current_block.number,
"on settlement event updater ran and processed event"
);
// Don't wait until next block in case there are more pending events to process.
continue;
}
Ok(false) => {
tracing::debug!(
block = current_block.number,
"on settlement event updater ran without update"
);
}
Err(err) => {
tracing::error!(?err, "on settlement event update task failed");
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
current_block = block_stream.next().await.expect("blockchains never end");
}
}

/// Update database for settlement events that have not been processed yet.
///
/// Returns whether an update was performed.
async fn update(&self) -> Result<bool> {
let current_block = self.current_block.borrow().number;
async fn update(&self, current_block: u64) -> Result<bool> {
let reorg_safe_block: i64 = current_block
.checked_sub(MAX_REORG_BLOCK_COUNT)
.context("no reorg safe block")?
Expand Down Expand Up @@ -282,7 +297,6 @@ mod tests {
super::*,
database::{auction_prices::AuctionPrice, settlement_observations::Observation},
sqlx::Executor,
std::sync::Arc,
};

#[tokio::test]
Expand All @@ -294,24 +308,17 @@ mod tests {
database::clear_DANGER(&db.0).await.unwrap();
let transport = shared::ethrpc::create_env_test_transport();
let web3 = Web3::new(transport);
let current_block = ethrpc::current_block::current_block_stream(
Arc::new(web3.clone()),
Duration::from_secs(1),
)
.await
.unwrap();

let contract = contracts::GPv2Settlement::deployed(&web3).await.unwrap();
let native_token = contracts::WETH9::deployed(&web3).await.unwrap().address();
let updater = OnSettlementEventUpdater {
web3,
db,
native_token,
current_block,
contract,
};

assert!(!updater.update().await.unwrap());
assert!(!updater.update(15875900).await.unwrap());

let query = r"
INSERT INTO settlements (block_number, log_index, solver, tx_hash, tx_from, tx_nonce)
Expand All @@ -334,7 +341,7 @@ VALUES (0, '\x056fd409e1d7a124bd7017459dfea2f387b6d5cd', 63477957279334750883433

updater.db.0.execute(query).await.unwrap();

assert!(updater.update().await.unwrap());
assert!(updater.update(15875900).await.unwrap());

let query = r"
SELECT tx_from, tx_nonce
Expand Down Expand Up @@ -384,6 +391,6 @@ FROM auction_transaction
assert_eq!(observation.effective_gas_price, 19789368758u64.into());
assert_eq!(observation.surplus, 5150444803867862u64.into());

assert!(!updater.update().await.unwrap());
assert!(!updater.update(15875900).await.unwrap());
}
}
3 changes: 1 addition & 2 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,10 @@ pub async fn run(args: Arguments) {
contract: settlement_contract,
native_token: native_token.address(),
db: db.clone(),
current_block: current_block_stream.clone(),
};
tokio::task::spawn(
on_settlement_event_updater
.run_forever()
.run_forever(current_block_stream.clone())
.instrument(tracing::info_span!("on_settlement_event_updater")),
);

Expand Down
9 changes: 9 additions & 0 deletions crates/e2e/src/setup/onchain_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ impl OnchainComponents {
}
}

pub async fn mint_block(&self) {
tracing::info!("mining block");
self.web3
.transport()
.execute("evm_mine", vec![])
.await
.unwrap();
}

pub fn contracts(&self) -> &Contracts {
&self.contracts
}
Expand Down
1 change: 1 addition & 0 deletions crates/e2e/tests/e2e/colocation_partial_fill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async fn test(web3: Web3) {
onchain.mint_blocks_past_reorg_threshold().await;

let settlement_event_processed = || async {
onchain.mint_block().await;
let order = services.get_order(&uid).await.unwrap();
if let OrderClass::Limit(LimitOrderClass {
executed_surplus_fee,
Expand Down
1 change: 1 addition & 0 deletions crates/e2e/tests/e2e/partially_fillable_observed_score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ async fn test(web3: Web3) {
tracing::info!("waiting for solver competitions to get indexed");
let competitions_indexed = || {
futures::stream::iter(&trades).all(|trade| async {
onchain.mint_block().await;
services
.get_solver_competition(trade.tx_hash.unwrap())
.await
Expand Down

0 comments on commit bb8b0fb

Please sign in to comment.