Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(balances): adding balance providers weights and retrying #883

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/event_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ jobs:
RPC_PROXY_PROVIDER_TENDERLY_API_KEY: ""
RPC_PROXY_PROVIDER_TENDERLY_ACCOUNT_ID: ""
RPC_PROXY_PROVIDER_TENDERLY_PROJECT_ID: ""
RPC_PROXY_PROVIDER_ZERION_API_KEY: ""
- run: docker logs mock-bundler-anvil-1
if: failure()
- run: docker logs mock-bundler-alto-1
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ $ docker run -p 3000:3000 \
-e RPC_PROXY_POKT_PROJECT_ID=<some_id> \
-e RPC_PROXY_INFURA_PROJECT_ID=<some_id> \
-e RPC_PROXY_QUICKNODE_API_TOKENS=<JSON> \
-e RPC_PROXY_ZERION_API_KEY=<some_id> \
-e RPC_PROXY_REGISTRY_API_URL=<registry_url> \
-e RPC_PROXY_REGISTRY_API_AUTH_TOKEN=<token> \
--name rpc -it rpc-proxy
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ services:
- RPC_PROXY_INFURA_PROJECT_ID=${RPC_PROXY_INFURA_PROJECT_ID}
- RPC_PROXY_POKT_PROJECT_ID=${RPC_PROXY_POKT_PROJECT_ID}
- RPC_PROXY_QUICKNODE_API_TOKENS=${RPC_PROXY_QUICKNODE_API_TOKENS}
- RPC_PROXY_ZERION_API_KEY=${RPC_PROXY_ZERION_API_KEY}
- RPC_PROXY_REGISTRY_API_URL=https://registry-prod-cf.walletconnect.com
- RPC_PROXY_REGISTRY_API_AUTH_TOKEN=${RPC_PROXY_REGISTRY_API_AUTH_TOKEN}
- RPC_PROXY_STORAGE_PROJECT_DATA_REDIS_ADDR_READ=redis://redis:6379/0
Expand Down
9 changes: 1 addition & 8 deletions src/analytics/balance_lookup_info.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
use {
parquet_derive::ParquetRecordWriter,
serde::Serialize,
std::{sync::Arc, time::Duration},
};
use {parquet_derive::ParquetRecordWriter, serde::Serialize, std::sync::Arc};

#[derive(Debug, Clone, Serialize, ParquetRecordWriter)]
#[serde(rename_all = "camelCase")]
pub struct BalanceLookupInfo {
pub timestamp: chrono::NaiveDateTime,
pub latency_secs: f64,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have per-providers latency, so this should be removed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did data team already remove latency_secs from the schema? Otherwise this could cause null errors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, @dnul is aware of this.

pub symbol: String,
pub implementation_chain_id: String,
Expand All @@ -29,7 +24,6 @@ pub struct BalanceLookupInfo {
impl BalanceLookupInfo {
#[allow(clippy::too_many_arguments)]
pub fn new(
latency: Duration,
symbol: String,
implementation_chain_id: String,
quantity: String,
Expand All @@ -45,7 +39,6 @@ impl BalanceLookupInfo {
) -> Self {
Self {
timestamp: wc::analytics::time::now(),
latency_secs: latency.as_secs_f64(),
symbol,
implementation_chain_id,
quantity,
Expand Down
15 changes: 11 additions & 4 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use {
project::{storage::Config as StorageConfig, Config as RegistryConfig},
providers::{ProviderKind, ProvidersConfig, Weight},
storage::irn::Config as IrnConfig,
utils::rate_limit::RateLimitingConfig,
utils::{crypto::CaipNamespaces, rate_limit::RateLimitingConfig},
},
serde::de::DeserializeOwned,
std::{collections::HashMap, fmt::Display},
};
pub use {
arbitrum::*, aurora::*, base::*, berachain::*, binance::*, getblock::*, infura::*, lava::*,
mantle::*, morph::*, near::*, pokt::*, publicnode::*, quicknode::*, server::*, unichain::*,
zksync::*, zora::*,
mantle::*, morph::*, near::*, pokt::*, publicnode::*, quicknode::*, server::*, solscan::*,
unichain::*, zerion::*, zksync::*, zora::*,
};
mod arbitrum;
mod aurora;
Expand All @@ -33,7 +33,9 @@ mod pokt;
mod publicnode;
mod quicknode;
mod server;
pub mod solscan;
mod unichain;
pub mod zerion;
mod zksync;
mod zora;

Expand Down Expand Up @@ -87,6 +89,11 @@ pub trait ProviderConfig {
fn provider_kind(&self) -> ProviderKind;
}

pub trait BalanceProviderConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight>;
fn provider_kind(&self) -> ProviderKind;
}

#[cfg(test)]
#[cfg(not(feature = "test-mock-bundler"))] // These tests depend on environment variables
mod test {
Expand Down Expand Up @@ -277,7 +284,7 @@ mod test {
infura_project_id: "INFURA_PROJECT_ID".to_string(),
pokt_project_id: "POKT_PROJECT_ID".to_string(),
quicknode_api_tokens: "QUICKNODE_API_TOKENS".to_string(),
zerion_api_key: Some("ZERION_API_KEY".to_owned()),
zerion_api_key: "ZERION_API_KEY".to_owned(),
coinbase_api_key: Some("COINBASE_API_KEY".to_owned()),
coinbase_app_id: Some("COINBASE_APP_ID".to_owned()),
one_inch_api_key: Some("ONE_INCH_API_KEY".to_owned()),
Expand Down
39 changes: 39 additions & 0 deletions src/env/solscan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use {
super::BalanceProviderConfig,
crate::{
providers::{Priority, Weight},
utils::crypto::CaipNamespaces,
},
std::collections::HashMap,
};

pub struct SolScanConfig {
pub api_key: String,
pub supported_namespaces: HashMap<CaipNamespaces, Weight>,
}

impl SolScanConfig {
pub fn new(api_key: String) -> Self {
Self {
api_key,
supported_namespaces: default_supported_namespaces(),
}
}
}

impl BalanceProviderConfig for SolScanConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight> {
self.supported_namespaces
}

fn provider_kind(&self) -> crate::providers::ProviderKind {
crate::providers::ProviderKind::SolScan
}
}

fn default_supported_namespaces() -> HashMap<CaipNamespaces, Weight> {
HashMap::from([(
CaipNamespaces::Solana,
Weight::new(Priority::Normal).unwrap(),
)])
}
40 changes: 40 additions & 0 deletions src/env/zerion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use {
super::BalanceProviderConfig,
crate::{
providers::{Priority, Weight},
utils::crypto::CaipNamespaces,
},
std::collections::HashMap,
};

#[derive(Debug)]
pub struct ZerionConfig {
pub api_key: String,
pub supported_namespaces: HashMap<CaipNamespaces, Weight>,
}

impl ZerionConfig {
pub fn new(api_key: String) -> Self {
Self {
api_key,
supported_namespaces: default_supported_namespaces(),
}
}
}

impl BalanceProviderConfig for ZerionConfig {
fn supported_namespaces(self) -> HashMap<CaipNamespaces, Weight> {
self.supported_namespaces
}

fn provider_kind(&self) -> crate::providers::ProviderKind {
crate::providers::ProviderKind::Zerion
}
}

fn default_supported_namespaces() -> HashMap<CaipNamespaces, Weight> {
HashMap::from([(
CaipNamespaces::Eip155,
Weight::new(Priority::Normal).unwrap(),
)])
}
11 changes: 11 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ pub enum RpcError {
#[error("Failed to reach the balance provider")]
BalanceProviderError,

#[error("Requested balance provider for the namespace is temporarily unavailable: {0}")]
BalanceTemporarilyUnavailable(String),

#[error("Failed to reach the fungible price provider: {0}")]
FungiblePriceProviderError(String),

Expand Down Expand Up @@ -282,6 +285,14 @@ impl IntoResponse for RpcError {
)),
)
.into_response(),
Self::BalanceTemporarilyUnavailable(namespace) => (
StatusCode::SERVICE_UNAVAILABLE,
Json(new_error_response(
"chainId".to_string(),
format!("Requested namespace {namespace} balance provider is temporarily unavailable"),
)),
)
.into_response(),
Self::InvalidChainIdFormat(chain_id) => (
StatusCode::BAD_REQUEST,
Json(new_error_response(
Expand Down
45 changes: 27 additions & 18 deletions src/handlers/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ use {
ethers::{abi::Address, types::H160},
hyper::HeaderMap,
serde::{Deserialize, Serialize},
std::{
net::SocketAddr,
sync::Arc,
time::{Duration, SystemTime},
},
std::{net::SocketAddr, sync::Arc},
tap::TapFallible,
tracing::log::{debug, error},
wc::future::FutureExt,
};

const PROVIDER_MAX_CALLS: usize = 2;

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BalanceQueryParams {
Expand Down Expand Up @@ -108,20 +106,32 @@ async fn handler_internal(
return Err(RpcError::InvalidAddress);
}

let provider = state
let providers = state
.providers
.balance_providers
.get(&namespace)
.ok_or_else(|| RpcError::UnsupportedNamespace(namespace))?;
.get_balance_provider_for_namespace(&namespace, PROVIDER_MAX_CALLS)?;

let start = SystemTime::now();
let mut response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
})?;
let latency = start.elapsed().unwrap_or(Duration::from_secs(0));
let mut balance_response = None;
for provider in providers.iter() {
let provider_response = provider
.get_balance(address.clone(), query.clone().0, state.metrics.clone())
.await
.tap_err(|e| {
error!("Failed to call balance with {}", e);
});

match provider_response {
Ok(response) => {
balance_response = Some(response);
break;
}
e => {
debug!("Balance provider returned an error {e:?}, trying the next provider");
}
};
}
let mut response = balance_response.ok_or(RpcError::BalanceTemporarilyUnavailable(
namespace.to_string(),
))?;

{
let origin = headers
Expand All @@ -137,7 +147,6 @@ async fn handler_internal(
.unwrap_or((None, None, None));
for balance in &response.balances {
state.analytics.balance_lookup(BalanceLookupInfo::new(
latency,
balance.symbol.clone(),
balance.chain_id.clone().unwrap_or_default(),
balance.quantity.numeric.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub async fn rpc_call(
Some(provider_id) => {
let provider = vec![state
.providers
.get_provider_by_provider_id(&provider_id)
.get_rpc_provider_by_provider_id(&provider_id)
.ok_or_else(|| RpcError::UnsupportedProvider(provider_id.clone()))?];

if let Some(ref testing_project_id) = state.config.server.testing_project_id {
Expand All @@ -94,7 +94,7 @@ pub async fn rpc_call(
}
None => state
.providers
.get_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
.get_rpc_provider_for_chain_id(&chain_id, PROVIDER_PROXY_MAX_CALLS)?,
};

for (i, provider) in providers.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/supported_chains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ pub async fn handler(state: State<Arc<AppState>>) -> Result<Json<SupportedChains
async fn handler_internal(
State(state): State<Arc<AppState>>,
) -> Result<Json<SupportedChains>, RpcError> {
Ok(Json(state.providers.supported_chains.clone()))
Ok(Json(state.providers.rpc_supported_chains.clone()))
}
Loading
Loading