Skip to content

Commit

Permalink
feat: added config seeds for avoiding manual process with seeds (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Oct 9, 2024
1 parent dd4b66e commit 651127f
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 70 deletions.
15 changes: 2 additions & 13 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, str::FromStr, time::Duration};
use std::{net::SocketAddr, str::FromStr};

use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig, PeerAddress};
use clap::Parser;
Expand Down Expand Up @@ -54,22 +54,11 @@ async fn main() {
priv_key: key,
cert: cert,
tick_ms: 100,
seeds: args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::<Vec<_>>(),
})
.await
.expect("should create network");

let sdn_seeds = args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::<Vec<_>>();

let requester = p2p.requester();
tokio::spawn(async move {
loop {
for seed in &sdn_seeds {
requester.try_connect(seed.clone());
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});

loop {
p2p.recv().await.expect("should ok");
}
Expand Down
11 changes: 10 additions & 1 deletion src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,20 @@ pub struct PeerDiscoverySync(Vec<(PeerId, u64, NetworkAddress)>);

#[derive(Debug, Default)]
pub struct PeerDiscovery {
seeds: Vec<PeerAddress>,
local: Option<(PeerId, NetworkAddress)>,
remotes: BTreeMap<PeerId, (u64, NetworkAddress)>,
}

impl PeerDiscovery {
pub fn new(seeds: Vec<PeerAddress>) -> Self {
Self {
seeds,
local: None,
remotes: Default::default(),
}
}

pub fn enable_local(&mut self, peer_id: PeerId, address: NetworkAddress) {
log::info!("[PeerDiscovery] enable local as {address}");
self.local = Some((peer_id, address));
Expand Down Expand Up @@ -57,7 +66,7 @@ impl PeerDiscovery {
}
}
pub fn remotes(&self) -> impl Iterator<Item = PeerAddress> + '_ {
self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone()))
self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone())).chain(self.seeds.iter().map(|s| s.clone()))
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub struct P2pNetworkConfig {
pub priv_key: PrivatePkcs8KeyDer<'static>,
pub cert: CertificateDer<'static>,
pub tick_ms: u64,
pub seeds: Vec<PeerAddress>,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -152,7 +153,7 @@ impl P2pNetwork {
let endpoint = make_server_endpoint(cfg.listen_addr, cfg.priv_key, cfg.cert)?;
let (internal_tx, internal_rx) = channel(10);
let (control_tx, control_rx) = unbounded_channel();
let mut discovery = PeerDiscovery::default();
let mut discovery = PeerDiscovery::new(cfg.seeds);
let router = SharedRouterTable::new(cfg.peer_id);

if let Some(addr) = cfg.advertise {
Expand Down
3 changes: 2 additions & 1 deletion src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod visualization;
pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert");
pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key");

async fn create_node(advertise: bool, peer_id: u64) -> (P2pNetwork, PeerAddress) {
async fn create_node(advertise: bool, peer_id: u64, seeds: Vec<PeerAddress>) -> (P2pNetwork, PeerAddress) {
let _ = rustls::crypto::ring::default_provider().install_default();

let key: PrivatePkcs8KeyDer<'_> = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec());
Expand All @@ -31,6 +31,7 @@ async fn create_node(advertise: bool, peer_id: u64) -> (P2pNetwork, PeerAddress)
priv_key: key,
cert: cert,
tick_ms: 100,
seeds,
})
.await
.unwrap(),
Expand Down
25 changes: 8 additions & 17 deletions src/tests/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::create_node;

#[test(tokio::test)]
async fn alias_guard() {
let (mut node1, _addr1) = create_node(true, 1).await;
let (mut node1, _addr1) = create_node(true, 1, vec![]).await;
let mut service1 = AliasService::new(node1.create_service(0.into()));
let service1_requester = service1.requester();
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
Expand All @@ -28,7 +28,7 @@ async fn alias_guard() {

#[test(tokio::test)]
async fn alias_multi_guards() {
let (mut node1, _addr1) = create_node(true, 1).await;
let (mut node1, _addr1) = create_node(true, 1, vec![]).await;
let mut service1 = AliasService::new(node1.create_service(0.into()));
let service1_requester = service1.requester();
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
Expand All @@ -53,14 +53,13 @@ async fn alias_multi_guards() {

#[test(tokio::test)]
async fn alias_scan() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let mut service1 = AliasService::new(node1.create_service(0.into()));
let service1_requester = service1.requester();
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service1.recv().await {} });

let (mut node2, _addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = AliasService::new(node2.create_service(0.into()));
let service2_requester = service2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });
Expand All @@ -70,29 +69,24 @@ async fn alias_scan() {
let alias_id: AliasId = 1000.into();
let _alia_guard = service1_requester.register(alias_id);

node2_requester.connect(addr1.clone()).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(service2_requester.find(alias_id).await, Some(AliasFoundLocation::Scan(addr1.peer_id())));
}

#[test(tokio::test)]
async fn alias_hint() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let mut service1 = AliasService::new(node1.create_service(0.into()));
let service1_requester = service1.requester();
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service1.recv().await {} });

let (mut node2, _addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = AliasService::new(node2.create_service(0.into()));
let service2_requester = service2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service2.recv().await {} });

node2_requester.connect(addr1.clone()).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

// we register alias after connect
Expand All @@ -106,20 +100,17 @@ async fn alias_hint() {

#[test(tokio::test)]
async fn alias_timeout() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let mut service1 = AliasService::new(node1.create_service(0.into()));
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service1.recv().await {} });

let (mut node2, _addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = AliasService::new(node2.create_service(0.into()));
let service2_requester = service2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service2.recv().await {} });

node2_requester.connect(addr1).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

let alias_id: AliasId = 1000.into();
Expand Down
35 changes: 12 additions & 23 deletions src/tests/cross_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ use super::create_node;

#[test(tokio::test)]
async fn send_direct() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let mut service1 = node1.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = node2.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

node2_requester.connect(addr1.clone()).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

let data = "from_node1".as_bytes().to_vec();
Expand All @@ -33,11 +30,11 @@ async fn send_direct() {
#[test(tokio::test)]
async fn send_error() {
// without connect 2 peers, it should error to send data
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let service1 = node1.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let (mut node2, addr2) = create_node(false, 2, vec![]).await;
let service2 = node2.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

Expand All @@ -50,15 +47,15 @@ async fn send_error() {

#[test(tokio::test)]
async fn send_relay() {
let (mut node1, addr1) = create_node(false, 1).await;
let (mut node1, addr1) = create_node(false, 1, vec![]).await;
let mut service1 = node1.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let (mut node2, addr2) = create_node(false, 2, vec![]).await;
let node2_requester = node2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

let (mut node3, addr3) = create_node(false, 3).await;
let (mut node3, addr3) = create_node(false, 3, vec![]).await;
let node3_requester = node3.requester();
let mut service3 = node3.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node3.recv().await {} });
Expand All @@ -79,17 +76,14 @@ async fn send_relay() {

#[test(tokio::test)]
async fn broadcast_direct() {
let (mut node1, addr1) = create_node(false, 1).await;
let (mut node1, addr1) = create_node(false, 1, vec![]).await;
let mut service1 = node1.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = node2.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

node2_requester.connect(addr1.clone()).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

log::info!("sending broadcast message");
Expand All @@ -105,23 +99,18 @@ async fn broadcast_direct() {

#[test(tokio::test)]
async fn broadcast_relay() {
let (mut node1, addr1) = create_node(false, 1).await;
let (mut node1, addr1) = create_node(false, 1, vec![]).await;
let mut service1 = node1.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let node2_requester = node2.requester();
let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = node2.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

let (mut node3, addr3) = create_node(false, 3).await;
let node3_requester = node3.requester();
let (mut node3, addr3) = create_node(false, 3, vec![addr2.clone()]).await;
let mut service3 = node3.create_service(0.into());
tokio::spawn(async move { while let Ok(_) = node3.recv().await {} });

node2_requester.connect(addr1.clone()).await.expect("should connect success");
node3_requester.connect(addr2.clone()).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

let data = "from_node1".as_bytes().to_vec();
Expand Down
13 changes: 3 additions & 10 deletions src/tests/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ use test_log::test;

#[test(tokio::test)]
async fn discovery_remain_node() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
log::info!("created node1 {addr1}");
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let (mut node2, addr2) = create_node(false, 2, vec![addr1]).await;
log::info!("created node2 {addr2}");
let node2_requester = node2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

let (mut node3, addr3) = create_node(false, 3).await;
let (mut node3, addr3) = create_node(false, 3, vec![addr2]).await;
log::info!("created node3 {addr3}");
let node3_requester = node3.requester();
let node3_neighbours = Arc::new(Mutex::new(HashSet::new()));
let node3_neighbours_c = node3_neighbours.clone();
tokio::spawn(async move {
Expand All @@ -36,11 +34,6 @@ async fn discovery_remain_node() {

tokio::time::sleep(Duration::from_secs(1)).await;

node2_requester.connect(addr1).await.expect("should connect success");
node3_requester.connect(addr2).await.expect("should connect success");

tokio::time::sleep(Duration::from_secs(1)).await;

// after some cycle node3 should have node1 as neighbour
assert_eq!(node3_neighbours.lock().len(), 2);
}
6 changes: 2 additions & 4 deletions src/tests/visualization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ use super::create_node;

#[test(tokio::test)]
async fn discovery_new_node() {
let (mut node1, addr1) = create_node(true, 1).await;
let (mut node1, addr1) = create_node(true, 1, vec![]).await;
let mut service1 = VisualizationService::new(None, false, node1.create_service(0.into()));
tokio::spawn(async move { while let Ok(_) = node1.recv().await {} });
tokio::spawn(async move { while let Ok(_) = service1.recv().await {} });

let (mut node2, addr2) = create_node(false, 2).await;
let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await;
let mut service2 = VisualizationService::new(Some(Duration::from_secs(1)), false, node2.create_service(0.into()));
let node2_requester = node2.requester();
tokio::spawn(async move { while let Ok(_) = node2.recv().await {} });

node2_requester.connect(addr1.clone()).await.expect("should connect success");
tokio::time::sleep(Duration::from_secs(1)).await;

let mut events = vec![
Expand Down

0 comments on commit 651127f

Please sign in to comment.