Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await for a settlement tx in autopilot #2630

Merged
merged 13 commits into from
Apr 24, 2024
13 changes: 13 additions & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
},
anyhow::Context,
chrono::Utc,
primitive_types::H256,
std::sync::Arc,
tracing::Instrument,
};
Expand Down Expand Up @@ -125,6 +126,18 @@ impl Persistence {

ex.commit().await.context("commit")
}

/// Retrieves the recent settlement transaction hashes for the provided
/// blocks range.
pub async fn recent_settlement_tx_hashes(
&self,
block_range: std::ops::Range<u64>,
) -> Result<Vec<H256>, Error> {
self.postgres
.recent_settlement_tx_hashes(block_range)
.await
.map_err(Error::DbError)
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
87 changes: 76 additions & 11 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ use {
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, H256, U256},
rand::seq::SliceRandom,
shared::token_list::AutoUpdatingTokenList,
shared::{event_handling::MAX_REORG_BLOCK_COUNT, token_list::AutoUpdatingTokenList},
std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::Mutex,
tracing::Instrument,
web3::types::TransactionReceipt,
web3::types::{Transaction, TransactionReceipt},
};

pub struct RunLoop {
Expand Down Expand Up @@ -277,7 +277,7 @@ impl RunLoop {

tracing::info!(driver = %driver.name, "settling");
let submission_start = Instant::now();
match self.settle(driver, solution).await {
match self.settle(driver, solution, auction_id).await {
Ok(()) => Metrics::settle_ok(driver, submission_start.elapsed()),
Err(err) => {
Metrics::settle_err(driver, &err, submission_start.elapsed());
Expand Down Expand Up @@ -405,21 +405,44 @@ impl RunLoop {

/// Execute the solver's solution. Returns Ok when the corresponding
/// transaction has been mined.
async fn settle(&self, driver: &infra::Driver, solved: &Solution) -> Result<(), SettleError> {
async fn settle(
&self,
driver: &infra::Driver,
solved: &Solution,
auction_id: i64,
) -> Result<(), SettleError> {
let order_ids = solved.order_ids().copied().collect();
self.persistence
.store_order_events(order_ids, OrderEventLabel::Executing);

let tag = auction_id.to_be_bytes();
let request = settle::Request {
solution_id: solved.id,
};

let tx_hash = driver
.settle(&request, self.max_settlement_transaction_wait)
.await
.map_err(SettleError::Failure)?
.tx_hash;

let settlement_tx_wait_task =
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
self.wait_for_settlement_transaction(&tag, self.submission_deadline);
let driver_settle_task = driver.settle(&request, self.max_settlement_transaction_wait);
// Wait for either the settlement transaction to be mined or the driver returned
// a result.
let tx_hash = match futures::future::select(
Box::pin(settlement_tx_wait_task),
Box::pin(driver_settle_task),
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
)
.await
{
futures::future::Either::Left((res, _)) => {
res.map_err(SettleError::Failure).and_then(|maybe_tx| {
maybe_tx
.map(|tx| tx.hash)
.ok_or(SettleError::Failure(anyhow::anyhow!(
"settlement transaction await task reached deadline"
)))
})
}
futures::future::Either::Right((res, _)) => res
.map_err(SettleError::Failure)
.map(|response| response.tx_hash),
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}?;
*self.in_flight_orders.lock().await = Some(InFlightOrders {
tx_hash,
orders: solved.orders.keys().copied().collect(),
Expand All @@ -429,6 +452,48 @@ impl RunLoop {
Ok(())
}

/// Tries to find a `settle` contract call with calldata ending in `tag`.
///
/// Returns None if no transaction was found within the deadline or the task
/// is cancelled.
async fn wait_for_settlement_transaction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to test this in e2e or unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I couldn't find a quick way to achieve that. That would require either an artificial delay in the driver's response or creating unit tests with mocks. Someone may have a better idea of how to test it.

&self,
tag: &[u8],
max_blocks_wait: u64,
) -> Result<Option<Transaction>> {
let start_offset = MAX_REORG_BLOCK_COUNT;
let current = self.eth.current_block().borrow().number;
let start = current.saturating_sub(start_offset);
let deadline = current.saturating_add(max_blocks_wait);
tracing::debug!(%current, %start, %deadline, ?tag, "waiting for tag");
let mut seen_transactions: HashSet<H256> = Default::default();
loop {
if self.eth.current_block().borrow().number > deadline {
break;
}
let mut hashes = self
.persistence
.recent_settlement_tx_hashes(start..deadline + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that you are using the existing event indexing instead of trying to inspect new blocks consecutively (which makes the code simpler)

However, the settlements table also contains the auction_id as a column. It might require exposing a new method on persistence, but I think it would be even simpler and more coherent to just see if we can fetch a settlement by auction and return its tx_hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come I missed that column 🙈 Thanks, updated the PR.

.await?;
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
hashes.retain(|hash| !seen_transactions.contains(hash));
for hash in hashes {
let tx = match self.eth.transaction(hash).await {
Ok(Some(tx)) => tx,
Ok(None) | Err(_) => {
tracing::warn!(?hash, "unable to fetch a tx");
continue;
}
};
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
if tx.input.0.ends_with(tag) {
return Ok(Some(tx));
}
seen_transactions.insert(hash);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
Ok(None)
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}

/// Removes orders that are currently being settled to avoid solvers trying
/// to fill an order a second time.
async fn remove_in_flight_orders(&self, mut auction: domain::Auction) -> domain::Auction {
Expand Down
Loading