diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 7f62894113..18a5dfbb9a 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -443,8 +443,7 @@ pub async fn run(args: Arguments) { block_retriever.clone(), skip_event_sync_start, )); - let mut maintainers: Vec> = - vec![pool_fetcher.clone(), event_updater, Arc::new(db.clone())]; + let mut maintainers: Vec> = vec![event_updater, Arc::new(db.clone())]; let gas_price_estimator = Arc::new(InstrumentedGasEstimator::new( shared::gas_price_estimation::create_priority_estimator( @@ -538,9 +537,6 @@ pub async fn run(args: Arguments) { ); maintainers.push(broadcaster_event_updater); } - if let Some(balancer) = balancer_pool_fetcher { - maintainers.push(balancer); - } if let Some(uniswap_v3) = uniswap_v3_pool_fetcher { maintainers.push(uniswap_v3); } diff --git a/crates/driver/src/boundary/liquidity/uniswap/v2.rs b/crates/driver/src/boundary/liquidity/uniswap/v2.rs index e45d9903ee..51039e8455 100644 --- a/crates/driver/src/boundary/liquidity/uniswap/v2.rs +++ b/crates/driver/src/boundary/liquidity/uniswap/v2.rs @@ -9,14 +9,9 @@ use { }, async_trait::async_trait, contracts::{GPv2Settlement, IUniswapLikeRouter}, - ethrpc::{ - current_block::{self, CurrentBlockStream}, - Web3, - }, - futures::StreamExt, + ethrpc::{current_block::CurrentBlockStream, Web3}, shared::{ http_solver::model::TokenAmount, - maintenance::Maintaining, sources::uniswap_v2::{ pair_provider::PairProvider, pool_cache::PoolCache, @@ -30,9 +25,8 @@ use { }, std::{ collections::HashSet, - sync::{self, Arc, Mutex}, + sync::{Arc, Mutex}, }, - tracing::Instrument, }; /// Median gas used per UniswapInteraction (v2). @@ -150,18 +144,11 @@ where config.missing_pool_cache_time, ); - let pool_cache = Arc::new(PoolCache::new( + Arc::new(PoolCache::new( boundary::liquidity::cache_config(), Arc::new(pool_fetcher), blocks.clone(), - )?); - - tokio::task::spawn( - cache_update(blocks.clone(), Arc::downgrade(&pool_cache)) - .instrument(tracing::info_span!("uniswap_v2_cache")), - ); - - pool_cache + )?) }; Ok(Box::new(UniswapLikeLiquidity::with_allowances( @@ -172,33 +159,6 @@ where ))) } -async fn cache_update(blocks: CurrentBlockStream, pool_cache: sync::Weak) { - let mut blocks = current_block::into_stream(blocks); - loop { - let block = blocks - .next() - .await - .expect("block stream unexpectedly ended") - .number; - - let pool_cache = match pool_cache.upgrade() { - Some(value) => value, - None => { - tracing::debug!("pool cache dropped; stopping update task"); - break; - } - }; - - tracing::info_span!("maintenance", block) - .in_scope(|| async move { - if let Err(err) = pool_cache.run_maintenance().await { - tracing::warn!(?err, "error updating pool cache"); - } - }) - .await; - } -} - /// An allowance manager that always reports no allowances. struct NoAllowanceManaging; diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 7f6d32136a..98a469f04a 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -28,7 +28,7 @@ use { fee_subsidy::{config::FeeSubsidyConfiguration, FeeSubsidizing}, gas_price::InstrumentedGasEstimator, http_client::HttpClientFactory, - maintenance::{Maintaining, ServiceMaintenance}, + maintenance::ServiceMaintenance, metrics::{serve_metrics, DEFAULT_METRICS_PORT}, network::network_name, oneinch_api::OneInchClientImpl, @@ -518,12 +518,9 @@ pub async fn run(args: Arguments) { ipfs, )); - let mut maintainers = vec![pool_fetcher as Arc]; - if let Some(balancer) = balancer_pool_fetcher { - maintainers.push(balancer); - } if let Some(uniswap_v3) = uniswap_v3_pool_fetcher { - maintainers.push(uniswap_v3); + let service_maintainer = ServiceMaintenance::new(vec![uniswap_v3]); + task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream)); } check_database_connection(orderbook.as_ref()).await; @@ -548,9 +545,6 @@ pub async fn run(args: Arguments) { native_price_estimator, ); - let service_maintainer = ServiceMaintenance::new(maintainers); - task::spawn(service_maintainer.run_maintenance_on_new_block(current_block_stream)); - let mut metrics_address = args.bind_address; metrics_address.set_port(DEFAULT_METRICS_PORT); tracing::info!(%metrics_address, "serving metrics"); diff --git a/crates/shared/src/recent_block_cache.rs b/crates/shared/src/recent_block_cache.rs index 4ecbaff8b1..131ff69602 100644 --- a/crates/shared/src/recent_block_cache.rs +++ b/crates/shared/src/recent_block_cache.rs @@ -30,7 +30,7 @@ use { cached::{Cached, SizedCache}, ethcontract::BlockNumber, ethrpc::current_block::CurrentBlockStream, - futures::FutureExt, + futures::{FutureExt, StreamExt}, itertools::Itertools, prometheus::IntCounterVec, std::{ @@ -41,6 +41,7 @@ use { sync::{Arc, Mutex}, time::Duration, }, + tracing::Instrument, }; /// How many liqudity sources should at most be fetched in a single chunk. @@ -87,6 +88,14 @@ impl From for BlockNumber { /// updates the N most recently used entries automatically when a new block /// arrives. pub struct RecentBlockCache +where + K: CacheKey, + F: CacheFetching, +{ + inner: Arc>, +} + +pub struct Inner where K: CacheKey, F: CacheFetching, @@ -94,7 +103,6 @@ where mutexed: Mutex>, number_of_blocks_to_cache: NonZeroU64, fetcher: Arc, - block_stream: CurrentBlockStream, maximum_retries: u32, delay_between_retries: Duration, metrics: &'static Metrics, @@ -133,7 +141,6 @@ struct Metrics { #[metric(labels("cache_type"))] recent_block_cache_misses: IntCounterVec, } - impl RecentBlockCache where K: CacheKey, @@ -158,7 +165,7 @@ where metrics_label: &'static str, ) -> Result { let block = block_stream.borrow().number; - Ok(Self { + let inner = Arc::new(Inner { mutexed: Mutex::new(Mutexed::new( config.number_of_entries_to_auto_update, block, @@ -166,20 +173,55 @@ where )), number_of_blocks_to_cache: config.number_of_blocks_to_cache, fetcher: Arc::new(fetcher), - block_stream, maximum_retries: config.max_retries, delay_between_retries: config.delay_between_retries, metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(), metrics_label, requests: BoxRequestSharing::labelled("liquidity_fetching".into()), - }) + }); + + Self::spawn_gc_task( + Arc::downgrade(&inner), + block_stream, + metrics_label.to_string(), + ); + + Ok(Self { inner }) + } + + pub async fn fetch(&self, keys: impl IntoIterator, block: Block) -> Result> { + self.inner.fetch(keys, block).await } - pub async fn update_cache(&self) -> Result<()> { - let new_block = self.block_stream.borrow().number; - self.update_cache_at_block(new_block).await + fn spawn_gc_task( + inner: std::sync::Weak>, + block_stream: CurrentBlockStream, + label: String, + ) { + tokio::task::spawn( + async move { + let mut stream = ethrpc::current_block::into_stream(block_stream); + while let Some(block) = stream.next().await { + let Some(inner) = inner.upgrade() else { + tracing::debug!("cache no longer in use; terminate GC task"); + break; + }; + if let Err(err) = inner.update_cache_at_block(block.number).await { + tracing::warn!(?err, "failed to update cache"); + } + } + } + .instrument(tracing::info_span!("cache_maintenance", cache = label)), + ); } +} +impl Inner +where + K: CacheKey, + V: Clone + Send + Sync + 'static, + F: CacheFetching, +{ async fn update_cache_at_block(&self, new_block: u64) -> Result<()> { let keys = self .mutexed @@ -239,7 +281,7 @@ where fut.await.context("could not fetch liquidity") } - pub async fn fetch(&self, keys: impl IntoIterator, block: Block) -> Result> { + async fn fetch(&self, keys: impl IntoIterator, block: Block) -> Result> { let block = match block { Block::Recent => None, Block::Number(number) => Some(number), @@ -509,7 +551,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; let assert_keys_recently_used = |expected_keys: &[usize]| { let cached_keys = cache @@ -565,7 +608,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; // Initial state on the block chain. let initial_values = vec![ @@ -626,7 +670,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; let value0 = TestValue::new(0, "0"); let value1 = TestValue::new(1, "1"); @@ -683,7 +728,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; // cache at block 5 *values.lock().unwrap() = vec![TestValue::new(0, "foo")]; @@ -750,7 +796,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; // Fetch 10 keys on block 10; but we only have capacity to update 2 of those in // the background. @@ -798,7 +845,8 @@ mod tests { block_stream, "", ) - .unwrap(); + .unwrap() + .inner; let key = TestKey(0); // cache at block 7, most recent block is 10. diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching.rs b/crates/shared/src/sources/balancer_v2/pool_fetching.rs index 622a294848..ab68ea9867 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching.rs @@ -27,7 +27,6 @@ use { }, crate::{ ethrpc::{Web3, Web3Transport}, - maintenance::Maintaining, recent_block_cache::{Block, CacheConfig}, token_info::TokenInfoFetching, }, @@ -357,17 +356,6 @@ impl BalancerPoolFetching for BalancerPoolFetcher { } } -#[async_trait::async_trait] -impl Maintaining for BalancerPoolFetcher { - async fn run_maintenance(&self) -> Result<()> { - self.fetcher.run_maintenance().await - } - - fn name(&self) -> &str { - "BalancerPoolFetcher" - } -} - /// Creates an aggregate fetcher for all supported pool factories. async fn create_aggregate_pool_fetcher( web3: Web3, @@ -557,7 +545,6 @@ mod tests { ) .await .unwrap(); - pool_fetcher.run_maintenance().await.unwrap(); let pair = TokenPair::new( addr!("C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), addr!("C011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"), @@ -604,9 +591,6 @@ mod tests { pool_id_deny_list: Default::default(), }; - // index all the pools. - pool_fetcher.run_maintenance().await.unwrap(); - // see what the subgraph says. let client = BalancerSubgraphClient::for_chain(chain_id, Client::new()).unwrap(); let subgraph_pools = client.get_registered_pools().await.unwrap(); diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/aggregate.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/aggregate.rs index 4010bf8067..525ca54ec9 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/aggregate.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/aggregate.rs @@ -3,11 +3,7 @@ use { super::internal::InternalPoolFetching, - crate::{ - maintenance::Maintaining, - recent_block_cache::Block, - sources::balancer_v2::pools::Pool, - }, + crate::{recent_block_cache::Block, sources::balancer_v2::pools::Pool}, anyhow::Result, ethcontract::H256, futures::future, @@ -53,21 +49,3 @@ impl InternalPoolFetching for Aggregate { .collect()) } } - -#[async_trait::async_trait] -impl Maintaining for Aggregate { - async fn run_maintenance(&self) -> Result<()> { - future::try_join_all( - self.fetchers - .iter() - .map(|fetcher| fetcher.run_maintenance()), - ) - .await?; - - Ok(()) - } - - fn name(&self) -> &str { - "BalancerPoolFetcher" - } -} diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs index 112cbe5bb4..c6747aa996 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/cache.rs @@ -6,7 +6,6 @@ use { super::internal::InternalPoolFetching, crate::{ - maintenance::Maintaining, recent_block_cache::{Block, CacheConfig, CacheFetching, CacheKey, RecentBlockCache}, sources::balancer_v2::pools::Pool, }, @@ -62,21 +61,6 @@ where } } -#[async_trait::async_trait] -impl Maintaining for Cache -where - Inner: InternalPoolFetching, -{ - async fn run_maintenance(&self) -> Result<()> { - futures::try_join!(self.inner.run_maintenance(), self.cache.update_cache())?; - Ok(()) - } - - fn name(&self) -> &str { - "BalancerPoolFetcher" - } -} - impl CacheKey for H256 { fn first_ord() -> Self { H256::zero() diff --git a/crates/shared/src/sources/balancer_v2/pool_fetching/internal.rs b/crates/shared/src/sources/balancer_v2/pool_fetching/internal.rs index e2c74eac0d..944fa00850 100644 --- a/crates/shared/src/sources/balancer_v2/pool_fetching/internal.rs +++ b/crates/shared/src/sources/balancer_v2/pool_fetching/internal.rs @@ -2,11 +2,7 @@ //! strategies. use { - crate::{ - maintenance::Maintaining, - recent_block_cache::Block, - sources::balancer_v2::pools::Pool, - }, + crate::{recent_block_cache::Block, sources::balancer_v2::pools::Pool}, anyhow::Result, ethcontract::H256, model::TokenPair, @@ -18,33 +14,10 @@ use { /// /// This allows us to compose different inner pool fetching strategies together. #[async_trait::async_trait] -pub trait InternalPoolFetching: Maintaining + Send + Sync + 'static { +pub trait InternalPoolFetching: Send + Sync + 'static { /// Retrives all pool IDs that trade the specified pairs. async fn pool_ids_for_token_pairs(&self, token_pairs: HashSet) -> HashSet; /// Fetches current pool states for the specified IDs and block. async fn pools_by_id(&self, pool_ids: HashSet, block: Block) -> Result>; } - -// We require some manual mocking because of the `: Maintaining` "super-trait". -mockall::mock! { - InternalPoolFetcher {} - - #[async_trait::async_trait] - impl InternalPoolFetching for InternalPoolFetcher { - async fn pool_ids_for_token_pairs(&self, token_pairs: HashSet) -> HashSet; - async fn pools_by_id( - &self, - pool_ids: HashSet, - block: Block, - ) -> Result>; - } - - #[async_trait::async_trait] - impl Maintaining for InternalPoolFetcher { - async fn run_maintenance(&self) -> Result<()>; - fn name(&self) -> &str { - "BalancerPoolFetcher" - } - } -} diff --git a/crates/shared/src/sources/uniswap_v2/pool_cache.rs b/crates/shared/src/sources/uniswap_v2/pool_cache.rs index 78891375ab..e0567158cf 100644 --- a/crates/shared/src/sources/uniswap_v2/pool_cache.rs +++ b/crates/shared/src/sources/uniswap_v2/pool_cache.rs @@ -1,6 +1,5 @@ use { crate::{ - maintenance::Maintaining, recent_block_cache::{Block, CacheConfig, CacheFetching, CacheKey, RecentBlockCache}, sources::uniswap_v2::pool_fetching::{Pool, PoolFetching}, }, @@ -51,14 +50,3 @@ impl PoolFetching for PoolCache { self.0.fetch(pairs, block).await } } - -#[async_trait::async_trait] -impl Maintaining for PoolCache { - async fn run_maintenance(&self) -> Result<()> { - self.0.update_cache().await - } - - fn name(&self) -> &str { - "UniswapV2PoolFetcher" - } -} diff --git a/crates/solver/src/run.rs b/crates/solver/src/run.rs index 3cce1b1400..391d8e77ef 100644 --- a/crates/solver/src/run.rs +++ b/crates/solver/src/run.rs @@ -202,11 +202,6 @@ pub async fn run(args: Arguments) { }) .collect() .await; - maintainers.extend( - univ2_sources - .iter() - .map(|(_, cache)| cache.clone() as Arc<_>), - ); if baseline_sources.contains(&BaselineSource::BalancerV2) { let factories = args @@ -229,7 +224,6 @@ pub async fn run(args: Arguments) { { Ok(balancer_pool_fetcher) => { let balancer_pool_fetcher = Arc::new(balancer_pool_fetcher); - maintainers.push(balancer_pool_fetcher.clone()); liquidity_sources.push(Box::new(BalancerV2Liquidity::new( web3.clone(), balancer_pool_fetcher,