Skip to content

Commit

Permalink
feat: remove rendevous + add initial kad setup
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Nov 17, 2024
1 parent 4f1756a commit ad7715e
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 4,904 deletions.
381 changes: 104 additions & 277 deletions packages/ciphernode/Cargo.lock

Large diffs are not rendered by default.

11 changes: 3 additions & 8 deletions packages/ciphernode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ alloy-primitives = { version = "0.6", default-features = false, features = [
"std",
] }
alloy-sol-types = { version = "0.6" }
argon2 = "0.5.2"
argon2 = "0.5.2"
anyhow = "1.0.86"
async-std = { version = "1.12", features = ["attributes"] }
async-trait = "0.1"
Expand All @@ -54,18 +54,13 @@ sha2 = "0.10.8"
tokio = { version = "1.38", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
libp2p = { version = "0.53.2", features = [
libp2p = { version = "0.54.1", features = [
"async-std",
"identify",
"macros",
"noise",
"ping",
"rendezvous",
"tcp",
"tokio",
"yamux",
"mdns",
"gossipsub",
"quic",
] }
zeroize = "1.6.0"
zeroize = "1.6.0"
4 changes: 1 addition & 3 deletions packages/ciphernode/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ libp2p = { workspace = true, features = [
"macros",
"noise",
"ping",
"rendezvous",
"tcp",
"kad",
"tokio",
"yamux",
"mdns",
"gossipsub",
"quic",
] }
Expand Down
13 changes: 13 additions & 0 deletions packages/ciphernode/p2p/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use std::time::Duration;

use libp2p::StreamProtocol;

pub const TICK_INTERVAL: Duration = Duration::from_secs(15);
pub const KADEMLIA_PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/enclave/kad/1.0.0");
pub const PORT_QUIC: u16 = 9091;
pub const LOCAL_KEY_PATH: &str = "./local_key";
pub const LOCAL_CERT_PATH: &str = "./cert.pem";
pub const GOSSIPSUB_PEER_DISCOVERY: &str = "enclave-keygen-peer-discovery";
pub const BOOTSTRAP_NODES: [&str; 0] = [
// TODO: Add bootstrap nodes
];
2 changes: 2 additions & 0 deletions packages/ciphernode/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
#![crate_type = "lib"]
#![warn(missing_docs, unused_imports)]

mod constants;
mod libp2p_router;
mod p2p;

pub use constants::*;
pub use libp2p_router::*;
pub use p2p::*;
176 changes: 91 additions & 85 deletions packages/ciphernode/p2p/src/libp2p_router.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use futures::stream::StreamExt;
use libp2p::connection_limits::ConnectionLimits;
use libp2p::identify;
use libp2p::{
gossipsub, identity, mdns, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux,
connection_limits, futures::StreamExt, gossipsub, identify::Behaviour as IdentifyBehaviour,
identity, kad::store::MemoryStore, kad::Behaviour as KademliaBehaviour,
swarm::NetworkBehaviour, swarm::SwarmEvent,
};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::{io, select};
use tracing::{error, info, trace};
use tracing::{debug, error, info, trace, warn};

#[derive(NetworkBehaviour)]
pub struct MyBehaviour {
pub struct NodeBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
kademlia: KademliaBehaviour<MemoryStore>,
connection_limits: connection_limits::Behaviour,
identify: IdentifyBehaviour,
}

pub struct EnclaveRouter {
pub identity: Option<identity::Keypair>,
pub gossipsub_config: gossipsub::Config,
pub swarm: Option<libp2p::Swarm<MyBehaviour>>,
pub swarm: Option<libp2p::Swarm<NodeBehaviour>>,
pub topic: Option<gossipsub::IdentTopic>,
evt_tx: Sender<Vec<u8>>,
cmd_rx: Receiver<Vec<u8>>,
Expand All @@ -34,7 +39,6 @@ impl EnclaveRouter {
message.data.hash(&mut s);
gossipsub::MessageId::from(s.finish().to_string())
};

// TODO: Allow for config inputs to new()
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
Expand All @@ -61,44 +65,44 @@ impl EnclaveRouter {
self.identity = Some(keypair.clone());
}

pub fn connect_swarm(&mut self, discovery_type: String) -> Result<&Self, Box<dyn Error>> {
match discovery_type.as_str() {
"mdns" => {
// TODO: Use key if assigned already

let swarm = self
.identity
.clone()
.map_or_else(
|| libp2p::SwarmBuilder::with_new_identity(),
|id| libp2p::SwarmBuilder::with_existing_identity(id),
)
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_quic()
.with_behaviour(|key| {
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
self.gossipsub_config.clone(),
)?;

let mdns = mdns::tokio::Behaviour::new(
mdns::Config::default(),
key.public().to_peer_id(),
)?;
Ok(MyBehaviour { gossipsub, mdns })
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

self.swarm = Some(swarm);
}
_ => info!("Defaulting to MDNS discovery"),
}
pub fn connect_swarm(&mut self) -> Result<&Self, Box<dyn Error>> {
let connection_limits = connection_limits::Behaviour::new(ConnectionLimits::default());
let identify_config = IdentifyBehaviour::new(
identify::Config::new(
"/kad/0.1.0".into(),
self.identity.as_ref().unwrap().public(),
)
.with_interval(Duration::from_secs(60)), // do this so we can get timeouts for dropped WebRTC connections
);
let swarm = self
.identity
.clone()
.map_or_else(
|| libp2p::SwarmBuilder::with_new_identity(),
|id| libp2p::SwarmBuilder::with_existing_identity(id),
)
.with_tokio()
.with_quic()
.with_behaviour(|key| {
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
self.gossipsub_config.clone(),
)
.expect("Failed to create gossipsub behavior");

NodeBehaviour {
gossipsub,
kademlia: KademliaBehaviour::new(
key.public().to_peer_id(),
MemoryStore::new(key.public().to_peer_id()),
),
connection_limits,
identify: identify_config,
}
})?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

Ok(self)
}

Expand All @@ -120,49 +124,51 @@ impl EnclaveRouter {
.as_mut()
.unwrap()
.listen_on("/ip4/0.0.0.0/udp/0/quic-v1".parse()?)?;
self.swarm
.as_mut()
.unwrap()
.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

loop {
select! {
Some(line) = self.cmd_rx.recv() => {
if let Err(e) = self.swarm.as_mut().unwrap()
.behaviour_mut().gossipsub
.publish(self.topic.as_mut().unwrap().clone(), line) {
error!(error=?e, "Error publishing line to swarm");
}
}
event = self.swarm.as_mut().unwrap().select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
trace!("mDNS discovered a new peer: {peer_id}");
self.swarm.as_mut().unwrap().behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, _multiaddr) in list {
trace!("mDNS discover peer has expired: {peer_id}");
self.swarm.as_mut().unwrap().behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => {
trace!(
"Got message with id: {id} from peer: {peer_id}",
);
trace!("{:?}", message);
self.evt_tx.send(message.data).await?;
},
SwarmEvent::NewListenAddr { address, .. } => {
trace!("Local node is listening on {address}");
}
_ => {}
}
Some(line) = self.cmd_rx.recv() => {
if let Err(e) = self.swarm.as_mut().unwrap()
.behaviour_mut().gossipsub
.publish(self.topic.as_mut().unwrap().clone(), line) {
error!(error=?e, "Error publishing line to swarm");
}
}

event = self.swarm.as_mut().unwrap().select_next_some() => match event {

SwarmEvent::ConnectionEstablished { peer_id, .. } => {
info!("Connected to {peer_id}");
}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
warn!("Failed to dial {peer_id:?}: {error}");
}
SwarmEvent::IncomingConnectionError { error, .. } => {
warn!("{:#}", anyhow::Error::from(error))
}
SwarmEvent::Behaviour(NodeBehaviourEvent::Kademlia(e)) => {
debug!("Kademlia event: {:?}", e);
}


SwarmEvent::Behaviour(NodeBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => {
trace!(
"Got message with id: {id} from peer: {peer_id}",
);
trace!("{:?}", message);
self.evt_tx.send(message.data).await?;
},
SwarmEvent::NewListenAddr { address, .. } => {
trace!("Local node is listening on {address}");
}
_ => {}

}
}
}
}
}
23 changes: 21 additions & 2 deletions packages/ciphernode/p2p/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,12 @@
use std::{collections::HashSet, error::Error};
use std::{
collections::HashSet,
error::Error,
net::{IpAddr, Ipv6Addr},
};

use libp2p::multiaddr::{Multiaddr, Protocol};

use crate::constants::PORT_QUIC;

use crate::libp2p_router::EnclaveRouter;
/// Actor for connecting to an libp2p client via it's mpsc channel interface
Expand Down Expand Up @@ -69,9 +77,20 @@ impl P2p {
let (mut libp2p, tx, rx) = EnclaveRouter::new()?;
let keypair = libp2p::identity::Keypair::generate_ed25519();
libp2p.with_identity(&keypair);
libp2p.connect_swarm("mdns".to_string())?;
libp2p.connect_swarm()?;
libp2p.join_topic("enclave-keygen-01")?;

let address_quic = Multiaddr::from(IpAddr::V6(Ipv6Addr::UNSPECIFIED))
.with(Protocol::Udp(PORT_QUIC))
.with(Protocol::QuicV1);

libp2p
.swarm
.as_mut()
.unwrap()
.listen_on(address_quic.clone())
.expect("listen on quic");

let p2p_addr = Self::spawn_and_listen(bus, tx, rx);
let handle = tokio::spawn(async move { libp2p.start().await.unwrap() });
Ok((p2p_addr, handle, keypair.public().to_peer_id().to_string()))
Expand Down
Loading

0 comments on commit ad7715e

Please sign in to comment.