From 321f9bc73ac2fe87d79f6ae5683c57926961c432 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 17:17:55 +0000 Subject: [PATCH 01/17] Use driver directly --- .../competition/participation_guard/db.rs | 13 +++++------ .../competition/participation_guard/mod.rs | 13 +++++------ .../participation_guard/onchain.rs | 6 ++--- crates/autopilot/src/run.rs | 22 +++++-------------- crates/autopilot/src/run_loop.rs | 2 +- 5 files changed, 20 insertions(+), 36 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index f765b39418..0a8dba9f7d 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -2,10 +2,10 @@ use { crate::{ database::Postgres, domain::{eth, Metrics}, + infra::Driver, }, ethrpc::block_stream::CurrentBlockWatcher, std::{ - collections::HashSet, sync::Arc, time::{Duration, Instant}, }, @@ -21,7 +21,6 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, - db_validator_accepted_solvers: HashSet, } impl Validator { @@ -31,14 +30,12 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, - db_validator_accepted_solvers: HashSet, ) -> Self { let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, - db_validator_accepted_solvers, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -94,18 +91,18 @@ impl Validator { #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + async fn is_allowed(&self, driver: &Driver) -> anyhow::Result { // Check if solver accepted this feature. This should be removed once a CIP is // approved. - if !self.0.db_validator_accepted_solvers.contains(solver) { + if !driver.accepts_unsettled_blocking { return Ok(true); } - if let Some(entry) = self.0.banned_solvers.get(solver) { + if let Some(entry) = self.0.banned_solvers.get(&driver.submission_address) { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); } else { - self.0.banned_solvers.remove(solver); + self.0.banned_solvers.remove(&driver.submission_address); } } diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index d0127e68d5..7b4bf53157 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -5,10 +5,9 @@ use { crate::{ arguments::DbBasedSolverParticipationGuardConfig, database::Postgres, - domain::eth, - infra::Ethereum, + infra::{Driver, Ethereum}, }, - std::{collections::HashSet, sync::Arc}, + std::sync::Arc, }; /// This struct checks whether a solver can participate in the competition by @@ -27,7 +26,6 @@ impl SolverParticipationGuard { db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, - db_validator_accepted_solvers: HashSet, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -39,7 +37,6 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, - db_validator_accepted_solvers, ); validators.push(Box::new(database_solver_participation_validator)); } @@ -55,9 +52,9 @@ impl SolverParticipationGuard { /// the following order: /// 1. DB-based validator: operates fast since it uses in-memory cache. /// 2. Onchain-based validator: only then calls the Authenticator contract. - pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { + pub async fn can_participate(&self, driver: &Driver) -> anyhow::Result { for validator in &self.0.validators { - if !validator.is_allowed(solver).await? { + if !validator.is_allowed(driver).await? { return Ok(false); } } @@ -68,5 +65,5 @@ impl SolverParticipationGuard { #[async_trait::async_trait] trait Validator: Send + Sync { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; + async fn is_allowed(&self, driver: &Driver) -> anyhow::Result; } diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs index 82d0ef3fb7..3e472b298e 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -1,4 +1,4 @@ -use crate::{domain::eth, infra::Ethereum}; +use crate::infra::{Driver, Ethereum}; /// Calls Authenticator contract to check if a solver has a sufficient /// permission. @@ -8,12 +8,12 @@ pub(super) struct Validator { #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { + async fn is_allowed(&self, solver: &Driver) -> anyhow::Result { Ok(self .eth .contracts() .authenticator() - .is_solver(solver.0) + .is_solver(solver.submission_address.0) .call() .await?) } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 8c8f2b7834..2f426a1967 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,7 +52,6 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ - collections::HashSet, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -369,6 +368,12 @@ pub async fn run(args: Arguments) { let (settlement_updates_sender, settlement_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + db.clone(), + settlement_updates_receiver, + args.db_based_solver_participation_guard, + ); let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let settlement_observer = crate::domain::settlement::Observer::new( @@ -578,21 +583,6 @@ pub async fn run(args: Arguments) { .into_iter() .collect(); - let solver_participation_guard = SolverParticipationGuard::new( - eth.clone(), - db.clone(), - settlement_updates_receiver, - args.db_based_solver_participation_guard, - drivers - .iter() - .filter_map(|driver| { - driver - .accepts_unsettled_blocking - .then_some(driver.submission_address) - }) - .collect::>(), - ); - let run = RunLoop::new( run_loop_config, eth, diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 492ca18751..4102f675ea 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -736,7 +736,7 @@ impl RunLoop { request: &solve::Request, ) -> Result>, SolveError> { - let can_participate = self.solver_participation_guard.can_participate(&driver.submission_address).await.map_err(|err| { + let can_participate = self.solver_participation_guard.can_participate(driver).await.map_err(|err| { tracing::error!(?err, driver = %driver.name, ?driver.submission_address, "solver participation check failed"); SolveError::SolverDenyListed } From f69e1747ad5150033f0bb94443df4550fb898b70 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 19:05:01 +0000 Subject: [PATCH 02/17] Notify external solvers --- .../competition/participation_guard/db.rs | 47 +++++++++++++++++-- .../competition/participation_guard/mod.rs | 13 +++-- .../participation_guard/onchain.rs | 6 +-- crates/autopilot/src/infra/solvers/dto/mod.rs | 1 + .../autopilot/src/infra/solvers/dto/notify.rs | 9 ++++ crates/autopilot/src/infra/solvers/mod.rs | 6 ++- crates/autopilot/src/run.rs | 18 ++++--- 7 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 crates/autopilot/src/infra/solvers/dto/notify.rs diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 0a8dba9f7d..6c50b96553 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -2,10 +2,12 @@ use { crate::{ database::Postgres, domain::{eth, Metrics}, - infra::Driver, + infra, }, ethrpc::block_stream::CurrentBlockWatcher, + futures::future::join_all, std::{ + collections::HashMap, sync::Arc, time::{Duration, Instant}, }, @@ -21,6 +23,7 @@ struct Inner { banned_solvers: dashmap::DashMap, ttl: Duration, last_auctions_count: u32, + drivers_by_address: HashMap>, } impl Validator { @@ -30,12 +33,14 @@ impl Validator { settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, ttl: Duration, last_auctions_count: u32, + drivers_by_address: HashMap>, ) -> Self { let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, + drivers_by_address, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -50,13 +55,14 @@ impl Validator { mut settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, current_block: CurrentBlockWatcher, ) { - let self_ = self.0.clone(); + let self_ = self.clone(); tokio::spawn(async move { while settlement_updates_receiver.recv().await.is_some() { let current_block = current_block.borrow().number; match self_ + .0 .db - .find_non_settling_solvers(self_.last_auctions_count, current_block) + .find_non_settling_solvers(self_.0.last_auctions_count, current_block) .await { Ok(non_settling_solvers) => { @@ -74,10 +80,11 @@ impl Validator { .collect::>(); tracing::debug!(?non_settling_solvers, "found non-settling solvers"); + self_.notify_solvers(&non_settling_solvers); let now = Instant::now(); for solver in non_settling_solvers { - self_.banned_solvers.insert(solver, now); + self_.0.banned_solvers.insert(solver, now); } } Err(err) => { @@ -87,11 +94,41 @@ impl Validator { } }); } + + /// Try to notify all the non-settling external solvers. + fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) { + let futures = non_settling_solvers + .iter() + .cloned() + .map(|solver| { + let self_ = self.0.clone(); + async move { + match self_.drivers_by_address.get(&solver) { + Some(driver) => { + if let Err(err) = driver + .notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions) + .await + { + tracing::debug!(?solver, ?err, "unable to notify external solver"); + } + } + None => { + tracing::error!(?solver, "found unrecognized non-settling driver"); + } + } + } + } + ).collect::>(); + + tokio::spawn(async move { + join_all(futures).await; + }); + } } #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, driver: &Driver) -> anyhow::Result { + async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result { // Check if solver accepted this feature. This should be removed once a CIP is // approved. if !driver.accepts_unsettled_blocking { diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 7b4bf53157..5564cc07f6 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -5,9 +5,10 @@ use { crate::{ arguments::DbBasedSolverParticipationGuardConfig, database::Postgres, - infra::{Driver, Ethereum}, + domain::eth, + infra, }, - std::sync::Arc, + std::{collections::HashMap, sync::Arc}, }; /// This struct checks whether a solver can participate in the competition by @@ -22,10 +23,11 @@ struct Inner { impl SolverParticipationGuard { pub fn new( - eth: Ethereum, + eth: infra::Ethereum, db: Postgres, settlement_updates_receiver: tokio::sync::mpsc::UnboundedReceiver<()>, db_based_validator_config: DbBasedSolverParticipationGuardConfig, + drivers_by_address: HashMap>, ) -> Self { let mut validators: Vec> = Vec::new(); @@ -37,6 +39,7 @@ impl SolverParticipationGuard { settlement_updates_receiver, db_based_validator_config.solver_blacklist_cache_ttl, db_based_validator_config.solver_last_auctions_participation_count, + drivers_by_address, ); validators.push(Box::new(database_solver_participation_validator)); } @@ -52,7 +55,7 @@ impl SolverParticipationGuard { /// the following order: /// 1. DB-based validator: operates fast since it uses in-memory cache. /// 2. Onchain-based validator: only then calls the Authenticator contract. - pub async fn can_participate(&self, driver: &Driver) -> anyhow::Result { + pub async fn can_participate(&self, driver: &infra::Driver) -> anyhow::Result { for validator in &self.0.validators { if !validator.is_allowed(driver).await? { return Ok(false); @@ -65,5 +68,5 @@ impl SolverParticipationGuard { #[async_trait::async_trait] trait Validator: Send + Sync { - async fn is_allowed(&self, driver: &Driver) -> anyhow::Result; + async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result; } diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs index 3e472b298e..d1327f0254 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -1,14 +1,14 @@ -use crate::infra::{Driver, Ethereum}; +use crate::infra; /// Calls Authenticator contract to check if a solver has a sufficient /// permission. pub(super) struct Validator { - pub eth: Ethereum, + pub eth: infra::Ethereum, } #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, solver: &Driver) -> anyhow::Result { + async fn is_allowed(&self, solver: &infra::Driver) -> anyhow::Result { Ok(self .eth .contracts() diff --git a/crates/autopilot/src/infra/solvers/dto/mod.rs b/crates/autopilot/src/infra/solvers/dto/mod.rs index d3a156294c..5365700ce8 100644 --- a/crates/autopilot/src/infra/solvers/dto/mod.rs +++ b/crates/autopilot/src/infra/solvers/dto/mod.rs @@ -1,6 +1,7 @@ //! Types for communicating with drivers as defined in //! `crates/driver/openapi.yml`. +pub mod notify; pub mod reveal; pub mod settle; pub mod solve; diff --git a/crates/autopilot/src/infra/solvers/dto/notify.rs b/crates/autopilot/src/infra/solvers/dto/notify.rs new file mode 100644 index 0000000000..08165964fd --- /dev/null +++ b/crates/autopilot/src/infra/solvers/dto/notify.rs @@ -0,0 +1,9 @@ +use {serde::Serialize, serde_with::serde_as}; + +#[serde_as] +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum Request { + /// The driver won multiple consecutive auctions but never settled them. + UnsettledConsecutiveAuctions, +} diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index aae623ebaa..2490a71b15 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -1,6 +1,6 @@ use { self::dto::{reveal, settle, solve}, - crate::{arguments::Account, domain::eth, util}, + crate::{arguments::Account, domain::eth, infra::solvers::dto::notify, util}, anyhow::{anyhow, Context, Result}, reqwest::{Client, StatusCode}, std::time::Duration, @@ -116,6 +116,10 @@ impl Driver { Ok(()) } + pub async fn notify(&self, request: ¬ify::Request) -> Result<()> { + self.request_response("notify", request, None).await + } + async fn request_response( &self, path: &str, diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 2f426a1967..a6e30dd562 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -52,6 +52,7 @@ use { token_list::{AutoUpdatingTokenList, TokenListConfiguration}, }, std::{ + collections::HashMap, sync::{Arc, RwLock}, time::{Duration, Instant}, }, @@ -368,12 +369,6 @@ pub async fn run(args: Arguments) { let (settlement_updates_sender, settlement_updates_receiver) = tokio::sync::mpsc::unbounded_channel(); - let solver_participation_guard = SolverParticipationGuard::new( - eth.clone(), - db.clone(), - settlement_updates_receiver, - args.db_based_solver_participation_guard, - ); let persistence = infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db.clone())).await; let settlement_observer = crate::domain::settlement::Observer::new( @@ -583,6 +578,17 @@ pub async fn run(args: Arguments) { .into_iter() .collect(); + let solver_participation_guard = SolverParticipationGuard::new( + eth.clone(), + db.clone(), + settlement_updates_receiver, + args.db_based_solver_participation_guard, + drivers + .iter() + .map(|driver| (driver.submission_address, driver.clone())) + .collect::>(), + ); + let run = RunLoop::new( run_loop_config, eth, From 1504c48bb75ad9dd1c65442d43ac36b0f192ff33 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 20:11:23 +0000 Subject: [PATCH 03/17] /notify driver endpoint --- crates/driver/src/infra/api/error.rs | 9 +++ crates/driver/src/infra/api/mod.rs | 1 + crates/driver/src/infra/api/routes/mod.rs | 2 + .../src/infra/api/routes/notify/dto/mod.rs | 3 + .../api/routes/notify/dto/notify_request.rs | 25 ++++++++ .../driver/src/infra/api/routes/notify/mod.rs | 23 +++++++ crates/driver/src/infra/notify/mod.rs | 16 ++--- .../driver/src/infra/notify/notification.rs | 3 + .../src/infra/solver/dto/notification.rs | 2 + crates/driver/src/infra/solver/mod.rs | 64 ++++++++++++++++--- 10 files changed, 130 insertions(+), 18 deletions(-) create mode 100644 crates/driver/src/infra/api/routes/notify/dto/mod.rs create mode 100644 crates/driver/src/infra/api/routes/notify/dto/notify_request.rs create mode 100644 crates/driver/src/infra/api/routes/notify/mod.rs diff --git a/crates/driver/src/infra/api/error.rs b/crates/driver/src/infra/api/error.rs index e788bbe5bf..e95768c616 100644 --- a/crates/driver/src/infra/api/error.rs +++ b/crates/driver/src/infra/api/error.rs @@ -112,3 +112,12 @@ impl From for (hyper::StatusCode, axum::Json) { error.into() } } + +impl From for (hyper::StatusCode, axum::Json) { + fn from(value: api::routes::NotifyError) -> Self { + let error = match value { + api::routes::NotifyError::UnableToNotify => Kind::Unknown, + }; + error.into() + } +} diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index dfbc33fa97..d153b34979 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -78,6 +78,7 @@ impl Api { let router = routes::solve(router); let router = routes::reveal(router); let router = routes::settle(router); + let router = routes::notify(router); let bad_token_config = solver.bad_token_detection(); let mut bad_tokens = diff --git a/crates/driver/src/infra/api/routes/mod.rs b/crates/driver/src/infra/api/routes/mod.rs index ee1027b0e9..d4ed5e12e3 100644 --- a/crates/driver/src/infra/api/routes/mod.rs +++ b/crates/driver/src/infra/api/routes/mod.rs @@ -1,6 +1,7 @@ mod healthz; mod info; mod metrics; +mod notify; mod quote; mod reveal; mod settle; @@ -10,6 +11,7 @@ pub(super) use { healthz::healthz, info::info, metrics::metrics, + notify::{notify, NotifyError}, quote::{quote, OrderError}, reveal::reveal, settle::settle, diff --git a/crates/driver/src/infra/api/routes/notify/dto/mod.rs b/crates/driver/src/infra/api/routes/notify/dto/mod.rs new file mode 100644 index 0000000000..3f99a96ff7 --- /dev/null +++ b/crates/driver/src/infra/api/routes/notify/dto/mod.rs @@ -0,0 +1,3 @@ +mod notify_request; + +pub use notify_request::{Error as NotifyError, NotifyRequest}; diff --git a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs new file mode 100644 index 0000000000..22be6cbb00 --- /dev/null +++ b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs @@ -0,0 +1,25 @@ +use {crate::infra::notify, serde::Deserialize, serde_with::serde_as}; + +#[serde_as] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum NotifyRequest { + /// The driver won multiple consecutive auctions but never settled them. + UnsettledConsecutiveAuctions, +} + +impl From for notify::Kind { + fn from(value: NotifyRequest) -> Self { + match value { + NotifyRequest::UnsettledConsecutiveAuctions => { + notify::Kind::UnsettledConsecutiveAuctions + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Unable to notify solver")] + UnableToNotify, +} diff --git a/crates/driver/src/infra/api/routes/notify/mod.rs b/crates/driver/src/infra/api/routes/notify/mod.rs new file mode 100644 index 0000000000..7683cc499b --- /dev/null +++ b/crates/driver/src/infra/api/routes/notify/mod.rs @@ -0,0 +1,23 @@ +mod dto; + +pub use dto::NotifyError; + +use crate::infra::api::{Error, State}; + +pub(in crate::infra::api) fn notify(router: axum::Router) -> axum::Router { + router.route("/notify", axum::routing::post(route)) +} + +async fn route( + state: axum::extract::State, + req: axum::Json, +) -> Result)> { + let solver = &state.solver().name().0; + tracing::trace!(?req, ?solver, "received a notification"); + state + .solver() + .notify(None, None, req.0.into()) + .await + .map(|_| hyper::StatusCode::OK) + .map_err(|_| NotifyError::UnableToNotify.into()) +} diff --git a/crates/driver/src/infra/notify/mod.rs b/crates/driver/src/infra/notify/mod.rs index edb4c55458..72446e96bf 100644 --- a/crates/driver/src/infra/notify/mod.rs +++ b/crates/driver/src/infra/notify/mod.rs @@ -12,11 +12,11 @@ use { }; pub fn solver_timeout(solver: &Solver, auction_id: Option) { - solver.notify(auction_id, None, notification::Kind::Timeout); + solver.notify_and_forget(auction_id, None, notification::Kind::Timeout); } pub fn empty_solution(solver: &Solver, auction_id: Option, solution: solution::Id) { - solver.notify( + solver.notify_and_forget( auction_id, Some(solution), notification::Kind::EmptySolution, @@ -45,7 +45,7 @@ pub fn scoring_failed( } }; - solver.notify(auction_id, Some(solution_id.clone()), notification); + solver.notify_and_forget(auction_id, Some(solution_id.clone()), notification); } pub fn encoding_failed( @@ -76,7 +76,7 @@ pub fn encoding_failed( solution::Error::Encoding(_) => return, }; - solver.notify(auction_id, Some(solution_id.clone()), notification); + solver.notify_and_forget(auction_id, Some(solution_id.clone()), notification); } pub fn simulation_failed( @@ -94,7 +94,7 @@ pub fn simulation_failed( ), simulator::Error::Other(error) => notification::Kind::DriverError(error.to_string()), }; - solver.notify(auction_id, Some(solution_id.clone()), kind); + solver.notify_and_forget(auction_id, Some(solution_id.clone()), kind); } pub fn executed( @@ -111,7 +111,7 @@ pub fn executed( Err(Error::Other(_) | Error::Disabled) => notification::Settlement::Fail, }; - solver.notify( + solver.notify_and_forget( Some(auction_id), Some(solution_id.clone()), notification::Kind::Settled(kind), @@ -123,7 +123,7 @@ pub fn duplicated_solution_id( auction_id: Option, solution_id: &solution::Id, ) { - solver.notify( + solver.notify_and_forget( auction_id, Some(solution_id.clone()), notification::Kind::DuplicatedSolutionId, @@ -131,5 +131,5 @@ pub fn duplicated_solution_id( } pub fn postprocessing_timed_out(solver: &Solver, auction_id: Option) { - solver.notify(auction_id, None, notification::Kind::PostprocessingTimedOut); + solver.notify_and_forget(auction_id, None, notification::Kind::PostprocessingTimedOut); } diff --git a/crates/driver/src/infra/notify/notification.rs b/crates/driver/src/infra/notify/notification.rs index 0a178e9af5..ca123bb5e8 100644 --- a/crates/driver/src/infra/notify/notification.rs +++ b/crates/driver/src/infra/notify/notification.rs @@ -45,6 +45,9 @@ pub enum Kind { DriverError(String), /// On-chain solution postprocessing timed out. PostprocessingTimedOut, + /// The solver won multiple consecutive auctions but none of the settlement + /// succeeded. + UnsettledConsecutiveAuctions, } #[derive(Debug)] diff --git a/crates/driver/src/infra/solver/dto/notification.rs b/crates/driver/src/infra/solver/dto/notification.rs index 08969c5c35..2c70f50f5a 100644 --- a/crates/driver/src/infra/solver/dto/notification.rs +++ b/crates/driver/src/infra/solver/dto/notification.rs @@ -61,6 +61,7 @@ impl Notification { notify::Settlement::Expired => Kind::Expired, }, notify::Kind::PostprocessingTimedOut => Kind::PostprocessingTimedOut, + notify::Kind::UnsettledConsecutiveAuctions => Kind::UnsettledConsecutiveAuctions, }, } } @@ -144,6 +145,7 @@ pub enum Kind { Expired, Fail, PostprocessingTimedOut, + UnsettledConsecutiveAuctions, } type BlockNo = u64; diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index 3e63af9e86..6a6b3f8386 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -260,27 +260,71 @@ impl Solver { } /// Make a fire and forget POST request to notify the solver about an event. - pub fn notify( + pub fn notify_and_forget( &self, auction_id: Option, solution_id: Option, kind: notify::Kind, ) { + let base_url = self.config.endpoint.clone(); + let client = self.client.clone(); + let response_limit = self.config.response_size_limit_max_bytes; + let future = async move { + if let Err(error) = Self::notify_( + base_url, + client, + response_limit, + auction_id, + solution_id, + kind, + ) + .await + { + tracing::warn!(?error, "failed to notify solver"); + } + }; + + tokio::task::spawn(future.in_current_span()); + } + + pub async fn notify( + &self, + auction_id: Option, + solution_id: Option, + kind: notify::Kind, + ) -> Result<(), crate::util::http::Error> { + let base_url = self.config.endpoint.clone(); + let client = self.client.clone(); + + Self::notify_( + base_url, + client, + self.config.response_size_limit_max_bytes, + auction_id, + solution_id, + kind, + ) + .await + } + + async fn notify_( + base_url: url::Url, + client: reqwest::Client, + response_limit: usize, + auction_id: Option, + solution_id: Option, + kind: notify::Kind, + ) -> Result<(), crate::util::http::Error> { let body = serde_json::to_string(&dto::Notification::new(auction_id, solution_id, kind)).unwrap(); - let url = shared::url::join(&self.config.endpoint, "notify"); + let url = shared::url::join(&base_url, "notify"); super::observe::solver_request(&url, &body); - let mut req = self.client.post(url).body(body); + let mut req = client.post(url).body(body); if let Some(id) = observe::request_id::from_current_span() { req = req.header("X-REQUEST-ID", id); } - let response_size = self.config.response_size_limit_max_bytes; - let future = async move { - if let Err(error) = util::http::send(response_size, req).await { - tracing::warn!(?error, "failed to notify solver"); - } - }; - tokio::task::spawn(future.in_current_span()); + + util::http::send(response_limit, req).await.map(|_| ()) } } From 9f0cbc8462a8110f84c1371dcd7d1ded3b18950a Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 20:24:11 +0000 Subject: [PATCH 04/17] OpenApi --- crates/driver/openapi.yml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/crates/driver/openapi.yml b/crates/driver/openapi.yml index a8d43d199f..0383a35130 100644 --- a/crates/driver/openapi.yml +++ b/crates/driver/openapi.yml @@ -138,6 +138,33 @@ paths: $ref: "#/components/responses/BadRequest" "500": $ref: "#/components/responses/InternalServerError" + /notify: + post: + description: | + Receive a notification in case the driver was banned for a certain reason. + requestBody: + required: true + content: + application/json: + schema: + type: string + enum: + - unsettledConsecutiveAuctions + description: |- + A notification that informs about a reasoning why the driver was banned. + responses: + "200": + description: notification successfully received. + "500": + description: Unable to notify solver. + content: + application/json: + schema: + type: object + properties: + error: + type: string + example: "Unable to notify solver" components: schemas: Address: From 7f86000b61b69c513926b2a97d8f8fbceb3bc9e8 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 20:24:40 +0000 Subject: [PATCH 05/17] Nit --- .../autopilot/src/domain/competition/participation_guard/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 6c50b96553..969674bc55 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -95,7 +95,7 @@ impl Validator { }); } - /// Try to notify all the non-settling external solvers. + /// Try to notify all the non-settling solvers. fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) { let futures = non_settling_solvers .iter() From d2330942e1a2c7e196f96d64747b960450d73830 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Jan 2025 20:35:38 +0000 Subject: [PATCH 06/17] Solver's OpenAPI --- crates/solvers/openapi.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/solvers/openapi.yml b/crates/solvers/openapi.yml index af9562efd2..135f47cfaa 100644 --- a/crates/solvers/openapi.yml +++ b/crates/solvers/openapi.yml @@ -89,6 +89,7 @@ paths: - cancelled - fail - postprocessingTimedOut + - unsettledConsecutiveAuctions responses: "200": description: notification successfully received. From 105b9a7bc90b8f43390ee221e6660c1fd5626e04 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 08:52:59 +0000 Subject: [PATCH 07/17] Store drivers accepted for the feature --- .../competition/participation_guard/db.rs | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 969674bc55..64d7ea6234 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -35,12 +35,16 @@ impl Validator { last_auctions_count: u32, drivers_by_address: HashMap>, ) -> Self { + let accepted_drivers_by_address = drivers_by_address + .into_iter() + .filter(|(_, driver)| driver.accepts_unsettled_blocking) + .collect::>(); let self_ = Self(Arc::new(Inner { db, banned_solvers: Default::default(), ttl, last_auctions_count, - drivers_by_address, + drivers_by_address: accepted_drivers_by_address, })); self_.start_maintenance(settlement_updates_receiver, current_block); @@ -83,9 +87,14 @@ impl Validator { self_.notify_solvers(&non_settling_solvers); let now = Instant::now(); - for solver in non_settling_solvers { - self_.0.banned_solvers.insert(solver, now); - } + non_settling_solvers + .into_iter() + // Check if solver accepted this feature. This should be removed once a CIP is + // approved. + .filter(|solver| self_.0.drivers_by_address.contains_key(solver)) + .for_each(|solver| { + self_.0.banned_solvers.insert(solver, now); + }); } Err(err) => { tracing::warn!(?err, "error while searching for non-settling solvers") @@ -103,22 +112,19 @@ impl Validator { .map(|solver| { let self_ = self.0.clone(); async move { - match self_.drivers_by_address.get(&solver) { - Some(driver) => { - if let Err(err) = driver - .notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions) - .await - { - tracing::debug!(?solver, ?err, "unable to notify external solver"); - } - } - None => { - tracing::error!(?solver, "found unrecognized non-settling driver"); + if let Some(driver) = self_.drivers_by_address.get(&solver) { + if let Err(err) = driver + .notify( + &infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions, + ) + .await + { + tracing::debug!(?solver, ?err, "unable to notify external solver"); } } } - } - ).collect::>(); + }) + .collect::>(); tokio::spawn(async move { join_all(futures).await; @@ -129,12 +135,6 @@ impl Validator { #[async_trait::async_trait] impl super::Validator for Validator { async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result { - // Check if solver accepted this feature. This should be removed once a CIP is - // approved. - if !driver.accepts_unsettled_blocking { - return Ok(true); - } - if let Some(entry) = self.0.banned_solvers.get(&driver.submission_address) { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); From 08924daab49dfeca85c4bd1b7f961361ef713e00 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 08:53:48 +0000 Subject: [PATCH 08/17] Comment --- .../autopilot/src/domain/competition/participation_guard/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 64d7ea6234..bf5af37a20 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -104,7 +104,7 @@ impl Validator { }); } - /// Try to notify all the non-settling solvers. + /// Try to notify all the non-settling solvers in a background task. fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) { let futures = non_settling_solvers .iter() From e37087170b373b17543e1605a50afd202e4a93ad Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 15:25:18 +0000 Subject: [PATCH 09/17] Fix after merge --- .../src/domain/competition/participation_guard/db.rs | 2 +- .../src/domain/competition/participation_guard/mod.rs | 6 +++--- .../src/domain/competition/participation_guard/onchain.rs | 6 +++--- crates/autopilot/src/run_loop.rs | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 6a0ae32466..21f7a7d846 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -141,7 +141,7 @@ impl super::Validator for Validator { if Instant::now().duration_since(*entry.value()) < self.0.ttl { return Ok(false); } else { - self.0.banned_solvers.remove(&driver.submission_address); + self.0.banned_solvers.remove(solver); } } diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 5564cc07f6..eb252f94a1 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -55,9 +55,9 @@ impl SolverParticipationGuard { /// the following order: /// 1. DB-based validator: operates fast since it uses in-memory cache. /// 2. Onchain-based validator: only then calls the Authenticator contract. - pub async fn can_participate(&self, driver: &infra::Driver) -> anyhow::Result { + pub async fn can_participate(&self, solver: ð::Address) -> anyhow::Result { for validator in &self.0.validators { - if !validator.is_allowed(driver).await? { + if !validator.is_allowed(solver).await? { return Ok(false); } } @@ -68,5 +68,5 @@ impl SolverParticipationGuard { #[async_trait::async_trait] trait Validator: Send + Sync { - async fn is_allowed(&self, driver: &infra::Driver) -> anyhow::Result; + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result; } diff --git a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs index d1327f0254..598da9cf26 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/onchain.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/onchain.rs @@ -1,4 +1,4 @@ -use crate::infra; +use crate::{domain::eth, infra}; /// Calls Authenticator contract to check if a solver has a sufficient /// permission. @@ -8,12 +8,12 @@ pub(super) struct Validator { #[async_trait::async_trait] impl super::Validator for Validator { - async fn is_allowed(&self, solver: &infra::Driver) -> anyhow::Result { + async fn is_allowed(&self, solver: ð::Address) -> anyhow::Result { Ok(self .eth .contracts() .authenticator() - .is_solver(solver.submission_address.0) + .is_solver(solver.0) .call() .await?) } diff --git a/crates/autopilot/src/run_loop.rs b/crates/autopilot/src/run_loop.rs index 4102f675ea..492ca18751 100644 --- a/crates/autopilot/src/run_loop.rs +++ b/crates/autopilot/src/run_loop.rs @@ -736,7 +736,7 @@ impl RunLoop { request: &solve::Request, ) -> Result>, SolveError> { - let can_participate = self.solver_participation_guard.can_participate(driver).await.map_err(|err| { + let can_participate = self.solver_participation_guard.can_participate(&driver.submission_address).await.map_err(|err| { tracing::error!(?err, driver = %driver.name, ?driver.submission_address, "solver participation check failed"); SolveError::SolverDenyListed } From 921692f5eab25efe9bacb430839883b62f7bf3f7 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 16:28:08 +0000 Subject: [PATCH 10/17] Notify only accepted solvers --- .../src/domain/competition/participation_guard/db.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 9b6c05fc48..398802fddd 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -123,7 +123,11 @@ impl Validator { .map(|solver| { let self_ = self.0.clone(); async move { - if let Some(driver) = self_.drivers_by_address.get(&solver) { + if let Some(driver) = self_ + .drivers_by_address + .get(&solver) + .filter(|driver| driver.accepts_unsettled_blocking) + { if let Err(err) = driver .notify( &infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions, From f3b64155f584354a76b0ee4a193007ee6d1226df Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 16:53:40 +0000 Subject: [PATCH 11/17] Minor refactoring --- .../competition/participation_guard/db.rs | 48 +++++++++---------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 398802fddd..6af1239d4d 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -89,23 +89,30 @@ impl Validator { .collect::>(); tracing::debug!(?non_settling_solvers, "found non-settling solvers"); - self_.notify_solvers(&non_settling_solvers); - let now = Instant::now(); - non_settling_solvers + let drivers = non_settling_solvers .into_iter() - // Check if solver accepted this feature. This should be removed once a CIP is - // approved. .filter_map(|solver| { self_ .0 .drivers_by_address .get(&solver) - .filter(|driver| driver.accepts_unsettled_blocking).map(|_| solver) + // Check if solver accepted this feature. This should be removed once a CIP is + // approved. + .filter(|driver| driver.accepts_unsettled_blocking) + .cloned() }) - .for_each(|solver| { - self_.0.banned_solvers.insert(solver, now); - }); + .collect::>(); + + Self::notify_solvers(&drivers); + + let now = Instant::now(); + for driver in drivers { + self_ + .0 + .banned_solvers + .insert(driver.submission_address, now); + } } Err(err) => { tracing::warn!(?err, "error while searching for non-settling solvers") @@ -116,26 +123,17 @@ impl Validator { } /// Try to notify all the non-settling solvers in a background task. - fn notify_solvers(&self, non_settling_solvers: &[eth::Address]) { - let futures = non_settling_solvers + fn notify_solvers(non_settling_drivers: &[Arc]) { + let futures = non_settling_drivers .iter() .cloned() - .map(|solver| { - let self_ = self.0.clone(); + .map(|driver| { async move { - if let Some(driver) = self_ - .drivers_by_address - .get(&solver) - .filter(|driver| driver.accepts_unsettled_blocking) + if let Err(err) = driver + .notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions) + .await { - if let Err(err) = driver - .notify( - &infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions, - ) - .await - { - tracing::debug!(?solver, ?err, "unable to notify external solver"); - } + tracing::debug!(solver = ?driver.name, ?err, "unable to notify external solver"); } } }) From 43e650f18bd59826eb99190aec464698809e42e4 Mon Sep 17 00:00:00 2001 From: ilya Date: Fri, 31 Jan 2025 17:40:12 +0000 Subject: [PATCH 12/17] Banned notification --- .../infra/api/routes/notify/dto/notify_request.rs | 2 +- crates/driver/src/infra/notify/mod.rs | 9 ++++++++- crates/driver/src/infra/notify/notification.rs | 11 ++++++++--- crates/driver/src/infra/solver/dto/notification.rs | 13 ++++++++++++- crates/solvers/openapi.yml | 2 +- 5 files changed, 30 insertions(+), 7 deletions(-) diff --git a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs index 22be6cbb00..16afa72814 100644 --- a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs +++ b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs @@ -12,7 +12,7 @@ impl From for notify::Kind { fn from(value: NotifyRequest) -> Self { match value { NotifyRequest::UnsettledConsecutiveAuctions => { - notify::Kind::UnsettledConsecutiveAuctions + notify::Kind::Banned(notify::BanReason::UnsettledConsecutiveAuctions) } } } diff --git a/crates/driver/src/infra/notify/mod.rs b/crates/driver/src/infra/notify/mod.rs index 72446e96bf..ae18f26aa6 100644 --- a/crates/driver/src/infra/notify/mod.rs +++ b/crates/driver/src/infra/notify/mod.rs @@ -5,7 +5,14 @@ use { mod notification; -pub use notification::{Kind, Notification, ScoreKind, Settlement, SimulationSucceededAtLeastOnce}; +pub use notification::{ + BanReason, + Kind, + Notification, + ScoreKind, + Settlement, + SimulationSucceededAtLeastOnce, +}; use { super::simulator, crate::domain::{eth, mempools::Error}, diff --git a/crates/driver/src/infra/notify/notification.rs b/crates/driver/src/infra/notify/notification.rs index ca123bb5e8..5e6323dbdf 100644 --- a/crates/driver/src/infra/notify/notification.rs +++ b/crates/driver/src/infra/notify/notification.rs @@ -45,9 +45,8 @@ pub enum Kind { DriverError(String), /// On-chain solution postprocessing timed out. PostprocessingTimedOut, - /// The solver won multiple consecutive auctions but none of the settlement - /// succeeded. - UnsettledConsecutiveAuctions, + /// The solver has been banned for a specific reason. + Banned(BanReason), } #[derive(Debug)] @@ -61,6 +60,12 @@ pub enum ScoreKind { MissingPrice(TokenAddress), } +#[derive(Debug)] +pub enum BanReason { + /// The driver won multiple consecutive auctions but never settled them. + UnsettledConsecutiveAuctions, +} + #[derive(Debug)] pub enum Settlement { /// Winning solver settled successfully transaction onchain. diff --git a/crates/driver/src/infra/solver/dto/notification.rs b/crates/driver/src/infra/solver/dto/notification.rs index 2c70f50f5a..a352c712b1 100644 --- a/crates/driver/src/infra/solver/dto/notification.rs +++ b/crates/driver/src/infra/solver/dto/notification.rs @@ -61,7 +61,11 @@ impl Notification { notify::Settlement::Expired => Kind::Expired, }, notify::Kind::PostprocessingTimedOut => Kind::PostprocessingTimedOut, - notify::Kind::UnsettledConsecutiveAuctions => Kind::UnsettledConsecutiveAuctions, + notify::Kind::Banned(reason) => Kind::Banned(match reason { + notify::BanReason::UnsettledConsecutiveAuctions => { + BanReason::UnsettledConsecutiveAuctions + } + }), }, } } @@ -145,6 +149,13 @@ pub enum Kind { Expired, Fail, PostprocessingTimedOut, + Banned(BanReason), +} + +#[serde_as] +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase", tag = "reason")] +pub enum BanReason { UnsettledConsecutiveAuctions, } diff --git a/crates/solvers/openapi.yml b/crates/solvers/openapi.yml index 135f47cfaa..0b79459413 100644 --- a/crates/solvers/openapi.yml +++ b/crates/solvers/openapi.yml @@ -89,7 +89,7 @@ paths: - cancelled - fail - postprocessingTimedOut - - unsettledConsecutiveAuctions + - banned responses: "200": description: notification successfully received. From 4bee6760a1bcc5caef96f2e28b05a463c712088f Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 11 Feb 2025 20:39:32 +0000 Subject: [PATCH 13/17] Move notification to the infra mod --- .../competition/participation_guard/db.rs | 27 ++----------------- crates/autopilot/src/infra/mod.rs | 2 +- crates/autopilot/src/infra/solvers/mod.rs | 23 +++++++++++++++- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 92d7de3a4e..73683bb4ce 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -5,7 +5,6 @@ use { infra, }, ethrpc::block_stream::CurrentBlockWatcher, - futures::future::join_all, std::{ collections::HashMap, sync::Arc, @@ -74,7 +73,7 @@ impl Validator { Metrics::get() .non_settling_solver .with_label_values(&[&driver.name]); - + driver.clone() }) }) @@ -94,7 +93,7 @@ impl Validator { .filter(|driver| driver.accepts_unsettled_blocking) .collect::>(); - Self::notify_solvers(&non_settling_drivers); + infra::notify_non_settling_solvers(&non_settling_drivers); let now = Instant::now(); for driver in non_settling_drivers { @@ -111,28 +110,6 @@ impl Validator { } }); } - - /// Try to notify all the non-settling solvers in a background task. - fn notify_solvers(non_settling_drivers: &[Arc]) { - let futures = non_settling_drivers - .iter() - .cloned() - .map(|driver| { - async move { - if let Err(err) = driver - .notify(&infra::solvers::dto::notify::Request::UnsettledConsecutiveAuctions) - .await - { - tracing::debug!(solver = ?driver.name, ?err, "unable to notify external solver"); - } - } - }) - .collect::>(); - - tokio::spawn(async move { - join_all(futures).await; - }); - } } #[async_trait::async_trait] diff --git a/crates/autopilot/src/infra/mod.rs b/crates/autopilot/src/infra/mod.rs index e61a7f5dc8..b8d71918e7 100644 --- a/crates/autopilot/src/infra/mod.rs +++ b/crates/autopilot/src/infra/mod.rs @@ -7,5 +7,5 @@ pub use { blockchain::Ethereum, order_validation::banned, persistence::Persistence, - solvers::Driver, + solvers::{notify_non_settling_solvers, Driver}, }; diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index 2490a71b15..9e3d1ff7a5 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -2,8 +2,9 @@ use { self::dto::{reveal, settle, solve}, crate::{arguments::Account, domain::eth, infra::solvers::dto::notify, util}, anyhow::{anyhow, Context, Result}, + ethcontract::jsonrpc::futures_util::future::join_all, reqwest::{Client, StatusCode}, - std::time::Duration, + std::{sync::Arc, time::Duration}, thiserror::Error, url::Url, }; @@ -176,3 +177,23 @@ pub async fn response_body_with_size_limit( } Ok(bytes) } + +/// Try to notify all the non-settling solvers in a background task. +pub fn notify_non_settling_solvers(non_settling_drivers: &[Arc]) { + let futures = non_settling_drivers + .iter() + .cloned() + .map(|driver| async move { + if let Err(err) = driver + .notify(¬ify::Request::UnsettledConsecutiveAuctions) + .await + { + tracing::debug!(solver = ?driver.name, ?err, "unable to notify external solver"); + } + }) + .collect::>(); + + tokio::spawn(async move { + join_all(futures).await; + }); +} From 17f3f3a3bcbf340dd89eaf6e144ba0687f4bd103 Mon Sep 17 00:00:00 2001 From: ilya Date: Tue, 11 Feb 2025 21:03:08 +0000 Subject: [PATCH 14/17] Ban until timestamp notification --- .../competition/participation_guard/db.rs | 10 +++++++-- .../autopilot/src/infra/solvers/dto/notify.rs | 2 +- crates/autopilot/src/infra/solvers/mod.rs | 9 ++++++-- .../api/routes/notify/dto/notify_request.rs | 9 ++++---- .../driver/src/infra/api/routes/notify/mod.rs | 2 +- .../driver/src/infra/notify/notification.rs | 5 ++++- .../src/infra/solver/dto/notification.rs | 21 +++++++++++++------ 7 files changed, 41 insertions(+), 17 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/db.rs b/crates/autopilot/src/domain/competition/participation_guard/db.rs index 73683bb4ce..ca29326962 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/db.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/db.rs @@ -5,6 +5,7 @@ use { infra, }, ethrpc::block_stream::CurrentBlockWatcher, + model::time::now_in_epoch_seconds, std::{ collections::HashMap, sync::Arc, @@ -93,9 +94,14 @@ impl Validator { .filter(|driver| driver.accepts_unsettled_blocking) .collect::>(); - infra::notify_non_settling_solvers(&non_settling_drivers); - let now = Instant::now(); + let banned_until_timestamp = + u64::from(now_in_epoch_seconds()) + self_.0.ttl.as_secs(); + infra::notify_non_settling_solvers( + &non_settling_drivers, + banned_until_timestamp, + ); + for driver in non_settling_drivers { self_ .0 diff --git a/crates/autopilot/src/infra/solvers/dto/notify.rs b/crates/autopilot/src/infra/solvers/dto/notify.rs index 08165964fd..92a5532fa0 100644 --- a/crates/autopilot/src/infra/solvers/dto/notify.rs +++ b/crates/autopilot/src/infra/solvers/dto/notify.rs @@ -5,5 +5,5 @@ use {serde::Serialize, serde_with::serde_as}; #[serde(rename_all = "camelCase")] pub enum Request { /// The driver won multiple consecutive auctions but never settled them. - UnsettledConsecutiveAuctions, + UnsettledConsecutiveAuctions(u64), } diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index 9e3d1ff7a5..7123c94ba7 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -179,13 +179,18 @@ pub async fn response_body_with_size_limit( } /// Try to notify all the non-settling solvers in a background task. -pub fn notify_non_settling_solvers(non_settling_drivers: &[Arc]) { +pub fn notify_non_settling_solvers( + non_settling_drivers: &[Arc], + banned_until_timestamp: u64, +) { let futures = non_settling_drivers .iter() .cloned() .map(|driver| async move { if let Err(err) = driver - .notify(¬ify::Request::UnsettledConsecutiveAuctions) + .notify(¬ify::Request::UnsettledConsecutiveAuctions( + banned_until_timestamp, + )) .await { tracing::debug!(solver = ?driver.name, ?err, "unable to notify external solver"); diff --git a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs index 16afa72814..a89a7a73e6 100644 --- a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs +++ b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs @@ -5,15 +5,16 @@ use {crate::infra::notify, serde::Deserialize, serde_with::serde_as}; #[serde(rename_all = "camelCase")] pub enum NotifyRequest { /// The driver won multiple consecutive auctions but never settled them. - UnsettledConsecutiveAuctions, + UnsettledConsecutiveAuctions(u64), } impl From for notify::Kind { fn from(value: NotifyRequest) -> Self { match value { - NotifyRequest::UnsettledConsecutiveAuctions => { - notify::Kind::Banned(notify::BanReason::UnsettledConsecutiveAuctions) - } + NotifyRequest::UnsettledConsecutiveAuctions(until_timestamp) => notify::Kind::Banned { + reason: notify::BanReason::UnsettledConsecutiveAuctions, + until_timestamp, + }, } } } diff --git a/crates/driver/src/infra/api/routes/notify/mod.rs b/crates/driver/src/infra/api/routes/notify/mod.rs index 7683cc499b..e6ae815b82 100644 --- a/crates/driver/src/infra/api/routes/notify/mod.rs +++ b/crates/driver/src/infra/api/routes/notify/mod.rs @@ -13,7 +13,7 @@ async fn route( req: axum::Json, ) -> Result)> { let solver = &state.solver().name().0; - tracing::trace!(?req, ?solver, "received a notification"); + tracing::debug!(?req, ?solver, "received a notification"); state .solver() .notify(None, None, req.0.into()) diff --git a/crates/driver/src/infra/notify/notification.rs b/crates/driver/src/infra/notify/notification.rs index 5e6323dbdf..2b9fe40003 100644 --- a/crates/driver/src/infra/notify/notification.rs +++ b/crates/driver/src/infra/notify/notification.rs @@ -46,7 +46,10 @@ pub enum Kind { /// On-chain solution postprocessing timed out. PostprocessingTimedOut, /// The solver has been banned for a specific reason. - Banned(BanReason), + Banned { + reason: BanReason, + until_timestamp: u64, + }, } #[derive(Debug)] diff --git a/crates/driver/src/infra/solver/dto/notification.rs b/crates/driver/src/infra/solver/dto/notification.rs index a352c712b1..49f55ac399 100644 --- a/crates/driver/src/infra/solver/dto/notification.rs +++ b/crates/driver/src/infra/solver/dto/notification.rs @@ -61,11 +61,17 @@ impl Notification { notify::Settlement::Expired => Kind::Expired, }, notify::Kind::PostprocessingTimedOut => Kind::PostprocessingTimedOut, - notify::Kind::Banned(reason) => Kind::Banned(match reason { - notify::BanReason::UnsettledConsecutiveAuctions => { - BanReason::UnsettledConsecutiveAuctions - } - }), + notify::Kind::Banned { + reason, + until_timestamp, + } => Kind::Banned { + reason: match reason { + notify::BanReason::UnsettledConsecutiveAuctions => { + BanReason::UnsettledConsecutiveAuctions + } + }, + until_timestamp, + }, }, } } @@ -149,7 +155,10 @@ pub enum Kind { Expired, Fail, PostprocessingTimedOut, - Banned(BanReason), + Banned { + reason: BanReason, + until_timestamp: u64, + }, } #[serde_as] From cca70c889a136515c457320fda52f37498a3a7bd Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 12 Feb 2025 11:58:00 +0000 Subject: [PATCH 15/17] Refactoring --- .../autopilot/src/infra/solvers/dto/notify.rs | 13 +++++++++-- crates/autopilot/src/infra/solvers/mod.rs | 7 +++--- .../api/routes/notify/dto/notify_request.rs | 23 ++++++++++++++++--- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/crates/autopilot/src/infra/solvers/dto/notify.rs b/crates/autopilot/src/infra/solvers/dto/notify.rs index 92a5532fa0..0ac2e786f9 100644 --- a/crates/autopilot/src/infra/solvers/dto/notify.rs +++ b/crates/autopilot/src/infra/solvers/dto/notify.rs @@ -4,6 +4,15 @@ use {serde::Serialize, serde_with::serde_as}; #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] pub enum Request { - /// The driver won multiple consecutive auctions but never settled them. - UnsettledConsecutiveAuctions(u64), + Banned { + reason: BanReason, + until_timestamp: u64, + }, +} + +#[serde_as] +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub enum BanReason { + UnsettledConsecutiveAuctions, } diff --git a/crates/autopilot/src/infra/solvers/mod.rs b/crates/autopilot/src/infra/solvers/mod.rs index 7123c94ba7..264ab0931b 100644 --- a/crates/autopilot/src/infra/solvers/mod.rs +++ b/crates/autopilot/src/infra/solvers/mod.rs @@ -188,9 +188,10 @@ pub fn notify_non_settling_solvers( .cloned() .map(|driver| async move { if let Err(err) = driver - .notify(¬ify::Request::UnsettledConsecutiveAuctions( - banned_until_timestamp, - )) + .notify(¬ify::Request::Banned { + reason: notify::BanReason::UnsettledConsecutiveAuctions, + until_timestamp: banned_until_timestamp, + }) .await { tracing::debug!(solver = ?driver.name, ?err, "unable to notify external solver"); diff --git a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs index a89a7a73e6..4eb36c20ef 100644 --- a/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs +++ b/crates/driver/src/infra/api/routes/notify/dto/notify_request.rs @@ -4,15 +4,32 @@ use {crate::infra::notify, serde::Deserialize, serde_with::serde_as}; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub enum NotifyRequest { + Banned { + reason: BanReason, + until_timestamp: u64, + }, +} + +#[serde_as] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BanReason { /// The driver won multiple consecutive auctions but never settled them. - UnsettledConsecutiveAuctions(u64), + UnsettledConsecutiveAuctions, } impl From for notify::Kind { fn from(value: NotifyRequest) -> Self { match value { - NotifyRequest::UnsettledConsecutiveAuctions(until_timestamp) => notify::Kind::Banned { - reason: notify::BanReason::UnsettledConsecutiveAuctions, + NotifyRequest::Banned { + reason, + until_timestamp, + } => notify::Kind::Banned { + reason: match reason { + BanReason::UnsettledConsecutiveAuctions => { + notify::BanReason::UnsettledConsecutiveAuctions + } + }, until_timestamp, }, } From 7de4dc15ec749ed8fa80e143c055e6649baa4440 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 12 Feb 2025 11:59:45 +0000 Subject: [PATCH 16/17] OpenAPI --- crates/driver/openapi.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/driver/openapi.yml b/crates/driver/openapi.yml index 0383a35130..4a43152472 100644 --- a/crates/driver/openapi.yml +++ b/crates/driver/openapi.yml @@ -149,9 +149,9 @@ paths: schema: type: string enum: - - unsettledConsecutiveAuctions + - banned description: |- - A notification that informs about a reasoning why the driver was banned. + The reason for the notification with optional additional context. responses: "200": description: notification successfully received. From 4523b52141ec98ac68f1189882776b84752ab727 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 12 Feb 2025 14:26:32 +0000 Subject: [PATCH 17/17] Formatting --- .../src/domain/competition/participation_guard/mod.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/autopilot/src/domain/competition/participation_guard/mod.rs b/crates/autopilot/src/domain/competition/participation_guard/mod.rs index 54e9a72ecf..63eb301e3a 100644 --- a/crates/autopilot/src/domain/competition/participation_guard/mod.rs +++ b/crates/autopilot/src/domain/competition/participation_guard/mod.rs @@ -2,11 +2,7 @@ mod db; mod onchain; use { - crate::{ - arguments::DbBasedSolverParticipationGuardConfig, - domain::eth, - infra, - }, + crate::{arguments::DbBasedSolverParticipationGuardConfig, domain::eth, infra}, std::{collections::HashMap, sync::Arc}, };