Skip to content

Commit

Permalink
Omit requests for known missing pools (#2063)
Browse files Browse the repository at this point in the history
# Description
One of the things that makes liquidity fetching unnecessarily slow at
the moment is that we try to fetch data for pools that we know are
missing over and over again.

# Changes
The lowest level components that fetch balancer and uni v2 liquidity now
keep track of pools that could not be found.
Whenever a request for those pools comes it it gets omitted.

Currently we are disallowing pools forever. Eventually we should have a
background task that periodically forgets those deny-listed pools just
in case somebody deploys a new one.

## Related Issues
#2041
  • Loading branch information
MartinquaXD authored Nov 22, 2023
1 parent 82b01c1 commit 47e3707
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 28 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/driver/src/boundary/liquidity/swapr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub async fn collector(
&infra::liquidity::config::UniswapV2 {
router: config.router,
pool_code: config.pool_code,
missing_pool_cache_time: config.missing_pool_cache_time,
},
|web3, pair_provider| {
SwaprPoolReader(DefaultPoolReader {
Expand Down
9 changes: 5 additions & 4 deletions crates/driver/src/boundary/liquidity/uniswap/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,11 @@ where
init_code_digest: config.pool_code.into(),
};

let pool_fetcher = PoolFetcher {
pool_reader: reader(web3.clone(), pair_provider),
web3: web3.clone(),
};
let pool_fetcher = PoolFetcher::new(
reader(web3.clone(), pair_provider),
web3.clone(),
config.missing_pool_cache_time,
);

let pool_cache = Arc::new(PoolCache::new(
boundary::liquidity::cache_config(),
Expand Down
28 changes: 20 additions & 8 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
infra::{self, blockchain, config::file, liquidity, mempool, simulator, solver},
},
futures::future::join_all,
std::path::Path,
std::{path::Path, time::Duration},
tokio::fs,
};

Expand Down Expand Up @@ -115,12 +115,17 @@ pub async fn load(network: &blockchain::Network, path: &Path) -> infra::Config {
}
}
.expect("no Uniswap V2 preset for current network"),
file::UniswapV2Config::Manual { router, pool_code } => {
liquidity::config::UniswapV2 {
router: router.into(),
pool_code: pool_code.into(),
}
}
file::UniswapV2Config::Manual {
router,
pool_code,
missing_pool_cache_time_seconds,
} => liquidity::config::UniswapV2 {
router: router.into(),
pool_code: pool_code.into(),
missing_pool_cache_time: Duration::from_secs(
missing_pool_cache_time_seconds,
),
},
})
.collect(),
swapr: config
Expand All @@ -133,9 +138,16 @@ pub async fn load(network: &blockchain::Network, path: &Path) -> infra::Config {
file::SwaprPreset::Swapr => liquidity::config::Swapr::swapr(&network.id),
}
.expect("no Swapr preset for current network"),
file::SwaprConfig::Manual { router, pool_code } => liquidity::config::Swapr {
file::SwaprConfig::Manual {
router,
pool_code,
missing_pool_cache_time_seconds,
} => liquidity::config::Swapr {
router: router.into(),
pool_code: pool_code.into(),
missing_pool_cache_time: Duration::from_secs(
missing_pool_cache_time_seconds,
),
},
})
.collect(),
Expand Down
10 changes: 10 additions & 0 deletions crates/driver/src/infra/config/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ enum UniswapV2Config {

/// The digest of the pool initialization code.
pool_code: eth::H256,

/// How long liquidity should not be fetched for a token pair that
/// didn't return useful liquidity before allowing to fetch it
/// again.
missing_pool_cache_time_seconds: u64,
},
}

Expand All @@ -330,6 +335,11 @@ enum SwaprConfig {

/// The digest of the pool initialization code.
pool_code: eth::H256,

/// How long liquidity should not be fetched for a token pair that
/// didn't return useful liquidity before allowing to fetch it
/// again.
missing_pool_cache_time_seconds: u64,
},
}

Expand Down
14 changes: 13 additions & 1 deletion crates/driver/src/infra/liquidity/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{domain::eth, infra::blockchain::contracts::deployment_address},
hex_literal::hex,
std::collections::HashSet,
std::{collections::HashSet, time::Duration},
};

/// Configuration options for liquidity fetching.
Expand Down Expand Up @@ -36,6 +36,9 @@ pub struct UniswapV2 {
/// The digest of the pool initialization code. This digest is used for
/// computing the deterministic pool addresses per token pair.
pub pool_code: eth::CodeDigest,
/// How long liquidity should not be fetched for a token pair that didn't
/// return useful liquidity before allowing to fetch it again.
pub missing_pool_cache_time: Duration,
}

impl UniswapV2 {
Expand All @@ -46,6 +49,7 @@ impl UniswapV2 {
router: deployment_address(contracts::UniswapV2Router02::raw_contract(), network)?,
pool_code: hex!("96e8ac4277198ff8b6f785478aa9a39f403cb768dd02cbee326c3e7da348845f")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}

Expand All @@ -55,6 +59,7 @@ impl UniswapV2 {
router: deployment_address(contracts::SushiSwapRouter::raw_contract(), network)?,
pool_code: hex!("e18a34eb0e04b04f7a0ac29a6e80748dca96319b42c54d679cb821dca90c6303")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}

Expand All @@ -64,6 +69,7 @@ impl UniswapV2 {
router: deployment_address(contracts::HoneyswapRouter::raw_contract(), network)?,
pool_code: hex!("3f88503e8580ab941773b59034fb4b2a63e86dbc031b3633a925533ad3ed2b93")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}

Expand All @@ -73,6 +79,7 @@ impl UniswapV2 {
router: deployment_address(contracts::BaoswapRouter::raw_contract(), network)?,
pool_code: hex!("0bae3ead48c325ce433426d2e8e6b07dac10835baec21e163760682ea3d3520d")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}

Expand All @@ -82,6 +89,7 @@ impl UniswapV2 {
router: deployment_address(contracts::PancakeRouter::raw_contract(), network)?,
pool_code: hex!("57224589c67f3f30a6b0d7a1b54cf3153ab84563bc609ef41dfb34f8b2974d2d")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}
}
Expand All @@ -94,6 +102,9 @@ pub struct Swapr {
/// The digest of the pool initialization code. This digest is used for
/// computing the deterministic pool addresses per token pair.
pub pool_code: eth::CodeDigest,
/// How long liquidity should not be fetched for a token pair that didn't
/// return useful liquidity before allowing to fetch it again.
pub missing_pool_cache_time: Duration,
}

impl Swapr {
Expand All @@ -104,6 +115,7 @@ impl Swapr {
router: deployment_address(contracts::SwaprRouter::raw_contract(), network)?,
pool_code: hex!("d306a548755b9295ee49cc729e13ca4a45e00199bbd890fa146da43a50571776")
.into(),
missing_pool_cache_time: Duration::from_secs(60 * 60),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/e2e/src/setup/colocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ base-tokens = []
[[liquidity.uniswap-v2]]
router = "{:?}"
pool-code = "{:?}"
missing-pool-cache-time-seconds = 3600
[submission]
gas-price-cap = 1000000000000
Expand Down
1 change: 1 addition & 0 deletions crates/shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ chrono = { workspace = true, features = ["clock"] }
clap = { workspace = true }
contracts = { path = "../contracts" }
database = { path = "../database" }
delay_map = "0.3"
derivative = { workspace = true }
ethcontract = { workspace = true }
ethcontract-mock = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/shared/src/sources/uniswap_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl UniV2BaselineSourceParameters {
let fetcher = pool_fetching::PoolFetcher {
pool_reader,
web3: web3.clone(),
non_existent_pools: Default::default(),
};
Ok(UniV2BaselineSource {
router,
Expand Down
46 changes: 31 additions & 15 deletions crates/shared/src/sources/uniswap_v2/pool_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use {
},
anyhow::Result,
contracts::{IUniswapLikePair, ERC20},
delay_map::HashSetDelay,
ethcontract::{errors::MethodError, BlockId, H160, U256},
futures::{
future::{self, BoxFuture},
FutureExt as _,
},
model::TokenPair,
num::rational::Ratio,
std::collections::HashSet,
std::{collections::HashSet, sync::RwLock, time::Duration},
};

const POOL_SWAP_GAS_COST: usize = 60_000;
Expand Down Expand Up @@ -205,17 +206,15 @@ impl BaselineSolvable for Pool {
pub struct PoolFetcher<Reader> {
pub pool_reader: Reader,
pub web3: Web3,
pub non_existent_pools: RwLock<HashSetDelay<TokenPair>>,
}

impl PoolFetcher<DefaultPoolReader> {
/// Creates a pool fetcher instance for Uniswap V2 (or an exact clone).
pub fn uniswap(pair_provider: PairProvider, web3: Web3) -> Self {
impl<Reader> PoolFetcher<Reader> {
pub fn new(reader: Reader, web3: Web3, cache_time: Duration) -> Self {
Self {
pool_reader: DefaultPoolReader {
pair_provider,
web3: web3.clone(),
},
pool_reader: reader,
web3,
non_existent_pools: RwLock::new(HashSetDelay::new(cache_time)),
}
}
}
Expand All @@ -226,19 +225,36 @@ where
Reader: PoolReading,
{
async fn fetch(&self, token_pairs: HashSet<TokenPair>, at_block: Block) -> Result<Vec<Pool>> {
let mut token_pairs: Vec<_> = token_pairs.into_iter().collect();
{
let non_existent_pools = self.non_existent_pools.read().unwrap();
token_pairs.retain(|pair| !non_existent_pools.contains_key(pair));
}
let mut batch = Web3CallBatch::new(self.web3.transport().clone());
let block = BlockId::Number(at_block.into());
let futures = token_pairs
.into_iter()
.map(|pair| self.pool_reader.read_state(pair, &mut batch, block))
.iter()
.map(|pair| self.pool_reader.read_state(*pair, &mut batch, block))
.collect::<Vec<_>>();
batch.execute_all(MAX_BATCH_SIZE).await;

future::join_all(futures)
.await
.into_iter()
.filter_map(|pool| pool.transpose())
.collect()
let results = future::try_join_all(futures).await?;

let mut new_missing_pairs = vec![];
let mut pools = vec![];
for (result, key) in results.into_iter().zip(token_pairs) {
match result {
Some(pool) => pools.push(pool),
None => new_missing_pairs.push(key),
}
}
if !new_missing_pairs.is_empty() {
let mut non_existent_pools = self.non_existent_pools.write().unwrap();
for pair in new_missing_pairs {
non_existent_pools.insert(pair);
}
}
Ok(pools)
}
}

Expand Down

0 comments on commit 47e3707

Please sign in to comment.