diff --git a/apps/fortuna/Cargo.lock b/apps/fortuna/Cargo.lock index a8ff76ccf..c6c249061 100644 --- a/apps/fortuna/Cargo.lock +++ b/apps/fortuna/Cargo.lock @@ -277,6 +277,20 @@ dependencies = [ "url", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1488,12 +1502,13 @@ dependencies = [ [[package]] name = "fortuna" -version = "6.1.0" +version = "6.2.0" dependencies = [ "anyhow", "axum", "axum-macros", "axum-test", + "backoff", "base64 0.21.4", "bincode", "byteorder", diff --git a/apps/fortuna/Cargo.toml b/apps/fortuna/Cargo.toml index c943185c4..7da40ba3d 100644 --- a/apps/fortuna/Cargo.toml +++ b/apps/fortuna/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fortuna" -version = "6.1.0" +version = "6.2.0" edition = "2021" [dependencies] @@ -35,6 +35,7 @@ once_cell = "1.18.0" lazy_static = "1.4.0" url = "2.5.0" chrono = { version = "0.4.38", features = ["clock", "std"] , default-features = false} +backoff = { version = "0.4.0", features = ["futures", "tokio"] } [dev-dependencies] diff --git a/apps/fortuna/src/chain/ethereum.rs b/apps/fortuna/src/chain/ethereum.rs index 166182980..9a9509329 100644 --- a/apps/fortuna/src/chain/ethereum.rs +++ b/apps/fortuna/src/chain/ethereum.rs @@ -270,7 +270,7 @@ impl EntropyReader for PythContract { sequence_number: u64, user_random_number: [u8; 32], provider_revelation: [u8; 32], - ) -> Result> { + ) -> Result { let result: Result>> = self .reveal_with_callback( provider, @@ -281,19 +281,6 @@ impl EntropyReader for PythContract { .estimate_gas() .await; - match result { - Ok(gas) => Ok(Some(gas)), - Err(e) => match e { - ContractError::ProviderError { e } => Err(anyhow!(e)), - _ => { - tracing::info!( - sequence_number = sequence_number, - "Gas estimation failed. error: {:?}", - e - ); - Ok(None) - } - }, - } + result.map_err(|e| e.into()) } } diff --git a/apps/fortuna/src/chain/reader.rs b/apps/fortuna/src/chain/reader.rs index 5245821d8..2bec67b73 100644 --- a/apps/fortuna/src/chain/reader.rs +++ b/apps/fortuna/src/chain/reader.rs @@ -57,15 +57,14 @@ pub trait EntropyReader: Send + Sync { to_block: BlockNumber, ) -> Result>; - /// Simulate a reveal with callback. Returns Some(gas) if the estimation was successful. - /// Returns None otherwise. Returns an error if the gas could not be estimated. + /// Estimate the gas required to reveal a random number with a callback. async fn estimate_reveal_with_callback_gas( &self, provider: Address, sequence_number: u64, user_random_number: [u8; 32], provider_revelation: [u8; 32], - ) -> Result>; + ) -> Result; } /// An in-flight request stored in the contract. @@ -189,8 +188,8 @@ pub mod mock { sequence_number: u64, user_random_number: [u8; 32], provider_revelation: [u8; 32], - ) -> Result> { - Ok(Some(U256::from(5))) + ) -> Result { + Ok(U256::from(5)) } } } diff --git a/apps/fortuna/src/keeper.rs b/apps/fortuna/src/keeper.rs index d40c0796b..a34867fa9 100644 --- a/apps/fortuna/src/keeper.rs +++ b/apps/fortuna/src/keeper.rs @@ -20,8 +20,8 @@ use { anyhow, Result, }, + backoff::ExponentialBackoff, ethers::{ - contract::ContractError, providers::{ Http, Middleware, @@ -45,7 +45,7 @@ use { registry::Registry, }, std::{ - collections::HashMap, + collections::HashSet, sync::{ atomic::AtomicU64, Arc, @@ -204,7 +204,7 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber { /// Run threads to handle events for the last `BACKLOG_RANGE` blocks, watch for new blocks and /// handle any events for the new blocks. -#[tracing::instrument(name="keeper", skip_all, fields(chain_id=chain_state.id))] +#[tracing::instrument(name = "keeper", skip_all, fields(chain_id = chain_state.id))] pub async fn run_keeper_threads( private_key: String, chain_eth_config: EthereumConfig, @@ -225,7 +225,7 @@ pub async fn run_keeper_threads( ); let keeper_address = contract.client().inner().inner().inner().signer().address(); - let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::::new())); + let fulfilled_requests_cache = Arc::new(RwLock::new(HashSet::::new())); // Spawn a thread to handle the events from last BACKLOG_RANGE blocks. let gas_limit: U256 = chain_eth_config.gas_limit.into(); @@ -307,40 +307,74 @@ pub async fn run_keeper_threads( } -/// Process an event for a chain. It estimates the gas for the reveal with callback and +/// Process an event with backoff. It will retry the reveal on failure for 5 minutes. +#[tracing::instrument(name = "process_event_with_backoff", skip_all, fields( + sequence_number = event.sequence_number +))] +pub async fn process_event_with_backoff( + event: RequestedWithCallbackEvent, + chain_state: BlockchainState, + contract: Arc, + gas_limit: U256, + metrics: Arc, +) { + metrics + .requests + .get_or_create(&AccountLabel { + chain_id: chain_state.id.clone(), + address: chain_state.provider_address.to_string(), + }) + .inc(); + tracing::info!("Started processing event"); + let mut backoff = ExponentialBackoff::default(); + backoff.max_elapsed_time = Some(Duration::from_secs(300)); // retry for 5 minutes + match backoff::future::retry_notify( + backoff, + || async { + process_event(&event, &chain_state, &contract, gas_limit, metrics.clone()).await + }, + |e, dur| { + tracing::error!("Error happened at {:?}: {}", dur, e); + }, + ) + .await + { + Ok(()) => { + tracing::info!("Processed event",); + } + Err(e) => { + tracing::error!("Failed to process event: {:?}", e); + } + } + metrics + .requests_processed + .get_or_create(&AccountLabel { + chain_id: chain_state.id.clone(), + address: chain_state.provider_address.to_string(), + }) + .inc(); +} + + +/// Process a callback on a chain. It estimates the gas for the reveal with callback and /// submits the transaction if the gas estimate is below the gas limit. -/// It will return an Error if the gas estimation failed with a provider error or if the -/// reveal with callback failed with a provider error. +/// It will return a permanent or transient error depending on the error type and whether +/// retry is possible or not. pub async fn process_event( - event: RequestedWithCallbackEvent, + event: &RequestedWithCallbackEvent, chain_config: &BlockchainState, contract: &Arc, gas_limit: U256, metrics: Arc, - fulfilled_requests_cache: Arc>>, -) -> Result<()> { +) -> Result<(), backoff::Error> { + // ignore requests that are not for the configured provider if chain_config.provider_address != event.provider_address { - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Fulfilled); return Ok(()); } - let provider_revelation = match chain_config.state.reveal(event.sequence_number) { - Ok(result) => result, - Err(e) => { - tracing::error!( - sequence_number = &event.sequence_number, - "Error while revealing with error: {:?}", - e - ); - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Fulfilled); - return Ok(()); - } - }; + let provider_revelation = chain_config + .state + .reveal(event.sequence_number) + .map_err(|e| backoff::Error::permanent(anyhow!("Error revealing: {:?}", e)))?; let gas_estimate_res = chain_config .contract @@ -353,159 +387,98 @@ pub async fn process_event( .in_current_span() .await; - match gas_estimate_res { - Ok(gas_estimate_option) => match gas_estimate_option { - Some(gas_estimate) => { - // Pad the gas estimate by 33% - let (gas_estimate, _) = gas_estimate - .saturating_mul(U256::from(4)) - .div_mod(U256::from(3)); + let gas_estimate = gas_estimate_res.map_err(|e| { + // we consider the error transient even if it is a contract revert since + // it can be because of routing to a lagging RPC node. Retrying such errors will + // incur a few additional RPC calls, but it is fine. + backoff::Error::transient(anyhow!("Error estimating gas for reveal: {:?}", e)) + })?; + + // Pad the gas estimate by 33% + let gas_estimate = gas_estimate.saturating_mul(4.into()) / 3; + + if gas_estimate > gas_limit { + return Err(backoff::Error::permanent(anyhow!( + "Gas estimate for reveal with callback is higher than the gas limit {} > {}", + gas_estimate, + gas_limit + ))); + } - if gas_estimate > gas_limit { - tracing::error!( - sequence_number = &event.sequence_number, - "Gas estimate for reveal with callback is higher than the gas limit" - ); + let contract_call = contract + .reveal_with_callback( + event.provider_address, + event.sequence_number, + event.user_random_number, + provider_revelation, + ) + .gas(gas_estimate); - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Fulfilled); - return Ok(()); - } - let contract_call = contract - .reveal_with_callback( - event.provider_address, - event.sequence_number, - event.user_random_number, - provider_revelation, - ) - .gas(gas_estimate); - - let res = contract_call.send().await; - - let pending_tx = match res { - Ok(pending_tx) => pending_tx, - Err(e) => match e { - // If there is a provider error, we weren't able to send the transaction. - // We will return an error. So, that the caller can decide what to do (retry). - ContractError::ProviderError { e } => return Err(e.into()), - // For all the other errors, it is likely the case we won't be able to reveal for - // ever. We will return an Ok(()) to signal that we have processed this reveal - // and concluded that its Ok to not reveal. - _ => { - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Processed); - tracing::error!( - sequence_number = &event.sequence_number, - "Error while revealing with error: {:?}", - e - ); - return Ok(()); - } - }, - }; - - match pending_tx.await { - Ok(res) => match res { - Some(res) => { - tracing::info!( - sequence_number = &event.sequence_number, - transaction_hash = &res.transaction_hash.to_string(), - gas_used = ?res.gas_used, - "Revealed with res: {:?}", - res - ); - - if let Some(gas_used) = res.gas_used { - let gas_used = gas_used.as_u128() as f64 / 1e18; - metrics - .total_gas_spent - .get_or_create(&AccountLabel { - chain_id: chain_config.id.clone(), - address: contract - .client() - .inner() - .inner() - .inner() - .signer() - .address() - .to_string(), - }) - .inc_by(gas_used); - } - - metrics - .reveals - .get_or_create(&AccountLabel { - chain_id: chain_config.id.clone(), - address: chain_config.provider_address.to_string(), - }) - .inc(); - - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Fulfilled); - Ok(()) - } - None => { - tracing::error!( - sequence_number = &event.sequence_number, - "Can't verify the reveal" - ); - // It is better to return an error in this scenario - // For the caller to retry - Err(anyhow!("Can't verify the reveal")) - } - }, - - Err(e) => { - tracing::error!( - sequence_number = &event.sequence_number, - "Error while revealing with error: {:?}", - e - ); - Err(e.into()) - } - } - } - None => { - tracing::info!( - sequence_number = &event.sequence_number, - "Not fulfilling event" - ); - fulfilled_requests_cache - .write() - .await - .insert(event.sequence_number, RequestState::Processed); - Ok(()) - } - }, - Err(e) => { - tracing::error!( - sequence_number = &event.sequence_number, - "Error while simulating reveal with error: {:?}", - e - ); - Err(e) - } + let pending_tx = contract_call.send().await.map_err(|e| { + backoff::Error::transient(anyhow!("Error submitting the reveal transaction: {:?}", e)) + })?; + + let receipt = pending_tx + .await + .map_err(|e| { + backoff::Error::transient(anyhow!("Error waiting for transaction receipt {:?}", e)) + })? + .ok_or_else(|| { + backoff::Error::transient(anyhow!( + "Can't verify the reveal, probably dropped from mempool" + )) + })?; + + tracing::info!( + sequence_number = &event.sequence_number, + transaction_hash = &receipt.transaction_hash.to_string(), + gas_used = ?receipt.gas_used, + "Revealed with res: {:?}", + receipt + ); + + if let Some(gas_used) = receipt.gas_used { + let gas_used = gas_used.as_u128() as f64 / 1e18; + metrics + .total_gas_spent + .get_or_create(&AccountLabel { + chain_id: chain_config.id.clone(), + address: contract + .client() + .inner() + .inner() + .inner() + .signer() + .address() + .to_string(), + }) + .inc_by(gas_used); } + + metrics + .reveals + .get_or_create(&AccountLabel { + chain_id: chain_config.id.clone(), + address: chain_config.provider_address.to_string(), + }) + .inc(); + + Ok(()) } /// Process a range of blocks in batches. It calls the `process_single_block_batch` method for each batch. -#[tracing::instrument(skip_all, fields(range_from_block=block_range.from, range_to_block=block_range.to))] +#[tracing::instrument(skip_all, fields( + range_from_block = block_range.from, range_to_block = block_range.to +))] pub async fn process_block_range( block_range: BlockRange, contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, metrics: Arc, - fulfilled_requests_cache: Arc>>, + fulfilled_requests_cache: Arc>>, ) { let BlockRange { from: first_block, @@ -541,14 +514,16 @@ pub async fn process_block_range( /// and then try to process them one by one. It checks the `fulfilled_request_cache`. If the request was already fulfilled. /// It won't reprocess it. If the request was already processed, it will reprocess it. /// If the process fails, it will retry indefinitely. -#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))] +#[tracing::instrument(name = "batch", skip_all, fields( + batch_from_block = block_range.from, batch_to_block = block_range.to +))] pub async fn process_single_block_batch( block_range: BlockRange, contract: Arc, gas_limit: U256, chain_state: api::BlockchainState, metrics: Arc, - fulfilled_requests_cache: Arc>>, + fulfilled_requests_cache: Arc>>, ) { loop { let events_res = chain_state @@ -560,69 +535,23 @@ pub async fn process_single_block_batch( Ok(events) => { tracing::info!(num_of_events = &events.len(), "Processing",); for event in &events { - if let Some(state) = fulfilled_requests_cache - .read() + // the write lock guarantees we spawn only one task per sequence number + let newly_inserted = fulfilled_requests_cache + .write() .await - .get(&event.sequence_number) - { - match state { - RequestState::Fulfilled => { - tracing::info!( - sequence_number = &event.sequence_number, - "Skipping already fulfilled request", - ); - continue; - } - RequestState::Processed => { - tracing::info!( - sequence_number = &event.sequence_number, - "Reprocessing already processed request", - ); - metrics - .requests_reprocessed - .get_or_create(&AccountLabel { - chain_id: chain_state.id.clone(), - address: chain_state.provider_address.to_string(), - }) - .inc(); - } - } - } - metrics - .requests - .get_or_create(&AccountLabel { - chain_id: chain_state.id.clone(), - address: chain_state.provider_address.to_string(), - }) - .inc(); - tracing::info!(sequence_number = &event.sequence_number, "Processing event",); - while let Err(e) = process_event( - event.clone(), - &chain_state, - &contract, - gas_limit, - metrics.clone(), - fulfilled_requests_cache.clone(), - ) - .in_current_span() - .await - { - tracing::error!( - sequence_number = &event.sequence_number, - "Error while processing event. Waiting for {} seconds before retry. error: {:?}", - RETRY_INTERVAL.as_secs(), - e + .insert(event.sequence_number); + if newly_inserted { + spawn( + process_event_with_backoff( + event.clone(), + chain_state.clone(), + contract.clone(), + gas_limit, + metrics.clone(), + ) + .in_current_span(), ); - time::sleep(RETRY_INTERVAL).await; } - tracing::info!(sequence_number = &event.sequence_number, "Processed event",); - metrics - .requests_processed - .get_or_create(&AccountLabel { - chain_id: chain_state.id.clone(), - address: chain_state.provider_address.to_string(), - }) - .inc(); } tracing::info!(num_of_events = &events.len(), "Processed",); break; @@ -641,7 +570,9 @@ pub async fn process_single_block_batch( /// Wrapper for the `watch_blocks` method. If there was an error while watching, it will retry after a delay. /// It retries indefinitely. -#[tracing::instrument(name="watch_blocks", skip_all, fields(initial_safe_block=latest_safe_block))] +#[tracing::instrument(name = "watch_blocks", skip_all, fields( + initial_safe_block = latest_safe_block +))] pub async fn watch_blocks_wrapper( chain_state: BlockchainState, latest_safe_block: BlockNumber, @@ -765,7 +696,7 @@ pub async fn process_new_blocks( contract: Arc, gas_limit: U256, metrics: Arc, - fulfilled_requests_cache: Arc>>, + fulfilled_requests_cache: Arc>>, ) { tracing::info!("Waiting for new block ranges to process"); loop { @@ -792,7 +723,7 @@ pub async fn process_backlog( gas_limit: U256, chain_state: BlockchainState, metrics: Arc, - fulfilled_requests_cache: Arc>>, + fulfilled_requests_cache: Arc>>, ) { tracing::info!("Processing backlog"); process_block_range(