Skip to content

Commit

Permalink
Test P2p actor forwarding and encoding events downstream (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley authored Aug 26, 2024
1 parent e277f4c commit 1ed022c
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 20 deletions.
3 changes: 1 addition & 2 deletions packages/ciphernode/core/src/eventbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ impl Handler<EnclaveEvent> for EventBus {

fn handle(&mut self, event: EnclaveEvent, _: &mut Context<Self>) {
// Deduplicate by id
if self.ids.contains(&event.clone().into()) {
if self.ids.contains(&event.get_id()) {
// We have seen this before
println!("Duplicate {}", EventId::from(event));
return;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/ciphernode/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ impl EnclaveEvent {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::Error> {
bincode::deserialize(bytes)
}

pub fn get_id(&self) -> EventId {
self.clone().into()
}
}

impl From<EnclaveEvent> for EventId {
Expand Down
51 changes: 43 additions & 8 deletions packages/ciphernode/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod p2p;

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{
ciphernode::Ciphernode,
Expand All @@ -41,6 +41,7 @@ mod tests {
eventbus::{EventBus, GetHistory, Subscribe},
events::{ComputationRequested, E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated},
fhe::{Fhe, WrappedPublicKey, WrappedPublicKeyShare},
p2p::P2p,
};
use actix::prelude::*;
use anyhow::*;
Expand All @@ -50,6 +51,8 @@ mod tests {
};
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use tokio::sync::Mutex;
use tokio::{sync::mpsc::channel, time::sleep};

// Simulating a local node
fn setup_local_ciphernode(
Expand Down Expand Up @@ -207,12 +210,44 @@ mod tests {
Ok(())
}

// TODO: Test p2p
fn test_p2p_event_broadcasting() {
// Setup two Vec<u8> channels to simulate libp2p
// 1. command channel
// 2. event channel
// Pass them to the p2p actor
// connect the p2p actor to the event bus actor and monitor which events are broadcast
#[actix::test]
async fn test_p2p_event_broadcasting() -> Result<()> {
let (tx, mut output) = channel(100); // Transmit byte events to the network
let (_, rx) = channel(100); // Receive byte events from the network
let bus = EventBus::new(true).start();
let _ = P2p::spawn_and_listen(bus.clone(), tx.clone(), rx);

// Capture messages from output on msgs vec
let msgs: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
let msgs_loop = msgs.clone();

tokio::spawn(async move {
while let Some(msg) = output.recv().await {
msgs_loop.lock().await.push(msg);
}
});

let evt_1 = EnclaveEvent::from(ComputationRequested {
e3_id: E3id::new("1234"),
nodecount: 3,
threshold: 123,
sortition_seed: 123,
});

let evt_2 = EnclaveEvent::from(ComputationRequested {
e3_id: E3id::new("1235"),
nodecount: 3,
threshold: 123,
sortition_seed: 123,
});

bus.do_send(evt_1.clone());
bus.do_send(evt_2.clone());

sleep(Duration::from_millis(1)).await; // need to push to next tick

assert_eq!(*msgs.lock().await, vec![evt_1.to_bytes()?, evt_2.to_bytes()?], "P2p did not transmit events to the network");

Ok(())
}
}
100 changes: 90 additions & 10 deletions packages/ciphernode/core/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,104 @@
/// Actor for connecting to an libp2p client via it's mpsc channel interface
/// This Actor should be responsible for
use std::collections::HashSet;

/// Actor for connecting to an libp2p client via it's mpsc channel interface
/// This Actor should be responsible for
/// 1. Sending and Recieving Vec<u8> messages with libp2p
/// 2. Converting between Vec<u8> and EnclaveEvents::Xxxxxxxxx()
/// 3. Broadcasting over the local eventbus
/// 4. Listening to the local eventbus for messages to be published to libp2p
use actix::{Actor, Context};
use actix::prelude::*;
use tokio::sync::mpsc::{Receiver, Sender};
use p2p::EnclaveRouter;

pub struct P2p;
use crate::{
eventbus::{EventBus, Subscribe},
events::{EnclaveEvent, EventId},
};

pub struct P2p {
bus: Addr<EventBus>,
tx: Sender<Vec<u8>>,
sent_events: HashSet<EventId>,
}

impl Actor for P2p{
impl Actor for P2p {
type Context = Context<Self>;
}

#[derive(Message, Clone, Debug, PartialEq, Eq)]
#[rtype(result = "anyhow::Result<()>")]
struct LibP2pEvent(pub Vec<u8>);

impl P2p {
pub fn new() {
// Construct owning Libp2p module
pub fn new(bus: Addr<EventBus>, tx: Sender<Vec<u8>>) -> Self {
Self {
bus,
tx,
sent_events: HashSet::new(),
}
}
pub fn from_channel(tx:Sender<Vec<u8>>, rx:Receiver<Vec<u8>>){
// Construct from tx/rx

/// Start a new P2p actor listening for libp2p messages on the given Receiver and forwarding
/// them to the actor
pub fn spawn_and_listen(
bus: Addr<EventBus>,
tx: Sender<Vec<u8>>, // Transmit byte events to the network
mut rx: Receiver<Vec<u8>>, // Receive byte events from the network
) -> Addr<Self> {
// Create a new Actor
let p2p = P2p::new(bus.clone(), tx).start();

// Listen on all events
bus.do_send(Subscribe {
event_type: String::from("*"),
listener: p2p.clone().recipient(),
});

// Clone this to go in the spawned future
let p2p_addr = p2p.clone();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
p2p_addr.do_send(LibP2pEvent(msg))
}
});

// Return the address
p2p
}
}

impl Handler<LibP2pEvent> for P2p {
type Result = anyhow::Result<()>;
fn handle(&mut self, msg: LibP2pEvent, _: &mut Self::Context) -> Self::Result {
let LibP2pEvent(bytes) = msg;
match EnclaveEvent::from_bytes(&bytes) {
Ok(event) => {
self.bus.do_send(event.clone());
self.sent_events.insert(event.into());
}
Err(err) => println!("Error: {}", err),
}
Ok(())
}
}

impl Handler<EnclaveEvent> for P2p {
type Result = ResponseFuture<()>;
fn handle(&mut self, event: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
let sent_events = self.sent_events.clone();
let tx = self.tx.clone();
let evt = event.clone();
Box::pin(async move {
let id: EventId = evt.clone().into();
if sent_events.contains(&id) {
return;
}

match evt.to_bytes() {
Ok(bytes) => {
let _ = tx.send(bytes).await;
}
Err(error) => println!("Error: {}", error),
}
})
}
}

0 comments on commit 1ed022c

Please sign in to comment.