Skip to content

Commit

Permalink
feat: switch to use peer_id for send and recv (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
giangndm authored Oct 9, 2024
1 parent b15b120 commit dd4b66e
Show file tree
Hide file tree
Showing 18 changed files with 576 additions and 418 deletions.
23 changes: 15 additions & 8 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{net::SocketAddr, time::Duration};
use std::{net::SocketAddr, str::FromStr, time::Duration};

use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig};
use atm0s_small_p2p::{P2pNetwork, P2pNetworkConfig, PeerAddress};
use clap::Parser;
use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand All @@ -13,12 +13,16 @@ pub const DEFAULT_CLUSTER_KEY: &[u8] = include_bytes!("../certs/dev.cluster.key"
#[command(author, version, about, long_about = None)]
struct Args {
/// UDP/TCP port for serving QUIC/TCP connection for SDN network
#[arg(env, long, default_value = "127.0.0.1:11111")]
#[arg(env, long, default_value_t = 1)]
sdn_peer_id: u64,

/// UDP/TCP port for serving QUIC/TCP connection for SDN network
#[arg(env, long, default_value = "0.0.0.0:11111")]
sdn_listener: SocketAddr,

/// Seeds
#[arg(env, long, value_delimiter = ',')]
sdn_seeds: Vec<SocketAddr>,
sdn_seeds: Vec<String>,

/// Allow it broadcast address to other peers
/// This allows other peer can active connect to this node
Expand All @@ -44,20 +48,23 @@ async fn main() {
let cert = CertificateDer::from(DEFAULT_CLUSTER_CERT.to_vec());

let mut p2p = P2pNetwork::new(P2pNetworkConfig {
addr: args.sdn_listener,
advertise: args.sdn_advertise_address,
peer_id: args.sdn_peer_id.into(),
listen_addr: args.sdn_listener,
advertise: args.sdn_advertise_address.map(|a| a.into()),
priv_key: key,
cert: cert,
tick_ms: 100,
})
.await
.expect("should create network");

let sdn_seeds = args.sdn_seeds.into_iter().map(|s| PeerAddress::from_str(s.as_str()).expect("should parse address")).collect::<Vec<_>>();

let requester = p2p.requester();
tokio::spawn(async move {
loop {
for seed in &args.sdn_seeds {
requester.try_connect((*seed).into());
for seed in &sdn_seeds {
requester.try_connect(seed.clone());
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
Expand Down
80 changes: 41 additions & 39 deletions src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ use tokio::sync::mpsc::Sender;

use crate::{
msg::{BroadcastMsgId, P2pServiceId, PeerMessage},
peer::PeerAlias,
peer::PeerConnectionAlias,
router::{RouteAction, SharedRouterTable},
service::P2pServiceEvent,
stream::P2pQuicStream,
utils::ErrorExt,
PeerAddress,
ConnectionId, PeerId,
};

#[derive(Debug)]
struct SharedCtxInternal {
peers: HashMap<PeerAddress, PeerAlias>,
conns: HashMap<ConnectionId, PeerConnectionAlias>,
received_broadcast_msg: LruCache<BroadcastMsgId, ()>,
services: [Option<Sender<P2pServiceEvent>>; 256],
}
Expand All @@ -32,20 +32,20 @@ impl SharedCtxInternal {
self.services[**service_id as usize].clone()
}

fn register_peer(&mut self, peer: PeerAddress, alias: PeerAlias) {
self.peers.insert(peer, alias);
fn register_conn(&mut self, conn: ConnectionId, alias: PeerConnectionAlias) {
self.conns.insert(conn, alias);
}

fn unregister_peer(&mut self, peer: &PeerAddress) {
self.peers.remove(peer);
fn unregister_conn(&mut self, conn: &ConnectionId) {
self.conns.remove(conn);
}

fn peer(&self, peer: &PeerAddress) -> Option<PeerAlias> {
self.peers.get(peer).cloned()
fn conn(&self, conn: &ConnectionId) -> Option<PeerConnectionAlias> {
self.conns.get(conn).cloned()
}

fn peers(&self) -> Vec<PeerAlias> {
self.peers.values().cloned().collect::<Vec<_>>()
fn conns(&self) -> Vec<PeerConnectionAlias> {
self.conns.values().cloned().collect::<Vec<_>>()
}

/// check if we already got the message
Expand All @@ -71,7 +71,7 @@ impl SharedCtx {
pub fn new(router: SharedRouterTable) -> Self {
Self {
ctx: Arc::new(RwLock::new(SharedCtxInternal {
peers: Default::default(),
conns: Default::default(),
received_broadcast_msg: LruCache::new(8192.try_into().expect("should ok")),
services: std::array::from_fn(|_| None),
})),
Expand All @@ -83,20 +83,20 @@ impl SharedCtx {
self.ctx.write().set_service(service_id, tx);
}

pub fn register_peer(&self, peer: PeerAddress, alias: PeerAlias) {
self.ctx.write().register_peer(peer, alias);
pub fn register_conn(&self, conn: ConnectionId, alias: PeerConnectionAlias) {
self.ctx.write().register_conn(conn, alias);
}

pub fn unregister_peer(&self, peer: &PeerAddress) {
self.ctx.write().unregister_peer(peer);
pub fn unregister_conn(&self, conn: &ConnectionId) {
self.ctx.write().unregister_conn(conn);
}

pub fn peer(&self, peer: &PeerAddress) -> Option<PeerAlias> {
self.ctx.read().peer(peer)
pub fn conn(&self, conn: &ConnectionId) -> Option<PeerConnectionAlias> {
self.ctx.read().conn(conn)
}

pub fn peers(&self) -> Vec<PeerAlias> {
self.ctx.read().peers()
pub fn conns(&self) -> Vec<PeerConnectionAlias> {
self.ctx.read().conns()
}

pub fn router(&self) -> &SharedRouterTable {
Expand All @@ -114,29 +114,29 @@ impl SharedCtx {
self.ctx.write().check_broadcast_msg(id)
}

pub fn try_send_unicast(&self, service_id: P2pServiceId, dest: PeerAddress, data: Vec<u8>) -> anyhow::Result<()> {
pub fn try_send_unicast(&self, service_id: P2pServiceId, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
let next = self.router.action(&dest).ok_or(anyhow!("route not found"))?;
match next {
RouteAction::Local => {
panic!("unsupported send to local node")
}
RouteAction::Next(next) => {
let source = self.router.local_address();
self.peer(&next).ok_or(anyhow!("peer not found"))?.try_send(PeerMessage::Unicast(source, dest, service_id, data))?;
let source = self.router.local_id();
self.conn(&next).ok_or(anyhow!("peer not found"))?.try_send(PeerMessage::Unicast(source, dest, service_id, data))?;
Ok(())
}
}
}

pub async fn send_unicast(&self, service_id: P2pServiceId, dest: PeerAddress, data: Vec<u8>) -> anyhow::Result<()> {
pub async fn send_unicast(&self, service_id: P2pServiceId, dest: PeerId, data: Vec<u8>) -> anyhow::Result<()> {
let next = self.router.action(&dest).ok_or(anyhow!("route not found"))?;
match next {
RouteAction::Local => {
panic!("unsupported send to local node")
}
RouteAction::Next(next) => {
let source = self.router.local_address();
self.peer(&next).ok_or(anyhow!("peer not found"))?.send(PeerMessage::Unicast(source, dest, service_id, data)).await?;
let source = self.router.local_id();
self.conn(&next).ok_or(anyhow!("peer not found"))?.send(PeerMessage::Unicast(source, dest, service_id, data)).await?;
Ok(())
}
}
Expand All @@ -145,37 +145,39 @@ impl SharedCtx {
pub fn try_send_broadcast(&self, service_id: P2pServiceId, data: Vec<u8>) {
let msg_id = BroadcastMsgId::rand();
self.check_broadcast_msg(msg_id);
let source = self.router.local_address();
let peers = self.peers();
log::debug!("[ShareCtx] broadcast to {peers:?} nodes");
for peer in peers {
peer.try_send(PeerMessage::Broadcast(source, service_id, msg_id, data.clone()))
let source = self.router.local_id();
let conns = self.conns();
log::debug!("[ShareCtx] broadcast to {conns:?} connections");
for conn_alias in conns {
conn_alias
.try_send(PeerMessage::Broadcast(source, service_id, msg_id, data.clone()))
.print_on_err("[ShareCtx] broadcast data over peer alias");
}
}

pub async fn send_broadcast(&self, service_id: P2pServiceId, data: Vec<u8>) {
let msg_id = BroadcastMsgId::rand();
self.check_broadcast_msg(msg_id);
let source = self.router.local_address();
let peers = self.peers();
log::debug!("[ShareCtx] broadcast to {peers:?} nodes");
for peer in peers {
peer.send(PeerMessage::Broadcast(source, service_id, msg_id, data.clone()))
let source = self.router.local_id();
let conns = self.conns();
log::debug!("[ShareCtx] broadcast to {conns:?} connections");
for conn_alias in conns {
conn_alias
.send(PeerMessage::Broadcast(source, service_id, msg_id, data.clone()))
.await
.print_on_err("[ShareCtx] broadcast data over peer alias");
}
}

pub async fn open_stream(&self, service: P2pServiceId, dest: PeerAddress, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
pub async fn open_stream(&self, service: P2pServiceId, dest: PeerId, meta: Vec<u8>) -> anyhow::Result<P2pQuicStream> {
let next = self.router.action(&dest).ok_or(anyhow!("route not found"))?;
match next {
RouteAction::Local => {
panic!("unsupported open_stream to local node")
}
RouteAction::Next(next) => {
let source = self.router.local_address();
Ok(self.peer(&next).ok_or(anyhow!("peer not found"))?.open_stream(service, source, dest, meta).await?)
let source = self.router.local_id();
Ok(self.conn(&next).ok_or(anyhow!("peer not found"))?.open_stream(service, source, dest, meta).await?)
}
}
}
Expand Down
85 changes: 47 additions & 38 deletions src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@ use std::collections::BTreeMap;

use serde::{Deserialize, Serialize};

use super::PeerAddress;
use crate::{NetworkAddress, PeerAddress};

use super::PeerId;

const TIMEOUT_AFTER: u64 = 30_000;

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerDiscoverySync(Vec<(PeerAddress, u64)>);
pub struct PeerDiscoverySync(Vec<(PeerId, u64, NetworkAddress)>);

#[derive(Debug, Default)]
pub struct PeerDiscovery {
local: Option<PeerAddress>,
remotes: BTreeMap<PeerAddress, u64>,
local: Option<(PeerId, NetworkAddress)>,
remotes: BTreeMap<PeerId, (u64, NetworkAddress)>,
}

impl PeerDiscovery {
pub fn enable_local(&mut self, local: PeerAddress) {
log::info!("[PeerDiscovery] enable local as {local}");
self.local = Some(local);
pub fn enable_local(&mut self, peer_id: PeerId, address: NetworkAddress) {
log::info!("[PeerDiscovery] enable local as {address}");
self.local = Some((peer_id, address));
}

pub fn clear_timeout(&mut self, now_ms: u64) {
self.remotes.retain(|peer, last_updated| {
self.remotes.retain(|peer, (last_updated, _addr)| {
if *last_updated + TIMEOUT_AFTER > now_ms {
true
} else {
Expand All @@ -32,76 +34,82 @@ impl PeerDiscovery {
});
}

pub fn create_sync_for(&self, now_ms: u64, dest: &PeerAddress) -> PeerDiscoverySync {
let iter = self.local.iter().map(|p| (*p, now_ms));
PeerDiscoverySync(self.remotes.iter().filter(|(k, _)| !dest.eq(k)).map(|(k, v)| (*k, *v)).chain(iter).collect::<Vec<_>>())
pub fn create_sync_for(&self, now_ms: u64, dest: &PeerId) -> PeerDiscoverySync {
let iter = self.local.iter().map(|(p, addr)| (*p, now_ms, addr.clone()));
PeerDiscoverySync(
self.remotes
.iter()
.filter(|(k, _)| !dest.eq(k))
.map(|(k, (v1, v2))| (*k, *v1, v2.clone()))
.chain(iter)
.collect::<Vec<_>>(),
)
}

pub fn apply_sync(&mut self, now_ms: u64, sync: PeerDiscoverySync) {
log::debug!("[PeerDiscovery] apply sync with addrs: {:?}", sync.0);
for (peer, last_updated) in sync.0.into_iter() {
for (peer, last_updated, address) in sync.0.into_iter() {
if last_updated + TIMEOUT_AFTER > now_ms {
if self.remotes.insert(peer, last_updated).is_none() {
if self.remotes.insert(peer, (last_updated, address)).is_none() {
log::info!("[PeerDiscovery] added new peer {peer}");
}
}
}
}

pub fn remotes(&self) -> impl Iterator<Item = &PeerAddress> {
self.remotes.keys().into_iter()
pub fn remotes(&self) -> impl Iterator<Item = PeerAddress> + '_ {
self.remotes.iter().map(|(p, (_, a))| PeerAddress(*p, a.clone()))
}
}

#[cfg(test)]
mod test {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use crate::{discovery::PeerDiscoverySync, PeerAddress};
use crate::{discovery::PeerDiscoverySync, PeerAddress, PeerId};

use super::{PeerDiscovery, TIMEOUT_AFTER};

fn create_peer(i: u16) -> PeerAddress {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), i).into()
}

#[test_log::test]
fn create_local_sync() {
let mut discovery = PeerDiscovery::default();

let peer1 = create_peer(1);
let peer2 = create_peer(1);
let peer1 = PeerId(1);
let peer1_addr: PeerAddress = "[email protected]:9000".parse().expect("should parse peer address");

let peer2 = PeerId(2);

assert_eq!(discovery.create_sync_for(0, &peer2), PeerDiscoverySync(vec![]));

discovery.enable_local(peer1);
discovery.enable_local(peer1, peer1_addr.network_address().clone());

assert_eq!(discovery.create_sync_for(100, &peer2), PeerDiscoverySync(vec![(peer1, 100)]));
assert_eq!(discovery.create_sync_for(100, &peer2), PeerDiscoverySync(vec![(peer1, 100, peer1_addr.network_address().clone())]));
assert_eq!(discovery.remotes().next(), None);
}

#[test_log::test]
fn apply_sync() {
let mut discovery = PeerDiscovery::default();

let peer1 = create_peer(1);
let peer2 = create_peer(2);
let peer1 = PeerId(1);
let peer1_addr: PeerAddress = "[email protected]:9000".parse().expect("should parse peer address");

discovery.apply_sync(100, PeerDiscoverySync(vec![(peer1, 90)]));
let peer2 = PeerId(2);

assert_eq!(discovery.create_sync_for(100, &peer2), PeerDiscoverySync(vec![(peer1, 90)]));
discovery.apply_sync(100, PeerDiscoverySync(vec![(peer1, 90, peer1_addr.network_address().clone())]));

assert_eq!(discovery.create_sync_for(100, &peer2), PeerDiscoverySync(vec![(peer1, 90, peer1_addr.network_address().clone())]));
assert_eq!(discovery.create_sync_for(100, &peer1), PeerDiscoverySync(vec![]));
assert_eq!(discovery.remotes().collect::<Vec<_>>(), vec![&peer1]);
assert_eq!(discovery.remotes().collect::<Vec<_>>(), vec![peer1_addr]);
}

#[test_log::test]
fn apply_sync_timeout() {
let mut discovery = PeerDiscovery::default();

let peer1 = create_peer(1);
let peer2 = create_peer(2);
let peer1 = PeerId(1);
let peer1_addr: PeerAddress = "[email protected]:9000".parse().expect("should parse peer address");

let peer2 = PeerId(2);

discovery.apply_sync(TIMEOUT_AFTER + 100, PeerDiscoverySync(vec![(peer1, 100)]));
discovery.apply_sync(TIMEOUT_AFTER + 100, PeerDiscoverySync(vec![(peer1, 100, peer1_addr.network_address().clone())]));

assert_eq!(discovery.create_sync_for(100, &peer2), PeerDiscoverySync(vec![]));
assert_eq!(discovery.create_sync_for(100, &peer1), PeerDiscoverySync(vec![]));
Expand All @@ -112,11 +120,12 @@ mod test {
fn clear_timeout() {
let mut discovery = PeerDiscovery::default();

let peer1 = create_peer(1);
let peer1 = PeerId(1);
let peer1_addr: PeerAddress = "[email protected]:9000".parse().expect("should parse peer address");

discovery.apply_sync(100, PeerDiscoverySync(vec![(peer1, 90)]));
discovery.apply_sync(100, PeerDiscoverySync(vec![(peer1, 90, peer1_addr.network_address().clone())]));

assert_eq!(discovery.remotes().collect::<Vec<_>>(), vec![&peer1]);
assert_eq!(discovery.remotes().collect::<Vec<_>>(), vec![peer1_addr]);

discovery.clear_timeout(TIMEOUT_AFTER + 90);

Expand Down
Loading

0 comments on commit dd4b66e

Please sign in to comment.