From e1f447ce7c6f123b0dd3a736eb8039b471c7d725 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Tue, 29 Oct 2024 16:09:33 +0000 Subject: [PATCH 1/9] Added custom reconnect service --- cw-orch-daemon/Cargo.toml | 2 + cw-orch-daemon/examples/manual_sender.rs | 6 +- cw-orch-daemon/examples/querier-daemon.rs | 12 ++ cw-orch-daemon/src/channel.rs | 21 +- cw-orch-daemon/src/core.rs | 2 +- cw-orch-daemon/src/lib.rs | 1 + cw-orch-daemon/src/live_mock.rs | 12 +- cw-orch-daemon/src/queriers/authz.rs | 2 +- cw-orch-daemon/src/queriers/bank.rs | 2 +- cw-orch-daemon/src/queriers/cosmwasm.rs | 2 +- cw-orch-daemon/src/queriers/feegrant.rs | 2 +- cw-orch-daemon/src/queriers/gov.rs | 2 +- cw-orch-daemon/src/queriers/ibc.rs | 3 +- cw-orch-daemon/src/queriers/node.rs | 3 +- cw-orch-daemon/src/queriers/staking.rs | 2 +- cw-orch-daemon/src/senders/cosmos.rs | 4 +- cw-orch-daemon/src/senders/cosmos_batch.rs | 4 +- cw-orch-daemon/src/senders/query.rs | 2 +- cw-orch-daemon/src/senders/query_only.rs | 4 +- cw-orch-daemon/src/service/factory.rs | 28 +++ cw-orch-daemon/src/service/future.rs | 74 +++++++ cw-orch-daemon/src/service/mod.rs | 3 + cw-orch-daemon/src/service/reconnect.rs | 190 ++++++++++++++++++ cw-orch-daemon/src/sync/core.rs | 2 +- .../interchain/interchain-core/src/channel.rs | 27 ++- .../interchain-core/src/ibc_query.rs | 5 +- .../interchain/interchain-daemon/src/error.rs | 2 +- .../interchain-daemon/src/ibc_tracker.rs | 7 +- .../interchain-daemon/src/interchain_env.rs | 2 +- .../interchain-daemon/src/packet_inspector.rs | 4 +- packages/interchain/proto/Cargo.toml | 1 + packages/interchain/proto/src/tokenfactory.rs | 2 +- 32 files changed, 391 insertions(+), 44 deletions(-) create mode 100644 cw-orch-daemon/src/service/factory.rs create mode 100644 cw-orch-daemon/src/service/future.rs create mode 100644 cw-orch-daemon/src/service/mod.rs create mode 100644 cw-orch-daemon/src/service/reconnect.rs diff --git a/cw-orch-daemon/Cargo.toml b/cw-orch-daemon/Cargo.toml index 23c7feea0..95e607ead 100644 --- a/cw-orch-daemon/Cargo.toml +++ b/cw-orch-daemon/Cargo.toml @@ -78,6 +78,8 @@ uid = "0.1.7" toml = "0.8" http = "1.1.0" libc-print = "0.1.23" +tower = { version = "0.5.1", features = ["reconnect"] } +pin-project-lite = "0.2.15" [dev-dependencies] cw-orch-daemon = { path = "." } diff --git a/cw-orch-daemon/examples/manual_sender.rs b/cw-orch-daemon/examples/manual_sender.rs index 9c9b4fdc4..90ea0a439 100644 --- a/cw-orch-daemon/examples/manual_sender.rs +++ b/cw-orch-daemon/examples/manual_sender.rs @@ -18,10 +18,10 @@ use cosmrs::{AccountId, Any}; use cosmwasm_std::Addr; use cw_orch::prelude::*; use cw_orch_core::environment::ChainInfoOwned; +use cw_orch_daemon::Channel; use prost::Message; use std::io::{self, Write}; use std::sync::Arc; -use tonic::transport::Channel; // ANCHOR: full_counter_example use counter_contract::CounterContract; @@ -72,7 +72,7 @@ impl SenderBuilder for ManualSenderOptions { type Sender = ManualSender; async fn build(&self, chain_info: &Arc) -> Result { - let grpc_channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await?; + let grpc_channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await; Ok(ManualSender { chain_info: chain_info.clone(), sender: Addr::unchecked(self.sender_address.clone()), @@ -85,7 +85,7 @@ impl QuerySender for ManualSender { type Error = DaemonError; type Options = ManualSenderOptions; - fn channel(&self) -> tonic::transport::Channel { + fn channel(&self) -> Channel { self.grpc_channel.clone() } } diff --git a/cw-orch-daemon/examples/querier-daemon.rs b/cw-orch-daemon/examples/querier-daemon.rs index 54d85dc04..7d4840784 100644 --- a/cw-orch-daemon/examples/querier-daemon.rs +++ b/cw-orch-daemon/examples/querier-daemon.rs @@ -1,5 +1,7 @@ // ANCHOR: full_counter_example +use std::{thread::sleep, time::Duration}; + use cw_orch::{anyhow, prelude::*}; use cw_orch_daemon::senders::QueryOnlyDaemon; @@ -18,5 +20,15 @@ pub fn main() -> anyhow::Result<()> { .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; assert!(!balances.is_empty()); + log::info!("Sleeping 10s"); + sleep(Duration::from_secs(10)); + log::info!("FInished sleeping"); + + let balances = chain + .bank_querier() + .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; + assert!(!balances.is_empty()); + log::info!("Finished example"); + Ok(()) } diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 6f58ac461..320e06f7b 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -3,16 +3,26 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{ }; use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target}; use http::Uri; -use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; +use tonic::transport::{ClientTlsConfig, Endpoint}; + +use crate::service::{ + factory::{ChannelCreationArgs, ChannelFactory}, + reconnect::Reconnect, +}; use super::error::DaemonError; /// A helper for constructing a gRPC channel pub struct GrpcChannel {} +pub type Channel = Reconnect; + impl GrpcChannel { /// Connect to any of the provided gRPC endpoints - pub async fn connect(grpc: &[String], chain_id: &str) -> Result { + pub async fn get_channel( + grpc: &[String], + chain_id: &str, + ) -> Result { if grpc.is_empty() { return Err(DaemonError::GRPCListIsEmpty); } @@ -69,8 +79,13 @@ impl GrpcChannel { Ok(successful_connections.pop().unwrap()) } + pub async fn connect(grpc: &[String], chain_id: &str) -> Channel { + let channel = Reconnect::new(ChannelFactory {}, (grpc.to_vec(), chain_id.to_string())); + channel.clone() + } + /// Create a gRPC channel from the chain info - pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Result { + pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Channel { GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await } } diff --git a/cw-orch-daemon/src/core.rs b/cw-orch-daemon/src/core.rs index f3b7f5fd9..4e43f47fa 100644 --- a/cw-orch-daemon/src/core.rs +++ b/cw-orch-daemon/src/core.rs @@ -1,6 +1,7 @@ use super::{ cosmos_modules, error::DaemonError, queriers::Node, senders::Wallet, tx_resp::CosmTxResponse, }; +use crate::Channel; use crate::{ queriers::CosmWasm, senders::{builder::SenderBuilder, query::QuerySender, tx::TxSender}, @@ -31,7 +32,6 @@ use std::{ str::{from_utf8, FromStr}, time::Duration, }; -use tonic::transport::Channel; pub const INSTANTIATE_2_TYPE_URL: &str = "/cosmwasm.wasm.v1.MsgInstantiateContract2"; diff --git a/cw-orch-daemon/src/lib.rs b/cw-orch-daemon/src/lib.rs index 18f529eac..a64e2da78 100644 --- a/cw-orch-daemon/src/lib.rs +++ b/cw-orch-daemon/src/lib.rs @@ -12,6 +12,7 @@ pub mod keys; pub mod live_mock; pub mod queriers; pub mod senders; +pub mod service; pub mod tx_broadcaster; pub mod tx_builder; diff --git a/cw-orch-daemon/src/live_mock.rs b/cw-orch-daemon/src/live_mock.rs index 98aae87cc..1ba947823 100644 --- a/cw-orch-daemon/src/live_mock.rs +++ b/cw-orch-daemon/src/live_mock.rs @@ -4,6 +4,7 @@ use crate::queriers::Bank; use crate::queriers::CosmWasm; use crate::queriers::Staking; +use crate::Channel; use crate::RUNTIME; use cosmwasm_std::testing::{MockApi, MockStorage}; use cosmwasm_std::Addr; @@ -24,7 +25,6 @@ use cw_orch_core::environment::ChainInfoOwned; use cw_orch_core::environment::WasmQuerier; use std::marker::PhantomData; use std::str::FromStr; -use tonic::transport::Channel; use crate::channel::GrpcChannel; @@ -191,12 +191,10 @@ impl WasmMockQuerier { impl WasmMockQuerier { /// Creates a querier from chain information pub fn new(chain: ChainInfoOwned) -> Self { - let channel = RUNTIME - .block_on(GrpcChannel::connect( - &chain.grpc_urls, - chain.chain_id.as_str(), - )) - .unwrap(); + let channel = RUNTIME.block_on(GrpcChannel::connect( + &chain.grpc_urls, + chain.chain_id.as_str(), + )); WasmMockQuerier { channel } } diff --git a/cw-orch-daemon/src/queriers/authz.rs b/cw-orch-daemon/src/queriers/authz.rs index 14766f873..50859fcfa 100644 --- a/cw-orch-daemon/src/queriers/authz.rs +++ b/cw-orch-daemon/src/queriers/authz.rs @@ -3,7 +3,7 @@ use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; +use crate::Channel; /// Queries for Cosmos AuthZ Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/bank.rs b/cw-orch-daemon/src/queriers/bank.rs index ac7fa3831..499788933 100644 --- a/cw-orch-daemon/src/queriers/bank.rs +++ b/cw-orch-daemon/src/queriers/bank.rs @@ -3,7 +3,7 @@ use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::{Addr, Coin, StdError}; use cw_orch_core::environment::{BankQuerier, Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; +use crate::Channel; /// Queries for Cosmos Bank Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/cosmwasm.rs b/cw-orch-daemon/src/queriers/cosmwasm.rs index 6fa942e91..d122532d2 100644 --- a/cw-orch-daemon/src/queriers/cosmwasm.rs +++ b/cw-orch-daemon/src/queriers/cosmwasm.rs @@ -2,6 +2,7 @@ use std::{marker::PhantomData, str::FromStr}; use crate::senders::query::QuerySender; use crate::senders::QueryOnlySender; +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, DaemonBase}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmrs::AccountId; @@ -15,7 +16,6 @@ use cw_orch_core::{ environment::{Querier, QuerierGetter, WasmQuerier}, }; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Querier for the CosmWasm SDK module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/feegrant.rs b/cw-orch-daemon/src/queriers/feegrant.rs index a899f1598..3a99eeaad 100644 --- a/cw-orch-daemon/src/queriers/feegrant.rs +++ b/cw-orch-daemon/src/queriers/feegrant.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; /// Querier for the Cosmos Gov module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/gov.rs b/cw-orch-daemon/src/queriers/gov.rs index 18cfcb916..a6240f77b 100644 --- a/cw-orch-daemon/src/queriers/gov.rs +++ b/cw-orch-daemon/src/queriers/gov.rs @@ -3,7 +3,7 @@ use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; +use crate::Channel; /// Querier for the Cosmos Gov module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/ibc.rs b/cw-orch-daemon/src/queriers/ibc.rs index 0c96f00b9..3b2aadeda 100644 --- a/cw-orch-daemon/src/queriers/ibc.rs +++ b/cw-orch-daemon/src/queriers/ibc.rs @@ -1,3 +1,4 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmos_modules::ibc_channel; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; @@ -13,8 +14,6 @@ use cosmrs::proto::ibc::{ use cw_orch_core::environment::{Querier, QuerierGetter}; use prost::Message; use tokio::runtime::Handle; -use tonic::transport::Channel; - /// Querier for the Cosmos IBC module /// All the async function are prefixed with `_` pub struct Ibc { diff --git a/cw-orch-daemon/src/queriers/node.rs b/cw-orch-daemon/src/queriers/node.rs index 0958da8a2..68b6c3f9b 100644 --- a/cw-orch-daemon/src/queriers/node.rs +++ b/cw-orch-daemon/src/queriers/node.rs @@ -5,6 +5,7 @@ use crate::{ tx_resp::CosmTxResponse, DaemonBase, }; +use crate::Channel; use cosmrs::{ proto::cosmos::{ base::query::v1beta1::PageRequest, @@ -18,8 +19,6 @@ use cw_orch_core::{ log::query_target, }; use tokio::runtime::Handle; -use tonic::transport::Channel; - /// Querier for the Tendermint node. /// Supports queries for block and tx information /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/staking.rs b/cw-orch-daemon/src/queriers/staking.rs index 98936ea43..19d92d0ff 100644 --- a/cw-orch-daemon/src/queriers/staking.rs +++ b/cw-orch-daemon/src/queriers/staking.rs @@ -1,11 +1,11 @@ use std::fmt::Display; +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::{Addr, StdError}; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use tonic::transport::Channel; use super::bank::cosmrs_to_cosmwasm_coin; diff --git a/cw-orch-daemon/src/senders/cosmos.rs b/cw-orch-daemon/src/senders/cosmos.rs index 1fba13405..4950fda63 100644 --- a/cw-orch-daemon/src/senders/cosmos.rs +++ b/cw-orch-daemon/src/senders/cosmos.rs @@ -4,6 +4,7 @@ use super::{ sign::{Signer, SigningAccount}, tx::TxSender, }; +use crate::Channel; use crate::{ core::parse_cw_coins, cosmos_modules::{self, auth::BaseAccount}, @@ -33,7 +34,6 @@ use cw_orch_core::{ CoreEnvVars, CwEnvError, }; use std::{str::FromStr, sync::Arc}; -use tonic::transport::Channel; #[cfg(feature = "eth")] use crate::proto::injective::InjectiveSigner; @@ -98,7 +98,7 @@ impl Wallet { Ok(Self { chain_info: chain_info.clone(), - grpc_channel: GrpcChannel::from_chain_info(chain_info.as_ref()).await?, + grpc_channel: GrpcChannel::from_chain_info(chain_info.as_ref()).await, private_key: pk, secp, options, diff --git a/cw-orch-daemon/src/senders/cosmos_batch.rs b/cw-orch-daemon/src/senders/cosmos_batch.rs index d939ff20c..4001e165b 100644 --- a/cw-orch-daemon/src/senders/cosmos_batch.rs +++ b/cw-orch-daemon/src/senders/cosmos_batch.rs @@ -1,4 +1,4 @@ -use crate::{DaemonBase, INSTANTIATE_2_TYPE_URL}; +use crate::{Channel, DaemonBase, INSTANTIATE_2_TYPE_URL}; use crate::{error::DaemonError, tx_resp::CosmTxResponse}; @@ -86,7 +86,7 @@ impl QuerySender for CosmosBatchSender { type Error = DaemonError; type Options = CosmosBatchOptions; - fn channel(&self) -> tonic::transport::Channel { + fn channel(&self) -> Channel { self.sender.channel() } } diff --git a/cw-orch-daemon/src/senders/query.rs b/cw-orch-daemon/src/senders/query.rs index 208c3f0a7..2ccca67f9 100644 --- a/cw-orch-daemon/src/senders/query.rs +++ b/cw-orch-daemon/src/senders/query.rs @@ -1,4 +1,4 @@ -use tonic::transport::Channel; +use crate::Channel; use crate::DaemonError; diff --git a/cw-orch-daemon/src/senders/query_only.rs b/cw-orch-daemon/src/senders/query_only.rs index 7238e1e1e..638c4b58d 100644 --- a/cw-orch-daemon/src/senders/query_only.rs +++ b/cw-orch-daemon/src/senders/query_only.rs @@ -4,7 +4,7 @@ use crate::{error::DaemonError, DaemonBase, GrpcChannel}; use cw_orch_core::environment::ChainInfoOwned; -use tonic::transport::Channel; +use crate::Channel; use super::{builder::SenderBuilder, query::QuerySender}; @@ -26,7 +26,7 @@ impl SenderBuilder for () { type Sender = QueryOnlySender; async fn build(&self, chain_info: &Arc) -> Result { - let channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await?; + let channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await; Ok(QueryOnlySender { channel, diff --git a/cw-orch-daemon/src/service/factory.rs b/cw-orch-daemon/src/service/factory.rs new file mode 100644 index 000000000..88426978f --- /dev/null +++ b/cw-orch-daemon/src/service/factory.rs @@ -0,0 +1,28 @@ +use std::{future::Future, pin::Pin}; + +use tower::Service; + +use crate::{DaemonError, GrpcChannel}; + +pub type ChannelCreationArgs = (Vec, String); +#[derive(Clone)] +pub struct ChannelFactory {} + +impl Service for ChannelFactory { + type Response = tonic::transport::Channel; + + type Error = DaemonError; + + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, req: ChannelCreationArgs) -> Self::Future { + Box::pin(async move { GrpcChannel::get_channel(req.0.as_ref(), &req.1).await }) + } +} diff --git a/cw-orch-daemon/src/service/future.rs b/cw-orch-daemon/src/service/future.rs new file mode 100644 index 000000000..b0f2a7d04 --- /dev/null +++ b/cw-orch-daemon/src/service/future.rs @@ -0,0 +1,74 @@ +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::BoxError; + +pin_project! { + /// Future that resolves to the response or failure to connect. + #[derive(Debug)] + pub struct ResponseFuture { + #[pin] + inner: Inner, + } +} + +pin_project! { + #[project = InnerProj] + #[derive(Debug)] + enum Inner { + Future { + #[pin] + fut: F, + }, + Error { + error: Option, + }, + } +} + +impl Inner { + fn future(fut: F) -> Self { + Self::Future { fut } + } + + fn error(error: Option) -> Self { + Self::Error { error } + } +} + +impl ResponseFuture { + pub(crate) fn new(inner: F) -> Self { + ResponseFuture { + inner: Inner::future(inner), + } + } + + pub(crate) fn error(error: E) -> Self { + ResponseFuture { + inner: Inner::error(Some(error)), + } + } +} + +impl Future for ResponseFuture +where + F: Future>, + E: Into, + ME: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + match me.inner.project() { + InnerProj::Future { fut } => fut.poll(cx).map_err(Into::into), + InnerProj::Error { error } => { + let e = error.take().expect("Polled after ready.").into(); + Poll::Ready(Err(e)) + } + } + } +} diff --git a/cw-orch-daemon/src/service/mod.rs b/cw-orch-daemon/src/service/mod.rs new file mode 100644 index 000000000..b6ab7d535 --- /dev/null +++ b/cw-orch-daemon/src/service/mod.rs @@ -0,0 +1,3 @@ +pub mod factory; +pub mod future; +pub mod reconnect; diff --git a/cw-orch-daemon/src/service/reconnect.rs b/cw-orch-daemon/src/service/reconnect.rs new file mode 100644 index 000000000..098ad8d5b --- /dev/null +++ b/cw-orch-daemon/src/service/reconnect.rs @@ -0,0 +1,190 @@ +use crate::service::future::ResponseFuture; +use crate::DaemonError; +use log::trace; +use std::fmt; +use std::sync::{Arc, Mutex}; +use std::thread::sleep; +use std::time::Duration; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tower::{BoxError, Service}; + +use super::factory::ChannelFactory; + +/// Reconnect to failed services. +pub struct Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + + M: Sync, + Target: Sync, +{ + mk_service: M, + state: Arc>>, + target: Target, +} + +impl Clone for Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + M: Clone, + Target: Clone, + M: Sync, + Target: Sync, +{ + fn clone(&self) -> Self { + Self { + mk_service: self.mk_service.clone(), + state: self.state.clone(), + target: self.target.clone(), + } + } +} + +#[derive(Debug)] +enum State { + Error(E), + Idle, + Connecting(F), + Connected(S), +} + +impl Reconnect +where + M: Service, + M::Error: std::fmt::Debug, + M: Sync, + Target: Sync, +{ + /// Lazily connect and reconnect to a [`Service`]. + pub fn new(mk_service: M, target: Target) -> Self { + Reconnect { + mk_service, + state: Arc::new(Mutex::new(State::Idle)), + target, + } + } + + /// Reconnect to a already connected [`Service`]. + pub fn with_connection(init_conn: M::Response, mk_service: M, target: Target) -> Self { + Reconnect { + mk_service, + state: Arc::new(Mutex::new(State::Connected(init_conn))), + target, + } + } +} + +impl Service for Reconnect +where + ChannelFactory: Service, + >::Error: std::fmt::Debug, + >::Future: Unpin, + BoxError: From<>::Error> + From, + + S: Service, + Target: Clone + Sync, +{ + type Response = S::Response; + type Error = BoxError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + let mut state = self.state.lock().unwrap(); + match &mut *state { + State::Idle | State::Error(_) => { + trace!("poll_ready; idle"); + match self.mk_service.poll_ready(cx) { + Poll::Ready(r) => r?, + Poll::Pending => { + trace!("poll_ready; MakeService not ready"); + return Poll::Pending; + } + } + + let fut = self.mk_service.call(self.target.clone()); + drop(state); + self.state = Arc::new(Mutex::new(State::Connecting(fut))); + continue; + } + State::Connecting(ref mut f) => { + trace!("poll_ready; connecting"); + match Pin::new(f).poll(cx) { + Poll::Ready(Ok(service)) => { + drop(state); + self.state = Arc::new(Mutex::new(State::Connected(service))); + } + Poll::Pending => { + trace!("poll_ready; not ready"); + return Poll::Pending; + } + Poll::Ready(Err(e)) => { + trace!("poll_ready; error, retrying in {} seconds", 5); + drop(state); + self.state = Arc::new(Mutex::new(State::Error(e))); + sleep(Duration::from_secs(5)); + } + } + } + State::Connected(ref mut inner) => { + trace!("poll_ready; connected"); + match inner.poll_ready(cx) { + Poll::Ready(Ok(())) => { + trace!("poll_ready; ready"); + return Poll::Ready(Ok(())); + } + Poll::Pending => { + trace!("poll_ready; not ready"); + return Poll::Pending; + } + Poll::Ready(Err(_)) => { + trace!("poll_ready; error"); + + drop(state); + self.state = Arc::new(Mutex::new(State::Idle)); + } + } + } + } + } + + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + let mut state = self.state.lock().unwrap(); + let service = match &mut *state { + State::Connected(ref mut service) => service, + State::Error(error) => panic!("{:?}", error), + _ => panic!("service not ready; poll_ready must be called first"), + }; + + let fut = service.call(request); + ResponseFuture::new(fut) + } +} + +impl fmt::Debug for Reconnect +where + M: Service + fmt::Debug, + M::Future: fmt::Debug, + M::Response: fmt::Debug, + Target: fmt::Debug, + M::Error: std::fmt::Debug, + M: Sync, + Target: Sync, +{ + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Reconnect") + .field("mk_service", &self.mk_service) + .field("state", &self.state) + .field("target", &self.target) + .finish() + } +} diff --git a/cw-orch-daemon/src/sync/core.rs b/cw-orch-daemon/src/sync/core.rs index 7320cc17b..39c73206a 100644 --- a/cw-orch-daemon/src/sync/core.rs +++ b/cw-orch-daemon/src/sync/core.rs @@ -1,6 +1,7 @@ use std::{fmt::Debug, ops::DerefMut}; use super::super::senders::Wallet; +use crate::Channel; use crate::{ queriers::{Bank, CosmWasmBase, Node}, senders::{builder::SenderBuilder, query::QuerySender}, @@ -14,7 +15,6 @@ use cw_orch_core::{ use cw_orch_traits::stargate::Stargate; use serde::Serialize; use tokio::runtime::Handle; -use tonic::transport::Channel; use crate::senders::tx::TxSender; diff --git a/packages/interchain/interchain-core/src/channel.rs b/packages/interchain/interchain-core/src/channel.rs index 98e4320d2..365299d87 100644 --- a/packages/interchain/interchain-core/src/channel.rs +++ b/packages/interchain/interchain-core/src/channel.rs @@ -12,7 +12,7 @@ use crate::InterchainError; /// Identifies a channel between two IBC connected chains. /// This describes only 1 side of the channel -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct IbcPort { /// The chain id of the network which belongs on one side of the channel pub chain_id: NetworkId, @@ -31,10 +31,21 @@ pub struct IbcPort { pub chain: Channel, } +impl std::fmt::Debug for IbcPort { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("IbcPort") + .field("chain_id", &self.chain_id) + .field("connection_id", &self.connection_id) + .field("port", &self.port) + .field("channel", &self.channel) + .finish() + } +} + /// Store information about a channel between 2 blockchains /// The order of port_a and port_b is not important /// Even if there is a src and dst chain, the order for an IBC channel doesn't matter -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct InterchainChannel where Channel: Clone + Send + Sync, @@ -45,6 +56,18 @@ where pub port_b: IbcPort, } +impl std::fmt::Debug for InterchainChannel +where + Channel: Clone + Send + Sync, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InterchainChannel") + .field("port_a", &self.port_a) + .field("port_b", &self.port_b) + .finish() + } +} + // TODO some of those queries may be implemented (or are already implemented) in the IBC querier file ? impl InterchainChannel where diff --git a/packages/interchain/interchain-core/src/ibc_query.rs b/packages/interchain/interchain-core/src/ibc_query.rs index b22e6c939..2c31b4b2d 100644 --- a/packages/interchain/interchain-core/src/ibc_query.rs +++ b/packages/interchain/interchain-core/src/ibc_query.rs @@ -2,6 +2,7 @@ use crate::results::NetworkId; use cosmwasm_std::Api; use cw_orch_core::environment::CwEnv; use cw_orch_core::environment::QueryHandler; +use cw_orch_daemon::Channel; use cw_orch_mock::{MockBase, MockState}; /// Adds additional capabilities to CwEnv for use with ibc environments @@ -20,9 +21,9 @@ pub trait IbcQueryHandler: CwEnv { #[cfg(feature = "daemon")] // Temporary until we can actually push to cw-orch-daemon impl IbcQueryHandler for cw_orch_daemon::Daemon { - type Handler = tonic::transport::Channel; + type Handler = Channel; - fn ibc_handler(&self) -> tonic::transport::Channel { + fn ibc_handler(&self) -> Channel { self.channel() } diff --git a/packages/interchain/interchain-daemon/src/error.rs b/packages/interchain/interchain-daemon/src/error.rs index f31fefc77..bb756582c 100644 --- a/packages/interchain/interchain-daemon/src/error.rs +++ b/packages/interchain/interchain-daemon/src/error.rs @@ -1,9 +1,9 @@ #![allow(missing_docs)] use cosmwasm_std::StdError; +use cw_orch_daemon::Channel; use cw_orch_interchain_core::{channel::InterchainChannel, results::NetworkId, InterchainError}; use thiserror::Error; -use tonic::transport::Channel; #[derive(Error, Debug)] pub enum InterchainDaemonError { diff --git a/packages/interchain/interchain-daemon/src/ibc_tracker.rs b/packages/interchain/interchain-daemon/src/ibc_tracker.rs index 52ccb6a35..f775cb15e 100644 --- a/packages/interchain/interchain-daemon/src/ibc_tracker.rs +++ b/packages/interchain/interchain-daemon/src/ibc_tracker.rs @@ -5,7 +5,7 @@ use cosmrs::proto::ibc::core::channel::v1::State; use cw_orch_core::contract::interface_traits::ContractInstance; use cw_orch_core::environment::Environment; use cw_orch_daemon::queriers::{Ibc, Node}; -use cw_orch_daemon::Daemon; +use cw_orch_daemon::{Channel, Daemon}; use cw_orch_interchain_core::env::contract_port; use diff::Diff; use futures_util::future::join_all; @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::error::Error; use std::{fmt::Display, time::Duration}; -use tonic::{async_trait, transport::Channel}; +use tonic::async_trait; use self::logged_state::LoggedState; @@ -117,8 +117,9 @@ mod logged_state { fmt::{Debug, Display}, }; + use cw_orch_daemon::Channel; use diff::Diff; - use tonic::{async_trait, transport::Channel}; + use tonic::async_trait; #[async_trait] pub trait LoggedState: diff --git a/packages/interchain/interchain-daemon/src/interchain_env.rs b/packages/interchain/interchain-daemon/src/interchain_env.rs index f7ed82fc8..9d8e9e3c1 100644 --- a/packages/interchain/interchain-daemon/src/interchain_env.rs +++ b/packages/interchain/interchain-daemon/src/interchain_env.rs @@ -6,9 +6,9 @@ use cw_orch_interchain_core::channel::{IbcPort, InterchainChannel}; use cw_orch_interchain_core::env::{ChainId, ChannelCreation}; use cw_orch_interchain_core::{InterchainEnv, NestedPacketsFlow, SinglePacketFlow}; +use cw_orch_daemon::Channel; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use tokio::time::sleep; -use tonic::transport::Channel; use crate::channel_creator::{ChannelCreationValidator, ChannelCreator}; use crate::interchain_log::InterchainLog; diff --git a/packages/interchain/interchain-daemon/src/packet_inspector.rs b/packages/interchain/interchain-daemon/src/packet_inspector.rs index c72d4f039..ca9d85c90 100644 --- a/packages/interchain/interchain-daemon/src/packet_inspector.rs +++ b/packages/interchain/interchain-daemon/src/packet_inspector.rs @@ -18,10 +18,10 @@ use futures_util::FutureExt; use crate::{IcDaemonResult, InterchainDaemonError}; use cw_orch_interchain_core::results::NetworkId; +use cw_orch_daemon::Channel; use futures::future::try_join_all; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, PortId}; -use tonic::transport::Channel; use std::collections::HashMap; @@ -176,7 +176,7 @@ impl PacketInspector { } else { // If no custom channel was registered, we try to get it from the registry let chain_data: ChainInfoOwned = parse_network(chain_id).unwrap().into(); // TODO, no unwrap here ? - Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await?) + Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await) } } diff --git a/packages/interchain/proto/Cargo.toml b/packages/interchain/proto/Cargo.toml index 28324984c..c1089ca91 100644 --- a/packages/interchain/proto/Cargo.toml +++ b/packages/interchain/proto/Cargo.toml @@ -13,6 +13,7 @@ repository.workspace = true cw-orch-interchain-core = { workspace = true } cw-orch-traits = { workspace = true } cw-orch-core = { workspace = true } +cw-orch-daemon = { workspace = true } anyhow = { workspace = true } diff --git a/packages/interchain/proto/src/tokenfactory.rs b/packages/interchain/proto/src/tokenfactory.rs index eeaefd93f..066d1f5b3 100644 --- a/packages/interchain/proto/src/tokenfactory.rs +++ b/packages/interchain/proto/src/tokenfactory.rs @@ -1,12 +1,12 @@ #![allow(non_snake_case)] +use cw_orch_daemon::Channel; use cw_orch_interchain_core::{ channel::InterchainChannel, IbcQueryHandler, InterchainEnv, InterchainError, NestedPacketsFlow, }; use ibc_proto::ibc::apps::transfer::v1::MsgTransfer; use osmosis_std::types::osmosis::tokenfactory::v1beta1::{MsgCreateDenom, MsgMint}; use prost::{Message, Name}; -use tonic::transport::Channel; use cosmwasm_std::Coin; use cw_orch_core::environment::{CwEnv, TxHandler}; From 888e1220b37ee2a6764340a2b9aa8c65f42e07de Mon Sep 17 00:00:00 2001 From: Kayanski Date: Wed, 30 Oct 2024 08:41:56 +0000 Subject: [PATCH 2/9] Fix warnings and reconnect definition --- cw-orch-daemon/src/service/future.rs | 10 -------- cw-orch-daemon/src/service/reconnect.rs | 23 +++++++++---------- cw-orch-daemon/tests/querier.rs | 11 +++------ packages/clone-testing/src/core.rs | 9 +++++--- .../interchain/interchain-daemon/src/error.rs | 6 ++--- .../interchain-daemon/src/interchain_env.rs | 2 +- 6 files changed, 24 insertions(+), 37 deletions(-) diff --git a/cw-orch-daemon/src/service/future.rs b/cw-orch-daemon/src/service/future.rs index b0f2a7d04..cd28666e9 100644 --- a/cw-orch-daemon/src/service/future.rs +++ b/cw-orch-daemon/src/service/future.rs @@ -33,10 +33,6 @@ impl Inner { fn future(fut: F) -> Self { Self::Future { fut } } - - fn error(error: Option) -> Self { - Self::Error { error } - } } impl ResponseFuture { @@ -45,12 +41,6 @@ impl ResponseFuture { inner: Inner::future(inner), } } - - pub(crate) fn error(error: E) -> Self { - ResponseFuture { - inner: Inner::error(Some(error)), - } - } } impl Future for ResponseFuture diff --git a/cw-orch-daemon/src/service/reconnect.rs b/cw-orch-daemon/src/service/reconnect.rs index 098ad8d5b..58d9097b2 100644 --- a/cw-orch-daemon/src/service/reconnect.rs +++ b/cw-orch-daemon/src/service/reconnect.rs @@ -1,5 +1,4 @@ use crate::service::future::ResponseFuture; -use crate::DaemonError; use log::trace; use std::fmt; use std::sync::{Arc, Mutex}; @@ -12,8 +11,6 @@ use std::{ }; use tower::{BoxError, Service}; -use super::factory::ChannelFactory; - /// Reconnect to failed services. pub struct Reconnect where @@ -24,6 +21,7 @@ where Target: Sync, { mk_service: M, + #[allow(clippy::type_complexity)] state: Arc>>, target: Target, } @@ -80,19 +78,19 @@ where } } -impl Service for Reconnect +impl Service for Reconnect where - ChannelFactory: Service, - >::Error: std::fmt::Debug, - >::Future: Unpin, - BoxError: From<>::Error> + From, + M: Service + Sync, + M::Error: std::fmt::Debug, + M::Future: Unpin, + BoxError: From + From, S: Service, Target: Clone + Sync, { type Response = S::Response; type Error = BoxError; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { loop { @@ -153,15 +151,16 @@ where } } } - - Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { let mut state = self.state.lock().unwrap(); let service = match &mut *state { State::Connected(ref mut service) => service, - State::Error(error) => panic!("{:?}", error), + State::Error(error) => panic!( + "service not ready; poll_ready must be called first: {:?}", + error + ), _ => panic!("service not ready; poll_ready must be called first"), }; diff --git a/cw-orch-daemon/tests/querier.rs b/cw-orch-daemon/tests/querier.rs index 9bc334d1d..87f9c3b3d 100644 --- a/cw-orch-daemon/tests/querier.rs +++ b/cw-orch-daemon/tests/querier.rs @@ -23,19 +23,14 @@ mod queriers { tx::{self, Msg}, AccountId, Denom, }; + use cw_orch_daemon::Channel; - pub async fn build_channel() -> tonic::transport::Channel { + pub async fn build_channel() -> Channel { let network = networks::LOCAL_JUNO; let grpcs = vec![network.grpc_urls[0].into()]; - let channel = GrpcChannel::connect(&grpcs, network.chain_id).await; - - asserting!("channel connection is succesful") - .that(&channel) - .is_ok(); - - channel.unwrap() + GrpcChannel::connect(&grpcs, network.chain_id).await } /* diff --git a/packages/clone-testing/src/core.rs b/packages/clone-testing/src/core.rs index 9ff00b395..ccf2f1aeb 100644 --- a/packages/clone-testing/src/core.rs +++ b/packages/clone-testing/src/core.rs @@ -17,7 +17,9 @@ use cw_orch_core::{ }, CwEnvError, }; -use cw_orch_daemon::{queriers::Node, read_network_config, DEFAULT_DEPLOYMENT, RUNTIME}; +use cw_orch_daemon::{ + queriers::Node, read_network_config, GrpcChannel, DEFAULT_DEPLOYMENT, RUNTIME, +}; use cw_utils::NativeBalance; use serde::Serialize; use tokio::runtime::Runtime; @@ -235,10 +237,11 @@ impl CloneTesting { let bank = BankKeeper::new().with_remote(remote_channel.clone()); - // We update the block_height + // We update the block_height, and open a second channel just for that (to make sure we are not too dependent on clone-cw-multi-test deps) + let node_channel = rt.block_on(GrpcChannel::from_chain_info(&chain)); let block_info = remote_channel .rt - .block_on(Node::new_async(remote_channel.channel.clone())._block_info()) + .block_on(Node::new_async(node_channel)._block_info()) .unwrap(); // Finally we instantiate a new app diff --git a/packages/interchain/interchain-daemon/src/error.rs b/packages/interchain/interchain-daemon/src/error.rs index bb756582c..314e3f726 100644 --- a/packages/interchain/interchain-daemon/src/error.rs +++ b/packages/interchain/interchain-daemon/src/error.rs @@ -1,8 +1,7 @@ #![allow(missing_docs)] use cosmwasm_std::StdError; -use cw_orch_daemon::Channel; -use cw_orch_interchain_core::{channel::InterchainChannel, results::NetworkId, InterchainError}; +use cw_orch_interchain_core::{results::NetworkId, InterchainError}; use thiserror::Error; #[derive(Error, Debug)] @@ -40,7 +39,8 @@ pub enum InterchainDaemonError { #[error("Channel creation events not found from chain {src_chain} on following channel : {channel:?}")] ChannelCreationEventsNotFound { src_chain: NetworkId, - channel: InterchainChannel, + /// Meant to contain InterchainChannel, but this is a large variant so used a string to remove warning + channel: String, }, #[error("Configuration already registered for chain {0}")] diff --git a/packages/interchain/interchain-daemon/src/interchain_env.rs b/packages/interchain/interchain-daemon/src/interchain_env.rs index 9d8e9e3c1..e1d31c7d3 100644 --- a/packages/interchain/interchain-daemon/src/interchain_env.rs +++ b/packages/interchain/interchain-daemon/src/interchain_env.rs @@ -367,7 +367,7 @@ impl DaemonInterchain { Err(InterchainDaemonError::ChannelCreationEventsNotFound { src_chain: src_chain.to_string(), - channel: ibc_channel.clone(), + channel: format!("{ibc_channel:?}"), }) } From a1c961ec0b43ac03d856b454698abffdf2dcff7b Mon Sep 17 00:00:00 2001 From: Kayanski Date: Wed, 30 Oct 2024 10:11:01 +0000 Subject: [PATCH 3/9] Renamed and implemented clone --- cw-orch-daemon/Cargo.toml | 6 + cw-orch-daemon/examples/querier-daemon.rs | 9 +- cw-orch-daemon/src/channel.rs | 23 ++-- cw-orch-daemon/src/service/mod.rs | 3 +- .../src/service/{ => reconnect}/factory.rs | 4 +- .../src/service/{ => reconnect}/future.rs | 0 .../{reconnect.rs => reconnect/mod.rs} | 7 +- cw-orch-daemon/src/service/retry/future.rs | 121 ++++++++++++++++++ .../src/service/retry/implementation.rs | 95 ++++++++++++++ cw-orch-daemon/src/service/retry/layer.rs | 27 ++++ cw-orch-daemon/src/service/retry/mod.rs | 91 +++++++++++++ cw-orch-daemon/src/service/retry/policy.rs | 94 ++++++++++++++ 12 files changed, 457 insertions(+), 23 deletions(-) rename cw-orch-daemon/src/service/{ => reconnect}/factory.rs (88%) rename cw-orch-daemon/src/service/{ => reconnect}/future.rs (100%) rename cw-orch-daemon/src/service/{reconnect.rs => reconnect/mod.rs} (97%) create mode 100644 cw-orch-daemon/src/service/retry/future.rs create mode 100644 cw-orch-daemon/src/service/retry/implementation.rs create mode 100644 cw-orch-daemon/src/service/retry/layer.rs create mode 100644 cw-orch-daemon/src/service/retry/mod.rs create mode 100644 cw-orch-daemon/src/service/retry/policy.rs diff --git a/cw-orch-daemon/Cargo.toml b/cw-orch-daemon/Cargo.toml index 95e607ead..ba172389c 100644 --- a/cw-orch-daemon/Cargo.toml +++ b/cw-orch-daemon/Cargo.toml @@ -80,6 +80,12 @@ http = "1.1.0" libc-print = "0.1.23" tower = { version = "0.5.1", features = ["reconnect"] } pin-project-lite = "0.2.15" +futures-core = "0.3.31" +futures = "0.3.31" +futures-util = "0.3.31" +hyper = "1.5.0" +http-body-util = "0.1.2" +http-body = "1.0.1" [dev-dependencies] cw-orch-daemon = { path = "." } diff --git a/cw-orch-daemon/examples/querier-daemon.rs b/cw-orch-daemon/examples/querier-daemon.rs index 7d4840784..c6690c850 100644 --- a/cw-orch-daemon/examples/querier-daemon.rs +++ b/cw-orch-daemon/examples/querier-daemon.rs @@ -1,7 +1,5 @@ // ANCHOR: full_counter_example -use std::{thread::sleep, time::Duration}; - use cw_orch::{anyhow, prelude::*}; use cw_orch_daemon::senders::QueryOnlyDaemon; @@ -20,15 +18,10 @@ pub fn main() -> anyhow::Result<()> { .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; assert!(!balances.is_empty()); - log::info!("Sleeping 10s"); - sleep(Duration::from_secs(10)); - log::info!("FInished sleeping"); - let balances = chain .bank_querier() - .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; + .balance(&Addr::unchecked("faulty-address"), None)?; assert!(!balances.is_empty()); - log::info!("Finished example"); Ok(()) } diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 320e06f7b..8863e3e3f 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -4,25 +4,21 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{ use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target}; use http::Uri; use tonic::transport::{ClientTlsConfig, Endpoint}; - -use crate::service::{ - factory::{ChannelCreationArgs, ChannelFactory}, - reconnect::Reconnect, -}; +use tower::ServiceBuilder; use super::error::DaemonError; +use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect}; +use crate::service::retry::{Attempts, Retry, RetryLayer}; /// A helper for constructing a gRPC channel pub struct GrpcChannel {} pub type Channel = Reconnect; +pub type TowerChannel = Retry; impl GrpcChannel { /// Connect to any of the provided gRPC endpoints - pub async fn get_channel( - grpc: &[String], - chain_id: &str, - ) -> Result { + pub async fn get_channel(grpc: &[String], chain_id: &str) -> Result { if grpc.is_empty() { return Err(DaemonError::GRPCListIsEmpty); } @@ -76,7 +72,14 @@ impl GrpcChannel { return Err(DaemonError::CannotConnectGRPC); } - Ok(successful_connections.pop().unwrap()) + let retry_policy = Attempts(3); + let retry_layer = RetryLayer::new(retry_policy); + + let service = ServiceBuilder::new() + .layer(retry_layer) + .service(successful_connections.pop().unwrap()); + + Ok(service) } pub async fn connect(grpc: &[String], chain_id: &str) -> Channel { diff --git a/cw-orch-daemon/src/service/mod.rs b/cw-orch-daemon/src/service/mod.rs index b6ab7d535..66056d332 100644 --- a/cw-orch-daemon/src/service/mod.rs +++ b/cw-orch-daemon/src/service/mod.rs @@ -1,3 +1,2 @@ -pub mod factory; -pub mod future; pub mod reconnect; +pub mod retry; diff --git a/cw-orch-daemon/src/service/factory.rs b/cw-orch-daemon/src/service/reconnect/factory.rs similarity index 88% rename from cw-orch-daemon/src/service/factory.rs rename to cw-orch-daemon/src/service/reconnect/factory.rs index 88426978f..dcac90979 100644 --- a/cw-orch-daemon/src/service/factory.rs +++ b/cw-orch-daemon/src/service/reconnect/factory.rs @@ -2,14 +2,14 @@ use std::{future::Future, pin::Pin}; use tower::Service; -use crate::{DaemonError, GrpcChannel}; +use crate::{DaemonError, GrpcChannel, TowerChannel}; pub type ChannelCreationArgs = (Vec, String); #[derive(Clone)] pub struct ChannelFactory {} impl Service for ChannelFactory { - type Response = tonic::transport::Channel; + type Response = TowerChannel; type Error = DaemonError; diff --git a/cw-orch-daemon/src/service/future.rs b/cw-orch-daemon/src/service/reconnect/future.rs similarity index 100% rename from cw-orch-daemon/src/service/future.rs rename to cw-orch-daemon/src/service/reconnect/future.rs diff --git a/cw-orch-daemon/src/service/reconnect.rs b/cw-orch-daemon/src/service/reconnect/mod.rs similarity index 97% rename from cw-orch-daemon/src/service/reconnect.rs rename to cw-orch-daemon/src/service/reconnect/mod.rs index 58d9097b2..2a55e75b9 100644 --- a/cw-orch-daemon/src/service/reconnect.rs +++ b/cw-orch-daemon/src/service/reconnect/mod.rs @@ -1,4 +1,6 @@ -use crate::service::future::ResponseFuture; +pub mod factory; +pub mod future; +use future::ResponseFuture; use log::trace; use std::fmt; use std::sync::{Arc, Mutex}; @@ -11,6 +13,9 @@ use std::{ }; use tower::{BoxError, Service}; +pub use factory::ChannelCreationArgs; +pub use factory::ChannelFactory; + /// Reconnect to failed services. pub struct Reconnect where diff --git a/cw-orch-daemon/src/service/retry/future.rs b/cw-orch-daemon/src/service/retry/future.rs new file mode 100644 index 000000000..159251ab8 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/future.rs @@ -0,0 +1,121 @@ +//! Future types + +use super::policy::Policy; +use super::Retry; +use futures_core::ready; +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; + +pin_project! { + /// The [`Future`] returned by a [`Retry`] service. + #[derive(Debug)] + pub struct ResponseFuture + where + P: Policy, + S: Service, + { + request: Option, + #[pin] + retry: Retry, + #[pin] + state: State, + } +} + +pin_project! { + #[project = StateProj] + #[derive(Debug)] + enum State { + // Polling the future from [`Service::call`] + Called { + #[pin] + future: F + }, + // Polling the future from [`Policy::retry`] + Waiting { + #[pin] + waiting: P + }, + // Polling [`Service::poll_ready`] after [`Waiting`] was OK. + Retrying, + } +} + +impl ResponseFuture +where + P: Policy, + S: Service, +{ + pub(crate) fn new( + request: Option, + retry: Retry, + future: S::Future, + ) -> ResponseFuture { + ResponseFuture { + request, + retry, + state: State::Called { future }, + } + } +} + +impl Future for ResponseFuture +where + P: Policy, + S: Service, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + match this.state.as_mut().project() { + StateProj::Called { future } => { + let mut result = ready!(future.poll(cx)); + if let Some(req) = &mut this.request { + match this.retry.policy.retry(req, &mut result) { + Some(waiting) => { + this.state.set(State::Waiting { waiting }); + } + None => return Poll::Ready(result), + } + } else { + // request wasn't cloned, so no way to retry it + return Poll::Ready(result); + } + } + StateProj::Waiting { waiting } => { + ready!(waiting.poll(cx)); + + this.state.set(State::Retrying); + } + StateProj::Retrying => { + // NOTE: we assume here that + // + // this.retry.poll_ready() + // + // is equivalent to + // + // this.retry.service.poll_ready() + // + // we need to make that assumption to avoid adding an Unpin bound to the Policy + // in Ready to make it Unpin so that we can get &mut Ready as needed to call + // poll_ready on it. + ready!(this.retry.as_mut().project().service.poll_ready(cx))?; + let mut req = this + .request + .take() + .expect("retrying requires cloned request"); + (req, *this.request) = this.retry.policy.clone_request(req); + this.state.set(State::Called { + future: this.retry.as_mut().project().service.call(req), + }); + } + } + } + } +} diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs new file mode 100644 index 000000000..0f19ccc01 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -0,0 +1,95 @@ +use crate::RUNTIME; + +use super::Policy; +use futures::TryStreamExt; +use futures_util::future; +use http::{request::Parts, Request}; +use http_body_util::{BodyExt, Full}; +use hyper::body::Bytes; +use tonic::body::BoxBody; + +type Req = http::Request; +type Res = http::Response; + +#[derive(Clone)] +pub struct Attempts(pub usize); + +impl Policy for Attempts { + type Future = future::Ready<()>; + + fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { + match result { + Ok(_) => { + log::trace!("Entering the middleware ok"); + // Treat all `Response`s as success, + // so don't retry... + None + } + Err(_) => { + log::trace!("Entering the middleware error"); + // Treat all errors as failures... + // But we limit the number of attempts... + if self.0 > 0 { + log::trace!("Try this again, there was a failure"); + // Try again! + self.0 -= 1; + Some(future::ready(())) + } else { + // Used all our attempts, no retry... + None + } + } + } + } + + fn clone_request(&mut self, req: Req) -> (Req, Option) { + // Convert body to Bytes so it can be cloned + let (parts, original_body) = req.into_parts(); + + // Try to capture the Bytes from the original body + let bytes = futures::executor::block_on(async move { + RUNTIME + .spawn(async move { consume_unsync_body(original_body).await }) + .await + .unwrap() + }); + + // Re-create the request with the captured bytes in a new BoxBody + let req = create_request(parts.clone(), bytes.clone()); + let cloned_req = create_request(parts, bytes); + + (req, Some(cloned_req)) + // Some(req.clone()) + } +} + +async fn consume_unsync_body(body: BoxBody) -> Vec { + // Accumulate bytes asynchronously + + body.into_data_stream() + .try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); + Ok(acc) + }) + .await + .unwrap() +} + +fn create_request(parts: Parts, body: Vec) -> http::Request { + let bytes = Bytes::from(body); + let full_body = Full::new(bytes); + let mut request = Request::builder() + .method(parts.method) + .uri(parts.uri) + .version(parts.version) + .body( + full_body + .map_err(|_err| tonic::Status::internal("Body error")) + .boxed_unsync(), + ) + .unwrap(); + + *request.headers_mut() = parts.headers; + + request +} diff --git a/cw-orch-daemon/src/service/retry/layer.rs b/cw-orch-daemon/src/service/retry/layer.rs new file mode 100644 index 000000000..65a15e02f --- /dev/null +++ b/cw-orch-daemon/src/service/retry/layer.rs @@ -0,0 +1,27 @@ +use super::Retry; +use tower::Layer; + +/// Retry requests based on a policy +#[derive(Debug, Clone)] +pub struct RetryLayer

{ + policy: P, +} + +impl

RetryLayer

{ + /// Creates a new [`RetryLayer`] from a retry policy. + pub const fn new(policy: P) -> Self { + RetryLayer { policy } + } +} + +impl Layer for RetryLayer

+where + P: Clone, +{ + type Service = Retry; + + fn layer(&self, service: S) -> Self::Service { + let policy = self.policy.clone(); + Retry::new(policy, service) + } +} diff --git a/cw-orch-daemon/src/service/retry/mod.rs b/cw-orch-daemon/src/service/retry/mod.rs new file mode 100644 index 000000000..4dd8dc797 --- /dev/null +++ b/cw-orch-daemon/src/service/retry/mod.rs @@ -0,0 +1,91 @@ +pub mod future; +pub mod implementation; +pub mod layer; +pub mod policy; +pub use implementation::Attempts; +pub use layer::RetryLayer; +pub use policy::Policy; + +use crate::service::retry::future::ResponseFuture; +use pin_project_lite::pin_project; +use std::task::{Context, Poll}; +use tower::Service; + +pin_project! { + /// Configure retrying requests of "failed" responses. + /// + /// A [`Policy`] classifies what is a "failed" response. + /// + /// # Clone + /// + /// This middleware requires that the inner `Service` implements [`Clone`], + /// because the `Service` must be stored in each [`ResponseFuture`] in + /// order to retry the request in the event of a failure. If the inner + /// `Service` type does not implement `Clone`, the [`Buffer`] middleware + /// can be added to make any `Service` cloneable. + /// + /// [`Buffer`]: crate::buffer::Buffer + /// + /// The `Policy` must also implement `Clone`. This middleware will + /// clone the policy for each _request session_. This means a new clone + /// of the policy will be created for each initial request and any subsequent + /// retries of that request. Therefore, any state stored in the `Policy` instance + /// is for that request session only. In order to share data across request + /// sessions, that shared state may be stored in an [`Arc`], so that all clones + /// of the `Policy` type reference the same instance of the shared state. + /// + /// [`Arc`]: std::sync::Arc + #[derive(Clone, Debug)] + pub struct Retry { + policy: P, + service: S, + } +} + +// ===== impl Retry ===== + +impl Retry { + /// Retry the inner service depending on this [`Policy`]. + pub const fn new(policy: P, service: S) -> Self { + Retry { policy, service } + } + + /// Get a reference to the inner service + pub fn get_ref(&self) -> &S { + &self.service + } + + /// Get a mutable reference to the inner service + pub fn get_mut(&mut self) -> &mut S { + &mut self.service + } + + /// Consume `self`, returning the inner service + pub fn into_inner(self) -> S { + self.service + } +} + +impl Service for Retry +where + P: Policy + Clone, + S: Service + Clone, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // NOTE: the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is + // equivalent to Ready.service.poll_ready. If this ever changes, that code must be updated + // as well. + self.service.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let (request, cloned) = self.policy.clone_request(request); + let future = self.service.call(request); + + ResponseFuture::new(cloned, self.clone(), future) + } +} diff --git a/cw-orch-daemon/src/service/retry/policy.rs b/cw-orch-daemon/src/service/retry/policy.rs new file mode 100644 index 000000000..7f300794d --- /dev/null +++ b/cw-orch-daemon/src/service/retry/policy.rs @@ -0,0 +1,94 @@ +use std::future::Future; + +/// A "retry policy" to classify if a request should be retried. +/// +/// # Example +/// +/// ``` +/// use tower::retry::Policy; +/// use futures_util::future; +/// +/// type Req = String; +/// type Res = String; +/// +/// struct Attempts(usize); +/// +/// impl Policy for Attempts { +/// type Future = future::Ready<()>; +/// +/// fn retry(&mut self, req: &mut Req, result: &mut Result) -> Option { +/// match result { +/// Ok(_) => { +/// // Treat all `Response`s as success, +/// // so don't retry... +/// None +/// }, +/// Err(_) => { +/// // Treat all errors as failures... +/// // But we limit the number of attempts... +/// if self.0 > 0 { +/// // Try again! +/// self.0 -= 1; +/// Some(future::ready(())) +/// } else { +/// // Used all our attempts, no retry... +/// None +/// } +/// } +/// } +/// } +/// +/// fn clone_request(&mut self, req: &Req) -> Option { +/// Some(req.clone()) +/// } +/// } +/// ``` +pub trait Policy { + /// The [`Future`] type returned by [`Policy::retry`]. + type Future: Future; + + /// Check the policy if a certain request should be retried. + /// + /// This method is passed a reference to the original request, and either + /// the [`Service::Response`] or [`Service::Error`] from the inner service. + /// + /// If the request should **not** be retried, return `None`. + /// + /// If the request *should* be retried, return `Some` future that will delay + /// the next retry of the request. This can be used to sleep for a certain + /// duration, to wait for some external condition to be met before retrying, + /// or resolve right away, if the request should be retried immediately. + /// + /// ## Mutating Requests + /// + /// The policy MAY chose to mutate the `req`: if the request is mutated, the + /// mutated request will be sent to the inner service in the next retry. + /// This can be helpful for use cases like tracking the retry count in a + /// header. + /// + /// ## Mutating Results + /// + /// The policy MAY chose to mutate the result. This enables the retry + /// policy to convert a failure into a success and vice versa. For example, + /// if the policy is used to poll while waiting for a state change, the + /// policy can switch the result to emit a specific error when retries are + /// exhausted. + /// + /// The policy can also record metadata on the request to include + /// information about the number of retries required or to record that a + /// failure failed after exhausting all retries. + /// + /// [`Service::Response`]: crate::Service::Response + /// [`Service::Error`]: crate::Service::Error + fn retry(&mut self, req: &mut Req, result: &mut Result) -> Option; + + /// Tries to clone a request before being passed to the inner service. + /// + /// If the request cannot be cloned, return [`None`]. Moreover, the retry + /// function will not be called if the [`None`] is returned. + fn clone_request(&mut self, req: Req) -> (Req, Option); +} + +// Ensure `Policy` is object safe +#[cfg(test)] +fn _obj_safe(_: Box>>) {} From 88a4258e229d9ad8a98c3b73c34305578fd72b45 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Wed, 30 Oct 2024 10:11:16 +0000 Subject: [PATCH 4/9] formatting --- cw-orch-daemon/src/queriers/authz.rs | 2 +- cw-orch-daemon/src/queriers/bank.rs | 2 +- cw-orch-daemon/src/queriers/gov.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cw-orch-daemon/src/queriers/authz.rs b/cw-orch-daemon/src/queriers/authz.rs index 50859fcfa..72e0d2d78 100644 --- a/cw-orch-daemon/src/queriers/authz.rs +++ b/cw-orch-daemon/src/queriers/authz.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use crate::Channel; /// Queries for Cosmos AuthZ Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/bank.rs b/cw-orch-daemon/src/queriers/bank.rs index 499788933..2fc19b811 100644 --- a/cw-orch-daemon/src/queriers/bank.rs +++ b/cw-orch-daemon/src/queriers/bank.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, senders::query::QuerySender, DaemonBase}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::{Addr, Coin, StdError}; use cw_orch_core::environment::{BankQuerier, Querier, QuerierGetter}; use tokio::runtime::Handle; -use crate::Channel; /// Queries for Cosmos Bank Module /// All the async function are prefixed with `_` diff --git a/cw-orch-daemon/src/queriers/gov.rs b/cw-orch-daemon/src/queriers/gov.rs index a6240f77b..ce8ec1184 100644 --- a/cw-orch-daemon/src/queriers/gov.rs +++ b/cw-orch-daemon/src/queriers/gov.rs @@ -1,9 +1,9 @@ +use crate::Channel; use crate::{cosmos_modules, error::DaemonError, Daemon}; use cosmrs::proto::cosmos::base::query::v1beta1::PageRequest; use cosmwasm_std::Addr; use cw_orch_core::environment::{Querier, QuerierGetter}; use tokio::runtime::Handle; -use crate::Channel; /// Querier for the Cosmos Gov module /// All the async function are prefixed with `_` From b44398e3c4573c6c76ea6b032ce800ea9063c5b3 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Wed, 30 Oct 2024 10:14:28 +0000 Subject: [PATCH 5/9] Fix doc and readbility --- cw-orch-daemon/src/service/retry/implementation.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs index 0f19ccc01..03f35ec3e 100644 --- a/cw-orch-daemon/src/service/retry/implementation.rs +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -47,8 +47,9 @@ impl Policy for Attempts { let (parts, original_body) = req.into_parts(); // Try to capture the Bytes from the original body + // This is circumvoluted, I'm not sure how to call an async function within a sync function that is used inside a future later let bytes = futures::executor::block_on(async move { - RUNTIME + tokio::runtime::Handle::current() .spawn(async move { consume_unsync_body(original_body).await }) .await .unwrap() From 019dc1ae4b3c59bc37201ea8e69cdd3390de1b4e Mon Sep 17 00:00:00 2001 From: Kayanski Date: Wed, 30 Oct 2024 10:14:53 +0000 Subject: [PATCH 6/9] Clippy --- cw-orch-daemon/src/service/retry/implementation.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs index 03f35ec3e..7ea3d0077 100644 --- a/cw-orch-daemon/src/service/retry/implementation.rs +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -1,5 +1,3 @@ -use crate::RUNTIME; - use super::Policy; use futures::TryStreamExt; use futures_util::future; From 86de0d49cc9e9b67b83d9b9cd267c3d36465f342 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Mon, 4 Nov 2024 13:44:25 +0000 Subject: [PATCH 7/9] Channel could error in creator --- cw-orch-daemon/examples/manual_sender.rs | 2 +- cw-orch-daemon/src/channel.rs | 15 ++++++++++----- cw-orch-daemon/src/live_mock.rs | 10 ++++++---- cw-orch-daemon/src/senders/cosmos.rs | 2 +- cw-orch-daemon/src/senders/query_only.rs | 2 +- cw-orch-daemon/tests/querier.rs | 4 +++- packages/clone-testing/src/core.rs | 2 +- .../interchain-daemon/src/packet_inspector.rs | 2 +- 8 files changed, 24 insertions(+), 15 deletions(-) diff --git a/cw-orch-daemon/examples/manual_sender.rs b/cw-orch-daemon/examples/manual_sender.rs index 90ea0a439..5643b60d9 100644 --- a/cw-orch-daemon/examples/manual_sender.rs +++ b/cw-orch-daemon/examples/manual_sender.rs @@ -72,7 +72,7 @@ impl SenderBuilder for ManualSenderOptions { type Sender = ManualSender; async fn build(&self, chain_info: &Arc) -> Result { - let grpc_channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await; + let grpc_channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await?; Ok(ManualSender { chain_info: chain_info.clone(), sender: Addr::unchecked(self.sender_address.clone()), diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 8863e3e3f..0d091e072 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -4,7 +4,7 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{ use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target}; use http::Uri; use tonic::transport::{ClientTlsConfig, Endpoint}; -use tower::ServiceBuilder; +use tower::{MakeService, ServiceBuilder}; use super::error::DaemonError; use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect}; @@ -82,13 +82,18 @@ impl GrpcChannel { Ok(service) } - pub async fn connect(grpc: &[String], chain_id: &str) -> Channel { - let channel = Reconnect::new(ChannelFactory {}, (grpc.to_vec(), chain_id.to_string())); - channel.clone() + pub async fn connect(grpc: &[String], chain_id: &str) -> Result { + // We construct a channel using the factory + let target = (grpc.to_vec(), chain_id.to_string()); + let channel = ChannelFactory {}.make_service(target.clone()).await?; + + // Then we create the reconnect service + let channel = Reconnect::with_connection(channel, ChannelFactory {}, target); + Ok(channel) } /// Create a gRPC channel from the chain info - pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Channel { + pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Result { GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await } } diff --git a/cw-orch-daemon/src/live_mock.rs b/cw-orch-daemon/src/live_mock.rs index 1ba947823..3aef80109 100644 --- a/cw-orch-daemon/src/live_mock.rs +++ b/cw-orch-daemon/src/live_mock.rs @@ -191,10 +191,12 @@ impl WasmMockQuerier { impl WasmMockQuerier { /// Creates a querier from chain information pub fn new(chain: ChainInfoOwned) -> Self { - let channel = RUNTIME.block_on(GrpcChannel::connect( - &chain.grpc_urls, - chain.chain_id.as_str(), - )); + let channel = RUNTIME + .block_on(GrpcChannel::connect( + &chain.grpc_urls, + chain.chain_id.as_str(), + )) + .unwrap(); WasmMockQuerier { channel } } diff --git a/cw-orch-daemon/src/senders/cosmos.rs b/cw-orch-daemon/src/senders/cosmos.rs index 4950fda63..2126e5ea8 100644 --- a/cw-orch-daemon/src/senders/cosmos.rs +++ b/cw-orch-daemon/src/senders/cosmos.rs @@ -98,7 +98,7 @@ impl Wallet { Ok(Self { chain_info: chain_info.clone(), - grpc_channel: GrpcChannel::from_chain_info(chain_info.as_ref()).await, + grpc_channel: GrpcChannel::from_chain_info(chain_info.as_ref()).await?, private_key: pk, secp, options, diff --git a/cw-orch-daemon/src/senders/query_only.rs b/cw-orch-daemon/src/senders/query_only.rs index 638c4b58d..0bc74fadc 100644 --- a/cw-orch-daemon/src/senders/query_only.rs +++ b/cw-orch-daemon/src/senders/query_only.rs @@ -26,7 +26,7 @@ impl SenderBuilder for () { type Sender = QueryOnlySender; async fn build(&self, chain_info: &Arc) -> Result { - let channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await; + let channel = GrpcChannel::from_chain_info(chain_info.as_ref()).await?; Ok(QueryOnlySender { channel, diff --git a/cw-orch-daemon/tests/querier.rs b/cw-orch-daemon/tests/querier.rs index 87f9c3b3d..783669189 100644 --- a/cw-orch-daemon/tests/querier.rs +++ b/cw-orch-daemon/tests/querier.rs @@ -30,7 +30,9 @@ mod queriers { let grpcs = vec![network.grpc_urls[0].into()]; - GrpcChannel::connect(&grpcs, network.chain_id).await + let channel = GrpcChannel::connect(&grpcs, network.chain_id).await; + + channel.unwrap() } /* diff --git a/packages/clone-testing/src/core.rs b/packages/clone-testing/src/core.rs index ccf2f1aeb..233bc1131 100644 --- a/packages/clone-testing/src/core.rs +++ b/packages/clone-testing/src/core.rs @@ -238,7 +238,7 @@ impl CloneTesting { let bank = BankKeeper::new().with_remote(remote_channel.clone()); // We update the block_height, and open a second channel just for that (to make sure we are not too dependent on clone-cw-multi-test deps) - let node_channel = rt.block_on(GrpcChannel::from_chain_info(&chain)); + let node_channel = rt.block_on(GrpcChannel::from_chain_info(&chain))?; let block_info = remote_channel .rt .block_on(Node::new_async(node_channel)._block_info()) diff --git a/packages/interchain/interchain-daemon/src/packet_inspector.rs b/packages/interchain/interchain-daemon/src/packet_inspector.rs index ca9d85c90..0f0260bf0 100644 --- a/packages/interchain/interchain-daemon/src/packet_inspector.rs +++ b/packages/interchain/interchain-daemon/src/packet_inspector.rs @@ -176,7 +176,7 @@ impl PacketInspector { } else { // If no custom channel was registered, we try to get it from the registry let chain_data: ChainInfoOwned = parse_network(chain_id).unwrap().into(); // TODO, no unwrap here ? - Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await) + Ok(GrpcChannel::connect(&chain_data.grpc_urls, chain_id).await?) } } From 960e29085408e97c29926230586646ce7c3a1792 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Mon, 4 Nov 2024 15:17:30 +0000 Subject: [PATCH 8/9] Added Attempts --- cw-orch-daemon/examples/querier-daemon.rs | 6 ++- cw-orch-daemon/src/channel.rs | 19 ++++--- .../src/service/reconnect/future.rs | 10 ++++ cw-orch-daemon/src/service/reconnect/mod.rs | 51 +++++++++++++++---- .../src/service/retry/implementation.rs | 42 +++++++++++++-- 5 files changed, 107 insertions(+), 21 deletions(-) diff --git a/cw-orch-daemon/examples/querier-daemon.rs b/cw-orch-daemon/examples/querier-daemon.rs index c6690c850..cb068b33f 100644 --- a/cw-orch-daemon/examples/querier-daemon.rs +++ b/cw-orch-daemon/examples/querier-daemon.rs @@ -1,5 +1,7 @@ // ANCHOR: full_counter_example +use std::{thread::sleep, time::Duration}; + use cw_orch::{anyhow, prelude::*}; use cw_orch_daemon::senders::QueryOnlyDaemon; @@ -18,9 +20,11 @@ pub fn main() -> anyhow::Result<()> { .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; assert!(!balances.is_empty()); + sleep(Duration::from_secs(10)); + log::info!("Resuming queries"); let balances = chain .bank_querier() - .balance(&Addr::unchecked("faulty-address"), None)?; + .balance(&Addr::unchecked(LOCAL_JUNO_SENDER), None)?; assert!(!balances.is_empty()); Ok(()) diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 0d091e072..1a0a2836d 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -4,7 +4,7 @@ use cosmrs::proto::cosmos::base::tendermint::v1beta1::{ use cw_orch_core::{environment::ChainInfoOwned, log::connectivity_target}; use http::Uri; use tonic::transport::{ClientTlsConfig, Endpoint}; -use tower::{MakeService, ServiceBuilder}; +use tower::ServiceBuilder; use super::error::DaemonError; use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect}; @@ -72,7 +72,7 @@ impl GrpcChannel { return Err(DaemonError::CannotConnectGRPC); } - let retry_policy = Attempts(3); + let retry_policy = Attempts::Count(3); let retry_layer = RetryLayer::new(retry_policy); let service = ServiceBuilder::new() @@ -83,12 +83,9 @@ impl GrpcChannel { } pub async fn connect(grpc: &[String], chain_id: &str) -> Result { - // We construct a channel using the factory let target = (grpc.to_vec(), chain_id.to_string()); - let channel = ChannelFactory {}.make_service(target.clone()).await?; - - // Then we create the reconnect service - let channel = Reconnect::with_connection(channel, ChannelFactory {}, target); + let channel = Reconnect::new(ChannelFactory {}, target).with_attemps(3); + Self::verify_connection(channel.clone()).await?; Ok(channel) } @@ -96,6 +93,14 @@ impl GrpcChannel { pub async fn from_chain_info(chain_info: &ChainInfoOwned) -> Result { GrpcChannel::connect(&chain_info.grpc_urls, &chain_info.chain_id).await } + + async fn verify_connection(channel: Channel) -> Result<(), DaemonError> { + let mut client = ServiceClient::new(channel.clone()); + + // Verify that we're able to query the node info + client.get_node_info(GetNodeInfoRequest {}).await?; + Ok(()) + } } #[cfg(test)] diff --git a/cw-orch-daemon/src/service/reconnect/future.rs b/cw-orch-daemon/src/service/reconnect/future.rs index cd28666e9..b0f2a7d04 100644 --- a/cw-orch-daemon/src/service/reconnect/future.rs +++ b/cw-orch-daemon/src/service/reconnect/future.rs @@ -33,6 +33,10 @@ impl Inner { fn future(fut: F) -> Self { Self::Future { fut } } + + fn error(error: Option) -> Self { + Self::Error { error } + } } impl ResponseFuture { @@ -41,6 +45,12 @@ impl ResponseFuture { inner: Inner::future(inner), } } + + pub(crate) fn error(error: E) -> Self { + ResponseFuture { + inner: Inner::error(Some(error)), + } + } } impl Future for ResponseFuture diff --git a/cw-orch-daemon/src/service/reconnect/mod.rs b/cw-orch-daemon/src/service/reconnect/mod.rs index 2a55e75b9..afaaa7461 100644 --- a/cw-orch-daemon/src/service/reconnect/mod.rs +++ b/cw-orch-daemon/src/service/reconnect/mod.rs @@ -1,11 +1,11 @@ pub mod factory; pub mod future; use future::ResponseFuture; -use log::trace; -use std::fmt; +use log::{debug, trace}; use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::time::Duration; +use std::{fmt, mem}; use std::{ future::Future, pin::Pin, @@ -16,6 +16,8 @@ use tower::{BoxError, Service}; pub use factory::ChannelCreationArgs; pub use factory::ChannelFactory; +use super::retry::Attempts; + /// Reconnect to failed services. pub struct Reconnect where @@ -29,6 +31,7 @@ where #[allow(clippy::type_complexity)] state: Arc>>, target: Target, + attempts: Attempts, } impl Clone for Reconnect @@ -45,6 +48,7 @@ where mk_service: self.mk_service.clone(), state: self.state.clone(), target: self.target.clone(), + attempts: self.attempts.clone(), } } } @@ -57,6 +61,15 @@ enum State { Connected(S), } +impl State { + pub(crate) fn unwrap_err(self) -> E { + match self { + State::Error(e) => e, + _ => panic!("Not error"), + } + } +} + impl Reconnect where M: Service, @@ -70,6 +83,7 @@ where mk_service, state: Arc::new(Mutex::new(State::Idle)), target, + attempts: Attempts::Unlimited, } } @@ -79,8 +93,14 @@ where mk_service, state: Arc::new(Mutex::new(State::Connected(init_conn))), target, + attempts: Attempts::Unlimited, } } + + pub fn with_attemps(mut self, attempts: usize) -> Self { + self.attempts = Attempts::Count(attempts); + self + } } impl Service for Reconnect @@ -110,7 +130,6 @@ where return Poll::Pending; } } - let fut = self.mk_service.call(self.target.clone()); drop(state); self.state = Arc::new(Mutex::new(State::Connecting(fut))); @@ -128,10 +147,20 @@ where return Poll::Pending; } Poll::Ready(Err(e)) => { - trace!("poll_ready; error, retrying in {} seconds", 5); drop(state); - self.state = Arc::new(Mutex::new(State::Error(e))); - sleep(Duration::from_secs(5)); + if self.attempts.retry() { + trace!("poll_ready; error"); + debug!( + "Connection error, retrying in {} seconds, {} attemps left", + 5, self.attempts + ); + self.state = Arc::new(Mutex::new(State::Error(e))); + sleep(Duration::from_secs(5)); + } else { + self.state = Arc::new(Mutex::new(State::Error(e))); + + break; + } } } } @@ -156,16 +185,18 @@ where } } } + + Poll::Ready(Ok(())) } fn call(&mut self, request: Request) -> Self::Future { let mut state = self.state.lock().unwrap(); let service = match &mut *state { State::Connected(ref mut service) => service, - State::Error(error) => panic!( - "service not ready; poll_ready must be called first: {:?}", - error - ), + State::Error(_) => { + let state = mem::replace(&mut *state, State::Idle); + return ResponseFuture::error(state.unwrap_err()); + } _ => panic!("service not ready; poll_ready must be called first"), }; diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs index 7ea3d0077..7652146f6 100644 --- a/cw-orch-daemon/src/service/retry/implementation.rs +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -10,7 +10,44 @@ type Req = http::Request; type Res = http::Response; #[derive(Clone)] -pub struct Attempts(pub usize); +pub enum Attempts { + Unlimited, + Count(usize), +} + +impl std::fmt::Display for Attempts { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Attempts::Unlimited => write!(f, "unlimited")?, + Attempts::Count(count) => write!(f, "{}", count)?, + } + Ok(()) + } +} + +impl Attempts { + pub fn can_retry(&self) -> bool { + match self { + Attempts::Unlimited => true, + Attempts::Count(count) => *count > 0, + } + } + + /// Verifies the attempt can retry + /// If it can retry, decrements the counter + pub fn retry(&mut self) -> bool { + let can_retry = self.can_retry(); + if can_retry { + self.decrement(); + } + can_retry + } + fn decrement(&mut self) { + if let Attempts::Count(count) = self { + *count -= 1 + } + } +} impl Policy for Attempts { type Future = future::Ready<()>; @@ -27,10 +64,9 @@ impl Policy for Attempts { log::trace!("Entering the middleware error"); // Treat all errors as failures... // But we limit the number of attempts... - if self.0 > 0 { + if self.retry() { log::trace!("Try this again, there was a failure"); // Try again! - self.0 -= 1; Some(future::ready(())) } else { // Used all our attempts, no retry... From fbcbca1cde390568faa5bca4bfc68dae0a370fd4 Mon Sep 17 00:00:00 2001 From: Kayanski Date: Mon, 4 Nov 2024 15:23:59 +0000 Subject: [PATCH 9/9] Attempts separation --- cw-orch-daemon/src/channel.rs | 6 +-- cw-orch-daemon/src/service/attempts.rs | 39 +++++++++++++++ cw-orch-daemon/src/service/mod.rs | 1 + cw-orch-daemon/src/service/reconnect/mod.rs | 2 +- .../src/service/retry/implementation.rs | 48 ++++++------------- cw-orch-daemon/src/service/retry/mod.rs | 2 +- 6 files changed, 59 insertions(+), 39 deletions(-) create mode 100644 cw-orch-daemon/src/service/attempts.rs diff --git a/cw-orch-daemon/src/channel.rs b/cw-orch-daemon/src/channel.rs index 1a0a2836d..c1b2b7abe 100644 --- a/cw-orch-daemon/src/channel.rs +++ b/cw-orch-daemon/src/channel.rs @@ -8,13 +8,13 @@ use tower::ServiceBuilder; use super::error::DaemonError; use crate::service::reconnect::{ChannelCreationArgs, ChannelFactory, Reconnect}; -use crate::service::retry::{Attempts, Retry, RetryLayer}; +use crate::service::retry::{Retry, RetryAttemps, RetryLayer}; /// A helper for constructing a gRPC channel pub struct GrpcChannel {} pub type Channel = Reconnect; -pub type TowerChannel = Retry; +pub type TowerChannel = Retry; impl GrpcChannel { /// Connect to any of the provided gRPC endpoints @@ -72,7 +72,7 @@ impl GrpcChannel { return Err(DaemonError::CannotConnectGRPC); } - let retry_policy = Attempts::Count(3); + let retry_policy = RetryAttemps::count(3); let retry_layer = RetryLayer::new(retry_policy); let service = ServiceBuilder::new() diff --git a/cw-orch-daemon/src/service/attempts.rs b/cw-orch-daemon/src/service/attempts.rs new file mode 100644 index 000000000..b5107a79e --- /dev/null +++ b/cw-orch-daemon/src/service/attempts.rs @@ -0,0 +1,39 @@ +#[derive(Clone)] +pub enum Attempts { + Unlimited, + Count(usize), +} + +impl std::fmt::Display for Attempts { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Attempts::Unlimited => write!(f, "unlimited")?, + Attempts::Count(count) => write!(f, "{}", count)?, + } + Ok(()) + } +} + +impl Attempts { + pub fn can_retry(&self) -> bool { + match self { + Attempts::Unlimited => true, + Attempts::Count(count) => *count > 0, + } + } + + /// Verifies the attempt can retry + /// If it can retry, decrements the counter + pub fn retry(&mut self) -> bool { + let can_retry = self.can_retry(); + if can_retry { + self.decrement(); + } + can_retry + } + fn decrement(&mut self) { + if let Attempts::Count(count) = self { + *count -= 1 + } + } +} diff --git a/cw-orch-daemon/src/service/mod.rs b/cw-orch-daemon/src/service/mod.rs index 66056d332..3effe0a2d 100644 --- a/cw-orch-daemon/src/service/mod.rs +++ b/cw-orch-daemon/src/service/mod.rs @@ -1,2 +1,3 @@ +pub mod attempts; pub mod reconnect; pub mod retry; diff --git a/cw-orch-daemon/src/service/reconnect/mod.rs b/cw-orch-daemon/src/service/reconnect/mod.rs index afaaa7461..e497d0c6e 100644 --- a/cw-orch-daemon/src/service/reconnect/mod.rs +++ b/cw-orch-daemon/src/service/reconnect/mod.rs @@ -16,7 +16,7 @@ use tower::{BoxError, Service}; pub use factory::ChannelCreationArgs; pub use factory::ChannelFactory; -use super::retry::Attempts; +use super::attempts::Attempts; /// Reconnect to failed services. pub struct Reconnect diff --git a/cw-orch-daemon/src/service/retry/implementation.rs b/cw-orch-daemon/src/service/retry/implementation.rs index 7652146f6..4ac0c14e3 100644 --- a/cw-orch-daemon/src/service/retry/implementation.rs +++ b/cw-orch-daemon/src/service/retry/implementation.rs @@ -1,3 +1,5 @@ +use crate::service::attempts::Attempts; + use super::Policy; use futures::TryStreamExt; use futures_util::future; @@ -10,46 +12,24 @@ type Req = http::Request; type Res = http::Response; #[derive(Clone)] -pub enum Attempts { - Unlimited, - Count(usize), -} +pub struct RetryAttemps(pub Attempts); -impl std::fmt::Display for Attempts { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Attempts::Unlimited => write!(f, "unlimited")?, - Attempts::Count(count) => write!(f, "{}", count)?, - } - Ok(()) +impl RetryAttemps { + pub fn unlimited() -> Self { + Self(Attempts::Unlimited) } -} - -impl Attempts { - pub fn can_retry(&self) -> bool { - match self { - Attempts::Unlimited => true, - Attempts::Count(count) => *count > 0, - } + pub fn count(count: usize) -> Self { + Self(Attempts::Count(count)) } +} - /// Verifies the attempt can retry - /// If it can retry, decrements the counter - pub fn retry(&mut self) -> bool { - let can_retry = self.can_retry(); - if can_retry { - self.decrement(); - } - can_retry - } - fn decrement(&mut self) { - if let Attempts::Count(count) = self { - *count -= 1 - } +impl From for RetryAttemps { + fn from(value: Attempts) -> Self { + Self(value) } } -impl Policy for Attempts { +impl Policy for RetryAttemps { type Future = future::Ready<()>; fn retry(&mut self, _req: &mut Req, result: &mut Result) -> Option { @@ -64,7 +44,7 @@ impl Policy for Attempts { log::trace!("Entering the middleware error"); // Treat all errors as failures... // But we limit the number of attempts... - if self.retry() { + if self.0.retry() { log::trace!("Try this again, there was a failure"); // Try again! Some(future::ready(())) diff --git a/cw-orch-daemon/src/service/retry/mod.rs b/cw-orch-daemon/src/service/retry/mod.rs index 4dd8dc797..af466be4d 100644 --- a/cw-orch-daemon/src/service/retry/mod.rs +++ b/cw-orch-daemon/src/service/retry/mod.rs @@ -2,7 +2,7 @@ pub mod future; pub mod implementation; pub mod layer; pub mod policy; -pub use implementation::Attempts; +pub use implementation::RetryAttemps; pub use layer::RetryLayer; pub use policy::Policy;