only mark tokens as unsupported based on metrics for a limited time #3205

Merged
merged 6 commits into from
Jan 3, 2025
130 changes: 100 additions & 30 deletions crates/driver/src/domain/competition/bad_tokens/metrics.rs
@@ -1,23 +1,18 @@
use {super::Quality, crate::domain::eth, dashmap::DashMap, std::sync::Arc};
use {
super::Quality,
crate::domain::eth,
dashmap::DashMap,
std::{
sync::Arc,
time::{Duration, Instant},
},
};

#[derive(Default)]
#[derive(Default, Debug)]
struct TokenStatistics {
attempts: u32,
fails: u32,
}

#[derive(Default, Clone)]
pub struct DetectorBuilder(Arc<DashMap<eth::TokenAddress, TokenStatistics>>);

impl DetectorBuilder {
pub fn build(self, failure_ratio: f64, required_measurements: u32, log_only: bool) -> Detector {
Detector {
failure_ratio,
required_measurements,
counter: self.0,
log_only,
}
}
flagged_unsupported_at: Option<Instant>,
}

/// Monitors tokens to determine whether they are considered "unsupported" based
@@ -31,19 +26,46 @@ pub struct Detector {
required_measurements: u32,
counter: Arc<DashMap<eth::TokenAddress, TokenStatistics>>,
log_only: bool,
token_freeze_time: Duration,
}

impl Detector {
pub fn get_quality(&self, token: &eth::TokenAddress) -> Option<Quality> {
let measurements = self.counter.get(token)?;
let is_unsupported = self.is_unsupported(&measurements);
pub fn new(
failure_ratio: f64,
required_measurements: u32,
log_only: bool,
token_freeze_time: Duration,
) -> Self {
Self {
failure_ratio,
required_measurements,
counter: Default::default(),
log_only,
token_freeze_time,
}
}

pub fn get_quality(&self, token: &eth::TokenAddress, now: Instant) -> Option<Quality> {
let stats = self.counter.get(token)?;
if stats
.flagged_unsupported_at
.is_some_and(|t| now.duration_since(t) > self.token_freeze_time)
{
Contributor:
Is this correct? If flagged_unsupported_at is Some and the time between the freeze timestamp and now exceeds the token freeze time, should it return None?

Contributor (Author):
This is explained in the comment. I think the confusion might come from the interface: all but one strategy (the hardcoded list) can only really return whether a token should be dropped, not whether it needs to be kept. The reason is that it's enough for a single metric to indicate that a token is bad, but it's not enough if only one strategy says the token is good.
This could maybe be improved in a follow-up PR by adjusting these functions to return Quality instead of Option<Quality> and having the wrapping detector only pay attention to Quality::Unsupported results in the short-circuiting logic.

Contributor:
Yeah, the upcoming PR you proposed would be really nice! Thanks for the explanation!

Contributor (Author):
Actually, after thinking about this more, Option<Quality> seems correct to me. That way we can express:

  1. not enough information to make a decision
  2. information indicates good
  3. information indicates bad

I think I'll just adjust the comment and make it more explicit what gets returned, because the current code focuses only on whether or not we have enough information to mark the token as unsupported.

// Sometimes tokens only cause issues temporarily. If the token's freeze
// period expired, we give it another chance to see if it still behaves badly.
return None;
}

let is_unsupported = self.stats_indicate_unsupported(&stats);
(!self.log_only && is_unsupported).then_some(Quality::Unsupported)
}

fn is_unsupported(&self, measurements: &TokenStatistics) -> bool {
let token_failure_ratio = measurements.fails as f64 / measurements.attempts as f64;
measurements.attempts >= self.required_measurements
&& token_failure_ratio >= self.failure_ratio
fn stats_indicate_unsupported(&self, stats: &TokenStatistics) -> bool {
let token_failure_ratio = match stats.attempts {
0 => return false,
attempts => f64::from(stats.fails) / f64::from(attempts),
};
stats.attempts >= self.required_measurements && token_failure_ratio >= self.failure_ratio
}

/// Updates the tokens that participated in settlements by
@@ -54,32 +76,80 @@ impl Detector {
token_pairs: &[(eth::TokenAddress, eth::TokenAddress)],
failure: bool,
) {
let mut unsupported_tokens = vec![];
let now = Instant::now();
let mut new_unsupported_tokens = vec![];
token_pairs
.iter()
.flat_map(|(token_a, token_b)| [token_a, token_b])
.for_each(|token| {
let measurement = self
let mut stats = self
.counter
.entry(*token)
.and_modify(|counter| {
counter.attempts += 1;
counter.fails += u32::from(failure)
counter.fails += u32::from(failure);
})
.or_insert_with(|| TokenStatistics {
attempts: 1,
fails: u32::from(failure),
flagged_unsupported_at: None,
});
if self.is_unsupported(&measurement) {
unsupported_tokens.push(token);

// token needs to be frozen as unsupported for a while
if self.stats_indicate_unsupported(&stats)
&& stats
.flagged_unsupported_at
.is_none_or(|t| now.duration_since(t) > self.token_freeze_time)
{
new_unsupported_tokens.push(token);
stats.flagged_unsupported_at = Some(now);
}
});

if !unsupported_tokens.is_empty() {
if !new_unsupported_tokens.is_empty() {
tracing::debug!(
tokens = ?unsupported_tokens,
tokens = ?new_unsupported_tokens,
"mark tokens as unsupported"
);
}
}
}

#[cfg(test)]
mod tests {
use {super::*, ethcontract::H160};

/// Tests that a token only gets marked temporarily as unsupported.
/// After the freeze period it will be allowed again.
#[tokio::test]
async fn unfreeze_bad_tokens() {
const FREEZE_DURATION: Duration = Duration::from_millis(50);
let detector = Detector::new(0.5, 2, false, FREEZE_DURATION);

let token_a = eth::TokenAddress(eth::ContractAddress(H160([1; 20])));
let token_b = eth::TokenAddress(eth::ContractAddress(H160([2; 20])));

// token is reported as supported while we don't have enough measurements
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
detector.update_tokens(&[(token_a, token_b)], true);
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);
detector.update_tokens(&[(token_a, token_b)], true);

// after we got enough measurements the token gets marked as bad
assert_eq!(
detector.get_quality(&token_a, Instant::now()),
Some(Quality::Unsupported)
);

// after the freeze period is over the token gets reported as good again
tokio::time::sleep(FREEZE_DURATION).await;
assert_eq!(detector.get_quality(&token_a, Instant::now()), None);

// after an unfreeze another bad measurement is enough to freeze it again
detector.update_tokens(&[(token_a, token_b)], true);
assert_eq!(
detector.get_quality(&token_a, Instant::now()),
Some(Quality::Unsupported)
);
}
}
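
The review thread above explains why each strategy returns Option<Quality>: a single Unsupported verdict should short-circuit, while a "good" or "no data" answer from one strategy is not decisive on its own. Below is a minimal sketch of that combination logic, not the actual driver code; the Supported variant and the closure-based strategy list are assumptions made purely for illustration.

// Sketch: each detection strategy yields Option<Quality>, and the wrapping
// detector short-circuits as soon as any strategy reports Unsupported.
#[derive(Debug, PartialEq)]
enum Quality {
    Supported,
    Unsupported,
}

struct WrappingDetector {
    strategies: Vec<Box<dyn Fn(&str) -> Option<Quality>>>,
}

impl WrappingDetector {
    fn get_quality(&self, token: &str) -> Option<Quality> {
        for strategy in &self.strategies {
            match strategy(token) {
                // A single "bad" verdict is enough to drop the token.
                Some(Quality::Unsupported) => return Some(Quality::Unsupported),
                // "Good" or "not enough information" alone does not decide.
                Some(Quality::Supported) | None => continue,
            }
        }
        None
    }
}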
2 changes: 1 addition & 1 deletion crates/driver/src/domain/competition/bad_tokens/mod.rs
@@ -132,7 +132,7 @@ impl Detector {
}

if let Some(metrics) = &self.metrics {
return metrics.get_quality(&token);
return metrics.get_quality(&token, now);
}

None
5 changes: 2 additions & 3 deletions crates/driver/src/infra/api/mod.rs
@@ -58,8 +58,6 @@ impl Api {
app = routes::metrics(app);
app = routes::healthz(app);

let metrics_bad_token_detector_builder = bad_tokens::metrics::DetectorBuilder::default();

// Multiplex each solver as part of the API. Multiple solvers are multiplexed
// on the same driver so only one liquidity collector collects the liquidity
// for all of them. This is important because liquidity collection is
@@ -81,10 +79,11 @@ impl Api {
}

if bad_token_config.enable_metrics_strategy {
bad_tokens.with_metrics_detector(metrics_bad_token_detector_builder.clone().build(
bad_tokens.with_metrics_detector(bad_tokens::metrics::Detector::new(
bad_token_config.metrics_strategy_failure_ratio,
bad_token_config.metrics_strategy_required_measurements,
bad_token_config.metrics_strategy_log_only,
bad_token_config.metrics_strategy_token_freeze_time,
));
}

3 changes: 3 additions & 0 deletions crates/driver/src/infra/config/file/load.rs
@@ -124,6 +124,9 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config {
.bad_token_detection
.metrics_strategy_required_measurements,
metrics_strategy_log_only: config.bad_token_detection.metrics_strategy_log_only,
metrics_strategy_token_freeze_time: config
.bad_token_detection
.metrics_strategy_token_freeze_time,
},
settle_queue_size: config.settle_queue_size,
}
13 changes: 13 additions & 0 deletions crates/driver/src/infra/config/file/mod.rs
@@ -716,6 +716,15 @@ pub struct BadTokenDetectionConfig {
rename = "metrics-bad-token-detection-log-only"
)]
pub metrics_strategy_log_only: bool,

/// How long the metrics-based bad token detection should flag a token as
/// unsupported before it allows solving for that token again.
#[serde(
default = "default_metrics_bad_token_detector_freeze_time",
rename = "metrics-bad-token-detection-token-freeze-time",
with = "humantime_serde"
)]
pub metrics_strategy_token_freeze_time: Duration,
}

impl Default for BadTokenDetectionConfig {
@@ -742,3 +751,7 @@ fn default_settle_queue_size() -> usize {
fn default_metrics_bad_token_detector_log_only() -> bool {
true
}

fn default_metrics_bad_token_detector_freeze_time() -> Duration {
Duration::from_secs(60 * 10)
Contributor:
Why not use from_mins(10)?

Contributor:
AFAIK, it doesn't exist.

Contributor:
Ah, you're right, it is a nightly API.

}
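
The default freeze time above is ten minutes spelled as Duration::from_secs(60 * 10); as the thread notes, Duration::from_mins is nightly-only (behind the duration_constructors feature at the time of this change). Because the config field uses humantime_serde, the value can also be written as a human-readable duration string. A small sketch of the equivalence, assuming the humantime crate as a dependency; the "10m" literal is only an illustrative value, not taken from this PR.

// Sketch: the stable way to spell "10 minutes" and the humantime string form
// that the config field (via humantime_serde) would accept.
use std::time::Duration;

fn main() {
    // Stable constructor used in the PR; Duration::from_mins(10) would be
    // equivalent but is nightly-only.
    let freeze_time = Duration::from_secs(60 * 10);

    // humantime parses human-readable durations such as "10m".
    let parsed = humantime::parse_duration("10m").expect("valid duration literal");
    assert_eq!(freeze_time, parsed);
}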
3 changes: 2 additions & 1 deletion crates/driver/src/infra/solver/mod.rs
@@ -22,7 +22,7 @@ use {
derive_more::{From, Into},
num::BigRational,
reqwest::header::HeaderName,
std::collections::HashMap,
std::{collections::HashMap, time::Duration},
tap::TapFallible,
thiserror::Error,
tracing::Instrument,
@@ -317,4 +317,5 @@ pub struct BadTokenDetection {
pub metrics_strategy_failure_ratio: f64,
pub metrics_strategy_required_measurements: u32,
pub metrics_strategy_log_only: bool,
pub metrics_strategy_token_freeze_time: Duration,
}