diff --git a/Cargo.lock b/Cargo.lock index 7a9de100fd..cc06594b97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1540,6 +1540,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "delay_map" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4355c25cbf99edcb6b4a0e906f6bdc6956eda149e84455bea49696429b2f8e8" +dependencies = [ + "futures", + "tokio-util 0.7.8", +] + [[package]] name = "der" version = "0.7.8" @@ -3983,6 +3993,7 @@ dependencies = [ "clap", "contracts", "database", + "delay_map", "derivative", "ethcontract", "ethcontract-mock", @@ -4710,6 +4721,7 @@ dependencies = [ "futures-io", "futures-sink", "pin-project-lite", + "slab", "tokio", "tracing", ] diff --git a/crates/driver/src/boundary/liquidity/swapr.rs b/crates/driver/src/boundary/liquidity/swapr.rs index 0039eebdef..30c4db030b 100644 --- a/crates/driver/src/boundary/liquidity/swapr.rs +++ b/crates/driver/src/boundary/liquidity/swapr.rs @@ -49,6 +49,7 @@ pub async fn collector( &infra::liquidity::config::UniswapV2 { router: config.router, pool_code: config.pool_code, + missing_pool_cache_time: config.missing_pool_cache_time, }, |web3, pair_provider| { SwaprPoolReader(DefaultPoolReader { diff --git a/crates/driver/src/boundary/liquidity/uniswap/v2.rs b/crates/driver/src/boundary/liquidity/uniswap/v2.rs index f5671a3564..672b1581e1 100644 --- a/crates/driver/src/boundary/liquidity/uniswap/v2.rs +++ b/crates/driver/src/boundary/liquidity/uniswap/v2.rs @@ -150,10 +150,11 @@ where init_code_digest: config.pool_code.into(), }; - let pool_fetcher = PoolFetcher { - pool_reader: reader(web3.clone(), pair_provider), - web3: web3.clone(), - }; + let pool_fetcher = PoolFetcher::new( + reader(web3.clone(), pair_provider), + web3.clone(), + config.missing_pool_cache_time, + ); let pool_cache = Arc::new(PoolCache::new( boundary::liquidity::cache_config(), diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index ef09da4550..58ec657cd4 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -4,7 +4,7 @@ use { infra::{self, blockchain, config::file, liquidity, mempool, simulator, solver}, }, futures::future::join_all, - std::path::Path, + std::{path::Path, time::Duration}, tokio::fs, }; @@ -115,12 +115,17 @@ pub async fn load(network: &blockchain::Network, path: &Path) -> infra::Config { } } .expect("no Uniswap V2 preset for current network"), - file::UniswapV2Config::Manual { router, pool_code } => { - liquidity::config::UniswapV2 { - router: router.into(), - pool_code: pool_code.into(), - } - } + file::UniswapV2Config::Manual { + router, + pool_code, + missing_pool_cache_time_seconds, + } => liquidity::config::UniswapV2 { + router: router.into(), + pool_code: pool_code.into(), + missing_pool_cache_time: Duration::from_secs( + missing_pool_cache_time_seconds, + ), + }, }) .collect(), swapr: config @@ -133,9 +138,16 @@ pub async fn load(network: &blockchain::Network, path: &Path) -> infra::Config { file::SwaprPreset::Swapr => liquidity::config::Swapr::swapr(&network.id), } .expect("no Swapr preset for current network"), - file::SwaprConfig::Manual { router, pool_code } => liquidity::config::Swapr { + file::SwaprConfig::Manual { + router, + pool_code, + missing_pool_cache_time_seconds, + } => liquidity::config::Swapr { router: router.into(), pool_code: pool_code.into(), + missing_pool_cache_time: Duration::from_secs( + missing_pool_cache_time_seconds, + ), }, }) .collect(), diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index 8a6a339a23..504eae157d 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -304,6 +304,11 @@ enum UniswapV2Config { /// The digest of the pool initialization code. pool_code: eth::H256, + + /// How long liquidity should not be fetched for a token pair that + /// didn't return useful liquidity before allowing to fetch it + /// again. + missing_pool_cache_time_seconds: u64, }, } @@ -330,6 +335,11 @@ enum SwaprConfig { /// The digest of the pool initialization code. pool_code: eth::H256, + + /// How long liquidity should not be fetched for a token pair that + /// didn't return useful liquidity before allowing to fetch it + /// again. + missing_pool_cache_time_seconds: u64, }, } diff --git a/crates/driver/src/infra/liquidity/config.rs b/crates/driver/src/infra/liquidity/config.rs index f1b9e05ffc..92f8f2950e 100644 --- a/crates/driver/src/infra/liquidity/config.rs +++ b/crates/driver/src/infra/liquidity/config.rs @@ -1,7 +1,7 @@ use { crate::{domain::eth, infra::blockchain::contracts::deployment_address}, hex_literal::hex, - std::collections::HashSet, + std::{collections::HashSet, time::Duration}, }; /// Configuration options for liquidity fetching. @@ -36,6 +36,9 @@ pub struct UniswapV2 { /// The digest of the pool initialization code. This digest is used for /// computing the deterministic pool addresses per token pair. pub pool_code: eth::CodeDigest, + /// How long liquidity should not be fetched for a token pair that didn't + /// return useful liquidity before allowing to fetch it again. + pub missing_pool_cache_time: Duration, } impl UniswapV2 { @@ -46,6 +49,7 @@ impl UniswapV2 { router: deployment_address(contracts::UniswapV2Router02::raw_contract(), network)?, pool_code: hex!("96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } @@ -55,6 +59,7 @@ impl UniswapV2 { router: deployment_address(contracts::SushiSwapRouter::raw_contract(), network)?, pool_code: hex!("e18a34eb0e04b04f7a0ac29a6e80748dca96319b42c54d679cb821dca90c6303") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } @@ -64,6 +69,7 @@ impl UniswapV2 { router: deployment_address(contracts::HoneyswapRouter::raw_contract(), network)?, pool_code: hex!("3f88503e8580ab941773b59034fb4b2a63e86dbc031b3633a925533ad3ed2b93") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } @@ -73,6 +79,7 @@ impl UniswapV2 { router: deployment_address(contracts::BaoswapRouter::raw_contract(), network)?, pool_code: hex!("0bae3ead48c325ce433426d2e8e6b07dac10835baec21e163760682ea3d3520d") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } @@ -82,6 +89,7 @@ impl UniswapV2 { router: deployment_address(contracts::PancakeRouter::raw_contract(), network)?, pool_code: hex!("57224589c67f3f30a6b0d7a1b54cf3153ab84563bc609ef41dfb34f8b2974d2d") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } } @@ -94,6 +102,9 @@ pub struct Swapr { /// The digest of the pool initialization code. This digest is used for /// computing the deterministic pool addresses per token pair. pub pool_code: eth::CodeDigest, + /// How long liquidity should not be fetched for a token pair that didn't + /// return useful liquidity before allowing to fetch it again. + pub missing_pool_cache_time: Duration, } impl Swapr { @@ -104,6 +115,7 @@ impl Swapr { router: deployment_address(contracts::SwaprRouter::raw_contract(), network)?, pool_code: hex!("d306a548755b9295ee49cc729e13ca4a45e00199bbd890fa146da43a50571776") .into(), + missing_pool_cache_time: Duration::from_secs(60 * 60), }) } } diff --git a/crates/e2e/src/setup/colocation.rs b/crates/e2e/src/setup/colocation.rs index 76115e91ab..7981d3ede0 100644 --- a/crates/e2e/src/setup/colocation.rs +++ b/crates/e2e/src/setup/colocation.rs @@ -55,6 +55,7 @@ base-tokens = [] [[liquidity.uniswap-v2]] router = "{:?}" pool-code = "{:?}" +missing-pool-cache-time-seconds = 3600 [submission] gas-price-cap = 1000000000000 diff --git a/crates/shared/Cargo.toml b/crates/shared/Cargo.toml index 57027e4510..95061d36a4 100644 --- a/crates/shared/Cargo.toml +++ b/crates/shared/Cargo.toml @@ -19,6 +19,7 @@ chrono = { workspace = true, features = ["clock"] } clap = { workspace = true } contracts = { path = "../contracts" } database = { path = "../database" } +delay_map = "0.3" derivative = { workspace = true } ethcontract = { workspace = true } ethcontract-mock = { workspace = true } diff --git a/crates/shared/src/sources/uniswap_v2.rs b/crates/shared/src/sources/uniswap_v2.rs index c3bdb3ae7c..67c444bb8a 100644 --- a/crates/shared/src/sources/uniswap_v2.rs +++ b/crates/shared/src/sources/uniswap_v2.rs @@ -106,6 +106,7 @@ impl UniV2BaselineSourceParameters { let fetcher = pool_fetching::PoolFetcher { pool_reader, web3: web3.clone(), + non_existent_pools: Default::default(), }; Ok(UniV2BaselineSource { router, diff --git a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs index f5599fc205..c8ab088b38 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_fetching.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_fetching.rs @@ -8,6 +8,7 @@ use { }, anyhow::Result, contracts::{IUniswapLikePair, ERC20}, + delay_map::HashSetDelay, ethcontract::{errors::MethodError, BlockId, H160, U256}, futures::{ future::{self, BoxFuture}, @@ -15,7 +16,7 @@ use { }, model::TokenPair, num::rational::Ratio, - std::collections::HashSet, + std::{collections::HashSet, sync::RwLock, time::Duration}, }; const POOL_SWAP_GAS_COST: usize = 60_000; @@ -205,17 +206,15 @@ impl BaselineSolvable for Pool { pub struct PoolFetcher { pub pool_reader: Reader, pub web3: Web3, + pub non_existent_pools: RwLock>, } -impl PoolFetcher { - /// Creates a pool fetcher instance for Uniswap V2 (or an exact clone). - pub fn uniswap(pair_provider: PairProvider, web3: Web3) -> Self { +impl PoolFetcher { + pub fn new(reader: Reader, web3: Web3, cache_time: Duration) -> Self { Self { - pool_reader: DefaultPoolReader { - pair_provider, - web3: web3.clone(), - }, + pool_reader: reader, web3, + non_existent_pools: RwLock::new(HashSetDelay::new(cache_time)), } } } @@ -226,19 +225,36 @@ where Reader: PoolReading, { async fn fetch(&self, token_pairs: HashSet, at_block: Block) -> Result> { + let mut token_pairs: Vec<_> = token_pairs.into_iter().collect(); + { + let non_existent_pools = self.non_existent_pools.read().unwrap(); + token_pairs.retain(|pair| !non_existent_pools.contains_key(pair)); + } let mut batch = Web3CallBatch::new(self.web3.transport().clone()); let block = BlockId::Number(at_block.into()); let futures = token_pairs - .into_iter() - .map(|pair| self.pool_reader.read_state(pair, &mut batch, block)) + .iter() + .map(|pair| self.pool_reader.read_state(*pair, &mut batch, block)) .collect::>(); batch.execute_all(MAX_BATCH_SIZE).await; - future::join_all(futures) - .await - .into_iter() - .filter_map(|pool| pool.transpose()) - .collect() + let results = future::try_join_all(futures).await?; + + let mut new_missing_pairs = vec![]; + let mut pools = vec![]; + for (result, key) in results.into_iter().zip(token_pairs) { + match result { + Some(pool) => pools.push(pool), + None => new_missing_pairs.push(key), + } + } + if !new_missing_pairs.is_empty() { + let mut non_existent_pools = self.non_existent_pools.write().unwrap(); + for pair in new_missing_pairs { + non_existent_pools.insert(pair); + } + } + Ok(pools) } }