Skip to content

Commit

Permalink
Index cow amms in background task (#3076)
Browse files Browse the repository at this point in the history
# Description
Currently restarts on arbitrum are slow enough to cause emergency
alerts. The reason is that we still don't persist indexed cow amms so we
re-index everything from scratch. As a workaround until we store cow
amms in the DB which makes syncing to the tip very fast after a restart
I moved the cow amm indexing into a background task that will not block
starting new auctions.

The effect is that as we continue to index cow amms after a restart more
and more cow amms will show up in the auction.

## How to test
CI and test on staging to see that restarts are fast again and cow amms
eventually show up
  • Loading branch information
MartinquaXD authored Oct 22, 2024
1 parent fd743de commit 079db75
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
43 changes: 33 additions & 10 deletions crates/autopilot/src/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use {
event_updater::EventUpdater,
},
anyhow::Result,
ethrpc::block_stream::BlockInfo,
ethrpc::block_stream::{into_stream, BlockInfo, CurrentBlockWatcher},
futures::StreamExt,
prometheus::{
core::{AtomicU64, GenericGauge},
HistogramVec,
Expand Down Expand Up @@ -89,15 +90,6 @@ impl Maintenance {
),
Self::timed_future("db_cleanup", self.db_cleanup.run_maintenance()),
Self::timed_future("ethflow_indexer", self.index_ethflow_orders()),
Self::timed_future(
"cow_amm_indexer",
futures::future::try_join_all(
self.cow_amm_indexer
.iter()
.cloned()
.map(|indexer| async move { indexer.run_maintenance().await }),
)
),
)?;

Ok(())
Expand Down Expand Up @@ -128,6 +120,37 @@ impl Maintenance {
.start_timer();
fut.await
}

/// Spawns a background task that runs on every new block but also
/// at least after every `update_interval`.
pub fn spawn_cow_amm_indexing_task(self_: Arc<Self>, current_block: CurrentBlockWatcher) {
tokio::task::spawn(async move {
let mut stream = into_stream(current_block);
loop {
let _ = match stream.next().await {
Some(block) => {
metrics().last_seen_block.set(block.number);
block
}
None => panic!("block stream terminated unexpectedly"),
};

// TODO: move this back into `Self::update_inner()` once we
// store cow amms in the DB to avoid incredibly slow restarts.
let _ = Self::timed_future(
"cow_amm_indexer",
futures::future::try_join_all(
self_
.cow_amm_indexer
.iter()
.cloned()
.map(|indexer| async move { indexer.run_maintenance().await }),
),
)
.await;
}
});
}
}

type EthflowIndexer =
Expand Down
4 changes: 4 additions & 0 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl RunLoop {
}

pub async fn run_forever(self) -> ! {
Maintenance::spawn_cow_amm_indexing_task(
self.maintenance.clone(),
self.eth.current_block().clone(),
);
let mut last_auction = None;
let mut last_block = None;
let self_arc = Arc::new(self);
Expand Down

0 comments on commit 079db75

Please sign in to comment.