From 575c679f782a99707e6eacd5e098378f08f5dfe0 Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Nov 2023 12:55:35 +0000 Subject: [PATCH 1/3] DEX solvers rate limiter (#2071) # Description Properly handles `429 Too Many Requests` response from DEX solvers by utilizing `shared::RateLimiter` to back off further requests with a cool-down period. # Changes - A new`RateLimited` variant on solvers::infra::dex::Error - Parse each individual dex solver's HTTP sub-error with status 429 into this error type - Used `shared::rate_limiter::RateLimiter` though `solvers::boundary` - A new config for solvers with `max_retries` for the `solvers` crate ## How to test TBD: is it possible to simulate it in staging? ## Related Issues Fixes #2068 --- crates/shared/src/lib.rs | 2 + crates/shared/src/rate_limiter.rs | 88 +++++++++++++++- crates/solvers/src/boundary/mod.rs | 1 + crates/solvers/src/boundary/rate_limiter.rs | 53 ++++++++++ crates/solvers/src/domain/solver/dex/mod.rs | 100 +++++++++++++------ crates/solvers/src/infra/config/dex/file.rs | 33 +++++- crates/solvers/src/infra/config/dex/mod.rs | 8 +- crates/solvers/src/infra/dex/balancer/mod.rs | 11 +- crates/solvers/src/infra/dex/mod.rs | 5 + crates/solvers/src/infra/dex/oneinch/mod.rs | 9 +- crates/solvers/src/infra/dex/paraswap/mod.rs | 9 +- crates/solvers/src/infra/dex/zeroex/mod.rs | 3 + crates/solvers/src/run.rs | 8 +- 13 files changed, 287 insertions(+), 43 deletions(-) create mode 100644 crates/solvers/src/boundary/rate_limiter.rs diff --git a/crates/shared/src/lib.rs b/crates/shared/src/lib.rs index d6d96106cf..07d93bdaa8 100644 --- a/crates/shared/src/lib.rs +++ b/crates/shared/src/lib.rs @@ -55,6 +55,8 @@ use std::{ time::{Duration, Instant}, }; +pub use rate_limiter::{RateLimiter, RateLimiterError, RateLimitingStrategy}; + /// Run a future and callback with the time the future took. The call back can /// for example log the time. pub async fn measure_time(future: impl Future, timer: impl FnOnce(Duration)) -> T { diff --git a/crates/shared/src/rate_limiter.rs b/crates/shared/src/rate_limiter.rs index c280c475c4..703867e6fe 100644 --- a/crates/shared/src/rate_limiter.rs +++ b/crates/shared/src/rate_limiter.rs @@ -174,7 +174,7 @@ impl RateLimiter { } } -#[derive(Error, Debug, Clone, Default)] +#[derive(Error, Debug, Clone, Default, PartialEq)] pub enum RateLimiterError { #[default] #[error("rate limited")] @@ -221,6 +221,30 @@ impl RateLimiter { Ok(result) } + + pub async fn execute_with_back_off( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + if let Some(back_off_duration) = self.get_back_off_duration_if_limited() { + tokio::time::sleep(back_off_duration).await; + } + + self.execute(task, requires_back_off).await + } + + fn get_back_off_duration_if_limited(&self) -> Option { + let strategy = self.strategy.lock().unwrap(); + let now = Instant::now(); + + if strategy.drop_requests_until > now { + let back_off_duration = strategy.drop_requests_until - now; + Some(back_off_duration) + } else { + None + } + } } /// Shared module with common back-off checks. @@ -236,7 +260,7 @@ pub mod back_off { #[cfg(test)] mod tests { - use {super::*, futures::FutureExt, tokio::time::sleep}; + use {super::*, futures::FutureExt, std::ops::Add, tokio::time::sleep}; #[test] fn current_back_off_does_not_panic() { @@ -317,4 +341,64 @@ mod tests { rate_limiter.strategy().get_current_back_off() ); } + + #[tokio::test] + async fn test_execute_with_no_back_off() { + let timeout = Duration::from_secs(30); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_no_back_off".to_string()); + + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .now_or_never() + .unwrap() + .unwrap(); + + assert_eq!(result, 1); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert!(current_strategy.drop_requests_until < original_drop_until.add(timeout)); + } + + let result = rate_limiter.execute(async { 1 }, |_| false).await.unwrap(); + assert_eq!(result, 1); + } + + #[tokio::test] + async fn test_execute_with_back_off() { + let timeout = Duration::from_millis(50); + let strategy = RateLimitingStrategy::try_new(1.0, timeout, timeout).unwrap(); + let original_drop_until = strategy.drop_requests_until; + let rate_limiter = RateLimiter::from_strategy(strategy, "test_back_off".to_string()); + + // start the back off + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| true) + .await + .unwrap(); + + assert_eq!(result, 1); + let drop_until = { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + let drop_until = current_strategy.drop_requests_until; + assert!(drop_until >= original_drop_until.add(timeout)); + drop_until + }; + + // back off is not over, expecting a RateLimiterError + let result = rate_limiter.execute(async { 1 }, |_| false).await; + assert_eq!(result, Err(RateLimiterError::RateLimited)); + { + let current_strategy = rate_limiter.strategy.lock().unwrap(); + assert_eq!(current_strategy.drop_requests_until, drop_until); + } + + // back off is over + let result = rate_limiter + .execute_with_back_off(async { 1 }, |_| false) + .await + .unwrap(); + assert_eq!(result, 1); + } } diff --git a/crates/solvers/src/boundary/mod.rs b/crates/solvers/src/boundary/mod.rs index 11315b5591..637ef43a1e 100644 --- a/crates/solvers/src/boundary/mod.rs +++ b/crates/solvers/src/boundary/mod.rs @@ -5,5 +5,6 @@ pub mod baseline; pub mod legacy; pub mod liquidity; pub mod naive; +pub mod rate_limiter; pub type Result = anyhow::Result; diff --git a/crates/solvers/src/boundary/rate_limiter.rs b/crates/solvers/src/boundary/rate_limiter.rs new file mode 100644 index 0000000000..86ee66e2c1 --- /dev/null +++ b/crates/solvers/src/boundary/rate_limiter.rs @@ -0,0 +1,53 @@ +use { + anyhow::Result, + std::{future::Future, time::Duration}, + thiserror::Error, +}; + +pub struct RateLimiter { + inner: shared::RateLimiter, +} + +#[derive(Debug, Clone)] +pub struct RateLimitingStrategy { + inner: shared::RateLimitingStrategy, +} + +impl RateLimitingStrategy { + pub fn try_new( + back_off_growth_factor: f64, + min_back_off: Duration, + max_back_off: Duration, + ) -> Result { + shared::RateLimitingStrategy::try_new(back_off_growth_factor, min_back_off, max_back_off) + .map(|shared| Self { inner: shared }) + } +} + +#[derive(Error, Debug, Clone, Default)] +pub enum RateLimiterError { + #[default] + #[error("rate limited")] + RateLimited, +} + +impl RateLimiter { + pub fn new(strategy: RateLimitingStrategy, name: String) -> Self { + Self { + inner: shared::RateLimiter::from_strategy(strategy.inner, name), + } + } + + pub async fn execute_with_back_off( + &self, + task: impl Future, + requires_back_off: impl Fn(&T) -> bool, + ) -> Result { + self.inner + .execute_with_back_off(task, requires_back_off) + .await + .map_err(|err| match err { + shared::RateLimiterError::RateLimited => RateLimiterError::RateLimited, + }) + } +} diff --git a/crates/solvers/src/domain/solver/dex/mod.rs b/crates/solvers/src/domain/solver/dex/mod.rs index 65ceb69f69..b27f961337 100644 --- a/crates/solvers/src/domain/solver/dex/mod.rs +++ b/crates/solvers/src/domain/solver/dex/mod.rs @@ -3,8 +3,15 @@ use { crate::{ - domain, - domain::{auction, dex::slippage, order, solution, solver::dex::fills::Fills}, + boundary::rate_limiter::{RateLimiter, RateLimiterError}, + domain::{ + self, + auction, + dex::{self, slippage}, + order::{self, Order}, + solution, + solver::dex::fills::Fills, + }, infra, }, futures::{future, stream, FutureExt, StreamExt}, @@ -33,10 +40,14 @@ pub struct Dex { /// Parameters used to calculate the revert risk of a solution. risk: domain::Risk, + + /// Handles 429 Too Many Requests error with a retry mechanism + rate_limiter: RateLimiter, } impl Dex { pub fn new(dex: infra::dex::Dex, config: infra::config::dex::Config) -> Self { + let rate_limiter = RateLimiter::new(config.rate_limiting_strategy, "dex_api".to_string()); Self { dex, simulator: infra::dex::Simulator::new( @@ -48,6 +59,7 @@ impl Dex { concurrent_requests: config.concurrent_requests, fills: Fills::new(config.smallest_partial_fill), risk: config.risk, + rate_limiter, } } @@ -86,42 +98,66 @@ impl Dex { .filter_map(future::ready) } - async fn solve_order( + async fn try_solve( &self, - order: order::UserOrder<'_>, + order: &Order, + dex_order: &dex::Order, tokens: &auction::Tokens, gas_price: auction::GasPrice, - ) -> Option { - let order = order.get(); - let swap = { - let order = self.fills.dex_order(order, tokens)?; - let slippage = self.slippage.relative(&order.amount(), tokens); - self.dex.swap(&order, &slippage, tokens, gas_price).await - }; - - let swap = match swap { - Ok(swap) => swap, - Err(err @ infra::dex::Error::NotFound) => { - if order.partially_fillable { - // Only adjust the amount to try next if we are sure the API worked correctly - // yet still wasn't able to provide a swap. - self.fills.reduce_next_try(order.uid); - } else { - tracing::debug!(?err, "skipping order"); + ) -> Option { + let dex_err_handler = |err: infra::dex::Error| { + match &err { + err @ infra::dex::Error::NotFound => { + if order.partially_fillable { + // Only adjust the amount to try next if we are sure the API + // worked + // correctly yet still wasn't able to provide a + // swap. + self.fills.reduce_next_try(order.uid); + } else { + tracing::debug!(?err, "skipping order"); + } + } + err @ infra::dex::Error::OrderNotSupported => { + tracing::debug!(?err, "skipping order") + } + err @ infra::dex::Error::RateLimited => { + tracing::debug!(?err, "encountered rate limit") + } + infra::dex::Error::Other(err) => { + tracing::warn!(?err, "failed to get swap") } - return None; - } - Err(err @ infra::dex::Error::OrderNotSupported) => { - tracing::debug!(?err, "skipping order"); - return None; - } - Err(infra::dex::Error::Other(err)) => { - tracing::warn!(?err, "failed to get swap"); - return None; } + err }; + let swap = async { + let slippage = self.slippage.relative(&dex_order.amount(), tokens); + self.dex + .swap(dex_order, &slippage, tokens, gas_price) + .await + .map_err(dex_err_handler) + }; + self.rate_limiter + .execute_with_back_off(swap, |result| { + matches!(result, Err(infra::dex::Error::RateLimited)) + }) + .await + .map_err(|err| match err { + RateLimiterError::RateLimited => infra::dex::Error::RateLimited, + }) + .and_then(|result| result) + .ok() + } - let uid = order.uid; + async fn solve_order( + &self, + order: order::UserOrder<'_>, + tokens: &auction::Tokens, + gas_price: auction::GasPrice, + ) -> Option { + let order = order.get(); + let dex_order = self.fills.dex_order(order, tokens)?; + let swap = self.try_solve(order, &dex_order, tokens, gas_price).await?; let sell = tokens.reference_price(&order.sell.token); let Some(solution) = swap .into_solution(order.clone(), gas_price, sell, &self.risk, &self.simulator) @@ -133,7 +169,7 @@ impl Dex { tracing::debug!("solved"); // Maybe some liquidity appeared that enables a bigger fill. - self.fills.increase_next_try(uid); + self.fills.increase_next_try(order.uid); Some(solution.with_buffers_internalizations(tokens)) } diff --git a/crates/solvers/src/infra/config/dex/file.rs b/crates/solvers/src/infra/config/dex/file.rs index 6b101e96b8..9f126618da 100644 --- a/crates/solvers/src/infra/config/dex/file.rs +++ b/crates/solvers/src/infra/config/dex/file.rs @@ -2,6 +2,7 @@ use { crate::{ + boundary::rate_limiter::RateLimitingStrategy, domain::{dex::slippage, eth, Risk}, infra::{blockchain, config::unwrap_or_log, contracts}, util::serialize, @@ -9,7 +10,7 @@ use { bigdecimal::BigDecimal, serde::{de::DeserializeOwned, Deserialize}, serde_with::serde_as, - std::{fmt::Debug, num::NonZeroUsize, path::Path}, + std::{fmt::Debug, num::NonZeroUsize, path::Path, time::Duration}, tokio::fs, }; @@ -48,6 +49,18 @@ struct Config { /// (gas_amount_factor, gas_price_factor, nmb_orders_factor, intercept) risk_parameters: (f64, f64, f64, f64), + /// Back-off growth factor for rate limiting. + #[serde(default = "default_back_off_growth_factor")] + back_off_growth_factor: f64, + + /// Minimum back-off time in seconds for rate limiting. + #[serde(default = "default_min_back_off")] + min_back_off: u64, + + /// Maximum back-off time in seconds for rate limiting. + #[serde(default = "default_max_back_off")] + max_back_off: u64, + /// Settings specific to the wrapped dex API. dex: toml::Value, } @@ -64,6 +77,18 @@ fn default_smallest_partial_fill() -> eth::U256 { eth::U256::exp10(16) // 0.01 ETH } +fn default_back_off_growth_factor() -> f64 { + 1.0 +} + +fn default_min_back_off() -> u64 { + Default::default() +} + +fn default_max_back_off() -> u64 { + Default::default() +} + /// Loads the base solver configuration from a TOML file. /// /// # Panics @@ -117,6 +142,12 @@ pub async fn load(path: &Path) -> (super::Config, T) { nmb_orders_factor: config.risk_parameters.2, intercept: config.risk_parameters.3, }, + rate_limiting_strategy: RateLimitingStrategy::try_new( + config.back_off_growth_factor, + Duration::from_secs(config.min_back_off), + Duration::from_secs(config.max_back_off), + ) + .unwrap(), }; (config, dex) } diff --git a/crates/solvers/src/infra/config/dex/mod.rs b/crates/solvers/src/infra/config/dex/mod.rs index ac8cb2354d..400e92a6b9 100644 --- a/crates/solvers/src/infra/config/dex/mod.rs +++ b/crates/solvers/src/infra/config/dex/mod.rs @@ -5,15 +5,20 @@ pub mod paraswap; pub mod zeroex; use { - crate::domain::{dex::slippage, eth, Risk}, + crate::{ + boundary::rate_limiter::RateLimitingStrategy, + domain::{dex::slippage, eth, Risk}, + }, std::num::NonZeroUsize, }; +#[derive(Clone)] pub struct Contracts { pub settlement: eth::ContractAddress, pub authenticator: eth::ContractAddress, } +#[derive(Clone)] pub struct Config { pub node_url: reqwest::Url, pub contracts: Contracts, @@ -21,4 +26,5 @@ pub struct Config { pub concurrent_requests: NonZeroUsize, pub smallest_partial_fill: eth::Ether, pub risk: Risk, + pub rate_limiting_strategy: RateLimitingStrategy, } diff --git a/crates/solvers/src/infra/dex/balancer/mod.rs b/crates/solvers/src/infra/dex/balancer/mod.rs index 92962269f0..806495da1b 100644 --- a/crates/solvers/src/infra/dex/balancer/mod.rs +++ b/crates/solvers/src/infra/dex/balancer/mod.rs @@ -163,12 +163,21 @@ impl Sor { pub enum Error { #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error(transparent)] Http(util::http::Error), } impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { - Self::Http(err.into()) + match err { + util::http::RoundtripError::Http(util::http::Error::Status(status_code, _)) + if status_code.as_u16() == 429 => + { + Self::RateLimited + } + other_err => Self::Http(other_err.into()), + } } } diff --git a/crates/solvers/src/infra/dex/mod.rs b/crates/solvers/src/infra/dex/mod.rs index 32dacaad4e..4f2a35bb9e 100644 --- a/crates/solvers/src/infra/dex/mod.rs +++ b/crates/solvers/src/infra/dex/mod.rs @@ -46,6 +46,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap interaction could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error(transparent)] Other(Box), } @@ -64,6 +66,7 @@ impl From for Error { match err { oneinch::Error::OrderNotSupported => Self::OrderNotSupported, oneinch::Error::NotFound => Self::NotFound, + oneinch::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -73,6 +76,7 @@ impl From for Error { fn from(err: zeroex::Error) -> Self { match err { zeroex::Error::NotFound => Self::NotFound, + zeroex::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } @@ -82,6 +86,7 @@ impl From for Error { fn from(err: paraswap::Error) -> Self { match err { paraswap::Error::NotFound => Self::NotFound, + paraswap::Error::RateLimited => Self::RateLimited, _ => Self::Other(Box::new(err)), } } diff --git a/crates/solvers/src/infra/dex/oneinch/mod.rs b/crates/solvers/src/infra/dex/oneinch/mod.rs index 041ac4b38f..6ef480a810 100644 --- a/crates/solvers/src/infra/dex/oneinch/mod.rs +++ b/crates/solvers/src/infra/dex/oneinch/mod.rs @@ -165,6 +165,8 @@ pub enum Error { OrderNotSupported, #[error("no valid swap could be found")] NotFound, + #[error("rate limited")] + RateLimited, #[error("api error {code}: {description}")] Api { code: i32, description: String }, #[error(transparent)] @@ -174,7 +176,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => { // Unfortunately, AFAIK these codes aren't documented anywhere. These // based on empirical observations of what the API has returned in the diff --git a/crates/solvers/src/infra/dex/paraswap/mod.rs b/crates/solvers/src/infra/dex/paraswap/mod.rs index 621dafd984..d4e87310e6 100644 --- a/crates/solvers/src/infra/dex/paraswap/mod.rs +++ b/crates/solvers/src/infra/dex/paraswap/mod.rs @@ -113,6 +113,8 @@ pub enum Error { NotFound, #[error("decimals are missing for the swapped tokens")] MissingDecimals, + #[error("rate limited")] + RateLimited, #[error("api error {0}")] Api(String), #[error(transparent)] @@ -122,7 +124,12 @@ pub enum Error { impl From> for Error { fn from(err: util::http::RoundtripError) -> Self { match err { - util::http::RoundtripError::Http(err) => Self::Http(err), + util::http::RoundtripError::Http(http_err) => match http_err { + util::http::Error::Status(status_code, _) if status_code.as_u16() == 429 => { + Self::RateLimited + } + other_err => Self::Http(other_err), + }, util::http::RoundtripError::Api(err) => match err.error.as_str() { "ESTIMATED_LOSS_GREATER_THAN_MAX_IMPACT" | "No routes found with enough liquidity" diff --git a/crates/solvers/src/infra/dex/zeroex/mod.rs b/crates/solvers/src/infra/dex/zeroex/mod.rs index ac7929bfe2..43834cc24a 100644 --- a/crates/solvers/src/infra/dex/zeroex/mod.rs +++ b/crates/solvers/src/infra/dex/zeroex/mod.rs @@ -150,6 +150,8 @@ pub enum Error { NotFound, #[error("quote does not specify an approval spender")] MissingSpender, + #[error("rate limited")] + RateLimited, #[error("api error code {code}: {reason}")] Api { code: i64, reason: String }, #[error(transparent)] @@ -166,6 +168,7 @@ impl From> for Error { // past. match err.code { 100 => Self::NotFound, + 429 => Self::RateLimited, _ => Self::Api { code: err.code, reason: err.reason, diff --git a/crates/solvers/src/run.rs b/crates/solvers/src/run.rs index 9eee0b2ac1..2fc2e0784d 100644 --- a/crates/solvers/src/run.rs +++ b/crates/solvers/src/run.rs @@ -47,28 +47,28 @@ async fn run_with(args: cli::Args, bind: Option>) { dex::Dex::ZeroEx( dex::zeroex::ZeroEx::new(config.zeroex).expect("invalid 0x configuration"), ), - config.base, + config.base.clone(), )) } cli::Command::Balancer { config } => { let config = config::dex::balancer::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::Balancer(dex::balancer::Sor::new(config.sor)), - config.base, + config.base.clone(), )) } cli::Command::OneInch { config } => { let config = config::dex::oneinch::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::OneInch(dex::oneinch::OneInch::new(config.oneinch).await.unwrap()), - config.base, + config.base.clone(), )) } cli::Command::ParaSwap { config } => { let config = config::dex::paraswap::file::load(&config).await; Solver::Dex(solver::Dex::new( dex::Dex::ParaSwap(dex::paraswap::ParaSwap::new(config.paraswap)), - config.base, + config.base.clone(), )) } }; From 018e5d9f59f0e58931c0bc9e54c717f92224247a Mon Sep 17 00:00:00 2001 From: ilya Date: Thu, 30 Nov 2023 13:01:36 +0000 Subject: [PATCH 2/3] Periodic order events table clean up (#2089) # Description Add a background task that periodically drops order events from DB that are very old (configurable). # Changes - [ ] Configurable interval and threshold of the cleaning task - [ ] Adds a new metric in order to create a separate alerting when the clean-up wasn't successful for a long period of time ## How to test Added integro-test ## Related Issues Implements the first task of the #1977 --- crates/autopilot/src/arguments.rs | 31 ++- crates/autopilot/src/database/order_events.rs | 6 + crates/autopilot/src/lib.rs | 1 + crates/autopilot/src/periodic_db_cleanup.rs | 177 ++++++++++++++++++ crates/autopilot/src/run.rs | 15 ++ crates/database/src/lib.rs | 1 + crates/database/src/order_events.rs | 32 +++- 7 files changed, 254 insertions(+), 9 deletions(-) create mode 100644 crates/autopilot/src/periodic_db_cleanup.rs diff --git a/crates/autopilot/src/arguments.rs b/crates/autopilot/src/arguments.rs index 58cae5d8e0..4b60bdfee1 100644 --- a/crates/autopilot/src/arguments.rs +++ b/crates/autopilot/src/arguments.rs @@ -6,7 +6,11 @@ use { http_client, price_estimation::{self, NativePriceEstimators}, }, - std::{net::SocketAddr, num::NonZeroUsize, time::Duration}, + std::{ + net::SocketAddr, + num::{NonZeroUsize, ParseFloatError}, + time::Duration, + }, url::Url, }; @@ -203,6 +207,16 @@ pub struct Arguments { value_parser = shared::arguments::duration_from_seconds, )] pub solve_deadline: Duration, + + /// Time interval in days between each cleanup operation of the + /// `order_events` database table. + #[clap(long, env, default_value = "1", value_parser = duration_from_days)] + pub order_events_cleanup_interval: Duration, + + /// Age threshold in days for order events to be eligible for cleanup in the + /// `order_events` database table. + #[clap(long, env, default_value = "30", value_parser = duration_from_days)] + pub order_events_cleanup_threshold: Duration, } impl std::fmt::Display for Arguments { @@ -270,6 +284,21 @@ impl std::fmt::Display for Arguments { writeln!(f, "score_cap: {}", self.score_cap)?; display_option(f, "shadow", &self.shadow)?; writeln!(f, "solve_deadline: {:?}", self.solve_deadline)?; + writeln!( + f, + "order_events_cleanup_interval: {:?}", + self.order_events_cleanup_interval + )?; + writeln!( + f, + "order_events_cleanup_threshold: {:?}", + self.order_events_cleanup_threshold + )?; Ok(()) } } + +fn duration_from_days(s: &str) -> Result { + let days = s.parse::()?; + Ok(Duration::from_secs_f64(days * 86_400.0)) +} diff --git a/crates/autopilot/src/database/order_events.rs b/crates/autopilot/src/database/order_events.rs index 0afbd30b0b..d202656528 100644 --- a/crates/autopilot/src/database/order_events.rs +++ b/crates/autopilot/src/database/order_events.rs @@ -7,6 +7,7 @@ use { order_events::{self, OrderEvent}, }, model::order::OrderUid, + sqlx::Error, }; impl super::Postgres { @@ -19,6 +20,11 @@ impl super::Postgres { tracing::warn!(?err, "failed to insert order events"); } } + + /// Deletes events before the provided timestamp. + pub async fn delete_order_events_before(&self, timestamp: DateTime) -> Result { + order_events::delete_order_events_before(&self.0, timestamp).await + } } async fn store_order_events( diff --git a/crates/autopilot/src/lib.rs b/crates/autopilot/src/lib.rs index b08311a43f..40f0c148a4 100644 --- a/crates/autopilot/src/lib.rs +++ b/crates/autopilot/src/lib.rs @@ -5,6 +5,7 @@ pub mod driver_api; pub mod driver_model; pub mod event_updater; pub mod on_settlement_event_updater; +pub mod periodic_db_cleanup; pub mod protocol; pub mod run; pub mod run_loop; diff --git a/crates/autopilot/src/periodic_db_cleanup.rs b/crates/autopilot/src/periodic_db_cleanup.rs new file mode 100644 index 0000000000..e889d46042 --- /dev/null +++ b/crates/autopilot/src/periodic_db_cleanup.rs @@ -0,0 +1,177 @@ +use { + crate::database::Postgres, + chrono::{DateTime, Utc}, + std::time::Duration, + tokio::time, +}; + +pub struct OrderEventsCleanerConfig { + cleanup_interval: Duration, + event_age_threshold: chrono::Duration, +} + +impl OrderEventsCleanerConfig { + pub fn new(cleanup_interval: Duration, event_age_threshold: Duration) -> Self { + OrderEventsCleanerConfig { + cleanup_interval, + event_age_threshold: chrono::Duration::from_std(event_age_threshold).unwrap(), + } + } +} + +pub struct OrderEventsCleaner { + config: OrderEventsCleanerConfig, + db: Postgres, +} + +impl OrderEventsCleaner { + pub fn new(config: OrderEventsCleanerConfig, db: Postgres) -> Self { + OrderEventsCleaner { config, db } + } + + pub async fn run_forever(self) -> ! { + let mut interval = time::interval(self.config.cleanup_interval); + loop { + interval.tick().await; + + let timestamp: DateTime = Utc::now() - self.config.event_age_threshold; + match self.db.delete_order_events_before(timestamp).await { + Ok(affected_rows_count) => { + tracing::debug!(affected_rows_count, timestamp = %timestamp.to_string(), "order events cleanup"); + Metrics::get().order_events_cleanup_total.inc() + } + Err(err) => { + tracing::warn!(?err, "failed to delete order events before {}", timestamp) + } + } + } + } +} + +#[derive(prometheus_metric_storage::MetricStorage)] +struct Metrics { + /// The total number of successful `order_events` table cleanups + #[metric(name = "periodic_db_cleanup")] + order_events_cleanup_total: prometheus::IntCounter, +} + +impl Metrics { + fn get() -> &'static Self { + Metrics::instance(observe::metrics::get_storage_registry()).unwrap() + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + database::{ + byte_array::ByteArray, + order_events::{OrderEvent, OrderEventLabel}, + }, + itertools::Itertools, + sqlx::{PgPool, Row}, + }; + + // Note: `tokio::time::advance` was not used in these tests. While it is a + // useful tool for controlling time flow in asynchronous tests, it causes + // complications when used with `sqlx::PgPool`. Specifically, pausing or + // advancing time with `tokio::time::advance` can interfere with the pool's + // ability to acquire database connections, leading to panics and unpredictable + // behavior in tests. Given these issues, tests were designed without + // manipulating the timer, to maintain stability and reliability in the + // database connection handling. + #[tokio::test] + #[ignore] + async fn order_events_cleaner_flow() { + let db = Postgres::new("postgresql://").await.unwrap(); + let mut ex = db.0.begin().await.unwrap(); + database::clear_DANGER_(&mut ex).await.unwrap(); + + let now = Utc::now(); + let event_a = OrderEvent { + order_uid: ByteArray([1; 56]), + timestamp: now - chrono::Duration::milliseconds(300), + label: OrderEventLabel::Created, + }; + let event_b = OrderEvent { + order_uid: ByteArray([2; 56]), + timestamp: now - chrono::Duration::milliseconds(100), + label: OrderEventLabel::Created, + }; + let event_c = OrderEvent { + order_uid: ByteArray([3; 56]), + timestamp: now, + label: OrderEventLabel::Created, + }; + + database::order_events::insert_order_event(&mut ex, &event_a) + .await + .unwrap(); + database::order_events::insert_order_event(&mut ex, &event_b) + .await + .unwrap(); + database::order_events::insert_order_event(&mut ex, &event_c) + .await + .unwrap(); + + ex.commit().await.unwrap(); + + let ids = order_event_ids_before(&db.0).await; + assert_eq!(ids.len(), 3); + assert!(ids.contains(&event_a.order_uid)); + assert!(ids.contains(&event_b.order_uid)); + assert!(ids.contains(&event_c.order_uid)); + + let config = + OrderEventsCleanerConfig::new(Duration::from_millis(50), Duration::from_millis(200)); + let cleaner = OrderEventsCleaner::new(config, db.clone()); + + tokio::task::spawn(cleaner.run_forever()); + + // delete `order_a` after the initialization + time::sleep(Duration::from_millis(20)).await; + let ids = order_event_ids_before(&db.0).await; + assert_eq!(ids.len(), 2); + assert!(!ids.contains(&event_a.order_uid)); + assert!(ids.contains(&event_b.order_uid)); + assert!(ids.contains(&event_c.order_uid)); + + // nothing deleted after the first interval + time::sleep(Duration::from_millis(50)).await; + let ids = order_event_ids_before(&db.0).await; + assert_eq!(ids.len(), 2); + assert!(!ids.contains(&event_a.order_uid)); + assert!(ids.contains(&event_b.order_uid)); + assert!(ids.contains(&event_c.order_uid)); + + // delete `event_b` only + time::sleep(Duration::from_millis(100)).await; + let ids = order_event_ids_before(&db.0).await; + assert_eq!(ids.len(), 1); + assert!(!ids.contains(&event_b.order_uid)); + assert!(ids.contains(&event_c.order_uid)); + + // delete `event_c` + time::sleep(Duration::from_millis(200)).await; + let ids = order_event_ids_before(&db.0).await; + assert!(ids.is_empty()); + } + + async fn order_event_ids_before(pool: &PgPool) -> Vec> { + const QUERY: &str = r#" + SELECT order_uid + FROM order_events + "#; + sqlx::query(QUERY) + .fetch_all(pool) + .await + .unwrap() + .iter() + .map(|row| { + let order_uid: ByteArray<56> = row.try_get(0).unwrap(); + order_uid + }) + .collect_vec() + } +} diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 11bd9f4ad8..7f62894113 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -591,6 +591,21 @@ pub async fn run(args: Arguments) { .instrument(tracing::info_span!("on_settlement_event_updater")), ); + let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new( + args.order_events_cleanup_interval, + args.order_events_cleanup_threshold, + ); + let order_events_cleaner = crate::periodic_db_cleanup::OrderEventsCleaner::new( + order_events_cleaner_config, + db.clone(), + ); + + tokio::task::spawn( + order_events_cleaner + .run_forever() + .instrument(tracing::info_span!("order_events_cleaner")), + ); + if args.enable_colocation { if args.drivers.is_empty() { panic!("colocation is enabled but no drivers are configured"); diff --git a/crates/database/src/lib.rs b/crates/database/src/lib.rs index 8a9ff4c9bc..cf17d547fb 100644 --- a/crates/database/src/lib.rs +++ b/crates/database/src/lib.rs @@ -47,6 +47,7 @@ pub type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>; /// The names of all tables we use in the db. pub const ALL_TABLES: &[&str] = &[ "orders", + "order_events", "trades", "invalidations", "quotes", diff --git a/crates/database/src/order_events.rs b/crates/database/src/order_events.rs index c31b001559..726fd07e3f 100644 --- a/crates/database/src/order_events.rs +++ b/crates/database/src/order_events.rs @@ -4,7 +4,7 @@ use { crate::OrderUid, chrono::Utc, - sqlx::{types::chrono::DateTime, PgConnection}, + sqlx::{types::chrono::DateTime, PgConnection, PgPool}, }; /// Describes what kind of event was registered for an order. @@ -51,13 +51,13 @@ pub async fn insert_order_event( event: &OrderEvent, ) -> Result<(), sqlx::Error> { const QUERY: &str = r#" -INSERT INTO order_events ( - order_uid, - timestamp, - label -) -VALUES ($1, $2, $3) -"#; + INSERT INTO order_events ( + order_uid, + timestamp, + label + ) + VALUES ($1, $2, $3) + "#; sqlx::query(QUERY) .bind(event.order_uid) .bind(event.timestamp) @@ -66,3 +66,19 @@ VALUES ($1, $2, $3) .await .map(|_| ()) } + +/// Deletes rows before the provided timestamp from the `order_events` table. +pub async fn delete_order_events_before( + pool: &PgPool, + timestamp: DateTime, +) -> Result { + const QUERY: &str = r#" + DELETE FROM order_events + WHERE timestamp < $1 + "#; + sqlx::query(QUERY) + .bind(timestamp) + .execute(pool) + .await + .map(|result| result.rows_affected()) +} From df37aff4c0db382863e855eabde30499f47edf72 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Thu, 30 Nov 2023 14:26:45 +0100 Subject: [PATCH 3/3] Reduce memory used for balancer v2 liquidity (#2099) # Description The new driver is using a huge amount of memory. One low hanging fruit is to slightly redesign the internals of balancer v2 liquidity. Currently all those stable and weighted pools contain big pieces of data that don't change and are duplicated. For other liquidity sources this was handled by having an `Inner` component that contains that data which is shared with `Arc`s across all liquidity components. # Changes This PR applies that same pattern for balancer stable and weighted pools. ## How to test Just the compiler since it's only a minor refactor --- .../src/boundary/liquidity/balancer/v2/mod.rs | 2 +- crates/solver/src/liquidity/balancer_v2.rs | 67 ++++++++++++------- crates/solver/src/settlement_simulation.rs | 4 +- 3 files changed, 44 insertions(+), 29 deletions(-) diff --git a/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs b/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs index 703058e3fd..a09315c254 100644 --- a/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs +++ b/crates/driver/src/boundary/liquidity/balancer/v2/mod.rs @@ -57,7 +57,7 @@ fn to_interaction( // change this assumption, we would need to change it there as well. GPv2Settlement::at(&web3, receiver.0), BalancerV2Vault::at(&web3, pool.vault.into()), - Arc::new(Allowances::empty(receiver.0)), + Allowances::empty(receiver.0), ); let interaction = handler.swap( diff --git a/crates/solver/src/liquidity/balancer_v2.rs b/crates/solver/src/liquidity/balancer_v2.rs index 31f045b6d3..cf7df80f65 100644 --- a/crates/solver/src/liquidity/balancer_v2.rs +++ b/crates/solver/src/liquidity/balancer_v2.rs @@ -61,13 +61,19 @@ impl BalancerV2Liquidity { let pools = self.pool_fetcher.fetch(pairs, block).await?; let tokens = pools.relevant_tokens(); - let allowances = Arc::new( - self.allowance_manager - .get_allowances(tokens, self.vault.address()) - .await?, - ); - let weighted_product_orders = pools + let allowances = self + .allowance_manager + .get_allowances(tokens, self.vault.address()) + .await?; + + let inner = Arc::new(Inner { + allowances, + settlement: self.settlement.clone(), + vault: self.vault.clone(), + }); + + let weighted_product_orders: Vec<_> = pools .weighted_pools .into_iter() .map(|pool| WeightedProductOrder { @@ -77,13 +83,11 @@ impl BalancerV2Liquidity { version: pool.version, settlement_handling: Arc::new(SettlementHandler { pool_id: pool.common.id, - settlement: self.settlement.clone(), - vault: self.vault.clone(), - allowances: allowances.clone(), + inner: inner.clone(), }), }) .collect(); - let stable_pool_orders = pools + let stable_pool_orders: Vec<_> = pools .stable_pools .into_iter() .map(|pool| StablePoolOrder { @@ -93,9 +97,7 @@ impl BalancerV2Liquidity { amplification_parameter: pool.amplification_parameter, settlement_handling: Arc::new(SettlementHandler { pool_id: pool.common.id, - settlement: self.settlement.clone(), - vault: self.vault.clone(), - allowances: allowances.clone(), + inner: inner.clone(), }), }) .collect(); @@ -125,9 +127,13 @@ impl LiquidityCollecting for BalancerV2Liquidity { pub struct SettlementHandler { pool_id: H256, + inner: Arc, +} + +struct Inner { settlement: GPv2Settlement, vault: BalancerV2Vault, - allowances: Arc, + allowances: Allowances, } impl SettlementHandler { @@ -135,18 +141,20 @@ impl SettlementHandler { pool_id: H256, settlement: GPv2Settlement, vault: BalancerV2Vault, - allowances: Arc, + allowances: Allowances, ) -> Self { SettlementHandler { pool_id, - settlement, - vault, - allowances, + inner: Arc::new(Inner { + settlement, + vault, + allowances, + }), } } pub fn vault(&self) -> &BalancerV2Vault { - &self.vault + &self.inner.vault } pub fn pool_id(&self) -> H256 { @@ -159,8 +167,8 @@ impl SettlementHandler { output: TokenAmount, ) -> BalancerSwapGivenOutInteraction { BalancerSwapGivenOutInteraction { - settlement: self.settlement.clone(), - vault: self.vault.clone(), + settlement: self.inner.settlement.clone(), + vault: self.inner.vault.clone(), pool_id: self.pool_id, asset_in_max: input_max, asset_out: output, @@ -198,7 +206,11 @@ impl SettlementHandler { execution: AmmOrderExecution, encoder: &mut SettlementEncoder, ) -> Result<()> { - if let Some(approval) = self.allowances.approve_token(execution.input_max.clone())? { + if let Some(approval) = self + .inner + .allowances + .approve_token(execution.input_max.clone())? + { encoder.append_to_execution_plan_internalizable( Arc::new(approval), execution.internalizable, @@ -436,17 +448,20 @@ mod tests { #[test] fn encodes_swaps_in_settlement() { let (settlement, vault) = dummy_contracts(); - let handler = SettlementHandler { - pool_id: H256([0x90; 32]), + let inner = Arc::new(Inner { settlement: settlement.clone(), vault: vault.clone(), - allowances: Arc::new(Allowances::new( + allowances: Allowances::new( vault.address(), hashmap! { H160([0x70; 20]) => 0.into(), H160([0x71; 20]) => 100.into(), }, - )), + ), + }); + let handler = SettlementHandler { + pool_id: H256([0x90; 32]), + inner, }; let mut encoder = SettlementEncoder::new(Default::default()); diff --git a/crates/solver/src/settlement_simulation.rs b/crates/solver/src/settlement_simulation.rs index f3399977b4..78d111719a 100644 --- a/crates/solver/src/settlement_simulation.rs +++ b/crates/solver/src/settlement_simulation.rs @@ -527,10 +527,10 @@ mod tests { .unwrap(), contract.clone(), balancer_vault, - Arc::new(Allowances::new( + Allowances::new( contract.address(), hashmap! {"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48".parse().unwrap()=> U256::from_dec_str("18000000000000000000000000").unwrap()}, - )), + ), )), };