From d197727d91f2a510370115d1769597106d04b23e Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 6 Feb 2025 13:32:21 +0300 Subject: [PATCH] always dial mode --- Cargo.lock | 8 +++---- Cargo.toml | 2 +- compute/src/node/core.rs | 2 +- compute/src/node/diagnostic.rs | 41 +++++++++++++++++++++------------- p2p/src/client.rs | 13 +++++++++-- p2p/src/commands.rs | 13 +++++++---- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33c7efa..d66f8cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1031,7 +1031,7 @@ dependencies = [ [[package]] name = "dkn-compute" -version = "0.3.3" +version = "0.3.4" dependencies = [ "async-trait", "base64 0.22.1", @@ -1066,7 +1066,7 @@ dependencies = [ [[package]] name = "dkn-p2p" -version = "0.3.3" +version = "0.3.4" dependencies = [ "dkn-utils", "env_logger 0.11.6", @@ -1082,11 +1082,11 @@ dependencies = [ [[package]] name = "dkn-utils" -version = "0.3.3" +version = "0.3.4" [[package]] name = "dkn-workflows" -version = "0.3.3" +version = "0.3.4" dependencies = [ "dkn-utils", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index b0656cb..be65e17 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ default-members = ["compute"] [workspace.package] edition = "2021" -version = "0.3.3" +version = "0.3.4" license = "Apache-2.0" readme = "README.md" diff --git a/compute/src/node/core.rs b/compute/src/node/core.rs index 0bad4ad..6100736 100644 --- a/compute/src/node/core.rs +++ b/compute/src/node/core.rs @@ -11,7 +11,7 @@ impl DriaComputeNode { /// Number of seconds between refreshing for diagnostic prints. const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30; /// Number of seconds between refreshing the available nodes. - const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60; // 30 minutes + const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60; // prepare durations for sleeps let mut diagnostic_refresh_interval = diff --git a/compute/src/node/diagnostic.rs b/compute/src/node/diagnostic.rs index a26a1d1..ba89b9d 100644 --- a/compute/src/node/diagnostic.rs +++ b/compute/src/node/diagnostic.rs @@ -1,3 +1,4 @@ +use dkn_p2p::libp2p::multiaddr::Protocol; use std::time::Duration; use tokio::time::Instant; @@ -86,23 +87,31 @@ impl DriaComputeNode { }; // dial all rpc nodes - for rpc_addr in self.dria_nodes.rpc_nodes.iter() { - log::info!("Dialling RPC node: {}", rpc_addr); - - let fut = self.p2p.dial(rpc_addr.clone()); - match tokio::time::timeout(Duration::from_secs(10), fut).await { - Err(timeout) => { - log::error!("Timeout dialling RPC node: {:?}", timeout); - } - Ok(res) => match res { - Err(e) => { - log::warn!("Error dialling RPC node: {:?}", e); + for addr in self.dria_nodes.rpc_nodes.iter() { + log::info!("Dialling RPC node: {}", addr); + + // get peer id from rpc address + if let Some(peer_id) = addr.iter().find_map(|p| match p { + Protocol::P2p(peer_id) => Some(peer_id), + _ => None, + }) { + let fut = self.p2p.dial(peer_id, addr.clone()); + match tokio::time::timeout(Duration::from_secs(10), fut).await { + Err(timeout) => { + log::error!("Timeout dialling RPC node: {:?}", timeout); } - Ok(_) => { - log::info!("Successfully dialled RPC node: {}", rpc_addr); - } - }, - }; + Ok(res) => match res { + Err(e) => { + log::warn!("Error dialling RPC node: {:?}", e); + } + Ok(_) => { + log::info!("Successfully dialled RPC node: {}", addr); + } + }, + }; + } else { + log::warn!("Missing peerID in address: {}", addr); + } } log::info!("Finished refreshing!"); diff --git a/p2p/src/client.rs b/p2p/src/client.rs index 1a02157..a9b9403 100644 --- a/p2p/src/client.rs +++ b/p2p/src/client.rs @@ -3,6 +3,7 @@ use libp2p::futures::StreamExt; use libp2p::gossipsub::{Message, MessageId}; use libp2p::kad::{GetClosestPeersError, GetClosestPeersOk, QueryResult}; use libp2p::request_response::{self, ResponseChannel}; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::SwarmEvent; use libp2p::{autonat, gossipsub, identify, kad, multiaddr::Protocol, noise, tcp, yamux}; use libp2p::{Multiaddr, PeerId, Swarm, SwarmBuilder}; @@ -182,8 +183,16 @@ impl DriaP2PClient { /// Handles a single command, which originates from `DriaP2PCommander`. pub async fn handle_command(&mut self, command: DriaP2PCommand) { match command { - DriaP2PCommand::Dial { peer_id, sender } => { - let _ = sender.send(self.swarm.dial(peer_id)); + DriaP2PCommand::Dial { + peer_id, + address, + sender, + } => { + let opts = DialOpts::peer_id(peer_id) + .addresses(vec![address]) + .condition(PeerCondition::Always) + .build(); + let _ = sender.send(self.swarm.dial(opts)); } DriaP2PCommand::NetworkInfo { sender } => { let _ = sender.send(self.swarm.network_info()); diff --git a/p2p/src/commands.rs b/p2p/src/commands.rs index ce971e1..e972a46 100644 --- a/p2p/src/commands.rs +++ b/p2p/src/commands.rs @@ -22,9 +22,10 @@ pub enum DriaP2PCommand { PeerCounts { sender: oneshot::Sender<(usize, usize)>, }, - /// Dial a peer. + /// Dial a known peer. Dial { - peer_id: Multiaddr, + peer_id: PeerId, + address: Multiaddr, sender: oneshot::Sender>, }, /// Subscribe to a topic. @@ -206,11 +207,15 @@ impl DriaP2PCommander { } /// Dials a given peer. - pub async fn dial(&mut self, peer_id: Multiaddr) -> Result<()> { + pub async fn dial(&mut self, peer_id: PeerId, address: Multiaddr) -> Result<()> { let (sender, receiver) = oneshot::channel(); self.sender - .send(DriaP2PCommand::Dial { peer_id, sender }) + .send(DriaP2PCommand::Dial { + peer_id, + address, + sender, + }) .await .wrap_err("could not send")?;