Skip to content

Commit

Permalink
only mark tokens as unsupported based on metrics for a limited time (#…
Browse files Browse the repository at this point in the history
…3205)

# Description
Currently the bad token detection based on metrics will mark tokens as
unsupported forever. This is problematic for tokens which only have
issues temporarily. For example this can happen when the most important
pool for a token gets into a weird state or when a token gets paused for
a while.

# Changes
Adjusts the logic to freeze tokens for a configurable period of time.
Once the freeze period is over we give the token another chance (even if
the stats indicate that it's currently unsupported). To not run into
issues when a token is always bad the logic was built such that 1 more
`bad` measurement is enough to freeze the token again.
That way we can safely configure a very high `min_measurements` without
having periods where a token that was flagged as bad can cause issues
again because we would need to collect a lot of new measurements to mark
it as unsupported again.

Additionally the PR simplifies how the metrics based bad token detector
gets instantiated and gives each solver their completely separate
instance (how it was originally communicated because each solver may
support different tokens).

## How to test
added a unit test
  • Loading branch information
MartinquaXD authored Jan 3, 2025
1 parent fca9a92 commit 3ca92b7
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 35 deletions.
130 changes: 100 additions & 30 deletions crates/driver/src/domain/competition/bad_tokens/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use {super::Quality, crate::domain::eth, dashmap::DashMap, std::sync::Arc};
use {
super::Quality,
crate::domain::eth,
dashmap::DashMap,
std::{
sync::Arc,
time::{Duration, Instant},
},
};

#[derive(Default)]
#[derive(Default, Debug)]
struct TokenStatistics {
attempts: u32,
fails: u32,
}

#[derive(Default, Clone)]
pub struct DetectorBuilder(Arc<DashMap<eth::TokenAddress, TokenStatistics>>);

impl DetectorBuilder {
pub fn build(self, failure_ratio: f64, required_measurements: u32, log_only: bool) -> Detector {
Detector {
failure_ratio,
required_measurements,
counter: self.0,
log_only,
}
}
flagged_unsupported_at: Option<Instant>,
}

/// Monitors tokens to determine whether they are considered "unsupported" based
Expand All @@ -31,19 +26,46 @@ pub struct Detector {
required_measurements: u32,
counter: Arc<DashMap<eth::TokenAddress, TokenStatistics>>,
log_only: bool,
token_freeze_time: Duration,
}

impl Detector {
pub fn get_quality(&self, token: &eth::TokenAddress) -> Option<Quality> {
let measurements = self.counter.get(token)?;
let is_unsupported = self.is_unsupported(&measurements);
pub fn new(
failure_ratio: f64,
required_measurements: u32,
log_only: bool,
token_freeze_time: Duration,
) -> Self {
Self {
failure_ratio,
required_measurements,
counter: Default::default(),
log_only,
token_freeze_time,
}
}

/// Returns the detector's current judgement of the token's [`Quality`],
/// or `None` when there is no verdict (not enough data, log-only mode,
/// or the token's freeze period has expired).
pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Option<Quality> {
    let stats = self.counter.get(token)?;

    let freeze_expired = stats
        .flagged_unsupported_at
        .map(|flagged_at| now.duration_since(flagged_at) > self.token_freeze_time)
        .unwrap_or(false);
    if freeze_expired {
        // Sometimes tokens only cause issues temporarily. If the token's freeze
        // period expired we give it another chance to see if it still behaves badly.
        return None;
    }

    match (self.log_only, self.stats_indicate_unsupported(&stats)) {
        // Only report the token as unsupported when we are not in dry-run mode.
        (false, true) => Some(Quality::Unsupported),
        _ => None,
    }
}

fn is_unsupported(&self, measurements: &TokenStatistics) -> bool {
let token_failure_ratio = measurements.fails as f64 / measurements.attempts as f64;
measurements.attempts >= self.required_measurements
&& token_failure_ratio >= self.failure_ratio
/// Returns whether the collected statistics alone classify the token as
/// unsupported (ignoring any freeze-period bookkeeping).
fn stats_indicate_unsupported(&self, stats: &TokenStatistics) -> bool {
    if stats.attempts == 0 || stats.attempts < self.required_measurements {
        // Not enough data for a verdict (also avoids a 0/0 division).
        return false;
    }
    let failure_ratio = f64::from(stats.fails) / f64::from(stats.attempts);
    failure_ratio >= self.failure_ratio
}

/// Updates the tokens that participated in settlements by
Expand All @@ -54,32 +76,80 @@ impl Detector {
token_pairs: &[(eth::TokenAddress, eth::TokenAddress)],
failure: bool,
) {
let mut unsupported_tokens = vec![];
let now = Instant::now();
let mut new_unsupported_tokens = vec![];
token_pairs
.iter()
.flat_map(|(token_a, token_b)| [token_a, token_b])
.for_each(|token| {
let measurement = self
let mut stats = self
.counter
.entry(*token)
.and_modify(|counter| {
counter.attempts += 1;
counter.fails += u32::from(failure)
counter.fails += u32::from(failure);
})
.or_insert_with(|| TokenStatistics {
attempts: 1,
fails: u32::from(failure),
flagged_unsupported_at: None,
});
if self.is_unsupported(&measurement) {
unsupported_tokens.push(token);

// token needs to be frozen as unsupported for a while
if self.stats_indicate_unsupported(&stats)
&& stats
.flagged_unsupported_at
.is_none_or(|t| now.duration_since(t) > self.token_freeze_time)
{
new_unsupported_tokens.push(token);
stats.flagged_unsupported_at = Some(now);
}
});

if !unsupported_tokens.is_empty() {
if !new_unsupported_tokens.is_empty() {
tracing::debug!(
tokens = ?unsupported_tokens,
tokens = ?new_unsupported_tokens,
"mark tokens as unsupported"
);
}
}
}

#[cfg(test)]
mod tests {
    use {super::*, ethcontract::H160};

    /// Tests that a token only gets marked temporarily as unsupported.
    /// After the freeze period it will be allowed again.
    #[tokio::test]
    async fn unfreeze_bad_tokens() {
        const FREEZE_DURATION: Duration = Duration::from_millis(50);
        // 2 measurements with a failure ratio >= 0.5 mark a token as bad.
        let detector = Detector::new(0.5, 2, false, FREEZE_DURATION);

        let token_a = eth::TokenAddress(eth::ContractAddress(H160([1; 20])));
        let token_b = eth::TokenAddress(eth::ContractAddress(H160([2; 20])));

        // token is reported as supported while we don't have enough measurements
        assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
        detector.update_tokens(&[(token_a, token_b)], true);
        assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
        detector.update_tokens(&[(token_a, token_b)], true);

        // after we got enough measurements the token gets marked as bad
        assert_eq!(
            detector.get_quality(&token_a, Instant::now()),
            Some(Quality::Unsupported)
        );

        // Sleep a bit longer than the freeze period: the detector only
        // unfreezes once *strictly more* than `token_freeze_time` elapsed,
        // so sleeping exactly FREEZE_DURATION could theoretically be flaky.
        tokio::time::sleep(FREEZE_DURATION + Duration::from_millis(20)).await;

        // after the freeze period is over the token gets reported as good again
        assert_eq!(detector.get_quality(&token_a, Instant::now()), None);

        // after an unfreeze another bad measurement is enough to freeze it again
        detector.update_tokens(&[(token_a, token_b)], true);
        assert_eq!(
            detector.get_quality(&token_a, Instant::now()),
            Some(Quality::Unsupported)
        );
    }
}
2 changes: 1 addition & 1 deletion crates/driver/src/domain/competition/bad_tokens/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl Detector {
}

if let Some(metrics) = &self.metrics {
return metrics.get_quality(&token);
return metrics.get_quality(&token, now);
}

None
Expand Down
5 changes: 2 additions & 3 deletions crates/driver/src/infra/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ impl Api {
app = routes::metrics(app);
app = routes::healthz(app);

let metrics_bad_token_detector_builder = bad_tokens::metrics::DetectorBuilder::default();

// Multiplex each solver as part of the API. Multiple solvers are multiplexed
// on the same driver so only one liquidity collector collects the liquidity
// for all of them. This is important because liquidity collection is
Expand All @@ -81,10 +79,11 @@ impl Api {
}

if bad_token_config.enable_metrics_strategy {
bad_tokens.with_metrics_detector(metrics_bad_token_detector_builder.clone().build(
bad_tokens.with_metrics_detector(bad_tokens::metrics::Detector::new(
bad_token_config.metrics_strategy_failure_ratio,
bad_token_config.metrics_strategy_required_measurements,
bad_token_config.metrics_strategy_log_only,
bad_token_config.metrics_strategy_token_freeze_time,
));
}

Expand Down
3 changes: 3 additions & 0 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config {
.bad_token_detection
.metrics_strategy_required_measurements,
metrics_strategy_log_only: config.bad_token_detection.metrics_strategy_log_only,
metrics_strategy_token_freeze_time: config
.bad_token_detection
.metrics_strategy_token_freeze_time,
},
settle_queue_size: config.settle_queue_size,
}
Expand Down
13 changes: 13 additions & 0 deletions crates/driver/src/infra/config/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,15 @@ pub struct BadTokenDetectionConfig {
rename = "metrics-bad-token-detection-log-only"
)]
pub metrics_strategy_log_only: bool,

/// How long the metrics based bad token detection should flag a token as
/// unsupported before it allows to solve for that token again.
#[serde(
default = "default_metrics_bad_token_detector_freeze_time",
rename = "metrics-bad-token-detection-token-freeze-time",
with = "humantime_serde"
)]
pub metrics_strategy_token_freeze_time: Duration,
}

impl Default for BadTokenDetectionConfig {
Expand All @@ -742,3 +751,7 @@ fn default_settle_queue_size() -> usize {
/// Serde default for `metrics-bad-token-detection-log-only`: enabled, i.e.
/// the metrics based detector only logs by default instead of actually
/// marking tokens as unsupported.
fn default_metrics_bad_token_detector_log_only() -> bool {
    true
}

/// Serde default for `metrics-bad-token-detection-token-freeze-time`: how
/// long a token stays flagged as unsupported before it gets another chance.
fn default_metrics_bad_token_detector_freeze_time() -> Duration {
    // 10 minutes, expressed in seconds.
    Duration::from_secs(600)
}
3 changes: 2 additions & 1 deletion crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use {
derive_more::{From, Into},
num::BigRational,
reqwest::header::HeaderName,
std::collections::HashMap,
std::{collections::HashMap, time::Duration},
tap::TapFallible,
thiserror::Error,
tracing::Instrument,
Expand Down Expand Up @@ -317,4 +317,5 @@ pub struct BadTokenDetection {
pub metrics_strategy_failure_ratio: f64,
pub metrics_strategy_required_measurements: u32,
pub metrics_strategy_log_only: bool,
pub metrics_strategy_token_freeze_time: Duration,
}

0 comments on commit 3ca92b7

Please sign in to comment.