Skip to content

Commit

Permalink
Add test for networking layer
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Nov 22, 2024
1 parent 15f72ed commit dce1f1f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 6 deletions.
77 changes: 77 additions & 0 deletions packages/ciphernode/p2p/src/bin/p2p_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use anyhow::Result;
use p2p::EnclaveRouter;
use std::time::Duration;
use std::{collections::HashSet, env};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<()> {
let name = env::args().nth(1).expect("need name");
println!("{} starting up", name);

let (mut router, tx, mut rx) = EnclaveRouter::new()?;
let keypair = libp2p::identity::Keypair::generate_ed25519();
router
.with_identity(&keypair)
.connect_swarm()?
.join_topic("test-topic")?;

let router_task = tokio::spawn({
let name = name.clone();
async move {
println!("{} starting router task", name);
if let Err(e) = router.start().await {
println!("{} router task failed: {}", name, e);
}
println!("{} router task finished", name);
}
});

// Give network time to initialize
sleep(Duration::from_secs(1)).await;

// Send our message first
println!("{} sending message", name);
tx.send(name.as_bytes().to_vec()).await?;
println!("{} message sent", name);

let expected: HashSet<String> = vec![
"alice".to_string(),
"bob".to_string(),
"charlie".to_string(),
]
.into_iter()
.filter(|n| *n != name)
.collect();

println!("{} waiting for messages from: {:?}", name, expected);

// Then wait to receive from others
let mut received = HashSet::new();
while received != expected {
if let Some(msg) = rx.recv().await {
match String::from_utf8(msg) {
Ok(msg) => {
if !received.contains(&msg) {
println!("{} received '{}'", name, msg);
received.insert(msg);
}
}
Err(e) => println!("{} received invalid UTF8: {}", name, e),
}
}
}

println!("{} received all expected messages", name);

// Make sure router task is still running
if router_task.is_finished() {
println!("{} warning: router task finished early", name);
}

// Give some time for final message propagation
sleep(Duration::from_secs(1)).await;

println!("{} finished successfully", name);
Ok(())
}
13 changes: 7 additions & 6 deletions packages/ciphernode/p2p/src/libp2p_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use libp2p::{
};
use libp2p::{identify, mdns};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::{io, select};
use tracing::{debug, error, info, trace, warn};
use anyhow::Result;

#[derive(NetworkBehaviour)]
pub struct NodeBehaviour {
Expand All @@ -32,7 +32,7 @@ pub struct EnclaveRouter {
}

impl EnclaveRouter {
pub fn new() -> Result<(Self, Sender<Vec<u8>>, Receiver<Vec<u8>>), Box<dyn Error>> {
pub fn new() -> Result<(Self, Sender<Vec<u8>>, Receiver<Vec<u8>>)> {
let (evt_tx, evt_rx) = channel(100); // TODO : tune this param
let (cmd_tx, cmd_rx) = channel(100); // TODO : tune this param
let message_id_fn = |message: &gossipsub::Message| {
Expand Down Expand Up @@ -62,11 +62,12 @@ impl EnclaveRouter {
))
}

pub fn with_identity(&mut self, keypair: &identity::Keypair) {
pub fn with_identity(&mut self, keypair: &identity::Keypair) -> &mut Self {
self.identity = Some(keypair.clone());
self
}

pub fn connect_swarm(&mut self) -> Result<&Self, Box<dyn Error>> {
pub fn connect_swarm(&mut self) -> Result<&mut Self> {
let connection_limits = connection_limits::Behaviour::new(ConnectionLimits::default());
let identify_config = IdentifyBehaviour::new(
identify::Config::new(
Expand Down Expand Up @@ -112,7 +113,7 @@ impl EnclaveRouter {
Ok(self)
}

pub fn join_topic(&mut self, topic_name: &str) -> Result<&Self, Box<dyn Error>> {
pub fn join_topic(&mut self, topic_name: &str) -> Result<&mut Self> {
let topic = gossipsub::IdentTopic::new(topic_name);
self.topic = Some(topic.clone());
self.swarm
Expand All @@ -125,7 +126,7 @@ impl EnclaveRouter {
}

/// Listen on the default multiaddr
pub async fn start(&mut self) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
pub async fn start(&mut self) -> Result<()> {
self.swarm
.as_mut()
.unwrap()
Expand Down
10 changes: 10 additions & 0 deletions packages/ciphernode/p2p/tests/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/env bash
pkill p2p_messaging
trap 'killall p2p_test 2>/dev/null' EXIT

cargo run --bin p2p_test alice &
cargo run --bin p2p_test bob &
cargo run --bin p2p_test charlie &

# Wait for all background processes to complete
wait

0 comments on commit dce1f1f

Please sign in to comment.