From 3d17fbb899c530faf5e0411b06ca4ceac58cd892 Mon Sep 17 00:00:00 2001 From: Daniel Savu <23065004+daniel-savu@users.noreply.github.com> Date: Mon, 5 Feb 2024 19:11:57 +0000 Subject: [PATCH] Cosmos grpc fallbackprovider (#3139) ### Description Implements grpc fallback provider logic for cosmos - initially tried implementing the fallback provider deprioritization logic at middleware level like in the EVM. The difference between ethers and cosmrs is that in the latter, middleware can only live at the transport layer (`tower` crate level). - based on this github [issue](https://github.com/hyperium/tonic/issues/733), that actually doesn't look possible, because the http::Request type isn't `Clone` so it can't be submitted to multiple providers - ended up implementing the fallback provider at the application layer, by keeping an array of grpc channels - There is now a `call` method in `hyperlane_core::FallbackProvider` which I'm actually really happy with. This method handles the fallbackprovider-specific logic by taking in an async closure, running it on each provider, and iterating providers if the closure call fails. In `grpc.rs` you can see how this is slightly verbose but I think it's quite manageable. The only part that bugs me is having to duplicate `Pin::from(Box::from(future))`, but that's need afaict because the regular closure returns an anonymous type - adds `grpcUrls` and `customGrpcUrls` config items - tests the cosmos fallback provider e2e ### Drive-by changes ### Related issues - Fixes: https://github.com/hyperlane-xyz/issues/issues/998 ### Backward compatibility ### Testing --- rust/Cargo.lock | 22 +- rust/Cargo.toml | 2 +- rust/agents/relayer/Cargo.toml | 2 +- rust/agents/validator/Cargo.toml | 2 +- rust/chains/hyperlane-cosmos/Cargo.toml | 3 +- rust/chains/hyperlane-cosmos/src/error.rs | 7 + rust/chains/hyperlane-cosmos/src/lib.rs | 1 + .../src/payloads/aggregate_ism.rs | 4 +- .../hyperlane-cosmos/src/payloads/general.rs | 2 +- .../src/payloads/ism_routes.rs | 12 +- .../hyperlane-cosmos/src/payloads/mailbox.rs | 20 +- .../src/payloads/merkle_tree_hook.rs | 8 +- .../src/payloads/multisig_ism.rs | 4 +- .../src/payloads/validator_announce.rs | 6 +- .../hyperlane-cosmos/src/providers/grpc.rs | 349 ++++++++++++------ .../src/rpc_clients/fallback.rs | 140 +++++++ .../hyperlane-cosmos/src/rpc_clients/mod.rs | 3 + .../hyperlane-cosmos/src/trait_builder.rs | 11 +- rust/chains/hyperlane-ethereum/Cargo.toml | 2 +- .../src/rpc_clients/fallback.rs | 119 +++--- .../hyperlane-ethereum/src/trait_builder.rs | 9 +- rust/chains/hyperlane-fuel/Cargo.toml | 2 +- rust/chains/hyperlane-sealevel/Cargo.toml | 2 +- rust/config/mainnet3_config.json | 8 +- rust/ethers-prometheus/src/json_rpc_client.rs | 6 +- .../templates/external-secret.yaml | 6 +- .../src/settings/parser/connection_parser.rs | 19 +- .../hyperlane-base/src/settings/parser/mod.rs | 106 ++++-- rust/hyperlane-core/Cargo.toml | 2 + .../src/rpc_clients/fallback.rs | 171 ++++++++- rust/hyperlane-core/src/rpc_clients/mod.rs | 6 +- rust/utils/abigen/src/lib.rs | 1 + rust/utils/run-locally/src/cosmos/types.rs | 12 +- typescript/sdk/src/metadata/agentConfig.ts | 2 +- .../sdk/src/metadata/chainMetadataTypes.ts | 6 + 35 files changed, 766 insertions(+), 311 deletions(-) create mode 100644 rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs create mode 100644 rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8f3bf3e33ad..ae5e616edf4 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -4145,7 +4145,7 @@ dependencies = [ "hyperlane-fuel", "hyperlane-sealevel", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "maplit", "paste", "prometheus", @@ -4192,7 +4192,7 @@ dependencies = [ "fixed-hash 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "getrandom 0.2.11", "hex 0.4.3", - "itertools 0.11.0", + "itertools 0.12.0", "num 0.4.1", "num-derive 0.4.1", "num-traits", @@ -4226,6 +4226,7 @@ dependencies = [ "hyperlane-core", "injective-protobuf", "injective-std", + "itertools 0.12.0", "once_cell", "protobuf", "ripemd", @@ -4438,7 +4439,7 @@ dependencies = [ "hyperlane-core", "hyperlane-sealevel-interchain-security-module-interface", "hyperlane-sealevel-message-recipient-interface", - "itertools 0.11.0", + "itertools 0.12.0", "log", "num-derive 0.4.1", "num-traits", @@ -4464,7 +4465,7 @@ dependencies = [ "hyperlane-sealevel-test-ism", "hyperlane-sealevel-test-send-receiver", "hyperlane-test-utils", - "itertools 0.11.0", + "itertools 0.12.0", "log", "num-derive 0.4.1", "num-traits", @@ -4981,15 +4982,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -6918,7 +6910,7 @@ dependencies = [ "hyperlane-core", "hyperlane-ethereum", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "num-derive 0.4.1", "num-traits", "once_cell", @@ -7565,7 +7557,7 @@ dependencies = [ "hyperlane-base", "hyperlane-core", "hyperlane-test", - "itertools 0.11.0", + "itertools 0.12.0", "migration", "num-bigint 0.4.4", "prometheus", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1310918ed4e..1c9a2e2ff49 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -97,7 +97,7 @@ hyper = "0.14" hyper-tls = "0.5.0" injective-protobuf = "0.2.2" injective-std = "0.1.5" -itertools = "0.11.0" +itertools = "*" jobserver = "=0.1.26" jsonrpc-core = "18.0" k256 = { version = "0.13.1", features = ["std", "ecdsa"] } diff --git a/rust/agents/relayer/Cargo.toml b/rust/agents/relayer/Cargo.toml index 0bd5697971e..d6eb28a153d 100644 --- a/rust/agents/relayer/Cargo.toml +++ b/rust/agents/relayer/Cargo.toml @@ -34,7 +34,7 @@ tokio = { workspace = true, features = ["rt", "macros", "parking_lot"] } tracing-futures.workspace = true tracing.workspace = true -hyperlane-core = { path = "../../hyperlane-core", features = ["agent"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "fallback-provider"] } hyperlane-base = { path = "../../hyperlane-base" } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } diff --git a/rust/agents/validator/Cargo.toml b/rust/agents/validator/Cargo.toml index f562938db2f..bfea4c16a34 100644 --- a/rust/agents/validator/Cargo.toml +++ b/rust/agents/validator/Cargo.toml @@ -24,7 +24,7 @@ tokio = { workspace = true, features = ["rt", "macros", "parking_lot"] } tracing-futures.workspace = true tracing.workspace = true -hyperlane-core = { path = "../../hyperlane-core", features = ["agent"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "fallback-provider"] } hyperlane-base = { path = "../../hyperlane-base" } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } diff --git a/rust/chains/hyperlane-cosmos/Cargo.toml b/rust/chains/hyperlane-cosmos/Cargo.toml index b48aa3590d6..6115a61e6d5 100644 --- a/rust/chains/hyperlane-cosmos/Cargo.toml +++ b/rust/chains/hyperlane-cosmos/Cargo.toml @@ -22,6 +22,7 @@ hyper = { workspace = true } hyper-tls = { workspace = true } injective-protobuf = { workspace = true } injective-std = { workspace = true } +itertools = { workspace = true } once_cell = { workspace = true } protobuf = { workspace = true } ripemd = { workspace = true } @@ -38,4 +39,4 @@ tracing = { workspace = true } tracing-futures = { workspace = true } url = { workspace = true } -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} diff --git a/rust/chains/hyperlane-cosmos/src/error.rs b/rust/chains/hyperlane-cosmos/src/error.rs index 2c3e1e7475c..06fffaff7e5 100644 --- a/rust/chains/hyperlane-cosmos/src/error.rs +++ b/rust/chains/hyperlane-cosmos/src/error.rs @@ -1,5 +1,6 @@ use cosmrs::proto::prost; use hyperlane_core::ChainCommunicationError; +use std::fmt::Debug; /// Errors from the crates specific to the hyperlane-cosmos /// implementation. @@ -28,6 +29,9 @@ pub enum HyperlaneCosmosError { /// Tonic error #[error("{0}")] Tonic(#[from] tonic::transport::Error), + /// Tonic codegen error + #[error("{0}")] + TonicGenError(#[from] tonic::codegen::StdError), /// Tendermint RPC Error #[error(transparent)] TendermintError(#[from] tendermint_rpc::error::Error), @@ -37,6 +41,9 @@ pub enum HyperlaneCosmosError { /// Protobuf error #[error("{0}")] Protobuf(#[from] protobuf::ProtobufError), + /// Fallback providers failed + #[error("Fallback providers failed. (Errors: {0:?})")] + FallbackProvidersFailed(Vec), } impl From for ChainCommunicationError { diff --git a/rust/chains/hyperlane-cosmos/src/lib.rs b/rust/chains/hyperlane-cosmos/src/lib.rs index 82a4a0ece17..c0ce3ad5498 100644 --- a/rust/chains/hyperlane-cosmos/src/lib.rs +++ b/rust/chains/hyperlane-cosmos/src/lib.rs @@ -16,6 +16,7 @@ mod multisig_ism; mod payloads; mod providers; mod routing_ism; +mod rpc_clients; mod signers; mod trait_builder; mod types; diff --git a/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs b/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs index 8276675ff7c..23bb35a8f85 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/aggregate_ism.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyRequest { pub verify: VerifyRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyRequestInner { pub metadata: String, pub message: String, diff --git a/rust/chains/hyperlane-cosmos/src/payloads/general.rs b/rust/chains/hyperlane-cosmos/src/payloads/general.rs index 488cae2d379..af2a4b0b6e0 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/general.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/general.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct EmptyStruct {} #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs b/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs index 052a1cc48b1..1f659840e4e 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/ism_routes.rs @@ -1,12 +1,12 @@ use super::general::EmptyStruct; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct IsmRouteRequest { pub route: IsmRouteRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct IsmRouteRequestInner { pub message: String, // hexbinary } @@ -16,22 +16,22 @@ pub struct IsmRouteRespnose { pub ism: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryRoutingIsmGeneralRequest { pub routing_ism: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryRoutingIsmRouteResponse { pub ism: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryIsmGeneralRequest { pub ism: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct QueryIsmModuleTypeRequest { pub module_type: EmptyStruct, } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs b/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs index 145ba5b16c4..75eef04595e 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/mailbox.rs @@ -3,52 +3,52 @@ use serde::{Deserialize, Serialize}; use super::general::EmptyStruct; // Requests -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GeneralMailboxQuery { pub mailbox: T, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct CountRequest { pub count: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct NonceRequest { pub nonce: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RecipientIsmRequest { pub recipient_ism: RecipientIsmRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RecipientIsmRequestInner { pub recipient_addr: String, // hexbinary } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DefaultIsmRequest { pub default_ism: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DeliveredRequest { pub message_delivered: DeliveredRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DeliveredRequestInner { pub id: String, // hexbinary } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ProcessMessageRequest { pub process: ProcessMessageRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct ProcessMessageRequestInner { pub metadata: String, pub message: String, diff --git a/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs b/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs index 7635f0ef72a..e9606287715 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/merkle_tree_hook.rs @@ -4,24 +4,24 @@ use super::general::EmptyStruct; const TREE_DEPTH: usize = 32; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeGenericRequest { pub merkle_hook: T, } // --------- Requests --------- -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeRequest { pub tree: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct MerkleTreeCountRequest { pub count: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct CheckPointRequest { pub check_point: EmptyStruct, } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs b/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs index 204e726dc7c..c56588d1d67 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/multisig_ism.rs @@ -1,11 +1,11 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyInfoRequest { pub verify_info: VerifyInfoRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct VerifyInfoRequestInner { pub message: String, // hexbinary } diff --git a/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs b/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs index fdf449c7c44..cf4e5eb1f8a 100644 --- a/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs +++ b/rust/chains/hyperlane-cosmos/src/payloads/validator_announce.rs @@ -2,17 +2,17 @@ use serde::{Deserialize, Serialize}; use super::general::EmptyStruct; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnouncedValidatorsRequest { pub get_announced_validators: EmptyStruct, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnounceStorageLocationsRequest { pub get_announce_storage_locations: GetAnnounceStorageLocationsRequestInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct GetAnnounceStorageLocationsRequestInner { pub validators: Vec, } diff --git a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs index 3594c7398e8..a6bc070abac 100644 --- a/rust/chains/hyperlane-cosmos/src/providers/grpc.rs +++ b/rust/chains/hyperlane-cosmos/src/providers/grpc.rs @@ -24,7 +24,9 @@ use cosmrs::{ tx::{self, Fee, MessageExt, SignDoc, SignerInfo}, Any, Coin, }; +use derive_new::new; use hyperlane_core::{ + rpc_clients::{BlockNumberGetter, FallbackProvider}, ChainCommunicationError, ChainResult, ContractLocator, FixedPointNumber, HyperlaneDomain, U256, }; use protobuf::Message as _; @@ -33,9 +35,10 @@ use tonic::{ transport::{Channel, Endpoint}, GrpcMethod, IntoRequest, }; +use url::Url; -use crate::HyperlaneCosmosError; use crate::{address::CosmosAddress, CosmosAmount}; +use crate::{rpc_clients::CosmosFallbackProvider, HyperlaneCosmosError}; use crate::{signers::Signer, ConnectionConf}; /// A multiplier applied to a simulated transaction's gas usage to @@ -45,6 +48,36 @@ const GAS_ESTIMATE_MULTIPLIER: f64 = 1.25; /// be valid for. const TIMEOUT_BLOCKS: u64 = 1000; +#[derive(Debug, Clone, new)] +struct CosmosChannel { + channel: Channel, + /// The url that this channel is connected to. + /// Not explicitly used, but useful for debugging. + _url: Url, +} + +#[async_trait] +impl BlockNumberGetter for CosmosChannel { + async fn get_block_number(&self) -> Result { + let mut client = ServiceClient::new(self.channel.clone()); + let request = tonic::Request::new(GetLatestBlockRequest {}); + + let response = client + .get_latest_block(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + let height = response + .block + .ok_or_else(|| ChainCommunicationError::from_other_str("block not present"))? + .header + .ok_or_else(|| ChainCommunicationError::from_other_str("header not present"))? + .height; + + Ok(height as u64) + } +} + #[async_trait] /// Cosmwasm GRPC Provider pub trait WasmProvider: Send + Sync { @@ -56,14 +89,14 @@ pub trait WasmProvider: Send + Sync { async fn latest_block_height(&self) -> ChainResult; /// Perform a wasm query against the stored contract address. - async fn wasm_query( + async fn wasm_query( &self, payload: T, block_height: Option, ) -> ChainResult>; /// Perform a wasm query against a specified contract address. - async fn wasm_query_to( + async fn wasm_query_to( &self, to: String, payload: T, @@ -71,14 +104,17 @@ pub trait WasmProvider: Send + Sync { ) -> ChainResult>; /// Send a wasm tx. - async fn wasm_send( + async fn wasm_send( &self, payload: T, gas_limit: Option, ) -> ChainResult; /// Estimate gas for a wasm tx. - async fn wasm_estimate_gas(&self, payload: T) -> ChainResult; + async fn wasm_estimate_gas( + &self, + payload: T, + ) -> ChainResult; } #[derive(Debug, Clone)] @@ -95,7 +131,7 @@ pub struct WasmGrpcProvider { signer: Option, /// GRPC Channel that can be cheaply cloned. /// See `` - channel: Channel, + provider: CosmosFallbackProvider, gas_price: CosmosAmount, } @@ -108,9 +144,21 @@ impl WasmGrpcProvider { locator: Option, signer: Option, ) -> ChainResult { - let endpoint = - Endpoint::new(conf.get_grpc_url()).map_err(Into::::into)?; - let channel = endpoint.connect_lazy(); + // get all the configured grpc urls and convert them to a Vec + let channels: Result, _> = conf + .get_grpc_urls() + .into_iter() + .map(|url| { + Endpoint::new(url.to_string()) + .map(|e| CosmosChannel::new(e.connect_lazy(), url)) + .map_err(Into::::into) + }) + .collect(); + let mut builder = FallbackProvider::builder(); + builder = builder.add_providers(channels?); + let fallback_provider = builder.build(); + let provider = CosmosFallbackProvider::new(fallback_provider); + let contract_address = locator .map(|l| { CosmosAddress::from_h256( @@ -126,7 +174,7 @@ impl WasmGrpcProvider { conf, contract_address, signer, - channel, + provider, gas_price, }) } @@ -225,21 +273,36 @@ impl WasmGrpcProvider { // https://github.com/cosmos/cosmjs/blob/44893af824f0712d1f406a8daa9fcae335422235/packages/stargate/src/modules/tx/queries.ts#L67 signatures: vec![vec![]], }; - - let mut client = TxServiceClient::new(self.channel.clone()); let tx_bytes = raw_tx .to_bytes() .map_err(ChainCommunicationError::from_other)?; - #[allow(deprecated)] - let sim_req = tonic::Request::new(SimulateRequest { tx: None, tx_bytes }); - let gas_used = client - .simulate(sim_req) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner() - .gas_info - .ok_or_else(|| ChainCommunicationError::from_other_str("gas info not present"))? - .gas_used; + let gas_used = self + .provider + .call(move |provider| { + let tx_bytes_clone = tx_bytes.clone(); + let future = async move { + let mut client = TxServiceClient::new(provider.channel.clone()); + #[allow(deprecated)] + let sim_req = tonic::Request::new(SimulateRequest { + tx: None, + tx_bytes: tx_bytes_clone, + }); + let gas_used = client + .simulate(sim_req) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner() + .gas_info + .ok_or_else(|| { + ChainCommunicationError::from_other_str("gas info not present") + })? + .gas_used; + + Ok(gas_used) + }; + Box::pin(future) + }) + .await?; let gas_estimate = (gas_used as f64 * GAS_ESTIMATE_MULTIPLIER) as u64; @@ -248,14 +311,25 @@ impl WasmGrpcProvider { /// Fetches balance for a given `address` and `denom` pub async fn get_balance(&self, address: String, denom: String) -> ChainResult { - let mut client = QueryBalanceClient::new(self.channel.clone()); - - let balance_request = tonic::Request::new(QueryBalanceRequest { address, denom }); - let response = client - .balance(balance_request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let response = self + .provider + .call(move |provider| { + let address = address.clone(); + let denom = denom.clone(); + let future = async move { + let mut client = QueryBalanceClient::new(provider.channel.clone()); + let balance_request = + tonic::Request::new(QueryBalanceRequest { address, denom }); + let response = client + .balance(balance_request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; let balance = response .balance @@ -272,14 +346,23 @@ impl WasmGrpcProvider { return self.account_query_injective(account).await; } - let mut client = QueryAccountClient::new(self.channel.clone()); - - let request = tonic::Request::new(QueryAccountRequest { address: account }); - let response = client - .account(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let response = self + .provider + .call(move |provider| { + let address = account.clone(); + let future = async move { + let mut client = QueryAccountClient::new(provider.channel.clone()); + let request = tonic::Request::new(QueryAccountRequest { address }); + let response = client + .account(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; let account = BaseAccount::decode( response @@ -294,32 +377,46 @@ impl WasmGrpcProvider { /// Injective-specific logic for querying an account. async fn account_query_injective(&self, account: String) -> ChainResult { - let request = tonic::Request::new( - injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest { address: account }, - ); - - // Borrowed from the logic of `QueryAccountClient` in `cosmrs`, but using injective types. - - let mut grpc_client = tonic::client::Grpc::new(self.channel.clone()); - grpc_client - .ready() - .await - .map_err(Into::::into)?; - - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/cosmos.auth.v1beta1.Query/Account"); - let mut req: tonic::Request< - injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest, - > = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("cosmos.auth.v1beta1.Query", "Account")); - - let response: tonic::Response< - injective_std::types::cosmos::auth::v1beta1::QueryAccountResponse, - > = grpc_client - .unary(req, path, codec) - .await - .map_err(Into::::into)?; + let response = self + .provider + .call(move |provider| { + let address = account.clone(); + let future = async move { + let request = tonic::Request::new( + injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest { + address, + }, + ); + + // Borrowed from the logic of `QueryAccountClient` in `cosmrs`, but using injective types. + + let mut grpc_client = tonic::client::Grpc::new(provider.channel.clone()); + grpc_client + .ready() + .await + .map_err(Into::::into)?; + + let codec = tonic::codec::ProstCodec::default(); + let path = + http::uri::PathAndQuery::from_static("/cosmos.auth.v1beta1.Query/Account"); + let mut req: tonic::Request< + injective_std::types::cosmos::auth::v1beta1::QueryAccountRequest, + > = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("cosmos.auth.v1beta1.Query", "Account")); + + let response: tonic::Response< + injective_std::types::cosmos::auth::v1beta1::QueryAccountResponse, + > = grpc_client + .unary(req, path, codec) + .await + .map_err(Into::::into)?; + + Ok(response) + }; + Box::pin(future) + }) + .await?; let mut eth_account = injective_protobuf::proto::account::EthAccount::parse_from_bytes( response @@ -349,14 +446,23 @@ impl WasmGrpcProvider { #[async_trait] impl WasmProvider for WasmGrpcProvider { async fn latest_block_height(&self) -> ChainResult { - let mut client = ServiceClient::new(self.channel.clone()); - let request = tonic::Request::new(GetLatestBlockRequest {}); + let response = self + .provider + .call(move |provider| { + let future = async move { + let mut client = ServiceClient::new(provider.channel.clone()); + let request = tonic::Request::new(GetLatestBlockRequest {}); + let response = client + .get_latest_block(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; - let response = client - .get_latest_block(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); let height = response .block .ok_or_else(|| ChainCommunicationError::from_other_str("block not present"))? @@ -369,7 +475,7 @@ impl WasmProvider for WasmGrpcProvider { async fn wasm_query(&self, payload: T, block_height: Option) -> ChainResult> where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { let contract_address = self.contract_address.as_ref().ok_or_else(|| { ChainCommunicationError::from_other_str("No contract address available") @@ -385,39 +491,48 @@ impl WasmProvider for WasmGrpcProvider { block_height: Option, ) -> ChainResult> where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { - let mut client = WasmQueryClient::new(self.channel.clone()); - let mut request = tonic::Request::new(QuerySmartContractStateRequest { - address: to, - query_data: serde_json::to_string(&payload)?.as_bytes().to_vec(), - }); - - if let Some(block_height) = block_height { - request - .metadata_mut() - .insert("x-cosmos-block-height", block_height.into()); - } - - let response = client - .smart_contract_state(request) - .await - .map_err(ChainCommunicationError::from_other)? - .into_inner(); + let query_data = serde_json::to_string(&payload)?.as_bytes().to_vec(); + let response = self + .provider + .call(move |provider| { + let to = to.clone(); + let query_data = query_data.clone(); + let future = async move { + let mut client = WasmQueryClient::new(provider.channel.clone()); + + let mut request = tonic::Request::new(QuerySmartContractStateRequest { + address: to, + query_data, + }); + if let Some(block_height) = block_height { + request + .metadata_mut() + .insert("x-cosmos-block-height", block_height.into()); + } + let response = client + .smart_contract_state(request) + .await + .map_err(ChainCommunicationError::from_other)? + .into_inner(); + Ok(response) + }; + Box::pin(future) + }) + .await?; Ok(response.data) } async fn wasm_send(&self, payload: T, gas_limit: Option) -> ChainResult where - T: Serialize + Send + Sync, + T: Serialize + Send + Sync + Clone, { let signer = self.get_signer()?; - let mut client = TxServiceClient::new(self.channel.clone()); let contract_address = self.contract_address.as_ref().ok_or_else(|| { ChainCommunicationError::from_other_str("No contract address available") })?; - let msgs = vec![MsgExecuteContract { sender: signer.address.clone(), contract: contract_address.address(), @@ -426,9 +541,6 @@ impl WasmProvider for WasmGrpcProvider { } .to_any() .map_err(ChainCommunicationError::from_other)?]; - - // We often use U256s to represent gas limits, but Cosmos expects u64s. Try to convert, - // and if it fails, just fallback to None which will result in gas estimation. let gas_limit: Option = gas_limit.and_then(|limit| match limit.try_into() { Ok(limit) => Some(limit), Err(err) => { @@ -439,20 +551,30 @@ impl WasmProvider for WasmGrpcProvider { None } }); - - let tx_req = BroadcastTxRequest { - tx_bytes: self.generate_raw_signed_tx(msgs, gas_limit).await?, - mode: BroadcastMode::Sync as i32, - }; - - let tx_res = client - .broadcast_tx(tx_req) - .await - .map_err(Into::::into)? - .into_inner() - .tx_response - .ok_or_else(|| ChainCommunicationError::from_other_str("Empty tx_response"))?; - + let tx_bytes = self.generate_raw_signed_tx(msgs, gas_limit).await?; + let tx_res = self + .provider + .call(move |provider| { + let tx_bytes = tx_bytes.clone(); + let future = async move { + let mut client = TxServiceClient::new(provider.channel.clone()); + // We often use U256s to represent gas limits, but Cosmos expects u64s. Try to convert, + // and if it fails, just fallback to None which will result in gas estimation. + let tx_req = BroadcastTxRequest { + tx_bytes, + mode: BroadcastMode::Sync as i32, + }; + client + .broadcast_tx(tx_req) + .await + .map_err(Into::::into)? + .into_inner() + .tx_response + .ok_or_else(|| ChainCommunicationError::from_other_str("Empty tx_response")) + }; + Box::pin(future) + }) + .await?; Ok(tx_res) } @@ -482,3 +604,10 @@ impl WasmProvider for WasmGrpcProvider { Ok(response) } } + +#[async_trait] +impl BlockNumberGetter for WasmGrpcProvider { + async fn get_block_number(&self) -> Result { + self.latest_block_height().await + } +} diff --git a/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs b/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs new file mode 100644 index 00000000000..cf933ee73d4 --- /dev/null +++ b/rust/chains/hyperlane-cosmos/src/rpc_clients/fallback.rs @@ -0,0 +1,140 @@ +use std::{ + fmt::{Debug, Formatter}, + ops::Deref, +}; + +use derive_new::new; +use hyperlane_core::rpc_clients::FallbackProvider; + +/// Wrapper of `FallbackProvider` for use in `hyperlane-cosmos` +#[derive(new, Clone)] +pub struct CosmosFallbackProvider { + fallback_provider: FallbackProvider, +} + +impl Deref for CosmosFallbackProvider { + type Target = FallbackProvider; + + fn deref(&self) -> &Self::Target { + &self.fallback_provider + } +} + +impl Debug for CosmosFallbackProvider +where + C: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + self.fallback_provider.fmt(f) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use async_trait::async_trait; + use hyperlane_core::rpc_clients::test::ProviderMock; + use hyperlane_core::rpc_clients::{BlockNumberGetter, FallbackProviderBuilder}; + use hyperlane_core::ChainCommunicationError; + use tokio::time::sleep; + + use super::*; + + #[derive(Debug, Clone)] + struct CosmosProviderMock(ProviderMock); + + impl Deref for CosmosProviderMock { + type Target = ProviderMock; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl Default for CosmosProviderMock { + fn default() -> Self { + Self(ProviderMock::default()) + } + } + + impl CosmosProviderMock { + fn new(request_sleep: Option) -> Self { + Self(ProviderMock::new(request_sleep)) + } + } + + #[async_trait] + impl BlockNumberGetter for CosmosProviderMock { + async fn get_block_number(&self) -> Result { + Ok(0) + } + } + + impl Into> for CosmosProviderMock { + fn into(self) -> Box { + Box::new(self) + } + } + + impl CosmosFallbackProvider { + async fn low_level_test_call(&mut self) -> Result<(), ChainCommunicationError> { + self.call(|provider| { + provider.push("GET", "http://localhost:1234"); + let future = async move { + let body = tonic::body::BoxBody::default(); + let response = http::Response::builder().status(200).body(body).unwrap(); + if let Some(sleep_duration) = provider.request_sleep() { + sleep(sleep_duration).await; + } + Ok(response) + }; + Box::pin(future) + }) + .await?; + Ok(()) + } + } + + #[tokio::test] + async fn test_first_provider_is_attempted() { + let fallback_provider_builder = FallbackProviderBuilder::default(); + let providers = vec![ + CosmosProviderMock::default(), + CosmosProviderMock::default(), + CosmosProviderMock::default(), + ]; + let fallback_provider = fallback_provider_builder.add_providers(providers).build(); + let mut cosmos_fallback_provider = CosmosFallbackProvider::new(fallback_provider); + cosmos_fallback_provider + .low_level_test_call() + .await + .unwrap(); + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(&cosmos_fallback_provider).await; + assert_eq!(provider_call_count, vec![1, 0, 0]); + } + + #[tokio::test] + async fn test_one_stalled_provider() { + let fallback_provider_builder = FallbackProviderBuilder::default(); + let providers = vec![ + CosmosProviderMock::new(Some(Duration::from_millis(10))), + CosmosProviderMock::default(), + CosmosProviderMock::default(), + ]; + let fallback_provider = fallback_provider_builder + .add_providers(providers) + .with_max_block_time(Duration::from_secs(0)) + .build(); + let mut cosmos_fallback_provider = CosmosFallbackProvider::new(fallback_provider); + cosmos_fallback_provider + .low_level_test_call() + .await + .unwrap(); + + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(&cosmos_fallback_provider).await; + assert_eq!(provider_call_count, vec![0, 0, 1]); + } +} diff --git a/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs b/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs new file mode 100644 index 00000000000..536845688d5 --- /dev/null +++ b/rust/chains/hyperlane-cosmos/src/rpc_clients/mod.rs @@ -0,0 +1,3 @@ +pub use self::fallback::*; + +mod fallback; diff --git a/rust/chains/hyperlane-cosmos/src/trait_builder.rs b/rust/chains/hyperlane-cosmos/src/trait_builder.rs index 2bacb4d2f55..1bb3627b9de 100644 --- a/rust/chains/hyperlane-cosmos/src/trait_builder.rs +++ b/rust/chains/hyperlane-cosmos/src/trait_builder.rs @@ -2,12 +2,13 @@ use std::str::FromStr; use derive_new::new; use hyperlane_core::{ChainCommunicationError, FixedPointNumber}; +use url::Url; /// Cosmos connection configuration #[derive(Debug, Clone)] pub struct ConnectionConf { /// The GRPC url to connect to - grpc_url: String, + grpc_urls: Vec, /// The RPC url to connect to rpc_url: String, /// The chain ID @@ -76,8 +77,8 @@ pub enum ConnectionConfError { impl ConnectionConf { /// Get the GRPC url - pub fn get_grpc_url(&self) -> String { - self.grpc_url.clone() + pub fn get_grpc_urls(&self) -> Vec { + self.grpc_urls.clone() } /// Get the RPC url @@ -112,7 +113,7 @@ impl ConnectionConf { /// Create a new connection configuration pub fn new( - grpc_url: String, + grpc_urls: Vec, rpc_url: String, chain_id: String, bech32_prefix: String, @@ -121,7 +122,7 @@ impl ConnectionConf { contract_address_bytes: usize, ) -> Self { Self { - grpc_url, + grpc_urls, rpc_url, chain_id, bech32_prefix, diff --git a/rust/chains/hyperlane-ethereum/Cargo.toml b/rust/chains/hyperlane-ethereum/Cargo.toml index 8d6db17f45b..a72855a00b9 100644 --- a/rust/chains/hyperlane-ethereum/Cargo.toml +++ b/rust/chains/hyperlane-ethereum/Cargo.toml @@ -30,7 +30,7 @@ tracing-futures.workspace = true tracing.workspace = true url.workspace = true -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} ethers-prometheus = { path = "../../ethers-prometheus", features = ["serde"] } [build-dependencies] diff --git a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs index 2ac6ae009f8..1243ecc1060 100644 --- a/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs +++ b/rust/chains/hyperlane-ethereum/src/rpc_clients/fallback.rs @@ -12,23 +12,23 @@ use serde_json::Value; use tokio::time::sleep; use tracing::{instrument, warn_span}; -use ethers_prometheus::json_rpc_client::PrometheusJsonRpcClientConfigExt; +use ethers_prometheus::json_rpc_client::{JsonRpcBlockGetter, PrometheusJsonRpcClientConfigExt}; use crate::rpc_clients::{categorize_client_response, CategorizedResponse}; /// Wrapper of `FallbackProvider` for use in `hyperlane-ethereum` #[derive(new)] -pub struct EthereumFallbackProvider(FallbackProvider); +pub struct EthereumFallbackProvider(FallbackProvider); -impl Deref for EthereumFallbackProvider { - type Target = FallbackProvider; +impl Deref for EthereumFallbackProvider { + type Target = FallbackProvider; fn deref(&self) -> &Self::Target { &self.0 } } -impl Debug for EthereumFallbackProvider +impl Debug for EthereumFallbackProvider where C: JsonRpcClient + PrometheusJsonRpcClientConfigExt, { @@ -73,16 +73,17 @@ impl From for ProviderError { #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait)] -impl JsonRpcClient for EthereumFallbackProvider +impl JsonRpcClient for EthereumFallbackProvider> where C: JsonRpcClient + + Into> + PrometheusJsonRpcClientConfigExt - + Into> + Clone, + JsonRpcBlockGetter: BlockNumberGetter, { type Error = ProviderError; - // TODO: Refactor the reusable parts of this function when implementing the cosmos-specific logic + // TODO: Refactor to use `FallbackProvider::call` #[instrument] async fn request(&self, method: &str, params: T) -> Result where @@ -108,7 +109,7 @@ where let resp = fut.await; self.handle_stalled_provider(priority, provider).await; let _span = - warn_span!("request_with_fallback", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); + warn_span!("request", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); match categorize_client_response(method, resp) { IsOk(v) => return Ok(serde_json::from_value(v)?), @@ -125,41 +126,37 @@ where #[cfg(test)] mod tests { use ethers_prometheus::json_rpc_client::{JsonRpcBlockGetter, BLOCK_NUMBER_RPC}; + use hyperlane_core::rpc_clients::test::ProviderMock; use hyperlane_core::rpc_clients::FallbackProviderBuilder; use super::*; - use std::sync::{Arc, Mutex}; #[derive(Debug, Clone)] - struct ProviderMock { - // Store requests as tuples of (method, params) - // Even if the tests were single-threaded, need the arc-mutex - // for interior mutability in `JsonRpcClient::request` - requests: Arc>>, - } + struct EthereumProviderMock(ProviderMock); - impl ProviderMock { - fn new() -> Self { - Self { - requests: Arc::new(Mutex::new(vec![])), - } + impl Deref for EthereumProviderMock { + type Target = ProviderMock; + + fn deref(&self) -> &Self::Target { + &self.0 } + } - fn push(&self, method: &str, params: T) { - self.requests - .lock() - .unwrap() - .push((method.to_owned(), format!("{:?}", params))); + impl Default for EthereumProviderMock { + fn default() -> Self { + Self(ProviderMock::default()) } + } - fn requests(&self) -> Vec<(String, String)> { - self.requests.lock().unwrap().clone() + impl EthereumProviderMock { + fn new(request_sleep: Option) -> Self { + Self(ProviderMock::new(request_sleep)) } } - impl Into> for ProviderMock { - fn into(self) -> Box { - Box::new(JsonRpcBlockGetter::new(self.clone())) + impl Into> for EthereumProviderMock { + fn into(self) -> JsonRpcBlockGetter { + JsonRpcBlockGetter::new(self) } } @@ -171,7 +168,7 @@ mod tests { } #[async_trait] - impl JsonRpcClient for ProviderMock { + impl JsonRpcClient for EthereumProviderMock { type Error = HttpClientError; /// Pushes the `(method, params)` to the back of the `requests` queue, @@ -182,12 +179,14 @@ mod tests { params: T, ) -> Result { self.push(method, params); - sleep(Duration::from_millis(10)).await; + if let Some(sleep_duration) = self.request_sleep() { + sleep(sleep_duration).await; + } dummy_return_value() } } - impl PrometheusJsonRpcClientConfigExt for ProviderMock { + impl PrometheusJsonRpcClientConfigExt for EthereumProviderMock { fn node_host(&self) -> &str { todo!() } @@ -197,35 +196,32 @@ mod tests { } } - async fn get_call_counts(fallback_provider: &FallbackProvider) -> Vec { - fallback_provider - .inner - .priorities - .read() - .await - .iter() - .map(|p| { - let provider = &fallback_provider.inner.providers[p.index]; - provider.requests().len() - }) - .collect() + impl EthereumFallbackProvider> + where + C: JsonRpcClient + + PrometheusJsonRpcClientConfigExt + + Into> + + Clone, + JsonRpcBlockGetter: BlockNumberGetter, + { + async fn low_level_test_call(&self) { + self.request::<_, u64>(BLOCK_NUMBER_RPC, ()).await.unwrap(); + } } #[tokio::test] async fn test_first_provider_is_attempted() { let fallback_provider_builder = FallbackProviderBuilder::default(); let providers = vec![ - ProviderMock::new(), - ProviderMock::new(), - ProviderMock::new(), + EthereumProviderMock::default(), + EthereumProviderMock::default(), + EthereumProviderMock::default(), ]; let fallback_provider = fallback_provider_builder.add_providers(providers).build(); let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); - ethereum_fallback_provider - .request::<_, u64>(BLOCK_NUMBER_RPC, ()) - .await - .unwrap(); - let provider_call_count: Vec<_> = get_call_counts(ðereum_fallback_provider).await; + ethereum_fallback_provider.low_level_test_call().await; + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(ðereum_fallback_provider).await; assert_eq!(provider_call_count, vec![1, 0, 0]); } @@ -233,21 +229,18 @@ mod tests { async fn test_one_stalled_provider() { let fallback_provider_builder = FallbackProviderBuilder::default(); let providers = vec![ - ProviderMock::new(), - ProviderMock::new(), - ProviderMock::new(), + EthereumProviderMock::new(Some(Duration::from_millis(10))), + EthereumProviderMock::default(), + EthereumProviderMock::default(), ]; let fallback_provider = fallback_provider_builder .add_providers(providers) .with_max_block_time(Duration::from_secs(0)) .build(); let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); - ethereum_fallback_provider - .request::<_, u64>(BLOCK_NUMBER_RPC, ()) - .await - .unwrap(); - - let provider_call_count: Vec<_> = get_call_counts(ðereum_fallback_provider).await; + ethereum_fallback_provider.low_level_test_call().await; + let provider_call_count: Vec<_> = + ProviderMock::get_call_counts(ðereum_fallback_provider).await; assert_eq!(provider_call_count, vec![0, 0, 2]); } diff --git a/rust/chains/hyperlane-ethereum/src/trait_builder.rs b/rust/chains/hyperlane-ethereum/src/trait_builder.rs index 31fa128d865..a0ab93af480 100644 --- a/rust/chains/hyperlane-ethereum/src/trait_builder.rs +++ b/rust/chains/hyperlane-ethereum/src/trait_builder.rs @@ -16,8 +16,8 @@ use reqwest::{Client, Url}; use thiserror::Error; use ethers_prometheus::json_rpc_client::{ - JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo, PrometheusJsonRpcClient, - PrometheusJsonRpcClientConfig, + JsonRpcBlockGetter, JsonRpcClientMetrics, JsonRpcClientMetricsBuilder, NodeInfo, + PrometheusJsonRpcClient, PrometheusJsonRpcClientConfig, }; use ethers_prometheus::middleware::{ MiddlewareMetrics, PrometheusMiddleware, PrometheusMiddlewareConf, @@ -116,7 +116,10 @@ pub trait BuildableWithProvider { builder = builder.add_provider(metrics_provider); } let fallback_provider = builder.build(); - let ethereum_fallback_provider = EthereumFallbackProvider::new(fallback_provider); + let ethereum_fallback_provider = EthereumFallbackProvider::< + _, + JsonRpcBlockGetter>, + >::new(fallback_provider); self.build( ethereum_fallback_provider, locator, diff --git a/rust/chains/hyperlane-fuel/Cargo.toml b/rust/chains/hyperlane-fuel/Cargo.toml index 7dabcdd514c..82bdbc782e2 100644 --- a/rust/chains/hyperlane-fuel/Cargo.toml +++ b/rust/chains/hyperlane-fuel/Cargo.toml @@ -19,7 +19,7 @@ tracing-futures.workspace = true tracing.workspace = true url.workspace = true -hyperlane-core = { path = "../../hyperlane-core" } +hyperlane-core = { path = "../../hyperlane-core", features = ["fallback-provider"]} [build-dependencies] abigen = { path = "../../utils/abigen", features = ["fuels"] } diff --git a/rust/chains/hyperlane-sealevel/Cargo.toml b/rust/chains/hyperlane-sealevel/Cargo.toml index 248e3dfac14..ab4e9b17f63 100644 --- a/rust/chains/hyperlane-sealevel/Cargo.toml +++ b/rust/chains/hyperlane-sealevel/Cargo.toml @@ -24,7 +24,7 @@ tracing.workspace = true url.workspace = true account-utils = { path = "../../sealevel/libraries/account-utils" } -hyperlane-core = { path = "../../hyperlane-core", features = ["solana"] } +hyperlane-core = { path = "../../hyperlane-core", features = ["solana", "fallback-provider"] } hyperlane-sealevel-interchain-security-module-interface = { path = "../../sealevel/libraries/interchain-security-module-interface" } hyperlane-sealevel-mailbox = { path = "../../sealevel/programs/mailbox", features = ["no-entrypoint"] } hyperlane-sealevel-igp = { path = "../../sealevel/programs/hyperlane-sealevel-igp", features = ["no-entrypoint"] } diff --git a/rust/config/mainnet3_config.json b/rust/config/mainnet3_config.json index 65e0e8ebd74..522196bf79a 100644 --- a/rust/config/mainnet3_config.json +++ b/rust/config/mainnet3_config.json @@ -516,7 +516,11 @@ "http": "https://rpc-injective.goldenratiostaking.net:443" } ], - "grpcUrl": "https://injective-grpc.publicnode.com/", + "grpcUrls": [ + { + "http": "https://injective-grpc.goldenratiostaking.net:443" + } + ], "canonicalAsset": "inj", "bech32Prefix": "inj", "gasPrice": { @@ -896,4 +900,4 @@ } }, "defaultRpcConsensusType": "fallback" -} +} \ No newline at end of file diff --git a/rust/ethers-prometheus/src/json_rpc_client.rs b/rust/ethers-prometheus/src/json_rpc_client.rs index 7e0c4d1feee..2cc8defe9ba 100644 --- a/rust/ethers-prometheus/src/json_rpc_client.rs +++ b/rust/ethers-prometheus/src/json_rpc_client.rs @@ -186,9 +186,11 @@ where } } -impl From> for Box { +impl From> + for JsonRpcBlockGetter> +{ fn from(val: PrometheusJsonRpcClient) -> Self { - Box::new(JsonRpcBlockGetter::new(val)) + JsonRpcBlockGetter::new(val) } } diff --git a/rust/helm/hyperlane-agent/templates/external-secret.yaml b/rust/helm/hyperlane-agent/templates/external-secret.yaml index 5d0eae5ced9..36f287eca38 100644 --- a/rust/helm/hyperlane-agent/templates/external-secret.yaml +++ b/rust/helm/hyperlane-agent/templates/external-secret.yaml @@ -29,7 +29,7 @@ spec: {{- if not .disabled }} HYP_CHAINS_{{ .name | upper }}_CUSTOMRPCURLS: {{ printf "'{{ .%s_rpcs | mustFromJson | join \",\" }}'" .name }} {{- if eq .protocol "cosmos" }} - HYP_CHAINS_{{ .name | upper }}_GRPCURL: {{ printf "'{{ .%s_grpc }}'" .name }} + HYP_CHAINS_{{ .name | upper }}_GRPCURLS: {{ printf "'{{ .%s_grpcs | mustFromJson | join \",\" }}'" .name }} {{- end }} {{- end }} {{- end }} @@ -44,9 +44,9 @@ spec: remoteRef: key: {{ printf "%s-rpc-endpoints-%s" $.Values.hyperlane.runEnv .name }} {{- if eq .protocol "cosmos" }} - - secretKey: {{ printf "%s_grpc" .name }} + - secretKey: {{ printf "%s_grpcs" .name }} remoteRef: - key: {{ printf "%s-grpc-endpoint-%s" $.Values.hyperlane.runEnv .name }} + key: {{ printf "%s-grpc-endpoints-%s" $.Values.hyperlane.runEnv .name }} {{- end }} {{- end }} {{- end }} diff --git a/rust/hyperlane-base/src/settings/parser/connection_parser.rs b/rust/hyperlane-base/src/settings/parser/connection_parser.rs index 5d42ce44117..0d47d6eca9e 100644 --- a/rust/hyperlane-base/src/settings/parser/connection_parser.rs +++ b/rust/hyperlane-base/src/settings/parser/connection_parser.rs @@ -6,7 +6,7 @@ use url::Url; use crate::settings::envs::*; use crate::settings::ChainConnectionConf; -use super::{parse_cosmos_gas_price, ValueParser}; +use super::{parse_base_and_override_urls, parse_cosmos_gas_price, ValueParser}; pub fn build_ethereum_connection_conf( rpcs: &[Url], @@ -43,19 +43,8 @@ pub fn build_cosmos_connection_conf( err: &mut ConfigParsingError, ) -> Option { let mut local_err = ConfigParsingError::default(); - - let grpc_url = chain - .chain(&mut local_err) - .get_key("grpcUrl") - .parse_string() - .end() - .or_else(|| { - local_err.push( - &chain.cwp + "grpc_url", - eyre!("Missing grpc definitions for chain"), - ); - None - }); + let grpcs = + parse_base_and_override_urls(chain, "grpcUrls", "customGrpcUrls", "http", &mut local_err); let chain_id = chain .chain(&mut local_err) @@ -114,7 +103,7 @@ pub fn build_cosmos_connection_conf( None } else { Some(ChainConnectionConf::Cosmos(h_cosmos::ConnectionConf::new( - grpc_url.unwrap().to_string(), + grpcs, rpcs.first().unwrap().to_string(), chain_id.unwrap().to_string(), prefix.unwrap().to_string(), diff --git a/rust/hyperlane-base/src/settings/parser/mod.rs b/rust/hyperlane-base/src/settings/parser/mod.rs index 76c88fe1b3b..3bfb52f91ff 100644 --- a/rust/hyperlane-base/src/settings/parser/mod.rs +++ b/rust/hyperlane-base/src/settings/parser/mod.rs @@ -18,6 +18,7 @@ use hyperlane_core::{ use itertools::Itertools; use serde::Deserialize; use serde_json::Value; +use url::Url; pub use self::json_value_parser::ValueParser; pub use super::envs::*; @@ -134,47 +135,7 @@ fn parse_chain( .parse_u32() .unwrap_or(1); - let rpcs_base = chain - .chain(&mut err) - .get_key("rpcUrls") - .into_array_iter() - .map(|urls| { - urls.filter_map(|v| { - v.chain(&mut err) - .get_key("http") - .parse_from_str("Invalid http url") - .end() - }) - .collect_vec() - }) - .unwrap_or_default(); - - let rpc_overrides = chain - .chain(&mut err) - .get_opt_key("customRpcUrls") - .parse_string() - .end() - .map(|urls| { - urls.split(',') - .filter_map(|url| { - url.parse() - .take_err(&mut err, || &chain.cwp + "customRpcUrls") - }) - .collect_vec() - }); - - let rpcs = rpc_overrides.unwrap_or(rpcs_base); - - if rpcs.is_empty() { - err.push( - &chain.cwp + "rpc_urls", - eyre!("Missing base rpc definitions for chain"), - ); - err.push( - &chain.cwp + "custom_rpc_urls", - eyre!("Also missing rpc overrides for chain"), - ); - } + let rpcs = parse_base_and_override_urls(&chain, "rpcUrls", "customRpcUrls", "http", &mut err); let from = chain .chain(&mut err) @@ -418,3 +379,66 @@ fn parse_cosmos_gas_price(gas_price: ValueParser) -> ConfigResult Vec { + chain + .chain(err) + .get_key(key) + .into_array_iter() + .map(|urls| { + urls.filter_map(|v| { + v.chain(err) + .get_key(protocol) + .parse_from_str("Invalid url") + .end() + }) + .collect_vec() + }) + .unwrap_or_default() +} + +fn parse_custom_urls( + chain: &ValueParser, + key: &str, + err: &mut ConfigParsingError, +) -> Option> { + chain + .chain(err) + .get_opt_key(key) + .parse_string() + .end() + .map(|urls| { + urls.split(',') + .filter_map(|url| url.parse().take_err(err, || &chain.cwp + "customGrpcUrls")) + .collect_vec() + }) +} + +fn parse_base_and_override_urls( + chain: &ValueParser, + base_key: &str, + override_key: &str, + protocol: &str, + err: &mut ConfigParsingError, +) -> Vec { + let base = parse_urls(chain, base_key, protocol, err); + let overrides = parse_custom_urls(chain, override_key, err); + let combined = overrides.unwrap_or(base); + + if combined.is_empty() { + err.push( + &chain.cwp + "rpc_urls", + eyre!("Missing base rpc definitions for chain"), + ); + err.push( + &chain.cwp + "custom_rpc_urls", + eyre!("Also missing rpc overrides for chain"), + ); + } + combined +} diff --git a/rust/hyperlane-core/Cargo.toml b/rust/hyperlane-core/Cargo.toml index 8329bce5f2a..40468bef6ca 100644 --- a/rust/hyperlane-core/Cargo.toml +++ b/rust/hyperlane-core/Cargo.toml @@ -37,6 +37,7 @@ serde_json = { workspace = true } sha3 = { workspace = true } strum = { workspace = true, optional = true, features = ["derive"] } thiserror = { workspace = true } +tokio = { workspace = true, optional = true, features = ["rt", "time"] } tracing.workspace = true primitive-types = { workspace = true, optional = true } solana-sdk = { workspace = true, optional = true } @@ -54,3 +55,4 @@ agent = ["ethers", "strum"] strum = ["dep:strum"] ethers = ["dep:ethers-core", "dep:ethers-contract", "dep:ethers-providers", "dep:primitive-types"] solana = ["dep:solana-sdk"] +fallback-provider = ["tokio"] diff --git a/rust/hyperlane-core/src/rpc_clients/fallback.rs b/rust/hyperlane-core/src/rpc_clients/fallback.rs index 6adcb76f559..6f75cbc4c8b 100644 --- a/rust/hyperlane-core/src/rpc_clients/fallback.rs +++ b/rust/hyperlane-core/src/rpc_clients/fallback.rs @@ -1,15 +1,22 @@ use async_rwlock::RwLock; use async_trait::async_trait; use derive_new::new; +use itertools::Itertools; use std::{ - fmt::Debug, + fmt::{Debug, Formatter}, + future::Future, + marker::PhantomData, + pin::Pin, sync::Arc, time::{Duration, Instant}, }; -use tracing::info; +use tokio; +use tracing::{info, trace, warn_span}; use crate::ChainCommunicationError; +use super::RpcClientError; + /// Read the current block number from a chain. #[async_trait] pub trait BlockNumberGetter: Send + Sync + Debug { @@ -38,7 +45,6 @@ impl PrioritizedProviderInner { } } } - /// Sub-providers and priority information pub struct PrioritizedProviders { /// Unsorted list of providers this provider calls @@ -49,28 +55,56 @@ pub struct PrioritizedProviders { /// A provider that bundles multiple providers and attempts to call the first, /// then the second, and so on until a response is received. -pub struct FallbackProvider { +/// +/// Although no trait bounds are used in the struct definition, the intended purpose of `B` +/// is to be bound by `BlockNumberGetter` and have `T` be convertible to `B`. That is, +/// inner providers should be able to get the current block number, or be convertible into +/// something that is. +pub struct FallbackProvider { /// The sub-providers called by this provider pub inner: Arc>, max_block_time: Duration, + _phantom: PhantomData, } -impl Clone for FallbackProvider { +impl Clone for FallbackProvider { fn clone(&self) -> Self { Self { inner: self.inner.clone(), max_block_time: self.max_block_time, + _phantom: PhantomData, } } } -impl FallbackProvider +impl Debug for FallbackProvider +where + T: Debug, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // iterate the inner providers and write them to the formatter + f.debug_struct("FallbackProvider") + .field( + "providers", + &self + .inner + .providers + .iter() + .map(|v| format!("{:?}", v)) + .join(", "), + ) + .finish() + } +} + +impl FallbackProvider where - T: Into> + Debug + Clone, + T: Into + Debug + Clone, + B: BlockNumberGetter, { /// Convenience method for creating a `FallbackProviderBuilder` with same /// `JsonRpcClient` types - pub fn builder() -> FallbackProviderBuilder { + pub fn builder() -> FallbackProviderBuilder { FallbackProviderBuilder::default() } @@ -112,7 +146,7 @@ where return; } - let block_getter: Box = provider.clone().into(); + let block_getter: B = provider.clone().into(); let current_block_height = block_getter .get_block_number() .await @@ -130,25 +164,62 @@ where .await; } } + + /// Call the first provider, then the second, and so on (in order of priority) until a response is received. + /// If all providers fail, return an error. + pub async fn call( + &self, + mut f: impl FnMut(T) -> Pin> + Send>>, + ) -> Result { + let mut errors = vec![]; + // make sure we do at least 4 total retries. + while errors.len() <= 3 { + if !errors.is_empty() { + tokio::time::sleep(Duration::from_millis(100)).await; + } + let priorities_snapshot = self.take_priorities_snapshot().await; + for (idx, priority) in priorities_snapshot.iter().enumerate() { + let provider = &self.inner.providers[priority.index]; + let resp = f(provider.clone()).await; + self.handle_stalled_provider(priority, provider).await; + let _span = + warn_span!("FallbackProvider::call", fallback_count=%idx, provider_index=%priority.index, ?provider).entered(); + match resp { + Ok(v) => return Ok(v), + Err(e) => { + trace!( + error=?e, + "Got error from inner fallback provider", + ); + errors.push(e) + } + } + } + } + + Err(RpcClientError::FallbackProvidersFailed(errors).into()) + } } /// Builder to create a new fallback provider. #[derive(Debug, Clone)] -pub struct FallbackProviderBuilder { +pub struct FallbackProviderBuilder { providers: Vec, max_block_time: Duration, + _phantom: PhantomData, } -impl Default for FallbackProviderBuilder { +impl Default for FallbackProviderBuilder { fn default() -> Self { Self { providers: Vec::new(), max_block_time: MAX_BLOCK_TIME, + _phantom: PhantomData, } } } -impl FallbackProviderBuilder { +impl FallbackProviderBuilder { /// Add a new provider to the set. Each new provider will be a lower /// priority than the previous. pub fn add_provider(mut self, provider: T) -> Self { @@ -170,7 +241,7 @@ impl FallbackProviderBuilder { } /// Create a fallback provider. - pub fn build(self) -> FallbackProvider { + pub fn build(self) -> FallbackProvider { let provider_count = self.providers.len(); let prioritized_providers = PrioritizedProviders { providers: self.providers, @@ -184,6 +255,80 @@ impl FallbackProviderBuilder { FallbackProvider { inner: Arc::new(prioritized_providers), max_block_time: self.max_block_time, + _phantom: PhantomData, + } + } +} + +/// Utilities to import when testing chain-specific fallback providers +pub mod test { + use super::*; + use std::{ + ops::Deref, + sync::{Arc, Mutex}, + }; + + /// Provider that stores requests and optionally sleeps before returning a dummy value + #[derive(Debug, Clone)] + pub struct ProviderMock { + // Store requests as tuples of (method, params) + // Even if the tests were single-threaded, need the arc-mutex + // for interior mutability in `JsonRpcClient::request` + requests: Arc>>, + request_sleep: Option, + } + + impl Default for ProviderMock { + fn default() -> Self { + Self { + requests: Arc::new(Mutex::new(vec![])), + request_sleep: None, + } + } + } + + impl ProviderMock { + /// Create a new provider + pub fn new(request_sleep: Option) -> Self { + Self { + request_sleep, + ..Default::default() + } + } + + /// Push a request to the internal store for later inspection + pub fn push(&self, method: &str, params: T) { + self.requests + .lock() + .unwrap() + .push((method.to_owned(), format!("{:?}", params))); + } + + /// Get the stored requests + pub fn requests(&self) -> Vec<(String, String)> { + self.requests.lock().unwrap().clone() + } + + /// Set the sleep duration + pub fn request_sleep(&self) -> Option { + self.request_sleep + } + + /// Get how many times each provider was called + pub async fn get_call_counts, B>( + fallback_provider: &FallbackProvider, + ) -> Vec { + fallback_provider + .inner + .priorities + .read() + .await + .iter() + .map(|p| { + let provider = &fallback_provider.inner.providers[p.index]; + provider.requests().len() + }) + .collect() } } } diff --git a/rust/hyperlane-core/src/rpc_clients/mod.rs b/rust/hyperlane-core/src/rpc_clients/mod.rs index 78851f9f26d..02aaae99f59 100644 --- a/rust/hyperlane-core/src/rpc_clients/mod.rs +++ b/rust/hyperlane-core/src/rpc_clients/mod.rs @@ -1,4 +1,8 @@ -pub use self::{error::*, fallback::*}; +pub use self::error::*; + +#[cfg(feature = "fallback-provider")] +pub use self::fallback::*; mod error; +#[cfg(feature = "fallback-provider")] mod fallback; diff --git a/rust/utils/abigen/src/lib.rs b/rust/utils/abigen/src/lib.rs index 2e8d5ca0814..b4b7970fdc0 100644 --- a/rust/utils/abigen/src/lib.rs +++ b/rust/utils/abigen/src/lib.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "fuels")] use fuels_code_gen::ProgramType; use std::collections::BTreeSet; use std::ffi::OsStr; diff --git a/rust/utils/run-locally/src/cosmos/types.rs b/rust/utils/run-locally/src/cosmos/types.rs index 795d12ff39d..ed95331cea3 100644 --- a/rust/utils/run-locally/src/cosmos/types.rs +++ b/rust/utils/run-locally/src/cosmos/types.rs @@ -118,7 +118,7 @@ pub struct AgentConfig { pub protocol: String, pub chain_id: String, pub rpc_urls: Vec, - pub grpc_url: String, + pub grpc_urls: Vec, pub bech32_prefix: String, pub signer: AgentConfigSigner, pub index: AgentConfigIndex, @@ -156,7 +156,15 @@ impl AgentConfig { network.launch_resp.endpoint.rpc_addr.replace("tcp://", "") ), }], - grpc_url: format!("http://{}", network.launch_resp.endpoint.grpc_addr), + grpc_urls: vec![ + // The first url points to a nonexistent node, but is used for checking fallback provider logic + AgentUrl { + http: "localhost:1337".to_string(), + }, + AgentUrl { + http: format!("http://{}", network.launch_resp.endpoint.grpc_addr), + }, + ], bech32_prefix: "osmo".to_string(), signer: AgentConfigSigner { typ: "cosmosKey".to_string(), diff --git a/typescript/sdk/src/metadata/agentConfig.ts b/typescript/sdk/src/metadata/agentConfig.ts index e7133b4c4a1..5aa89687ffc 100644 --- a/typescript/sdk/src/metadata/agentConfig.ts +++ b/typescript/sdk/src/metadata/agentConfig.ts @@ -124,7 +124,7 @@ export const AgentChainMetadataSchema = ChainMetadataSchemaObject.merge( .string() .optional() .describe( - 'Specify a comma seperated list of custom RPC URLs to use for this chain. If not specified, the default RPC urls will be used.', + 'Specify a comma separated list of custom RPC URLs to use for this chain. If not specified, the default RPC urls will be used.', ), rpcConsensusType: z .nativeEnum(RpcConsensusType) diff --git a/typescript/sdk/src/metadata/chainMetadataTypes.ts b/typescript/sdk/src/metadata/chainMetadataTypes.ts index b13cea0349b..565a151af03 100644 --- a/typescript/sdk/src/metadata/chainMetadataTypes.ts +++ b/typescript/sdk/src/metadata/chainMetadataTypes.ts @@ -115,6 +115,12 @@ export const ChainMetadataSchemaObject = z.object({ .array(RpcUrlSchema) .describe('For cosmos chains only, a list of gRPC API URLs') .optional(), + customGrpcUrls: z + .string() + .optional() + .describe( + 'Specify a comma separated list of custom GRPC URLs to use for this chain. If not specified, the default GRPC urls will be used.', + ), blockExplorers: z .array( z.object({