From 3ea4aa9eb8ab4b583ccc54c1f36831af5c64f0f3 Mon Sep 17 00:00:00 2001 From: Mateo Date: Wed, 24 Apr 2024 11:06:51 +0200 Subject: [PATCH] Store auction with liquidity --- Cargo.lock | 1 + .../src/infra/persistence/dto/order.rs | 1 - crates/driver/Cargo.toml | 1 + crates/driver/src/domain/competition/mod.rs | 2 + crates/driver/src/domain/eth/gas.rs | 4 + crates/driver/src/infra/config/file/load.rs | 1 + crates/driver/src/infra/config/file/mod.rs | 6 +- crates/driver/src/infra/mod.rs | 1 + crates/driver/src/infra/persistence/dto.rs | 395 ++++++++++++++++++ crates/driver/src/infra/persistence/mod.rs | 77 ++++ crates/driver/src/infra/solver/mod.rs | 27 +- crates/driver/src/run.rs | 18 +- crates/s3/src/lib.rs | 2 +- 13 files changed, 524 insertions(+), 12 deletions(-) create mode 100644 crates/driver/src/infra/persistence/dto.rs create mode 100644 crates/driver/src/infra/persistence/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 0f300ef995..99b629925d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1661,6 +1661,7 @@ dependencies = [ "prometheus-metric-storage", "rand", "reqwest", + "s3", "secp256k1", "serde", "serde_json", diff --git a/crates/autopilot/src/infra/persistence/dto/order.rs b/crates/autopilot/src/infra/persistence/dto/order.rs index 85d5aab2b8..145ebc635b 100644 --- a/crates/autopilot/src/infra/persistence/dto/order.rs +++ b/crates/autopilot/src/infra/persistence/dto/order.rs @@ -258,7 +258,6 @@ impl From for domain::auction::order::EcdsaSignature { } } -#[serde_as] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum FeePolicy { diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index afd9f6f1a1..02cba3f674 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -17,6 +17,7 @@ path = "src/main.rs" [dependencies] app-data = { path = "../app-data" } bytes-hex = { path = "../bytes-hex" } +s3 = { path = "../s3" } async-trait = { workspace = true } axum = { workspace = true } bigdecimal = { workspace = true } diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 0a55de5adc..9626895705 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -78,6 +78,8 @@ impl Competition { } })?; + self.solver + .store_auction_with_liquidity(auction, &liquidity); observe::postprocessing(&solutions, auction.deadline().driver()); // Discard solutions that don't have unique ID. diff --git a/crates/driver/src/domain/eth/gas.rs b/crates/driver/src/domain/eth/gas.rs index 3c53ed526c..c3b59c5954 100644 --- a/crates/driver/src/domain/eth/gas.rs +++ b/crates/driver/src/domain/eth/gas.rs @@ -71,6 +71,10 @@ impl GasPrice { .into() } + pub fn base(&self) -> FeePerGas { + self.max + } + pub fn max(&self) -> FeePerGas { self.max } diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index e5e8c02199..e99d587739 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -89,6 +89,7 @@ pub async fn load(chain: eth::ChainId, path: &Path) -> infra::Config { true => SolutionMerging::Allowed, false => SolutionMerging::Forbidden, }, + s3: config.s3, } })) .await, diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index 54a5e9e036..b199ebf2b6 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -1,6 +1,6 @@ pub use load::load; use { - crate::{domain::eth, util::serialize}, + crate::{domain::eth, infra::persistence::S3, util::serialize}, reqwest::Url, serde::{Deserialize, Serialize}, serde_with::serde_as, @@ -198,6 +198,10 @@ struct SolverConfig { /// auction together. #[serde(default)] merge_solutions: bool, + + /// S3 path for storing the auctions with liquidity + #[serde(default)] + s3: Option, } #[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Serialize)] diff --git a/crates/driver/src/infra/mod.rs b/crates/driver/src/infra/mod.rs index 2b809ed7f6..b33526f545 100644 --- a/crates/driver/src/infra/mod.rs +++ b/crates/driver/src/infra/mod.rs @@ -6,6 +6,7 @@ pub mod liquidity; pub mod mempool; pub mod notify; pub mod observe; +pub mod persistence; pub mod simulator; pub mod solver; pub mod time; diff --git a/crates/driver/src/infra/persistence/dto.rs b/crates/driver/src/infra/persistence/dto.rs new file mode 100644 index 0000000000..ab98e645da --- /dev/null +++ b/crates/driver/src/infra/persistence/dto.rs @@ -0,0 +1,395 @@ +use { + crate::domain::{ + competition::{ + self, + auction::{self}, + order::{self, fees}, + Auction, + }, + eth::{self}, + liquidity, + time, + }, + app_data::AppDataHash, + model::order::OrderUid, + number::serialization::HexOrDecimalU256, + primitive_types::{H160, U256}, + serde::Serialize, + serde_with::serde_as, + std::collections::BTreeMap, +}; + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AuctionWithLiquidity { + prices: BTreeMap, + gas_price: GasPrice, + deadline: Deadline, + orders: Vec, + liquidity: Vec, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Deadline { + driver: i64, + solvers: i64, +} + +impl From for Deadline { + fn from(value: time::Deadline) -> Self { + Self { + driver: value.driver().timestamp(), + solvers: value.solvers().timestamp(), + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Token { + decimals: Option, + symbol: Option, + address: H160, + #[serde_as(as = "Option")] + price: Option, + #[serde_as(as = "HexOrDecimalU256")] + available_balance: U256, + trusted: bool, +} + +impl From for Token { + fn from(value: auction::Token) -> Self { + Self { + decimals: value.decimals, + symbol: value.symbol, + address: value.address.into(), + price: value.price.map(Into::into), + available_balance: value.available_balance.into(), + trusted: value.trusted, + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GasPrice { + #[serde_as(as = "HexOrDecimalU256")] + max: U256, + #[serde_as(as = "HexOrDecimalU256")] + tip: U256, + #[serde_as(as = "HexOrDecimalU256")] + base: U256, +} + +impl From for GasPrice { + fn from(value: eth::GasPrice) -> Self { + Self { + max: value.max().into(), + tip: value.tip().into(), + base: value.base().into(), + } + } +} + +impl AuctionWithLiquidity { + pub fn build(auction: &Auction, liquidity: &[liquidity::Liquidity]) -> Self { + Self { + prices: auction + .tokens() + .iter() + .cloned() + .map(|token| (token.address.into(), token.into())) + .collect::>(), + gas_price: auction.gas_price().into(), + deadline: auction.deadline().into(), + orders: auction.orders().iter().cloned().map(Into::into).collect(), + liquidity: liquidity.iter().cloned().map(Into::into).collect(), + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Liquidity { + id: usize, + gas: U256, + kind: String, +} + +impl From for Liquidity { + fn from(value: crate::domain::Liquidity) -> Self { + Self { + id: value.id.into(), + gas: value.gas.into(), + kind: { + let kind: &'static str = (&value.kind).into(); + kind.to_string() + }, + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Order { + uid: OrderUid, + sell_token: H160, + buy_token: H160, + #[serde_as(as = "HexOrDecimalU256")] + sell_amount: U256, + #[serde_as(as = "HexOrDecimalU256")] + buy_amount: U256, + protocol_fees: Vec, + valid_to: u32, + side: Side, + kind: Kind, + receiver: Option, + partial: Partial, + pre_interactions: Vec, + post_interactions: Vec, + sell_token_balance: SellTokenSource, + buy_token_balance: BuyTokenDestination, + app_data: AppDataHash, + #[serde(flatten)] + signature: Signature, +} + +impl From for Order { + fn from(value: competition::Order) -> Self { + Self { + uid: OrderUid(value.uid.0 .0), + sell_token: value.sell.token.into(), + buy_token: value.buy.token.into(), + sell_amount: value.sell.amount.into(), + buy_amount: value.buy.amount.into(), + protocol_fees: value.protocol_fees.into_iter().map(Into::into).collect(), + valid_to: value.valid_to.into(), + kind: value.kind.into(), + side: value.side.into(), + receiver: value.receiver.map(Into::into), + partial: value.partial.into(), + pre_interactions: value.pre_interactions.into_iter().map(Into::into).collect(), + post_interactions: value + .post_interactions + .into_iter() + .map(Into::into) + .collect(), + sell_token_balance: value.sell_token_balance.into(), + buy_token_balance: value.buy_token_balance.into(), + app_data: AppDataHash(value.app_data.0 .0.into()), + signature: Signature { + signing_scheme: value.signature.scheme.into(), + signature: value.signature.data.into(), + }, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +enum Side { + Buy, + Sell, +} + +impl From for Side { + fn from(value: order::Side) -> Self { + match value { + order::Side::Buy => Self::Buy, + order::Side::Sell => Self::Sell, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +pub enum SellTokenSource { + Erc20, + External, + Internal, +} + +impl From for SellTokenSource { + fn from(value: order::SellTokenBalance) -> Self { + match value { + order::SellTokenBalance::Erc20 => Self::Erc20, + order::SellTokenBalance::Internal => Self::Internal, + order::SellTokenBalance::External => Self::External, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "snake_case")] +enum BuyTokenDestination { + Erc20, + Internal, +} + +impl From for BuyTokenDestination { + fn from(value: order::BuyTokenBalance) -> Self { + match value { + order::BuyTokenBalance::Erc20 => Self::Erc20, + order::BuyTokenBalance::Internal => Self::Internal, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +enum Partial { + Yes { available: U256 }, + No, +} + +impl From for Partial { + fn from(value: order::Partial) -> Self { + match value { + order::Partial::Yes { available } => Self::Yes { + available: available.into(), + }, + order::Partial::No => Self::No, + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +enum FeePolicy { + #[serde(rename_all = "camelCase")] + Surplus { factor: f64, max_volume_factor: f64 }, + #[serde(rename_all = "camelCase")] + PriceImprovement { + factor: f64, + max_volume_factor: f64, + quote: Quote, + }, + #[serde(rename_all = "camelCase")] + Volume { factor: f64 }, +} + +impl From for FeePolicy { + fn from(value: order::FeePolicy) -> Self { + match value { + order::FeePolicy::Surplus { + factor, + max_volume_factor, + } => Self::Surplus { + factor, + max_volume_factor, + }, + order::FeePolicy::PriceImprovement { + factor, + max_volume_factor, + quote, + } => Self::PriceImprovement { + factor, + max_volume_factor, + quote: quote.into(), + }, + order::FeePolicy::Volume { factor } => Self::Volume { factor }, + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Quote { + #[serde_as(as = "HexOrDecimalU256")] + sell_amount: U256, + sell_token: H160, + #[serde_as(as = "HexOrDecimalU256")] + buy_amount: U256, + buy_token: H160, + #[serde_as(as = "HexOrDecimalU256")] + fee_amount: U256, + fee_token: H160, +} + +impl From for Quote { + fn from(value: fees::Quote) -> Self { + Self { + sell_amount: value.sell.amount.into(), + sell_token: value.sell.token.into(), + buy_amount: value.buy.amount.into(), + buy_token: value.buy.token.into(), + fee_amount: value.fee.amount.into(), + fee_token: value.fee.token.into(), + } + } +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +enum Kind { + Market, + Limit, + Liquidity, +} + +impl From for Kind { + fn from(value: order::Kind) -> Self { + match value { + order::Kind::Market => Self::Market, + order::Kind::Limit => Self::Limit, + order::Kind::Liquidity => Self::Liquidity, + } + } +} + +/// Signature over the order data. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +struct Signature { + signing_scheme: Scheme, + #[serde(with = "bytes_hex")] + signature: Vec, +} + +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "lowercase")] +enum Scheme { + Eip712, + EthSign, + Eip1271, + PreSign, +} + +impl From for Scheme { + fn from(value: order::signature::Scheme) -> Self { + match value { + order::signature::Scheme::Eip712 => Self::Eip1271, + order::signature::Scheme::EthSign => Self::EthSign, + order::signature::Scheme::Eip1271 => Self::Eip712, + order::signature::Scheme::PreSign => Self::PreSign, + } + } +} + +#[serde_as] +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Interaction { + target: H160, + #[serde_as(as = "HexOrDecimalU256")] + value: U256, + #[serde(with = "bytes_hex")] + call_data: Vec, +} + +impl From for Interaction { + fn from(value: eth::Interaction) -> Self { + Self { + target: value.target.into(), + value: value.value.into(), + call_data: value.call_data.into(), + } + } +} diff --git a/crates/driver/src/infra/persistence/mod.rs b/crates/driver/src/infra/persistence/mod.rs new file mode 100644 index 0000000000..f5f9781249 --- /dev/null +++ b/crates/driver/src/infra/persistence/mod.rs @@ -0,0 +1,77 @@ +mod dto; + +use { + crate::{ + domain::{ + competition::{auction::Id, Auction}, + liquidity, + }, + infra::solver::Config, + }, + serde::Deserialize, + std::sync::Arc, + tracing::Instrument, +}; + +#[derive(Clone, Debug, Default, Deserialize)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +pub struct S3 { + /// The s3_instance_upload_* arguments configure how auction instances + /// should be uploaded to AWS S3. + /// They must either all be set or all not set. + pub s3_instance_upload_bucket: String, + + /// Prepended to the auction id to form the final instance filename on S3. + /// Something like "staging/mainnet/" + pub s3_instance_upload_filename_prefix: String, +} + +impl From for s3::Config { + fn from(value: S3) -> Self { + Self { + bucket: value.s3_instance_upload_bucket, + filename_prefix: value.s3_instance_upload_filename_prefix, + } + } +} + +#[derive(Clone, Debug)] +pub struct Persistence { + s3: Option>, +} + +impl Persistence { + pub async fn build(config: &Config) -> Self { + if let Some(s3) = &config.s3 { + Self { + s3: Some(Arc::new(s3::Uploader::new(s3.clone().into()).await)), + } + } else { + Self { s3: None } + } + } + + /// Saves the given auction with liquidity + pub fn archive_auction(&self, id: Id, auction: &Auction, liquidity: &[liquidity::Liquidity]) { + let Some(uploader) = self.s3.clone() else { + return; + }; + let auction_with_liquidity = dto::AuctionWithLiquidity::build(auction, liquidity); + tokio::spawn( + async move { + match uploader + .upload(id.to_string(), auction_with_liquidity) + .await + { + Ok(key) => { + tracing::info!(?key, "uploaded auction to s3"); + } + Err(err) => { + tracing::warn!(?err, "failed to upload auction to s3"); + } + } + } + .instrument(tracing::Span::current()), + ); + } +} diff --git a/crates/driver/src/infra/solver/mod.rs b/crates/driver/src/infra/solver/mod.rs index ac13aa60c5..fd7d633b13 100644 --- a/crates/driver/src/infra/solver/mod.rs +++ b/crates/driver/src/infra/solver/mod.rs @@ -10,7 +10,11 @@ use { liquidity, time::Remaining, }, - infra::{blockchain::Ethereum, config::file::FeeHandler}, + infra::{ + blockchain::Ethereum, + config::file::FeeHandler, + persistence::{Persistence, S3}, + }, util, }, anyhow::Result, @@ -85,6 +89,7 @@ pub struct Solver { client: reqwest::Client, config: Config, eth: Ethereum, + persistence: Persistence, } #[derive(Debug, Clone)] @@ -108,10 +113,12 @@ pub struct Config { /// TODO: Remove once all solvers are moved to use limit orders for quoting pub quote_using_limit_orders: bool, pub merge_solutions: SolutionMerging, + /// S3 path for storing the auctions with liquidity + pub s3: Option, } impl Solver { - pub fn new(config: Config, eth: Ethereum) -> Result { + pub async fn new(config: Config, eth: Ethereum) -> Result { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( reqwest::header::CONTENT_TYPE, @@ -124,12 +131,15 @@ impl Solver { headers.insert(header_name, val.parse()?); } + let persistence = Persistence::build(&config).await; + Ok(Self { client: reqwest::ClientBuilder::new() .default_headers(headers) .build()?, config, eth, + persistence, }) } @@ -230,6 +240,19 @@ impl Solver { }; tokio::task::spawn(future.in_current_span()); } + + /// Store the auction with liquidity in a S3 bucket + pub fn store_auction_with_liquidity( + &self, + auction: &Auction, + liquidity: &[liquidity::Liquidity], + ) { + let Some(auction_id) = auction.id() else { + return; + }; + self.persistence + .archive_auction(auction_id, auction, liquidity); + } } /// Controls whether or not the driver is allowed to merge multiple solutions diff --git a/crates/driver/src/run.rs b/crates/driver/src/run.rs index 0c0b11ceb5..87ea40ca6a 100644 --- a/crates/driver/src/run.rs +++ b/crates/driver/src/run.rs @@ -13,6 +13,7 @@ use { }, }, clap::Parser, + futures::future::join_all, std::{net::SocketAddr, sync::Arc, time::Duration}, tokio::sync::oneshot, }; @@ -51,7 +52,7 @@ async fn run_with(args: cli::Args, addr_sender: Option Ethereum { Ethereum::new(ethrpc, config.contracts, gas).await } -fn solvers(config: &config::Config, eth: &Ethereum) -> Vec { - config - .solvers - .iter() - .map(|config| Solver::new(config.clone(), eth.clone()).unwrap()) - .collect() +async fn solvers(config: &config::Config, eth: &Ethereum) -> Vec { + join_all( + config + .solvers + .iter() + .map(|config| async move { Solver::new(config.clone(), eth.clone()).await.unwrap() }) + .collect::>(), + ) + .await } async fn liquidity(config: &config::Config, eth: &Ethereum) -> liquidity::Fetcher { diff --git a/crates/s3/src/lib.rs b/crates/s3/src/lib.rs index 197a196e0d..e1e3ea28a7 100644 --- a/crates/s3/src/lib.rs +++ b/crates/s3/src/lib.rs @@ -15,7 +15,7 @@ pub struct Config { pub filename_prefix: String, } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Uploader { bucket: String, filename_prefix: String,