From 1044860619ca33a282e5314a1b6bdeb24c549ed5 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Fri, 20 Oct 2023 14:21:14 +0200 Subject: [PATCH] Make /settle call blocking and report tx_hash (#1999) # Description Currently the autopilot wastes a lot of time waiting for transactions that will never appear. This happens because the driver's `/settle` endpoint operates in a fire-and-forget manner (receive the request, kick off the submission in the background, return immediately). As a result, the driver has no way to tell the autopilot that it will not be able to submit the solution, so the autopilot has to monitor the blockchain until the deadline is reached. This PR is slightly related to https://github.com/cowprotocol/services/issues/1974 but does not solve the issue that solutions don't get submitted in the first place. # Changes This PR makes it so that `/settle` actually blocks in the driver until the solution gets submitted and returns the tx_hash. If the driver is not able to submit the solution (e.g. the simulations for the solution start reverting), it will return an error to the autopilot, which will immediately move on to the next auction. Note that this is not really how it's supposed to work, but this solution is fine as long as we are running all the drivers (and can therefore assume reasonable behavior). We should revisit this decision before we encourage external parties to run their own driver. 
## How to test e2e tests --- crates/autopilot/src/driver_api.rs | 28 +++-- crates/autopilot/src/driver_model.rs | 2 + crates/autopilot/src/run_loop.rs | 116 +++--------------- crates/driver/src/boundary/mempool.rs | 2 +- crates/driver/src/domain/competition/mod.rs | 40 +++--- crates/driver/src/domain/mempools.rs | 30 +++-- crates/driver/src/infra/api/error.rs | 3 + .../infra/api/routes/settle/dto/settled.rs | 8 +- crates/driver/src/infra/observe/mod.rs | 1 + crates/driver/src/tests/setup/mod.rs | 7 +- crates/solver/src/settlement_submission.rs | 2 +- .../src/settlement_submission/submitter.rs | 9 +- 12 files changed, 102 insertions(+), 146 deletions(-) diff --git a/crates/autopilot/src/driver_api.rs b/crates/autopilot/src/driver_api.rs index 8f64ecc153..b1c4d56552 100644 --- a/crates/autopilot/src/driver_api.rs +++ b/crates/autopilot/src/driver_api.rs @@ -29,21 +29,27 @@ impl Driver { } pub async fn solve(&self, request: &solve::Request) -> Result { - self.request_response("solve", request).await + self.request_response("solve", request, None).await } pub async fn reveal(&self, request: &reveal::Request) -> Result { - self.request_response("reveal", request).await + self.request_response("reveal", request, None).await } - pub async fn settle(&self, request: &settle::Request) -> Result { - self.request_response("settle", request).await + pub async fn settle( + &self, + request: &settle::Request, + timeout: std::time::Duration, + ) -> Result { + self.request_response("settle", request, Some(timeout)) + .await } async fn request_response( &self, path: &str, request: &impl serde::Serialize, + timeout: Option, ) -> Result where Response: serde::de::DeserializeOwned, @@ -54,13 +60,13 @@ impl Driver { body=%serde_json::to_string_pretty(request).unwrap(), "request", ); - let mut response = self - .client - .post(url.clone()) - .json(request) - .send() - .await - .context("send")?; + let mut request = self.client.post(url.clone()).json(request); + + if let Some(timeout) = 
timeout { + request = request.timeout(timeout); + } + + let mut response = request.send().await.context("send")?; let status = response.status().as_u16(); let body = response_body_with_size_limit(&mut response, RESPONSE_SIZE_LIMIT) .await diff --git a/crates/autopilot/src/driver_model.rs b/crates/autopilot/src/driver_model.rs index ac7dcf289c..f2d195570f 100644 --- a/crates/autopilot/src/driver_model.rs +++ b/crates/autopilot/src/driver_model.rs @@ -194,6 +194,7 @@ pub mod reveal { pub mod settle { use { model::bytes_hex, + primitive_types::H256, serde::{Deserialize, Serialize}, serde_with::serde_as, }; @@ -212,6 +213,7 @@ pub mod settle { #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct Response { pub calldata: Calldata, + pub tx_hash: H256, } #[serde_as] diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index c784ee09e9..bb3033d086 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -30,20 +30,15 @@ use { }, }, number::nonzero::U256 as NonZeroU256, - primitive_types::{H160, H256, U256}, + primitive_types::{H160, U256}, rand::seq::SliceRandom, - shared::{ - event_handling::MAX_REORG_BLOCK_COUNT, - remaining_amounts, - token_list::AutoUpdatingTokenList, - }, + shared::{remaining_amounts, token_list::AutoUpdatingTokenList}, std::{ collections::{BTreeMap, HashSet}, sync::Arc, time::{Duration, Instant}, }, tracing::Instrument, - web3::types::Transaction, }; pub const SOLVE_TIME_LIMIT: Duration = Duration::from_secs(15); @@ -290,7 +285,7 @@ impl RunLoop { } tracing::info!(driver = %driver.name, "settling"); - match self.settle(driver, auction_id, solution, &revealed).await { + match self.settle(driver, solution, &revealed).await { Ok(()) => Metrics::settle_ok(driver), Err(err) => { Metrics::settle_err(driver, &err); @@ -413,7 +408,6 @@ impl RunLoop { async fn settle( &self, driver: &Driver, - id: AuctionId, solved: &Solution, revealed: &reveal::Response, ) -> Result<(), SettleError> { 
@@ -424,95 +418,27 @@ impl RunLoop { .collect_vec(); self.database.store_order_events(&events).await; - driver - .settle(&settle::Request { - solution_id: solved.id, - }) + let request = settle::Request { + solution_id: solved.id, + }; + + let tx_hash = driver + .settle(&request, self.max_settlement_transaction_wait) .await - .map_err(SettleError::Failure)?; + .map_err(SettleError::Failure)? + .tx_hash; - // TODO: React to deadline expiring. - let transaction = self - .wait_for_settlement_transaction(id, solved.account) - .await?; - if let Some(tx) = transaction { - let events = revealed - .orders - .iter() - .map(|uid| (*uid, OrderEventLabel::Traded)) - .collect_vec(); - self.database.store_order_events(&events).await; - tracing::debug!("settled in tx {:?}", tx.hash); - } else { - tracing::warn!("could not find a mined transaction in time"); - } + let events = revealed + .orders + .iter() + .map(|uid| (*uid, OrderEventLabel::Traded)) + .collect_vec(); + self.database.store_order_events(&events).await; + tracing::debug!(?tx_hash, "solution settled"); Ok(()) } - /// Tries to find a `settle` contract call with calldata ending in `tag`. - /// - /// Returns None if no transaction was found within the deadline. - async fn wait_for_settlement_transaction( - &self, - id: AuctionId, - submission_address: H160, - ) -> Result, SettleError> { - // Start earlier than current block because there might be a delay when - // receiving the Solver's /execute response during which it already - // started broadcasting the tx. 
- let start_offset = MAX_REORG_BLOCK_COUNT; - let max_wait_time_blocks = (self.max_settlement_transaction_wait.as_secs_f32() - / self.network_block_interval.as_secs_f32()) - .ceil() as u64; - let current = self.current_block.borrow().number; - let start = current.saturating_sub(start_offset); - let deadline = current.saturating_add(max_wait_time_blocks); - tracing::debug!( - %current, %start, %deadline, ?id, ?submission_address, - "waiting for settlement", - ); - - // Use the existing event indexing infrastructure to find the transaction. We - // query all settlement events in the block range to get tx hashes and - // query the node for the full calldata. - // - // If the block range was large, we would make the query more efficient by - // moving the starting block up while taking reorgs into account. With - // the current range of 30 blocks this isn't necessary. - // - // We do keep track of hashes we have already seen to reduce load from the node. - - let mut seen_transactions: HashSet = Default::default(); - while self.current_block.borrow().number <= deadline { - let mut hashes = self - .database - .recent_settlement_tx_hashes(start..deadline + 1) - .await - .map_err(SettleError::Database)?; - hashes.retain(|hash| !seen_transactions.contains(hash)); - for hash in hashes { - let Some(tx) = self - .web3 - .eth() - .transaction(web3::types::TransactionId::Hash(hash)) - .await - .map_err(|err| SettleError::TransactionFetch(hash, err))? - else { - continue; - }; - if tx.input.0.ends_with(&id.to_be_bytes()) && tx.from == Some(submission_address) { - return Ok(Some(tx)); - } - seen_transactions.insert(hash); - } - // It would be more correct to wait until just after the last event update run, - // but that is hard to synchronize. 
- tokio::time::sleep(self.network_block_interval.div_f32(2.)).await; - } - Ok(None) - } - /// Saves the competition data to the database async fn save_competition(&self, competition: &Competition) -> Result<()> { self.database.save_competition(competition).await @@ -632,10 +558,6 @@ enum RevealError { #[derive(Debug, thiserror::Error)] enum SettleError { - #[error("unexpected database error: {0}")] - Database(anyhow::Error), - #[error("error fetching transaction receipts for {0:?}: {1}")] - TransactionFetch(H256, web3::Error), #[error(transparent)] Failure(anyhow::Error), } @@ -732,8 +654,6 @@ impl Metrics { fn settle_err(driver: &Driver, err: &SettleError) { let label = match err { - SettleError::Database(_) => "internal_error", - SettleError::TransactionFetch(..) => "tx_error", SettleError::Failure(_) => "error", }; Self::get() diff --git a/crates/driver/src/boundary/mempool.rs b/crates/driver/src/boundary/mempool.rs index 9924e88075..e4a18985b2 100644 --- a/crates/driver/src/boundary/mempool.rs +++ b/crates/driver/src/boundary/mempool.rs @@ -166,7 +166,7 @@ impl Mempool { self.submitted_transactions.clone(), web3.clone(), &web3, - )?; + ); let receipt = submitter .submit( settlement.boundary.inner, diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 8409f244e1..8e3e9d7135 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -2,7 +2,7 @@ use { self::solution::settlement, super::Mempools, crate::{ - domain::competition::solution::Settlement, + domain::{competition::solution::Settlement, eth}, infra::{ self, blockchain::Ethereum, @@ -208,21 +208,25 @@ impl Competition { .unwrap() .take() .ok_or(Error::SolutionNotAvailable)?; - self.mempools.execute(&self.solver, &settlement); - Ok(Settled { - internalized_calldata: settlement - .calldata( - self.eth.contracts().settlement(), - settlement::Internalization::Enable, - ) - .into(), - 
uninternalized_calldata: settlement - .calldata( - self.eth.contracts().settlement(), - settlement::Internalization::Disable, - ) - .into(), - }) + + match self.mempools.execute(&self.solver, &settlement).await { + Err(_) => Err(Error::SubmissionError), + Ok(tx_hash) => Ok(Settled { + internalized_calldata: settlement + .calldata( + self.eth.contracts().settlement(), + settlement::Internalization::Enable, + ) + .into(), + uninternalized_calldata: settlement + .calldata( + self.eth.contracts().settlement(), + settlement::Internalization::Disable, + ) + .into(), + tx_hash, + }), + } } /// The ID of the auction being competed on. @@ -264,6 +268,8 @@ pub struct Settled { /// can manually enforce certain rules which can not be enforced /// automatically. pub uninternalized_calldata: Bytes>, + /// The transaction hash in which the solution was submitted. + pub tx_hash: eth::TxId, } #[derive(Debug, thiserror::Error)] @@ -277,4 +283,6 @@ pub enum Error { DeadlineExceeded(#[from] solution::DeadlineExceeded), #[error("solver error: {0:?}")] Solver(#[from] solver::Error), + #[error("failed to submit the solution")] + SubmissionError, } diff --git a/crates/driver/src/domain/mempools.rs b/crates/driver/src/domain/mempools.rs index a1ab082fe4..43f21bd233 100644 --- a/crates/driver/src/domain/mempools.rs +++ b/crates/driver/src/domain/mempools.rs @@ -1,6 +1,6 @@ use { crate::{ - domain::competition::solution::Settlement, + domain::{competition::solution::Settlement, eth}, infra::{self, observe, solver::Solver}, }, futures::{future::select_ok, FutureExt}, @@ -21,17 +21,19 @@ impl Mempools { } } - /// Publish a settlement to the mempools. Wait until it is confirmed in the - /// background. - pub fn execute(&self, solver: &Solver, settlement: &Settlement) { + /// Publish a settlement to the mempools. 
+ pub async fn execute( + &self, + solver: &Solver, + settlement: &Settlement, + ) -> Result { let auction_id = settlement.auction_id; let solver_name = solver.name(); - tokio::spawn(select_ok(self.0.iter().cloned().map(|mempool| { - let solver = solver.clone(); - let settlement = settlement.clone(); + + let (tx_hash, _remaining_futures) = select_ok(self.0.iter().cloned().map(|mempool| { async move { - let result = mempool.execute(&solver, settlement.clone()).await; - observe::mempool_executed(&mempool, &settlement, &result); + let result = mempool.execute(solver, settlement.clone()).await; + observe::mempool_executed(&mempool, settlement, &result); result } .instrument(tracing::info_span!( @@ -40,7 +42,11 @@ impl Mempools { ?auction_id, )) .boxed() - }))); + })) + .await + .map_err(|_| AllFailed)?; + + Ok(tx_hash) } /// Defines if the mempools are configured in a way that guarantees that @@ -70,3 +76,7 @@ pub enum RevertProtection { Enabled, Disabled, } + +#[derive(Debug, Error)] +#[error("none of the submission strategies successfully submitted the solution")] +pub struct AllFailed; diff --git a/crates/driver/src/infra/api/error.rs b/crates/driver/src/infra/api/error.rs index 6b1d8254a9..ccc62d8f7e 100644 --- a/crates/driver/src/infra/api/error.rs +++ b/crates/driver/src/infra/api/error.rs @@ -19,6 +19,7 @@ enum Kind { InvalidTokens, InvalidAmounts, QuoteSameTokens, + FailedToSubmit, } #[derive(Debug, Serialize)] @@ -49,6 +50,7 @@ impl From for (hyper::StatusCode, axum::Json) { "Invalid order specified in the auction, some orders have either a 0 remaining buy \ or sell amount" } + Kind::FailedToSubmit => "Could not submit the solution to the blockchain", }; ( hyper::StatusCode::BAD_REQUEST, @@ -79,6 +81,7 @@ impl From for (hyper::StatusCode, axum::Json) { competition::Error::SolutionNotAvailable => Kind::SolutionNotAvailable, competition::Error::DeadlineExceeded(_) => Kind::DeadlineExceeded, competition::Error::Solver(_) => Kind::SolverFailed, + 
competition::Error::SubmissionError => Kind::FailedToSubmit, }; error.into() } diff --git a/crates/driver/src/infra/api/routes/settle/dto/settled.rs b/crates/driver/src/infra/api/routes/settle/dto/settled.rs index 8e3b6921de..5ace54f3a9 100644 --- a/crates/driver/src/infra/api/routes/settle/dto/settled.rs +++ b/crates/driver/src/infra/api/routes/settle/dto/settled.rs @@ -5,12 +5,13 @@ use { }; impl Settled { - pub fn new(calldata: competition::Settled) -> Self { + pub fn new(settled: competition::Settled) -> Self { Self { calldata: CalldataInner { - internalized: calldata.internalized_calldata.into(), - uninternalized: calldata.uninternalized_calldata.into(), + internalized: settled.internalized_calldata.into(), + uninternalized: settled.uninternalized_calldata.into(), }, + tx_hash: settled.tx_hash.0, } } } @@ -20,6 +21,7 @@ impl Settled { #[serde(rename_all = "camelCase")] pub struct Settled { calldata: CalldataInner, + tx_hash: primitive_types::H256, } #[serde_as] diff --git a/crates/driver/src/infra/observe/mod.rs b/crates/driver/src/infra/observe/mod.rs index 176c743dad..8a00aab754 100644 --- a/crates/driver/src/infra/observe/mod.rs +++ b/crates/driver/src/infra/observe/mod.rs @@ -323,6 +323,7 @@ fn competition_error(err: &competition::Error) -> &'static str { competition::Error::Solver(solver::Error::Deserialize(_)) => "SolverDeserializeError", competition::Error::Solver(solver::Error::RepeatedSolutionIds) => "RepeatedSolutionIds", competition::Error::Solver(solver::Error::Dto(_)) => "SolverDtoError", + competition::Error::SubmissionError => "SubmissionError", } } diff --git a/crates/driver/src/tests/setup/mod.rs b/crates/driver/src/tests/setup/mod.rs index f35e401efa..4086d3b5e0 100644 --- a/crates/driver/src/tests/setup/mod.rs +++ b/crates/driver/src/tests/setup/mod.rs @@ -1027,7 +1027,7 @@ impl<'a> Settle<'a> { assert_eq!(self.status, hyper::StatusCode::OK); let result: serde_json::Value = serde_json::from_str(&self.body).unwrap(); 
assert!(result.is_object()); - assert_eq!(result.as_object().unwrap().len(), 1); + assert_eq!(result.as_object().unwrap().len(), 2); assert!(!result .get("calldata") .unwrap() @@ -1045,6 +1045,9 @@ impl<'a> Settle<'a> { .unwrap() .is_empty()); + let reported_tx_hash = + serde_json::from_value::(result.get("txHash").unwrap().clone()).unwrap(); + // Wait for the new block with the settlement to be mined. blockchain::wait_for_block(&self.test.blockchain.web3, self.old_block + 1).await; @@ -1061,10 +1064,12 @@ impl<'a> Settle<'a> { .await .unwrap() .unwrap(); + let input = tx.input.0; let len = input.len(); let tx_auction_id = u64::from_be_bytes((&input[len - 8..]).try_into().unwrap()); assert_eq!(tx_auction_id.to_string(), "1"); + assert_eq!(reported_tx_hash, tx.hash); // Ensure that the internalized calldata returned by the driver is equal to the // calldata published to the blockchain. diff --git a/crates/solver/src/settlement_submission.rs b/crates/solver/src/settlement_submission.rs index 4d1b8f5d66..8f64a613c0 100644 --- a/crates/solver/src/settlement_submission.rs +++ b/crates/solver/src/settlement_submission.rs @@ -311,7 +311,7 @@ impl SolutionSubmitter { strategy_args.sub_tx_pool.clone(), self.web3.clone(), self.code_fetcher.as_ref(), - )?; + ); submitter .submit(settlement, params) .await diff --git a/crates/solver/src/settlement_submission/submitter.rs b/crates/solver/src/settlement_submission/submitter.rs index b72d8995c0..39843699ea 100644 --- a/crates/solver/src/settlement_submission/submitter.rs +++ b/crates/solver/src/settlement_submission/submitter.rs @@ -203,8 +203,8 @@ impl<'a> Submitter<'a> { submitted_transactions: SubTxPoolRef, web3: Web3, code_fetcher: &'a dyn CodeFetching, - ) -> Result { - Ok(Self { + ) -> Self { + Self { contract, account, nonce, @@ -214,7 +214,7 @@ impl<'a> Submitter<'a> { submitted_transactions, web3, code_fetcher, - }) + } } } @@ -803,8 +803,7 @@ mod tests { submitted_transactions, web3.clone(), &code_fetcher, - ) - 
.unwrap(); + ); let params = SubmitterParams { target_confirm_time: Duration::from_secs(0),