diff --git a/Cargo.toml b/Cargo.toml index 864f89d..1307377 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ description = "A lightweight peer-to-peer network utilizing the atm0s routing me license = "MIT" [dependencies] -tokio = { version = "1", features = ["io-util", "sync"] } +tokio = { version = "1", features = ["io-util", "sync", "macros"] } anyhow = { version = "1" } quinn = { version = "0.11", features = ["ring", "runtime-tokio", "futures-io"] } rustls = { version = "0.23", features = ["ring", "std"] } diff --git a/src/lib.rs b/src/lib.rs index 6fe1803..b4a5ef0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,7 @@ use msg::{P2pServiceId, PeerMessage}; use neighbours::NetworkNeighbours; use peer::PeerConnection; use quinn::{Endpoint, Incoming, VarInt}; -use router::{RouterTableSync, SharedRouterTable}; +use router::RouterTableSync; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use serde::{Deserialize, Serialize}; use tokio::{ @@ -37,6 +37,7 @@ mod tests; mod utils; pub use requester::P2pNetworkRequester; +pub use router::SharedRouterTable; pub use service::*; pub use stream::P2pQuicStream; pub use utils::*; diff --git a/src/requester.rs b/src/requester.rs index 2e025b8..23f46e1 100644 --- a/src/requester.rs +++ b/src/requester.rs @@ -12,4 +12,8 @@ impl P2pNetworkRequester { self.control_tx.send(ControlCmd::Connect(addr, Some(tx))).expect("should send to main loop"); rx.await? } + + pub fn try_connect(&self, addr: PeerAddress) { + self.control_tx.send(ControlCmd::Connect(addr, None)).expect("should send to main loop"); + } } diff --git a/src/router.rs b/src/router.rs index 82c9876..9969770 100644 --- a/src/router.rs +++ b/src/router.rs @@ -41,14 +41,14 @@ pub enum RouteAction { } #[derive(Debug)] -pub struct RouterTable { +struct RouterTable { address: PeerAddress, peers: BTreeMap, directs: BTreeMap, } impl RouterTable { - pub fn new(address: PeerAddress) -> Self { + fn new(address: PeerAddress) -> Self { Self { address, peers: Default::default(), @@ -56,11 +56,11 @@ impl RouterTable { } } - pub fn local_address(&self) -> PeerAddress { + fn local_address(&self) -> PeerAddress { self.address } - pub fn create_sync(&self, dest: &PeerAddress) -> RouterTableSync { + fn create_sync(&self, dest: &PeerAddress) -> RouterTableSync { RouterTableSync( self.peers .iter() @@ -70,7 +70,7 @@ impl RouterTable { ) } - pub fn apply_sync(&mut self, from: PeerAddress, sync: RouterTableSync) { + fn apply_sync(&mut self, from: PeerAddress, sync: RouterTableSync) { let direct_metric = self.directs.get(&from).expect("should have direct metric with apply_sync"); // ensure we have memory for each sync paths for (peer, _) in sync.0.iter() { @@ -109,14 +109,14 @@ impl RouterTable { self.peers.retain(|_k, v| v.best().is_some()); } - pub fn set_direct(&mut self, from: PeerAddress, ttl_ms: u16) { + fn set_direct(&mut self, from: PeerAddress, ttl_ms: u16) { self.directs.insert(from, (1, ttl_ms).into()); let memory = self.peers.entry(from).or_default(); memory.paths.insert(from, PathMetric { relay_hops: 0, rtt_ms: ttl_ms }); Self::select_best_for(&from, memory); } - pub fn del_direct(&mut self, from: &PeerAddress) { + fn del_direct(&mut self, from: &PeerAddress) { self.directs.remove(&from); if let Some(memory) = self.peers.get_mut(from) { memory.paths.remove(&from); @@ -127,7 +127,7 @@ impl RouterTable { } } - pub fn action(&self, dest: &PeerAddress) -> Option { + fn action(&self, dest: &PeerAddress) -> Option { if self.address.eq(dest) { Some(RouteAction::Local) } else { @@ -136,7 +136,7 @@ impl RouterTable { } /// Get next remote - pub fn next_remote(&self, next: &PeerAddress) -> Option<(PeerAddress, PathMetric)> { + fn next_remote(&self, next: &PeerAddress) -> Option<(PeerAddress, PathMetric)> { let memory = self.peers.get(next)?; let best = memory.best()?; let metric = memory.best_metric().expect("should have metric"); @@ -152,6 +152,10 @@ impl RouterTable { ); } } + + fn neighbours(&self) -> Vec<(PeerAddress, u16)> { + self.directs.iter().map(|(k, v)| (*k, v.rtt_ms)).collect() + } } impl PathMetric { @@ -240,6 +244,10 @@ impl SharedRouterTable { pub fn next_remote(&self, dest: &PeerAddress) -> Option<(PeerAddress, PathMetric)> { self.table.read().next_remote(dest) } + + pub fn neighbours(&self) -> Vec<(PeerAddress, u16)> { + self.table.read().neighbours() + } } #[cfg(test)] diff --git a/src/service.rs b/src/service.rs index 80d9da0..91f428d 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,8 +1,9 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; -use crate::{ctx::SharedCtx, msg::P2pServiceId, stream::P2pQuicStream, PeerAddress}; +use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerAddress}; pub mod alias_service; +pub mod visualization_service; const SERVICE_CHANNEL_SIZE: usize = 10; @@ -58,6 +59,10 @@ impl P2pService { self.ctx.open_stream(self.service, dest, meta).await } + pub fn router(&self) -> &SharedRouterTable { + self.ctx.router() + } + pub async fn recv(&mut self) -> Option { self.rx.recv().await } @@ -83,4 +88,8 @@ impl P2pServiceRequester { pub async fn open_stream(&self, dest: PeerAddress, meta: Vec) -> anyhow::Result { self.ctx.open_stream(self.service, dest, meta).await } + + pub fn router(&self) -> &SharedRouterTable { + self.ctx.router() + } } diff --git a/src/service/visualization_service.rs b/src/service/visualization_service.rs new file mode 100644 index 0000000..556763a --- /dev/null +++ b/src/service/visualization_service.rs @@ -0,0 +1,115 @@ +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; + +use serde::{Deserialize, Serialize}; +use tokio::{select, time::Interval}; + +use crate::{now_ms, ErrorExt, PeerAddress}; + +use super::{P2pService, P2pServiceEvent}; + +#[derive(Debug, PartialEq, Eq)] +pub enum VisualizationServiceEvent { + PeerJoined(PeerAddress, Vec<(PeerAddress, u16)>), + PeerUpdated(PeerAddress, Vec<(PeerAddress, u16)>), + PeerLeaved(PeerAddress), +} + +#[derive(Debug, Serialize, Deserialize)] +enum Message { + Scan, + Info(Vec<(PeerAddress, u16)>), +} + +pub struct VisualizationService { + service: P2pService, + neighbours: HashMap, + ticker: Interval, + collect_interval: Option, + collect_me: bool, + outs: VecDeque, +} + +impl VisualizationService { + pub fn new(collect_interval: Option, collect_me: bool, service: P2pService) -> Self { + let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(100))); + + Self { + ticker, + collect_interval, + collect_me, + neighbours: HashMap::new(), + outs: if collect_me { + VecDeque::from([VisualizationServiceEvent::PeerJoined(service.router().local_address(), vec![])]) + } else { + VecDeque::new() + }, + service, + } + } + + pub async fn recv(&mut self) -> anyhow::Result { + loop { + if let Some(out) = self.outs.pop_front() { + return Ok(out); + } + + select! { + _ = self.ticker.tick() => { + if let Some(interval) = self.collect_interval { + if self.collect_me { + // for update local node + self.outs.push_back(VisualizationServiceEvent::PeerUpdated(self.service.router().local_address(), self.service.router().neighbours())); + } + + let requester = self.service.requester(); + tokio::spawn(async move { + requester.send_broadcast(bincode::serialize(&Message::Scan).expect("should convert to buf")).await; + }); + + let now = now_ms(); + let mut timeout_peers = vec![]; + for (peer, last_updated) in self.neighbours.iter() { + if now >= *last_updated + interval.as_millis() as u64 * 2 { + timeout_peers.push(*peer); + self.outs.push_back(VisualizationServiceEvent::PeerLeaved(*peer)); + } + } + + for peer in timeout_peers { + self.neighbours.remove(&peer); + } + } + } + event = self.service.recv() => match event.expect("should work") { + P2pServiceEvent::Unicast(from, data) | P2pServiceEvent::Broadcast(from, data) => { + if let Ok(msg) = bincode::deserialize::(&data) { + match msg { + Message::Scan => { + let requester = self.service.requester(); + let neighbours: Vec<(PeerAddress, u16)> = requester.router().neighbours(); + tokio::spawn(async move { + requester + .send_unicast(from, bincode::serialize(&Message::Info(neighbours)).expect("should convert to buf")) + .await + .print_on_err("send neighbour info to visualization collector"); + }); + } + Message::Info(neighbours) => { + if self.neighbours.insert(from, now_ms()).is_none() { + self.outs.push_back(VisualizationServiceEvent::PeerJoined(from, neighbours)); + } else { + self.outs.push_back(VisualizationServiceEvent::PeerUpdated(from, neighbours)); + } + } + } + } + } + P2pServiceEvent::Stream(..) => {} + } + } + } + } +} diff --git a/src/tests.rs b/src/tests.rs index d6087d2..0ce7230 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -7,6 +7,7 @@ use crate::{P2pNetwork, P2pNetworkConfig, PeerAddress}; mod alias; mod cross_nodes; mod discovery; +mod visualization; pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert"); pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key"); diff --git a/src/tests/visualization.rs b/src/tests/visualization.rs new file mode 100644 index 0000000..cec4a40 --- /dev/null +++ b/src/tests/visualization.rs @@ -0,0 +1,47 @@ +use std::time::Duration; + +use test_log::test; + +use crate::visualization_service::{VisualizationService, VisualizationServiceEvent}; + +use super::create_random_node; + +#[test(tokio::test)] +async fn discovery_new_node() { + let (mut node1, addr1) = create_random_node(true).await; + let mut service1 = VisualizationService::new(None, false, node1.create_service(0.into())); + tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); + tokio::spawn(async move { while let Ok(_) = service1.recv().await {} }); + + let (mut node2, addr2) = create_random_node(false).await; + let mut service2 = VisualizationService::new(Some(Duration::from_secs(1)), false, node2.create_service(0.into())); + let node2_requester = node2.requester(); + tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); + + node2_requester.connect(addr1).await.expect("should connect success"); + tokio::time::sleep(Duration::from_secs(1)).await; + + let mut events = vec![ + tokio::time::timeout(Duration::from_secs(3), service2.recv()).await.unwrap().unwrap(), + tokio::time::timeout(Duration::from_secs(3), service2.recv()).await.unwrap().unwrap(), + ]; + + for event in events.iter_mut() { + match event { + VisualizationServiceEvent::PeerJoined(_, neighbours) | VisualizationServiceEvent::PeerUpdated(_, neighbours) => { + for (_, rtt) in neighbours.iter_mut() { + *rtt = 0; + } + } + VisualizationServiceEvent::PeerLeaved(_) => {} + } + } + + assert_eq!( + events, + vec![ + VisualizationServiceEvent::PeerJoined(addr1, vec![(addr2, 0)]), + VisualizationServiceEvent::PeerUpdated(addr1, vec![(addr2, 0)]), + ] + ); +}