diff --git a/crates/driver/src/domain/competition/bad_tokens/metrics.rs b/crates/driver/src/domain/competition/bad_tokens/metrics.rs index 50917c6d53..43e444dca2 100644 --- a/crates/driver/src/domain/competition/bad_tokens/metrics.rs +++ b/crates/driver/src/domain/competition/bad_tokens/metrics.rs @@ -1,23 +1,18 @@ -use {super::Quality, crate::domain::eth, dashmap::DashMap, std::sync::Arc}; +use { + super::Quality, + crate::domain::eth, + dashmap::DashMap, + std::{ + sync::Arc, + time::{Duration, Instant}, + }, +}; -#[derive(Default)] +#[derive(Default, Debug)] struct TokenStatistics { attempts: u32, fails: u32, -} - -#[derive(Default, Clone)] -pub struct DetectorBuilder(Arc>); - -impl DetectorBuilder { - pub fn build(self, failure_ratio: f64, required_measurements: u32, log_only: bool) -> Detector { - Detector { - failure_ratio, - required_measurements, - counter: self.0, - log_only, - } - } + flagged_unsupported_at: Option, } /// Monitors tokens to determine whether they are considered "unsupported" based @@ -31,19 +26,46 @@ pub struct Detector { required_measurements: u32, counter: Arc>, log_only: bool, + token_freeze_time: Duration, } impl Detector { - pub fn get_quality(&self, token: ð::TokenAddress) -> Option { - let measurements = self.counter.get(token)?; - let is_unsupported = self.is_unsupported(&measurements); + pub fn new( + failure_ratio: f64, + required_measurements: u32, + log_only: bool, + token_freeze_time: Duration, + ) -> Self { + Self { + failure_ratio, + required_measurements, + counter: Default::default(), + log_only, + token_freeze_time, + } + } + + pub fn get_quality(&self, token: ð::TokenAddress, now: Instant) -> Option { + let stats = self.counter.get(token)?; + if stats + .flagged_unsupported_at + .is_some_and(|t| now.duration_since(t) > self.token_freeze_time) + { + // Sometimes tokens only cause issues temporarily. If the token's freeze + // period expired we give it another chance to see if it still behaves badly. + return None; + } + + let is_unsupported = self.stats_indicate_unsupported(&stats); (!self.log_only && is_unsupported).then_some(Quality::Unsupported) } - fn is_unsupported(&self, measurements: &TokenStatistics) -> bool { - let token_failure_ratio = measurements.fails as f64 / measurements.attempts as f64; - measurements.attempts >= self.required_measurements - && token_failure_ratio >= self.failure_ratio + fn stats_indicate_unsupported(&self, stats: &TokenStatistics) -> bool { + let token_failure_ratio = match stats.attempts { + 0 => return false, + attempts => f64::from(stats.fails) / f64::from(attempts), + }; + stats.attempts >= self.required_measurements && token_failure_ratio >= self.failure_ratio } /// Updates the tokens that participated in settlements by @@ -54,32 +76,80 @@ impl Detector { token_pairs: &[(eth::TokenAddress, eth::TokenAddress)], failure: bool, ) { - let mut unsupported_tokens = vec![]; + let now = Instant::now(); + let mut new_unsupported_tokens = vec![]; token_pairs .iter() .flat_map(|(token_a, token_b)| [token_a, token_b]) .for_each(|token| { - let measurement = self + let mut stats = self .counter .entry(*token) .and_modify(|counter| { counter.attempts += 1; - counter.fails += u32::from(failure) + counter.fails += u32::from(failure); }) .or_insert_with(|| TokenStatistics { attempts: 1, fails: u32::from(failure), + flagged_unsupported_at: None, }); - if self.is_unsupported(&measurement) { - unsupported_tokens.push(token); + + // token neeeds to be frozen as unsupported for a while + if self.stats_indicate_unsupported(&stats) + && stats + .flagged_unsupported_at + .is_none_or(|t| now.duration_since(t) > self.token_freeze_time) + { + new_unsupported_tokens.push(token); + stats.flagged_unsupported_at = Some(now); } }); - if !unsupported_tokens.is_empty() { + if !new_unsupported_tokens.is_empty() { tracing::debug!( - tokens = ?unsupported_tokens, + tokens = ?new_unsupported_tokens, "mark tokens as unsupported" ); } } } + +#[cfg(test)] +mod tests { + use {super::*, ethcontract::H160}; + + /// Tests that a token only gets marked temporarily as unsupported. + /// After the freeze period it will be allowed again. + #[tokio::test] + async fn unfreeze_bad_tokens() { + const FREEZE_DURATION: Duration = Duration::from_millis(50); + let detector = Detector::new(0.5, 2, false, FREEZE_DURATION); + + let token_a = eth::TokenAddress(eth::ContractAddress(H160([1; 20]))); + let token_b = eth::TokenAddress(eth::ContractAddress(H160([2; 20]))); + + // token is reported as supported while we don't have enough measurements + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + detector.update_tokens(&[(token_a, token_b)], true); + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + detector.update_tokens(&[(token_a, token_b)], true); + + // after we got enough measurements the token gets marked as bad + assert_eq!( + detector.get_quality(&token_a, Instant::now()), + Some(Quality::Unsupported) + ); + + // after the freeze period is over the token gets reported as good again + tokio::time::sleep(FREEZE_DURATION).await; + assert_eq!(detector.get_quality(&token_a, Instant::now()), None); + + // after an unfreeze another bad measurement is enough to freeze it again + detector.update_tokens(&[(token_a, token_b)], true); + assert_eq!( + detector.get_quality(&token_a, Instant::now()), + Some(Quality::Unsupported) + ); + } +} diff --git a/crates/driver/src/domain/competition/bad_tokens/mod.rs b/crates/driver/src/domain/competition/bad_tokens/mod.rs index ffa1578425..d724bea442 100644 --- a/crates/driver/src/domain/competition/bad_tokens/mod.rs +++ b/crates/driver/src/domain/competition/bad_tokens/mod.rs @@ -132,7 +132,7 @@ impl Detector { } if let Some(metrics) = &self.metrics { - return metrics.get_quality(&token); + return metrics.get_quality(&token, now); } None diff --git a/crates/driver/src/infra/api/mod.rs b/crates/driver/src/infra/api/mod.rs index 3047c796a6..de7897cfa8 100644 --- a/crates/driver/src/infra/api/mod.rs +++ b/crates/driver/src/infra/api/mod.rs @@ -58,8 +58,6 @@ impl Api { app = routes::metrics(app); app = routes::healthz(app); - let metrics_bad_token_detector_builder = bad_tokens::metrics::DetectorBuilder::default(); - // Multiplex each solver as part of the API. Multiple solvers are multiplexed // on the same driver so only one liquidity collector collects the liquidity // for all of them. This is important because liquidity collection is @@ -81,10 +79,11 @@ impl Api { } if bad_token_config.enable_metrics_strategy { - bad_tokens.with_metrics_detector(metrics_bad_token_detector_builder.clone().build( + bad_tokens.with_metrics_detector(bad_tokens::metrics::Detector::new( bad_token_config.metrics_strategy_failure_ratio, bad_token_config.metrics_strategy_required_measurements, bad_token_config.metrics_strategy_log_only, + bad_token_config.metrics_strategy_token_freeze_time, )); } diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index 9c19aad70a..0c3e2b61f7 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -124,6 +124,9 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config { .bad_token_detection .metrics_strategy_required_measurements, metrics_strategy_log_only: config.bad_token_detection.metrics_strategy_log_only, + metrics_strategy_token_freeze_time: config + .bad_token_detection + .metrics_strategy_token_freeze_time, }, settle_queue_size: config.settle_queue_size, } diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index 5677e60be7..39ed135509 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -716,6 +716,15 @@ pub struct BadTokenDetectionConfig { rename = "metrics-bad-token-detection-log-only" )] pub metrics_strategy_log_only: bool, + + /// How long the metrics based bad token detection should flag a token as + /// unsupported before it allows to solve for that token again. + #[serde( + default = "default_metrics_bad_token_detector_freeze_time", + rename = "metrics-bad-token-detection-token-freeze-time", + with = "humantime_serde" + )] + pub metrics_strategy_token_freeze_time: Duration, } impl Default for BadTokenDetectionConfig { @@ -742,3 +751,7 @@ fn default_settle_queue_size() -> usize { fn default_metrics_bad_token_detector_log_only() -> bool { true } + +fn default_metrics_bad_token_detector_freeze_time() -> Duration { + Duration::from_secs(60 * 10) +} diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index cc0802a5a2..1b361b1c0b 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -22,7 +22,7 @@ use { derive_more::{From, Into}, num::BigRational, reqwest::header::HeaderName, - std::collections::HashMap, + std::{collections::HashMap, time::Duration}, tap::TapFallible, thiserror::Error, tracing::Instrument, @@ -317,4 +317,5 @@ pub struct BadTokenDetection { pub metrics_strategy_failure_ratio: f64, pub metrics_strategy_required_measurements: u32, pub metrics_strategy_log_only: bool, + pub metrics_strategy_token_freeze_time: Duration, }