Skip to content

Commit

Permalink
Exclude empty AMMs (#2849)
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz authored Aug 2, 2024
1 parent cd2af46 commit 330e5f5
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 5 deletions.
8 changes: 8 additions & 0 deletions crates/cow-amm/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use {
crate::Amm,
contracts::{cow_amm_legacy_helper::Event as CowAmmEvent, CowAmmLegacyHelper},
ethcontract::Address,
ethrpc::current_block::RangeInclusive,
shared::event_handling::EventStoring,
std::{collections::BTreeMap, sync::Arc},
Expand All @@ -26,6 +27,13 @@ impl Storage {
.flat_map(|amms| amms.iter().cloned())
.collect()
}

pub(crate) async fn remove_amms(&self, amm_addresses: &[Address]) {
let mut lock = self.0.cache.write().await;
for (_, amms) in lock.iter_mut() {
amms.retain(|amm| !amm_addresses.contains(amm.address()))
}
}
}

#[derive(Debug)]
Expand Down
1 change: 1 addition & 0 deletions crates/cow-amm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod amm;
mod cache;
mod factory;
mod maintainers;
mod registry;

pub use {amm::Amm, contracts::CowAmmLegacyHelper as Helper, registry::Registry};
81 changes: 81 additions & 0 deletions crates/cow-amm/src/maintainers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use {
crate::{cache::Storage, Amm},
contracts::ERC20,
ethcontract::futures::future::{join_all, select_ok},
ethrpc::Web3,
shared::maintenance::Maintaining,
std::sync::Arc,
tokio::sync::RwLock,
};

pub struct EmptyPoolRemoval {
storage: Arc<RwLock<Vec<Storage>>>,
web3: Web3,
}

impl EmptyPoolRemoval {
pub fn new(storage: Arc<RwLock<Vec<Storage>>>, web3: Web3) -> Self {
Self { storage, web3 }
}

/// Checks if the given AMM has a zero token balance.
async fn has_zero_balance(&self, amm: Arc<Amm>) -> bool {
let amm_address = amm.address();
let futures = amm
.traded_tokens()
.iter()
.map(move |token| async move {
ERC20::at(&self.web3, *token)
.balance_of(*amm_address)
.call()
.await
.map_err(|err| {
tracing::warn!(
amm = ?amm_address,
?token,
?err,
"failed to check AMM token balance"
);
})
.and_then(|balance| if balance.is_zero() { Ok(()) } else { Err(()) })
})
.map(Box::pin);
// If any future resolved to Ok(()), then there exists a zero balance.
select_ok(futures).await.is_ok()
}
}

#[async_trait::async_trait]
impl Maintaining for EmptyPoolRemoval {
async fn run_maintenance(&self) -> anyhow::Result<()> {
let mut amms_to_check = Vec::<Arc<Amm>>::new();
{
let lock = self.storage.read().await;
for storage in lock.iter() {
amms_to_check.extend(storage.cow_amms().await);
}
}
let futures = amms_to_check.iter().map(|amm| async {
self.has_zero_balance(amm.clone())
.await
.then_some(*amm.address())
});

let empty_amms: Vec<_> = join_all(futures).await.into_iter().flatten().collect();
if !empty_amms.is_empty() {
tracing::debug!(amms = ?empty_amms, "removing AMMs with zero token balance");
let lock = self.storage.read().await;
join_all(
lock.iter()
.map(|storage| async { storage.remove_amms(&empty_amms).await }),
)
.await;
}

Ok(())
}

fn name(&self) -> &str {
"EmptyPoolRemoval"
}
}
11 changes: 8 additions & 3 deletions crates/cow-amm/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::{cache::Storage, factory::Factory, Amm},
crate::{cache::Storage, factory::Factory, maintainers::EmptyPoolRemoval, Amm},
contracts::CowAmmLegacyHelper,
ethcontract::Address,
ethrpc::{current_block::CurrentBlockStream, Web3},
Expand Down Expand Up @@ -48,8 +48,13 @@ impl Registry {
address: factory,
};
let event_handler = EventHandler::new(Arc::new(self.web3.clone()), indexer, storage, None);
let event_handler: Vec<Arc<dyn Maintaining>> = vec![Arc::new(Mutex::new(event_handler))];
let service_maintainer = ServiceMaintenance::new(event_handler);
let token_balance_maintainer =
EmptyPoolRemoval::new(self.storage.clone(), self.web3.clone());
let maintainers: Vec<Arc<dyn Maintaining>> = vec![
Arc::new(Mutex::new(event_handler)),
Arc::new(token_balance_maintainer),
];
let service_maintainer = ServiceMaintenance::new(maintainers);
tokio::task::spawn(
service_maintainer.run_maintenance_on_new_block(self.current_block_stream.clone()),
);
Expand Down
1 change: 1 addition & 0 deletions crates/e2e/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ static NODE_MUTEX: Mutex<()> = Mutex::new(());
const DEFAULT_FILTERS: &[&str] = &[
"warn",
"autopilot=debug",
"cow_amm=debug",
"driver=debug",
"e2e=debug",
"orderbook=debug",
Expand Down
28 changes: 26 additions & 2 deletions crates/e2e/tests/e2e/cow_amm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,29 @@ async fn cow_amm_driver_support(web3: Web3) {
usdc.approve(onchain.contracts().allowance, to_wei_with_exp(1000, 6))
);

// Empty liquidity of one of the AMMs to test EmptyPoolRemoval maintenance job.
let zero_balance_amm = addr!("b3bf81714f704720dcb0351ff0d42eca61b069fc");
let zero_balance_amm_account = forked_node_api
.impersonate(&zero_balance_amm)
.await
.unwrap();
let pendle_token = ERC20::at(&web3, addr!("808507121b80c02388fad14726482e061b8da827"));
let balance = pendle_token
.balance_of(zero_balance_amm)
.call()
.await
.unwrap();
tx!(
zero_balance_amm_account,
pendle_token.transfer(addr!("027e1cbf2c299cba5eb8a2584910d04f1a8aa403"), balance)
);
assert!(pendle_token
.balance_of(zero_balance_amm)
.call()
.await
.unwrap()
.is_zero());

// spawn a mock solver so we can later assert things about the received auction
let mock_solver = Mock::default();
colocation::start_driver(
Expand Down Expand Up @@ -512,7 +535,8 @@ async fn cow_amm_driver_support(web3: Web3) {
tracing::info!("Waiting for all cow amms to be indexed.");
let expected_cow_amms = [
addr!("027e1cbf2c299cba5eb8a2584910d04f1a8aa403"),
addr!("b3bf81714f704720dcb0351ff0d42eca61b069fc"),
// This AMM should be removed by the EmptyPoolRemoval due to empty liquidity pool.
// addr!("b3bf81714f704720dcb0351ff0d42eca61b069fc"),
addr!("301076c36e034948a747bb61bab9cd03f62672e3"),
addr!("d7cb8cc1b56356bb7b78d02e785ead28e2158660"),
addr!("9941fd7db2003308e7ee17b04400012278f12ac6"),
Expand All @@ -532,7 +556,7 @@ async fn cow_amm_driver_support(web3: Web3) {
.unwrap();

// all tokens traded by the cow amms
tracing::error!("Waiting for all relevant native prices to be indexed.");
tracing::info!("Waiting for all relevant native prices to be indexed.");
let expected_prices = [
// missing due to insufficient liquidity in e2e test (we only index univ2)
// addr!("808507121B80c02388fAd14726482e061B8da827"), // PENDLE
Expand Down

0 comments on commit 330e5f5

Please sign in to comment.