From 1ed022cee3f0b5dd097be9f200efd620343638fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=B3=CE=BB?= Date: Mon, 26 Aug 2024 22:43:36 +1000 Subject: [PATCH] Test P2p actor forwarding and encoding events downstream (#17) --- packages/ciphernode/core/src/eventbus.rs | 3 +- packages/ciphernode/core/src/events.rs | 4 + packages/ciphernode/core/src/lib.rs | 51 ++++++++++-- packages/ciphernode/core/src/p2p.rs | 100 ++++++++++++++++++++--- 4 files changed, 138 insertions(+), 20 deletions(-) diff --git a/packages/ciphernode/core/src/eventbus.rs b/packages/ciphernode/core/src/eventbus.rs index 9d63941f..6a3c8cff 100644 --- a/packages/ciphernode/core/src/eventbus.rs +++ b/packages/ciphernode/core/src/eventbus.rs @@ -79,9 +79,8 @@ impl Handler for EventBus { fn handle(&mut self, event: EnclaveEvent, _: &mut Context) { // Deduplicate by id - if self.ids.contains(&event.clone().into()) { + if self.ids.contains(&event.get_id()) { // We have seen this before - println!("Duplicate {}", EventId::from(event)); return; } diff --git a/packages/ciphernode/core/src/events.rs b/packages/ciphernode/core/src/events.rs index b638db4d..051052e9 100644 --- a/packages/ciphernode/core/src/events.rs +++ b/packages/ciphernode/core/src/events.rs @@ -73,6 +73,10 @@ impl EnclaveEvent { pub fn from_bytes(bytes: &[u8]) -> Result { bincode::deserialize(bytes) } + + pub fn get_id(&self) -> EventId { + self.clone().into() + } } impl From for EventId { diff --git a/packages/ciphernode/core/src/lib.rs b/packages/ciphernode/core/src/lib.rs index ff9dff94..493d0667 100644 --- a/packages/ciphernode/core/src/lib.rs +++ b/packages/ciphernode/core/src/lib.rs @@ -32,7 +32,7 @@ mod p2p; #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use crate::{ ciphernode::Ciphernode, @@ -41,6 +41,7 @@ mod tests { eventbus::{EventBus, GetHistory, Subscribe}, events::{ComputationRequested, E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated}, fhe::{Fhe, WrappedPublicKey, WrappedPublicKeyShare}, + p2p::P2p, }; use actix::prelude::*; use anyhow::*; @@ -50,6 +51,8 @@ mod tests { }; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; + use tokio::sync::Mutex; + use tokio::{sync::mpsc::channel, time::sleep}; // Simulating a local node fn setup_local_ciphernode( @@ -207,12 +210,44 @@ mod tests { Ok(()) } - // TODO: Test p2p - fn test_p2p_event_broadcasting() { - // Setup two Vec channels to simulate libp2p - // 1. command channel - // 2. event channel - // Pass them to the p2p actor - // connect the p2p actor to the event bus actor and monitor which events are broadcast + #[actix::test] + async fn test_p2p_event_broadcasting() -> Result<()> { + let (tx, mut output) = channel(100); // Transmit byte events to the network + let (_, rx) = channel(100); // Receive byte events from the network + let bus = EventBus::new(true).start(); + let _ = P2p::spawn_and_listen(bus.clone(), tx.clone(), rx); + + // Capture messages from output on msgs vec + let msgs: Arc>>> = Arc::new(Mutex::new(Vec::new())); + let msgs_loop = msgs.clone(); + + tokio::spawn(async move { + while let Some(msg) = output.recv().await { + msgs_loop.lock().await.push(msg); + } + }); + + let evt_1 = EnclaveEvent::from(ComputationRequested { + e3_id: E3id::new("1234"), + nodecount: 3, + threshold: 123, + sortition_seed: 123, + }); + + let evt_2 = EnclaveEvent::from(ComputationRequested { + e3_id: E3id::new("1235"), + nodecount: 3, + threshold: 123, + sortition_seed: 123, + }); + + bus.do_send(evt_1.clone()); + bus.do_send(evt_2.clone()); + + sleep(Duration::from_millis(1)).await; // need to push to next tick + + assert_eq!(*msgs.lock().await, vec![evt_1.to_bytes()?, evt_2.to_bytes()?], "P2p did not transmit events to the network"); + + Ok(()) } } diff --git a/packages/ciphernode/core/src/p2p.rs b/packages/ciphernode/core/src/p2p.rs index 6a5e9bb2..3bcfa929 100644 --- a/packages/ciphernode/core/src/p2p.rs +++ b/packages/ciphernode/core/src/p2p.rs @@ -1,24 +1,104 @@ -/// Actor for connecting to an libp2p client via it's mpsc channel interface -/// This Actor should be responsible for +use std::collections::HashSet; + +/// Actor for connecting to an libp2p client via it's mpsc channel interface +/// This Actor should be responsible for /// 1. Sending and Recieving Vec messages with libp2p /// 2. Converting between Vec and EnclaveEvents::Xxxxxxxxx() /// 3. Broadcasting over the local eventbus /// 4. Listening to the local eventbus for messages to be published to libp2p -use actix::{Actor, Context}; +use actix::prelude::*; use tokio::sync::mpsc::{Receiver, Sender}; -use p2p::EnclaveRouter; -pub struct P2p; +use crate::{ + eventbus::{EventBus, Subscribe}, + events::{EnclaveEvent, EventId}, +}; + +pub struct P2p { + bus: Addr, + tx: Sender>, + sent_events: HashSet, +} -impl Actor for P2p{ +impl Actor for P2p { type Context = Context; } +#[derive(Message, Clone, Debug, PartialEq, Eq)] +#[rtype(result = "anyhow::Result<()>")] +struct LibP2pEvent(pub Vec); + impl P2p { - pub fn new() { - // Construct owning Libp2p module + pub fn new(bus: Addr, tx: Sender>) -> Self { + Self { + bus, + tx, + sent_events: HashSet::new(), + } } - pub fn from_channel(tx:Sender>, rx:Receiver>){ - // Construct from tx/rx + + /// Start a new P2p actor listening for libp2p messages on the given Receiver and forwarding + /// them to the actor + pub fn spawn_and_listen( + bus: Addr, + tx: Sender>, // Transmit byte events to the network + mut rx: Receiver>, // Receive byte events from the network + ) -> Addr { + // Create a new Actor + let p2p = P2p::new(bus.clone(), tx).start(); + + // Listen on all events + bus.do_send(Subscribe { + event_type: String::from("*"), + listener: p2p.clone().recipient(), + }); + + // Clone this to go in the spawned future + let p2p_addr = p2p.clone(); + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + p2p_addr.do_send(LibP2pEvent(msg)) + } + }); + + // Return the address + p2p + } +} + +impl Handler for P2p { + type Result = anyhow::Result<()>; + fn handle(&mut self, msg: LibP2pEvent, _: &mut Self::Context) -> Self::Result { + let LibP2pEvent(bytes) = msg; + match EnclaveEvent::from_bytes(&bytes) { + Ok(event) => { + self.bus.do_send(event.clone()); + self.sent_events.insert(event.into()); + } + Err(err) => println!("Error: {}", err), + } + Ok(()) + } +} + +impl Handler for P2p { + type Result = ResponseFuture<()>; + fn handle(&mut self, event: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + let sent_events = self.sent_events.clone(); + let tx = self.tx.clone(); + let evt = event.clone(); + Box::pin(async move { + let id: EventId = evt.clone().into(); + if sent_events.contains(&id) { + return; + } + + match evt.to_bytes() { + Ok(bytes) => { + let _ = tx.send(bytes).await; + } + Err(error) => println!("Error: {}", error), + } + }) } }