Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Actor Destruction #142

Merged
merged 7 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions packages/ciphernode/aggregator/src/plaintext_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use actix::prelude::*;
use anyhow::Result;
use enclave_core::{
DecryptionshareCreated, E3id, EnclaveEvent, EventBus, OrderedSet, PlaintextAggregated, Seed,
DecryptionshareCreated, Die, E3RequestComplete, E3id, EnclaveEvent, EventBus, OrderedSet,
PlaintextAggregated, Seed,
};
use fhe::{Fhe, GetAggregatePlaintext};
use sortition::{GetHasNode, Sortition};
Expand Down Expand Up @@ -107,8 +108,10 @@ impl Actor for PlaintextAggregator {
impl Handler<EnclaveEvent> for PlaintextAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
if let EnclaveEvent::DecryptionshareCreated { data, .. } = msg {
ctx.notify(data)
match msg {
EnclaveEvent::DecryptionshareCreated { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
ryardley marked this conversation as resolved.
Show resolved Hide resolved
_ => (),
}
}
}
Expand Down Expand Up @@ -194,3 +197,10 @@ impl Handler<ComputeAggregate> for PlaintextAggregator {
Ok(())
}
}

impl Handler<Die> for PlaintextAggregator {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
}
}
ryardley marked this conversation as resolved.
Show resolved Hide resolved
15 changes: 12 additions & 3 deletions packages/ciphernode/aggregator/src/publickey_aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use actix::prelude::*;
use anyhow::Result;
use enclave_core::{
E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed,
Die, E3id, EnclaveEvent, EventBus, KeyshareCreated, OrderedSet, PublicKeyAggregated, Seed,
};
use fhe::{Fhe, GetAggregatePublicKey};
use sortition::{GetHasNode, GetNodes, Sortition};
Expand Down Expand Up @@ -116,8 +116,10 @@ impl Actor for PublicKeyAggregator {
impl Handler<EnclaveEvent> for PublicKeyAggregator {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result {
if let EnclaveEvent::KeyshareCreated { data, .. } = msg {
ctx.notify(data)
match msg {
EnclaveEvent::KeyshareCreated { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
_ => (),
}
}
}
Expand Down Expand Up @@ -219,3 +221,10 @@ impl Handler<NotifyNetwork> for PublicKeyAggregator {
)
}
}

impl Handler<Die> for PublicKeyAggregator {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
}
}
28 changes: 27 additions & 1 deletion packages/ciphernode/core/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use actix::Message;
use actix::{Actor, Addr, Message};
use alloy::{
hex,
primitives::{Uint, U256},
Expand Down Expand Up @@ -116,6 +116,10 @@ pub enum EnclaveEvent {
id: EventId,
data: EnclaveError,
},
E3RequestComplete {
id: EventId,
data: E3RequestComplete,
},
// CommitteeSelected,
// OutputDecrypted,
// CiphernodeRegistered,
Expand All @@ -142,6 +146,7 @@ impl EnclaveEvent {
EnclaveEvent::E3Requested { .. } => true,
EnclaveEvent::CiphernodeAdded { .. } => true,
EnclaveEvent::CiphernodeRemoved { .. } => true,
EnclaveEvent::E3RequestComplete { .. } => true,
_ => false,
}
}
Expand All @@ -160,6 +165,7 @@ impl From<EnclaveEvent> for EventId {
EnclaveEvent::CiphernodeAdded { id, .. } => id,
EnclaveEvent::CiphernodeRemoved { id, .. } => id,
EnclaveEvent::EnclaveError { id, .. } => id,
EnclaveEvent::E3RequestComplete { id, .. } => id,
}
}
}
Expand Down Expand Up @@ -238,6 +244,15 @@ impl From<PlaintextAggregated> for EnclaveEvent {
}
}

impl From<E3RequestComplete> for EnclaveEvent {
fn from(data: E3RequestComplete) -> Self {
EnclaveEvent::E3RequestComplete {
id: EventId::from(data.clone()),
data: data.clone(),
}
}
}

impl From<CiphernodeSelected> for EnclaveEvent {
fn from(data: CiphernodeSelected) -> Self {
EnclaveEvent::CiphernodeSelected {
Expand Down Expand Up @@ -349,6 +364,13 @@ pub struct PlaintextAggregated {
pub src_chain_id: u64,
}

/// E3RequestComplete event is a local only event
#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct E3RequestComplete {
pub e3_id: E3id,
}

#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct CiphernodeAdded {
Expand All @@ -372,6 +394,10 @@ pub struct EnclaveError {
pub message: String,
}

#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[rtype(result = "()")]
pub struct Die;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Seed(pub [u8; 32]);
impl From<Seed> for u64 {
Expand Down
14 changes: 11 additions & 3 deletions packages/ciphernode/keyshare/src/keyshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use actix::prelude::*;
use anyhow::{anyhow, Context, Result};
use data::{Data, Get, Insert};
use enclave_core::{
CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, EnclaveErrorType,
CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, Die, EnclaveErrorType,
EnclaveEvent, EventBus, FromError, KeyshareCreated,
};
use fhe::{DecryptCiphertext, Fhe};
Expand Down Expand Up @@ -35,8 +35,9 @@ impl Handler<EnclaveEvent> for Keyshare {

fn handle(&mut self, event: EnclaveEvent, ctx: &mut actix::Context<Self>) -> Self::Result {
match event {
EnclaveEvent::CiphernodeSelected { data, .. } => ctx.address().do_send(data),
EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.address().do_send(data),
EnclaveEvent::CiphernodeSelected { data, .. } => ctx.notify(data),
EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.notify(data),
EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die),
_ => (),
}
}
Expand Down Expand Up @@ -99,6 +100,13 @@ impl Handler<CiphertextOutputPublished> for Keyshare {
}
}

impl Handler<Die> for Keyshare {
type Result = ();
fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result {
ctx.stop()
}
}

async fn on_decryption_requested(
fhe: Arc<Fhe>,
data: Addr<Data>,
Expand Down
37 changes: 34 additions & 3 deletions packages/ciphernode/router/src/e3_request_router.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::CommitteeMetaFactory;

use super::CommitteeMeta;
use actix::{Actor, Addr, Context, Handler, Recipient};
use aggregator::PlaintextAggregator;
use aggregator::PublicKeyAggregator;
use enclave_core::E3RequestComplete;
use enclave_core::{E3id, EnclaveEvent, EventBus, Subscribe};
use fhe::Fhe;
use keyshare::Keyshare;

use actix::{Actor, Addr, Context, Handler, Recipient};
use std::collections::HashSet;
use std::{collections::HashMap, sync::Arc};

/// Helper class to buffer events for downstream instances incase events arrive in the wrong order
Expand Down Expand Up @@ -83,8 +84,10 @@ pub type EventHook = Box<dyn FnMut(&mut E3RequestContext, EnclaveEvent)>;
// dependencies
pub struct E3RequestRouter {
contexts: HashMap<E3id, E3RequestContext>,
completed: HashSet<E3id>,
hooks: Vec<EventHook>,
buffer: EventBuffer,
bus: Addr<EventBus>,
}

impl E3RequestRouter {
Expand All @@ -107,13 +110,39 @@ impl Handler<EnclaveEvent> for E3RequestRouter {
return;
};

let context = self.contexts.entry(e3_id).or_default();
if self.completed.contains(&e3_id) {
// TODO: Log warning that e3 event was received for completed e3_id
ryardley marked this conversation as resolved.
Show resolved Hide resolved
return;
}

let context = self.contexts.entry(e3_id.clone()).or_default();

for hook in &mut self.hooks {
hook(context, msg.clone());
}

context.forward_message(&msg, &mut self.buffer);

match &msg {
EnclaveEvent::PlaintextAggregated { .. } => {
// Here we are detemining that by receiving the PlaintextAggregated event our request is
// complete and we can notify everyone. This might change as we consider other factors
// when determining if the request is complete
let event = EnclaveEvent::from(E3RequestComplete {
e3_id: e3_id.clone(),
});

// Send to bus so all other actors can react to a request being complete.
self.bus.do_send(event);
}
EnclaveEvent::E3RequestComplete { .. } => {
// Note this will be sent above to the children who can kill themselves based on
// the event
self.contexts.remove(&e3_id);
self.completed.insert(e3_id);
}
_ => (),
}
}
}

Expand All @@ -131,8 +160,10 @@ impl E3RequestRouterBuilder {
pub fn build(self) -> Addr<E3RequestRouter> {
let e3r = E3RequestRouter {
contexts: HashMap::new(),
completed: HashSet::new(),
hooks: self.hooks,
buffer: EventBuffer::default(),
bus: self.bus.clone(),
};

let addr = e3r.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use data::Data;
use enclave_core::{
CiphernodeAdded, CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated,
E3Requested, E3id, EnclaveEvent, EventBus, GetHistory, KeyshareCreated, OrderedSet,
PlaintextAggregated, PublicKeyAggregated, ResetHistory, Seed,
E3RequestComplete, E3Requested, E3id, EnclaveEvent, EventBus, GetHistory, KeyshareCreated,
OrderedSet, PlaintextAggregated, PublicKeyAggregated, ResetHistory, Seed,
};
use fhe::{setup_crp_params, ParamsWithCrp, SharedRng};
use logger::SimpleLogger;
Expand Down Expand Up @@ -181,7 +181,7 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> {
e3_id: e3_id.clone(),
nodes: OrderedSet::from(eth_addrs.clone()),
src_chain_id: 1
})
}),
]
);

Expand Down Expand Up @@ -223,7 +223,7 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> {
sleep(Duration::from_millis(1)).await; // need to push to next tick
let history = bus.send(GetHistory).await?;

assert_eq!(history.len(), 5);
assert_eq!(history.len(), 6);

assert_eq!(
history,
Expand All @@ -248,6 +248,9 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> {
e3_id: e3_id.clone(),
decrypted_output: expected_raw_plaintext.clone(),
src_chain_id: 1
}),
EnclaveEvent::from(E3RequestComplete {
e3_id: e3_id.clone()
})
]
);
Expand Down
Loading