From 39f4f4a51236aa300854d7f8996f90888948a18c Mon Sep 17 00:00:00 2001 From: Felix Leupold Date: Mon, 4 Dec 2023 11:36:47 +0100 Subject: [PATCH] Update settlement contract balances in background task (#2108) # Description Addresses the bug that we currently fetch settlement buffer balances once per token and don't update them in between auctions, which can lead to solutions relying on outdate internal buffer balance information and thus incorrectly marking interactions as internalizable causing simulation to fail. # Changes - [ ] Add an e2e showcasing the problematic behavior - [ ] Spawn a task when creating a new token fetcher, which on arrival of new blocks update the token balances of all cached tokens. This approach is a bit wasteful, as token balances only really change when a settlement trading this token is included on-chain. However, getting that information in this component is non-trivial. Moreover, some tokens exhibit custom balance logic (e.g. rebasing tokens) which could update the contract balance even without a settlement taking place. If we deem this approach too inefficient, I would suggest we restrict the number of tokens we cache (using an LRU cache) and re-fetch all relevant information (including decimals and symbol) in case of a cache miss. If we don't want to lose caching of decimals and symbols for some tokens, I'd suggest splitting the fetcher into two components, one for static information, and one more `RecentBlockCache` like component for the settlement balances. Let me know what you think. ## How to test Added an end to end test which fails without the changes in `driver::infra::tokens` ## Related Issues Fixes #2093 --- crates/driver/src/infra/tokens.rs | 69 ++++++++++++- crates/e2e/src/setup/colocation.rs | 2 +- crates/e2e/tests/e2e/colocation_buffers.rs | 108 +++++++++++++++++++++ crates/e2e/tests/e2e/main.rs | 1 + 4 files changed, 177 insertions(+), 3 deletions(-) create mode 100644 crates/e2e/tests/e2e/colocation_buffers.rs diff --git a/crates/driver/src/infra/tokens.rs b/crates/driver/src/infra/tokens.rs index 3deb3e6bcc..f255943bf6 100644 --- a/crates/driver/src/infra/tokens.rs +++ b/crates/driver/src/infra/tokens.rs @@ -3,10 +3,13 @@ use { domain::eth, infra::{blockchain, Ethereum}, }, + ethrpc::current_block::{self, CurrentBlockStream}, + futures::StreamExt, std::{ collections::{HashMap, HashSet}, sync::{Arc, RwLock}, }, + tracing::Instrument, }; #[derive(Clone, Debug)] @@ -22,10 +25,16 @@ pub struct Fetcher(Arc); impl Fetcher { pub fn new(eth: Ethereum) -> Self { - Self(Arc::new(Inner { + let block_stream = eth.current_block().clone(); + let inner = Arc::new(Inner { eth, cache: RwLock::new(HashMap::new()), - })) + }); + tokio::task::spawn( + update_task(block_stream, Arc::downgrade(&inner)) + .instrument(tracing::info_span!("token_fetcher")), + ); + Self(inner) } /// Returns the `Metadata` for the given tokens. Note that the result will @@ -39,6 +48,62 @@ impl Fetcher { } } +/// Runs a single cache update cycle whenever a new block arrives until the +/// fetcher is dropped. +async fn update_task(blocks: CurrentBlockStream, inner: std::sync::Weak) { + let mut stream = current_block::into_stream(blocks); + while stream.next().await.is_some() { + let inner = match inner.upgrade() { + Some(inner) => inner, + // Fetcher was dropped, stop update task. + None => break, + }; + if let Err(err) = update_balances(inner).await { + tracing::warn!(?err, "error updating token cache"); + } + } +} + +/// Updates the settlement contract's balance for every cached token. +async fn update_balances(inner: Arc) -> Result<(), blockchain::Error> { + let settlement = inner.eth.contracts().settlement().address().into(); + let futures = { + let cache = inner.cache.read().unwrap(); + let tokens = cache.keys().cloned().collect::>(); + tracing::debug!( + tokens = tokens.len(), + "updating settlement contract balances" + ); + tokens.into_iter().map(|token| { + let erc20 = inner.eth.erc20(token); + async move { + Ok::<(eth::TokenAddress, eth::TokenAmount), blockchain::Error>(( + token, + erc20.balance(settlement).await?, + )) + } + }) + }; + + // Don't hold on to the lock while fetching balances to allow concurrent + // updates. This may lead to new entries arriving in the meantime, however + // their balances should already be up-to-date. + let mut balances = futures::future::try_join_all(futures) + .await? + .into_iter() + .collect::>(); + + let mut cache = inner.cache.write().unwrap(); + for (key, entry) in cache.iter_mut() { + if let Some(balance) = balances.remove(key) { + entry.balance = balance; + } else { + tracing::info!(?key, "key without balance update"); + } + } + Ok(()) +} + /// Provides metadata of tokens. struct Inner { eth: Ethereum, diff --git a/crates/e2e/src/setup/colocation.rs b/crates/e2e/src/setup/colocation.rs index 7981d3ede0..9255802446 100644 --- a/crates/e2e/src/setup/colocation.rs +++ b/crates/e2e/src/setup/colocation.rs @@ -11,7 +11,7 @@ pub async fn start_solver(weth: H160) -> Url { r#" weth = "{weth:?}" base-tokens = [] -max-hops = 0 +max-hops = 1 max-partial-attempts = 5 risk-parameters = [0,0,0,0] "#, diff --git a/crates/e2e/tests/e2e/colocation_buffers.rs b/crates/e2e/tests/e2e/colocation_buffers.rs new file mode 100644 index 0000000000..d403163967 --- /dev/null +++ b/crates/e2e/tests/e2e/colocation_buffers.rs @@ -0,0 +1,108 @@ +use { + e2e::{setup::*, tx}, + ethcontract::prelude::U256, + model::{ + order::{OrderCreation, OrderKind}, + signature::EcdsaSigningScheme, + }, + secp256k1::SecretKey, + shared::ethrpc::Web3, + web3::signing::SecretKeyRef, +}; + +#[tokio::test] +#[ignore] +async fn local_node_buffers() { + run_test(onchain_settlement_without_liquidity).await; +} + +async fn onchain_settlement_without_liquidity(web3: Web3) { + let mut onchain = OnchainComponents::deploy(web3).await; + + let [solver] = onchain.make_solvers(to_wei(1)).await; + let [trader] = onchain.make_accounts(to_wei(1)).await; + let [token_a, token_b] = onchain + .deploy_tokens_with_weth_uni_v2_pools(to_wei(1_000), to_wei(1_000)) + .await; + + // Fund trader, settlement accounts, and pool creation + token_a.mint(trader.address(), to_wei(100)).await; + token_b + .mint(onchain.contracts().gp_settlement.address(), to_wei(5)) + .await; + token_a.mint(solver.address(), to_wei(1000)).await; + token_b.mint(solver.address(), to_wei(1000)).await; + + // Approve GPv2 for trading + tx!( + trader.account(), + token_a.approve(onchain.contracts().allowance, to_wei(100)) + ); + + // Start system + let solver_endpoint = colocation::start_solver(onchain.contracts().weth.address()).await; + colocation::start_driver(onchain.contracts(), &solver_endpoint, &solver); + + let services = Services::new(onchain.contracts()).await; + services.start_autopilot(vec![ + "--enable-colocation=true".to_string(), + format!( + "--trusted-tokens={weth:#x},{token_a:#x},{token_b:#x}", + weth = onchain.contracts().weth.address(), + token_a = token_a.address(), + token_b = token_b.address() + ), + "--drivers=test_solver|http://localhost:11088/test_solver".to_string(), + ]); + services.start_api(vec![]).await; + + // Place Order + let order = OrderCreation { + sell_token: token_a.address(), + sell_amount: to_wei(9), + fee_amount: to_wei(1), + buy_token: token_b.address(), + buy_amount: to_wei(5), + valid_to: model::time::now_in_epoch_seconds() + 300, + kind: OrderKind::Buy, + ..Default::default() + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()), + ); + services.create_order(&order).await.unwrap(); + + tracing::info!("waiting for first trade"); + let trade_happened = + || async { token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount }; + wait_for_condition(TIMEOUT, trade_happened).await.unwrap(); + + // Check that settlement buffers were traded. + let settlement_contract_balance = token_b + .balance_of(onchain.contracts().gp_settlement.address()) + .call() + .await + .unwrap(); + // Check that internal buffers were used + assert!(settlement_contract_balance == 0.into()); + + // Same order can trade again with external liquidity + let order = OrderCreation { + valid_to: model::time::now_in_epoch_seconds() + 301, + ..order + } + .sign( + EcdsaSigningScheme::Eip712, + &onchain.contracts().domain_separator, + SecretKeyRef::from(&SecretKey::from_slice(trader.private_key()).unwrap()), + ); + services.create_order(&order).await.unwrap(); + + tracing::info!("waiting for second trade"); + let trade_happened = || async { + token_b.balance_of(trader.address()).call().await.unwrap() == order.buy_amount * 2 + }; + wait_for_condition(TIMEOUT, trade_happened).await.unwrap(); +} diff --git a/crates/e2e/tests/e2e/main.rs b/crates/e2e/tests/e2e/main.rs index 4cd9f960f6..5a991b1fcd 100644 --- a/crates/e2e/tests/e2e/main.rs +++ b/crates/e2e/tests/e2e/main.rs @@ -6,6 +6,7 @@ // Each of the following modules contains tests. mod app_data; mod app_data_signer; +mod colocation_buffers; mod colocation_ethflow; mod colocation_hooks; mod colocation_partial_fill;