diff --git a/sentinel-config-testnet.json b/sentinel-config-testnet.json index 9d56c5f9b9..b8a61594ce 100644 --- a/sentinel-config-testnet.json +++ b/sentinel-config-testnet.json @@ -2,12 +2,10 @@ "chain_configs": { "ethereum": { "ethereum": { - "enabled": false, + "enabled": true, "ibc_handler_address": "0xa390514f803a3b318b93bf6cd4beeb9f8299a0eb", "signers": [ - { - "raw": "0xf2e42670eb5083a1def99dcf2fbc685d8fd6e5bb9c94a8885e031c3e6a1e4529" - } + { "raw": "0xc7a7febac36c64d3cd757747494e4de734af5f05339a2fc818f589709e738fff" } ], "eth_rpc_api": "wss://eth-sepolia.g.alchemy.com/v2/Xn_VBUDyUtXUYb9O6b5ZmuBNDaSlH-BB", "transfer_module": { @@ -75,6 +73,7 @@ } ] }, + "gas_config": { "gas_price": "1.0", "gas_denom": "muno", @@ -94,74 +93,43 @@ } }, "interactions": [ - - { - "source": { - "chain": "ethereum", - "channel": "channel-81" - }, - "destination": { - "chain": "union", - "channel": "channel-89" - }, - "protocol": { - "Ucs01": { - "receivers": ["union1qgvmcfkpd66wat6shhfas0z8z9dzp683mcj9tq"], - "contract": "union1m87a5scxnnk83wfwapxlufzm58qe2v65985exff70z95a2yr86yq7hl08h" - } - }, - "memo": "{\"forward\":{\"receiver\":\"84cB5E16918547aD6181fe6513861a7eA476f2EC\",\"port\":\"wasm.union1m87a5scxnnk83wfwapxlufzm58qe2v65985exff70z95a2yr86yq7hl08h\",\"channel\":\"channel-70\"}}", - "sending_memo_probability": 1, - "denoms": [ - "0x1c7d4b196cb0c7b01d743fbc6116a902379c7238", - "muno", - "0x08210F9170F89Ab7658F0B5E3fF39b0E03C594D4" - ], - "send_packet_interval": 50, - "expect_full_cycle": 900, - "amount_min": 1, - "amount_max": 3 - }, { "source": { "chain": "union", - "channel": "channel-89" + "channel": "channel-6" }, "destination": { - "chain": "ethereum", - "channel": "channel-81" + "chain": "bera", + "channel": "channel-80" }, "protocol": { "Ucs01": { - "receivers": ["0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238"], + "receivers": ["0x614E946f6D769Ad2983E4d4B81DDeBBFA51B09b5"], "contract": "union1m87a5scxnnk83wfwapxlufzm58qe2v65985exff70z95a2yr86yq7hl08h" } }, - "memo": "{\"forward\":{\"receiver\":\"84cB5E16918547aD6181fe6513861a7eA476f2EC\",\"port\":\"wasm.union1m37cxl0ld4uaw3r4lv9nt2uw69xxf8xfjrf7a4w9hamv6xvp6ddqqfaaaa\",\"channel\":\"channel-6\"}}", - "sending_memo_probability": 0.0, - "denoms": [ - "factory/union1m37cxl0ld4uaw3r4lv9nt2uw69xxf8xfjrf7a4w9hamv6xvp6ddqqfaaaa/0xe619529b4396a62ab6d88ff2bb195e83c11e909ad9", - "factory/union1m37cxl0ld4uaw3r4lv9nt2uw69xxf8xfjrf7a4w9hamv6xvp6ddqqfaaaa/0x742f7232c73bea91be2828fa14129f68015c3f895b", - "muno" - ], - "send_packet_interval": 50, + "memo": "{\"forward\":{\"receiver\":\"614E946f6D769Ad2983E4d4B81DDeBBFA51B09b5\",\"port\":\"wasm.union1m87a5scxnnk83wfwapxlufzm58qe2v65985exff70z95a2yr86yq7hl08h\",\"channel\":\"channel-80\"}}", + "sending_memo_probability": 0, + "denoms": ["muno"], + "send_packet_interval": 1, "expect_full_cycle": 900, "amount_min": 1, - "amount_max": 3 + "amount_max": 1, + "max_retry": 3 } ], "single_interaction": { "source": { - "chain": "bera", - "channel": "channel-80" - }, - "destination": { "chain": "union", "channel": "channel-6" }, + "destination": { + "chain": "bera", + "channel": "channel-80" + }, "protocol": { "Ucs01": { - "receivers": ["union1qgvmcfkpd66wat6shhfas0z8z9dzp683mcj9tq"], + "receivers": ["0x614E946f6D769Ad2983E4d4B81DDeBBFA51B09b5"], "contract": "union1m87a5scxnnk83wfwapxlufzm58qe2v65985exff70z95a2yr86yq7hl08h" } }, @@ -171,6 +139,7 @@ "send_packet_interval": 1, "expect_full_cycle": 900, "amount_min": 1, - "amount_max": 1 + "amount_max": 1, + "max_retry": 3 } } diff --git a/sentinel/src/chains.rs b/sentinel/src/chains.rs index cd3fe06f41..658059fdcf 100644 --- a/sentinel/src/chains.rs +++ b/sentinel/src/chains.rs @@ -1,47 +1,54 @@ -use std::{ collections::HashMap, str::FromStr, sync::Arc, time::Duration }; +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use bech32::FromBase32; use chain_utils::{ - cosmos_sdk::{ BroadcastTxCommitError, CosmosSdkChainExt }, - ethereum::{ EthereumExecutionRpcs, EthereumExecutionRpcsExt, IBCHandlerEvents }, + cosmos_sdk::{BroadcastTxCommitError, CosmosSdkChainExt}, + ethereum::{EthereumExecutionRpcs, EthereumExecutionRpcsExt, IBCHandlerEvents}, }; use chrono::Utc; -use contracts::{ erc20, ibc_packet::IBCPacketEvents, ucs01_relay::{ LocalToken, UCS01Relay } }; +use contracts::{ + erc20, + ibc_packet::IBCPacketEvents, + ucs01_relay::{LocalToken, UCS01Relay}, +}; use ethers::{ abi::RawLog, contract::EthLogDecode, core::k256::ecdsa, - middleware::{ NonceManagerMiddleware, SignerMiddleware }, - providers::{ Middleware, Provider, Ws }, + middleware::{NonceManagerMiddleware, SignerMiddleware}, + providers::{Middleware, Provider, Ws}, signers::LocalWallet, - types::{ Address, Filter, H256 }, + types::{Address, Filter, H256}, utils::secret_key_to_address, }; use futures::StreamExt; -use hex::{ self, encode as hex_encode }; +use hex::{self, encode as hex_encode}; use prost::Message; -use protos::{ google::protobuf::Any, ibc::applications::transfer::v1::MsgTransfer }; -use rand::{ prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng }; -use serde::{ Deserialize, Serialize }; -use tendermint_rpc::{ event::EventData, SubscriptionClient }; -use ucs01_relay::msg::{ ExecuteMsg, TransferMsg }; -use ucs01_relay_api::types::{ Ics20Ack, JsonWasm, Ucs01Ack }; +use protos::{google::protobuf::Any, ibc::applications::transfer::v1::MsgTransfer}; +use rand::{prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng}; +use serde::{Deserialize, Serialize}; +use tendermint_rpc::{event::EventData, SubscriptionClient}; +use ucs01_relay::msg::{ExecuteMsg, TransferMsg}; +use ucs01_relay_api::types::{Ics20Ack, JsonWasm, Ucs01Ack}; use unionlabs::{ cosmos::base::coin::Coin, cosmwasm::wasm::msg_execute_contract::MsgExecuteContract, - encoding::{ self, DecodeAs }, - events::{ AcknowledgePacket, RecvPacket, SendPacket, WriteAcknowledgement }, + encoding::{self, DecodeAs}, + events::{AcknowledgePacket, RecvPacket, SendPacket, WriteAcknowledgement}, google::protobuf::any, hash::H160, - ibc::core::{ channel::channel::{ self, Channel }, client::height::Height }, - id::{ ChannelId, ClientId }, - tendermint::abci::{ event::Event as TendermintEvent, event_attribute::EventAttribute }, + ibc::core::{ + channel::channel::{self, Channel}, + client::height::Height, + }, + id::{ChannelId, ClientId}, + tendermint::abci::{event::Event as TendermintEvent, event_attribute::EventAttribute}, uint::U256, validated::ValidateT, }; use crate::{ - config::{ CosmosConfig, EthereumConfig, EventTrackerConfig, TransferModule }, + config::{CosmosConfig, EthereumConfig, EventTrackerConfig, TransferModule}, context::EventStateMap, }; pub type IbcEvent = unionlabs::events::IbcEvent; @@ -54,7 +61,8 @@ pub trait IbcTransfer: Send + Sync { destination_channel: ChannelId, denom: String, amount: u64, - memo: String + memo: String, + max_retry: u64, ); } @@ -67,7 +75,7 @@ pub trait IbcListen: Send + Sync { // the protocol here. For know i'll try bruteforce but it's not a good solution. fn write_handler_packet_ack_hex_controller( &self, - ack_hex: Vec //protocol: Protocol + ack_hex: Vec, //protocol: Protocol ) -> bool { // match protocol { // Protocol::Ics20 => { @@ -94,10 +102,8 @@ pub trait IbcListen: Send + Sync { // } // Try to decode as Ics20Ack first; - if - let Ok(val) = Ics20Ack::decode_as::( - cosmwasm_std::Binary::from(ack_hex.clone()).as_slice() - ) + if let Ok(val) = + Ics20Ack::decode_as::(cosmwasm_std::Binary::from(ack_hex.clone()).as_slice()) { match val { Ics20Ack::Result(_) => { @@ -109,11 +115,9 @@ pub trait IbcListen: Send + Sync { } } - if - let Ok(val) = Ucs01Ack::decode_as::( - cosmwasm_std::Binary::from(ack_hex.clone()).as_slice() - ) - { + if let Ok(val) = Ucs01Ack::decode_as::( + cosmwasm_std::Binary::from(ack_hex.clone()).as_slice(), + ) { return val == Ucs01Ack::Success; } else { tracing::warn!("Failed to decode ack_hex: {:?} ", ack_hex); @@ -126,7 +130,7 @@ pub trait IbcListen: Send + Sync { ibc_event: IbcEvent, event_state_map: &EventStateMap, block_number: u64, - tx_hash: Option + tx_hash: Option, ); fn handle_ibc_event_boxed<'a>( @@ -134,30 +138,26 @@ pub trait IbcListen: Send + Sync { ibc_event: IbcEvent, event_state_map: &'a EventStateMap, _block_number: u64, - tx_hash: Option + tx_hash: Option, ) -> std::pin::Pin + Send + 'a>> { Box::pin(async move { let (packet_sequence, key) = match &ibc_event { - IbcEvent::SendPacket(e) => - ( - e.packet_sequence, - format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), - ), - IbcEvent::RecvPacket(e) => - ( - e.packet_sequence, - format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), - ), - IbcEvent::WriteAcknowledgement(e) => - ( - e.packet_sequence, - format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), - ), - IbcEvent::AcknowledgePacket(e) => - ( - e.packet_sequence, - format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), - ), + IbcEvent::SendPacket(e) => ( + e.packet_sequence, + format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), + ), + IbcEvent::RecvPacket(e) => ( + e.packet_sequence, + format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), + ), + IbcEvent::WriteAcknowledgement(e) => ( + e.packet_sequence, + format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), + ), + IbcEvent::AcknowledgePacket(e) => ( + e.packet_sequence, + format!("{}->{}", e.packet_src_channel, e.packet_dst_channel), + ), // Handle other events if necessary, _ => { return; @@ -166,17 +166,22 @@ pub trait IbcListen: Send + Sync { let sequence: i32 = packet_sequence.get() as i32; - let mut entry = event_state_map.entry(key.clone()).or_insert_with(HashMap::new); + let mut entry = event_state_map + .entry(key.clone()) + .or_insert_with(HashMap::new); let sequence_entry = entry.entry(sequence).or_insert_with(|| { let mut event_map = HashMap::new(); for idx in 0..4 { - event_map.insert(idx, EventTrackerConfig { + event_map.insert( idx, - arrived: false, - arrived_time: None, - tx_hash: None, - }); + EventTrackerConfig { + idx, + arrived: false, + arrived_time: None, + tx_hash: None, + }, + ); } event_map }); @@ -265,10 +270,9 @@ pub trait IbcListen: Send + Sync { } } IbcEvent::AcknowledgePacket(_) => { - if - sequence_entry.get(&0).map_or(false, |e| e.arrived) && - sequence_entry.get(&1).map_or(false, |e| e.arrived) && - sequence_entry.get(&2).map_or(false, |e| e.arrived) + if sequence_entry.get(&0).map_or(false, |e| e.arrived) + && sequence_entry.get(&1).map_or(false, |e| e.arrived) + && sequence_entry.get(&2).map_or(false, |e| e.arrived) { if let Some(event_data) = sequence_entry.get_mut(&3) { event_data.arrived = true; @@ -350,21 +354,25 @@ impl IbcListen for Chain { ibc_event: IbcEvent, event_state_map: &EventStateMap, block_number: u64, - tx_hash: Option + tx_hash: Option, ) { match self { Chain::Ethereum(ethereum) => { - ethereum.handle_ibc_event(ibc_event, event_state_map, block_number, tx_hash).await; + ethereum + .handle_ibc_event(ibc_event, event_state_map, block_number, tx_hash) + .await; } Chain::Cosmos(cosmos) => { - cosmos.handle_ibc_event(ibc_event, event_state_map, block_number, tx_hash).await; + cosmos + .handle_ibc_event(ibc_event, event_state_map, block_number, tx_hash) + .await; } } } fn write_handler_packet_ack_hex_controller( &self, - ack_hex: Vec // protocol: Protocol // TODO: Add it after find a way + ack_hex: Vec, // protocol: Protocol // TODO: Add it after find a way ) -> bool { IbcListen::write_handler_packet_ack_hex_controller(self, ack_hex /* , protocol*/) } @@ -378,28 +386,35 @@ impl IbcTransfer for Chain { destination_channel: ChannelId, denom: String, amount: u64, - memo: String + memo: String, + max_retry: u64, ) { match self { Chain::Ethereum(ethereum) => { - ethereum.send_ibc_transfer( - protocol, - channel, - destination_channel, - denom, - amount, - memo - ).await; + ethereum + .send_ibc_transfer( + protocol, + channel, + destination_channel, + denom, + amount, + memo, + max_retry, + ) + .await; } Chain::Cosmos(cosmos) => { - cosmos.send_ibc_transfer( - protocol, - channel, - destination_channel, - denom, - amount, - memo - ).await; + cosmos + .send_ibc_transfer( + protocol, + channel, + destination_channel, + denom, + amount, + memo, + max_retry, + ) + .await; } } } @@ -408,11 +423,14 @@ impl IbcTransfer for Chain { #[derive(Debug, Clone)] pub struct Ethereum { pub rpc: EthereumRpc, - pub relays: Vec< - UCS01Relay>>, LocalWallet>> - >, - pub signer_middlewares: Vec< - Arc>>, LocalWallet>> + pub relays: + Vec>>, LocalWallet>>>, + pub signer_middlewares: Arc< + Vec< + tokio::sync::Mutex< + Arc>>, LocalWallet>>, + >, + >, >, pub ucs01_contract: String, pub msg_senders: Vec, @@ -447,7 +465,16 @@ impl IbcListen for Ethereum { continue; } latest_checked_block = latest_block; - tracing::info!(block = latest_block, "Fetching Ethereum latest_block."); + let chain_id = provider + .get_chainid() + .await + .expect("Failed to get chain ID") + .as_u64(); + tracing::info!( + block = latest_block, + chain_id = chain_id, + "Fetching Ethereum latest_block." + ); // Update the filter to fetch logs from the latest block processed + 1 let filter = Filter::new() .address(ethers::types::H160::from(self.rpc.ibc_handler_address)) @@ -457,8 +484,7 @@ impl IbcListen for Ethereum { let logs = provider.get_logs(&filter).await.unwrap(); let logs_clone = logs.clone(); // Clone logs for processing - futures::stream - ::iter(logs_clone.clone()) + futures::stream::iter(logs_clone.clone()) .filter_map(|log| async move { let raw_log = RawLog { topics: log.topics.clone(), @@ -477,7 +503,7 @@ impl IbcListen for Ethereum { ( unionlabs::id::Bounded<9, 64>, unionlabs::id::Ics024IdentifierCharacters, - ) + ), >, String, unionlabs::validated::Validated< @@ -485,9 +511,9 @@ impl IbcListen for Ethereum { ( unionlabs::id::Bounded<9, 64>, unionlabs::id::Ics024IdentifierCharacters, - ) - > - > + ), + >, + >, > = ibchandler_events_to_ibc_event(raw_log, &self.rpc, latest_block).await; if let Some(ibc_event) = ibc_event { @@ -495,11 +521,13 @@ impl IbcListen for Ethereum { ibc_event, &event_state_map, latest_block, - transaction_hash - ).await; + transaction_hash, + ) + .await; } } - }).await; + }) + .await; } } @@ -508,22 +536,20 @@ impl IbcListen for Ethereum { ibc_event: IbcEvent, event_state_map: &EventStateMap, block_number: u64, - tx_hash: Option + tx_hash: Option, ) { - IbcListen::handle_ibc_event_boxed( - self, - ibc_event, - event_state_map, - block_number, - tx_hash - ).await; + IbcListen::handle_ibc_event_boxed(self, ibc_event, event_state_map, block_number, tx_hash) + .await; } } impl IbcListen for Cosmos { async fn listen(&self, event_state_map: &EventStateMap) { tracing::info!("Listening to Cosmos chain events"); - let mut subs = self.chain.tm_client - .subscribe(tendermint_rpc::query::EventType::Tx.into()).await + let mut subs = self + .chain + .tm_client + .subscribe(tendermint_rpc::query::EventType::Tx.into()) + .await .unwrap(); loop { tokio::select! { @@ -585,15 +611,10 @@ impl IbcListen for Cosmos { ibc_event: IbcEvent, event_state_map: &EventStateMap, block_number: u64, - tx_hash: Option + tx_hash: Option, ) { - IbcListen::handle_ibc_event_boxed( - self, - ibc_event, - event_state_map, - block_number, - tx_hash - ).await; + IbcListen::handle_ibc_event_boxed(self, ibc_event, event_state_map, block_number, tx_hash) + .await; } } @@ -601,17 +622,17 @@ impl IbcTransfer for Ethereum { async fn send_ibc_transfer( &self, protocol: Protocol, - _channel: ChannelId, + channel: ChannelId, destination_channel: ChannelId, denom: String, amount: u64, - memo: String + memo: String, + max_retry: u64, ) { let mut rng = StdRng::from_entropy(); - let index = rng.gen_range(0..self.relays.len()); // Select a random index - + let index = rng.gen_range(0..self.relays.len()); let relay = &self.relays[index]; - let signer_middleware = &self.signer_middlewares[index]; + let signer_middleware = self.signer_middlewares[index].lock().await; let msg_sender = self.msg_senders[index]; let denom_address = match ethers::types::H160::from_str(&denom) { @@ -627,9 +648,10 @@ impl IbcTransfer for Ethereum { relay .get_denom_address( destination_channel.clone().to_string(), - formatted_denom.clone() + formatted_denom.clone(), ) - .call().await + .call() + .await .unwrap() } }; @@ -659,22 +681,33 @@ impl IbcTransfer for Ethereum { } if balance < amount.into() { - tracing::warn!("Insufficient balance: {}. Requested amount: {}", balance, amount); + tracing::warn!( + "Insufficient balance: {}. Requested amount: {}", + balance, + amount + ); return; } - let allowance = erc_contract.allowance(msg_sender, self.relay_addr).await.unwrap(); + let allowance = erc_contract + .allowance(msg_sender, self.relay_addr) + .await + .unwrap(); if allowance < amount.into() { let _ = erc_contract .approve(self.relay_addr, (U256::MAX / U256::from(2)).into()) - .send().await; + .send() + .await; tracing::info!("We can not transfer after approve, returning now."); return; } let mut debug_msg; match protocol { - Protocol::Ucs01 { receivers, contract } => { + Protocol::Ucs01 { + ref receivers, + ref contract, + } => { let mut rng = StdRng::from_entropy(); let index = rng.gen_range(0..receivers.len()); // Select a random index @@ -683,67 +716,103 @@ impl IbcTransfer for Ethereum { let mut final_receiver = receiver.encode_to_vec().into(); if memo.is_empty() { - let (_hrp, data, _variant) = bech32 - ::decode(&receiver) - .expect("Invalid Bech32 address"); + let (_hrp, data, _variant) = + bech32::decode(&receiver).expect("Invalid Bech32 address"); - let bytes: Vec = Vec:: - ::from_base32(&data) - .expect("Invalid base32 data"); + let bytes: Vec = + Vec::::from_base32(&data).expect("Invalid base32 data"); final_receiver = bytes.into(); } debug_msg = format!( - "[Ethereum] -> Sent IBC transfer. memo: {}. Sending denom: {}. To: {}. Amount: {}, contract: {}", + "[Ethereum] -> Sent IBC transfer. memo: {}. Sending denom: {}. To: {:?}. Amount: {}, contract: {}, from {:?}", memo, denom, final_receiver, amount, - contract + contract, + msg_sender ); - let tx_rcp: Option = match - relay - .send( - destination_channel.clone().to_string(), - final_receiver, - [ - LocalToken { - denom: denom_address, - amount: amount as u128, - }, - ].into(), - memo, - (Height { - revision_number: 0, - revision_height: 0, - }).into(), - u64::MAX - ) - .send().await + + let tx_rcp: Option = match relay + .send( + destination_channel.clone().to_string(), + final_receiver, + [LocalToken { + denom: denom_address, + amount: amount as u128, + }] + .into(), + memo.clone(), + (Height { + revision_number: 0, + revision_height: 0, + }) + .into(), + u64::MAX, + ) + .send() + .await { - Ok(response) => - match response.await { - Ok(receipt) => - Some(receipt.expect("Failed to get transaction receipt")), - Err(e) => { - tracing::error!("Failed to get transaction receipt: {:?}", e); - return; - } + Ok(response) => match response.await { + Ok(receipt) => Some(receipt.expect("Failed to get transaction receipt")), + Err(e) => { + tracing::error!("Failed to get transaction receipt: {:?}", e); + return; } + }, Err(ethers::contract::ContractError::MiddlewareError { e }) => { if e.to_string().contains("replacement transaction underprice") { - tracing::warn!("Replacement transaction underprice."); + if max_retry == 0 { + tracing::warn!( + "Replacement transaction underprice. No more retrying" + ); + } else { + tracing::info!("Retrying transaction."); + self.send_ibc_transfer( + protocol, + channel, + destination_channel, + denom, + amount, + memo, + max_retry - 1, + ); + } } else { tracing::error!( - "Failed to send transaction eth->union: {:?}", + "MiddlewareError Failed to send transaction eth->union: {:?}", e.to_string() ); } return; } Err(e) => { - tracing::error!("Failed to send transaction eth->union: {:?}", e); + if e.to_string().contains("nonce too low") + || e.to_string() + .contains("Contract call reverted with data: 0x") + { + if max_retry == 0 { + tracing::warn!("No more retrying"); + } else { + tracing::info!( + "Retrying transaction. msg_sender: {:?}", + msg_sender + ); + self.send_ibc_transfer( + protocol, + channel, + destination_channel, + denom, + amount, + memo, + max_retry - 1, + ); + } + } else { + tracing::error!("Failed to send transaction eth->union: {:?}", e); + } return; } }; @@ -752,7 +821,10 @@ impl IbcTransfer for Ethereum { tracing::info!(debug_msg); } - Protocol::Ics20 { receivers: _, module: _ } => { + Protocol::Ics20 { + receivers: _, + module: _, + } => { unimplemented!("Ics20 protocol not implemented"); // TODO: Do we even have this case? } } @@ -762,9 +834,9 @@ impl IbcTransfer for Ethereum { impl Ethereum { pub async fn new(config: EthereumConfig) -> Self { let ethereum_rpc = EthereumRpc { - provider: Arc::new( - Provider::new(Ws::connect(config.eth_rpc_api.clone()).await.unwrap()) - ), + provider: Arc::new(Provider::new( + Ws::connect(config.eth_rpc_api.clone()).await.unwrap(), + )), ibc_handler_address: config.ibc_handler_address, }; @@ -788,26 +860,28 @@ impl Ethereum { let provider: Arc> = ethereum_rpc.provider.clone(); - let chain_id = provider.get_chainid().await.expect("Failed to get chain ID").as_u64(); + let chain_id = provider + .get_chainid() + .await + .expect("Failed to get chain ID") + .as_u64(); let wallet = LocalWallet::new_with_signer(signing_key, address_of_privkey, chain_id); - let signer_middleware = Arc::new( - SignerMiddleware::new( - NonceManagerMiddleware::new(provider.clone(), address_of_privkey), - wallet.clone() - ) - ); + let signer_middleware = Arc::new(SignerMiddleware::new( + NonceManagerMiddleware::new(provider.clone(), address_of_privkey), + wallet.clone(), + )); let relay = UCS01Relay::new(relay_addr, signer_middleware.clone()); relays.push(relay); - signers_middleware.push(signer_middleware); + signers_middleware.push(tokio::sync::Mutex::new(signer_middleware)); msg_senders.push(address_of_privkey); } Ethereum { rpc: ethereum_rpc, relays, - signer_middlewares: signers_middleware, + signer_middlewares: Arc::new(signers_middleware), ucs01_contract, msg_senders, relay_addr, @@ -829,16 +903,17 @@ impl IbcTransfer for Cosmos { async fn send_ibc_transfer( &self, protocol: Protocol, - _channel: ChannelId, + channel: ChannelId, destination_channel: ChannelId, denom: String, amount: u64, - memo: String + memo: String, + max_retry: u64, ) { self.chain.keyring.with(|signer| async move { let mut debug_msg; let transfer_msg = match protocol { - Protocol::Ics20 { receivers, module } => { + Protocol::Ics20 { ref receivers, ref module } => { let mut rng = StdRng::from_entropy(); let receiver = match receivers.choose(&mut rng) { @@ -880,7 +955,7 @@ impl IbcTransfer for Cosmos { value: msg.encode_to_vec().into(), } } - Protocol::Ucs01 { receivers, contract } => { + Protocol::Ucs01 { ref receivers, ref contract } => { let mut rng = StdRng::from_entropy(); let receiver = match receivers.choose(&mut rng) { Some(receiver) => receiver, @@ -956,6 +1031,20 @@ impl IbcTransfer for Cosmos { Err(BroadcastTxCommitError::SimulateTx(e)) => { if e.contains("account sequence mismatch") { tracing::warn!("Account sequence mismatch."); + if max_retry == 0 { + tracing::warn!("Account sequence mismatch. No more retrying"); + } else { + tracing::info!("Retrying transaction."); + self.send_ibc_transfer( + protocol, + channel, + destination_channel, + denom, + amount, + memo, + max_retry - 1 + ); + } } else { tracing::error!("Failed to simulate tx!{:?}", e.to_string()); } @@ -970,7 +1059,9 @@ impl IbcTransfer for Cosmos { impl Cosmos { pub async fn new(config: CosmosConfig) -> Self { - let cosmos = chain_utils::cosmos::Cosmos::new(config.chain_config).await.unwrap(); + let cosmos = chain_utils::cosmos::Cosmos::new(config.chain_config) + .await + .unwrap(); Cosmos { chain: cosmos } } @@ -980,14 +1071,15 @@ async fn get_channel_for_eth_ack_packet( eth_rpcs: &EthereumRpc, port_id: String, channel_id: String, - block_number: u64 + block_number: u64, ) -> Option { tracing::info!(port_id, channel_id, block_number); let channel_result = eth_rpcs .ibc_handler() .get_channel(port_id.clone(), channel_id.clone()) - .block(block_number).await; + .block(block_number) + .await; match channel_result { Ok(channel) => channel.try_into().ok(), @@ -1006,158 +1098,151 @@ async fn get_channel_for_eth_ack_packet( async fn ibchandler_events_to_ibc_event( log: RawLog, eth_rpcs: &EthereumRpc, - block_number: u64 + block_number: u64, ) -> Option { match IBCHandlerEvents::decode_log(&log) { Ok(event) => { // Handle the decoded event similarly to Tendermint events match event { - IBCHandlerEvents::PacketEvent(packet_event) => - match packet_event { - IBCPacketEvents::SendPacketFilter(event) => { - if - let Some(channel) = get_channel_for_eth_ack_packet( - eth_rpcs, - event.source_port.clone(), - event.source_channel.clone(), - block_number - ).await - { - Some( - IbcEvent::SendPacket(SendPacket { - packet_sequence: event.sequence.try_into().unwrap(), - packet_src_port: event.source_port.parse().unwrap(), - packet_src_channel: event.source_channel.parse().unwrap(), - packet_dst_port: channel.counterparty.port_id, - packet_dst_channel: channel.counterparty.channel_id - .to_string() - .parse() - .unwrap(), - packet_timeout_height: event.timeout_height.into(), - packet_timeout_timestamp: event.timeout_timestamp, - packet_data_hex: hex_encode(event.data).into(), - packet_channel_ordering: channel.ordering, - connection_id: channel.connection_hops[0].clone(), - }) - ) - } else { - None - } - } - IBCPacketEvents::RecvPacketFilter(event) => { - if - let Some(channel) = get_channel_for_eth_ack_packet( - eth_rpcs, - event.packet.destination_port.clone(), - event.packet.destination_channel.clone(), - block_number - ).await - { - tracing::info!("Found channel for packet: {:?}", channel); - Some( - IbcEvent::RecvPacket(RecvPacket { - packet_sequence: event.packet.sequence.try_into().unwrap(), - packet_src_port: event.packet.source_port.parse().unwrap(), - packet_src_channel: event.packet.source_channel - .parse() - .unwrap(), - packet_dst_port: event.packet.destination_port - .parse() - .unwrap(), - packet_dst_channel: event.packet.destination_channel - .parse() - .unwrap(), - packet_timeout_height: event.packet.timeout_height.into(), - packet_timeout_timestamp: event.packet.timeout_timestamp, - packet_data_hex: hex_encode(event.packet.data).into(), - packet_channel_ordering: channel.ordering, - connection_id: channel.connection_hops[0].clone(), - }) - ) - } else { - tracing::error!( - "Could not find channel for packet: {:?}", - event.packet - ); - None - } + IBCHandlerEvents::PacketEvent(packet_event) => match packet_event { + IBCPacketEvents::SendPacketFilter(event) => { + if let Some(channel) = get_channel_for_eth_ack_packet( + eth_rpcs, + event.source_port.clone(), + event.source_channel.clone(), + block_number, + ) + .await + { + Some(IbcEvent::SendPacket(SendPacket { + packet_sequence: event.sequence.try_into().unwrap(), + packet_src_port: event.source_port.parse().unwrap(), + packet_src_channel: event.source_channel.parse().unwrap(), + packet_dst_port: channel.counterparty.port_id, + packet_dst_channel: channel + .counterparty + .channel_id + .to_string() + .parse() + .unwrap(), + packet_timeout_height: event.timeout_height.into(), + packet_timeout_timestamp: event.timeout_timestamp, + packet_data_hex: hex_encode(event.data).into(), + packet_channel_ordering: channel.ordering, + connection_id: channel.connection_hops[0].clone(), + })) + } else { + None } - IBCPacketEvents::AcknowledgePacketFilter(event) => { - if - let Some(channel) = get_channel_for_eth_ack_packet( - eth_rpcs, - event.packet.source_port.clone(), - event.packet.source_channel.clone(), - block_number - ).await - { - Some( - IbcEvent::AcknowledgePacket(AcknowledgePacket { - packet_sequence: event.packet.sequence.try_into().unwrap(), - packet_src_port: event.packet.source_port.parse().unwrap(), - packet_src_channel: event.packet.source_channel - .parse() - .unwrap(), - packet_dst_port: event.packet.destination_port - .parse() - .unwrap(), - packet_dst_channel: event.packet.destination_channel - .parse() - .unwrap(), - packet_timeout_height: event.packet.timeout_height.into(), - packet_timeout_timestamp: event.packet.timeout_timestamp, - packet_channel_ordering: channel.ordering, - connection_id: channel.connection_hops[0].clone(), - }) - ) - } else { - None - } + } + IBCPacketEvents::RecvPacketFilter(event) => { + if let Some(channel) = get_channel_for_eth_ack_packet( + eth_rpcs, + event.packet.destination_port.clone(), + event.packet.destination_channel.clone(), + block_number, + ) + .await + { + tracing::info!("Found channel for packet: {:?}", channel); + Some(IbcEvent::RecvPacket(RecvPacket { + packet_sequence: event.packet.sequence.try_into().unwrap(), + packet_src_port: event.packet.source_port.parse().unwrap(), + packet_src_channel: event.packet.source_channel.parse().unwrap(), + packet_dst_port: event.packet.destination_port.parse().unwrap(), + packet_dst_channel: event + .packet + .destination_channel + .parse() + .unwrap(), + packet_timeout_height: event.packet.timeout_height.into(), + packet_timeout_timestamp: event.packet.timeout_timestamp, + packet_data_hex: hex_encode(event.packet.data).into(), + packet_channel_ordering: channel.ordering, + connection_id: channel.connection_hops[0].clone(), + })) + } else { + tracing::error!( + "Could not find channel for packet: {:?}", + event.packet + ); + None } - IBCPacketEvents::WriteAcknowledgementFilter(event) => { - if - let Some(channel) = get_channel_for_eth_ack_packet( - eth_rpcs, - event.packet.destination_port.clone(), - event.packet.destination_channel.clone(), - block_number - ).await - { - Some( - IbcEvent::WriteAcknowledgement(WriteAcknowledgement { - packet_sequence: event.packet.sequence.try_into().unwrap(), - packet_src_port: event.packet.source_port - .to_string() - .parse() - .unwrap(), - packet_src_channel: event.packet.destination_channel - .parse() - .unwrap(), - packet_dst_port: event.packet.destination_port - .parse() - .unwrap(), - packet_dst_channel: event.packet.destination_channel - .parse() - .unwrap(), - packet_timeout_height: Height { - revision_number: 0, - revision_height: 0, - }, - packet_ack_hex: event.acknowledgement.to_vec(), - packet_data_hex: hex_encode(event.packet.data).into(), - packet_timeout_timestamp: 0, - connection_id: channel.connection_hops[0].clone(), - }) - ) - } else { - None - } + } + IBCPacketEvents::AcknowledgePacketFilter(event) => { + if let Some(channel) = get_channel_for_eth_ack_packet( + eth_rpcs, + event.packet.source_port.clone(), + event.packet.source_channel.clone(), + block_number, + ) + .await + { + Some(IbcEvent::AcknowledgePacket(AcknowledgePacket { + packet_sequence: event.packet.sequence.try_into().unwrap(), + packet_src_port: event.packet.source_port.parse().unwrap(), + packet_src_channel: event.packet.source_channel.parse().unwrap(), + packet_dst_port: event.packet.destination_port.parse().unwrap(), + packet_dst_channel: event + .packet + .destination_channel + .parse() + .unwrap(), + packet_timeout_height: event.packet.timeout_height.into(), + packet_timeout_timestamp: event.packet.timeout_timestamp, + packet_channel_ordering: channel.ordering, + connection_id: channel.connection_hops[0].clone(), + })) + } else { + None } - _ => { - tracing::warn!("Unhandled packet event type."); + } + IBCPacketEvents::WriteAcknowledgementFilter(event) => { + if let Some(channel) = get_channel_for_eth_ack_packet( + eth_rpcs, + event.packet.destination_port.clone(), + event.packet.destination_channel.clone(), + block_number, + ) + .await + { + Some(IbcEvent::WriteAcknowledgement(WriteAcknowledgement { + packet_sequence: event.packet.sequence.try_into().unwrap(), + packet_src_port: event + .packet + .source_port + .to_string() + .parse() + .unwrap(), + packet_src_channel: event + .packet + .destination_channel + .parse() + .unwrap(), + packet_dst_port: event.packet.destination_port.parse().unwrap(), + packet_dst_channel: event + .packet + .destination_channel + .parse() + .unwrap(), + packet_timeout_height: Height { + revision_number: 0, + revision_height: 0, + }, + packet_ack_hex: event.acknowledgement.to_vec(), + packet_data_hex: hex_encode(event.packet.data).into(), + packet_timeout_timestamp: 0, + connection_id: channel.connection_hops[0].clone(), + })) + } else { None } } + _ => { + tracing::warn!("Unhandled packet event type."); + None + } + }, _ => { tracing::warn!("Unhandled event type."); None diff --git a/sentinel/src/config.rs b/sentinel/src/config.rs index 4d4892468a..ae0ca29eec 100644 --- a/sentinel/src/config.rs +++ b/sentinel/src/config.rs @@ -58,6 +58,7 @@ pub struct IbcInteraction { pub memo: String, pub sending_memo_probability: f64, pub denoms: Vec, + pub max_retry: u64, } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/sentinel/src/context.rs b/sentinel/src/context.rs index 01924a3fdf..6e39dcf118 100644 --- a/sentinel/src/context.rs +++ b/sentinel/src/context.rs @@ -107,6 +107,7 @@ impl Context { denom, amount, memo, + interaction.max_retry, ) .await; } @@ -119,6 +120,7 @@ impl Context { denom, amount, memo, + interaction.max_retry, ) .await; } @@ -167,7 +169,9 @@ impl Context { loop { interval.tick().await; for ethereum in ðereum_chains { - for signer_middleware in ðereum.signer_middlewares { + for signer_middleware in &*ethereum.signer_middlewares { + let signer_middleware = signer_middleware.lock().await; + let address = signer_middleware.address(); let provider: Arc> = ethereum.rpc.provider.clone(); @@ -182,7 +186,7 @@ impl Context { Ok(balance) => { if balance < min_balance { tracing::error!( - "[INSUFFICIENT BALANCE] Insufficient native balance for address {}. Balance: {}, Required: {}. Chain ID: {}", + "[INSUFFICIENT BALANCE] Insufficient native balance for address {:?}. Balance: {}, Required: {}. Chain ID: {}", address, balance, min_balance, @@ -190,7 +194,7 @@ impl Context { ); } else { tracing::info!( - "Sufficient ETH balance for address {}. Balance: {}, Required: {}", + "Sufficient ETH balance for address {:?}. Balance: {}, Required: {}", address, balance, min_balance @@ -199,7 +203,7 @@ impl Context { } Err(e) => tracing::error!( - "Error checking native balance for address {}. Required: {}. Error: {:?}", + "Error checking native balance for address {:?}. Required: {}. Error: {:?}", address, min_balance, e @@ -293,6 +297,7 @@ impl Context { denom, amount, memo, + interaction.max_retry, ) .await; } @@ -305,6 +310,7 @@ impl Context { denom, amount, memo, + interaction.max_retry, ) .await; }