-
Notifications
You must be signed in to change notification settings - Fork 91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adjustments to Token Info Fetcher #1931
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,173 +1,229 @@ | ||
use { | ||
crate::ethrpc::Web3, | ||
crate::{ethcontract_error::EthcontractErrorType, ethrpc::Web3}, | ||
anyhow::Result, | ||
async_trait::async_trait, | ||
contracts::ERC20, | ||
ethcontract::{batch::CallBatch, H160}, | ||
std::{collections::HashMap, sync::Arc}, | ||
tokio::sync::Mutex, | ||
ethcontract::{errors::MethodError, H160}, | ||
futures::{ | ||
future::{BoxFuture, Shared}, | ||
FutureExt, | ||
}, | ||
model::order::BUY_ETH_ADDRESS, | ||
std::{ | ||
collections::HashMap, | ||
sync::{Arc, Mutex}, | ||
}, | ||
thiserror::Error, | ||
}; | ||
|
||
const MAX_BATCH_SIZE: usize = 100; | ||
|
||
#[cfg_attr(test, derive(Eq, PartialEq))] | ||
#[derive(Clone, Debug, Default)] | ||
pub struct TokenInfo { | ||
pub decimals: Option<u8>, | ||
pub symbol: Option<String>, | ||
} | ||
|
||
pub struct TokenInfoFetcher { | ||
pub web3: Web3, | ||
} | ||
#[derive(Clone, Debug, Error)] | ||
#[error("error fetching token info: {0}")] | ||
pub struct Error(String); | ||
|
||
#[mockall::automock] | ||
#[async_trait] | ||
pub trait TokenInfoFetching: Send + Sync { | ||
/// Retrieves information for a token. | ||
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error>; | ||
|
||
/// Retrieves all token information. | ||
/// Default implementation calls get_token_info for each token and ignores | ||
/// errors. | ||
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo>; | ||
} | ||
|
||
pub struct TokenInfoFetcher { | ||
pub web3: Web3, | ||
} | ||
|
||
impl TokenInfoFetcher { | ||
async fn fetch_token(&self, address: H160) -> Result<TokenInfo, Error> { | ||
if address == BUY_ETH_ADDRESS { | ||
return Ok(TokenInfo { | ||
decimals: Some(18), | ||
symbol: Some("NATIVE_ASSET".to_string()), | ||
}); | ||
} | ||
|
||
let erc20 = ERC20::at(&self.web3, address); | ||
let (decimals, symbol) = futures::join!( | ||
erc20.methods().decimals().call(), | ||
erc20.methods().symbol().call(), | ||
); | ||
|
||
Ok(TokenInfo { | ||
decimals: classify_error(decimals)?, | ||
symbol: classify_error(symbol)?, | ||
}) | ||
} | ||
} | ||
|
||
fn classify_error<T>(result: Result<T, MethodError>) -> Result<Option<T>, Error> { | ||
match result { | ||
Ok(value) => Ok(Some(value)), | ||
Err(err) => match EthcontractErrorType::classify(&err) { | ||
EthcontractErrorType::Node => Err(Error(err.to_string())), | ||
EthcontractErrorType::Contract => Ok(None), | ||
}, | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl TokenInfoFetching for TokenInfoFetcher { | ||
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> { | ||
let mut batch = CallBatch::new(self.web3.transport()); | ||
let futures = addresses | ||
.iter() | ||
.map(|address| { | ||
let erc20 = ERC20::at(&self.web3, *address); | ||
( | ||
erc20.methods().decimals().batch_call(&mut batch), | ||
erc20.methods().symbol().batch_call(&mut batch), | ||
) | ||
}) | ||
.collect::<Vec<_>>(); | ||
|
||
batch.execute_all(MAX_BATCH_SIZE).await; | ||
let mut resolved_futures = Vec::with_capacity(futures.len()); | ||
for (decimals, symbol) in futures { | ||
resolved_futures.push((decimals.await, symbol.await)); | ||
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error> { | ||
let info = self.fetch_token(address).await; | ||
if let Err(err) = &info { | ||
tracing::debug!(?err, token = ?address, "failed to fetch token info"); | ||
} | ||
addresses | ||
.iter() | ||
.zip(resolved_futures) | ||
.map(|(address, (decimals, symbol))| { | ||
if decimals.is_err() { | ||
tracing::trace!("Failed to fetch token info for token {}", address); | ||
} | ||
( | ||
*address, | ||
TokenInfo { | ||
decimals: decimals.ok(), | ||
symbol: symbol.ok(), | ||
}, | ||
) | ||
}) | ||
.collect() | ||
|
||
info | ||
} | ||
|
||
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> { | ||
futures::future::join_all(addresses.iter().copied().map(|address| async move { | ||
let info = self.fetch_token(address).await; | ||
if let Err(err) = &info { | ||
tracing::debug!(?err, token = ?address, "failed to fetch token info"); | ||
} | ||
|
||
(address, info.unwrap_or_default()) | ||
})) | ||
.await | ||
.into_iter() | ||
.collect() | ||
} | ||
} | ||
|
||
type SharedTokenInfo = Shared<BoxFuture<'static, Result<TokenInfo, Error>>>; | ||
|
||
pub struct CachedTokenInfoFetcher { | ||
inner: Box<dyn TokenInfoFetching>, | ||
cache: Arc<Mutex<HashMap<H160, TokenInfo>>>, | ||
inner: Arc<dyn TokenInfoFetching>, | ||
cache: Arc<Mutex<HashMap<H160, SharedTokenInfo>>>, | ||
} | ||
|
||
impl CachedTokenInfoFetcher { | ||
pub fn new(inner: Box<dyn TokenInfoFetching>) -> Self { | ||
pub fn new(inner: Arc<dyn TokenInfoFetching>) -> Self { | ||
Self { | ||
inner, | ||
cache: Arc::new(Mutex::new(HashMap::new())), | ||
} | ||
} | ||
} | ||
|
||
impl CachedTokenInfoFetcher { | ||
async fn fetch_token(&self, address: H160) -> Result<TokenInfo, Error> { | ||
let fetch = { | ||
let mut cache = self.cache.lock().unwrap(); | ||
cache | ||
.entry(address) | ||
.or_insert({ | ||
let inner = self.inner.clone(); | ||
async move { inner.get_token_info(address).await } | ||
.boxed() | ||
.shared() | ||
}) | ||
.clone() | ||
}; | ||
|
||
let info = fetch.await; | ||
if info.is_err() { | ||
let mut cache = self.cache.lock().unwrap(); | ||
if let Some(Err(_)) = cache.get(&address).and_then(|fetch| fetch.peek()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this condition checking? Can't we just uncoditionally remove the future in case of error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is checking that the shared future stored in the map is resolved to an error (with Imagine 3 concurrent requests
This isn't extremely contrived, as there is an Another approach would be to |
||
cache.remove(&address); | ||
} | ||
} | ||
|
||
info | ||
} | ||
} | ||
|
||
#[async_trait] | ||
impl TokenInfoFetching for CachedTokenInfoFetcher { | ||
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> { | ||
let mut cache = self.cache.lock().await; | ||
|
||
// Compute set of requested addresses that are not in cache. | ||
let to_fetch: Vec<H160> = addresses | ||
.iter() | ||
.filter(|address| !cache.contains_key(address)) | ||
.cloned() | ||
.collect(); | ||
|
||
// Fetch token infos not yet in cache. | ||
if !to_fetch.is_empty() { | ||
let fetched = self.inner.get_token_infos(to_fetch.as_slice()).await; | ||
|
||
// Add valid token infos to cache. | ||
cache.extend( | ||
fetched | ||
.into_iter() | ||
.filter(|(_, token_info)| token_info.decimals.is_some()), | ||
); | ||
}; | ||
async fn get_token_info(&self, address: H160) -> Result<TokenInfo, Error> { | ||
self.fetch_token(address).await | ||
} | ||
|
||
// Return token infos from the cache. | ||
addresses | ||
.iter() | ||
.map(|address| { | ||
if cache.contains_key(address) { | ||
(*address, cache[address].clone()) | ||
} else { | ||
( | ||
*address, | ||
TokenInfo { | ||
decimals: None, | ||
symbol: None, | ||
}, | ||
) | ||
} | ||
}) | ||
.collect() | ||
async fn get_token_infos(&self, addresses: &[H160]) -> HashMap<H160, TokenInfo> { | ||
futures::future::join_all(addresses.iter().copied().map(|address| async move { | ||
( | ||
address, | ||
self.get_token_info(address).await.unwrap_or_default(), | ||
) | ||
})) | ||
.await | ||
.into_iter() | ||
.collect() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use {super::*, maplit::hashmap}; | ||
use {super::*, maplit::hashmap, mockall::predicate::*}; | ||
|
||
#[tokio::test] | ||
async fn cached_token_info_fetcher() { | ||
let address0 = H160::zero(); | ||
let address1 = H160::from_low_u64_be(1); | ||
let address = H160::from_low_u64_be; | ||
|
||
let mut mock_token_info_fetcher = MockTokenInfoFetching::new(); | ||
mock_token_info_fetcher | ||
.expect_get_token_infos() | ||
.expect_get_token_info() | ||
.with(eq(address(0))) | ||
.times(1) | ||
.return_once(move |_| { | ||
hashmap! { | ||
address0 => TokenInfo { decimals: Some(18), symbol: Some("CAT".to_string()) }, | ||
} | ||
Ok(TokenInfo { | ||
decimals: Some(18), | ||
symbol: Some("CAT".to_string()), | ||
}) | ||
}); | ||
mock_token_info_fetcher | ||
.expect_get_token_infos() | ||
.times(2) | ||
.returning(|_| { | ||
hashmap! { | ||
H160::from_low_u64_be(1) => TokenInfo { decimals: None, symbol: None }, | ||
} | ||
.expect_get_token_info() | ||
.with(eq(address(1))) | ||
.times(1) | ||
.return_once(move |_| { | ||
Ok(TokenInfo { | ||
decimals: None, | ||
symbol: None, | ||
}) | ||
}); | ||
let cached_token_info_fetcher = | ||
CachedTokenInfoFetcher::new(Box::new(mock_token_info_fetcher)); | ||
|
||
// Fetching a cached item should work. | ||
let token_infos = cached_token_info_fetcher.get_token_infos(&[address0]).await; | ||
assert!(token_infos.contains_key(&address0) && token_infos[&address0].decimals == Some(18)); | ||
|
||
// Should panic because of the times(1) constraint above, unless the cache is | ||
// working as expected. | ||
cached_token_info_fetcher.get_token_infos(&[address0]).await; | ||
|
||
// Fetching an item that is unavailable should work. | ||
let token_infos = cached_token_info_fetcher.get_token_infos(&[address1]).await; | ||
assert!(token_infos.contains_key(&address1) && token_infos[&address1].decimals.is_none()); | ||
mock_token_info_fetcher | ||
.expect_get_token_info() | ||
.with(eq(address(2))) | ||
.times(2) | ||
.returning(|_| Err(Error("some error".to_string()))); | ||
|
||
// Should try to refetch the item thus satisfying the times(2) constraint above. | ||
cached_token_info_fetcher.get_token_infos(&[address1]).await; | ||
let cached_token_info_fetcher = | ||
CachedTokenInfoFetcher::new(Arc::new(mock_token_info_fetcher)); | ||
|
||
// Fetches tokens, using `TokenInfo::default()` for the failed token. | ||
let addresses = [address(0), address(1), address(2)]; | ||
let token_infos = cached_token_info_fetcher.get_token_infos(&addresses).await; | ||
assert_eq!( | ||
token_infos, | ||
hashmap! { | ||
address(0) => TokenInfo { | ||
decimals: Some(18), | ||
symbol: Some("CAT".to_string()), | ||
}, | ||
address(1) => TokenInfo { | ||
decimals: None, | ||
symbol: None, | ||
}, | ||
address(2) => TokenInfo::default(), | ||
} | ||
); | ||
|
||
// Fetch again, if the the two token 0 and 1 are fetched again (i.e. the | ||
// cache is not working) then this will panic because of the `times(1)` | ||
// constraint on our mock fetcher. Note that token 2 gets fetched again | ||
// because it failed to fetch the first time. | ||
let cached_token_infos = cached_token_info_fetcher.get_token_infos(&addresses).await; | ||
assert_eq!(token_infos, cached_token_infos); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might have to keep an eye on memory consumption here.
Storing a whole future instead of a
String
and au8
for A LOT of tokens might be a problem. Maybe it makes sense to have sth like:No need to change in this PR, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its a boxed future though, so it should just be a ptr + vtable, which is as big as a
TokenInfo
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wow, to my surprise,
SharedTokenInfo
is smaller than aTokenInfo
(without counting the data that exists on the heap): https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=4d4fb54c802e8db652c6608eb9c83d35.This is likely because of a combination of boxing and null-pointer optimization.
With that in mind, I imagine that this type is fairly space-optimized.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. The handle to the future is very small but you'll still keep the actual allocation for the future around which I was mainly worried about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't though right?
Internally,
Shared
has anUnsafeCell
to:Here,
Fut = BoxFuture
, so its 16 bytes on x86_64 (ptr + vtable). AFAIU, once the unsafe cell gets set toFutureOrOutput::Output
, then the future will get dropped right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh, true. I assumed the
SharedFuture
would basically keep the future alive the whole time and simply have a special case for returning a resolved value instead of swapping the future allocation for the resolved result. 👌There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example here:
Output:
The future gets dropped the first time it resolves, and only dropped once.
The only issue would be if there are "dangling" futures in the
HashMap
. This can be fixed by removing danglingShared
futures that aren't resolved. I also don't suspect this will happen in practice either (especially since follow up token info requests will drive theShared
future forward).