Merge branch 'main' into dont_log_auction
fleupold authored Dec 1, 2023
2 parents f4ffca4 + c598d2c commit 722f8ec
Showing 26 changed files with 613 additions and 92 deletions.
31 changes: 30 additions & 1 deletion crates/autopilot/src/arguments.rs
@@ -6,7 +6,11 @@ use {
http_client,
price_estimation::{self, NativePriceEstimators},
},
std::{net::SocketAddr, num::NonZeroUsize, time::Duration},
std::{
net::SocketAddr,
num::{NonZeroUsize, ParseFloatError},
time::Duration,
},
url::Url,
};

@@ -203,6 +207,16 @@ pub struct Arguments {
value_parser = shared::arguments::duration_from_seconds,
)]
pub solve_deadline: Duration,

/// Time interval in days between each cleanup operation of the
/// `order_events` database table.
#[clap(long, env, default_value = "1", value_parser = duration_from_days)]
pub order_events_cleanup_interval: Duration,

/// Age threshold in days for order events to be eligible for cleanup in the
/// `order_events` database table.
#[clap(long, env, default_value = "30", value_parser = duration_from_days)]
pub order_events_cleanup_threshold: Duration,
}

impl std::fmt::Display for Arguments {
@@ -270,6 +284,21 @@ impl std::fmt::Display for Arguments {
writeln!(f, "score_cap: {}", self.score_cap)?;
display_option(f, "shadow", &self.shadow)?;
writeln!(f, "solve_deadline: {:?}", self.solve_deadline)?;
writeln!(
f,
"order_events_cleanup_interval: {:?}",
self.order_events_cleanup_interval
)?;
writeln!(
f,
"order_events_cleanup_threshold: {:?}",
self.order_events_cleanup_threshold
)?;
Ok(())
}
}

fn duration_from_days(s: &str) -> Result<Duration, ParseFloatError> {
let days = s.parse::<f64>()?;
Ok(Duration::from_secs_f64(days * 86_400.0))
}
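
Aside, not part of the diff: because the string is parsed as an `f64` before being scaled by 86,400 seconds per day, `duration_from_days` also accepts fractional day counts. A minimal test sketch, assuming it lives in the same module as the parser:

#[test]
fn parses_day_counts() {
    // "0.5" days is 12 hours.
    assert_eq!(duration_from_days("0.5"), Ok(Duration::from_secs(43_200)));
    // The default cleanup threshold of "30" days.
    assert_eq!(duration_from_days("30"), Ok(Duration::from_secs(2_592_000)));
    // Non-numeric input surfaces the `ParseFloatError`.
    assert!(duration_from_days("a day").is_err());
}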
6 changes: 6 additions & 0 deletions crates/autopilot/src/database/order_events.rs
@@ -7,6 +7,7 @@ use {
order_events::{self, OrderEvent},
},
model::order::OrderUid,
sqlx::Error,
};

impl super::Postgres {
@@ -19,6 +20,11 @@ impl super::Postgres {
tracing::warn!(?err, "failed to insert order events");
}
}

/// Deletes events before the provided timestamp.
pub async fn delete_order_events_before(&self, timestamp: DateTime<Utc>) -> Result<u64, Error> {
order_events::delete_order_events_before(&self.0, timestamp).await
}
}

async fn store_order_events(
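
Aside: the helper in the `database` crate's `order_events` module that is called above is not part of this diff. A plausible sketch of the query it wraps, assuming sqlx with the `chrono` feature enabled; the names and signature mirror the call site, but this is an illustration rather than the committed implementation:

pub async fn delete_order_events_before(
    ex: &sqlx::PgPool,
    timestamp: chrono::DateTime<chrono::Utc>,
) -> Result<u64, sqlx::Error> {
    // Delete every event strictly older than the cutoff and report how many
    // rows were removed, which is what the cleaner logs and counts.
    Ok(sqlx::query("DELETE FROM order_events WHERE timestamp < $1")
        .bind(timestamp)
        .execute(ex)
        .await?
        .rows_affected())
}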
1 change: 1 addition & 0 deletions crates/autopilot/src/lib.rs
@@ -5,6 +5,7 @@ pub mod driver_api;
pub mod driver_model;
pub mod event_updater;
pub mod on_settlement_event_updater;
pub mod periodic_db_cleanup;
pub mod protocol;
pub mod run;
pub mod run_loop;
177 changes: 177 additions & 0 deletions crates/autopilot/src/periodic_db_cleanup.rs
@@ -0,0 +1,177 @@
use {
crate::database::Postgres,
chrono::{DateTime, Utc},
std::time::Duration,
tokio::time,
};

pub struct OrderEventsCleanerConfig {
cleanup_interval: Duration,
event_age_threshold: chrono::Duration,
}

impl OrderEventsCleanerConfig {
pub fn new(cleanup_interval: Duration, event_age_threshold: Duration) -> Self {
OrderEventsCleanerConfig {
cleanup_interval,
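// `chrono::Duration::from_std` fails when the std `Duration` exceeds
// chrono's representable range, so an absurdly large CLI value makes
// this `unwrap` panic at startup rather than misbehave later.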
event_age_threshold: chrono::Duration::from_std(event_age_threshold).unwrap(),
}
}
}

pub struct OrderEventsCleaner {
config: OrderEventsCleanerConfig,
db: Postgres,
}

impl OrderEventsCleaner {
pub fn new(config: OrderEventsCleanerConfig, db: Postgres) -> Self {
OrderEventsCleaner { config, db }
}

pub async fn run_forever(self) -> ! {
let mut interval = time::interval(self.config.cleanup_interval);
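// `tokio::time::interval` completes its first tick immediately, so the
// first cleanup pass runs as soon as the task is spawned; later passes
// follow every `cleanup_interval`.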
loop {
interval.tick().await;

let timestamp: DateTime<Utc> = Utc::now() - self.config.event_age_threshold;
match self.db.delete_order_events_before(timestamp).await {
Ok(affected_rows_count) => {
tracing::debug!(affected_rows_count, %timestamp, "order events cleanup");
Metrics::get().order_events_cleanup_total.inc()
}
Err(err) => {
tracing::warn!(?err, "failed to delete order events before {}", timestamp)
}
}
}
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// The total number of successful `order_events` table cleanups
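// The `name` attribute below means the counter is exported as
// `periodic_db_cleanup`, overriding the field name.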
#[metric(name = "periodic_db_cleanup")]
order_events_cleanup_total: prometheus::IntCounter,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(observe::metrics::get_storage_registry()).unwrap()
}
}

#[cfg(test)]
mod tests {
use {
super::*,
database::{
byte_array::ByteArray,
order_events::{OrderEvent, OrderEventLabel},
},
itertools::Itertools,
sqlx::{PgPool, Row},
};

// Note: `tokio::time::advance` was not used in these tests. While it is a
// useful tool for controlling time flow in asynchronous tests, it causes
// complications when used with `sqlx::PgPool`. Specifically, pausing or
// advancing time with `tokio::time::advance` can interfere with the pool's
// ability to acquire database connections, leading to panics and unpredictable
// behavior in tests. Given these issues, tests were designed without
// manipulating the timer, to maintain stability and reliability in the
// database connection handling.
#[tokio::test]
#[ignore]
async fn order_events_cleaner_flow() {
let db = Postgres::new("postgresql://").await.unwrap();
let mut ex = db.0.begin().await.unwrap();
database::clear_DANGER_(&mut ex).await.unwrap();

let now = Utc::now();
let event_a = OrderEvent {
order_uid: ByteArray([1; 56]),
timestamp: now - chrono::Duration::milliseconds(300),
label: OrderEventLabel::Created,
};
let event_b = OrderEvent {
order_uid: ByteArray([2; 56]),
timestamp: now - chrono::Duration::milliseconds(100),
label: OrderEventLabel::Created,
};
let event_c = OrderEvent {
order_uid: ByteArray([3; 56]),
timestamp: now,
label: OrderEventLabel::Created,
};

database::order_events::insert_order_event(&mut ex, &event_a)
.await
.unwrap();
database::order_events::insert_order_event(&mut ex, &event_b)
.await
.unwrap();
database::order_events::insert_order_event(&mut ex, &event_c)
.await
.unwrap();

ex.commit().await.unwrap();

let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 3);
assert!(ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

let config =
OrderEventsCleanerConfig::new(Duration::from_millis(50), Duration::from_millis(200));
let cleaner = OrderEventsCleaner::new(config, db.clone());

tokio::task::spawn(cleaner.run_forever());

// delete `event_a` after the initialization
time::sleep(Duration::from_millis(20)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 2);
assert!(!ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// nothing deleted after the first interval
time::sleep(Duration::from_millis(50)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 2);
assert!(!ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// delete `event_b` only
time::sleep(Duration::from_millis(100)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 1);
assert!(!ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// delete `event_c`
time::sleep(Duration::from_millis(200)).await;
let ids = order_event_ids_before(&db.0).await;
assert!(ids.is_empty());
}

async fn order_event_ids_before(pool: &PgPool) -> Vec<ByteArray<56>> {
const QUERY: &str = r#"
SELECT order_uid
FROM order_events
"#;
sqlx::query(QUERY)
.fetch_all(pool)
.await
.unwrap()
.iter()
.map(|row| {
let order_uid: ByteArray<56> = row.try_get(0).unwrap();
order_uid
})
.collect_vec()
}
}
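
Note on the test above: it is marked #[ignore] because it needs a live Postgres reachable via the `postgresql://` default connection string, so it does not run in a plain `cargo test` and has to be opted into explicitly with cargo's `--ignored` flag.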
17 changes: 17 additions & 0 deletions crates/autopilot/src/run.rs
@@ -591,6 +591,21 @@ pub async fn run(args: Arguments) {
.instrument(tracing::info_span!("on_settlement_event_updater")),
);

let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
args.order_events_cleanup_interval,
args.order_events_cleanup_threshold,
);
let order_events_cleaner = crate::periodic_db_cleanup::OrderEventsCleaner::new(
order_events_cleaner_config,
db.clone(),
);

tokio::task::spawn(
order_events_cleaner
.run_forever()
.instrument(tracing::info_span!("order_events_cleaner")),
);

if args.enable_colocation {
if args.drivers.is_empty() {
panic!("colocation is enabled but no drivers are configured");
@@ -672,6 +687,8 @@ async fn shadow_mode(args: Arguments) -> ! {
.await
};

shared::metrics::serve_metrics(Arc::new(shadow::Liveness), args.metrics_address);

let shadow = shadow::RunLoop::new(
orderbook,
drivers,
20 changes: 19 additions & 1 deletion crates/autopilot/src/shadow.rs
@@ -21,11 +21,20 @@ use {
number::nonzero::U256 as NonZeroU256,
primitive_types::{H160, U256},
rand::seq::SliceRandom,
shared::token_list::AutoUpdatingTokenList,
shared::{metrics::LivenessChecking, token_list::AutoUpdatingTokenList},
std::{cmp, time::Duration},
tracing::Instrument,
};

pub struct Liveness;
#[async_trait::async_trait]
impl LivenessChecking for Liveness {
async fn is_alive(&self) -> bool {
// can we somehow check that we keep processing auctions?
true
}
}

pub struct RunLoop {
orderbook: protocol::Orderbook,
drivers: Vec<Driver>,
@@ -108,6 +117,7 @@ impl RunLoop {
async fn single_run(&self, id: AuctionId, auction: Auction) {
tracing::info!("solving");
Metrics::get().auction.set(id);
Metrics::get().orders.set(auction.orders.len() as _);

let mut participants = self.competition(id, &auction).await;

@@ -140,6 +150,7 @@
.performance_rewards
.with_label_values(&[&driver.name])
.inc_by(reward.to_f64_lossy());
Metrics::get().wins.with_label_values(&[&driver.name]).inc();
}

let hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes));
@@ -292,13 +303,20 @@ struct Metrics {
/// Tracks the last seen auction.
auction: prometheus::IntGauge,

/// Tracks the number of orders in the auction.
orders: prometheus::IntGauge,

/// Tracks the result of every driver.
#[metric(labels("driver", "result"))]
results: prometheus::IntCounterVec,

/// Tracks the approximate performance rewards per driver
#[metric(labels("driver"))]
performance_rewards: prometheus::CounterVec,

/// Tracks the winner of every auction.
#[metric(labels("driver"))]
wins: prometheus::CounterVec,
}

impl Metrics {
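
Aside: the `LivenessChecking` trait implemented by `Liveness` above lives in `shared::metrics` and is not shown in this diff. Judging from the impl and the `serve_metrics(Arc::new(shadow::Liveness), ...)` call site in run.rs, it presumably looks like the following sketch (the `Send + Sync` bounds are an assumption):

#[async_trait::async_trait]
pub trait LivenessChecking: Send + Sync {
    // Polled by the metrics server to answer liveness probes.
    async fn is_alive(&self) -> bool;
}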
15 changes: 5 additions & 10 deletions crates/contracts/build.rs
@@ -564,16 +564,11 @@ fn generate_contract_with_config(

println!("cargo:rerun-if-changed={}", path.display());

config(
ContractBuilder::new()
// for some reason formatting the generate code is broken on nightly
.rustfmt(false)
.visibility_modifier("pub"),
)
.generate(&contract)
.unwrap()
.write_to_file(Path::new(&dest).join(format!("{name}.rs")))
.unwrap();
config(ContractBuilder::new().visibility_modifier("pub"))
.generate(&contract)
.unwrap()
.write_to_file(Path::new(&dest).join(format!("{name}.rs")))
.unwrap();
}

fn addr(s: &str) -> Address {
2 changes: 2 additions & 0 deletions crates/contracts/rustfmt.toml
@@ -0,0 +1,2 @@
# Applying our custom formatting to the generated code is broken.
# That's why we specifically unset those rules for this directory.
1 change: 1 addition & 0 deletions crates/database/src/lib.rs
@@ -47,6 +47,7 @@ pub type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
/// The names of all tables we use in the db.
pub const ALL_TABLES: &[&str] = &[
"orders",
"order_events",
"trades",
"invalidations",
"quotes",