Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await for a settlement tx in autopilot #2630

Merged
merged 13 commits into from
Apr 24, 2024
20 changes: 8 additions & 12 deletions crates/autopilot/src/database/recent_settlements.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
use {anyhow::Context, primitive_types::H256, std::ops::Range};
use {anyhow::Context, primitive_types::H256};

impl super::Postgres {
pub async fn recent_settlement_tx_hashes(
pub async fn find_tx_hash_by_auction_id(
&self,
block_range: Range<u64>,
) -> anyhow::Result<Vec<H256>> {
let start: i64 = block_range.start.try_into()?;
let end: i64 = block_range.end.try_into()?;
let block_range = start..end;

auction_id: i64,
) -> anyhow::Result<Option<H256>> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["recent_settlement_tx_hashes"])
.with_label_values(&["find_tx_hash_by_auction_id"])
.start_timer();

let mut ex = self.pool.acquire().await.context("acquire")?;
let hashes = database::settlements::recent_settlement_tx_hashes(&mut ex, block_range)
let hash = database::settlements::get_hash_by_auction_id(&mut ex, auction_id)
.await
.context("recent_settlement_tx_hashes")?;
Ok(hashes.into_iter().map(|hash| H256(hash.0)).collect())
.context("get_hash_by_auction_id")?;
Ok(hash.map(|hash| H256(hash.0)))
}
}
10 changes: 10 additions & 0 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
},
anyhow::Context,
chrono::Utc,
primitive_types::H256,
std::sync::Arc,
tracing::Instrument,
};
Expand Down Expand Up @@ -125,6 +126,15 @@ impl Persistence {

ex.commit().await.context("commit")
}

/// Retrieves the transaction hash for the settlement with the given
/// auction_id.
pub async fn find_tx_hash_by_auction_id(&self, auction_id: i64) -> Result<Option<H256>, Error> {
self.postgres
.find_tx_hash_by_auction_id(auction_id)
.await
.map_err(Error::DbError)
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
75 changes: 66 additions & 9 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl RunLoop {

tracing::info!(driver = %driver.name, "settling");
let submission_start = Instant::now();
match self.settle(driver, solution).await {
match self.settle(driver, solution, auction_id).await {
Ok(()) => Metrics::settle_ok(driver, submission_start.elapsed()),
Err(err) => {
Metrics::settle_err(driver, &err, submission_start.elapsed());
Expand Down Expand Up @@ -405,21 +405,22 @@ impl RunLoop {

/// Execute the solver's solution. Returns Ok when the corresponding
/// transaction has been mined.
async fn settle(&self, driver: &infra::Driver, solved: &Solution) -> Result<(), SettleError> {
async fn settle(
&self,
driver: &infra::Driver,
solved: &Solution,
auction_id: i64,
) -> Result<(), SettleError> {
let order_ids = solved.order_ids().copied().collect();
self.persistence
.store_order_events(order_ids, OrderEventLabel::Executing);

let request = settle::Request {
solution_id: solved.id,
};

let tx_hash = driver
.settle(&request, self.max_settlement_transaction_wait)
.await
.map_err(SettleError::Failure)?
.tx_hash;

let tx_hash = self
.wait_for_settlement(driver, auction_id, request)
.await?;
*self.in_flight_orders.lock().await = Some(InFlightOrders {
tx_hash,
orders: solved.orders.keys().copied().collect(),
Expand All @@ -429,6 +430,62 @@ impl RunLoop {
Ok(())
}

/// Wait for either the settlement transaction to be mined or the driver
/// returned a result.
async fn wait_for_settlement(
&self,
driver: &infra::Driver,
auction_id: i64,
request: settle::Request,
) -> Result<H256, SettleError> {
match futures::future::select(
Box::pin(self.wait_for_settlement_transaction(auction_id, self.submission_deadline)),
Box::pin(driver.settle(&request, self.max_settlement_transaction_wait)),
)
.await
{
futures::future::Either::Left((res, _)) => res,
futures::future::Either::Right((res, _)) => {
res.map_err(SettleError::Failure).map(|tx| tx.tx_hash)
}
}
}

/// Tries to find a `settle` contract call with calldata ending in `tag`.
///
/// Returns None if no transaction was found within the deadline or the task
/// is cancelled.
async fn wait_for_settlement_transaction(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to test this in e2e or unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I couldn't find a quick way to achieve that. That would require either an artificial delay in the driver's response or creating unit tests with mocks. Someone may have a better idea of how to test it.

&self,
auction_id: i64,
max_blocks_wait: u64,
) -> Result<H256, SettleError> {
let current = self.eth.current_block().borrow().number;
let deadline = current.saturating_add(max_blocks_wait);
tracing::debug!(%current, %deadline, %auction_id, "waiting for tag");
loop {
if self.eth.current_block().borrow().number > deadline {
break;
}

match self
.persistence
.find_tx_hash_by_auction_id(auction_id)
.await
{
Ok(Some(hash)) => return Ok(hash),
Err(err) => {
tracing::warn!(?err, "failed to fetch recent settlement tx hashes");
}
Ok(None) => {}
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
Err(SettleError::Failure(anyhow::anyhow!(
"settlement transaction await reached deadline"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could define a separate variant Deadline instead of anyhow error.

)))
}

/// Removes orders that are currently being settled to avoid solvers trying
/// to fill an order a second time.
async fn remove_in_flight_orders(&self, mut auction: domain::Auction) -> domain::Auction {
Expand Down
60 changes: 32 additions & 28 deletions crates/database/src/settlements.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,39 @@
use {
crate::{events::EventIndex, PgTransaction, TransactionHash},
sqlx::{Executor, PgConnection},
std::ops::Range,
};

pub async fn recent_settlement_tx_hashes(
pub async fn get_hash_by_event(
ex: &mut PgConnection,
block_range: Range<i64>,
) -> Result<Vec<TransactionHash>, sqlx::Error> {
event: &EventIndex,
) -> Result<TransactionHash, sqlx::Error> {
const QUERY: &str = r#"
SELECT tx_hash
FROM settlements
WHERE
block_number >= $1 AND
block_number < $2
block_number = $1 AND
log_index = $2
"#;
sqlx::query_scalar::<_, TransactionHash>(QUERY)
.bind(block_range.start)
.bind(block_range.end)
.fetch_all(ex)
.bind(event.block_number)
.bind(event.log_index)
.fetch_one(ex)
.await
}

pub async fn get_hash_by_event(
pub async fn get_hash_by_auction_id(
ex: &mut PgConnection,
event: &EventIndex,
) -> Result<TransactionHash, sqlx::Error> {
auction_id: i64,
) -> Result<Option<TransactionHash>, sqlx::Error> {
const QUERY: &str = r#"
SELECT tx_hash
FROM settlements
WHERE
block_number = $1 AND
log_index = $2
auction_id = $1
"#;
sqlx::query_scalar::<_, TransactionHash>(QUERY)
.bind(event.block_number)
.bind(event.log_index)
.fetch_one(ex)
.bind(auction_id)
.fetch_optional(ex)
.await
}

Expand Down Expand Up @@ -119,6 +116,15 @@ mod tests {
sqlx::Connection,
};

async fn all_settlement_tx_hashes(
ex: &mut PgConnection,
) -> Result<Vec<TransactionHash>, sqlx::Error> {
const QUERY: &str = "SELECT tx_hash FROM settlements";
sqlx::query_scalar::<_, TransactionHash>(QUERY)
.fetch_all(ex)
.await
}

#[tokio::test]
#[ignore]
async fn postgres_gets_event() {
Expand Down Expand Up @@ -164,17 +170,15 @@ mod tests {
.await
.unwrap();

let results = recent_settlement_tx_hashes(&mut db, 0..1).await.unwrap();
assert_eq!(results, &[ByteArray([0u8; 32])]);

let results = recent_settlement_tx_hashes(&mut db, 1..5).await.unwrap();
assert_eq!(results, &[ByteArray([1u8; 32]), ByteArray([2u8; 32])]);

let results = recent_settlement_tx_hashes(&mut db, 2..5).await.unwrap();
assert_eq!(results, &[ByteArray([2u8; 32])]);

let results = recent_settlement_tx_hashes(&mut db, 3..5).await.unwrap();
assert_eq!(results, &[]);
let results = all_settlement_tx_hashes(&mut db).await.unwrap();
assert_eq!(
results,
&[
ByteArray([0u8; 32]),
ByteArray([1u8; 32]),
ByteArray([2u8; 32])
]
);
}

#[tokio::test]
Expand Down
Loading