Speed up token metadata lookup (#2134)
# Description
We've noticed weird delays between the autopilot sending an auction and
the driver being ready to start working on it.
The linked issue suggests that we spend a lot of time converting the
auction and enriching it with token metadata.
This should be super fast because all the data should be cached at all
times.
However, there was one token that caused it all to break down: `0xeeee...`
(the address we use to denote the chain's native token).

This is roughly what happened:
1. every driver receives the auction
2. every driver collects all the addresses for which it doesn't have
cached metadata
3. this always yields `0xeeee...` (it's not a real token, so we never get
data back when calling ERC20 functions on it)
4. every driver wastes time sending RPC requests for `0xeeee`
5. every driver takes an exclusive lock just to write no data (an empty
vec) to the cache

Overall this wasted time on unnecessary network requests and on every
driver serially taking the exclusive lock.
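
To make the failure mode concrete, here is a minimal, self-contained
sketch of why a pure cache-miss check is defeated by the marker address.
This is illustrative only — the `Address` alias, the `u8` metadata
stand-in, and the `misses_*` helpers are invented for this example, not
driver code:

```rust
use std::collections::{HashMap, HashSet};

// Hypothetical stand-ins for illustration; not the driver's real types.
type Address = [u8; 20];

/// Marker address (0xeee...e) denoting the chain's native token.
const BUY_ETH_ADDRESS: Address = [0xee; 20];

/// Before the fix: the marker is not a real ERC20 contract, so fetching
/// its metadata always fails, it never lands in the cache, and it shows
/// up in the fetch set of every single auction.
fn misses_before(cache: &HashMap<Address, u8>, tokens: &[Address]) -> HashSet<Address> {
    tokens
        .iter()
        .filter(|t| !cache.contains_key(*t))
        .copied()
        .collect()
}

/// After the fix: the marker is excluded up front, so a warm cache yields
/// an empty fetch set and no RPC requests or exclusive locks.
fn misses_after(cache: &HashMap<Address, u8>, tokens: &[Address]) -> HashSet<Address> {
    tokens
        .iter()
        .filter(|t| !cache.contains_key(*t) && **t != BUY_ETH_ADDRESS)
        .copied()
        .collect()
}

fn main() {
    let cache = HashMap::from([([0x11; 20], 18u8)]); // one cached real token
    let tokens = [[0x11; 20], BUY_ETH_ADDRESS];
    assert_eq!(misses_before(&cache, &tokens).len(), 1); // 0xeee... misses every time
    assert_eq!(misses_after(&cache, &tokens).len(), 0); // nothing left to fetch
}
```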

# Changes
* most importantly, filter out `0xeee` (the problem illustrated in the
sketch above)
* avoid sending duplicate RPC requests by using `RequestSharing` (a
sketch of the assumed semantics follows the diff below)
* don't take an exclusive lock if some other driver cached the same data
in the meantime (see the locking sketch below)
* move logging in the token balance update task out of the critical
section
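
The third bullet is a classic double-checked pattern on an `RwLock`:
check under the cheap shared read lock first, and only take the
exclusive write lock if something is actually missing. A minimal sketch,
assuming a simplified cache keyed by `String` (a stand-in for token
addresses) with `u32` values standing in for metadata:

```rust
use std::{collections::HashMap, sync::RwLock};

/// Insert freshly fetched entries, but skip the exclusive lock entirely
/// if a concurrent caller already cached everything we were asked for.
fn cache_fetched(
    cache: &RwLock<HashMap<String, u32>>,
    requested: &[String],
    fetched: Vec<(String, u32)>,
) {
    {
        // Cheap check under the shared read lock.
        let cache = cache.read().unwrap();
        if requested.iter().all(|key| cache.contains_key(key)) {
            return;
        }
    } // read lock released before we try to write
    cache.write().unwrap().extend(fetched);
}

fn main() {
    let cache = RwLock::new(HashMap::from([("A".to_string(), 1)]));
    // Everything requested is already cached -> no write lock taken.
    cache_fetched(&cache, &["A".to_string()], vec![("A".to_string(), 1)]);
    // "B" is missing -> the write path runs and inserts it.
    cache_fetched(&cache, &["B".to_string()], vec![("B".to_string(), 2)]);
    assert_eq!(cache.read().unwrap().len(), 2);
}
```

The read lock is shared, so when several drivers race on the same
tokens, only the first one through still pays for exclusive access.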

With all changes combined we now spend a couple of microseconds instead
of seconds on getting cached balances.
And because this was the bulk of the work captured by the log `auction
task execution time`, that time dropped to ~11ms.

## How to test
e2e tests should still pass, and a manual test confirmed the reduced
latency.

Fixes #2133
MartinquaXD authored Dec 7, 2023
1 parent 07a966e commit bb5a241
Showing 2 changed files with 88 additions and 46 deletions.
2 changes: 1 addition & 1 deletion crates/driver/src/infra/api/routes/solve/mod.rs

```diff
@@ -30,7 +30,7 @@ async fn route(
         .tap_err(|err| {
             observe::invalid_dto(err, "auction");
         })?;
-    tracing::debug!(elapsed=?start.elapsed(), auction_id=%auction_id, "auction task execution time");
+    tracing::debug!(elapsed = ?start.elapsed(), "auction task execution time");
     let auction = state.pre_processor().prioritize(auction).await;
     let competition = state.competition();
     let result = competition.solve(&auction).await;
```
132 changes: 87 additions & 45 deletions crates/driver/src/infra/tokens.rs
```diff
@@ -3,10 +3,14 @@ use {
         domain::eth,
         infra::{blockchain, Ethereum},
     },
     anyhow::Result,
     ethrpc::current_block::{self, CurrentBlockStream},
-    futures::StreamExt,
+    futures::{FutureExt, StreamExt},
+    itertools::Itertools,
+    model::order::BUY_ETH_ADDRESS,
+    shared::request_sharing::BoxRequestSharing,
     std::{
-        collections::{HashMap, HashSet},
+        collections::HashMap,
         sync::{Arc, RwLock},
     },
     tracing::Instrument,
@@ -29,6 +33,7 @@ impl Fetcher {
         let inner = Arc::new(Inner {
             eth,
             cache: RwLock::new(HashMap::new()),
+            requests: BoxRequestSharing::labelled("token_info".into()),
         });
         tokio::task::spawn(
             update_task(block_stream, Arc::downgrade(&inner))
```
```diff
@@ -70,10 +75,6 @@ async fn update_balances(inner: Arc<Inner>) -> Result<(), blockchain::Error> {
     let futures = {
         let cache = inner.cache.read().unwrap();
         let tokens = cache.keys().cloned().collect::<Vec<_>>();
-        tracing::debug!(
-            tokens = tokens.len(),
-            "updating settlement contract balances"
-        );
         tokens.into_iter().map(|token| {
             let erc20 = inner.eth.erc20(token);
             async move {
@@ -85,6 +86,11 @@ async fn update_balances(inner: Arc<Inner>) -> Result<(), blockchain::Error> {
         })
     };
 
+    tracing::debug!(
+        tokens = futures.len(),
+        "updating settlement contract balances"
+    );
+
     // Don't hold on to the lock while fetching balances to allow concurrent
     // updates. This may lead to new entries arriving in the meantime, however
     // their balances should already be up-to-date.
```
```diff
@@ -93,75 +99,111 @@ async fn update_balances(inner: Arc<Inner>) -> Result<(), blockchain::Error> {
         .into_iter()
         .collect::<HashMap<_, _>>();
 
-    let mut cache = inner.cache.write().unwrap();
-    for (key, entry) in cache.iter_mut() {
-        if let Some(balance) = balances.remove(key) {
-            entry.balance = balance;
-        } else {
-            tracing::info!(?key, "key without balance update");
-        }
-    }
+    let mut keys_without_balances = vec![];
+    {
+        let mut cache = inner.cache.write().unwrap();
+        for (key, entry) in cache.iter_mut() {
+            if let Some(balance) = balances.remove(key) {
+                entry.balance = balance;
+            } else {
+                // Avoid logging while holding the exclusive lock.
+                keys_without_balances.push(*key);
+            }
+        }
+    }
+    if !keys_without_balances.is_empty() {
+        tracing::info!(keys = ?keys_without_balances, "updated keys without balance");
+    }
 
     Ok(())
 }
 
 /// Provides metadata of tokens.
 struct Inner {
     eth: Ethereum,
     cache: RwLock<HashMap<eth::TokenAddress, Metadata>>,
+    requests: BoxRequestSharing<eth::TokenAddress, Option<(eth::TokenAddress, Metadata)>>,
 }
 
 impl Inner {
     /// Fetches `Metadata` of the requested tokens from a node.
     async fn fetch_token_infos(
         &self,
-        tokens: HashSet<eth::TokenAddress>,
-    ) -> Vec<Result<(eth::TokenAddress, Metadata), blockchain::Error>> {
+        tokens: &[eth::TokenAddress],
+    ) -> Vec<Option<(eth::TokenAddress, Metadata)>> {
         let settlement = self.eth.contracts().settlement().address().into();
-        let futures = tokens.into_iter().map(|token| async move {
-            let token = self.eth.erc20(token);
-            // Use `try_join` because these calls get batched under the hood
-            // so if one of them fails the others will as well.
-            // Also this way we won't get incomplete data for a token.
-            let (decimals, symbol, balance) = futures::future::try_join3(
-                token.decimals(),
-                token.symbol(),
-                token.balance(settlement),
-            )
-            .await?;
-            Ok((
-                token.address(),
-                Metadata {
-                    decimals,
-                    symbol,
-                    balance,
-                },
-            ))
+        let futures = tokens.iter().map(|token| {
+            let build_request = |token: &eth::TokenAddress| {
+                let token = self.eth.erc20(*token);
+                async move {
+                    // Use `try_join` because these calls get batched under the hood
+                    // so if one of them fails the others will as well.
+                    // Also this way we won't get incomplete data for a token.
+                    let (decimals, symbol, balance) = futures::future::try_join3(
+                        token.decimals(),
+                        token.symbol(),
+                        token.balance(settlement),
+                    )
+                    .await
+                    .ok()?;
+
+                    Some((
+                        token.address(),
+                        Metadata {
+                            decimals,
+                            symbol,
+                            balance,
+                        },
+                    ))
+                }
+                .boxed()
+            };
+
+            self.requests.shared_or_else(*token, build_request)
         });
         futures::future::join_all(futures).await
     }
 
+    /// Ensures that all the missing tokens are in the cache afterwards while
+    /// taking into account that the function might be called multiple times
+    /// for the same tokens.
+    async fn cache_missing_tokens(&self, tokens: &[eth::TokenAddress]) {
+        if tokens.is_empty() {
+            return;
+        }
+
+        let fetched = self.fetch_token_infos(tokens).await;
+        {
+            let cache = self.cache.read().unwrap();
+            if tokens.iter().all(|token| cache.contains_key(token)) {
+                // Often multiple callers are racing to fetch the same Metadata.
+                // If somebody else already cached the data we don't want to take an
+                // exclusive lock for nothing.
+                return;
+            }
+        }
+        self.cache
+            .write()
+            .unwrap()
+            .extend(fetched.into_iter().flatten());
+    }
+
     async fn get(&self, addresses: &[eth::TokenAddress]) -> HashMap<eth::TokenAddress, Metadata> {
-        let to_fetch: HashSet<_> = {
+        let to_fetch: Vec<_> = {
             let cache = self.cache.read().unwrap();
 
             // Compute set of requested addresses that are not in cache.
             addresses
                 .iter()
-                .filter(|address| !cache.contains_key(*address))
+                // BUY_ETH_ADDRESS is just a marker and not a real address. We'll never be able to
+                // fetch data for it so ignore it to avoid taking exclusive locks all the time.
+                .filter(|address| !cache.contains_key(*address) && address.0.0 != BUY_ETH_ADDRESS)
                 .cloned()
+                .unique()
                 .collect()
         };
 
-        // Fetch token infos not yet in cache.
-        if !to_fetch.is_empty() {
-            let fetched = self.fetch_token_infos(to_fetch).await;
-
-            // Add valid token infos to cache.
-            self.cache
-                .write()
-                .unwrap()
-                .extend(fetched.into_iter().flatten());
-        };
+        self.cache_missing_tokens(&to_fetch).await;
 
         let cache = self.cache.read().unwrap();
         // Return token infos from the cache.
```
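
The reworked `fetch_token_infos` above routes every lookup through
`BoxRequestSharing::shared_or_else`. Its internals aren't shown in this
diff; the sketch below captures only the assumed idea — a map of
in-flight `Shared` futures keyed by request, so concurrent callers await
one future instead of issuing duplicate RPC calls. The real type also
carries a metrics label (`"token_info"` in the diff) and is more careful
about cleaning up finished requests, details we skip here:

```rust
use futures::future::{BoxFuture, FutureExt, Shared};
use std::{collections::HashMap, hash::Hash, sync::Mutex};

/// Simplified stand-in for `shared::request_sharing::BoxRequestSharing`
/// (assumed semantics, not the actual implementation).
struct RequestSharing<K, V: Clone> {
    in_flight: Mutex<HashMap<K, Shared<BoxFuture<'static, V>>>>,
}

impl<K: Hash + Eq + Clone, V: Clone> RequestSharing<K, V> {
    fn new() -> Self {
        Self {
            in_flight: Mutex::new(HashMap::new()),
        }
    }

    /// Returns the in-flight future for `key` if one exists, otherwise
    /// builds, stores, and returns a new shared one.
    fn shared_or_else<F>(&self, key: K, build: F) -> Shared<BoxFuture<'static, V>>
    where
        F: FnOnce(&K) -> BoxFuture<'static, V>,
    {
        self.in_flight
            .lock()
            .unwrap()
            .entry(key.clone())
            .or_insert_with(|| build(&key).shared())
            .clone()
    }
}

fn main() {
    futures::executor::block_on(async {
        let sharing = RequestSharing::<u32, String>::new();
        let a = sharing.shared_or_else(1, |_| async { "fetched".to_string() }.boxed());
        // The second caller never builds a request; it reuses the first future.
        let b = sharing.shared_or_else(1, |_| unreachable!("deduplicated"));
        assert_eq!(a.await, b.await);
    });
}
```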
