diff --git a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs index 878bc045..c63a4dda 100644 --- a/packages/ciphernode/aggregator/src/plaintext_aggregator.rs +++ b/packages/ciphernode/aggregator/src/plaintext_aggregator.rs @@ -1,7 +1,8 @@ use actix::prelude::*; use anyhow::Result; use enclave_core::{ - DecryptionshareCreated, Die, E3id, EnclaveEvent, EventBus, OrderedSet, PlaintextAggregated, E3RequestComplete, Seed + DecryptionshareCreated, Die, E3RequestComplete, E3id, EnclaveEvent, EventBus, OrderedSet, + PlaintextAggregated, Seed, }; use fhe::{Fhe, GetAggregatePlaintext}; use sortition::{GetHasNode, Sortition}; @@ -107,8 +108,10 @@ impl Actor for PlaintextAggregator { impl Handler for PlaintextAggregator { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - if let EnclaveEvent::DecryptionshareCreated { data, .. } = msg { - ctx.notify(data) + match msg { + EnclaveEvent::DecryptionshareCreated { data, .. } => ctx.notify(data), + EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die), + _ => (), } } } @@ -191,7 +194,6 @@ impl Handler for PlaintextAggregator { self.bus.do_send(event); - Ok(()) } } @@ -199,6 +201,6 @@ impl Handler for PlaintextAggregator { impl Handler for PlaintextAggregator { type Result = (); fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { - ctx.stop() + ctx.stop() } } diff --git a/packages/ciphernode/aggregator/src/publickey_aggregator.rs b/packages/ciphernode/aggregator/src/publickey_aggregator.rs index aa00b864..7274dafe 100644 --- a/packages/ciphernode/aggregator/src/publickey_aggregator.rs +++ b/packages/ciphernode/aggregator/src/publickey_aggregator.rs @@ -1,7 +1,7 @@ use actix::prelude::*; use anyhow::Result; use enclave_core::{ - Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed + Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed, }; use fhe::{Fhe, GetAggregatePublicKey}; use sortition::{GetHasNode, GetNodes, Sortition}; @@ -116,8 +116,10 @@ impl Actor for PublicKeyAggregator { impl Handler for PublicKeyAggregator { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - if let EnclaveEvent::KeyshareCreated { data, .. } = msg { - ctx.notify(data) + match msg { + EnclaveEvent::KeyshareCreated { data, .. } => ctx.notify(data), + EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die), + _ => (), } } } @@ -223,6 +225,6 @@ impl Handler for PublicKeyAggregator { impl Handler for PublicKeyAggregator { type Result = (); fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { - ctx.stop() + ctx.stop() } } diff --git a/packages/ciphernode/keyshare/src/keyshare.rs b/packages/ciphernode/keyshare/src/keyshare.rs index f2bfb449..2d036a97 100644 --- a/packages/ciphernode/keyshare/src/keyshare.rs +++ b/packages/ciphernode/keyshare/src/keyshare.rs @@ -2,7 +2,8 @@ use actix::prelude::*; use anyhow::{anyhow, Context, Result}; use data::{Data, Get, Insert}; use enclave_core::{ - CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, EnclaveErrorType, EnclaveEvent, EventBus, FromError, KeyshareCreated + CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, EnclaveErrorType, + EnclaveEvent, EventBus, FromError, KeyshareCreated, }; use fhe::{DecryptCiphertext, Fhe}; use std::sync::Arc; @@ -34,8 +35,9 @@ impl Handler for Keyshare { fn handle(&mut self, event: EnclaveEvent, ctx: &mut actix::Context) -> Self::Result { match event { - EnclaveEvent::CiphernodeSelected { data, .. } => ctx.address().do_send(data), - EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.address().do_send(data), + EnclaveEvent::CiphernodeSelected { data, .. } => ctx.notify(data), + EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.notify(data), + EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die), _ => (), } } @@ -101,7 +103,7 @@ impl Handler for Keyshare { impl Handler for Keyshare { type Result = (); fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { - ctx.stop() + ctx.stop() } } diff --git a/packages/ciphernode/router/src/e3_request_router.rs b/packages/ciphernode/router/src/e3_request_router.rs index 07f3d903..57975361 100644 --- a/packages/ciphernode/router/src/e3_request_router.rs +++ b/packages/ciphernode/router/src/e3_request_router.rs @@ -1,15 +1,14 @@ use crate::CommitteeMetaFactory; use super::CommitteeMeta; +use actix::{Actor, Addr, Context, Handler, Recipient}; use aggregator::PlaintextAggregator; use aggregator::PublicKeyAggregator; -use enclave_core::Die; use enclave_core::E3RequestComplete; use enclave_core::{E3id, EnclaveEvent, EventBus, Subscribe}; use fhe::Fhe; use keyshare::Keyshare; - -use actix::{Actor, Addr, Context, Handler, Recipient}; +use std::collections::HashSet; use std::{collections::HashMap, sync::Arc}; /// Helper class to buffer events for downstream instances incase events arrive in the wrong order @@ -72,20 +71,6 @@ impl E3RequestContext { } }); } - - pub fn cleanup(&mut self) { - if let Some(keyshare) = self.keyshare.take() { - keyshare.do_send(Die); - } - if let Some(plaintext) = self.plaintext.take() { - plaintext.do_send(Die); - } - if let Some(publickey) = self.publickey.take() { - publickey.do_send(Die); - } - self.fhe = None; - self.meta = None; - } } /// Format of the hook that needs to be passed to E3RequestRouter @@ -99,6 +84,7 @@ pub type EventHook = Box; // dependencies pub struct E3RequestRouter { contexts: HashMap, + completed: HashSet, hooks: Vec, buffer: EventBuffer, bus: Addr, @@ -124,6 +110,11 @@ impl Handler for E3RequestRouter { return; }; + if self.completed.contains(&e3_id) { + // TODO: Log warning that e3 event was received for completed e3_id + return; + } + let context = self.contexts.entry(e3_id.clone()).or_default(); for hook in &mut self.hooks { @@ -132,21 +123,27 @@ impl Handler for E3RequestRouter { context.forward_message(&msg, &mut self.buffer); - // Here we are detemining that by receiving the PlaintextAggregated event our request is - // complete and we can notify everyone. This might change as we consider other factors - // when determining if the request is complete - if let EnclaveEvent::PlaintextAggregated { .. } = msg { - // Local event to clean up context - let event = EnclaveEvent::from(E3RequestComplete { - e3_id: e3_id.clone(), - }); - - // Send to bus so all other actors can react to a request being complete. - self.bus.do_send(event); - - // clean up context - context.cleanup(); - self.contexts.remove(&e3_id); + match msg.clone() { + EnclaveEvent::PlaintextAggregated { .. } => { + // Here we are detemining that by receiving the PlaintextAggregated event our request is + // complete and we can notify everyone. This might change as we consider other factors + // when determining if the request is complete + let bus = self.bus.clone(); + + let event = EnclaveEvent::from(E3RequestComplete { + e3_id: e3_id.clone(), + }); + + // Send to bus so all other actors can react to a request being complete. + bus.do_send(event); + } + EnclaveEvent::E3RequestComplete { .. } => { + // Note this will be sent above to the children who can kill themselves based on + // the event + self.contexts.remove(&e3_id); + self.completed.insert(e3_id); + } + _ => (), } } } @@ -165,6 +162,7 @@ impl E3RequestRouterBuilder { pub fn build(self) -> Addr { let e3r = E3RequestRouter { contexts: HashMap::new(), + completed: HashSet::new(), hooks: self.hooks, buffer: EventBuffer::default(), bus: self.bus.clone(),