Skip to content

Commit

Permalink
Avoid double balance caching race condition (#2067)
Browse files Browse the repository at this point in the history
# Description
While working on dockerizing the e2e tests I noticed that we can run
into an annoying race condition caused by caching balances in 2 places
(balancer fetcher and solvable orders).
Given this scenario:
1. start solvable orders block
2. it notices a new block appeared so it asks the balance fetcher for
the new balances
3. balancer fetcher did not yet see the new block so it returns the old
unchanged balances
4. solvable orders caches old balances

This case is not a big deal in production but I'm fairly confident this
causes our flaky e2e tests. Because those e2e tests only produce blocks
when a tx needs to get mined we can encounter the scenario where that
test waits for an order to be settled which will never happen because
the `autopilot` never noticed that the user has the required balances by
now.

# Changes
Removes caching in the `SolvableOrdersCache` because we already have a
cache in the balance fetcher. Also we wouldn't really want to build new
auction on the same block anyway so we shouldn't even be able to use
balances cached by the `SolvableOrderCache`.

## How to test
e2e tests still pass
  • Loading branch information
MartinquaXD authored Nov 21, 2023
1 parent 148a922 commit def4f38
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 44 deletions.
54 changes: 11 additions & 43 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use {
},
std::{
collections::{BTreeMap, HashMap, HashSet},
iter::FromIterator,
sync::{Arc, Mutex, Weak},
time::Duration,
},
Expand Down Expand Up @@ -86,7 +85,6 @@ type Balances = HashMap<Query, U256>;
struct Inner {
auction: Option<Auction>,
orders: SolvableOrders,
balances: Balances,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -128,7 +126,6 @@ impl SolvableOrdersCache {
latest_settlement_block: 0,
block: 0,
},
balances: Default::default(),
}),
native_price_estimator,
signature_validator,
Expand Down Expand Up @@ -176,21 +173,13 @@ impl SolvableOrdersCache {
let removed = counter.checkpoint("invalid_signature", &orders);
order_events.extend(removed.into_iter().map(|o| (o, OrderEventLabel::Invalid)));

// If we update due to an explicit notification we can reuse existing balances
// as they cannot have changed.
let old_balances = {
let inner = self.cache.lock().unwrap();
if inner.orders.block == block {
inner.balances.clone()
} else {
HashMap::new()
}
};
let (mut new_balances, missing_queries) = new_balances(&old_balances, &orders);
let missing_queries: Vec<_> = orders.iter().map(Query::from_order).collect();
let fetched_balances = self.balance_fetcher.get_balances(&missing_queries).await;
for (query, balance) in missing_queries.into_iter().zip(fetched_balances) {
let balance = match balance {
Ok(balance) => balance,
let balances = missing_queries
.into_iter()
.zip(fetched_balances)
.filter_map(|(query, balance)| match balance {
Ok(balance) => Some((query, balance)),
Err(err) => {
tracing::warn!(
owner = ?query.owner,
Expand All @@ -199,17 +188,16 @@ impl SolvableOrdersCache {
error = ?err,
"failed to get balance"
);
continue;
None
}
};
new_balances.insert(query, balance);
}
})
.collect::<HashMap<_, _>>();

let orders = orders_with_balance(orders, &new_balances, self.ethflow_contract_address);
let orders = orders_with_balance(orders, &balances, self.ethflow_contract_address);
let removed = counter.checkpoint("insufficient_balance", &orders);
order_events.extend(removed.into_iter().map(|o| (o, OrderEventLabel::Invalid)));

let orders = filter_dust_orders(orders, &new_balances, self.ethflow_contract_address);
let orders = filter_dust_orders(orders, &balances, self.ethflow_contract_address);
let removed = counter.checkpoint("dust_order", &orders);
order_events.extend(removed.into_iter().map(|o| (o, OrderEventLabel::Filtered)));

Expand Down Expand Up @@ -257,7 +245,6 @@ impl SolvableOrdersCache {
latest_settlement_block: db_solvable_orders.latest_settlement_block,
block,
},
balances: new_balances,
};

tracing::debug!(%block, ?id, "updated current auction cache");
Expand Down Expand Up @@ -360,25 +347,6 @@ async fn filter_invalid_signature_orders(
.collect()
}

/// Returns existing balances and Vec of queries that need to be performed.
fn new_balances(old_balances: &Balances, orders: &[Order]) -> (HashMap<Query, U256>, Vec<Query>) {
let mut new_balances = HashMap::new();
let mut missing_queries = HashSet::new();
for order in orders {
let query = Query::from_order(order);
match old_balances.get(&query) {
Some(balance) => {
new_balances.insert(query, *balance);
}
None => {
missing_queries.insert(query);
}
}
}
let missing_queries = Vec::from_iter(missing_queries);
(new_balances, missing_queries)
}

/// Removes orders that can't possibly be settled because there isn't enough
/// balance.
fn orders_with_balance(
Expand Down
2 changes: 1 addition & 1 deletion crates/shared/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct ApiMetrics {
impl ApiMetrics {
// Status codes we care about in our application. Populated with:
// `rg -oIN 'StatusCode::[A-Z_]+' | sort | uniq`.
const INITIAL_STATUSES: &[StatusCode] = &[
const INITIAL_STATUSES: &'static [StatusCode] = &[
StatusCode::OK,
StatusCode::CREATED,
StatusCode::BAD_REQUEST,
Expand Down

0 comments on commit def4f38

Please sign in to comment.