From bd4852990c382213a8db55a1ef7d25e36a27626b Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 13 Sep 2024 23:44:14 +1000 Subject: [PATCH] Order events without sequencers --- .../core/src/ciphernode_sequencer.rs | 44 --------- packages/ciphernode/core/src/lib.rs | 6 -- .../core/src/plaintext_sequencer.rs | 56 ----------- .../core/src/publickey_sequencer.rs | 50 ---------- packages/ciphernode/core/src/registry.rs | 95 +++++++++++++------ 5 files changed, 68 insertions(+), 183 deletions(-) delete mode 100644 packages/ciphernode/core/src/ciphernode_sequencer.rs delete mode 100644 packages/ciphernode/core/src/plaintext_sequencer.rs delete mode 100644 packages/ciphernode/core/src/publickey_sequencer.rs diff --git a/packages/ciphernode/core/src/ciphernode_sequencer.rs b/packages/ciphernode/core/src/ciphernode_sequencer.rs deleted file mode 100644 index 591e955d..00000000 --- a/packages/ciphernode/core/src/ciphernode_sequencer.rs +++ /dev/null @@ -1,44 +0,0 @@ -// sequence and persist events for a single E3 request in the correct order -// TODO: spawn and store a ciphernode upon start and forward all events to it in order -// TODO: if the ciphernode fails restart the node by replaying all stored events back to it - -use actix::prelude::*; -use alloy_primitives::Address; - -use crate::{Ciphernode, Data, EnclaveEvent, EventBus, Fhe}; - -pub struct CiphernodeSequencer { - fhe: Addr, - data: Addr, - bus: Addr, - child: Option>, - address: Address, -} -impl CiphernodeSequencer { - pub fn new(fhe: Addr, data: Addr, bus: Addr, address:Address) -> Self { - Self { - fhe, - bus, - data, - child: None, - address - } - } -} -impl Actor for CiphernodeSequencer { - type Context = Context; -} - -impl Handler for CiphernodeSequencer { - type Result = (); - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let bus = self.bus.clone(); - let fhe = self.fhe.clone(); - let data = self.data.clone(); - let address = self.address; - let sink = self - .child - .get_or_insert_with(|| Ciphernode::new(bus, fhe, data, address).start()); - sink.do_send(msg); - } -} diff --git a/packages/ciphernode/core/src/lib.rs b/packages/ciphernode/core/src/lib.rs index 96d3e1f7..556cde12 100644 --- a/packages/ciphernode/core/src/lib.rs +++ b/packages/ciphernode/core/src/lib.rs @@ -4,7 +4,6 @@ mod ciphernode; mod ciphernode_selector; -mod ciphernode_sequencer; mod data; mod enclave_contract; mod eventbus; @@ -14,9 +13,7 @@ mod logger; mod ordered_set; mod p2p; mod plaintext_aggregator; -mod plaintext_sequencer; mod publickey_aggregator; -mod publickey_sequencer; mod registry; mod serializers; mod sortition; @@ -25,7 +22,6 @@ mod sortition; pub use actix::prelude::*; pub use ciphernode::*; pub use ciphernode_selector::*; -pub use ciphernode_sequencer::*; pub use data::*; pub use eventbus::*; pub use events::*; @@ -33,9 +29,7 @@ pub use fhe::*; pub use logger::*; pub use p2p::*; pub use plaintext_aggregator::*; -pub use plaintext_sequencer::*; pub use publickey_aggregator::*; -pub use publickey_sequencer::*; pub use registry::*; pub use sortition::*; diff --git a/packages/ciphernode/core/src/plaintext_sequencer.rs b/packages/ciphernode/core/src/plaintext_sequencer.rs deleted file mode 100644 index bf548e85..00000000 --- a/packages/ciphernode/core/src/plaintext_sequencer.rs +++ /dev/null @@ -1,56 +0,0 @@ -// sequence and persist events for a single E3 request in the correct order -// TODO: spawn and store a ciphernode upon start and forward all events to it in order -// TODO: if the ciphernode fails restart the node by replaying all stored events back to it - -use actix::prelude::*; - -use crate::{E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator, Sortition}; - -pub struct PlaintextSequencer { - fhe: Addr, - e3_id: E3id, - bus: Addr, - sortition: Addr, - nodecount: usize, - seed: u64, - child: Option>, -} -impl PlaintextSequencer { - pub fn new( - fhe: Addr, - e3_id: E3id, - bus: Addr, - sortition: Addr, - nodecount: usize, - seed: u64, - ) -> Self { - Self { - fhe, - e3_id, - bus, - sortition, - seed, - nodecount, - child: None, - } - } -} -impl Actor for PlaintextSequencer { - type Context = Context; -} - -impl Handler for PlaintextSequencer { - type Result = (); - fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { - let fhe = self.fhe.clone(); - let bus = self.bus.clone(); - let sortition = self.sortition.clone(); - let nodecount = self.nodecount; - let e3_id = self.e3_id.clone(); - let seed = self.seed; - let sink = self.child.get_or_insert_with(|| { - PlaintextAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() - }); - sink.do_send(msg); - } -} diff --git a/packages/ciphernode/core/src/publickey_sequencer.rs b/packages/ciphernode/core/src/publickey_sequencer.rs deleted file mode 100644 index e43aaf51..00000000 --- a/packages/ciphernode/core/src/publickey_sequencer.rs +++ /dev/null @@ -1,50 +0,0 @@ -// sequence and persist events for a single E3 request in the correct order -// TODO: if the sequencer fails restart the node by replaying all stored events back to it - -use actix::prelude::*; - -use crate::{E3id, EnclaveEvent, EventBus, Fhe, PublicKeyAggregator, Sortition}; - -pub struct PublicKeySequencer { - fhe: Addr, - e3_id: E3id, - bus: Addr, - sortition:Addr, - nodecount: usize, - seed: u64, - child: Option>, -} - -impl PublicKeySequencer { - pub fn new(fhe: Addr, e3_id: E3id, sortition:Addr,bus: Addr, nodecount: usize, seed:u64) -> Self { - Self { - fhe, - e3_id, - bus, - sortition, - seed, - nodecount, - child: None, - } - } -} - -impl Actor for PublicKeySequencer { - type Context = Context; -} - -impl Handler for PublicKeySequencer { - type Result = (); - fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - let fhe = self.fhe.clone(); - let bus = self.bus.clone(); - let sortition = self.sortition.clone(); - let nodecount = self.nodecount; - let e3_id = self.e3_id.clone(); - let seed = self.seed; - let dest = self - .child - .get_or_insert_with(|| PublicKeyAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start()); - dest.do_send(msg); - } -} diff --git a/packages/ciphernode/core/src/registry.rs b/packages/ciphernode/core/src/registry.rs index 440a1306..cbc392d8 100644 --- a/packages/ciphernode/core/src/registry.rs +++ b/packages/ciphernode/core/src/registry.rs @@ -2,8 +2,9 @@ // TODO: vertically modularize this so there is a registry for each function that get rolled up into one based // on config use crate::{ - CiphernodeSequencer, CommitteeRequested, Data, E3id, EnclaveEvent, EventBus, Fhe, - PlaintextSequencer, PublicKeySequencer, Sortition, Subscribe, + Ciphernode, CommitteeRequested, Data, E3id, EnclaveEvent, EventBus, Fhe, + PlaintextAggregator, PublicKeyAggregator, Sortition, + Subscribe, }; use actix::prelude::*; use alloy_primitives::Address; @@ -21,14 +22,15 @@ struct CommitteeMeta { pub struct Registry { bus: Addr, - ciphernodes: HashMap>, + ciphernodes: HashMap>, data: Addr, sortition: Addr, address: Address, fhes: HashMap>, - plaintexts: HashMap>, + plaintexts: HashMap>, + buffers: HashMap>>, meta: HashMap, - public_keys: HashMap>, + public_keys: HashMap>, rng: Arc>, } @@ -49,6 +51,7 @@ impl Registry { ciphernodes: HashMap::new(), plaintexts: HashMap::new(), public_keys: HashMap::new(), + buffers: HashMap::new(), meta: HashMap::new(), fhes: HashMap::new(), } @@ -101,16 +104,16 @@ impl Handler for Registry { self.meta.entry(e3_id.clone()).or_insert(meta.clone()); - let public_key_sequencer_factory = self.public_key_sequencer_factory( + let public_key_factory = self.public_key_factory( e3_id.clone(), meta.clone(), fhe.clone(), sortition_seed, ); - store(&e3_id, &mut self.public_keys, public_key_sequencer_factory); + store(&e3_id, &mut self.public_keys, public_key_factory); - let ciphernode_sequencer_factory = self.ciphernode_sequencer_factory(fhe.clone()); - store(&e3_id, &mut self.ciphernodes, ciphernode_sequencer_factory); + let ciphernode_factory = self.ciphernode_factory(fhe.clone()); + store(&e3_id, &mut self.ciphernodes, ciphernode_factory); } EnclaveEvent::CiphertextOutputPublished { .. } => { let Some(fhe) = self.fhes.get(&e3_id) else { @@ -121,9 +124,9 @@ impl Handler for Registry { return; }; - let plaintext_sequencer_factory = - self.plaintext_sequencer_factory(e3_id.clone(), meta.clone(), fhe.clone()); - store(&e3_id, &mut self.plaintexts, plaintext_sequencer_factory); + let plaintext_factory = + self.plaintext_factory(e3_id.clone(), meta.clone(), fhe.clone()); + store(&e3_id, &mut self.plaintexts, plaintext_factory); } _ => (), }; @@ -148,53 +151,91 @@ impl Registry { } } - fn public_key_sequencer_factory( + fn public_key_factory( &self, e3_id: E3id, meta: CommitteeMeta, fhe: Addr, seed: u64, - ) -> impl FnOnce() -> Addr { + ) -> impl FnOnce() -> Addr { let bus = self.bus.clone(); let nodecount = meta.nodecount; let sortition = self.sortition.clone(); - move || PublicKeySequencer::new(fhe, e3_id, sortition, bus, nodecount, seed).start() + move || PublicKeyAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() } - fn ciphernode_sequencer_factory( - &self, - fhe: Addr, - ) -> impl FnOnce() -> Addr { + fn ciphernode_factory(&self, fhe: Addr) -> impl FnOnce() -> Addr { let data = self.data.clone(); let bus = self.bus.clone(); let address = self.address; - move || CiphernodeSequencer::new(fhe, data, bus, address).start() + move || Ciphernode::new(bus, fhe, data, address).start() } - fn plaintext_sequencer_factory( + fn plaintext_factory( &self, e3_id: E3id, meta: CommitteeMeta, fhe: Addr, - ) -> impl FnOnce() -> Addr { + ) -> impl FnOnce() -> Addr { let bus = self.bus.clone(); let sortition = self.sortition.clone(); let nodecount = meta.nodecount; let seed = meta.seed; - move || PlaintextSequencer::new(fhe, e3_id, bus, sortition, nodecount, seed).start() + move || PlaintextAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() } - fn forward_message(&self, e3_id: &E3id, msg: EnclaveEvent) { + fn store_msg(&mut self, e3_id: E3id, msg: EnclaveEvent, key: &str) { + self.buffers + .entry(e3_id) + .or_default() + .entry(key.to_owned()) + .or_default() + .push(msg); + } + + fn get_msgs(&self, e3_id: E3id, key: &str) -> Vec { + self.buffers + .get(&e3_id) + .and_then(|inner_map| inner_map.get(key)) + .cloned() + .unwrap_or_default() + } + + fn forward_message(&mut self, e3_id: &E3id, msg: EnclaveEvent) { + // Buffer events for each thing that has not been created + // TODO: Needs tidying up + // TODO: use an enum for the buffer keys if let Some(act) = self.public_keys.get(e3_id) { - act.clone().recipient().do_send(msg.clone()); + let msgs = self.get_msgs(e3_id.clone(), "public_keys"); + let recipient = act.clone().recipient(); + for m in msgs { + recipient.do_send(m); + } + recipient.do_send(msg.clone()); + } else { + self.store_msg(e3_id.clone(), msg.clone(), "public_keys"); } if let Some(act) = self.plaintexts.get(e3_id) { - act.do_send(msg.clone()); + let msgs = self.get_msgs(e3_id.clone(), "plaintexts"); + let recipient = act.clone().recipient(); + for m in msgs { + recipient.do_send(m); + } + recipient.do_send(msg.clone()); + } else { + self.store_msg(e3_id.clone(), msg.clone(), "plaintexts"); } if let Some(act) = self.ciphernodes.get(e3_id) { - act.do_send(msg.clone()); + let msgs = self.get_msgs(e3_id.clone(), "ciphernodes"); + let recipient = act.clone().recipient(); + for m in msgs { + recipient.do_send(m); + } + recipient.do_send(msg.clone()); + } else { + self.store_msg(e3_id.clone(), msg.clone(), "ciphernodes"); } } }