diff --git a/examples/simple.rs b/examples/simple.rs index 526f47f..58909cb 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr}; use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig, PeerAddress}; use clap::Parser; @@ -54,22 +54,11 @@ async fn main() { priv_key: key, cert: cert, tick_ms: 100, + seeds: args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::>(), }) .await .expect("should create network"); - let sdn_seeds = args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::>(); - - let requester = p2p.requester(); - tokio::spawn(async move { - loop { - for seed in &sdn_seeds { - requester.try_connect(seed.clone()); - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - }); - loop { p2p.recv().await.expect("should ok"); } diff --git a/src/discovery.rs b/src/discovery.rs index ec46def..94231ee 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -13,11 +13,20 @@ pub struct PeerDiscoverySync(Vec<(PeerId, u64, NetworkAddress)>); #[derive(Debug, Default)] pub struct PeerDiscovery { + seeds: Vec, local: Option<(PeerId, NetworkAddress)>, remotes: BTreeMap, } impl PeerDiscovery { + pub fn new(seeds: Vec) -> Self { + Self { + seeds, + local: None, + remotes: Default::default(), + } + } + pub fn enable_local(&mut self, peer_id: PeerId, address: NetworkAddress) { log::info!("[PeerDiscovery] enable local as {address}"); self.local = Some((peer_id, address)); @@ -57,7 +66,7 @@ impl PeerDiscovery { } } pub fn remotes(&self) -> impl Iterator + '_ { - self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone())) + self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone())).chain(self.seeds.iter().map(|s| s.clone())) } } diff --git a/src/lib.rs b/src/lib.rs index 3319226..987d8ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,6 +123,7 @@ pub struct P2pNetworkConfig { pub priv_key: PrivatePkcs8KeyDer<'static>, pub cert: CertificateDer<'static>, pub tick_ms: u64, + pub seeds: Vec, } #[derive(Debug, PartialEq, Eq)] @@ -152,7 +153,7 @@ impl P2pNetwork { let endpoint = make_server_endpoint(cfg.listen_addr, cfg.priv_key, cfg.cert)?; let (internal_tx, internal_rx) = channel(10); let (control_tx, control_rx) = unbounded_channel(); - let mut discovery = PeerDiscovery::default(); + let mut discovery = PeerDiscovery::new(cfg.seeds); let router = SharedRouterTable::new(cfg.peer_id); if let Some(addr) = cfg.advertise { diff --git a/src/tests.rs b/src/tests.rs index 51126d3..44c08ef 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -12,7 +12,7 @@ mod visualization; pub const DEFAULT_CLUSTER_CERT: &[u8] = include_bytes!("../certs/dev.cluster.cert"); pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key"); -async fn create_node(advertise: bool, peer_id: u64) -> (P2pNetwork, PeerAddress) { +async fn create_node(advertise: bool, peer_id: u64, seeds: Vec) -> (P2pNetwork, PeerAddress) { let _ = rustls::crypto::ring::default_provider().install_default(); let key: PrivatePkcs8KeyDer<'_> = PrivatePkcs8KeyDer::from(DEFAULT_CLUSTER_KEY.to_vec()); @@ -31,6 +31,7 @@ async fn create_node(advertise: bool, peer_id: u64) -> (P2pNetwork, PeerAddress) priv_key: key, cert: cert, tick_ms: 100, + seeds, }) .await .unwrap(), diff --git a/src/tests/alias.rs b/src/tests/alias.rs index b8c72cb..dedafd8 100644 --- a/src/tests/alias.rs +++ b/src/tests/alias.rs @@ -8,7 +8,7 @@ use super::create_node; #[test(tokio::test)] async fn alias_guard() { - let (mut node1, _addr1) = create_node(true, 1).await; + let (mut node1, _addr1) = create_node(true, 1, vec![]).await; let mut service1 = AliasService::new(node1.create_service(0.into())); let service1_requester = service1.requester(); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); @@ -28,7 +28,7 @@ async fn alias_guard() { #[test(tokio::test)] async fn alias_multi_guards() { - let (mut node1, _addr1) = create_node(true, 1).await; + let (mut node1, _addr1) = create_node(true, 1, vec![]).await; let mut service1 = AliasService::new(node1.create_service(0.into())); let service1_requester = service1.requester(); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); @@ -53,14 +53,13 @@ async fn alias_multi_guards() { #[test(tokio::test)] async fn alias_scan() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let mut service1 = AliasService::new(node1.create_service(0.into())); let service1_requester = service1.requester(); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service1.recv().await {} }); - let (mut node2, _addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = AliasService::new(node2.create_service(0.into())); let service2_requester = service2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); @@ -70,29 +69,24 @@ async fn alias_scan() { let alias_id: AliasId = 1000.into(); let _alia_guard = service1_requester.register(alias_id); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(service2_requester.find(alias_id).await, Some(AliasFoundLocation::Scan(addr1.peer_id()))); } #[test(tokio::test)] async fn alias_hint() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let mut service1 = AliasService::new(node1.create_service(0.into())); let service1_requester = service1.requester(); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service1.recv().await {} }); - let (mut node2, _addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = AliasService::new(node2.create_service(0.into())); let service2_requester = service2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service2.recv().await {} }); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; // we register alias after connect @@ -106,20 +100,17 @@ async fn alias_hint() { #[test(tokio::test)] async fn alias_timeout() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let mut service1 = AliasService::new(node1.create_service(0.into())); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service1.recv().await {} }); - let (mut node2, _addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, _addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = AliasService::new(node2.create_service(0.into())); let service2_requester = service2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service2.recv().await {} }); - node2_requester.connect(addr1).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; let alias_id: AliasId = 1000.into(); diff --git a/src/tests/cross_nodes.rs b/src/tests/cross_nodes.rs index db9de49..a0ee044 100644 --- a/src/tests/cross_nodes.rs +++ b/src/tests/cross_nodes.rs @@ -8,17 +8,14 @@ use super::create_node; #[test(tokio::test)] async fn send_direct() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let mut service1 = node1.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = node2.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; let data = "from_node1".as_bytes().to_vec(); @@ -33,11 +30,11 @@ async fn send_direct() { #[test(tokio::test)] async fn send_error() { // without connect 2 peers, it should error to send data - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let service1 = node1.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; + let (mut node2, addr2) = create_node(false, 2, vec![]).await; let service2 = node2.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); @@ -50,15 +47,15 @@ async fn send_error() { #[test(tokio::test)] async fn send_relay() { - let (mut node1, addr1) = create_node(false, 1).await; + let (mut node1, addr1) = create_node(false, 1, vec![]).await; let mut service1 = node1.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; + let (mut node2, addr2) = create_node(false, 2, vec![]).await; let node2_requester = node2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - let (mut node3, addr3) = create_node(false, 3).await; + let (mut node3, addr3) = create_node(false, 3, vec![]).await; let node3_requester = node3.requester(); let mut service3 = node3.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node3.recv().await {} }); @@ -79,17 +76,14 @@ async fn send_relay() { #[test(tokio::test)] async fn broadcast_direct() { - let (mut node1, addr1) = create_node(false, 1).await; + let (mut node1, addr1) = create_node(false, 1, vec![]).await; let mut service1 = node1.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = node2.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; log::info!("sending broadcast message"); @@ -105,23 +99,18 @@ async fn broadcast_direct() { #[test(tokio::test)] async fn broadcast_relay() { - let (mut node1, addr1) = create_node(false, 1).await; + let (mut node1, addr1) = create_node(false, 1, vec![]).await; let mut service1 = node1.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; - let node2_requester = node2.requester(); + let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = node2.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - let (mut node3, addr3) = create_node(false, 3).await; - let node3_requester = node3.requester(); + let (mut node3, addr3) = create_node(false, 3, vec![addr2.clone()]).await; let mut service3 = node3.create_service(0.into()); tokio::spawn(async move { while let Ok(_) = node3.recv().await {} }); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); - node3_requester.connect(addr2.clone()).await.expect("should connect success"); - tokio::time::sleep(Duration::from_secs(1)).await; let data = "from_node1".as_bytes().to_vec(); diff --git a/src/tests/discovery.rs b/src/tests/discovery.rs index 010fef3..d03323b 100644 --- a/src/tests/discovery.rs +++ b/src/tests/discovery.rs @@ -6,18 +6,16 @@ use test_log::test; #[test(tokio::test)] async fn discovery_remain_node() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; log::info!("created node1 {addr1}"); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; + let (mut node2, addr2) = create_node(false, 2, vec![addr1]).await; log::info!("created node2 {addr2}"); - let node2_requester = node2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - let (mut node3, addr3) = create_node(false, 3).await; + let (mut node3, addr3) = create_node(false, 3, vec![addr2]).await; log::info!("created node3 {addr3}"); - let node3_requester = node3.requester(); let node3_neighbours = Arc::new(Mutex::new(HashSet::new())); let node3_neighbours_c = node3_neighbours.clone(); tokio::spawn(async move { @@ -36,11 +34,6 @@ async fn discovery_remain_node() { tokio::time::sleep(Duration::from_secs(1)).await; - node2_requester.connect(addr1).await.expect("should connect success"); - node3_requester.connect(addr2).await.expect("should connect success"); - - tokio::time::sleep(Duration::from_secs(1)).await; - // after some cycle node3 should have node1 as neighbour assert_eq!(node3_neighbours.lock().len(), 2); } diff --git a/src/tests/visualization.rs b/src/tests/visualization.rs index d3ba4da..8a0ef94 100644 --- a/src/tests/visualization.rs +++ b/src/tests/visualization.rs @@ -8,17 +8,15 @@ use super::create_node; #[test(tokio::test)] async fn discovery_new_node() { - let (mut node1, addr1) = create_node(true, 1).await; + let (mut node1, addr1) = create_node(true, 1, vec![]).await; let mut service1 = VisualizationService::new(None, false, node1.create_service(0.into())); tokio::spawn(async move { while let Ok(_) = node1.recv().await {} }); tokio::spawn(async move { while let Ok(_) = service1.recv().await {} }); - let (mut node2, addr2) = create_node(false, 2).await; + let (mut node2, addr2) = create_node(false, 2, vec![addr1.clone()]).await; let mut service2 = VisualizationService::new(Some(Duration::from_secs(1)), false, node2.create_service(0.into())); - let node2_requester = node2.requester(); tokio::spawn(async move { while let Ok(_) = node2.recv().await {} }); - node2_requester.connect(addr1.clone()).await.expect("should connect success"); tokio::time::sleep(Duration::from_secs(1)).await; let mut events = vec![