Skip to content

Commit

Permalink
feat(balances): adding balance providers weights and retrying
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Jan 2, 2025
1 parent 5c30faa commit e52e9f1
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 87 deletions.
9 changes: 1 addition & 8 deletions src/analytics/balance_lookup_info.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use {
parquet_derive::ParquetRecordWriter,
serde::Serialize,
std::{sync::Arc, time::Duration},
};
use {parquet_derive::ParquetRecordWriter, serde::Serialize, std::sync::Arc};

#[derive(Debug, Clone, Serialize, ParquetRecordWriter)]
#[serde(rename_all = "camelCase")]
pub struct BalanceLookupInfo {
pub timestamp: chrono::NaiveDateTime,
pub latency_secs: f64,

pub symbol: String,
pub implementation_chain_id: String,
Expand All @@ -29,7 +24,6 @@ pub struct BalanceLookupInfo {
impl BalanceLookupInfo {
#[allow(clippy::too_many_arguments)]
pub fn new(
latency: Duration,
symbol: String,
implementation_chain_id: String,
quantity: String,
Expand All @@ -45,7 +39,6 @@ impl BalanceLookupInfo {
) -> Self {
Self {
timestamp: wc::analytics::time::now(),
latency_secs: latency.as_secs_f64(),
symbol,
implementation_chain_id,
quantity,
Expand Down
12 changes: 9 additions & 3 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use {
project::{storage::Config as StorageConfig, Config as RegistryConfig},
providers::{ProviderKind, ProvidersConfig, Weight},
storage::irn::Config as IrnConfig,
utils::rate_limit::RateLimitingConfig,
utils::{crypto::CaipNamespaces, rate_limit::RateLimitingConfig},
},
serde::de::DeserializeOwned,
std::{collections::HashMap, fmt::Display},
};
pub use {
arbitrum::*, aurora::*, base::*, berachain::*, binance::*, getblock::*, infura::*, lava::*,
mantle::*, morph::*, near::*, pokt::*, publicnode::*, quicknode::*, server::*, unichain::*,
zksync::*, zora::*,
zerion::*, zksync::*, zora::*,
};
mod arbitrum;
mod aurora;
Expand All @@ -34,6 +34,7 @@ mod publicnode;
mod quicknode;
mod server;
mod unichain;
mod zerion;
mod zksync;
mod zora;

Expand Down Expand Up @@ -87,6 +88,11 @@ pub trait ProviderConfig {
fn provider_kind(&self) -> ProviderKind;
}

pub trait BalanceProviderConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight>;
fn provider_kind(&self) -> ProviderKind;
}

#[cfg(test)]
#[cfg(not(feature = "test-mock-bundler"))] // These tests depend on environment variables
mod test {
Expand Down Expand Up @@ -277,7 +283,7 @@ mod test {
infura_project_id: "INFURA_PROJECT_ID".to_string(),
pokt_project_id: "POKT_PROJECT_ID".to_string(),
quicknode_api_tokens: "QUICKNODE_API_TOKENS".to_string(),
zerion_api_key: Some("ZERION_API_KEY".to_owned()),
zerion_api_key: "ZERION_API_KEY".to_owned(),
coinbase_api_key: Some("COINBASE_API_KEY".to_owned()),
coinbase_app_id: Some("COINBASE_APP_ID".to_owned()),
one_inch_api_key: Some("ONE_INCH_API_KEY".to_owned()),
Expand Down
40 changes: 40 additions & 0 deletions src/env/zerion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use {
super::BalanceProviderConfig,
crate::{
providers::{Priority, Weight},
utils::crypto::CaipNamespaces,
},
std::collections::HashMap,
};

#[derive(Debug)]
pub struct ZerionConfig {
pub api_key: String,
pub supported_namespaces: HashMap<CaipNamespaces, Weight>,
}

impl ZerionConfig {
pub fn new(api_key: String) -> Self {
Self {
api_key,
supported_namespaces: default_supported_namespaces(),
}
}
}

impl BalanceProviderConfig for ZerionConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight> {
self.supported_namespaces
}

fn provider_kind(&self) -> crate::providers::ProviderKind {
crate::providers::ProviderKind::Zerion
}
}

fn default_supported_namespaces() -> HashMap<CaipNamespaces, Weight> {
HashMap::from([(
CaipNamespaces::Eip155,
Weight::new(Priority::Normal).unwrap(),
)])
}
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum RpcError {
#[error("Failed to reach the balance provider")]
BalanceProviderError,

#[error("Requested balance provider for the namespace is temporarily unavailable: {0}")]
BalanceTemporarilyUnavailable(String),

#[error("Failed to reach the fungible price provider: {0}")]
FungiblePriceProviderError(String),

Expand Down Expand Up @@ -282,6 +285,14 @@ impl IntoResponse for RpcError {
)),
)
.into_response(),
Self::BalanceTemporarilyUnavailable(namespace) => (
StatusCode::SERVICE_UNAVAILABLE,
Json(new_error_response(
"chainId".to_string(),
format!("Requested namespace {namespace} balance provider is temporarily unavailable"),
)),
)
.into_response(),
Self::InvalidChainIdFormat(chain_id) => (
StatusCode::BAD_REQUEST,
Json(new_error_response(
Expand Down
45 changes: 27 additions & 18 deletions src/handlers/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ use {
ethers::{abi::Address, types::H160},
hyper::HeaderMap,
serde::{Deserialize, Serialize},
std::{
net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime},
},
std::{net::SocketAddr, sync::Arc},
tap::TapFallible,
tracing::log::{debug, error},
wc::future::FutureExt,
};

const PROVIDER_MAX_CALLS: usize = 2;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BalanceQueryParams {
Expand Down Expand Up @@ -108,20 +106,32 @@ async fn handler_internal(
return Err(RpcError::InvalidAddress);
}

let provider = state
let providers = state
.providers
.balance_providers
.get(&namespace)
.ok_or_else(|| RpcError::UnsupportedNamespace(namespace))?;
.get_balance_provider_for_namespace(&namespace, PROVIDER_MAX_CALLS)?;

let start = SystemTime::now();
let mut response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
})?;
let latency = start.elapsed().unwrap_or(Duration::from_secs(0));
let mut balance_response = None;
for (_i, provider) in providers.iter().enumerate() {
let provider_response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
});

match provider_response {
Ok(response) => {
balance_response = Some(response);
break;
}
e => {
debug!("Balance provider returned an error {e:?}, trying the next provider");
}
};
}
let mut response = balance_response.ok_or(RpcError::BalanceTemporarilyUnavailable(
namespace.to_string(),
))?;

{
let origin = headers
Expand All @@ -137,7 +147,6 @@ async fn handler_internal(
.unwrap_or((None, None, None));
for balance in &response.balances {
state.analytics.balance_lookup(BalanceLookupInfo::new(
latency,
balance.symbol.clone(),
balance.chain_id.clone().unwrap_or_default(),
balance.quantity.numeric.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn rpc_call(
Some(provider_id) => {
let provider = vec![state
.providers
.get_provider_by_provider_id(&provider_id)
.get_rpc_provider_by_provider_id(&provider_id)
.ok_or_else(|| RpcError::UnsupportedProvider(provider_id.clone()))?];

if let Some(ref testing_project_id) = state.config.server.testing_project_id {
Expand All @@ -94,7 +94,7 @@ pub async fn rpc_call(
}
None => state
.providers
.get_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
.get_rpc_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
};

for (i, provider) in providers.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/supported_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ pub async fn handler(state: State<Arc<AppState>>) -> Result<Json<SupportedChains
async fn handler_internal(
State(state): State<Arc<AppState>>,
) -> Result<Json<SupportedChains>, RpcError> {
Ok(Json(state.providers.supported_chains.clone()))
Ok(Json(state.providers.rpc_supported_chains.clone()))
}
46 changes: 26 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use {
env::{
ArbitrumConfig, AuroraConfig, BaseConfig, BerachainConfig, BinanceConfig, GetBlockConfig,
InfuraConfig, LavaConfig, MantleConfig, MorphConfig, NearConfig, PoktConfig,
PublicnodeConfig, QuicknodeConfig, UnichainConfig, ZKSyncConfig, ZoraConfig,
PublicnodeConfig, QuicknodeConfig, UnichainConfig, ZKSyncConfig, ZerionConfig, ZoraConfig,
},
error::RpcResult,
http::Request,
Expand All @@ -29,7 +29,8 @@ use {
ArbitrumProvider, AuroraProvider, BaseProvider, BerachainProvider, BinanceProvider,
GetBlockProvider, InfuraProvider, InfuraWsProvider, LavaProvider, MantleProvider,
MorphProvider, NearProvider, PoktProvider, ProviderRepository, PublicnodeProvider,
QuicknodeProvider, UnichainProvider, ZKSyncProvider, ZoraProvider, ZoraWsProvider,
QuicknodeProvider, UnichainProvider, ZKSyncProvider, ZerionProvider, ZoraProvider,
ZoraWsProvider,
},
sqlx::postgres::PgPoolOptions,
std::{
Expand Down Expand Up @@ -466,32 +467,33 @@ fn init_providers(config: &ProvidersConfig) -> ProviderRepository {

// Keep in-sync with SUPPORTED_CHAINS.md

providers.add_provider::<AuroraProvider, AuroraConfig>(AuroraConfig::default());
providers.add_provider::<ArbitrumProvider, ArbitrumConfig>(ArbitrumConfig::default());
providers
.add_provider::<PoktProvider, PoktConfig>(PoktConfig::new(config.pokt_project_id.clone()));
providers.add_rpc_provider::<AuroraProvider, AuroraConfig>(AuroraConfig::default());
providers.add_rpc_provider::<ArbitrumProvider, ArbitrumConfig>(ArbitrumConfig::default());
providers.add_rpc_provider::<PoktProvider, PoktConfig>(PoktConfig::new(
config.pokt_project_id.clone(),
));

providers.add_provider::<BaseProvider, BaseConfig>(BaseConfig::default());
providers.add_provider::<BinanceProvider, BinanceConfig>(BinanceConfig::default());
providers.add_provider::<ZKSyncProvider, ZKSyncConfig>(ZKSyncConfig::default());
providers.add_provider::<PublicnodeProvider, PublicnodeConfig>(PublicnodeConfig::default());
providers.add_provider::<QuicknodeProvider, QuicknodeConfig>(QuicknodeConfig::new(
providers.add_rpc_provider::<BaseProvider, BaseConfig>(BaseConfig::default());
providers.add_rpc_provider::<BinanceProvider, BinanceConfig>(BinanceConfig::default());
providers.add_rpc_provider::<ZKSyncProvider, ZKSyncConfig>(ZKSyncConfig::default());
providers.add_rpc_provider::<PublicnodeProvider, PublicnodeConfig>(PublicnodeConfig::default());
providers.add_rpc_provider::<QuicknodeProvider, QuicknodeConfig>(QuicknodeConfig::new(
config.quicknode_api_tokens.clone(),
));
providers.add_provider::<InfuraProvider, InfuraConfig>(InfuraConfig::new(
providers.add_rpc_provider::<InfuraProvider, InfuraConfig>(InfuraConfig::new(
config.infura_project_id.clone(),
));
providers.add_provider::<ZoraProvider, ZoraConfig>(ZoraConfig::default());
providers.add_provider::<NearProvider, NearConfig>(NearConfig::default());
providers.add_provider::<MantleProvider, MantleConfig>(MantleConfig::default());
providers.add_provider::<BerachainProvider, BerachainConfig>(BerachainConfig::default());
providers.add_provider::<UnichainProvider, UnichainConfig>(UnichainConfig::default());
providers.add_rpc_provider::<ZoraProvider, ZoraConfig>(ZoraConfig::default());
providers.add_rpc_provider::<NearProvider, NearConfig>(NearConfig::default());
providers.add_rpc_provider::<MantleProvider, MantleConfig>(MantleConfig::default());
providers.add_rpc_provider::<BerachainProvider, BerachainConfig>(BerachainConfig::default());
providers.add_rpc_provider::<UnichainProvider, UnichainConfig>(UnichainConfig::default());
providers
.add_provider::<LavaProvider, LavaConfig>(LavaConfig::new(config.lava_api_key.clone()));
providers.add_provider::<MorphProvider, MorphConfig>(MorphConfig::default());
.add_rpc_provider::<LavaProvider, LavaConfig>(LavaConfig::new(config.lava_api_key.clone()));
providers.add_rpc_provider::<MorphProvider, MorphConfig>(MorphConfig::default());

if let Some(getblock_access_tokens) = &config.getblock_access_tokens {
providers.add_provider::<GetBlockProvider, GetBlockConfig>(GetBlockConfig::new(
providers.add_rpc_provider::<GetBlockProvider, GetBlockConfig>(GetBlockConfig::new(
getblock_access_tokens.clone(),
));
};
Expand All @@ -501,6 +503,10 @@ fn init_providers(config: &ProvidersConfig) -> ProviderRepository {
));
providers.add_ws_provider::<ZoraWsProvider, ZoraConfig>(ZoraConfig::default());

providers.add_balance_provider::<ZerionProvider, ZerionConfig>(ZerionConfig::new(

Check failure on line 506 in src/lib.rs

View workflow job for this annotation

GitHub Actions / CI / Check App / Unit Tests

the trait bound `ZerionProvider: BalanceProviderFactory<env::zerion::ZerionConfig>` is not satisfied

Check failure on line 506 in src/lib.rs

View workflow job for this annotation

GitHub Actions / CI / Check App / Clippy

the trait bound `providers::zerion::ZerionProvider: providers::BalanceProviderFactory<env::zerion::ZerionConfig>` is not satisfied
config.zerion_api_key.clone(),
));

providers
}

Expand Down
Loading

0 comments on commit e52e9f1

Please sign in to comment.