Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic event forwarding test #17

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions packages/ciphernode/core/src/eventbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ impl Handler<EnclaveEvent> for EventBus {

fn handle(&mut self, event: EnclaveEvent, _: &mut Context<Self>) {
// Deduplicate by id
if self.ids.contains(&event.clone().into()) {
if self.ids.contains(&event.get_id()) {
// We have seen this before
println!("Duplicate {}", EventId::from(event));
return;
}

Expand Down
4 changes: 4 additions & 0 deletions packages/ciphernode/core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ impl EnclaveEvent {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::Error> {
bincode::deserialize(bytes)
}

pub fn get_id(&self) -> EventId {
self.clone().into()
}
}

impl From<EnclaveEvent> for EventId {
Expand Down
51 changes: 43 additions & 8 deletions packages/ciphernode/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod p2p;

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{
ciphernode::Ciphernode,
Expand All @@ -41,6 +41,7 @@ mod tests {
eventbus::{EventBus, GetHistory, Subscribe},
events::{ComputationRequested, E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated},
fhe::{Fhe, WrappedPublicKey, WrappedPublicKeyShare},
p2p::P2p,
};
use actix::prelude::*;
use anyhow::*;
Expand All @@ -50,6 +51,8 @@ mod tests {
};
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use tokio::sync::Mutex;
use tokio::{sync::mpsc::channel, time::sleep};

// Simulating a local node
fn setup_local_ciphernode(
Expand Down Expand Up @@ -207,12 +210,44 @@ mod tests {
Ok(())
}

// TODO: Test p2p
fn test_p2p_event_broadcasting() {
// Setup two Vec<u8> channels to simulate libp2p
// 1. command channel
// 2. event channel
// Pass them to the p2p actor
// connect the p2p actor to the event bus actor and monitor which events are broadcast
#[actix::test]
async fn test_p2p_event_broadcasting() -> Result<()> {
let (tx, mut output) = channel(100); // Transmit byte events to the network
let (_, rx) = channel(100); // Receive byte events from the network
let bus = EventBus::new(true).start();
let _ = P2p::spawn_and_listen(bus.clone(), tx.clone(), rx);

// Capture messages from output on msgs vec
let msgs: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
let msgs_loop = msgs.clone();

tokio::spawn(async move {
while let Some(msg) = output.recv().await {
msgs_loop.lock().await.push(msg);
}
});

let evt_1 = EnclaveEvent::from(ComputationRequested {
e3_id: E3id::new("1234"),
nodecount: 3,
threshold: 123,
sortition_seed: 123,
});

let evt_2 = EnclaveEvent::from(ComputationRequested {
e3_id: E3id::new("1235"),
nodecount: 3,
threshold: 123,
sortition_seed: 123,
});

bus.do_send(evt_1.clone());
bus.do_send(evt_2.clone());

sleep(Duration::from_millis(1)).await; // need to push to next tick

assert_eq!(*msgs.lock().await, vec![evt_1.to_bytes()?, evt_2.to_bytes()?], "P2p did not transmit events to the network");

Ok(())
}
}
100 changes: 90 additions & 10 deletions packages/ciphernode/core/src/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,104 @@
/// Actor for connecting to an libp2p client via it's mpsc channel interface
/// This Actor should be responsible for
use std::collections::HashSet;

/// Actor for connecting to an libp2p client via it's mpsc channel interface
/// This Actor should be responsible for
/// 1. Sending and Recieving Vec<u8> messages with libp2p
/// 2. Converting between Vec<u8> and EnclaveEvents::Xxxxxxxxx()
/// 3. Broadcasting over the local eventbus
/// 4. Listening to the local eventbus for messages to be published to libp2p
use actix::{Actor, Context};
use actix::prelude::*;
use tokio::sync::mpsc::{Receiver, Sender};
use p2p::EnclaveRouter;

pub struct P2p;
use crate::{
eventbus::{EventBus, Subscribe},
events::{EnclaveEvent, EventId},
};

pub struct P2p {
bus: Addr<EventBus>,
tx: Sender<Vec<u8>>,
sent_events: HashSet<EventId>,
}

impl Actor for P2p{
impl Actor for P2p {
type Context = Context<Self>;
}

#[derive(Message, Clone, Debug, PartialEq, Eq)]
#[rtype(result = "anyhow::Result<()>")]
struct LibP2pEvent(pub Vec<u8>);

impl P2p {
pub fn new() {
// Construct owning Libp2p module
pub fn new(bus: Addr<EventBus>, tx: Sender<Vec<u8>>) -> Self {
Self {
bus,
tx,
sent_events: HashSet::new(),
}
}
pub fn from_channel(tx:Sender<Vec<u8>>, rx:Receiver<Vec<u8>>){
// Construct from tx/rx

/// Start a new P2p actor listening for libp2p messages on the given Receiver and forwarding
/// them to the actor
pub fn spawn_and_listen(
bus: Addr<EventBus>,
tx: Sender<Vec<u8>>, // Transmit byte events to the network
mut rx: Receiver<Vec<u8>>, // Receive byte events from the network
) -> Addr<Self> {
// Create a new Actor
let p2p = P2p::new(bus.clone(), tx).start();

// Listen on all events
bus.do_send(Subscribe {
event_type: String::from("*"),
listener: p2p.clone().recipient(),
});

// Clone this to go in the spawned future
let p2p_addr = p2p.clone();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
p2p_addr.do_send(LibP2pEvent(msg))
}
});

// Return the address
p2p
}
}

impl Handler<LibP2pEvent> for P2p {
type Result = anyhow::Result<()>;
fn handle(&mut self, msg: LibP2pEvent, _: &mut Self::Context) -> Self::Result {
let LibP2pEvent(bytes) = msg;
match EnclaveEvent::from_bytes(&bytes) {
Ok(event) => {
self.bus.do_send(event.clone());
self.sent_events.insert(event.into());
}
Err(err) => println!("Error: {}", err),
}
Ok(())
}
}

impl Handler<EnclaveEvent> for P2p {
type Result = ResponseFuture<()>;
fn handle(&mut self, event: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
let sent_events = self.sent_events.clone();
let tx = self.tx.clone();
let evt = event.clone();
Box::pin(async move {
let id: EventId = evt.clone().into();
if sent_events.contains(&id) {
return;
}

match evt.to_bytes() {
Ok(bytes) => {
let _ = tx.send(bytes).await;
}
Err(error) => println!("Error: {}", error),
}
})
}
}
Loading