Skip to content

Commit

Permalink
Merge pull request #30 from fpco/use-all-nodes-successes-better
Browse files Browse the repository at this point in the history
Better use all-nodes successes
  • Loading branch information
snoyberg authored Jun 20, 2024
2 parents fc888a6 + a5022ed commit c4f9d09
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions packages/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,14 +342,14 @@ impl Cosmos {
// first we kick off a task to broadcast to all nodes.
// This is redundant with our broadcasts below, perhaps in the future
// we'll look at optimizing this further.
let all_nodes_success = Arc::new(parking_lot::Mutex::new(None));
if all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast() {
let all_nodes_handle = if all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast()
{
tracing::debug!("Initiating all-nodes broadcast");
let cosmos = cosmos.clone();
let req = req.clone();
let all_nodes_success = all_nodes_success.clone();
tokio::spawn(async move {
Some(tokio::spawn(async move {
let all_nodes = cosmos.pool.all_nodes();
let mut success = None;
for node in all_nodes {
let _permit = cosmos.pool.get_node_permit().await;
match node.node_health_level() {
Expand All @@ -363,9 +363,8 @@ impl Cosmos {
"Successfully performed an all-nodes broadcast to {}",
node.grpc_url(),
);
let mut guard = all_nodes_success.lock();
if guard.is_none() {
*guard = Some(PerformQueryWrapper {
if success.is_none() {
success = Some(PerformQueryWrapper {
grpc_url: node.grpc_url().clone(),
tonic,
});
Expand All @@ -376,8 +375,11 @@ impl Cosmos {
}
}
}
});
}
success
}))
} else {
None
};

let nodes = cosmos.pool.node_chooser.choose_nodes();
let mut first_error = None;
Expand Down Expand Up @@ -428,8 +430,10 @@ impl Cosmos {
}

// If we did an all-nodes broadcast and one was successful, use it.
if let Some(res) = all_nodes_success.lock().take() {
return Ok(res);
if let Some(handle) = all_nodes_handle {
if let Ok(Some(res)) = handle.await {
return Ok(res);
}
}

let (err, grpc_url) = match first_error {
Expand Down

0 comments on commit c4f9d09

Please sign in to comment.