Skip to content

Commit

Permalink
Make actors kill themselves
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley committed Oct 9, 2024
1 parent 9e61d54 commit 2cdc1e4
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 45 deletions.
12 changes: 7 additions & 5 deletions packages/ciphernode/aggregator/src/plaintext_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use actix::prelude::*;
use anyhow::Result;
use enclave_core::{
DecryptionshareCreated, Die, E3id, EnclaveEvent, EventBus, OrderedSet, PlaintextAggregated, E3RequestComplete, Seed
DecryptionshareCreated, Die, E3RequestComplete, E3id, EnclaveEvent, EventBus, OrderedSet,
PlaintextAggregated, Seed,
};
use fhe::{Fhe, GetAggregatePlaintext};
use sortition::{GetHasNode, Sortition};
Expand Down Expand Up @@ -107,8 +108,10 @@ impl Actor for PlaintextAggregator {
impl Handler<EnclaveEvent> for PlaintextAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
if let EnclaveEvent::DecryptionshareCreated { data, .. } = msg {
ctx.notify(data)
match msg {
EnclaveEvent::DecryptionshareCreated { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
_ => (),
}
}
}
Expand Down Expand Up @@ -191,14 +194,13 @@ impl Handler<ComputeAggregate> for PlaintextAggregator {

self.bus.do_send(event);


Ok(())
}
}

impl Handler<Die> for PlaintextAggregator {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
ctx.stop()
}
}
10 changes: 6 additions & 4 deletions packages/ciphernode/aggregator/src/publickey_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use actix::prelude::*;
use anyhow::Result;
use enclave_core::{
Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed
Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed,
};
use fhe::{Fhe, GetAggregatePublicKey};
use sortition::{GetHasNode, GetNodes, Sortition};
Expand Down Expand Up @@ -116,8 +116,10 @@ impl Actor for PublicKeyAggregator {
impl Handler<EnclaveEvent> for PublicKeyAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
if let EnclaveEvent::KeyshareCreated { data, .. } = msg {
ctx.notify(data)
match msg {
EnclaveEvent::KeyshareCreated { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
_ => (),
}
}
}
Expand Down Expand Up @@ -223,6 +225,6 @@ impl Handler<NotifyNetwork> for PublicKeyAggregator {
impl Handler<Die> for PublicKeyAggregator {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
ctx.stop()
}
}
10 changes: 6 additions & 4 deletions packages/ciphernode/keyshare/src/keyshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use actix::prelude::*;
use anyhow::{anyhow, Context, Result};
use data::{Data, Get, Insert};
use enclave_core::{
CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, EnclaveErrorType, EnclaveEvent, EventBus, FromError, KeyshareCreated
CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, EnclaveErrorType,
EnclaveEvent, EventBus, FromError, KeyshareCreated,
};
use fhe::{DecryptCiphertext, Fhe};
use std::sync::Arc;
Expand Down Expand Up @@ -34,8 +35,9 @@ impl Handler<EnclaveEvent> for Keyshare {

fn handle(&mut self, event: EnclaveEvent, ctx: &mut actix::Context<Self>) -> Self::Result {
match event {
EnclaveEvent::CiphernodeSelected { data, .. } => ctx.address().do_send(data),
EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.address().do_send(data),
EnclaveEvent::CiphernodeSelected { data, .. } => ctx.notify(data),
EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
_ => (),
}
}
Expand Down Expand Up @@ -101,7 +103,7 @@ impl Handler<CiphertextOutputPublished> for Keyshare {
impl Handler<Die> for Keyshare {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
ctx.stop()
}
}

Expand Down
62 changes: 30 additions & 32 deletions packages/ciphernode/router/src/e3_request_router.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use crate::CommitteeMetaFactory;

use super::CommitteeMeta;
use actix::{Actor, Addr, Context, Handler, Recipient};
use aggregator::PlaintextAggregator;
use aggregator::PublicKeyAggregator;
use enclave_core::Die;
use enclave_core::E3RequestComplete;
use enclave_core::{E3id, EnclaveEvent, EventBus, Subscribe};
use fhe::Fhe;
use keyshare::Keyshare;

use actix::{Actor, Addr, Context, Handler, Recipient};
use std::collections::HashSet;
use std::{collections::HashMap, sync::Arc};

/// Helper class to buffer events for downstream instances incase events arrive in the wrong order
Expand Down Expand Up @@ -72,20 +71,6 @@ impl E3RequestContext {
}
});
}

pub fn cleanup(&mut self) {
if let Some(keyshare) = self.keyshare.take() {
keyshare.do_send(Die);
}
if let Some(plaintext) = self.plaintext.take() {
plaintext.do_send(Die);
}
if let Some(publickey) = self.publickey.take() {
publickey.do_send(Die);
}
self.fhe = None;
self.meta = None;
}
}

/// Format of the hook that needs to be passed to E3RequestRouter
Expand All @@ -99,6 +84,7 @@ pub type EventHook = Box<dyn FnMut(&mut E3RequestContext, EnclaveEvent)>;
// dependencies
pub struct E3RequestRouter {
contexts: HashMap<E3id, E3RequestContext>,
completed: HashSet<E3id>,
hooks: Vec<EventHook>,
buffer: EventBuffer,
bus: Addr<EventBus>,
Expand All @@ -124,6 +110,11 @@ impl Handler<EnclaveEvent> for E3RequestRouter {
return;
};

if self.completed.contains(&e3_id) {
// TODO: Log warning that e3 event was received for completed e3_id
return;
}

let context = self.contexts.entry(e3_id.clone()).or_default();

for hook in &mut self.hooks {
Expand All @@ -132,21 +123,27 @@ impl Handler<EnclaveEvent> for E3RequestRouter {

context.forward_message(&msg, &mut self.buffer);

// Here we are detemining that by receiving the PlaintextAggregated event our request is
// complete and we can notify everyone. This might change as we consider other factors
// when determining if the request is complete
if let EnclaveEvent::PlaintextAggregated { .. } = msg {
// Local event to clean up context
let event = EnclaveEvent::from(E3RequestComplete {
e3_id: e3_id.clone(),
});

// Send to bus so all other actors can react to a request being complete.
self.bus.do_send(event);

// clean up context
context.cleanup();
self.contexts.remove(&e3_id);
match msg.clone() {
EnclaveEvent::PlaintextAggregated { .. } => {
// Here we are detemining that by receiving the PlaintextAggregated event our request is
// complete and we can notify everyone. This might change as we consider other factors
// when determining if the request is complete
let bus = self.bus.clone();

let event = EnclaveEvent::from(E3RequestComplete {
e3_id: e3_id.clone(),
});

// Send to bus so all other actors can react to a request being complete.
bus.do_send(event);
}
EnclaveEvent::E3RequestComplete { .. } => {
// Note this will be sent above to the children who can kill themselves based on
// the event
self.contexts.remove(&e3_id);
self.completed.insert(e3_id);
}
_ => (),
}
}
}
Expand All @@ -165,6 +162,7 @@ impl E3RequestRouterBuilder {
pub fn build(self) -> Addr<E3RequestRouter> {
let e3r = E3RequestRouter {
contexts: HashMap::new(),
completed: HashSet::new(),
hooks: self.hooks,
buffer: EventBuffer::default(),
bus: self.bus.clone(),
Expand Down

0 comments on commit 2cdc1e4

Please sign in to comment.