Skip to content

Commit

Permalink
feat(balances): adding balance providers weights and retrying
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Jan 3, 2025
1 parent 5c30faa commit 27c0819
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 92 deletions.
9 changes: 1 addition & 8 deletions src/analytics/balance_lookup_info.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use {
parquet_derive::ParquetRecordWriter,
serde::Serialize,
std::{sync::Arc, time::Duration},
};
use {parquet_derive::ParquetRecordWriter, serde::Serialize, std::sync::Arc};

#[derive(Debug, Clone, Serialize, ParquetRecordWriter)]
#[serde(rename_all = "camelCase")]
pub struct BalanceLookupInfo {
pub timestamp: chrono::NaiveDateTime,
pub latency_secs: f64,

pub symbol: String,
pub implementation_chain_id: String,
Expand All @@ -29,7 +24,6 @@ pub struct BalanceLookupInfo {
impl BalanceLookupInfo {
#[allow(clippy::too_many_arguments)]
pub fn new(
latency: Duration,
symbol: String,
implementation_chain_id: String,
quantity: String,
Expand All @@ -45,7 +39,6 @@ impl BalanceLookupInfo {
) -> Self {
Self {
timestamp: wc::analytics::time::now(),
latency_secs: latency.as_secs_f64(),
symbol,
implementation_chain_id,
quantity,
Expand Down
15 changes: 11 additions & 4 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use {
project::{storage::Config as StorageConfig, Config as RegistryConfig},
providers::{ProviderKind, ProvidersConfig, Weight},
storage::irn::Config as IrnConfig,
utils::rate_limit::RateLimitingConfig,
utils::{crypto::CaipNamespaces, rate_limit::RateLimitingConfig},
},
serde::de::DeserializeOwned,
std::{collections::HashMap, fmt::Display},
};
pub use {
arbitrum::*, aurora::*, base::*, berachain::*, binance::*, getblock::*, infura::*, lava::*,
mantle::*, morph::*, near::*, pokt::*, publicnode::*, quicknode::*, server::*, unichain::*,
zksync::*, zora::*,
mantle::*, morph::*, near::*, pokt::*, publicnode::*, quicknode::*, server::*, solscan::*,
unichain::*, zerion::*, zksync::*, zora::*,
};
mod arbitrum;
mod aurora;
Expand All @@ -33,7 +33,9 @@ mod pokt;
mod publicnode;
mod quicknode;
mod server;
pub mod solscan;
mod unichain;
pub mod zerion;
mod zksync;
mod zora;

Expand Down Expand Up @@ -87,6 +89,11 @@ pub trait ProviderConfig {
fn provider_kind(&self) -> ProviderKind;
}

pub trait BalanceProviderConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight>;
fn provider_kind(&self) -> ProviderKind;
}

#[cfg(test)]
#[cfg(not(feature = "test-mock-bundler"))] // These tests depend on environment variables
mod test {
Expand Down Expand Up @@ -277,7 +284,7 @@ mod test {
infura_project_id: "INFURA_PROJECT_ID".to_string(),
pokt_project_id: "POKT_PROJECT_ID".to_string(),
quicknode_api_tokens: "QUICKNODE_API_TOKENS".to_string(),
zerion_api_key: Some("ZERION_API_KEY".to_owned()),
zerion_api_key: "ZERION_API_KEY".to_owned(),
coinbase_api_key: Some("COINBASE_API_KEY".to_owned()),
coinbase_app_id: Some("COINBASE_APP_ID".to_owned()),
one_inch_api_key: Some("ONE_INCH_API_KEY".to_owned()),
Expand Down
39 changes: 39 additions & 0 deletions src/env/solscan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use {
super::BalanceProviderConfig,
crate::{
providers::{Priority, Weight},
utils::crypto::CaipNamespaces,
},
std::collections::HashMap,
};

pub struct SolScanConfig {
pub api_key: String,
pub supported_namespaces: HashMap<CaipNamespaces, Weight>,
}

impl SolScanConfig {
pub fn new(api_key: String) -> Self {
Self {
api_key,
supported_namespaces: default_supported_namespaces(),
}
}
}

impl BalanceProviderConfig for SolScanConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight> {
self.supported_namespaces
}

fn provider_kind(&self) -> crate::providers::ProviderKind {
crate::providers::ProviderKind::SolScan
}
}

fn default_supported_namespaces() -> HashMap<CaipNamespaces, Weight> {
HashMap::from([(
CaipNamespaces::Solana,
Weight::new(Priority::Normal).unwrap(),
)])
}
40 changes: 40 additions & 0 deletions src/env/zerion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use {
super::BalanceProviderConfig,
crate::{
providers::{Priority, Weight},
utils::crypto::CaipNamespaces,
},
std::collections::HashMap,
};

#[derive(Debug)]
pub struct ZerionConfig {
pub api_key: String,
pub supported_namespaces: HashMap<CaipNamespaces, Weight>,
}

impl ZerionConfig {
pub fn new(api_key: String) -> Self {
Self {
api_key,
supported_namespaces: default_supported_namespaces(),
}
}
}

impl BalanceProviderConfig for ZerionConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight> {
self.supported_namespaces
}

fn provider_kind(&self) -> crate::providers::ProviderKind {
crate::providers::ProviderKind::Zerion
}
}

fn default_supported_namespaces() -> HashMap<CaipNamespaces, Weight> {
HashMap::from([(
CaipNamespaces::Eip155,
Weight::new(Priority::Normal).unwrap(),
)])
}
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum RpcError {
#[error("Failed to reach the balance provider")]
BalanceProviderError,

#[error("Requested balance provider for the namespace is temporarily unavailable: {0}")]
BalanceTemporarilyUnavailable(String),

#[error("Failed to reach the fungible price provider: {0}")]
FungiblePriceProviderError(String),

Expand Down Expand Up @@ -282,6 +285,14 @@ impl IntoResponse for RpcError {
)),
)
.into_response(),
Self::BalanceTemporarilyUnavailable(namespace) => (
StatusCode::SERVICE_UNAVAILABLE,
Json(new_error_response(
"chainId".to_string(),
format!("Requested namespace {namespace} balance provider is temporarily unavailable"),
)),
)
.into_response(),
Self::InvalidChainIdFormat(chain_id) => (
StatusCode::BAD_REQUEST,
Json(new_error_response(
Expand Down
45 changes: 27 additions & 18 deletions src/handlers/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ use {
ethers::{abi::Address, types::H160},
hyper::HeaderMap,
serde::{Deserialize, Serialize},
std::{
net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime},
},
std::{net::SocketAddr, sync::Arc},
tap::TapFallible,
tracing::log::{debug, error},
wc::future::FutureExt,
};

const PROVIDER_MAX_CALLS: usize = 2;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BalanceQueryParams {
Expand Down Expand Up @@ -108,20 +106,32 @@ async fn handler_internal(
return Err(RpcError::InvalidAddress);
}

let provider = state
let providers = state
.providers
.balance_providers
.get(&namespace)
.ok_or_else(|| RpcError::UnsupportedNamespace(namespace))?;
.get_balance_provider_for_namespace(&namespace, PROVIDER_MAX_CALLS)?;

let start = SystemTime::now();
let mut response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
})?;
let latency = start.elapsed().unwrap_or(Duration::from_secs(0));
let mut balance_response = None;
for provider in providers.iter() {
let provider_response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
});

match provider_response {
Ok(response) => {
balance_response = Some(response);
break;
}
e => {
debug!("Balance provider returned an error {e:?}, trying the next provider");
}
};
}
let mut response = balance_response.ok_or(RpcError::BalanceTemporarilyUnavailable(
namespace.to_string(),
))?;

{
let origin = headers
Expand All @@ -137,7 +147,6 @@ async fn handler_internal(
.unwrap_or((None, None, None));
for balance in &response.balances {
state.analytics.balance_lookup(BalanceLookupInfo::new(
latency,
balance.symbol.clone(),
balance.chain_id.clone().unwrap_or_default(),
balance.quantity.numeric.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn rpc_call(
Some(provider_id) => {
let provider = vec![state
.providers
.get_provider_by_provider_id(&provider_id)
.get_rpc_provider_by_provider_id(&provider_id)
.ok_or_else(|| RpcError::UnsupportedProvider(provider_id.clone()))?];

if let Some(ref testing_project_id) = state.config.server.testing_project_id {
Expand All @@ -94,7 +94,7 @@ pub async fn rpc_call(
}
None => state
.providers
.get_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
.get_rpc_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
};

for (i, provider) in providers.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/supported_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ pub async fn handler(state: State<Arc<AppState>>) -> Result<Json<SupportedChains
async fn handler_internal(
State(state): State<Arc<AppState>>,
) -> Result<Json<SupportedChains>, RpcError> {
Ok(Json(state.providers.supported_chains.clone()))
Ok(Json(state.providers.rpc_supported_chains.clone()))
}
Loading

0 comments on commit 27c0819

Please sign in to comment.