From 5be8e8f8bd39110c884fdb5740ad1a653af1dda7 Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Mon, 9 Oct 2023 12:07:08 +0200 Subject: [PATCH] [Driver] Allow stale liquidity for quote requests (#1924) # Description When giving production traffic to the colocated setup, we noticed a lot of price estimators (even fast ones like baseline) getting rate limited frequently. This is due to the driver requesting liquidity for the most recent block and blocking on fetching it if it's not available. For the legacy setup, there was a code path specifically for quotes, allowing to use a "recent" instead of the latest block for fetching liquidity. This PR recreates this path for the co-located setup # Changes - Adds a flag to the liquidity fetcher indicating whether stale liquidity is allowed - If this flag is set, quote for _recent_ instead of _latest_ block (which should already be cached) ## How to test Run this benchmark script against before and after: ```sh SECONDS=0 while (( SECONDS < 60 )); do time curl -H 'content-type: application/json' --data '{"from": "0xc3792470cee7e0d42c2be8e9552bd651766c5178","buyToken": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48","sellToken": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2","kind": "sell","sellAmountAfterFee": "100000000000000000", "quality":"optimal"}' http://localhost:8080/api/v1/quote done ``` observe http://localhost:11088/metrics in both cases and look for liquidity cache hits: Before: ``` driver_recent_block_cache_hits{cache_type="uniswapv2"} 34300 driver_recent_block_cache_misses{cache_type="uniswapv2"} 840 ``` After ``` driver_recent_block_cache_hits{cache_type="uniswapv2"} 39200 driver_recent_block_cache_misses{cache_type="uniswapv2"} 140 ``` Note that we now only have 1 cache miss (140 pools) on cold start vs 1 cache miss on each new block and higher overall throughput ## Related issues #1672 --------- Co-authored-by: Nicholas Rodrigues Lordello --- crates/driver/src/boundary/liquidity/mod.rs | 14 ++-- crates/driver/src/domain/competition/mod.rs | 9 ++- crates/driver/src/domain/quote.rs | 6 +- crates/driver/src/infra/liquidity/fetcher.rs | 24 +++++- crates/driver/src/infra/liquidity/mod.rs | 5 +- crates/e2e/src/setup/onchain_components.rs | 41 ++++++++++ crates/e2e/tests/e2e/colocation_quoting.rs | 85 ++++++++++++++++++++ crates/e2e/tests/e2e/main.rs | 1 + 8 files changed, 175 insertions(+), 10 deletions(-) create mode 100644 crates/e2e/tests/e2e/colocation_quoting.rs diff --git a/crates/driver/src/boundary/liquidity/mod.rs b/crates/driver/src/boundary/liquidity/mod.rs index 956affc30e..27c2ff4e7d 100644 --- a/crates/driver/src/boundary/liquidity/mod.rs +++ b/crates/driver/src/boundary/liquidity/mod.rs @@ -129,6 +129,7 @@ impl Fetcher { pub async fn fetch( &self, pairs: &HashSet, + block: infra::liquidity::AtBlock, ) -> Result> { let pairs = pairs .iter() @@ -137,12 +138,15 @@ impl Fetcher { TokenPair::new(a.into(), b.into()).expect("a != b") }) .collect(); - let block_number = self.blocks.borrow().number; - let liquidity = self - .inner - .get_liquidity(pairs, recent_block_cache::Block::Number(block_number)) - .await?; + let block = match block { + infra::liquidity::AtBlock::Recent => recent_block_cache::Block::Recent, + infra::liquidity::AtBlock::Latest => { + let block_number = self.blocks.borrow().number; + recent_block_cache::Block::Number(block_number) + } + }; + let liquidity = self.inner.get_liquidity(pairs, block).await?; let liquidity = liquidity .into_iter() diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 048793bb55..68bbbf1393 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -48,7 +48,14 @@ impl Competition { /// Solve an auction as part of this competition. pub async fn solve(&self, auction: &Auction) -> Result { let liquidity = match self.solver.liquidity() { - solver::Liquidity::Fetch => self.liquidity.fetch(&auction.liquidity_pairs()).await, + solver::Liquidity::Fetch => { + self.liquidity + .fetch( + &auction.liquidity_pairs(), + infra::liquidity::AtBlock::Latest, + ) + .await + } solver::Liquidity::Skip => Default::default(), }; diff --git a/crates/driver/src/domain/quote.rs b/crates/driver/src/domain/quote.rs index 25c099eb66..c3c8c51808 100644 --- a/crates/driver/src/domain/quote.rs +++ b/crates/driver/src/domain/quote.rs @@ -75,7 +75,11 @@ impl Order { tokens: &infra::tokens::Fetcher, ) -> Result { let liquidity = match solver.liquidity() { - solver::Liquidity::Fetch => liquidity.fetch(&self.liquidity_pairs()).await, + solver::Liquidity::Fetch => { + liquidity + .fetch(&self.liquidity_pairs(), infra::liquidity::AtBlock::Recent) + .await + } solver::Liquidity::Skip => Default::default(), }; let timeout = self.deadline.timeout()?; diff --git a/crates/driver/src/infra/liquidity/fetcher.rs b/crates/driver/src/infra/liquidity/fetcher.rs index 6068d7fb0b..71d2702cf9 100644 --- a/crates/driver/src/infra/liquidity/fetcher.rs +++ b/crates/driver/src/infra/liquidity/fetcher.rs @@ -13,6 +13,22 @@ pub struct Fetcher { inner: Arc, } +/// Specifies at which block liquidity should be fetched. +pub enum AtBlock { + /// Fetches liquidity at a recent block. This will prefer reusing cached + /// liquidity even if it is stale by a few blocks instead of fetching the + /// absolute latest state from the blockchain. + /// + /// This is useful for quoting where we want an up-to-date, but not + /// necessarily exactly correct price. In the context of quote verification, + /// this is completely fine as the exactly input and output amounts will be + /// computed anyway. At worse, we might provide a slightly sub-optimal + /// route in some cases, but this is an acceptable trade-off. + Recent, + /// Fetches liquidity liquidity for the latest state of the blockchain. + Latest, +} + impl Fetcher { /// Creates a new liquidity fetcher for the specified Ethereum instance and /// configuration. @@ -25,9 +41,13 @@ impl Fetcher { /// Fetches all relevant liquidity for the specified token pairs. Handles /// failures by logging and returning an empty vector. - pub async fn fetch(&self, pairs: &HashSet) -> Vec { + pub async fn fetch( + &self, + pairs: &HashSet, + block: AtBlock, + ) -> Vec { observe::fetching_liquidity(); - match self.inner.fetch(pairs).await { + match self.inner.fetch(pairs, block).await { Ok(liquidity) => { observe::fetched_liquidity(&liquidity); liquidity diff --git a/crates/driver/src/infra/liquidity/mod.rs b/crates/driver/src/infra/liquidity/mod.rs index 05c0f99234..e709c71b30 100644 --- a/crates/driver/src/infra/liquidity/mod.rs +++ b/crates/driver/src/infra/liquidity/mod.rs @@ -13,4 +13,7 @@ pub mod config; pub mod fetcher; -pub use self::{config::Config, fetcher::Fetcher}; +pub use self::{ + config::Config, + fetcher::{AtBlock, Fetcher}, +}; diff --git a/crates/e2e/src/setup/onchain_components.rs b/crates/e2e/src/setup/onchain_components.rs index b86e682187..be85900a1b 100644 --- a/crates/e2e/src/setup/onchain_components.rs +++ b/crates/e2e/src/setup/onchain_components.rs @@ -12,6 +12,7 @@ use { order::Hook, signature::{EcdsaSignature, EcdsaSigningScheme}, DomainSeparator, + TokenPair, }, secp256k1::SecretKey, shared::ethrpc::Web3, @@ -505,6 +506,46 @@ impl OnchainComponents { tokens } + /// Mints `amount` tokens to its `token`-WETH Uniswap V2 pool. + /// + /// This can be used to modify the pool reserves during a test. + pub async fn mint_token_to_weth_uni_v2_pool(&self, token: &MintableToken, amount: U256) { + let pair = contracts::IUniswapLikePair::at( + &self.web3, + self.contracts + .uniswap_v2_factory + .get_pair(self.contracts.weth.address(), token.address()) + .call() + .await + .expect("failed to get Uniswap V2 pair"), + ); + assert!(!pair.address().is_zero(), "Uniswap V2 pair is not deployed"); + + // Mint amount + 1 to the pool, and then swap out 1 of the minted token + // in order to force it to update its K-value. + token.mint(pair.address(), amount + 1).await; + let (out0, out1) = if TokenPair::new(self.contracts.weth.address(), token.address()) + .unwrap() + .get() + .0 + == token.address() + { + (1, 0) + } else { + (0, 1) + }; + pair.swap( + out0.into(), + out1.into(), + token.minter.address(), + Default::default(), + ) + .from(token.minter.clone()) + .send() + .await + .expect("Uniswap V2 pair couldn't mint"); + } + pub async fn deploy_cow_token(&self, holder: Account, supply: U256) -> CowToken { let contract = CowProtocolToken::builder(&self.web3, holder.address(), holder.address(), supply) diff --git a/crates/e2e/tests/e2e/colocation_quoting.rs b/crates/e2e/tests/e2e/colocation_quoting.rs new file mode 100644 index 0000000000..b2fb9510a0 --- /dev/null +++ b/crates/e2e/tests/e2e/colocation_quoting.rs @@ -0,0 +1,85 @@ +use { + e2e::{setup::*, tx, tx_value}, + ethcontract::U256, + model::quote::{OrderQuoteRequest, OrderQuoteSide, SellAmount}, + number::nonzero::U256 as NonZeroU256, + shared::ethrpc::Web3, +}; + +#[tokio::test] +#[ignore] +async fn local_node_uses_stale_liquidity() { + run_test(uses_stale_liquidity).await; +} + +async fn uses_stale_liquidity(web3: Web3) { + tracing::info!("Setting up chain state."); + let mut onchain = OnchainComponents::deploy(web3.clone()).await; + + let [solver] = onchain.make_solvers(to_wei(10)).await; + let [trader] = onchain.make_accounts(to_wei(2)).await; + let [token] = onchain + .deploy_tokens_with_weth_uni_v2_pools(to_wei(1_000), to_wei(1_000)) + .await; + + tx!( + trader.account(), + onchain + .contracts() + .weth + .approve(onchain.contracts().allowance, to_wei(1)) + ); + tx_value!( + trader.account(), + to_wei(1), + onchain.contracts().weth.deposit() + ); + + tracing::info!("Starting services."); + let solver_endpoint = colocation::start_solver(onchain.contracts().weth.address()).await; + colocation::start_driver(onchain.contracts(), &solver_endpoint, &solver); + + let services = Services::new(onchain.contracts()).await; + services.start_autopilot(vec![ + "--enable-colocation=true".to_string(), + "--drivers=http://localhost:11088/test_solver".to_string(), + ]); + services + .start_api(vec![ + "--price-estimation-drivers=solver|http://localhost:11088/test_solver".to_string(), + ]) + .await; + + let quote = OrderQuoteRequest { + from: trader.address(), + sell_token: onchain.contracts().weth.address(), + buy_token: token.address(), + side: OrderQuoteSide::Sell { + sell_amount: SellAmount::AfterFee { + value: NonZeroU256::new(to_wei(1)).unwrap(), + }, + }, + ..Default::default() + }; + + tracing::info!("performining initial quote"); + let first = services.submit_quote("e).await.unwrap(); + + // Now, we want to manually unbalance the pools and assert that the quote + // doesn't change (as the price estimation will use stale pricing data). + onchain + .mint_token_to_weth_uni_v2_pool(&token, to_wei(1_000)) + .await; + + tracing::info!("performining second quote, which should match first"); + let second = services.submit_quote("e).await.unwrap(); + assert_eq!(first.quote.buy_amount, second.quote.buy_amount); + + tracing::info!("waiting for liquidity state to update"); + wait_for_condition(TIMEOUT, || async { + let next = services.submit_quote("e).await.unwrap(); + next.quote.buy_amount != first.quote.buy_amount + }) + .await + .unwrap(); +} diff --git a/crates/e2e/tests/e2e/main.rs b/crates/e2e/tests/e2e/main.rs index 198ba7f6d3..042ad5a072 100644 --- a/crates/e2e/tests/e2e/main.rs +++ b/crates/e2e/tests/e2e/main.rs @@ -8,6 +8,7 @@ mod app_data; mod colocation_ethflow; mod colocation_hooks; mod colocation_partial_fill; +mod colocation_quoting; mod colocation_univ2; mod database; mod eth_integration;