Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom reconnect service #514

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cw-orch-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ uid = "0.1.7"
toml = "0.8"
http = "1.1.0"
libc-print = "0.1.23"
tower = { version = "0.5.1", features = ["reconnect"] }
pin-project-lite = "0.2.15"
futures-core = "0.3.31"
futures = "0.3.31"
futures-util = "0.3.31"
hyper = "1.5.0"
http-body-util = "0.1.2"
http-body = "1.0.1"

[dev-dependencies]
cw-orch-daemon = { path = "." }
Expand Down
4 changes: 2 additions & 2 deletions cw-orch-daemon/examples/manual_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use cosmrs::{AccountId, Any};
use cosmwasm_std::Addr;
use cw_orch::prelude::*;
use cw_orch_core::environment::ChainInfoOwned;
use cw_orch_daemon::Channel;
use prost::Message;
use std::io::{self, Write};
use std::sync::Arc;
use tonic::transport::Channel;

// ANCHOR: full_counter_example
use counter_contract::CounterContract;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl QuerySender for ManualSender {
type Error = DaemonError;
type Options = ManualSenderOptions;

fn channel(&self) -> tonic::transport::Channel {
fn channel(&self) -> Channel {
self.grpc_channel.clone()
}
}
Expand Down
9 changes: 9 additions & 0 deletions cw-orch-daemon/examples/querier-daemon.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// ANCHOR: full_counter_example

use std::{thread::sleep, time::Duration};

use cw_orch::{anyhow, prelude::*};
use cw_orch_daemon::senders::QueryOnlyDaemon;

Expand All @@ -18,5 +20,12 @@ pub fn main() -> anyhow::Result<()> {
.balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?;
assert!(!balances.is_empty());

sleep(Duration::from_secs(10));
log::info!("Resuming queries");
let balances = chain
.bank_querier()
.balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?;
assert!(!balances.is_empty());

Ok(())
}
34 changes: 31 additions & 3 deletions cw-orch-daemon/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{
};
use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target};
use http::Uri;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use tonic::transport::{ClientTlsConfig, Endpoint};
use tower::ServiceBuilder;

use super::error::DaemonError;
use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect};
use crate::service::retry::{Retry, RetryAttemps, RetryLayer};

/// A helper for constructing a gRPC channel
pub struct GrpcChannel {}

pub type Channel = Reconnect<ChannelFactory, ChannelCreationArgs>;
pub type TowerChannel = Retry<RetryAttemps, tonic::transport::Channel>;

impl GrpcChannel {
/// Connect to any of the provided gRPC endpoints
pub async fn connect(grpc: &[String], chain_id: &str) -> Result<Channel, DaemonError> {
pub async fn get_channel(grpc: &[String], chain_id: &str) -> Result<TowerChannel, DaemonError> {
if grpc.is_empty() {
return Err(DaemonError::GRPCListIsEmpty);
}
Expand Down Expand Up @@ -66,13 +72,35 @@ impl GrpcChannel {
return Err(DaemonError::CannotConnectGRPC);
}

Ok(successful_connections.pop().unwrap())
let retry_policy = RetryAttemps::count(3);
let retry_layer = RetryLayer::new(retry_policy);

let service = ServiceBuilder::new()
.layer(retry_layer)
.service(successful_connections.pop().unwrap());

Ok(service)
}

pub async fn connect(grpc: &[String], chain_id: &str) -> Result<Channel, DaemonError> {
let target = (grpc.to_vec(), chain_id.to_string());
let channel = Reconnect::new(ChannelFactory {}, target).with_attemps(3);
Self::verify_connection(channel.clone()).await?;
Ok(channel)
}

/// Create a gRPC channel from the chain info
pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Result<Channel, DaemonError> {
GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await
}

async fn verify_connection(channel: Channel) -> Result<(), DaemonError> {
let mut client = ServiceClient::new(channel.clone());

// Verify that we're able to query the node info
client.get_node_info(GetNodeInfoRequest {}).await?;
Ok(())
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{
cosmos_modules, error::DaemonError, queriers::Node, senders::Wallet, tx_resp::CosmTxResponse,
};
use crate::Channel;
use crate::{
queriers::CosmWasm,
senders::{builder::SenderBuilder, query::QuerySender, tx::TxSender},
Expand Down Expand Up @@ -31,7 +32,6 @@ use std::{
str::{from_utf8, FromStr},
time::Duration,
};
use tonic::transport::Channel;

pub const INSTANTIATE_2_TYPE_URL: &str = "/cosmwasm.wasm.v1.MsgInstantiateContract2";

Expand Down
1 change: 1 addition & 0 deletions cw-orch-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod keys;
pub mod live_mock;
pub mod queriers;
pub mod senders;
pub mod service;
pub mod tx_broadcaster;
pub mod tx_builder;

Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/live_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::queriers::Bank;
use crate::queriers::CosmWasm;
use crate::queriers::Staking;
use crate::Channel;
use crate::RUNTIME;
use cosmwasm_std::testing::{MockApi, MockStorage};
use cosmwasm_std::Addr;
Expand All @@ -24,7 +25,6 @@ use cw_orch_core::environment::ChainInfoOwned;
use cw_orch_core::environment::WasmQuerier;
use std::marker::PhantomData;
use std::str::FromStr;
use tonic::transport::Channel;

use crate::channel::GrpcChannel;

Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/authz.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, Daemon};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmwasm_std::Addr;
use cw_orch_core::environment::{Querier, QuerierGetter};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Queries for Cosmos AuthZ Module
/// All the async function are prefixed with `_`
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/bank.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, senders::query::QuerySender, DaemonBase};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmwasm_std::{Addr, Coin, StdError};
use cw_orch_core::environment::{BankQuerier, Querier, QuerierGetter};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Queries for Cosmos Bank Module
/// All the async function are prefixed with `_`
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/cosmwasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{marker::PhantomData, str::FromStr};

use crate::senders::query::QuerySender;
use crate::senders::QueryOnlySender;
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, DaemonBase};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmrs::AccountId;
Expand All @@ -15,7 +16,6 @@ use cw_orch_core::{
environment::{Querier, QuerierGetter, WasmQuerier},
};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Querier for the CosmWasm SDK module
/// All the async function are prefixed with `_`
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/feegrant.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, Daemon};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmwasm_std::Addr;
use cw_orch_core::environment::{Querier, QuerierGetter};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Querier for the Cosmos Gov module
/// All the async function are prefixed with `_`
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/gov.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, Daemon};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmwasm_std::Addr;
use cw_orch_core::environment::{Querier, QuerierGetter};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Querier for the Cosmos Gov module
/// All the async function are prefixed with `_`
Expand Down
3 changes: 1 addition & 2 deletions cw-orch-daemon/src/queriers/ibc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, Daemon};
use cosmos_modules::ibc_channel;
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
Expand All @@ -13,8 +14,6 @@ use cosmrs::proto::ibc::{
use cw_orch_core::environment::{Querier, QuerierGetter};
use prost::Message;
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Querier for the Cosmos IBC module
/// All the async function are prefixed with `_`
pub struct Ibc {
Expand Down
3 changes: 1 addition & 2 deletions cw-orch-daemon/src/queriers/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
tx_resp::CosmTxResponse, DaemonBase,
};

use crate::Channel;
use cosmrs::{
proto::cosmos::{
base::query::v1beta1::PageRequest,
Expand All @@ -18,8 +19,6 @@ use cw_orch_core::{
log::query_target,
};
use tokio::runtime::Handle;
use tonic::transport::Channel;

/// Querier for the Tendermint node.
/// Supports queries for block and tx information
/// All the async function are prefixed with `_`
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/queriers/staking.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt::Display;

use crate::Channel;
use crate::{cosmos_modules, error::DaemonError, Daemon};
use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest;
use cosmwasm_std::{Addr, StdError};
use cw_orch_core::environment::{Querier, QuerierGetter};
use tokio::runtime::Handle;
use tonic::transport::Channel;

use super::bank::cosmrs_to_cosmwasm_coin;

Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/senders/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{
sign::{Signer, SigningAccount},
tx::TxSender,
};
use crate::Channel;
use crate::{
core::parse_cw_coins,
cosmos_modules::{self, auth::BaseAccount},
Expand Down Expand Up @@ -33,7 +34,6 @@ use cw_orch_core::{
CoreEnvVars, CwEnvError,
};
use std::{str::FromStr, sync::Arc};
use tonic::transport::Channel;

#[cfg(feature = "eth")]
use crate::proto::injective::InjectiveSigner;
Expand Down
4 changes: 2 additions & 2 deletions cw-orch-daemon/src/senders/cosmos_batch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{DaemonBase, INSTANTIATE_2_TYPE_URL};
use crate::{Channel, DaemonBase, INSTANTIATE_2_TYPE_URL};

use crate::{error::DaemonError, tx_resp::CosmTxResponse};

Expand Down Expand Up @@ -86,7 +86,7 @@ impl QuerySender for CosmosBatchSender {
type Error = DaemonError;
type Options = CosmosBatchOptions;

fn channel(&self) -> tonic::transport::Channel {
fn channel(&self) -> Channel {
self.sender.channel()
}
}
Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/senders/query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tonic::transport::Channel;
use crate::Channel;

use crate::DaemonError;

Expand Down
2 changes: 1 addition & 1 deletion cw-orch-daemon/src/senders/query_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{error::DaemonError, DaemonBase, GrpcChannel};

use cw_orch_core::environment::ChainInfoOwned;

use tonic::transport::Channel;
use crate::Channel;

use super::{builder::SenderBuilder, query::QuerySender};

Expand Down
39 changes: 39 additions & 0 deletions cw-orch-daemon/src/service/attempts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#[derive(Clone)]
pub enum Attempts {
Unlimited,
Count(usize),
}

impl std::fmt::Display for Attempts {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Attempts::Unlimited => write!(f, "unlimited")?,
Attempts::Count(count) => write!(f, "{}", count)?,
}
Ok(())
}
}

impl Attempts {
pub fn can_retry(&self) -> bool {
match self {
Attempts::Unlimited => true,
Attempts::Count(count) => *count > 0,
}
}

/// Verifies the attempt can retry
/// If it can retry, decrements the counter
pub fn retry(&mut self) -> bool {
let can_retry = self.can_retry();
if can_retry {
self.decrement();
}
can_retry
}
fn decrement(&mut self) {
if let Attempts::Count(count) = self {
*count -= 1
}
}
}
3 changes: 3 additions & 0 deletions cw-orch-daemon/src/service/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod attempts;
pub mod reconnect;
pub mod retry;
28 changes: 28 additions & 0 deletions cw-orch-daemon/src/service/reconnect/factory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::{future::Future, pin::Pin};

use tower::Service;

use crate::{DaemonError, GrpcChannel, TowerChannel};

pub type ChannelCreationArgs = (Vec<String>, String);
#[derive(Clone)]
pub struct ChannelFactory {}

impl Service<ChannelCreationArgs> for ChannelFactory {
type Response = TowerChannel;

type Error = DaemonError;

type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(
&mut self,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}

fn call(&mut self, req: ChannelCreationArgs) -> Self::Future {
Box::pin(async move { GrpcChannel::get_channel(req.0.as_ref(), &req.1).await })
}
}
Loading
Loading