Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await for a settlement tx in autopilot #2630

Merged
merged 13 commits into from
Apr 24, 2024
6 changes: 6 additions & 0 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub struct Arguments {
)]
pub max_settlement_transaction_wait: Duration,

/// The amount blocks to wait for a settlement to appear on chain.
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
#[clap(long, env, default_value = "20")]
pub max_blocks_wait: u64,

/// Run the autopilot in a shadow mode by specifying an upstream CoW
/// protocol deployment to pull auctions from. This will cause the autopilot
/// to start a run loop where it performs solver competition on driver,
Expand Down Expand Up @@ -266,6 +270,7 @@ impl std::fmt::Display for Arguments {
native_price_estimation_results_required,
auction_update_interval,
max_settlement_transaction_wait,
max_blocks_wait,
s3,
protocol_fee_exempt_addresses,
} = self;
Expand Down Expand Up @@ -352,6 +357,7 @@ impl std::fmt::Display for Arguments {
"max_settlement_transaction_wait: {:?}",
max_settlement_transaction_wait
)?;
writeln!(f, "max_blocks_wait: {:?}", max_blocks_wait)?;
writeln!(f, "s3: {:?}", s3)?;
Ok(())
}
Expand Down
13 changes: 13 additions & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
},
anyhow::Context,
chrono::Utc,
primitive_types::H256,
std::sync::Arc,
tracing::Instrument,
};
Expand Down Expand Up @@ -125,6 +126,18 @@ impl Persistence {

ex.commit().await.context("commit")
}

/// Retrieves the recent settlement transaction hashes for the provided
/// blocks range.
pub async fn recent_settlement_tx_hashes(
&self,
block_range: std::ops::Range<u64>,
) -> Result<Vec<H256>, Error> {
self.postgres
.recent_settlement_tx_hashes(block_range)
.await
.map_err(Error::DbError)
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
5 changes: 3 additions & 2 deletions crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod dto;
const RESPONSE_SIZE_LIMIT: usize = 10_000_000;
const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(60);

#[derive(Clone)]
pub struct Driver {
pub name: String,
pub url: Url,
Expand Down Expand Up @@ -40,10 +41,10 @@ impl Driver {

pub async fn settle(
&self,
request: &settle::Request,
request: settle::Request,
timeout: std::time::Duration,
) -> Result<settle::Response> {
self.request_response("settle", request, Some(timeout))
self.request_response("settle", &request, Some(timeout))
.await
}

Expand Down
5 changes: 3 additions & 2 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ pub async fn run(args: Arguments) {
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;

let run = RunLoop {
eth,
eth: Arc::new(eth),
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
solvable_orders_cache,
drivers: args
.drivers
Expand All @@ -604,9 +604,10 @@ pub async fn run(args: Arguments) {
submission_deadline: args.submission_deadline as u64,
additional_deadline_for_rewards: args.additional_deadline_for_rewards as u64,
max_settlement_transaction_wait: args.max_settlement_transaction_wait,
max_blocks_wait: args.max_blocks_wait,
solve_deadline: args.solve_deadline,
in_flight_orders: Default::default(),
persistence: persistence.clone(),
persistence: Arc::new(persistence.clone()),
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
liveness: liveness.clone(),
};
run.run_forever().await;
Expand Down
128 changes: 111 additions & 17 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ use {
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, H256, U256},
rand::seq::SliceRandom,
shared::token_list::AutoUpdatingTokenList,
shared::{event_handling::MAX_REORG_BLOCK_COUNT, token_list::AutoUpdatingTokenList},
std::{
collections::{BTreeMap, HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
},
tokio::sync::Mutex,
tokio::sync::{watch, Mutex},
tracing::Instrument,
web3::types::TransactionReceipt,
web3::types::{Transaction, TransactionReceipt},
};

pub struct RunLoop {
pub eth: infra::Ethereum,
pub persistence: infra::Persistence,
pub eth: Arc<infra::Ethereum>,
pub persistence: Arc<infra::Persistence>,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
pub drivers: Vec<infra::Driver>,

pub solvable_orders_cache: Arc<SolvableOrdersCache>,
pub market_makable_token_list: AutoUpdatingTokenList,
pub submission_deadline: u64,
pub additional_deadline_for_rewards: u64,
pub max_settlement_transaction_wait: Duration,
pub max_blocks_wait: u64,
pub solve_deadline: Duration,
pub in_flight_orders: Arc<Mutex<Option<InFlightOrders>>>,
pub liveness: Arc<Liveness>,
Expand Down Expand Up @@ -277,7 +278,7 @@ impl RunLoop {

tracing::info!(driver = %driver.name, "settling");
let submission_start = Instant::now();
match self.settle(driver, solution).await {
match self.settle(driver.clone(), solution, auction_id).await {
Ok(()) => Metrics::settle_ok(driver, submission_start.elapsed()),
Err(err) => {
Metrics::settle_err(driver, &err, submission_start.elapsed());
Expand All @@ -298,7 +299,7 @@ impl RunLoop {
&self,
id: domain::auction::Id,
auction: &domain::Auction,
) -> Vec<Participant<'_>> {
) -> Vec<Participant> {
let request = solve::Request::new(
id,
auction,
Expand Down Expand Up @@ -333,7 +334,10 @@ impl RunLoop {
solutions.into_iter().filter_map(|solution| match solution {
Ok(solution) => {
Metrics::solution_ok(driver);
Some(Participant { driver, solution })
Some(Participant {
driver: driver.clone(),
solution,
})
}
Err(err) => {
Metrics::solution_err(driver, &err);
Expand Down Expand Up @@ -405,20 +409,62 @@ impl RunLoop {

/// Execute the solver's solution. Returns Ok when the corresponding
/// transaction has been mined.
async fn settle(&self, driver: &infra::Driver, solved: &Solution) -> Result<(), SettleError> {
async fn settle(
&self,
driver: infra::Driver,
solved: &Solution,
auction_id: i64,
) -> Result<(), SettleError> {
let order_ids = solved.order_ids().copied().collect();
self.persistence
.store_order_events(order_ids, OrderEventLabel::Executing);

let eth = self.eth.clone();
let persistence = self.persistence.clone();
let max_blocks_wait = self.max_blocks_wait;
let (cancellation_tx, cancellation_rx) = watch::channel(false);
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
let settlement_tx_wait_task = tokio::spawn(async move {
let tag = auction_id.to_be_bytes();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the

input.extend(auction_id.to_be_bytes());

Self::wait_for_settlement_transaction(
eth,
persistence,
&tag,
max_blocks_wait,
cancellation_rx,
)
.await
.map_err(SettleError::Failure)
});

let duration = self.max_settlement_transaction_wait;
let request = settle::Request {
solution_id: solved.id,
};

let tx_hash = driver
.settle(&request, self.max_settlement_transaction_wait)
.await
.map_err(SettleError::Failure)?
.tx_hash;
let driver_settle_task = tokio::spawn(async move {
driver
.clone()
.settle(request, duration)
.await
.map_err(SettleError::Failure)
});
// Wait for either the settlement transaction to be mined or the driver returned
// a result.
let tx_hash = tokio::select! {
transaction_result = settlement_tx_wait_task => {
transaction_result
.map_err(|err| SettleError::Failure(err.into()))
.and_then(|res| res.and_then(|tx| {
tx.map(|tx| tx.hash).ok_or(SettleError::Failure(anyhow::anyhow!("settlement transaction await task reached deadline")))
}))
},
settle_result = driver_settle_task => {
// When driver returns any result, the blocks fetching task has to be stopped.
let _ = cancellation_tx.send(true);
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
settle_result
.map_err(|err| SettleError::Failure(err.into()))
.and_then(|res| res.map(|response| response.tx_hash))
}
}?;

*self.in_flight_orders.lock().await = Some(InFlightOrders {
tx_hash,
Expand All @@ -429,6 +475,54 @@ impl RunLoop {
Ok(())
}

/// Tries to find a `settle` contract call with calldata ending in `tag`.
///
/// Returns None if no transaction was found within the deadline or the task
/// is cancelled.
async fn wait_for_settlement_transaction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to test this in e2e or unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I couldn't find a quick way to achieve that. That would require either an artificial delay in the driver's response or creating unit tests with mocks. Someone may have a better idea of how to test it.

eth: Arc<infra::Ethereum>,
persistence: Arc<infra::Persistence>,
tag: &[u8],
max_blocks_wait: u64,
mut cancellation_rx: watch::Receiver<bool>,
) -> Result<Option<Transaction>> {
let start_offset = MAX_REORG_BLOCK_COUNT;
let current = eth.current_block().borrow().number;
let start = current.saturating_sub(start_offset);
let deadline = current.saturating_add(max_blocks_wait);
tracing::debug!(%current, %start, %deadline, ?tag, "waiting for tag");
let mut seen_transactions: HashSet<H256> = Default::default();
loop {
if eth.current_block().borrow().number > deadline {
break;
}
let mut hashes = persistence
.recent_settlement_tx_hashes(start..deadline + 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that you are using the existing event indexing instead of trying to inspect new blocks consecutively (which makes the code simpler)

However, the settlements table also contains the auction_id as a column. It might require exposing a new method on persistence, but I think it would be even simpler and more coherent to just see if we can fetch a settlement by auction and return its tx_hash.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come I missed that column 🙈 Thanks, updated the PR.

.await?;
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
hashes.retain(|hash| !seen_transactions.contains(hash));
for hash in hashes {
let tx = match eth.transaction(hash).await {
Ok(Some(tx)) => tx,
Ok(None) | Err(_) => {
tracing::warn!(?hash, "unable to fetch a tx");
continue;
}
};
if tx.input.0.ends_with(tag) {
return Ok(Some(tx));
}
seen_transactions.insert(hash);
tokio::select! {
_ = cancellation_rx.changed() => {
return Ok(None);
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {},
}
}
}
Ok(None)
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
}

/// Removes orders that are currently being settled to avoid solvers trying
/// to fill an order a second time.
async fn remove_in_flight_orders(&self, mut auction: domain::Auction) -> domain::Auction {
Expand Down Expand Up @@ -468,8 +562,8 @@ pub struct InFlightOrders {
orders: HashSet<domain::OrderUid>,
}

struct Participant<'a> {
driver: &'a infra::Driver,
struct Participant {
driver: infra::Driver,
squadgazzz marked this conversation as resolved.
Show resolved Hide resolved
solution: Solution,
}

Expand Down
Loading