diff --git a/crates/driver/src/boundary/liquidity/zeroex.rs b/crates/driver/src/boundary/liquidity/zeroex.rs index 9dc2cfbf81..761344d8bc 100644 --- a/crates/driver/src/boundary/liquidity/zeroex.rs +++ b/crates/driver/src/boundary/liquidity/zeroex.rs @@ -93,9 +93,9 @@ pub async fn collector( http_client_factory.builder(), config.base_url.clone(), config.api_key.clone(), - blocks, + blocks.clone(), )?); - Ok(Box::new(ZeroExLiquidity::new( - web3, api, contract, settlement, - ))) + Ok(Box::new( + ZeroExLiquidity::new(web3, api, contract, settlement, blocks).await, + )) } diff --git a/crates/solver/src/liquidity/zeroex.rs b/crates/solver/src/liquidity/zeroex.rs index 417d862698..209275aece 100644 --- a/crates/solver/src/liquidity/zeroex.rs +++ b/crates/solver/src/liquidity/zeroex.rs @@ -11,6 +11,8 @@ use { }, anyhow::Result, contracts::{GPv2Settlement, IZeroEx}, + ethrpc::current_block::{into_stream, CurrentBlockStream}, + futures::StreamExt, model::{order::OrderKind, TokenPair}, primitive_types::{H160, U256}, shared::{ @@ -23,25 +25,39 @@ use { collections::{HashMap, HashSet}, sync::Arc, }, + tokio::sync::RwLock, }; +type OrderbookCache = RwLock>; + pub struct ZeroExLiquidity { - pub api: Arc, pub zeroex: Arc, - pub gpv2: GPv2Settlement, pub allowance_manager: Box, + pub orderbook_cache: Arc, } type OrderBuckets = HashMap<(H160, H160), Vec>; impl ZeroExLiquidity { - pub fn new(web3: Web3, api: Arc, zeroex: IZeroEx, gpv2: GPv2Settlement) -> Self { - let allowance_manager = AllowanceManager::new(web3, gpv2.address()); + pub async fn new( + web3: Web3, + api: Arc, + zeroex: IZeroEx, + gpv2: GPv2Settlement, + blocks_stream: CurrentBlockStream, + ) -> Self { + let gpv2_address = gpv2.address(); + let allowance_manager = AllowanceManager::new(web3, gpv2_address); + let orderbook_cache = Arc::new(RwLock::new(vec![])); + let cache = orderbook_cache.clone(); + tokio::spawn(async move { + Self::run_orderbook_fetching(api, blocks_stream, cache, gpv2_address).await + }); + Self { - api, zeroex: Arc::new(zeroex), - gpv2, allowance_manager: Box::new(allowance_manager), + orderbook_cache, } } @@ -77,6 +93,40 @@ impl ZeroExLiquidity { }; Some(Liquidity::LimitOrder(limit_order)) } + + async fn run_orderbook_fetching( + api: Arc, + blocks_stream: CurrentBlockStream, + orderbook_cache: Arc, + gpv2_address: H160, + ) { + let mut block_stream = into_stream(blocks_stream); + while block_stream.next().await.is_some() { + let queries = &[ + // orders fillable by anyone + OrdersQuery::default(), + // orders fillable only by our settlement contract + OrdersQuery { + sender: Some(gpv2_address), + ..Default::default() + }, + ]; + let zeroex_orders_results = + futures::future::join_all(queries.iter().map(|query| api.get_orders(query))).await; + let zeroex_orders = zeroex_orders_results + .into_iter() + .flat_map(|result| { + result.unwrap_or_else(|err| { + tracing::error!(?err, "ZeroExResponse error during liqudity fetching"); + vec![] + }) + }) + .collect(); + + let mut cache = orderbook_cache.write().await; + *cache = zeroex_orders; + } + } } #[async_trait::async_trait] @@ -86,28 +136,7 @@ impl LiquidityCollecting for ZeroExLiquidity { pairs: HashSet, _block: Block, ) -> Result> { - let queries = &[ - // orders fillable by anyone - OrdersQuery::default(), - // orders fillable only by our settlement contract - OrdersQuery { - sender: Some(self.gpv2.address()), - ..Default::default() - }, - ]; - - let zeroex_orders_results = - futures::future::join_all(queries.iter().map(|query| self.api.get_orders(query))).await; - let zeroex_orders = zeroex_orders_results - .into_iter() - .flat_map(|result| match result { - Ok(order_record_vec) => order_record_vec, - Err(err) => { - tracing::warn!("ZeroExResponse error during liqudity fetching: {}", err); - vec![] - } - }); - + let zeroex_orders = self.orderbook_cache.read().await.clone(); let order_buckets = generate_order_buckets(zeroex_orders, pairs); let filtered_zeroex_orders = get_useful_orders(order_buckets, 5); let tokens: HashSet<_> = filtered_zeroex_orders @@ -131,12 +160,13 @@ impl LiquidityCollecting for ZeroExLiquidity { } fn generate_order_buckets( - zeroex_orders: impl Iterator, + zeroex_orders: Vec, relevant_pairs: HashSet, ) -> OrderBuckets { // divide orders in buckets let mut buckets = OrderBuckets::default(); zeroex_orders + .into_iter() .filter( |record| match TokenPair::new(record.order.taker_token, record.order.maker_token) { Some(pair) => relevant_pairs.contains(&pair), @@ -249,8 +279,7 @@ pub mod tests { let order_1 = order_with_tokens(token_a, token_b); let order_2 = order_with_tokens(token_b, token_a); let order_3 = order_with_tokens(token_b, token_a); - let order_buckets = - generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs); + let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs); assert_eq!(order_buckets.keys().len(), 2); assert_eq!(order_buckets[&(token_a, token_b)].len(), 1); assert_eq!(order_buckets[&(token_b, token_a)].len(), 2); @@ -275,8 +304,7 @@ pub mod tests { let order_1 = order_with_tokens(token_ignore, token_b); let order_2 = order_with_tokens(token_a, token_ignore); let order_3 = order_with_tokens(token_ignore, token_ignore); - let order_buckets = - generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs); + let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs); let filtered_zeroex_orders = get_useful_orders(order_buckets, 1); assert_eq!(filtered_zeroex_orders.len(), 0); } @@ -302,8 +330,7 @@ pub mod tests { let order_1 = order_with_fillable_amount(1_000); let order_2 = order_with_fillable_amount(100); let order_3 = order_with_fillable_amount(10_000); - let order_buckets = - generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs); + let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs); let filtered_zeroex_orders = get_useful_orders(order_buckets, 1); assert_eq!(filtered_zeroex_orders.len(), 2); assert_eq!( @@ -341,8 +368,7 @@ pub mod tests { let order_1 = order_with_amount(10_000_000, 1_000_000); let order_2 = order_with_amount(1_000, 100); let order_3 = order_with_amount(100_000, 1_000); - let order_buckets = - generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs); + let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs); let filtered_zeroex_orders = get_useful_orders(order_buckets, 1); assert_eq!(filtered_zeroex_orders.len(), 2); // First item in the list will be on the basis of maker_amount/taker_amount