Skip to content

Commit

Permalink
Perform all-nodes broadcasts
Browse files Browse the repository at this point in the history
  • Loading branch information
snoyberg committed Jun 6, 2024
1 parent c6c91bd commit 3e5ad5a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 5 deletions.
36 changes: 36 additions & 0 deletions packages/cosmos/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub(crate) struct PerformQueryBuilder<'a, Request> {
req: Request,
action: Action,
should_retry: bool,
all_nodes: bool,
}

impl<Request: GrpcRequest> PerformQueryBuilder<'_, Request> {
Expand All @@ -163,6 +164,11 @@ impl<Request: GrpcRequest> PerformQueryBuilder<'_, Request> {
self.should_retry = false;
self
}

fn all_nodes(mut self) -> Self {
self.all_nodes = true;
self
}
}

pub(crate) struct PerformQueryWrapper<Res> {
Expand Down Expand Up @@ -320,6 +326,7 @@ impl Cosmos {
req,
action,
should_retry: true,
all_nodes: false,
}
}

Expand All @@ -329,6 +336,7 @@ impl Cosmos {
req,
action,
should_retry,
all_nodes,
}: PerformQueryBuilder<'_, Request>,
) -> Result<PerformQueryWrapper<Request::Response>, QueryError> {
let mut attempt = 0;
Expand All @@ -344,6 +352,32 @@ impl Cosmos {
if cosmos.pool.builder.get_log_requests() {
tracing::info!("{action}");
}
if all_nodes && cosmos.get_cosmos_builder().get_all_nodes_broadcast() {
tracing::debug!("Initiating all-nodes broadcast");
let cosmos = cosmos.clone();
let req = req.clone();
let grpc_url = cosmos_inner.grpc_url().clone();
tokio::spawn(async move {
let mut all_nodes = cosmos.pool.all_nodes();
while let Some(mut guard) = all_nodes.next().await {
let cosmos_inner = guard.get_inner_mut();
if cosmos_inner.grpc_url() == &grpc_url {
continue;
}
match cosmos.perform_query_inner(req.clone(), cosmos_inner).await {
Ok(_) => tracing::debug!(
"Successfully performed an all-nodes broadcast to {}",
cosmos_inner.grpc_url(),
),
Err((err, _)) => {
tracing::debug!(
"Failed doing an all-nodes broadcast: {err}"
)
}
}
}
});
}
match cosmos.perform_query_inner(req.clone(), cosmos_inner).await {
Ok(x) => {
cosmos_inner.log_query_result(QueryResult::Success);
Expand All @@ -366,6 +400,7 @@ impl Cosmos {
}
}
};

if attempt >= cosmos.pool.builder.query_retries() || !should_retry || !can_retry {
break Err(QueryError {
action,
Expand Down Expand Up @@ -1577,6 +1612,7 @@ impl TxBuilder {
},
mk_action(),
)
.all_nodes()
.run()
.await?;
let res = tonic.into_inner().tx_response.ok_or_else(|| {
Expand Down
23 changes: 21 additions & 2 deletions packages/cosmos/src/client/node_chooser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ impl NodeChooser {
}
}

pub(super) fn all_nodes(&self) -> impl Iterator<Item = &Node> {
std::iter::once(&*self.primary).chain(self.fallbacks.iter())
pub(super) fn all_nodes(&self) -> AllNodes {
AllNodes {
primary: Some(&self.primary),
fallbacks: self.fallbacks.iter(),
}
}
}

Expand All @@ -78,3 +81,19 @@ pub(crate) enum QueryResult {
},
OtherError,
}

pub(crate) struct AllNodes<'a> {
primary: Option<&'a Node>,
fallbacks: std::slice::Iter<'a, Node>,
}

impl<'a> Iterator for AllNodes<'a> {
type Item = &'a Node;

fn next(&mut self) -> Option<Self::Item> {
match self.primary.take() {
Some(primary) => Some(primary),
None => self.fallbacks.next(),
}
}
}
32 changes: 31 additions & 1 deletion packages/cosmos/src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::{
CosmosBuilder,
};

use super::{node::Node, node_chooser::NodeChooser};
use super::{
node::Node,
node_chooser::{AllNodes, NodeChooser},
};

#[derive(Clone)]
pub(super) struct Pool {
Expand Down Expand Up @@ -54,6 +57,13 @@ impl Pool {
})
}

pub(super) fn all_nodes(&self) -> AllNodeGuards {
AllNodeGuards {
pool: self,
all_nodes: self.node_chooser.all_nodes(),
}
}

pub(crate) async fn get_with_node(&self, node: &Node) -> Result<NodeGuard, ConnectionError> {
let permit = self
.semaphore
Expand All @@ -68,3 +78,23 @@ impl Pool {
})
}
}

pub(crate) struct AllNodeGuards<'a> {
pool: &'a Pool,
all_nodes: AllNodes<'a>,
}

impl AllNodeGuards<'_> {
pub(crate) async fn next(&mut self) -> Option<NodeGuard> {
let inner = self.all_nodes.next()?.clone();
let _permit = self
.pool
.semaphore
.clone()
.acquire_owned()
.await
.expect("AllNodeGuards::next: semaphore has been closed");

Some(NodeGuard { inner, _permit })
}
}
4 changes: 2 additions & 2 deletions packages/cosmos/src/client/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::osmosis::{
use super::node::Node;

#[async_trait]
pub(crate) trait GrpcRequest: Clone + Sized {
type Response;
pub(crate) trait GrpcRequest: Clone + Sized + Send + 'static {
type Response: Send;

async fn perform(
req: tonic::Request<Self>,
Expand Down
17 changes: 17 additions & 0 deletions packages/cosmos/src/cosmos_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct CosmosBuilder {
rate_limit_per_second: Option<u64>,
log_requests: Option<bool>,
max_decoding_message_size: Option<usize>,
all_nodes_broadcast: bool,
}

impl CosmosBuilder {
Expand Down Expand Up @@ -86,6 +87,7 @@ impl CosmosBuilder {
is_fast_chain: matches!(hrp.as_str(), "sei" | "inj"),
log_requests: None,
max_decoding_message_size: None,
all_nodes_broadcast: true,
}
}

Expand Down Expand Up @@ -453,6 +455,21 @@ impl CosmosBuilder {
pub fn set_max_decoding_message_size(&mut self, max_decoding_message_size: usize) {
self.max_decoding_message_size = Some(max_decoding_message_size);
}

/// When broadcasting transactions, should we also broadcast to all fallback nodes?
///
/// This is intended to work around cases where broadcasting to the primary
/// node is failing, but other kinds of queries are working.
///
/// Default: [true]
pub fn get_all_nodes_broadcast(&self) -> bool {
self.all_nodes_broadcast
}

/// See [Self::get_all_nodes_broadcast]
pub fn set_all_nodes_broadcast(&mut self, value: bool) {
self.all_nodes_broadcast = value;
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
Expand Down

0 comments on commit 3e5ad5a

Please sign in to comment.