Merge branch 'main' into format-generated-code
MartinquaXD authored Nov 30, 2023
2 parents be49a82 + df37aff commit a004d54
Showing 23 changed files with 585 additions and 81 deletions.
31 changes: 30 additions & 1 deletion crates/autopilot/src/arguments.rs
@@ -6,7 +6,11 @@ use {
http_client,
price_estimation::{self, NativePriceEstimators},
},
std::{net::SocketAddr, num::NonZeroUsize, time::Duration},
std::{
net::SocketAddr,
num::{NonZeroUsize, ParseFloatError},
time::Duration,
},
url::Url,
};

@@ -203,6 +207,16 @@ pub struct Arguments {
value_parser = shared::arguments::duration_from_seconds,
)]
pub solve_deadline: Duration,

/// Time interval in days between each cleanup operation of the
/// `order_events` database table.
#[clap(long, env, default_value = "1", value_parser = duration_from_days)]
pub order_events_cleanup_interval: Duration,

/// Age threshold in days for order events to be eligible for cleanup in the
/// `order_events` database table.
#[clap(long, env, default_value = "30", value_parser = duration_from_days)]
pub order_events_cleanup_threshold: Duration,
}

impl std::fmt::Display for Arguments {
@@ -270,6 +284,21 @@ impl std::fmt::Display for Arguments {
writeln!(f, "score_cap: {}", self.score_cap)?;
display_option(f, "shadow", &self.shadow)?;
writeln!(f, "solve_deadline: {:?}", self.solve_deadline)?;
writeln!(
f,
"order_events_cleanup_interval: {:?}",
self.order_events_cleanup_interval
)?;
writeln!(
f,
"order_events_cleanup_threshold: {:?}",
self.order_events_cleanup_threshold
)?;
Ok(())
}
}

fn duration_from_days(s: &str) -> Result<Duration, ParseFloatError> {
let days = s.parse::<f64>()?;
Ok(Duration::from_secs_f64(days * 86_400.0))
}
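Since both new flags above are parsed by `duration_from_days`, their values are fractional day counts. A minimal illustration of the parser's behavior (hypothetical test, not part of this commit):

#[test]
fn parses_day_counts() {
    // "1" day is the default cleanup interval: 86_400 seconds.
    assert_eq!(duration_from_days("1").unwrap(), Duration::from_secs(86_400));
    // Fractional inputs are accepted too: half a day is 43_200 seconds.
    assert_eq!(duration_from_days("0.5").unwrap(), Duration::from_secs(43_200));
    // Non-numeric input surfaces the underlying ParseFloatError.
    assert!(duration_from_days("a day").is_err());
}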
6 changes: 6 additions & 0 deletions crates/autopilot/src/database/order_events.rs
@@ -7,6 +7,7 @@ use {
order_events::{self, OrderEvent},
},
model::order::OrderUid,
sqlx::Error,
};

impl super::Postgres {
@@ -19,6 +20,11 @@ impl super::Postgres {
tracing::warn!(?err, "failed to insert order events");
}
}

/// Deletes events before the provided timestamp.
pub async fn delete_order_events_before(&self, timestamp: DateTime<Utc>) -> Result<u64, Error> {
order_events::delete_order_events_before(&self.0, timestamp).await
}
}

async fn store_order_events(
1 change: 1 addition & 0 deletions crates/autopilot/src/lib.rs
@@ -5,6 +5,7 @@ pub mod driver_api;
pub mod driver_model;
pub mod event_updater;
pub mod on_settlement_event_updater;
pub mod periodic_db_cleanup;
pub mod protocol;
pub mod run;
pub mod run_loop;
177 changes: 177 additions & 0 deletions crates/autopilot/src/periodic_db_cleanup.rs
@@ -0,0 +1,177 @@
use {
crate::database::Postgres,
chrono::{DateTime, Utc},
std::time::Duration,
tokio::time,
};

pub struct OrderEventsCleanerConfig {
cleanup_interval: Duration,
event_age_threshold: chrono::Duration,
}

impl OrderEventsCleanerConfig {
pub fn new(cleanup_interval: Duration, event_age_threshold: Duration) -> Self {
OrderEventsCleanerConfig {
cleanup_interval,
event_age_threshold: chrono::Duration::from_std(event_age_threshold).unwrap(),
}
}
}

pub struct OrderEventsCleaner {
config: OrderEventsCleanerConfig,
db: Postgres,
}

impl OrderEventsCleaner {
pub fn new(config: OrderEventsCleanerConfig, db: Postgres) -> Self {
OrderEventsCleaner { config, db }
}

pub async fn run_forever(self) -> ! {
let mut interval = time::interval(self.config.cleanup_interval);
loop {
interval.tick().await;

let timestamp: DateTime<Utc> = Utc::now() - self.config.event_age_threshold;
match self.db.delete_order_events_before(timestamp).await {
Ok(affected_rows_count) => {
tracing::debug!(affected_rows_count, timestamp = %timestamp.to_string(), "order events cleanup");
Metrics::get().order_events_cleanup_total.inc()
}
Err(err) => {
tracing::warn!(?err, "failed to delete order events before {}", timestamp)
}
}
}
}
}

#[derive(prometheus_metric_storage::MetricStorage)]
struct Metrics {
/// The total number of successful `order_events` table cleanups
#[metric(name = "periodic_db_cleanup")]
order_events_cleanup_total: prometheus::IntCounter,
}

impl Metrics {
fn get() -> &'static Self {
Metrics::instance(observe::metrics::get_storage_registry()).unwrap()
}
}

#[cfg(test)]
mod tests {
use {
super::*,
database::{
byte_array::ByteArray,
order_events::{OrderEvent, OrderEventLabel},
},
itertools::Itertools,
sqlx::{PgPool, Row},
};

// Note: `tokio::time::advance` was not used in these tests. While it is a
// useful tool for controlling time flow in asynchronous tests, it causes
// complications when used with `sqlx::PgPool`. Specifically, pausing or
// advancing time with `tokio::time::advance` can interfere with the pool's
// ability to acquire database connections, leading to panics and unpredictable
// behavior in tests. Given these issues, tests were designed without
// manipulating the timer, to maintain stability and reliability in the
// database connection handling.
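    // For reference, the avoided pattern would look roughly like this
    // (hypothetical sketch; needs tokio's `test-util` feature and, as noted
    // above, conflicts with a live `sqlx::PgPool`):
    //
    //     #[tokio::test(start_paused = true)]
    //     async fn with_mocked_time() {
    //         tokio::time::advance(Duration::from_millis(50)).await;
    //         // ...assert on the cleaner's effects here...
    //     }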
#[tokio::test]
#[ignore]
async fn order_events_cleaner_flow() {
let db = Postgres::new("postgresql://").await.unwrap();
let mut ex = db.0.begin().await.unwrap();
database::clear_DANGER_(&mut ex).await.unwrap();

let now = Utc::now();
let event_a = OrderEvent {
order_uid: ByteArray([1; 56]),
timestamp: now - chrono::Duration::milliseconds(300),
label: OrderEventLabel::Created,
};
let event_b = OrderEvent {
order_uid: ByteArray([2; 56]),
timestamp: now - chrono::Duration::milliseconds(100),
label: OrderEventLabel::Created,
};
let event_c = OrderEvent {
order_uid: ByteArray([3; 56]),
timestamp: now,
label: OrderEventLabel::Created,
};

database::order_events::insert_order_event(&mut ex, &event_a)
.await
.unwrap();
database::order_events::insert_order_event(&mut ex, &event_b)
.await
.unwrap();
database::order_events::insert_order_event(&mut ex, &event_c)
.await
.unwrap();

ex.commit().await.unwrap();

let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 3);
assert!(ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

let config =
OrderEventsCleanerConfig::new(Duration::from_millis(50), Duration::from_millis(200));
let cleaner = OrderEventsCleaner::new(config, db.clone());

tokio::task::spawn(cleaner.run_forever());

// delete `event_a` after the initialization
time::sleep(Duration::from_millis(20)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 2);
assert!(!ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// nothing deleted after the first interval
time::sleep(Duration::from_millis(50)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 2);
assert!(!ids.contains(&event_a.order_uid));
assert!(ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// delete `event_b` only
time::sleep(Duration::from_millis(100)).await;
let ids = order_event_ids_before(&db.0).await;
assert_eq!(ids.len(), 1);
assert!(!ids.contains(&event_b.order_uid));
assert!(ids.contains(&event_c.order_uid));

// delete `event_c`
time::sleep(Duration::from_millis(200)).await;
let ids = order_event_ids_before(&db.0).await;
assert!(ids.is_empty());
}

async fn order_event_ids_before(pool: &PgPool) -> Vec<ByteArray<56>> {
const QUERY: &str = r#"
SELECT order_uid
FROM order_events
"#;
sqlx::query(QUERY)
.fetch_all(pool)
.await
.unwrap()
.iter()
.map(|row| {
let order_uid: ByteArray<56> = row.try_get(0).unwrap();
order_uid
})
.collect_vec()
}
}
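A detail worth calling out: the test's first assertion window (20ms) works because `tokio::time::interval` completes its first tick immediately rather than after one full period, so the cleaner runs once right after being spawned. A standalone sketch of that behavior (illustrative, not part of this commit):

use tokio::time::{interval, Duration, Instant};

#[tokio::main]
async fn main() {
    let start = Instant::now();
    let mut ticker = interval(Duration::from_millis(50));
    ticker.tick().await; // first tick resolves immediately
    assert!(start.elapsed() < Duration::from_millis(50));
    ticker.tick().await; // subsequent ticks wait the full period
    assert!(start.elapsed() >= Duration::from_millis(50));
}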
15 changes: 15 additions & 0 deletions crates/autopilot/src/run.rs
@@ -591,6 +591,21 @@ pub async fn run(args: Arguments) {
.instrument(tracing::info_span!("on_settlement_event_updater")),
);

let order_events_cleaner_config = crate::periodic_db_cleanup::OrderEventsCleanerConfig::new(
args.order_events_cleanup_interval,
args.order_events_cleanup_threshold,
);
let order_events_cleaner = crate::periodic_db_cleanup::OrderEventsCleaner::new(
order_events_cleaner_config,
db.clone(),
);

tokio::task::spawn(
order_events_cleaner
.run_forever()
.instrument(tracing::info_span!("order_events_cleaner")),
);

if args.enable_colocation {
if args.drivers.is_empty() {
panic!("colocation is enabled but no drivers are configured");
1 change: 1 addition & 0 deletions crates/database/src/lib.rs
@@ -47,6 +47,7 @@ pub type PgTransaction<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
/// The names of all tables we use in the db.
pub const ALL_TABLES: &[&str] = &[
"orders",
"order_events",
"trades",
"invalidations",
"quotes",
32 changes: 24 additions & 8 deletions crates/database/src/order_events.rs
@@ -4,7 +4,7 @@
use {
crate::OrderUid,
chrono::Utc,
sqlx::{types::chrono::DateTime, PgConnection},
sqlx::{types::chrono::DateTime, PgConnection, PgPool},
};

/// Describes what kind of event was registered for an order.
@@ -51,13 +51,13 @@ pub async fn insert_order_event(
event: &OrderEvent,
) -> Result<(), sqlx::Error> {
const QUERY: &str = r#"
INSERT INTO order_events (
order_uid,
timestamp,
label
)
VALUES ($1, $2, $3)
"#;
INSERT INTO order_events (
order_uid,
timestamp,
label
)
VALUES ($1, $2, $3)
"#;
sqlx::query(QUERY)
.bind(event.order_uid)
.bind(event.timestamp)
@@ -66,3 +66,19 @@
.await
.map(|_| ())
}

/// Deletes rows before the provided timestamp from the `order_events` table.
pub async fn delete_order_events_before(
pool: &PgPool,
timestamp: DateTime<Utc>,
) -> Result<u64, sqlx::Error> {
const QUERY: &str = r#"
DELETE FROM order_events
WHERE timestamp < $1
"#;
sqlx::query(QUERY)
.bind(timestamp)
.execute(pool)
.await
.map(|result| result.rows_affected())
}
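A hypothetical caller, mirroring the autopilot's default 30-day threshold (the helper name is from this diff; the surrounding function is illustrative):

async fn purge_stale_events(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    // Delete everything older than 30 days, the default cleanup threshold.
    let cutoff = chrono::Utc::now() - chrono::Duration::days(30);
    let deleted = delete_order_events_before(pool, cutoff).await?;
    tracing::debug!(deleted, "removed stale order events");
    Ok(())
}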
2 changes: 1 addition & 1 deletion crates/driver/src/boundary/liquidity/balancer/v2/mod.rs
@@ -57,7 +57,7 @@ fn to_interaction(
// change this assumption, we would need to change it there as well.
GPv2Settlement::at(&web3, receiver.0),
BalancerV2Vault::at(&web3, pool.vault.into()),
Arc::new(Allowances::empty(receiver.0)),
Allowances::empty(receiver.0),
);

let interaction = handler.swap(
2 changes: 2 additions & 0 deletions crates/shared/src/lib.rs
@@ -55,6 +55,8 @@ use std::{
time::{Duration, Instant},
};

pub use rate_limiter::{RateLimiter, RateLimiterError, RateLimitingStrategy};

/// Run a future and callback with the time the future took. The callback can
/// for example log the time.
pub async fn measure_time<T>(future: impl Future<Output = T>, timer: impl FnOnce(Duration)) -> T {
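For context, the `measure_time` helper shown above resolves to the future's output and passes the elapsed time to the closure; a hypothetical usage sketch:

async fn timed_example() {
    let value = measure_time(
        async { 21 * 2 },
        // The closure receives the wall-clock Duration the future took.
        |elapsed| tracing::debug!(?elapsed, "computation finished"),
    )
    .await;
    assert_eq!(value, 42);
}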
