From c2cb0044f3171d467bd957dc30df13449d43012a Mon Sep 17 00:00:00 2001 From: ryardley Date: Thu, 19 Sep 2024 11:54:06 +1000 Subject: [PATCH] Remove Orchestrators --- packages/ciphernode/core/src/ciphernode.rs | 42 ++-- .../core/src/ciphernode_orchestrator.rs | 93 -------- .../ciphernode/core/src/committee_meta.rs | 26 +++ packages/ciphernode/core/src/e3_request.rs | 145 ++++++++++++ packages/ciphernode/core/src/fhe.rs | 27 +++ packages/ciphernode/core/src/lib.rs | 45 ++-- .../ciphernode/core/src/main_aggregator.rs | 25 +- .../ciphernode/core/src/main_ciphernode.rs | 29 ++- packages/ciphernode/core/src/orchestrator.rs | 216 ------------------ .../core/src/plaintext_aggregator.rs | 33 ++- .../core/src/plaintext_orchestrator.rs | 101 -------- .../core/src/publickey_aggregator.rs | 33 ++- .../core/src/publickey_orchestrator.rs | 102 --------- 13 files changed, 332 insertions(+), 585 deletions(-) delete mode 100644 packages/ciphernode/core/src/ciphernode_orchestrator.rs create mode 100644 packages/ciphernode/core/src/committee_meta.rs create mode 100644 packages/ciphernode/core/src/e3_request.rs delete mode 100644 packages/ciphernode/core/src/orchestrator.rs delete mode 100644 packages/ciphernode/core/src/plaintext_orchestrator.rs delete mode 100644 packages/ciphernode/core/src/publickey_orchestrator.rs diff --git a/packages/ciphernode/core/src/ciphernode.rs b/packages/ciphernode/core/src/ciphernode.rs index 7bd4f271..8bf0e9c5 100644 --- a/packages/ciphernode/core/src/ciphernode.rs +++ b/packages/ciphernode/core/src/ciphernode.rs @@ -3,8 +3,8 @@ use crate::{ eventbus::EventBus, events::{EnclaveEvent, KeyshareCreated}, fhe::{Fhe, GenerateKeyshare}, - CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext, DecryptionshareCreated, Get, - Subscribe, + ActorFactory, CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext, + DecryptionshareCreated, Get, }; use actix::prelude::*; use alloy_primitives::Address; @@ -30,25 +30,6 @@ impl Ciphernode { address, } } - - pub async fn attach( - bus: Addr, - fhe: Addr, - data: Addr, - address: Address, - ) -> Addr { - let node = Ciphernode::new(bus.clone(), fhe, data, address).start(); - let _ = bus - .send(Subscribe::new("CiphernodeSelected", node.clone().into())) - .await; - let _ = bus - .send(Subscribe::new( - "CiphertextOutputPublished", - node.clone().into(), - )) - .await; - node - } } impl Handler for Ciphernode { @@ -167,3 +148,22 @@ async fn on_decryption_requested( Ok(()) } + +pub struct CiphernodeFactory; +impl CiphernodeFactory { + pub fn create(bus: Addr, data: Addr, address: Address) -> ActorFactory { + Box::new(move |ctx, evt| { + // Save Ciphernode on CiphernodeSelected + let EnclaveEvent::CiphernodeSelected { .. } = evt else { + return; + }; + + let Some(ref fhe) = ctx.fhe else { + return; + }; + + ctx.ciphernode = + Some(Ciphernode::new(bus.clone(), fhe.clone(), data.clone(), address).start()) + }) + } +} diff --git a/packages/ciphernode/core/src/ciphernode_orchestrator.rs b/packages/ciphernode/core/src/ciphernode_orchestrator.rs deleted file mode 100644 index 4fc80a99..00000000 --- a/packages/ciphernode/core/src/ciphernode_orchestrator.rs +++ /dev/null @@ -1,93 +0,0 @@ -// TODO: spawn and supervise child actors -use crate::{Ciphernode, Data, E3id, EnclaveEvent, EventBus, Fhe, InitializeWithEnclaveEvent}; -use actix::prelude::*; -use alloy_primitives::Address; -use std::collections::HashMap; - -pub struct CiphernodeOrchestrator { - bus: Addr, - data: Addr, - address: Address, - ciphernodes: HashMap>, - buffers: HashMap>, -} - -impl CiphernodeOrchestrator { - pub fn new(bus: Addr, data: Addr, address: Address) -> Self { - Self { - bus, - data, - address, - ciphernodes: HashMap::new(), - buffers: HashMap::new(), - } - } - - pub fn attach(bus: Addr, data: Addr, address: Address) -> Addr { - CiphernodeOrchestrator::new(bus, data, address).start() - } -} - -impl Actor for CiphernodeOrchestrator { - type Context = Context; -} - -impl Handler for CiphernodeOrchestrator { - type Result = (); - fn handle(&mut self, msg: InitializeWithEnclaveEvent, _: &mut Self::Context) -> Self::Result { - let InitializeWithEnclaveEvent { fhe, event, .. } = msg; - let EnclaveEvent::CiphernodeSelected { data, .. } = event else { - return; - }; - let ciphernode_factory = self.ciphernode_factory(fhe.clone()); - self.ciphernodes - .entry(data.e3_id.clone()) - .or_insert_with(ciphernode_factory); - } -} - -impl Handler for CiphernodeOrchestrator { - type Result = (); - - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let Some(e3_id) = msg.get_e3_id() else { - return; - }; - - self.forward_message(&e3_id, msg); - } -} - -impl CiphernodeOrchestrator { - fn ciphernode_factory(&self, fhe: Addr) -> impl FnOnce() -> Addr { - let data = self.data.clone(); - let bus = self.bus.clone(); - let address = self.address; - move || Ciphernode::new(bus, fhe, data, address).start() - } - - fn store_msg(&mut self, e3_id: E3id, msg: EnclaveEvent) { - self.buffers.entry(e3_id).or_default().push(msg); - } - - fn take_msgs(&mut self, e3_id: E3id) -> Vec { - self.buffers - .get_mut(&e3_id) - .map(std::mem::take) - .unwrap_or_default() - } - - fn forward_message(&mut self, e3_id: &E3id, msg: EnclaveEvent) { - // Buffer events for each thing that has not been created - if let Some(act) = self.ciphernodes.clone().get(e3_id) { - let msgs = self.take_msgs(e3_id.clone()); - let recipient = act.clone().recipient(); - recipient.do_send(msg.clone()); - for m in msgs { - recipient.do_send(m); - } - } else { - self.store_msg(e3_id.clone(), msg.clone()); - } - } -} diff --git a/packages/ciphernode/core/src/committee_meta.rs b/packages/ciphernode/core/src/committee_meta.rs new file mode 100644 index 00000000..78be9141 --- /dev/null +++ b/packages/ciphernode/core/src/committee_meta.rs @@ -0,0 +1,26 @@ +use crate::{ActorFactory, CommitteeRequested, EnclaveEvent}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct CommitteeMeta { + pub nodecount: usize, + pub seed: u64, +} + +pub struct CommitteeMetaFactory; + +impl CommitteeMetaFactory { + pub fn create() -> ActorFactory { + Box::new(move |ctx, evt| { + let EnclaveEvent::CommitteeRequested { data, .. }: crate::EnclaveEvent = evt else { + return; + }; + let CommitteeRequested { + nodecount, + sortition_seed: seed, + .. + } = data; + + ctx.meta = Some(CommitteeMeta { nodecount, seed }); + }) + } +} diff --git a/packages/ciphernode/core/src/e3_request.rs b/packages/ciphernode/core/src/e3_request.rs new file mode 100644 index 00000000..32de3914 --- /dev/null +++ b/packages/ciphernode/core/src/e3_request.rs @@ -0,0 +1,145 @@ +use std::collections::HashMap; + +use actix::{Actor, Addr, Context, Handler, Recipient}; + +use crate::{ + Ciphernode, CommitteeMeta, E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator, + PublicKeyAggregator, Subscribe, +}; + +#[derive(Default)] +// TODO: Set this up with a Typestate pattern +pub struct E3RequestContext { + pub ciphernode: Option>, + pub fhe: Option>, + pub plaintext: Option>, + pub publickey: Option>, + pub meta: Option, +} + +struct EventBuffer { + buffer: HashMap>, +} + +impl Default for EventBuffer { + fn default() -> Self { + Self { + buffer: HashMap::new(), + } + } +} + +impl EventBuffer { + pub fn add(&mut self, key: &str, event: EnclaveEvent) { + self.buffer.entry(key.to_string()).or_default().push(event) + } + + pub fn take(&mut self, key: &str) -> Vec { + self.buffer + .get_mut(key) + .map(std::mem::take) + .unwrap_or_default() + } +} + +impl E3RequestContext { + fn recipients(&self) -> Vec<(String, Option>)> { + vec![ + ( + "ciphernode".to_owned(), + self.ciphernode.clone().map(|addr| addr.into()), + ), + ( + "plaintext".to_owned(), + self.plaintext.clone().map(|addr| addr.into()), + ), + ( + "publickey".to_owned(), + self.publickey.clone().map(|addr| addr.into()), + ), + ] + } + + fn forward_message(&self, msg: &EnclaveEvent, buffer: &mut EventBuffer) { + self.recipients().into_iter().for_each(|(key, recipient)| { + if let Some(act) = recipient { + act.do_send(msg.clone()); + for m in buffer.take(&key) { + act.do_send(m); + } + } else { + buffer.add(&key, msg.clone()); + } + }); + } +} + +struct E3RequestBuffers { + ciphernode: Vec, + publickey: Vec, + plaintext: Vec, +} + +pub type ActorFactory = Box; + +// TODO: setup typestate pattern so that we have to place factories within correct order of +// dependencies +pub struct E3RequestManager { + contexts: HashMap, + factories: Vec, + buffer: EventBuffer, +} + +impl E3RequestManager { + pub fn builder(bus: Addr) -> E3RequestManagerBuilder { + E3RequestManagerBuilder { + bus, + factories: vec![], + } + } +} + +pub struct E3RequestManagerBuilder { + pub bus: Addr, + pub factories: Vec, +} +impl E3RequestManagerBuilder { + pub fn add_hook(mut self, listener: ActorFactory) -> Self { + self.factories.push(listener); + self + } + + pub fn build(self) -> Addr { + let e3r = E3RequestManager { + contexts: HashMap::new(), + factories: self.factories, + buffer: EventBuffer::default(), + }; + + let addr = e3r.start(); + self.bus + .do_send(Subscribe::new("*", addr.clone().recipient())); + addr + } +} + +impl Actor for E3RequestManager { + type Context = Context; +} + +impl Handler for E3RequestManager { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + let Some(e3_id) = msg.get_e3_id() else { + return; + }; + + let context = self.contexts.entry(e3_id).or_default(); + + for factory in &mut self.factories { + factory(context, msg.clone()); + } + + context.forward_message(&msg, &mut self.buffer); + } +} diff --git a/packages/ciphernode/core/src/fhe.rs b/packages/ciphernode/core/src/fhe.rs index 544594de..f3394c18 100644 --- a/packages/ciphernode/core/src/fhe.rs +++ b/packages/ciphernode/core/src/fhe.rs @@ -4,6 +4,7 @@ use crate::{ CiphertextSerializer, DecryptionShareSerializer, PublicKeySerializer, PublicKeyShareSerializer, SecretKeySerializer, }, + ActorFactory, CommitteeRequested, EnclaveEvent, }; use actix::{Actor, Context, Handler, Message}; use anyhow::*; @@ -182,3 +183,29 @@ impl Handler for Fhe { Ok(bincode::serialize(&decoded)?) } } + +pub struct FheFactory; + +impl FheFactory { + pub fn create(rng: Arc>) -> ActorFactory { + Box::new(move |ctx, evt| { + // Saving the fhe on Committee Requested + let EnclaveEvent::CommitteeRequested { data, .. } = evt else { + return; + }; + let CommitteeRequested { + degree, + moduli, + plaintext_modulus, + crp, + .. + } = data; + + ctx.fhe = Some( + Fhe::from_raw_params(&moduli, degree, plaintext_modulus, &crp, rng.clone()) + .unwrap() + .start(), + ); + }) + } +} diff --git a/packages/ciphernode/core/src/lib.rs b/packages/ciphernode/core/src/lib.rs index f437dec9..3131e409 100644 --- a/packages/ciphernode/core/src/lib.rs +++ b/packages/ciphernode/core/src/lib.rs @@ -3,9 +3,10 @@ // #![warn(missing_docs, unused_imports)] mod ciphernode; -mod ciphernode_orchestrator; mod ciphernode_selector; +mod committee_meta; mod data; +mod e3_request; mod enclave_contract; mod eventbus; pub mod events; @@ -16,13 +17,10 @@ mod fhe; mod logger; mod main_aggregator; mod main_ciphernode; -mod orchestrator; mod ordered_set; mod p2p; mod plaintext_aggregator; -mod plaintext_orchestrator; mod publickey_aggregator; -mod publickey_orchestrator; mod serializers; mod sortition; mod utils; @@ -30,24 +28,23 @@ mod utils; // TODO: this is too permissive pub use actix::prelude::*; pub use ciphernode::*; -pub use ciphernode_orchestrator::*; pub use ciphernode_selector::*; +pub use committee_meta::*; pub use data::*; +pub use e3_request::*; pub use eventbus::*; pub use events::*; -pub use serializers::*; pub use fhe::*; pub use logger::*; pub use main_aggregator::*; pub use main_ciphernode::*; -pub use orchestrator::*; pub use p2p::*; pub use plaintext_aggregator::*; -pub use plaintext_orchestrator::*; pub use publickey_aggregator::*; -pub use publickey_orchestrator::*; +pub use serializers::*; pub use sortition::*; pub use utils::*; + // TODO: move these out to a test folder #[cfg(test)] mod tests { @@ -57,14 +54,12 @@ mod tests { eventbus::{EventBus, GetHistory}, events::{CommitteeRequested, E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated}, p2p::P2p, - serializers::{ - CiphertextSerializer, DecryptionShareSerializer, PublicKeySerializer, - PublicKeyShareSerializer, - }, + serializers::{CiphertextSerializer, DecryptionShareSerializer, PublicKeyShareSerializer}, utils::{setup_crp_params, ParamsWithCrp}, - CiphernodeAdded, CiphernodeOrchestrator, CiphernodeSelected, CiphertextOutputPublished, - DecryptionshareCreated, Orchestrator, PlaintextAggregated, PlaintextOrchestrator, - PublicKeyOrchestrator, ResetHistory, SharedRng, Sortition, + CiphernodeAdded, CiphernodeFactory, CiphernodeSelected, CiphertextOutputPublished, + CommitteeMetaFactory, DecryptionshareCreated, E3RequestManager, FheFactory, + PlaintextAggregated, PlaintextAggregatorFactory, PublicKeyAggregatorFactory, ResetHistory, + SharedRng, Sortition, }; use actix::prelude::*; use alloy_primitives::Address; @@ -94,18 +89,24 @@ mod tests { // create ciphernode actor for managing ciphernode flow let sortition = Sortition::attach(bus.clone()); CiphernodeSelector::attach(bus.clone(), sortition.clone(), addr); - Orchestrator::builder(bus.clone(), rng) - .public_key(PublicKeyOrchestrator::attach( + + E3RequestManager::builder(bus.clone()) + .add_hook(CommitteeMetaFactory::create()) + .add_hook(FheFactory::create(rng.clone())) + .add_hook(PublicKeyAggregatorFactory::create( bus.clone(), sortition.clone(), )) - .plaintext(PlaintextOrchestrator::attach( + .add_hook(PlaintextAggregatorFactory::create( bus.clone(), sortition.clone(), )) - .ciphernode(CiphernodeOrchestrator::attach(bus.clone(), data, addr)) - .build() - .await; + .add_hook(CiphernodeFactory::create( + bus.clone(), + data.clone(), + addr, + )) + .build(); } fn generate_pk_share( diff --git a/packages/ciphernode/core/src/main_aggregator.rs b/packages/ciphernode/core/src/main_aggregator.rs index c6076f7b..faf4ecb8 100644 --- a/packages/ciphernode/core/src/main_aggregator.rs +++ b/packages/ciphernode/core/src/main_aggregator.rs @@ -1,8 +1,8 @@ use std::sync::{Arc, Mutex}; use crate::{ - EventBus, Orchestrator, P2p, PlaintextOrchestrator, PublicKeyOrchestrator, SimpleLogger, - Sortition, + committee_meta::CommitteeMetaFactory, E3RequestManager, EventBus, FheFactory, P2p, + PlaintextAggregatorFactory, PublicKeyAggregatorFactory, SimpleLogger, Sortition, }; use actix::{Actor, Addr, Context}; use rand::SeedableRng; @@ -13,9 +13,9 @@ use tokio::task::JoinHandle; /// Suprvises all children // TODO: add supervision logic pub struct MainAggregator { + e3_manager: Addr, bus: Addr, sortition: Addr, - orchestrator: Addr, p2p: Addr, } @@ -23,13 +23,13 @@ impl MainAggregator { pub fn new( bus: Addr, sortition: Addr, - orchestrator: Addr, p2p: Addr, + e3_manager: Addr, ) -> Self { Self { + e3_manager, bus, sortition, - orchestrator, p2p, } } @@ -40,23 +40,26 @@ impl MainAggregator { )); let bus = EventBus::new(true).start(); let sortition = Sortition::attach(bus.clone()); - let orchestrator = Orchestrator::builder(bus.clone(), rng) - .public_key(PublicKeyOrchestrator::attach( + + let e3_manager = E3RequestManager::builder(bus.clone()) + .add_hook(CommitteeMetaFactory::create()) + .add_hook(FheFactory::create(rng.clone())) + .add_hook(PublicKeyAggregatorFactory::create( bus.clone(), sortition.clone(), )) - .plaintext(PlaintextOrchestrator::attach( + .add_hook(PlaintextAggregatorFactory::create( bus.clone(), sortition.clone(), )) - .build() - .await; + .build(); + let (p2p_addr, join_handle) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); SimpleLogger::attach("AGGREGATOR", bus.clone()); - let main_addr = MainAggregator::new(bus, sortition, orchestrator, p2p_addr).start(); + let main_addr = MainAggregator::new(bus, sortition, p2p_addr, e3_manager).start(); (main_addr, join_handle) } } diff --git a/packages/ciphernode/core/src/main_ciphernode.rs b/packages/ciphernode/core/src/main_ciphernode.rs index a7241e62..94af2618 100644 --- a/packages/ciphernode/core/src/main_ciphernode.rs +++ b/packages/ciphernode/core/src/main_ciphernode.rs @@ -1,7 +1,9 @@ use std::sync::{Arc, Mutex}; use crate::{ - CiphernodeOrchestrator, CiphernodeSelector, Data, EventBus, Orchestrator, P2p, SimpleLogger, Sortition + CiphernodeFactory, CiphernodeSelector, CommitteeMetaFactory, Data, E3RequestManager, EventBus, + FheFactory, P2p, PlaintextAggregatorFactory, PublicKeyAggregatorFactory, SimpleLogger, + Sortition, }; use actix::{Actor, Addr, Context}; use alloy_primitives::Address; @@ -18,7 +20,7 @@ pub struct MainCiphernode { data: Addr, sortition: Addr, selector: Addr, - orchestrator: Addr, + e3_manager: Addr, p2p: Addr, } @@ -29,8 +31,8 @@ impl MainCiphernode { data: Addr, sortition: Addr, selector: Addr, - orchestrator: Addr, p2p: Addr, + e3_manager: Addr, ) -> Self { Self { addr, @@ -38,7 +40,7 @@ impl MainCiphernode { data, sortition, selector, - orchestrator, + e3_manager, p2p, } } @@ -56,27 +58,22 @@ impl MainCiphernode { let sortition = Sortition::attach(bus.clone()); let selector = CiphernodeSelector::attach(bus.clone(), sortition.clone(), address); - let orchestrator = Orchestrator::builder(bus.clone(), rng) - .ciphernode(CiphernodeOrchestrator::attach( + let e3_manager = E3RequestManager::builder(bus.clone()) + .add_hook(CommitteeMetaFactory::create()) + .add_hook(FheFactory::create(rng.clone())) + .add_hook(CiphernodeFactory::create( bus.clone(), data.clone(), address, )) - .build() - .await; + .build(); let (p2p_addr, join_handle) = P2p::spawn_libp2p(bus.clone()).expect("Failed to setup libp2p"); - SimpleLogger::attach("CIPHERNODE",bus.clone()); + SimpleLogger::attach("CIPHERNODE", bus.clone()); let main_addr = MainCiphernode::new( - address, - bus, - data, - sortition, - selector, - orchestrator, - p2p_addr, + address, bus, data, sortition, selector, p2p_addr, e3_manager, ) .start(); (main_addr, join_handle) diff --git a/packages/ciphernode/core/src/orchestrator.rs b/packages/ciphernode/core/src/orchestrator.rs deleted file mode 100644 index a542c527..00000000 --- a/packages/ciphernode/core/src/orchestrator.rs +++ /dev/null @@ -1,216 +0,0 @@ -// TODO: spawn and supervise child actors -use crate::{ - CiphernodeOrchestrator, CommitteeRequested, E3id, EnclaveEvent, EventBus, Fhe, - PlaintextOrchestrator, PublicKeyOrchestrator, Subscribe, -}; -use actix::prelude::*; -use rand_chacha::ChaCha20Rng; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; - -#[derive(Message, Clone, Debug, PartialEq, Eq)] -#[rtype(result = "()")] -pub struct InitializeWithEnclaveEvent { - pub fhe: Addr, - pub meta: CommitteeMeta, - pub event: EnclaveEvent, -} - -impl InitializeWithEnclaveEvent { - pub fn new(fhe: Addr, meta: CommitteeMeta, event: EnclaveEvent) -> Self { - Self { fhe, meta, event } - } -} - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct CommitteeMeta { - pub nodecount: usize, - pub seed: u64, -} - -pub struct OrchestratorBuilder { - bus: Addr, - rng: Arc>, - public_key: Option>, - plaintext: Option>, - ciphernode: Option>, -} - -impl OrchestratorBuilder { - pub fn new(bus: Addr, rng: Arc>) -> Self { - Self { - bus, - rng, - public_key: None, - plaintext: None, - ciphernode: None, - } - } - - pub fn public_key(mut self, value: Addr) -> Self { - self.public_key = Some(value); - self - } - - pub fn plaintext(mut self, value: Addr) -> Self { - self.plaintext = Some(value); - self - } - - pub fn ciphernode(mut self, value: Addr) -> Self { - self.ciphernode = Some(value); - self - } - - pub async fn build(self) -> Addr { - let bus = self.bus; - let rng = self.rng; - let public_key = self.public_key; - let plaintext = self.plaintext; - let ciphernode = self.ciphernode; - Orchestrator::attach(bus, rng, public_key, plaintext, ciphernode).await - } -} - -pub struct Orchestrator { - fhes: HashMap>, - meta: HashMap, - public_key: Option>, - plaintext: Option>, - ciphernode: Option>, - rng: Arc>, -} - -impl Orchestrator { - pub fn builder(bus: Addr, rng: Arc>) -> OrchestratorBuilder { - OrchestratorBuilder::new(bus, rng) - } - - async fn attach( - bus: Addr, - rng: Arc>, - public_key: Option>, - plaintext: Option>, - ciphernode: Option>, - ) -> Addr { - let addr = Orchestrator { - rng, - public_key, - plaintext, - ciphernode, - meta: HashMap::new(), - fhes: HashMap::new(), - } - .start(); - bus.send(Subscribe::new("*", addr.clone().into())) - .await - .unwrap(); - addr - } -} - -impl Actor for Orchestrator { - type Context = Context; -} - -impl Handler for Orchestrator { - type Result = (); - - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let Some(e3_id) = msg.get_e3_id() else { - return; - }; - - { - match msg.clone() { - EnclaveEvent::CommitteeRequested { data, .. } => { - let CommitteeRequested { - degree, - moduli, - plaintext_modulus, - crp, - .. - } = data; - - let fhe_factory = self.fhe_factory(moduli, degree, plaintext_modulus, crp); - let fhe = self.fhes.entry(e3_id.clone()).or_insert_with(fhe_factory); - let meta = CommitteeMeta { - nodecount: data.nodecount, - seed: data.sortition_seed, - }; - self.meta.entry(e3_id.clone()).or_insert(meta.clone()); - - if let Some(addr) = self.public_key.clone() { - addr.do_send(InitializeWithEnclaveEvent { - event: msg.clone(), - fhe: fhe.clone(), - meta: meta.clone(), - }); - } - } - EnclaveEvent::CiphernodeSelected { data, .. } => { - if let Some(addr) = self.ciphernode.clone() { - if let Some(fhe) = self.fhes.get(&data.e3_id) { - if let Some(meta) = self.meta.get(&data.e3_id) { - addr.do_send(InitializeWithEnclaveEvent { - event: msg.clone(), - fhe: fhe.clone(), - meta: meta.clone(), - }); - } - } - } - } - EnclaveEvent::CiphertextOutputPublished { data, .. } => { - if let Some(plaintext) = self.plaintext.clone() { - if let Some(fhe) = self.fhes.get(&data.e3_id) { - if let Some(meta) = self.meta.get(&data.e3_id) { - plaintext.do_send(InitializeWithEnclaveEvent { - event: msg.clone(), - fhe: fhe.clone(), - meta: meta.clone(), - }); - }; - }; - }; - } - _ => (), - }; - }; - - self.forward_message(msg); - } -} - -impl Orchestrator { - fn fhe_factory( - &self, - moduli: Vec, - degree: usize, - plaintext_modulus: u64, - crp: Vec, - ) -> impl FnOnce() -> Addr { - let rng = self.rng.clone(); - move || { - Fhe::from_raw_params(&moduli, degree, plaintext_modulus, &crp, rng) - .unwrap() - .start() - } - } - - fn forward_message(&mut self, msg: EnclaveEvent) { - if let Some(addr) = self.ciphernode.clone() { - addr.do_send(msg.clone()) - } - - if let Some(addr) = self.public_key.clone() { - addr.do_send(msg.clone()) - } - - if let Some(addr) = self.plaintext.clone() { - addr.do_send(msg.clone()) - } - } -} diff --git a/packages/ciphernode/core/src/plaintext_aggregator.rs b/packages/ciphernode/core/src/plaintext_aggregator.rs index 75c7c2ec..618e2088 100644 --- a/packages/ciphernode/core/src/plaintext_aggregator.rs +++ b/packages/ciphernode/core/src/plaintext_aggregator.rs @@ -1,6 +1,6 @@ use crate::{ - ordered_set::OrderedSet, DecryptionshareCreated, E3id, EnclaveEvent, EventBus, Fhe, - GetAggregatePlaintext, GetHasNode, PlaintextAggregated, Sortition, + ordered_set::OrderedSet, ActorFactory, DecryptionshareCreated, E3id, EnclaveEvent, EventBus, + Fhe, GetAggregatePlaintext, GetHasNode, PlaintextAggregated, Sortition, }; use actix::prelude::*; use anyhow::{anyhow, Result}; @@ -178,3 +178,32 @@ impl Handler for PlaintextAggregator { ) } } + +pub struct PlaintextAggregatorFactory; +impl PlaintextAggregatorFactory { + pub fn create(bus: Addr, sortition: Addr) -> ActorFactory { + Box::new(move |ctx, evt| { + // Save plaintext aggregator + let EnclaveEvent::CiphertextOutputPublished { data, .. } = evt else { + return; + }; + let Some(ref fhe) = ctx.fhe else { + return; + }; + let Some(ref meta) = ctx.meta else { + return; + }; + ctx.plaintext = Some( + PlaintextAggregator::new( + fhe.clone(), + bus.clone(), + sortition.clone(), + data.e3_id, + meta.nodecount, + meta.seed, + ) + .start(), + ); + }) + } +} diff --git a/packages/ciphernode/core/src/plaintext_orchestrator.rs b/packages/ciphernode/core/src/plaintext_orchestrator.rs deleted file mode 100644 index db8d48a5..00000000 --- a/packages/ciphernode/core/src/plaintext_orchestrator.rs +++ /dev/null @@ -1,101 +0,0 @@ -// TODO: spawn and supervise child actors -use crate::{ - CommitteeMeta, E3id, EnclaveEvent, EventBus, Fhe, InitializeWithEnclaveEvent, - PlaintextAggregator, Sortition, -}; -use actix::prelude::*; -use std::collections::HashMap; - -pub struct PlaintextOrchestrator { - bus: Addr, - sortition: Addr, - buffers: HashMap>, - plaintexts: HashMap>, -} - -impl PlaintextOrchestrator { - pub fn new(bus: Addr, sortition: Addr) -> Self { - Self { - bus, - sortition, - plaintexts: HashMap::new(), - buffers: HashMap::new(), - } - } - - pub fn attach(bus: Addr, sortition: Addr) -> Addr { - PlaintextOrchestrator::new(bus.clone(), sortition).start() - } -} - -impl Actor for PlaintextOrchestrator { - type Context = Context; -} - -impl Handler for PlaintextOrchestrator { - type Result = (); - fn handle(&mut self, msg: InitializeWithEnclaveEvent, _: &mut Self::Context) -> Self::Result { - let InitializeWithEnclaveEvent { fhe, meta, event } = msg; - let EnclaveEvent::CiphertextOutputPublished { data, .. } = event else { - return; - }; - - let plaintext_factory = - self.plaintext_factory(data.e3_id.clone(), meta.clone(), fhe.clone()); - - self.plaintexts - .entry(data.e3_id) - .or_insert_with(plaintext_factory); - } -} - -impl Handler for PlaintextOrchestrator { - type Result = (); - - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let Some(e3_id) = msg.get_e3_id() else { - return; - }; - self.forward_message(&e3_id, msg); - } -} - -impl PlaintextOrchestrator { - fn plaintext_factory( - &self, - e3_id: E3id, - meta: CommitteeMeta, - fhe: Addr, - ) -> impl FnOnce() -> Addr { - let bus = self.bus.clone(); - let sortition = self.sortition.clone(); - let nodecount = meta.nodecount; - let seed = meta.seed; - move || PlaintextAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() - } - - fn store_msg(&mut self, e3_id: E3id, msg: EnclaveEvent) { - self.buffers.entry(e3_id).or_default().push(msg); - } - - fn take_msgs(&mut self, e3_id: E3id) -> Vec { - self.buffers - .get_mut(&e3_id) - .map(std::mem::take) - .unwrap_or_default() - } - - fn forward_message(&mut self, e3_id: &E3id, msg: EnclaveEvent) { - // Buffer events for each thing that has not been created - if let Some(act) = self.plaintexts.clone().get(e3_id) { - let msgs = self.take_msgs(e3_id.clone()); - let recipient = act.clone().recipient(); - recipient.do_send(msg.clone()); - for m in msgs { - recipient.do_send(m); - } - } else { - self.store_msg(e3_id.clone(), msg.clone()); - } - } -} diff --git a/packages/ciphernode/core/src/publickey_aggregator.rs b/packages/ciphernode/core/src/publickey_aggregator.rs index f6bc761c..2b09ae85 100644 --- a/packages/ciphernode/core/src/publickey_aggregator.rs +++ b/packages/ciphernode/core/src/publickey_aggregator.rs @@ -3,7 +3,7 @@ use crate::{ events::{E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated}, fhe::{Fhe, GetAggregatePublicKey}, ordered_set::OrderedSet, - GetHasNode, Sortition, + ActorFactory, GetHasNode, Sortition, }; use actix::prelude::*; use anyhow::Result; @@ -207,3 +207,34 @@ impl Handler for PublicKeyAggregator { ) } } + +pub struct PublicKeyAggregatorFactory; +impl PublicKeyAggregatorFactory { + pub fn create(bus: Addr, sortition: Addr) -> ActorFactory { + Box::new(move |ctx, evt| { + // Saving the publickey aggregator with deps on CommitteeRequested + let EnclaveEvent::CommitteeRequested { data, .. } = evt else { + return; + }; + + let Some(ref fhe) = ctx.fhe else { + return; + }; + let Some(ref meta) = ctx.meta else { + return; + }; + + ctx.publickey = Some( + PublicKeyAggregator::new( + fhe.clone(), + bus.clone(), + sortition.clone(), + data.e3_id, + meta.nodecount, + meta.seed, + ) + .start(), + ); + }) + } +} diff --git a/packages/ciphernode/core/src/publickey_orchestrator.rs b/packages/ciphernode/core/src/publickey_orchestrator.rs deleted file mode 100644 index c6930467..00000000 --- a/packages/ciphernode/core/src/publickey_orchestrator.rs +++ /dev/null @@ -1,102 +0,0 @@ -// TODO: spawn and supervise child actors -use crate::{ - E3id, EnclaveEvent, EventBus, Fhe, InitializeWithEnclaveEvent, PublicKeyAggregator, Sortition, -}; -use actix::prelude::*; -use std::collections::HashMap; - -pub struct PublicKeyOrchestrator { - bus: Addr, - sortition: Addr, - buffers: HashMap>, - public_keys: HashMap>, -} - -impl PublicKeyOrchestrator { - pub fn new(bus: Addr, sortition: Addr) -> Self { - Self { - bus, - sortition, - public_keys: HashMap::new(), - buffers: HashMap::new(), - } - } - - pub fn attach(bus: Addr, sortition: Addr) -> Addr { - PublicKeyOrchestrator::new(bus.clone(), sortition).start() - } -} - -impl Actor for PublicKeyOrchestrator { - type Context = Context; -} - -impl Handler for PublicKeyOrchestrator { - type Result = (); - fn handle(&mut self, msg: InitializeWithEnclaveEvent, _: &mut Self::Context) -> Self::Result { - let InitializeWithEnclaveEvent { fhe, event, .. } = msg; - let EnclaveEvent::CommitteeRequested { data, .. } = event else { - return; - }; - - let public_key_factory = self.public_key_factory( - fhe.clone(), - data.e3_id.clone(), - data.nodecount, - data.sortition_seed, - ); - self.public_keys - .entry(data.e3_id) - .or_insert_with(public_key_factory); - } -} - -impl Handler for PublicKeyOrchestrator { - type Result = (); - - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let Some(e3_id) = msg.get_e3_id() else { - return; - }; - self.forward_message(&e3_id, msg); - } -} - -impl PublicKeyOrchestrator { - fn public_key_factory( - &self, - fhe: Addr, - e3_id: E3id, - nodecount: usize, - seed: u64, - ) -> impl FnOnce() -> Addr { - let bus = self.bus.clone(); - let sortition = self.sortition.clone(); - move || PublicKeyAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() - } - - fn store_msg(&mut self, e3_id: E3id, msg: EnclaveEvent) { - self.buffers.entry(e3_id).or_default().push(msg); - } - - fn take_msgs(&mut self, e3_id: E3id) -> Vec { - self.buffers - .get_mut(&e3_id) - .map(std::mem::take) - .unwrap_or_default() - } - - fn forward_message(&mut self, e3_id: &E3id, msg: EnclaveEvent) { - // Buffer events for each thing that has not been created - if let Some(act) = self.public_keys.clone().get(e3_id) { - let msgs = self.take_msgs(e3_id.clone()); - let recipient = act.clone().recipient(); - recipient.do_send(msg.clone()); - for m in msgs { - recipient.do_send(m); - } - } else { - self.store_msg(e3_id.clone(), msg.clone()); - } - } -}