Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjustments to Token Info Fetcher #1931

Merged
merged 5 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ pub async fn run(args: Arguments) {
.expect("failed to create pool cache"),
);
let block_retriever = args.shared.current_block.retriever(web3.clone());
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher {
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Arc::new(TokenInfoFetcher {
web3: web3.clone(),
})));
let balancer_pool_fetcher = if baseline_sources.contains(&BaselineSource::BalancerV2) {
Expand Down
2 changes: 1 addition & 1 deletion crates/driver/src/boundary/liquidity/balancer/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async fn init_liquidity(
.flatten()
.collect(),
};
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher {
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Arc::new(TokenInfoFetcher {
web3: web3.clone(),
})));

Expand Down
2 changes: 1 addition & 1 deletion crates/orderbook/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ pub async fn run(args: Arguments) {
.expect("failed to create pool cache"),
);
let block_retriever = args.shared.current_block.retriever(web3.clone());
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher {
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Arc::new(TokenInfoFetcher {
web3: web3.clone(),
})));
let balancer_pool_fetcher = if baseline_sources.contains(&BaselineSource::BalancerV2) {
Expand Down
2 changes: 1 addition & 1 deletion crates/shared/src/sources/balancer_v2/pool_fetching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ mod tests {
.await
.unwrap();
let token_info_fetcher =
Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher {
Arc::new(CachedTokenInfoFetcher::new(Arc::new(TokenInfoFetcher {
web3: web3.clone(),
})));
let block_stream = ethrpc::current_block::current_block_stream(
Expand Down
280 changes: 168 additions & 112 deletions crates/shared/src/token_info.rs
Original file line number Diff line number Diff line change
@@ -1,173 +1,229 @@
use {
crate::ethrpc::Web3,
crate::{ethcontract_error::EthcontractErrorType, ethrpc::Web3},
anyhow::Result,
async_trait::async_trait,
contracts::ERC20,
ethcontract::{batch::CallBatch, H160},
std::{collections::HashMap, sync::Arc},
tokio::sync::Mutex,
ethcontract::{errors::MethodError, H160},
futures::{
future::{BoxFuture, Shared},
FutureExt,
},
model::order::BUY_ETH_ADDRESS,
std::{
collections::HashMap,
sync::{Arc, Mutex},
},
thiserror::Error,
};

const MAX_BATCH_SIZE: usize = 100;

#[cfg_attr(test, derive(Eq, PartialEq))]
#[derive(Clone, Debug, Default)]
pub struct TokenInfo {
pub decimals: Option<u8>,
pub symbol: Option<String>,
}

pub struct TokenInfoFetcher {
pub web3: Web3,
}
#[derive(Clone, Debug, Error)]
#[error("error fetching token info: {0}")]
pub struct Error(String);

#[mockall::automock]
#[async_trait]
pub trait TokenInfoFetching: Send + Sync {
/// Retrieves information for a token.
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error>;

/// Retrieves all token information.
/// Default implementation calls get_token_info for each token and ignores
/// errors.
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo>;
}

pub struct TokenInfoFetcher {
pub web3: Web3,
}

impl TokenInfoFetcher {
async fn fetch_token(&self, address: H160) -> Result<TokenInfo, Error> {
if address == BUY_ETH_ADDRESS {
return Ok(TokenInfo {
decimals: Some(18),
symbol: Some("NATIVE_ASSET".to_string()),
});
}

let erc20 = ERC20::at(&self.web3, address);
let (decimals, symbol) = futures::join!(
erc20.methods().decimals().call(),
erc20.methods().symbol().call(),
);

Ok(TokenInfo {
decimals: classify_error(decimals)?,
symbol: classify_error(symbol)?,
})
}
}

fn classify_error<T>(result: Result<T, MethodError>) -> Result<Option<T>, Error> {
match result {
Ok(value) => Ok(Some(value)),
Err(err) => match EthcontractErrorType::classify(&err) {
EthcontractErrorType::Node => Err(Error(err.to_string())),
EthcontractErrorType::Contract => Ok(None),
},
}
}

#[async_trait]
impl TokenInfoFetching for TokenInfoFetcher {
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> {
let mut batch = CallBatch::new(self.web3.transport());
let futures = addresses
.iter()
.map(|address| {
let erc20 = ERC20::at(&self.web3, *address);
(
erc20.methods().decimals().batch_call(&mut batch),
erc20.methods().symbol().batch_call(&mut batch),
)
})
.collect::<Vec<_>>();

batch.execute_all(MAX_BATCH_SIZE).await;
let mut resolved_futures = Vec::with_capacity(futures.len());
for (decimals, symbol) in futures {
resolved_futures.push((decimals.await, symbol.await));
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error> {
let info = self.fetch_token(address).await;
if let Err(err) = &info {
tracing::debug!(?err, token = ?address, "failed to fetch token info");
}
addresses
.iter()
.zip(resolved_futures)
.map(|(address, (decimals, symbol))| {
if decimals.is_err() {
tracing::trace!("Failed to fetch token info for token {}", address);
}
(
*address,
TokenInfo {
decimals: decimals.ok(),
symbol: symbol.ok(),
},
)
})
.collect()

info
}

async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> {
futures::future::join_all(addresses.iter().copied().map(|address| async move {
let info = self.fetch_token(address).await;
if let Err(err) = &info {
tracing::debug!(?err, token = ?address, "failed to fetch token info");
}

(address, info.unwrap_or_default())
}))
.await
.into_iter()
.collect()
}
}

type SharedTokenInfo = Shared<BoxFuture<'static, Result<TokenInfo, Error>>>;

pub struct CachedTokenInfoFetcher {
inner: Box<dyn TokenInfoFetching>,
cache: Arc<Mutex<HashMap<H160, TokenInfo>>>,
inner: Arc<dyn TokenInfoFetching>,
cache: Arc<Mutex<HashMap<H160, SharedTokenInfo>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might have to keep an eye on memory consumption here.
Storing a whole future instead of a String and a u8 for A LOT of tokens might be a problem. Maybe it makes sense to have sth like:

enum CacheEntry {
    Resolved(TokenInfo),
    InFlight(SharedTokenInfoRequest)
}

No need to change in this PR, though.

Copy link
Contributor Author

@nlordell nlordell Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Storing a whole future instead of a String and a u8 for A LOT of tokens might be a problem

It's a boxed future though, so it should just be a ptr + vtable, which is as big as a TokenInfo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, to my surprise, SharedTokenInfo is smaller than a TokenInfo (without counting the data that exists on the heap): https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=4d4fb54c802e8db652c6608eb9c83d35.

This is likely because of a combination of boxing and null-pointer optimization.

With that in mind, I imagine that this type is fairly space-optimized.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a boxed future though, so it should just be a ptr + vtable, which is as big as a TokenInfo.

Yeah. The handle to the future is very small but you'll still keep the actual allocation for the future around which I was mainly worried about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It shouldn't though right?

Internally, Shared has an UnsafeCell to:

enum FutureOrOutput<Fut: Future> {
    Future(Fut),
    Output(Fut::Output),
}

Here, Fut = BoxFuture, so it's 16 bytes on x86_64 (ptr + vtable). AFAIU, once the unsafe cell gets set to FutureOrOutput::Output, then the future will get dropped right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, true. I assumed the SharedFuture would basically keep the future alive the whole time and simply have a special case for returning a resolved value instead of swapping the future allocation for the resolved result. 👌

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example here:

use futures::FutureExt;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

struct Foo;

impl Future for Foo {
    type Output = i32;
    
    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
        Poll::Ready(42)
    }
}

impl Drop for Foo {
    fn drop(&mut self) {
        println!("Foo::drop");
    }
}

fn main() {
    let foo = Foo.boxed().shared();

    {
        println!("cloning handles");
        for i in 0..3 {
            let answer = futures::executor::block_on(foo.clone());
            println!("answer {i}: {answer}");
        }
    }
    
    // We now have a handle to `foo` which was resolved, if we await it, we
    // shouldn't drop the future again.
    println!("foo is resolved");
    let _ = futures::executor::block_on(foo);
}

Output:

cloning handles
Foo::drop
answer 0: 42
answer 1: 42
answer 2: 42
foo is resolved

The future gets dropped the first time it resolves, and only dropped once.

The only issue would be if there are "dangling" futures in the HashMap. This can be fixed by removing dangling Shared futures that aren't resolved. I also don't suspect this will happen in practice either (especially since follow up token info requests will drive the Shared future forward).

}

impl CachedTokenInfoFetcher {
pub fn new(inner: Box<dyn TokenInfoFetching>) -> Self {
pub fn new(inner: Arc<dyn TokenInfoFetching>) -> Self {
Self {
inner,
cache: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl CachedTokenInfoFetcher {
async fn fetch_token(&self, address: H160) -> Result<TokenInfo, Error> {
let fetch = {
let mut cache = self.cache.lock().unwrap();
cache
.entry(address)
.or_insert({
let inner = self.inner.clone();
async move { inner.get_token_info(address).await }
.boxed()
.shared()
})
.clone()
};

let info = fetch.await;
if info.is_err() {
let mut cache = self.cache.lock().unwrap();
if let Some(Err(_)) = cache.get(&address).and_then(|fetch| fetch.peek()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this condition checking? Can't we just unconditionally remove the future in case of error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is checking that the shared future stored in the map is resolved to an error (with Shared::peek).

Imagine 3 concurrent requests Rn where:

  1. R1 starts a shared fetch of some token information
  2. R2 uses the same shared fetch
  3. The shared future resolves to an error, R1 removes the shared future from the cache
  4. R3 starts a new shared fetch of token information for the same address, the cache entry was removed in 3, so it creates a new one
  5. R2 gets scheduled and removes the new shared future that was created by R3.

This isn't extremely contrived, as there is an await point before clearing the cache, so if R2 doesn't get polled for some time, this would happen. That being said, I don't think this will happen regularly, and at worst we will be slightly sub-optimal and duplicate some queries.

Another approach would be to peek at the shared future when we get the first cache lock and replace it in case it resolved to an error. The downside being that we can hold some useless Error(string)s in the cache that won't ever get used.

cache.remove(&address);
}
}

info
}
}

#[async_trait]
impl TokenInfoFetching for CachedTokenInfoFetcher {
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> {
let mut cache = self.cache.lock().await;

// Compute set of requested addresses that are not in cache.
let to_fetch: Vec<H160> = addresses
.iter()
.filter(|address| !cache.contains_key(address))
.cloned()
.collect();

// Fetch token infos not yet in cache.
if !to_fetch.is_empty() {
let fetched = self.inner.get_token_infos(to_fetch.as_slice()).await;

// Add valid token infos to cache.
cache.extend(
fetched
.into_iter()
.filter(|(_, token_info)| token_info.decimals.is_some()),
);
};
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error> {
self.fetch_token(address).await
}

// Return token infos from the cache.
addresses
.iter()
.map(|address| {
if cache.contains_key(address) {
(*address, cache[address].clone())
} else {
(
*address,
TokenInfo {
decimals: None,
symbol: None,
},
)
}
})
.collect()
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> {
futures::future::join_all(addresses.iter().copied().map(|address| async move {
(
address,
self.get_token_info(address).await.unwrap_or_default(),
)
}))
.await
.into_iter()
.collect()
}
}

#[cfg(test)]
mod tests {
use {super::*, maplit::hashmap};
use {super::*, maplit::hashmap, mockall::predicate::*};

#[tokio::test]
async fn cached_token_info_fetcher() {
let address0 = H160::zero();
let address1 = H160::from_low_u64_be(1);
let address = H160::from_low_u64_be;

let mut mock_token_info_fetcher = MockTokenInfoFetching::new();
mock_token_info_fetcher
.expect_get_token_infos()
.expect_get_token_info()
.with(eq(address(0)))
.times(1)
.return_once(move |_| {
hashmap! {
address0 => TokenInfo { decimals: Some(18), symbol: Some("CAT".to_string()) },
}
Ok(TokenInfo {
decimals: Some(18),
symbol: Some("CAT".to_string()),
})
});
mock_token_info_fetcher
.expect_get_token_infos()
.times(2)
.returning(|_| {
hashmap! {
H160::from_low_u64_be(1) => TokenInfo { decimals: None, symbol: None },
}
.expect_get_token_info()
.with(eq(address(1)))
.times(1)
.return_once(move |_| {
Ok(TokenInfo {
decimals: None,
symbol: None,
})
});
let cached_token_info_fetcher =
CachedTokenInfoFetcher::new(Box::new(mock_token_info_fetcher));

// Fetching a cached item should work.
let token_infos = cached_token_info_fetcher.get_token_infos(&[address0]).await;
assert!(token_infos.contains_key(&address0) && token_infos[&address0].decimals == Some(18));

// Should panic because of the times(1) constraint above, unless the cache is
// working as expected.
cached_token_info_fetcher.get_token_infos(&[address0]).await;

// Fetching an item that is unavailable should work.
let token_infos = cached_token_info_fetcher.get_token_infos(&[address1]).await;
assert!(token_infos.contains_key(&address1) && token_infos[&address1].decimals.is_none());
mock_token_info_fetcher
.expect_get_token_info()
.with(eq(address(2)))
.times(2)
.returning(|_| Err(Error("some error".to_string())));

// Should try to refetch the item thus satisfying the times(2) constraint above.
cached_token_info_fetcher.get_token_infos(&[address1]).await;
let cached_token_info_fetcher =
CachedTokenInfoFetcher::new(Arc::new(mock_token_info_fetcher));

// Fetches tokens, using `TokenInfo::default()` for the failed token.
let addresses = [address(0), address(1), address(2)];
let token_infos = cached_token_info_fetcher.get_token_infos(&addresses).await;
assert_eq!(
token_infos,
hashmap! {
address(0) => TokenInfo {
decimals: Some(18),
symbol: Some("CAT".to_string()),
},
address(1) => TokenInfo {
decimals: None,
symbol: None,
},
address(2) => TokenInfo::default(),
}
);

// Fetch again, if the two tokens 0 and 1 are fetched again (i.e. the
// cache is not working) then this will panic because of the `times(1)`
// constraint on our mock fetcher. Note that token 2 gets fetched again
// because it failed to fetch the first time.
let cached_token_infos = cached_token_info_fetcher.get_token_infos(&addresses).await;
assert_eq!(token_infos, cached_token_infos);
}
}
2 changes: 1 addition & 1 deletion crates/solver/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub async fn run(args: Arguments) {
));

let block_retriever = args.shared.current_block.retriever(web3.clone());
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Box::new(TokenInfoFetcher {
let token_info_fetcher = Arc::new(CachedTokenInfoFetcher::new(Arc::new(TokenInfoFetcher {
web3: web3.clone(),
})));
let gas_price_estimator = Arc::new(
Expand Down