diff --git a/Cargo.lock b/Cargo.lock index 49b23859b9..b6269ed711 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1635,6 +1635,7 @@ dependencies = [ "chrono", "clap", "contracts", + "derivative", "ethabi", "ethcontract", "ethereum-types", diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 46d273fb05..0bf2165aa3 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -19,6 +19,7 @@ async-trait = "0.1" axum = "0.6" bigdecimal = "0.3" chrono = { version = "0.4", features = ["clock"], default-features = false } +derivative = { workspace = true } ethabi = "18.0" ethereum-types = "0.14" ethrpc = { path = "../ethrpc" } diff --git a/crates/driver/src/boundary/liquidity/mod.rs b/crates/driver/src/boundary/liquidity/mod.rs index 6331e59c2d..ae2e658c6c 100644 --- a/crates/driver/src/boundary/liquidity/mod.rs +++ b/crates/driver/src/boundary/liquidity/mod.rs @@ -7,7 +7,6 @@ use { anyhow::Result, ethrpc::current_block::CurrentBlockStream, futures::future, - itertools::Itertools, model::TokenPair, shared::{ baseline_solver::BaseTokens, @@ -30,6 +29,7 @@ use { pub mod balancer; pub mod swapr; pub mod uniswap; +mod zeroex; /// The default poll interval for the block stream updating task. const BLOCK_POLL_INTERVAL: Duration = Duration::from_secs(1); @@ -67,26 +67,22 @@ impl Fetcher { let block_stream = blocks.stream(boundary::web3(eth)).await?; let block_retriever = blocks.retriever(boundary::web3(eth)); - let uni_v2: Vec<_> = future::join_all( + let uni_v2: Vec<_> = future::try_join_all( config .uniswap_v2 .iter() .map(|config| uniswap::v2::collector(eth, &block_stream, config)), ) - .await - .into_iter() - .try_collect()?; + .await?; let swapr_routers = config.swapr.iter().map(|config| config.router).collect(); - let swapr: Vec<_> = future::join_all( + let swapr: Vec<_> = future::try_join_all( config .swapr .iter() .map(|config| swapr::collector(eth, &block_stream, config)), ) - .await - .into_iter() - .try_collect()?; + .await?; let bal_v2: Vec<_> = config .balancer_v2 @@ -102,6 +98,16 @@ impl Fetcher { .map(|config| uniswap::v3::collector(eth, block_retriever.clone(), config)) .collect(); + let zeroex: Vec<_> = future::try_join_all( + config + .zeroex + .as_ref() + .map(|config| zeroex::collector(eth, block_stream.clone(), config)) + .into_iter() + .collect::>(), + ) + .await?; + let base_tokens = BaseTokens::new( eth.contracts().weth().address(), &config @@ -115,7 +121,7 @@ impl Fetcher { Ok(Self { blocks: block_stream, inner: LiquidityCollector { - liquidity_sources: [uni_v2, swapr, bal_v2, uni_v3] + liquidity_sources: [uni_v2, swapr, bal_v2, uni_v3, zeroex] .into_iter() .flatten() .collect(), diff --git a/crates/driver/src/boundary/liquidity/zeroex.rs b/crates/driver/src/boundary/liquidity/zeroex.rs new file mode 100644 index 0000000000..8a0eb8c4c6 --- /dev/null +++ b/crates/driver/src/boundary/liquidity/zeroex.rs @@ -0,0 +1,29 @@ +use { + crate::{infra, infra::Ethereum}, + ethrpc::current_block::CurrentBlockStream, + shared::{http_client::HttpClientFactory, zeroex_api::DefaultZeroExApi}, + solver::{liquidity::zeroex::ZeroExLiquidity, liquidity_collector::LiquidityCollecting}, + std::sync::Arc, +}; + +pub async fn collector( + eth: &Ethereum, + blocks: CurrentBlockStream, + config: &infra::liquidity::config::ZeroEx, +) -> anyhow::Result> { + let settlement = eth.contracts().settlement().clone(); + let web3 = settlement.raw_instance().web3().clone(); + let contract = contracts::IZeroEx::deployed(&web3).await?; + let http_client_factory = &HttpClientFactory::new(&shared::http_client::Arguments { + http_timeout: config.http_timeout, + }); + let api = Arc::new(DefaultZeroExApi::new( + http_client_factory, + config.base_url.clone(), + config.api_key.clone(), + blocks, + )?); + Ok(Box::new(ZeroExLiquidity::new( + web3, api, contract, settlement, + ))) +} diff --git a/crates/driver/src/infra/config/file/load.rs b/crates/driver/src/infra/config/file/load.rs index bd6f8f061e..5d75387842 100644 --- a/crates/driver/src/infra/config/file/load.rs +++ b/crates/driver/src/infra/config/file/load.rs @@ -240,6 +240,14 @@ pub async fn load(network: &blockchain::Network, path: &Path) -> infra::Config { }, }) .collect(), + zeroex: config + .liquidity + .zeroex + .map(|config| liquidity::config::ZeroEx { + base_url: config.base_url, + api_key: config.api_key, + http_timeout: config.http_timeout, + }), }, mempools: config .submission diff --git a/crates/driver/src/infra/config/file/mod.rs b/crates/driver/src/infra/config/file/mod.rs index be8b4e238b..0e50a83308 100644 --- a/crates/driver/src/infra/config/file/mod.rs +++ b/crates/driver/src/infra/config/file/mod.rs @@ -286,6 +286,10 @@ struct LiquidityConfig { #[serde(default)] balancer_v2: Vec, + /// Liquidity provided by 0x API. + #[serde(default)] + zeroex: Option, + /// The base URL used to connect to subgraph clients. graph_api_base_url: Option, } @@ -437,3 +441,21 @@ enum BalancerV2Config { enum BalancerV2Preset { BalancerV2, } + +#[derive(Clone, Debug, Deserialize)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct ZeroExConfig { + #[serde(default = "default_zeroex_base_url")] + pub base_url: String, + pub api_key: Option, + #[serde(with = "humantime_serde", default = "default_http_timeout")] + pub http_timeout: Duration, +} + +fn default_zeroex_base_url() -> String { + "https://api.0x.org/".to_string() +} + +fn default_http_timeout() -> Duration { + Duration::from_secs(10) +} diff --git a/crates/driver/src/infra/liquidity/config.rs b/crates/driver/src/infra/liquidity/config.rs index 22e3448da8..cf556097c3 100644 --- a/crates/driver/src/infra/liquidity/config.rs +++ b/crates/driver/src/infra/liquidity/config.rs @@ -1,5 +1,6 @@ use { crate::{domain::eth, infra::blockchain::contracts::deployment_address}, + derivative::Derivative, hex_literal::hex, reqwest::Url, std::{collections::HashSet, time::Duration}, @@ -27,6 +28,9 @@ pub struct Config { /// The collection of Balancer V2 compatible exchanges to fetch liquidity /// for. pub balancer_v2: Vec, + + /// 0x liquidity fetcher. + pub zeroex: Option, } /// Uniswap V2 (and Uniswap V2 clone) liquidity fetching options. @@ -231,3 +235,13 @@ impl BalancerV2 { }) } } + +/// ZeroEx liquidity fetching options. +#[derive(Clone, Derivative)] +#[derivative(Debug)] +pub struct ZeroEx { + pub base_url: String, + #[derivative(Debug = "ignore")] + pub api_key: Option, + pub http_timeout: Duration, +}