Merge pull request #32 from fpco/timeout-protection
Timeout protection, overall better fallback/retry logic
snoyberg authored Jun 26, 2024
2 parents b570ed7 + 89f8b14 commit d4c00cf
Showing 2 changed files with 158 additions and 95 deletions.
238 changes: 143 additions & 95 deletions packages/cosmos/src/client.rs
@@ -30,7 +30,7 @@ use cosmos_sdk_proto::{
traits::Message,
};
use parking_lot::{Mutex, RwLock};
use tokio::time::Instant;
use tokio::{task::JoinSet, time::Instant};
use tonic::{service::Interceptor, Status};

use crate::{
@@ -338,118 +338,166 @@ impl Cosmos {
all_nodes,
}: PerformQueryBuilder<'_, Request>,
) -> Result<PerformQueryWrapper<Request::Response>, QueryError> {
// If we're broadcasting a transaction and want to use all nodes,
// first we kick off a task to broadcast to all nodes.
// This is redundant with our broadcasts below, perhaps in the future
// we'll look at optimizing this further.
let all_nodes_handle = if all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast()
{
tracing::debug!("Initiating all-nodes broadcast");
let cosmos = cosmos.clone();
// This function is responsible for running queries against blockchain nodes.
// There are two primary ways of operating:
//
// All nodes: this is used when broadcasting transactions. The idea is that we
// want to broadcast to _all_ non-blocked nodes, since sometimes some nodes are
// unable to rebroadcast transactions from their mempool over P2P. In this case,
// we want to broadcast to all nodes immediately, return from this function as
// soon as the first success comes through, and let broadcasts to other nodes
// continue in the background.
//
// Regular: for everything else, we don't want to spam all nodes with every
// request. Instead, we get a priority list of the healthiest nodes and try them
// in order. We delay each successive node by a configurable amount of time to
// allow the earlier nodes to complete. We want to return from this function as
// soon as the first success comes through, but we want to cancel all remaining
// work at that point.
//
// Regardless, we track all of our tasks in a JoinSet.
let mut set = JoinSet::new();

// Set up channels for the individual workers to send back either success or
// error results. We keep separate channels, since we may want to optimistically
// exit on an early success, but likely want to wait for all nodes on failure.
let (tx_success, mut rx_success) = tokio::sync::mpsc::channel(1);
let (tx_error, mut rx_error) = tokio::sync::mpsc::channel(1);

// Grab some config values.
let all_nodes_broadcast =
all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast();
let delay = cosmos.get_cosmos_builder().get_delay_before_fallback();
let total_attempts = cosmos.pool.builder.query_retries();

// Get the set of nodes we should run against.
let nodes = if all_nodes_broadcast {
cosmos
.pool
.all_nodes()
.filter(|node| match node.node_health_level() {
crate::error::NodeHealthLevel::Unblocked { error_count: _ } => true,
crate::error::NodeHealthLevel::Blocked => false,
})
.cloned()
.collect()
} else {
cosmos.pool.node_chooser.choose_nodes()
};

if cosmos.pool.builder.get_log_requests() {
tracing::info!("{action}");
}

for node in nodes {
// Cloning for passing into the async move
let tx_success = tx_success.clone();
let tx_error = tx_error.clone();
let action = action.clone();
let req = req.clone();
Some(tokio::spawn(async move {
let all_nodes = cosmos.pool.all_nodes();
let mut success = None;
for node in all_nodes {
let cosmos = cosmos.clone();
let (tx_complete, rx_complete) = tokio::sync::oneshot::channel();
set.spawn(async move {
for attempt in 1..=total_attempts {
let _permit = cosmos.pool.get_node_permit().await;
match node.node_health_level() {
crate::error::NodeHealthLevel::Unblocked { error_count: _ } => {}
// Don't use a node which is rate limiting us
crate::error::NodeHealthLevel::Blocked => continue,
}
match cosmos.perform_query_inner(req.clone(), node).await {
match cosmos.perform_query_inner(req.clone(), &node).await {
Ok(tonic) => {
tracing::debug!(
"Successfully performed an all-nodes broadcast to {}",
node.grpc_url(),
);
if success.is_none() {
success = Some(PerformQueryWrapper {
node.log_query_result(QueryResult::Success);
tx_success
.try_send(PerformQueryWrapper {
grpc_url: node.grpc_url().clone(),
tonic,
});
}
})
.ok();
break;
}
Err((err, _)) => {
tracing::debug!("Failed doing an all-nodes broadcast: {err}")
Err((err, can_retry)) => {
tracing::debug!("Error performing a query. Attempt {attempt} of {total_attempts}. can_retry={can_retry}. should_retry={should_retry}. {err:?}");
node.log_query_result(if can_retry {
QueryResult::NetworkError {
err: err.clone(),
action: action.clone(),
}
} else {
QueryResult::OtherError
});
tx_error.try_send((err, node.grpc_url().clone())).ok();
if !can_retry || !should_retry {
break;
}
}
}
}
success
}))
} else {
None
};

let nodes = cosmos.pool.node_chooser.choose_nodes();
let mut first_error = None;
let total_attempts = cosmos.pool.builder.query_retries();

// We want to keep trying up to our total attempts amount.
// We're willing to reuse nodes that we had tried previously.
let nodes = nodes.into_iter().cycle().take(total_attempts);

for (idx, node) in nodes.enumerate() {
let _permit = cosmos.pool.get_node_permit().await;
if cosmos.pool.builder.get_log_requests() {
tracing::info!("{action}");
}
tx_complete.send(()).ok();
});

match cosmos.perform_query_inner(req.clone(), &node).await {
Ok(x) => {
node.log_query_result(QueryResult::Success);
return Ok(PerformQueryWrapper {
grpc_url: node.grpc_url().clone(),
tonic: x,
});
}
Err((err, can_retry)) => {
node.log_query_result(if can_retry {
QueryResult::NetworkError {
err: err.clone(),
action: action.clone(),
// If we're not doing an all-nodes broadcast, then we wait and check.
// The waiting is handled by the sleep call. If a successful response
// comes in while we're waiting, then we'll get that result and return it.
if !all_nodes_broadcast {
tokio::select! {
// Timeout occurred, keep going
_ = tokio::time::sleep(delay) => (),
// Child task finished, also keep going, don't wait for the timeout
_ = rx_complete => (),
// We got a success, let's use it!
success = rx_success.recv() => match success {
// rx_success yielded a value, stop doing other work and return the success
Some(success) => {
set.abort_all();
return Ok(success);
}
} else {
QueryResult::OtherError
});

tracing::debug!(
"Error performing a query, retrying. Attempt {} of {total_attempts}. {err:?}",
idx + 1,
);

if first_error.is_none() {
first_error = Some((err, node.grpc_url().clone()));
}

if !can_retry || !should_retry {
break;
// All send channels are closed, which should never happen at this point
None => unreachable!(),
}
}
};
}
}

// If we did an all-nodes broadcast and one was successful, use it.
if let Some(handle) = all_nodes_handle {
if let Ok(Some(res)) = handle.await {
return Ok(res);
// Drop the remaining send side of channels to avoid deadlocks while waiting below.
std::mem::drop(tx_success);
std::mem::drop(tx_error);

// Now we wait for either a success or error to come back.
let res = tokio::select! {
success = rx_success.recv() => match success {
Some(success) => Ok(success),
None => {
// No successes, but let's see if we can get a more accurate error
Err(rx_error.recv().await)
},
},
error = rx_error.recv() => {
// We got an error, let's see if we also got a success
match rx_success.recv().await {
Some(success) => Ok(success),
None => Err(error),
}
}
};

// Now that we have our results, stop running the other
// tasks, unless we're doing an all-nodes broadcast.
if !all_nodes_broadcast {
set.abort_all();
}

let (err, grpc_url) = match first_error {
Some(pair) => pair,
None => (
QueryErrorDetails::ConnectionError(ConnectionError::NoHealthyFound),
cosmos.get_cosmos_builder().grpc_url_arc().clone(),
),
};
Err(QueryError {
action,
builder: cosmos.pool.builder.clone(),
height: cosmos.height,
query: err,
grpc_url,
node_health: cosmos.pool.node_chooser.health_report(),
res.map_err(|first_error| {
let (err, grpc_url) = match first_error {
Some(pair) => pair,
None => (
QueryErrorDetails::ConnectionError(ConnectionError::NoHealthyFound),
cosmos.get_cosmos_builder().grpc_url_arc().clone(),
),
};
QueryError {
action,
builder: cosmos.pool.builder.clone(),
height: cosmos.height,
query: err,
grpc_url,
node_health: cosmos.pool.node_chooser.health_report(),
}
})
}

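The control flow introduced above is easier to follow outside the diff. The sketch below is a minimal, self-contained illustration of the same pattern: spawn one task per node into a tokio JoinSet, give each worker a success channel and an error channel, stagger fallback nodes with a configurable delay, return on the first success, and abort the rest. Everything here (FakeNode, the latencies, the string payloads) is made up for illustration; only the tokio primitives (JoinSet, mpsc, select!, sleep) match what the commit actually uses, and this is not the real client.rs implementation.

use std::time::Duration;

use tokio::{sync::mpsc, task::JoinSet, time::sleep};

// Made-up stand-in for a gRPC node; it exists only for this illustration.
struct FakeNode {
    name: &'static str,
    latency: Duration,
    healthy: bool,
}

#[tokio::main]
async fn main() {
    // Hypothetical nodes with different response times and health.
    let nodes = vec![
        FakeNode { name: "node-a", latency: Duration::from_millis(300), healthy: false },
        FakeNode { name: "node-b", latency: Duration::from_millis(120), healthy: true },
        FakeNode { name: "node-c", latency: Duration::from_millis(50), healthy: true },
    ];

    // Separate channels for success and error, mirroring the commit: we return on
    // the first success, but keep an error around to report if nobody succeeds.
    let (tx_success, mut rx_success) = mpsc::channel::<&'static str>(1);
    let (tx_error, mut rx_error) = mpsc::channel::<String>(1);

    // All worker tasks live in a JoinSet so they can be aborted together.
    let mut set = JoinSet::new();
    let delay_before_fallback = Duration::from_millis(100);

    for node in nodes {
        let tx_success = tx_success.clone();
        let tx_error = tx_error.clone();
        set.spawn(async move {
            // Pretend to perform the query against this node.
            sleep(node.latency).await;
            if node.healthy {
                // try_send: if a success already arrived, dropping this one is fine.
                tx_success.try_send(node.name).ok();
            } else {
                tx_error.try_send(format!("{} failed", node.name)).ok();
            }
        });

        // Stagger fallback nodes, but return early if a success already arrived.
        tokio::select! {
            _ = sleep(delay_before_fallback) => (),
            success = rx_success.recv() => {
                if let Some(name) = success {
                    set.abort_all();
                    println!("early success from {name}");
                    return;
                }
            }
        }
    }

    // Drop our own senders so the receivers can see when all workers are done.
    drop(tx_success);
    drop(tx_error);

    match rx_success.recv().await {
        Some(name) => println!("success from {name}"),
        None => println!("all nodes failed; first error: {:?}", rx_error.recv().await),
    }
}

The real implementation additionally gives each worker a oneshot tx_complete/rx_complete channel so the loop can move on to the next node as soon as the previous attempt finishes, rather than always waiting out the full delay, and it skips the delay entirely for all-nodes broadcasts.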
15 changes: 15 additions & 0 deletions packages/cosmos/src/cosmos_builder.rs
@@ -49,6 +49,7 @@ pub struct CosmosBuilder {
http2_keep_alive_interval: Option<Duration>,
keep_alive_while_idle: Option<bool>,
simulate_with_gas_coin: bool,
delay_before_fallback: Option<tokio::time::Duration>,
}

impl CosmosBuilder {
@@ -94,6 +95,7 @@ impl CosmosBuilder {
http2_keep_alive_interval: None,
keep_alive_while_idle: None,
simulate_with_gas_coin,
delay_before_fallback: None,
}
}

@@ -503,6 +505,19 @@ impl CosmosBuilder {
pub fn set_simulate_with_gas_coin(&mut self, value: bool) {
self.simulate_with_gas_coin = value;
}

/// How long to delay between each fallback node attempt?
///
/// Default: 500ms
pub fn get_delay_before_fallback(&self) -> tokio::time::Duration {
self.delay_before_fallback
.unwrap_or(tokio::time::Duration::from_millis(500))
}

/// See [Self::get_delay_before_fallback]
pub fn set_delay_before_fallback(&mut self, delay: tokio::time::Duration) {
self.delay_before_fallback = Some(delay);
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
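For reference, the getter/setter pair added here follows the builder's usual Option-with-default pattern. The snippet below is a standalone sketch of that pattern and of how a caller would override the new knob; BuilderSketch is a made-up stand-in, not the real CosmosBuilder, and tokio::time::Duration is simply a re-export of std::time::Duration.

use std::time::Duration; // tokio::time::Duration re-exports this type

// Made-up stand-in for the relevant slice of CosmosBuilder: an optional field,
// a getter that applies the 500ms default, and a setter that records an override.
struct BuilderSketch {
    delay_before_fallback: Option<Duration>,
}

impl BuilderSketch {
    fn new() -> Self {
        Self { delay_before_fallback: None }
    }

    // Mirrors get_delay_before_fallback from the diff: default of 500ms.
    fn get_delay_before_fallback(&self) -> Duration {
        self.delay_before_fallback
            .unwrap_or(Duration::from_millis(500))
    }

    // Mirrors set_delay_before_fallback from the diff.
    fn set_delay_before_fallback(&mut self, delay: Duration) {
        self.delay_before_fallback = Some(delay);
    }
}

fn main() {
    let mut builder = BuilderSketch::new();
    assert_eq!(builder.get_delay_before_fallback(), Duration::from_millis(500));

    // Fail over to the next node more aggressively.
    builder.set_delay_before_fallback(Duration::from_millis(200));
    assert_eq!(builder.get_delay_before_fallback(), Duration::from_millis(200));
}

Lowering the delay makes the client fall back to the next node sooner at the cost of more redundant requests; raising it gives each node more time to answer before a fallback query is issued.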
