Skip to content

Commit

Permalink
feat: visualization service (#3)
Browse files Browse the repository at this point in the history
* feat: visualization service

* fixed missing tokio features
  • Loading branch information
giangndm authored Oct 8, 2024
1 parent 6b5be58 commit 9d5bfe9
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ description = "A lightweight peer-to-peer network utilizing the atm0s routing me
license = "MIT"

[dependencies]
tokio = { version = "1", features = ["io-util", "sync"] }
tokio = { version = "1", features = ["io-util", "sync", "macros"] }
anyhow = { version = "1" }
quinn = { version = "0.11", features = ["ring", "runtime-tokio", "futures-io"] }
rustls = { version = "0.23", features = ["ring", "std"] }
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use msg::{P2pServiceId, PeerMessage};
use neighbours::NetworkNeighbours;
use peer::PeerConnection;
use quinn::{Endpoint, Incoming, VarInt};
use router::{RouterTableSync, SharedRouterTable};
use router::RouterTableSync;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::{Deserialize, Serialize};
use tokio::{
Expand Down Expand Up @@ -37,6 +37,7 @@ mod tests;
mod utils;

pub use requester::P2pNetworkRequester;
pub use router::SharedRouterTable;
pub use service::*;
pub use stream::P2pQuicStream;
pub use utils::*;
Expand Down
4 changes: 4 additions & 0 deletions src/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ impl P2pNetworkRequester {
self.control_tx.send(ControlCmd::Connect(addr, Some(tx))).expect("should send to main loop");
rx.await?
}

pub fn try_connect(&self, addr: PeerAddress) {
self.control_tx.send(ControlCmd::Connect(addr, None)).expect("should send to main loop");
}
}
26 changes: 17 additions & 9 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,26 @@ pub enum RouteAction {
}

#[derive(Debug)]
pub struct RouterTable {
struct RouterTable {
address: PeerAddress,
peers: BTreeMap<PeerAddress, PeerMemory>,
directs: BTreeMap<PeerAddress, PathMetric>,
}

impl RouterTable {
pub fn new(address: PeerAddress) -> Self {
fn new(address: PeerAddress) -> Self {
Self {
address,
peers: Default::default(),
directs: Default::default(),
}
}

pub fn local_address(&self) -> PeerAddress {
fn local_address(&self) -> PeerAddress {
self.address
}

pub fn create_sync(&self, dest: &PeerAddress) -> RouterTableSync {
fn create_sync(&self, dest: &PeerAddress) -> RouterTableSync {
RouterTableSync(
self.peers
.iter()
Expand All @@ -70,7 +70,7 @@ impl RouterTable {
)
}

pub fn apply_sync(&mut self, from: PeerAddress, sync: RouterTableSync) {
fn apply_sync(&mut self, from: PeerAddress, sync: RouterTableSync) {
let direct_metric = self.directs.get(&from).expect("should have direct metric with apply_sync");
// ensure we have memory for each sync paths
for (peer, _) in sync.0.iter() {
Expand Down Expand Up @@ -109,14 +109,14 @@ impl RouterTable {
self.peers.retain(|_k, v| v.best().is_some());
}

pub fn set_direct(&mut self, from: PeerAddress, ttl_ms: u16) {
fn set_direct(&mut self, from: PeerAddress, ttl_ms: u16) {
self.directs.insert(from, (1, ttl_ms).into());
let memory = self.peers.entry(from).or_default();
memory.paths.insert(from, PathMetric { relay_hops: 0, rtt_ms: ttl_ms });
Self::select_best_for(&from, memory);
}

pub fn del_direct(&mut self, from: &PeerAddress) {
fn del_direct(&mut self, from: &PeerAddress) {
self.directs.remove(&from);
if let Some(memory) = self.peers.get_mut(from) {
memory.paths.remove(&from);
Expand All @@ -127,7 +127,7 @@ impl RouterTable {
}
}

pub fn action(&self, dest: &PeerAddress) -> Option<RouteAction> {
fn action(&self, dest: &PeerAddress) -> Option<RouteAction> {
if self.address.eq(dest) {
Some(RouteAction::Local)
} else {
Expand All @@ -136,7 +136,7 @@ impl RouterTable {
}

/// Get next remote
pub fn next_remote(&self, next: &PeerAddress) -> Option<(PeerAddress, PathMetric)> {
fn next_remote(&self, next: &PeerAddress) -> Option<(PeerAddress, PathMetric)> {
let memory = self.peers.get(next)?;
let best = memory.best()?;
let metric = memory.best_metric().expect("should have metric");
Expand All @@ -152,6 +152,10 @@ impl RouterTable {
);
}
}

fn neighbours(&self) -> Vec<(PeerAddress, u16)> {
self.directs.iter().map(|(k, v)| (*k, v.rtt_ms)).collect()
}
}

impl PathMetric {
Expand Down Expand Up @@ -240,6 +244,10 @@ impl SharedRouterTable {
pub fn next_remote(&self, dest: &PeerAddress) -> Option<(PeerAddress, PathMetric)> {
self.table.read().next_remote(dest)
}

pub fn neighbours(&self) -> Vec<(PeerAddress, u16)> {
self.table.read().neighbours()
}
}

#[cfg(test)]
Expand Down
11 changes: 10 additions & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use tokio::sync::mpsc::{channel, Receiver, Sender};

use crate::{ctx::SharedCtx, msg::P2pServiceId, stream::P2pQuicStream, PeerAddress};
use crate::{ctx::SharedCtx, msg::P2pServiceId, router::SharedRouterTable, stream::P2pQuicStream, PeerAddress};

pub mod alias_service;
pub mod visualization_service;

const SERVICE_CHANNEL_SIZE: usize = 10;

Expand Down Expand Up @@ -58,6 +59,10 @@ impl P2pService {
self.ctx.open_stream(self.service, dest, meta).await
}

pub fn router(&self) -> &SharedRouterTable {
self.ctx.router()
}

pub async fn recv(&mut self) -> Option<P2pServiceEvent> {
self.rx.recv().await
}
Expand All @@ -83,4 +88,8 @@ impl P2pServiceRequester {
pub async fn open_stream(&self, dest: PeerAddress, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
self.ctx.open_stream(self.service, dest, meta).await
}

pub fn router(&self) -> &SharedRouterTable {
self.ctx.router()
}
}
115 changes: 115 additions & 0 deletions src/service/visualization_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::{
collections::{HashMap, VecDeque},
time::Duration,
};

use serde::{Deserialize, Serialize};
use tokio::{select, time::Interval};

use crate::{now_ms, ErrorExt, PeerAddress};

use super::{P2pService, P2pServiceEvent};

#[derive(Debug, PartialEq, Eq)]
pub enum VisualizationServiceEvent {
PeerJoined(PeerAddress, Vec<(PeerAddress, u16)>),
PeerUpdated(PeerAddress, Vec<(PeerAddress, u16)>),
PeerLeaved(PeerAddress),
}

#[derive(Debug, Serialize, Deserialize)]
enum Message {
Scan,
Info(Vec<(PeerAddress, u16)>),
}

pub struct VisualizationService {
service: P2pService,
neighbours: HashMap<PeerAddress, u64>,
ticker: Interval,
collect_interval: Option<Duration>,
collect_me: bool,
outs: VecDeque<VisualizationServiceEvent>,
}

impl VisualizationService {
pub fn new(collect_interval: Option<Duration>, collect_me: bool, service: P2pService) -> Self {
let ticker = tokio::time::interval(collect_interval.unwrap_or(Duration::from_secs(100)));

Self {
ticker,
collect_interval,
collect_me,
neighbours: HashMap::new(),
outs: if collect_me {
VecDeque::from([VisualizationServiceEvent::PeerJoined(service.router().local_address(), vec![])])
} else {
VecDeque::new()
},
service,
}
}

pub async fn recv(&mut self) -> anyhow::Result<VisualizationServiceEvent> {
loop {
if let Some(out) = self.outs.pop_front() {
return Ok(out);
}

select! {
_ = self.ticker.tick() => {
if let Some(interval) = self.collect_interval {
if self.collect_me {
// for update local node
self.outs.push_back(VisualizationServiceEvent::PeerUpdated(self.service.router().local_address(), self.service.router().neighbours()));
}

let requester = self.service.requester();
tokio::spawn(async move {
requester.send_broadcast(bincode::serialize(&Message::Scan).expect("should convert to buf")).await;
});

let now = now_ms();
let mut timeout_peers = vec![];
for (peer, last_updated) in self.neighbours.iter() {
if now >= *last_updated + interval.as_millis() as u64 * 2 {
timeout_peers.push(*peer);
self.outs.push_back(VisualizationServiceEvent::PeerLeaved(*peer));
}
}

for peer in timeout_peers {
self.neighbours.remove(&peer);
}
}
}
event = self.service.recv() => match event.expect("should work") {
P2pServiceEvent::Unicast(from, data) | P2pServiceEvent::Broadcast(from, data) => {
if let Ok(msg) = bincode::deserialize::<Message>(&data) {
match msg {
Message::Scan => {
let requester = self.service.requester();
let neighbours: Vec<(PeerAddress, u16)> = requester.router().neighbours();
tokio::spawn(async move {
requester
.send_unicast(from, bincode::serialize(&Message::Info(neighbours)).expect("should convert to buf"))
.await
.print_on_err("send neighbour info to visualization collector");
});
}
Message::Info(neighbours) => {
if self.neighbours.insert(from, now_ms()).is_none() {
self.outs.push_back(VisualizationServiceEvent::PeerJoined(from, neighbours));
} else {
self.outs.push_back(VisualizationServiceEvent::PeerUpdated(from, neighbours));
}
}
}
}
}
P2pServiceEvent::Stream(..) => {}
}
}
}
}
}
1 change: 1 addition & 0 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{P2pNetwork, P2pNetworkConfig, PeerAddress};
mod alias;
mod cross_nodes;
mod discovery;
mod visualization;

pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert");
pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key");
Expand Down
47 changes: 47 additions & 0 deletions src/tests/visualization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::time::Duration;

use test_log::test;

use crate::visualization_service::{VisualizationService, VisualizationServiceEvent};

use super::create_random_node;

#[test(tokio::test)]
async fn discovery_new_node() {
let (mut node1, addr1) = create_random_node(true).await;
let mut service1 = VisualizationService::new(None, false, node1.create_service(0.into()));
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service1.recv().await {} });

let (mut node2, addr2) = create_random_node(false).await;
let mut service2 = VisualizationService::new(Some(Duration::from_secs(1)), false, node2.create_service(0.into()));
let node2_requester = node2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

node2_requester.connect(addr1).await.expect("should connect success");
tokio::time::sleep(Duration::from_secs(1)).await;

let mut events = vec![
tokio::time::timeout(Duration::from_secs(3), service2.recv()).await.unwrap().unwrap(),
tokio::time::timeout(Duration::from_secs(3), service2.recv()).await.unwrap().unwrap(),
];

for event in events.iter_mut() {
match event {
VisualizationServiceEvent::PeerJoined(_, neighbours) | VisualizationServiceEvent::PeerUpdated(_, neighbours) => {
for (_, rtt) in neighbours.iter_mut() {
*rtt = 0;
}
}
VisualizationServiceEvent::PeerLeaved(_) => {}
}
}

assert_eq!(
events,
vec![
VisualizationServiceEvent::PeerJoined(addr1, vec![(addr2, 0)]),
VisualizationServiceEvent::PeerUpdated(addr1, vec![(addr2, 0)]),
]
);
}

0 comments on commit 9d5bfe9

Please sign in to comment.