From c264c311048a4ad73362a4769cc095771f3e6113 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Tue, 4 Jun 2024 09:44:06 +0200 Subject: [PATCH] feat(fortuna): Traced client for better observability (#1651) * feat(fortuna): Traced client for better observability --- apps/fortuna/Cargo.lock | 2 +- apps/fortuna/Cargo.toml | 2 +- apps/fortuna/src/chain.rs | 1 + apps/fortuna/src/chain/ethereum.rs | 109 +++++++++++++------ apps/fortuna/src/chain/traced_client.rs | 137 ++++++++++++++++++++++++ apps/fortuna/src/command/run.rs | 70 ++++++++---- apps/fortuna/src/config.rs | 1 + apps/fortuna/src/keeper.rs | 94 ++++++++-------- 8 files changed, 314 insertions(+), 102 deletions(-) create mode 100644 apps/fortuna/src/chain/traced_client.rs diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index b227ce607..5e1498fb0 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -1502,7 +1502,7 @@ dependencies = [ [[package]] name = "fortuna" -version = "6.2.2" +version = "6.2.3" dependencies = [ "anyhow", "axum", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index d39741dc1..598286f0b 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "6.2.2" +version = "6.2.3" edition = "2021" [dependencies] diff --git a/apps/fortuna/src/chain.rs b/apps/fortuna/src/chain.rs index 6570dc305..b530c1539 100644 --- a/apps/fortuna/src/chain.rs +++ b/apps/fortuna/src/chain.rs @@ -1,3 +1,4 @@ pub(crate) mod eth_gas_oracle; pub(crate) mod ethereum; pub(crate) mod reader; +pub(crate) mod traced_client; diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index 2e33e5a1c..4a96c4a22 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -1,5 +1,6 @@ use { crate::{ + api::ChainId, chain::{ eth_gas_oracle::EthProviderOracle, reader::{ @@ -9,6 +10,10 @@ use { EntropyReader, RequestedWithCallbackEvent, }, + traced_client::{ + RpcMetrics, + TracedClient, + }, }, config::EthereumConfig, }, @@ -22,7 +27,6 @@ use { abi::RawLog, contract::{ abigen, - ContractError, EthLogDecode, }, core::types::Address, @@ -34,6 +38,7 @@ use { }, prelude::{ BlockId, + JsonRpcClient, PendingTransaction, TransactionRequest, }, @@ -67,15 +72,19 @@ abigen!( "../../target_chains/ethereum/entropy_sdk/solidity/abis/IEntropy.json" ); -pub type SignablePythContract = PythRandom< +pub type SignablePythContractInner = PythRandom< LegacyTxMiddleware< GasOracleMiddleware< - NonceManagerMiddleware, LocalWallet>>, - EthProviderOracle>, + NonceManagerMiddleware, LocalWallet>>, + EthProviderOracle>, >, >, >; +pub type SignablePythContract = SignablePythContractInner; +pub type InstrumentedSignablePythContract = SignablePythContractInner; + pub type PythContract = PythRandom>; +pub type InstrumentedPythContract = PythRandom>; /// Middleware that converts a transaction into a legacy transaction if use_legacy_tx is true. /// We can not use TransformerMiddleware because keeper calls fill_transaction first which bypasses @@ -157,32 +166,7 @@ impl Middleware for LegacyTxMiddleware { } } -impl SignablePythContract { - pub async fn from_config( - chain_config: &EthereumConfig, - private_key: &str, - ) -> Result { - let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; - let chain_id = provider.get_chainid().await?; - let gas_oracle = EthProviderOracle::new(provider.clone()); - let wallet__ = private_key - .parse::()? - .with_chain_id(chain_id.as_u64()); - - let address = wallet__.address(); - - Ok(PythRandom::new( - chain_config.contract_addr, - Arc::new(LegacyTxMiddleware::new( - chain_config.legacy_tx, - GasOracleMiddleware::new( - NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address), - gas_oracle, - ), - )), - )) - } - +impl SignablePythContractInner { /// Submit a request for a random number to the contract. /// /// This method is a version of the autogenned `request` method that parses the emitted logs @@ -249,10 +233,54 @@ impl SignablePythContract { Err(anyhow!("Request failed").into()) } } + + pub async fn from_config_and_provider( + chain_config: &EthereumConfig, + private_key: &str, + provider: Provider, + ) -> Result> { + let chain_id = provider.get_chainid().await?; + let gas_oracle = EthProviderOracle::new(provider.clone()); + let wallet__ = private_key + .parse::()? + .with_chain_id(chain_id.as_u64()); + + let address = wallet__.address(); + + Ok(PythRandom::new( + chain_config.contract_addr, + Arc::new(LegacyTxMiddleware::new( + chain_config.legacy_tx, + GasOracleMiddleware::new( + NonceManagerMiddleware::new(SignerMiddleware::new(provider, wallet__), address), + gas_oracle, + ), + )), + )) + } +} + +impl SignablePythContract { + pub async fn from_config(chain_config: &EthereumConfig, private_key: &str) -> Result { + let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; + Self::from_config_and_provider(chain_config, private_key, provider).await + } +} + +impl InstrumentedSignablePythContract { + pub async fn from_config( + chain_config: &EthereumConfig, + private_key: &str, + chain_id: ChainId, + metrics: Arc, + ) -> Result { + let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?; + Self::from_config_and_provider(chain_config, private_key, provider).await + } } impl PythContract { - pub fn from_config(chain_config: &EthereumConfig) -> Result { + pub fn from_config(chain_config: &EthereumConfig) -> Result { let provider = Provider::::try_from(&chain_config.geth_rpc_addr)?; Ok(PythRandom::new( @@ -262,8 +290,23 @@ impl PythContract { } } +impl InstrumentedPythContract { + pub fn from_config( + chain_config: &EthereumConfig, + chain_id: ChainId, + metrics: Arc, + ) -> Result { + let provider = TracedClient::new(chain_id, &chain_config.geth_rpc_addr, metrics)?; + + Ok(PythRandom::new( + chain_config.contract_addr, + Arc::new(provider), + )) + } +} + #[async_trait] -impl EntropyReader for PythContract { +impl EntropyReader for PythRandom> { async fn get_request( &self, provider_address: Address, @@ -330,7 +373,7 @@ impl EntropyReader for PythContract { user_random_number: [u8; 32], provider_revelation: [u8; 32], ) -> Result { - let result: Result>> = self + let result = self .reveal_with_callback( provider, sequence_number, diff --git a/apps/fortuna/src/chain/traced_client.rs b/apps/fortuna/src/chain/traced_client.rs new file mode 100644 index 000000000..83f50810b --- /dev/null +++ b/apps/fortuna/src/chain/traced_client.rs @@ -0,0 +1,137 @@ +use { + crate::api::ChainId, + anyhow::Result, + axum::async_trait, + ethers::{ + prelude::Http, + providers::{ + HttpClientError, + JsonRpcClient, + Provider, + }, + }, + prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{ + counter::Counter, + family::Family, + histogram::Histogram, + }, + registry::Registry, + }, + std::{ + str::FromStr, + sync::Arc, + }, + tokio::{ + sync::RwLock, + time::Instant, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, EncodeLabelSet)] +pub struct RpcLabel { + chain_id: ChainId, + method: String, +} + +#[derive(Debug)] +pub struct RpcMetrics { + count: Family, + latency: Family, + errors_count: Family, +} + +impl RpcMetrics { + pub async fn new(metrics_registry: Arc>) -> Self { + let count = Family::default(); + let mut guard = metrics_registry.write().await; + let sub_registry = guard.sub_registry_with_prefix("rpc_requests"); + sub_registry.register( + "count", + "The number of RPC requests made to the chain with the specified method.", + count.clone(), + ); + + let latency = Family::::new_with_constructor(|| { + Histogram::new( + [ + 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ] + .into_iter(), + ) + }); + sub_registry.register( + "latency", + "The latency of RPC requests to the chain with the specified method.", + latency.clone(), + ); + + let errors_count = Family::default(); + sub_registry.register( + "errors_count", + "The number of RPC requests made to the chain that failed.", + errors_count.clone(), + ); + + Self { + count, + latency, + errors_count, + } + } +} + +#[derive(Debug, Clone)] +pub struct TracedClient { + inner: Http, + + chain_id: ChainId, + metrics: Arc, +} + +#[async_trait] +impl JsonRpcClient for TracedClient { + type Error = HttpClientError; + + async fn request< + T: serde::Serialize + Send + Sync + std::fmt::Debug, + R: serde::de::DeserializeOwned + Send, + >( + &self, + method: &str, + params: T, + ) -> Result { + let start = Instant::now(); + let label = &RpcLabel { + chain_id: self.chain_id.clone(), + method: method.to_string(), + }; + self.metrics.count.get_or_create(label).inc(); + let res = match self.inner.request(method, params).await { + Ok(result) => Ok(result), + Err(e) => { + self.metrics.errors_count.get_or_create(label).inc(); + Err(e) + } + }; + + let latency = start.elapsed().as_secs_f64(); + self.metrics.latency.get_or_create(label).observe(latency); + res + } +} + +impl TracedClient { + pub fn new( + chain_id: ChainId, + url: &str, + metrics: Arc, + ) -> Result> { + Ok(Provider::new(TracedClient { + inner: Http::from_str(url)?, + chain_id, + metrics, + })) + } +} diff --git a/apps/fortuna/src/command/run.rs b/apps/fortuna/src/command/run.rs index 8955b00c9..c5ee892b4 100644 --- a/apps/fortuna/src/command/run.rs +++ b/apps/fortuna/src/command/run.rs @@ -5,7 +5,13 @@ use { BlockchainState, ChainId, }, - chain::ethereum::PythContract, + chain::{ + ethereum::InstrumentedPythContract, + traced_client::{ + RpcMetrics, + TracedClient, + }, + }, command::register_provider::CommitmentMetadata, config::{ Commitment, @@ -13,7 +19,10 @@ use { EthereumConfig, RunOptions, }, - keeper, + keeper::{ + self, + KeeperMetrics, + }, state::{ HashChainState, PebbleHashChain, @@ -27,10 +36,6 @@ use { axum::Router, ethers::{ middleware::Middleware, - providers::{ - Http, - Provider, - }, types::{ Address, BlockNumber, @@ -131,8 +136,10 @@ pub async fn run_keeper( config: Config, private_key: String, metrics_registry: Arc>, + rpc_metrics: Arc, ) -> Result<()> { let mut handles = Vec::new(); + let keeper_metrics = Arc::new(KeeperMetrics::new(metrics_registry).await); for (chain_id, chain_config) in chains { let chain_eth_config = config .chains @@ -144,7 +151,8 @@ pub async fn run_keeper( private_key, chain_eth_config, chain_config.clone(), - metrics_registry.clone(), + keeper_metrics.clone(), + rpc_metrics.clone(), ))); } @@ -157,11 +165,13 @@ pub async fn run(opts: &RunOptions) -> Result<()> { "Please specify a provider secret in the config file." ))?; let (tx_exit, rx_exit) = watch::channel(false); + let metrics_registry = Arc::new(RwLock::new(Registry::default())); + let rpc_metrics = Arc::new(RpcMetrics::new(metrics_registry.clone()).await); let mut tasks = Vec::new(); for (chain_id, chain_config) in config.chains.clone() { let secret_copy = secret.clone(); - + let rpc_metrics = rpc_metrics.clone(); tasks.push(spawn(async move { let state = setup_chain_state( &config.provider.address, @@ -169,6 +179,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> { config.provider.chain_sample_interval, &chain_id, &chain_config, + rpc_metrics, ) .await; @@ -207,21 +218,24 @@ pub async fn run(opts: &RunOptions) -> Result<()> { Ok::<(), Error>(()) }); - let metrics_registry = Arc::new(RwLock::new(Registry::default())); - if let Some(keeper_private_key) = config.keeper.private_key.load()? { spawn(run_keeper( chains.clone(), config.clone(), keeper_private_key, metrics_registry.clone(), + rpc_metrics.clone(), )); } else { tracing::info!("Not starting keeper service: no keeper private key specified. Please add one to the config if you would like to run the keeper service.") } // Spawn a thread to track latest block lag. This helps us know if the rpc is up and updated with the latest block. - spawn(track_block_timestamp_lag(config, metrics_registry.clone())); + spawn(track_block_timestamp_lag( + config, + metrics_registry.clone(), + rpc_metrics.clone(), + )); run_api(opts.addr.clone(), chains, metrics_registry, rx_exit).await?; @@ -234,8 +248,13 @@ async fn setup_chain_state( chain_sample_interval: u64, chain_id: &ChainId, chain_config: &EthereumConfig, + rpc_metrics: Arc, ) -> Result { - let contract = Arc::new(PythContract::from_config(&chain_config)?); + let contract = Arc::new(InstrumentedPythContract::from_config( + &chain_config, + chain_id.clone(), + rpc_metrics, + )?); let mut provider_commitments = chain_config.commitments.clone().unwrap_or(Vec::new()); provider_commitments.sort_by(|c1, c2| { c1.original_commitment_sequence_number @@ -289,7 +308,8 @@ async fn setup_chain_state( &commitment.seed, commitment.chain_length, chain_sample_interval, - )?; + ) + .map_err(|e| anyhow!("Failed to create hash chain: {}", e))?; hash_chains.push(pebble_hash_chain); } @@ -329,14 +349,17 @@ pub async fn check_block_timestamp_lag( chain_id: String, chain_config: EthereumConfig, metrics: Family, + rpc_metrics: Arc, ) { - let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { - Ok(r) => r, - Err(e) => { - tracing::error!("Failed to create provider for chain id - {:?}", e); - return; - } - }; + let provider = + match TracedClient::new(chain_id.clone(), &chain_config.geth_rpc_addr, rpc_metrics) { + Ok(r) => r, + Err(e) => { + tracing::error!("Failed to create provider for chain id - {:?}", e); + return; + } + }; + const INF_LAG: i64 = 1000000; // value that definitely triggers an alert let lag = match provider.get_block(BlockNumber::Latest).await { @@ -368,7 +391,11 @@ pub async fn check_block_timestamp_lag( } /// Tracks the difference between the server timestamp and the latest block timestamp for each chain -pub async fn track_block_timestamp_lag(config: Config, metrics_registry: Arc>) { +pub async fn track_block_timestamp_lag( + config: Config, + metrics_registry: Arc>, + rpc_metrics: Arc, +) { let metrics = Family::::default(); metrics_registry.write().await.register( "block_timestamp_lag", @@ -381,6 +408,7 @@ pub async fn track_block_timestamp_lag(config: Config, metrics_registry: Arc>, + metrics: Arc, + rpc_metrics: Arc, ) { - // Register metrics - let keeper_metrics = Arc::new(KeeperMetrics::new(metrics.clone()).await); - tracing::info!("starting keeper"); let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await; tracing::info!("latest safe block: {}", &latest_safe_block); let contract = Arc::new( - SignablePythContract::from_config(&chain_eth_config, &private_key) - .await - .expect("Chain config should be valid"), + InstrumentedSignablePythContract::from_config( + &chain_eth_config, + &private_key, + chain_state.id.clone(), + rpc_metrics.clone(), + ) + .await + .expect("Chain config should be valid"), ); let keeper_address = contract.client().inner().inner().inner().signer().address(); @@ -238,7 +245,7 @@ pub async fn run_keeper_threads( contract.clone(), gas_limit, chain_state.clone(), - keeper_metrics.clone(), + metrics.clone(), fulfilled_requests_cache.clone(), ) .in_current_span(), @@ -262,7 +269,7 @@ pub async fn run_keeper_threads( rx, Arc::clone(&contract), gas_limit, - keeper_metrics.clone(), + metrics.clone(), fulfilled_requests_cache.clone(), ) .in_current_span(), @@ -274,7 +281,18 @@ pub async fn run_keeper_threads( let chain_id = chain_state.id.clone(); let chain_config = chain_eth_config.clone(); let provider_address = chain_state.provider_address.clone(); - let keeper_metrics = keeper_metrics.clone(); + let keeper_metrics = metrics.clone(); + let contract = match InstrumentedPythContract::from_config( + &chain_config, + chain_id.clone(), + rpc_metrics, + ) { + Ok(r) => r, + Err(e) => { + tracing::error!("Error while connecting to pythnet contract. error: {:?}", e); + return; + } + }; loop { // There isn't a loop for indefinite trials. There is a new thread being spawned every `TRACK_INTERVAL` seconds. @@ -283,7 +301,7 @@ pub async fn run_keeper_threads( spawn( track_provider( chain_id.clone(), - chain_config.clone(), + contract.clone(), provider_address.clone(), keeper_metrics.clone(), ) @@ -292,7 +310,7 @@ pub async fn run_keeper_threads( spawn( track_balance( chain_id.clone(), - chain_config.clone(), + contract.client(), keeper_address.clone(), keeper_metrics.clone(), ) @@ -314,7 +332,7 @@ pub async fn run_keeper_threads( pub async fn process_event_with_backoff( event: RequestedWithCallbackEvent, chain_state: BlockchainState, - contract: Arc, + contract: Arc, gas_limit: U256, metrics: Arc, ) { @@ -363,7 +381,7 @@ pub async fn process_event_with_backoff( pub async fn process_event( event: &RequestedWithCallbackEvent, chain_config: &BlockchainState, - contract: &Arc, + contract: &InstrumentedSignablePythContract, gas_limit: U256, metrics: Arc, ) -> Result<(), backoff::Error> { @@ -493,7 +511,7 @@ pub async fn process_event( ))] pub async fn process_block_range( block_range: BlockRange, - contract: Arc, + contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, metrics: Arc, @@ -538,7 +556,7 @@ pub async fn process_block_range( ))] pub async fn process_single_block_batch( block_range: BlockRange, - contract: Arc, + contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, metrics: Arc, @@ -712,7 +730,7 @@ pub async fn watch_blocks( pub async fn process_new_blocks( chain_state: BlockchainState, mut rx: mpsc::Receiver, - contract: Arc, + contract: Arc, gas_limit: U256, metrics: Arc, fulfilled_requests_cache: Arc>>, @@ -738,7 +756,7 @@ pub async fn process_new_blocks( #[tracing::instrument(skip_all)] pub async fn process_backlog( backlog_range: BlockRange, - contract: Arc, + contract: Arc, gas_limit: U256, chain_state: BlockchainState, metrics: Arc, @@ -764,18 +782,10 @@ pub async fn process_backlog( #[tracing::instrument(skip_all)] pub async fn track_balance( chain_id: String, - chain_config: EthereumConfig, + provider: Arc>, address: Address, - metrics_registry: Arc, + metrics: Arc, ) { - let provider = match Provider::::try_from(&chain_config.geth_rpc_addr) { - Ok(r) => r, - Err(e) => { - tracing::error!("Error while connecting to geth rpc. error: {:?}", e); - return; - } - }; - let balance = match provider.get_balance(address, None).await { // This conversion to u128 is fine as the total balance will never cross the limits // of u128 practically. @@ -789,7 +799,7 @@ pub async fn track_balance( // The balance is in wei, so we need to divide by 1e18 to convert it to eth. let balance = balance as f64 / 1e18; - metrics_registry + metrics .balance .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -802,19 +812,11 @@ pub async fn track_balance( /// if there is a error the function will just return #[tracing::instrument(skip_all)] pub async fn track_provider( - chain_id: String, - chain_config: EthereumConfig, + chain_id: ChainId, + contract: InstrumentedPythContract, provider_address: Address, - metrics_registry: Arc, + metrics: Arc, ) { - let contract = match PythContract::from_config(&chain_config) { - Ok(r) => r, - Err(e) => { - tracing::error!("Error while connecting to pythnet contract. error: {:?}", e); - return; - } - }; - let provider_info = match contract.get_provider_info(provider_address).call().await { Ok(info) => info, Err(e) => { @@ -830,7 +832,7 @@ pub async fn track_provider( let current_sequence_number = provider_info.sequence_number; let end_sequence_number = provider_info.end_sequence_number; - metrics_registry + metrics .collected_fee .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -838,7 +840,7 @@ pub async fn track_provider( }) .set(collected_fee); - metrics_registry + metrics .current_sequence_number .get_or_create(&AccountLabel { chain_id: chain_id.clone(), @@ -848,7 +850,7 @@ pub async fn track_provider( // a long time for it to cross the limits of i64. // currently prometheus only supports i64 for Gauge types .set(current_sequence_number as i64); - metrics_registry + metrics .end_sequence_number .get_or_create(&AccountLabel { chain_id: chain_id.clone(),