Skip to content

Commit

Permalink
0x liquidity cache (#2639)
Browse files Browse the repository at this point in the history
  • Loading branch information
squadgazzz authored Apr 22, 2024
1 parent fa52caa commit 478b26d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 41 deletions.
8 changes: 4 additions & 4 deletions crates/driver/src/boundary/liquidity/zeroex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ pub async fn collector(
http_client_factory.builder(),
config.base_url.clone(),
config.api_key.clone(),
blocks,
blocks.clone(),
)?);
Ok(Box::new(ZeroExLiquidity::new(
web3, api, contract, settlement,
)))
Ok(Box::new(
ZeroExLiquidity::new(web3, api, contract, settlement, blocks).await,
))
}
100 changes: 63 additions & 37 deletions crates/solver/src/liquidity/zeroex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use {
},
anyhow::Result,
contracts::{GPv2Settlement, IZeroEx},
ethrpc::current_block::{into_stream, CurrentBlockStream},
futures::StreamExt,
model::{order::OrderKind, TokenPair},
primitive_types::{H160, U256},
shared::{
Expand All @@ -23,25 +25,39 @@ use {
collections::{HashMap, HashSet},
sync::Arc,
},
tokio::sync::RwLock,
};

type OrderbookCache = RwLock<Vec<OrderRecord>>;

pub struct ZeroExLiquidity {
pub api: Arc<dyn ZeroExApi>,
pub zeroex: Arc<IZeroEx>,
pub gpv2: GPv2Settlement,
pub allowance_manager: Box<dyn AllowanceManaging>,
pub orderbook_cache: Arc<OrderbookCache>,
}

type OrderBuckets = HashMap<(H160, H160), Vec<OrderRecord>>;

impl ZeroExLiquidity {
pub fn new(web3: Web3, api: Arc<dyn ZeroExApi>, zeroex: IZeroEx, gpv2: GPv2Settlement) -> Self {
let allowance_manager = AllowanceManager::new(web3, gpv2.address());
pub async fn new(
web3: Web3,
api: Arc<dyn ZeroExApi>,
zeroex: IZeroEx,
gpv2: GPv2Settlement,
blocks_stream: CurrentBlockStream,
) -> Self {
let gpv2_address = gpv2.address();
let allowance_manager = AllowanceManager::new(web3, gpv2_address);
let orderbook_cache = Arc::new(RwLock::new(vec![]));
let cache = orderbook_cache.clone();
tokio::spawn(async move {
Self::run_orderbook_fetching(api, blocks_stream, cache, gpv2_address).await
});

Self {
api,
zeroex: Arc::new(zeroex),
gpv2,
allowance_manager: Box::new(allowance_manager),
orderbook_cache,
}
}

Expand Down Expand Up @@ -77,6 +93,40 @@ impl ZeroExLiquidity {
};
Some(Liquidity::LimitOrder(limit_order))
}

async fn run_orderbook_fetching(
api: Arc<dyn ZeroExApi>,
blocks_stream: CurrentBlockStream,
orderbook_cache: Arc<OrderbookCache>,
gpv2_address: H160,
) {
let mut block_stream = into_stream(blocks_stream);
while block_stream.next().await.is_some() {
let queries = &[
// orders fillable by anyone
OrdersQuery::default(),
// orders fillable only by our settlement contract
OrdersQuery {
sender: Some(gpv2_address),
..Default::default()
},
];
let zeroex_orders_results =
futures::future::join_all(queries.iter().map(|query| api.get_orders(query))).await;
let zeroex_orders = zeroex_orders_results
.into_iter()
.flat_map(|result| {
result.unwrap_or_else(|err| {
tracing::error!(?err, "ZeroExResponse error during liqudity fetching");
vec![]
})
})
.collect();

let mut cache = orderbook_cache.write().await;
*cache = zeroex_orders;
}
}
}

#[async_trait::async_trait]
Expand All @@ -86,28 +136,7 @@ impl LiquidityCollecting for ZeroExLiquidity {
pairs: HashSet<TokenPair>,
_block: Block,
) -> Result<Vec<Liquidity>> {
let queries = &[
// orders fillable by anyone
OrdersQuery::default(),
// orders fillable only by our settlement contract
OrdersQuery {
sender: Some(self.gpv2.address()),
..Default::default()
},
];

let zeroex_orders_results =
futures::future::join_all(queries.iter().map(|query| self.api.get_orders(query))).await;
let zeroex_orders = zeroex_orders_results
.into_iter()
.flat_map(|result| match result {
Ok(order_record_vec) => order_record_vec,
Err(err) => {
tracing::warn!("ZeroExResponse error during liqudity fetching: {}", err);
vec![]
}
});

let zeroex_orders = self.orderbook_cache.read().await.clone();
let order_buckets = generate_order_buckets(zeroex_orders, pairs);
let filtered_zeroex_orders = get_useful_orders(order_buckets, 5);
let tokens: HashSet<_> = filtered_zeroex_orders
Expand All @@ -131,12 +160,13 @@ impl LiquidityCollecting for ZeroExLiquidity {
}

fn generate_order_buckets(
zeroex_orders: impl Iterator<Item = OrderRecord>,
zeroex_orders: Vec<OrderRecord>,
relevant_pairs: HashSet<TokenPair>,
) -> OrderBuckets {
// divide orders in buckets
let mut buckets = OrderBuckets::default();
zeroex_orders
.into_iter()
.filter(
|record| match TokenPair::new(record.order.taker_token, record.order.maker_token) {
Some(pair) => relevant_pairs.contains(&pair),
Expand Down Expand Up @@ -249,8 +279,7 @@ pub mod tests {
let order_1 = order_with_tokens(token_a, token_b);
let order_2 = order_with_tokens(token_b, token_a);
let order_3 = order_with_tokens(token_b, token_a);
let order_buckets =
generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs);
let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs);
assert_eq!(order_buckets.keys().len(), 2);
assert_eq!(order_buckets[&(token_a, token_b)].len(), 1);
assert_eq!(order_buckets[&(token_b, token_a)].len(), 2);
Expand All @@ -275,8 +304,7 @@ pub mod tests {
let order_1 = order_with_tokens(token_ignore, token_b);
let order_2 = order_with_tokens(token_a, token_ignore);
let order_3 = order_with_tokens(token_ignore, token_ignore);
let order_buckets =
generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs);
let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs);
let filtered_zeroex_orders = get_useful_orders(order_buckets, 1);
assert_eq!(filtered_zeroex_orders.len(), 0);
}
Expand All @@ -302,8 +330,7 @@ pub mod tests {
let order_1 = order_with_fillable_amount(1_000);
let order_2 = order_with_fillable_amount(100);
let order_3 = order_with_fillable_amount(10_000);
let order_buckets =
generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs);
let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs);
let filtered_zeroex_orders = get_useful_orders(order_buckets, 1);
assert_eq!(filtered_zeroex_orders.len(), 2);
assert_eq!(
Expand Down Expand Up @@ -341,8 +368,7 @@ pub mod tests {
let order_1 = order_with_amount(10_000_000, 1_000_000);
let order_2 = order_with_amount(1_000, 100);
let order_3 = order_with_amount(100_000, 1_000);
let order_buckets =
generate_order_buckets([order_1, order_2, order_3].into_iter(), relevant_pairs);
let order_buckets = generate_order_buckets(vec![order_1, order_2, order_3], relevant_pairs);
let filtered_zeroex_orders = get_useful_orders(order_buckets, 1);
assert_eq!(filtered_zeroex_orders.len(), 2);
// First item in the list will be on the basis of maker_amount/taker_amount
Expand Down

0 comments on commit 478b26d

Please sign in to comment.