From 31b29d229bd0f2e5b1807c04fe5c6cb411ecdbef Mon Sep 17 00:00:00 2001
From: Michael Snoyman
Date: Tue, 25 Jun 2024 14:30:50 +0300
Subject: [PATCH 1/3] Timeout protection, overall better fallback/retry logic

---
 packages/cosmos/src/client.rs         | 237 +++++++++++++++-----------
 packages/cosmos/src/cosmos_builder.rs |  15 ++
 2 files changed, 157 insertions(+), 95 deletions(-)

diff --git a/packages/cosmos/src/client.rs b/packages/cosmos/src/client.rs
index 18693b2..c90cde4 100644
--- a/packages/cosmos/src/client.rs
+++ b/packages/cosmos/src/client.rs
@@ -30,7 +30,7 @@ use cosmos_sdk_proto::{
     traits::Message,
 };
 use parking_lot::{Mutex, RwLock};
-use tokio::time::Instant;
+use tokio::{task::JoinSet, time::Instant};
 use tonic::{service::Interceptor, Status};
 
 use crate::{
@@ -338,118 +338,165 @@ impl Cosmos {
             all_nodes,
         }: PerformQueryBuilder<'_, Request>,
     ) -> Result<PerformQueryWrapper<Request::Response>, QueryError> {
-        // If we're broadcasting a transaction and want to use all nodes,
-        // first we kick off a task to broadcast to all nodes.
-        // This is redundant with our broadcasts below, perhaps in the future
-        // we'll look at optimizing this further.
-        let all_nodes_handle = if all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast()
-        {
-            tracing::debug!("Initiating all-nodes broadcast");
-            let cosmos = cosmos.clone();
+        // This function is responsible for running queries against blockchain nodes.
+        // There are two primary ways of operating:
+        //
+        // All nodes: this is used when broadcasting transactions. The idea is that we
+        // want to broadcast to _all_ non-blocked nodes, since sometimes some nodes are
+        // unable to rebroadcast transactions from their mempool over P2P. In this case,
+        // we want to broadcast to all nodes immediately, return from this function as
+        // soon as the first success comes through, and let broadcasts to other nodes
+        // continue in the background.
+        //
+        // Regular: for everything else, we don't want to spam all nodes with every
+        // request. Instead, we get a priority list of the healthiest nodes and try them
+        // in order. We delay each successive node by a configurable amount of time to
+        // allow the earlier nodes to complete. We want to return from this function as
+        // soon as the first success comes through, but we want to cancel all remaining
+        // work at that point.
+        //
+        // Regardless, we track all of our tasks in a JoinSet.
+        let mut set = JoinSet::new();
+
+        // Set up channels for the individual workers to send back either success or
+        // error results. We keep separate channels, since we may want to optimistically
+        // exit on an early success, but likely want to wait for all nodes on failure.
+        let (tx_success, mut rx_success) = tokio::sync::mpsc::channel(1);
+        let (tx_error, mut rx_error) = tokio::sync::mpsc::channel(1);
+
+        // Grab some config values.
+        let all_nodes = all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast();
+        let delay = cosmos.get_cosmos_builder().get_delay_before_fallback();
+        let total_attempts = cosmos.pool.builder.query_retries();
+
+        // Get the set of nodes we should run against.
+        let nodes = if all_nodes {
+            cosmos
+                .pool
+                .all_nodes()
+                .filter(|node| match node.node_health_level() {
+                    crate::error::NodeHealthLevel::Unblocked { error_count: _ } => true,
+                    crate::error::NodeHealthLevel::Blocked => false,
+                })
+                .cloned()
+                .collect()
+        } else {
+            cosmos.pool.node_chooser.choose_nodes()
+        };
+
+        if cosmos.pool.builder.get_log_requests() {
+            tracing::info!("{action}");
+        }
+
+        for node in nodes {
+            // Cloning for passing into the async move
+            let tx_success = tx_success.clone();
+            let tx_error = tx_error.clone();
+            let action = action.clone();
             let req = req.clone();
-            Some(tokio::spawn(async move {
-                let all_nodes = cosmos.pool.all_nodes();
-                let mut success = None;
-                for node in all_nodes {
+            let cosmos = cosmos.clone();
+            let (tx_complete, rx_complete) = tokio::sync::oneshot::channel();
+            set.spawn(async move {
+                for attempt in 1..=total_attempts {
                     let _permit = cosmos.pool.get_node_permit().await;
-                    match node.node_health_level() {
-                        crate::error::NodeHealthLevel::Unblocked { error_count: _ } => {}
-                        // // Don't use a node which is rate limiting us
-                        crate::error::NodeHealthLevel::Blocked => continue,
-                    }
-                    match cosmos.perform_query_inner(req.clone(), node).await {
+                    match cosmos.perform_query_inner(req.clone(), &node).await {
                         Ok(tonic) => {
-                            tracing::debug!(
-                                "Successfully performed an all-nodes broadcast to {}",
-                                node.grpc_url(),
-                            );
-                            if success.is_none() {
-                                success = Some(PerformQueryWrapper {
+                            node.log_query_result(QueryResult::Success);
+                            tx_success
+                                .try_send(PerformQueryWrapper {
                                     grpc_url: node.grpc_url().clone(),
                                     tonic,
-                                });
-                            }
+                                })
+                                .ok();
+                            break;
                         }
-                        Err((err, _)) => {
-                            tracing::debug!("Failed doing an all-nodes broadcast: {err}")
+                        Err((err, can_retry)) => {
+                            tracing::debug!("Error performing a query, retrying. Attempt {attempt} of {total_attempts}. {err:?}");
+                            node.log_query_result(if can_retry {
+                                QueryResult::NetworkError {
+                                    err: err.clone(),
+                                    action: action.clone(),
+                                }
+                            } else {
+                                QueryResult::OtherError
+                            });
+                            tx_error.try_send((err, node.grpc_url().clone())).ok();
+                            if !can_retry || !should_retry {
+                                break;
+                            }
                         }
                     }
                 }
-                success
-            }))
-        } else {
-            None
-        };
-
-        let nodes = cosmos.pool.node_chooser.choose_nodes();
-        let mut first_error = None;
-        let total_attempts = cosmos.pool.builder.query_retries();
-
-        // We want to keep trying up to our total attempts amount.
-        // We're willing to reuse nodes that we had tried previously.
-        let nodes = nodes.into_iter().cycle().take(total_attempts);
-
-        for (idx, node) in nodes.enumerate() {
-            let _permit = cosmos.pool.get_node_permit().await;
-            if cosmos.pool.builder.get_log_requests() {
-                tracing::info!("{action}");
-            }
+                tx_complete.send(()).ok();
+            });
 
-            match cosmos.perform_query_inner(req.clone(), &node).await {
-                Ok(x) => {
-                    node.log_query_result(QueryResult::Success);
-                    return Ok(PerformQueryWrapper {
-                        grpc_url: node.grpc_url().clone(),
-                        tonic: x,
-                    });
-                }
-                Err((err, can_retry)) => {
-                    node.log_query_result(if can_retry {
-                        QueryResult::NetworkError {
-                            err: err.clone(),
-                            action: action.clone(),
+            // If we're not doing an all-nodes broadcast, we wait-and-check here.
+            // The waiting is handled by the sleep call. If a successful response
+            // comes in while we're waiting, then we'll get that result and return it.
+            if !all_nodes {
+                tokio::select! {
+                    // Timeout occurred, keep going
+                    _ = tokio::time::sleep(delay) => (),
+                    // Child task finished, also keep going, don't wait for the timeout
+                    _ = rx_complete => (),
+                    // We got a success, let's use it!
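+                    // (rx_success is the one receiver shared by every spawned task, so a
+                    // success from any earlier node is picked up here as well.)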
+                    success = rx_success.recv() => match success {
+                        // rx_success yielded a value, stop doing other work and return the success
+                        Some(success) => {
+                            set.abort_all();
+                            return Ok(success);
                         }
-                    } else {
-                        QueryResult::OtherError
-                    });
-
-                    tracing::debug!(
-                        "Error performing a query, retrying. Attempt {} of {total_attempts}. {err:?}",
-                        idx + 1,
-                    );
-
-                    if first_error.is_none() {
-                        first_error = Some((err, node.grpc_url().clone()));
-                    }
-
-                    if !can_retry || !should_retry {
-                        break;
+                        // All send channels are closed, which should never happen at this point
+                        None => unreachable!(),
                     }
                 }
-            };
+            }
         }
 
-        // If we did an all-nodes broadcast and one was successful, use it.
-        if let Some(handle) = all_nodes_handle {
-            if let Ok(Some(res)) = handle.await {
-                return Ok(res);
+        // Drop the remaining send side of channels to avoid deadlocks while waiting below.
+        std::mem::drop(tx_success);
+        std::mem::drop(tx_error);
+
+        // Now we wait for either a success or error to come back.
+        let res = tokio::select! {
+            success = rx_success.recv() => match success {
+                Some(success) => Ok(success),
+                None => {
+                    // No successes, but let's see if we can get a more accurate error
+                    Err(rx_error.recv().await)
+                },
+            },
+            error = rx_error.recv() => {
+                // We got an error, let's see if we also got a success
+                match rx_success.recv().await {
+                    Some(success) => Ok(success),
+                    None => Err(error),
+                }
             }
+        };
+
+        // Now that we have our results, stop running the other
+        // tasks, unless we're doing an all-nodes broadcast.
+        if !all_nodes {
+            set.abort_all();
         }
 
-        let (err, grpc_url) = match first_error {
-            Some(pair) => pair,
-            None => (
-                QueryErrorDetails::ConnectionError(ConnectionError::NoHealthyFound),
-                cosmos.get_cosmos_builder().grpc_url_arc().clone(),
-            ),
-        };
-        Err(QueryError {
-            action,
-            builder: cosmos.pool.builder.clone(),
-            height: cosmos.height,
-            query: err,
-            grpc_url,
-            node_health: cosmos.pool.node_chooser.health_report(),
+        res.map_err(|first_error| {
+            let (err, grpc_url) = match first_error {
+                Some(pair) => pair,
+                None => (
+                    QueryErrorDetails::ConnectionError(ConnectionError::NoHealthyFound),
+                    cosmos.get_cosmos_builder().grpc_url_arc().clone(),
+                ),
+            };
+            QueryError {
+                action,
+                builder: cosmos.pool.builder.clone(),
+                height: cosmos.height,
+                query: err,
+                grpc_url,
+                node_health: cosmos.pool.node_chooser.health_report(),
+            }
         })
     }
 
diff --git a/packages/cosmos/src/cosmos_builder.rs b/packages/cosmos/src/cosmos_builder.rs
index 618fb4d..a6a708a 100644
--- a/packages/cosmos/src/cosmos_builder.rs
+++ b/packages/cosmos/src/cosmos_builder.rs
@@ -49,6 +49,7 @@ pub struct CosmosBuilder {
     http2_keep_alive_interval: Option<Duration>,
     keep_alive_while_idle: Option<bool>,
     simulate_with_gas_coin: bool,
+    delay_before_fallback: Option<Duration>,
 }
 
 impl CosmosBuilder {
@@ -94,6 +95,7 @@ impl CosmosBuilder {
             http2_keep_alive_interval: None,
             keep_alive_while_idle: None,
             simulate_with_gas_coin,
+            delay_before_fallback: None,
         }
     }
 
@@ -503,6 +505,19 @@ impl CosmosBuilder {
     pub fn set_simulate_with_gas_coin(&mut self, value: bool) {
         self.simulate_with_gas_coin = value;
     }
+
+    /// How long to delay between each fallback node attempt?
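+    ///
+    /// For regular (non-all-nodes-broadcast) queries, the client waits up to this
+    /// long for the current node before also trying the next node in its priority
+    /// list.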
+    ///
+    /// Default: 500ms
+    pub fn get_delay_before_fallback(&self) -> tokio::time::Duration {
+        self.delay_before_fallback
+            .unwrap_or(tokio::time::Duration::from_millis(500))
+    }
+
+    /// See [Self::get_delay_before_fallback]
+    pub fn set_delay_before_fallback(&mut self, delay: tokio::time::Duration) {
+        self.delay_before_fallback = Some(delay);
+    }
 }
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]

From 25308a20878304914abfd755848c99af727f5b82 Mon Sep 17 00:00:00 2001
From: Michael Snoyman
Date: Tue, 25 Jun 2024 14:49:24 +0300
Subject: [PATCH 2/3] Clarify retries in debug logs

---
 packages/cosmos/src/client.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/cosmos/src/client.rs b/packages/cosmos/src/client.rs
index c90cde4..2446197 100644
--- a/packages/cosmos/src/client.rs
+++ b/packages/cosmos/src/client.rs
@@ -411,7 +411,7 @@ impl Cosmos {
                             break;
                         }
                         Err((err, can_retry)) => {
-                            tracing::debug!("Error performing a query, retrying. Attempt {attempt} of {total_attempts}. {err:?}");
+                            tracing::debug!("Error performing a query. Attempt {attempt} of {total_attempts}. can_retry={can_retry}. should_retry={should_retry}. {err:?}");
                             node.log_query_result(if can_retry {
                                 QueryResult::NetworkError {
                                     err: err.clone(),

From 89f8b141a5a3d24553ab55c494495888668efdf6 Mon Sep 17 00:00:00 2001
From: Michael Snoyman
Date: Wed, 26 Jun 2024 09:37:06 +0300
Subject: [PATCH 3/3] Rename to all_nodes_broadcast

---
 packages/cosmos/src/client.rs | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/packages/cosmos/src/client.rs b/packages/cosmos/src/client.rs
index 2446197..ab6d741 100644
--- a/packages/cosmos/src/client.rs
+++ b/packages/cosmos/src/client.rs
@@ -365,12 +365,13 @@ impl Cosmos {
         let (tx_error, mut rx_error) = tokio::sync::mpsc::channel(1);
 
         // Grab some config values.
-        let all_nodes = all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast();
+        let all_nodes_broadcast =
+            all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast();
         let delay = cosmos.get_cosmos_builder().get_delay_before_fallback();
         let total_attempts = cosmos.pool.builder.query_retries();
 
         // Get the set of nodes we should run against.
-        let nodes = if all_nodes {
+        let nodes = if all_nodes_broadcast {
             cosmos
                 .pool
                 .all_nodes()
@@ -433,7 +434,7 @@ impl Cosmos {
             // If we're not doing an all-nodes broadcast, we wait-and-check here.
             // The waiting is handled by the sleep call. If a successful response
             // comes in while we're waiting, then we'll get that result and return it.
-            if !all_nodes {
+            if !all_nodes_broadcast {
                 tokio::select! {
                     // Timeout occurred, keep going
                     _ = tokio::time::sleep(delay) => (),
@@ -477,7 +478,7 @@ impl Cosmos {
 
         // Now that we have our results, stop running the other
        // tasks, unless we're doing an all-nodes broadcast.
-        if !all_nodes {
+        if !all_nodes_broadcast {
             set.abort_all();
         }
 
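Usage sketch (not part of the patches above): the snippet below shows how a caller might pick up the new delay added in PATCH 1/3. It assumes the crate's CosmosBuilder type is in scope and that a mutable builder value already exists; the helper function name is hypothetical, while set_delay_before_fallback and get_delay_before_fallback come directly from the patch.

    use std::time::Duration;

    // Hypothetical helper: stagger fallback attempts faster than the 500ms default.
    // tokio::time::Duration is a re-export of std::time::Duration, so a std Duration
    // satisfies the setter added in PATCH 1/3.
    fn tighten_fallback_delay(builder: &mut CosmosBuilder) {
        builder.set_delay_before_fallback(Duration::from_millis(200));
        assert_eq!(builder.get_delay_before_fallback(), Duration::from_millis(200));
    }

A shorter delay makes the client start querying the next-healthiest node sooner when the primary node is slow; the trade-off is more duplicate requests in flight.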