From d76aac6ce3031eeaa30ac76a44b018c91f46a681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=B3=CE=BB?= Date: Fri, 13 Sep 2024 22:15:37 +1000 Subject: [PATCH] Add Sortition Module to Plaintext Aggregator (#60) --- .../core/src/plaintext_aggregator.rs | 95 +++++++++++++------ .../core/src/plaintext_sequencer.rs | 25 +++-- .../core/src/publickey_aggregator.rs | 7 +- packages/ciphernode/core/src/registry.rs | 14 ++- 4 files changed, 98 insertions(+), 43 deletions(-) diff --git a/packages/ciphernode/core/src/plaintext_aggregator.rs b/packages/ciphernode/core/src/plaintext_aggregator.rs index 29b88960..75c7c2ec 100644 --- a/packages/ciphernode/core/src/plaintext_aggregator.rs +++ b/packages/ciphernode/core/src/plaintext_aggregator.rs @@ -1,5 +1,6 @@ use crate::{ - ordered_set::OrderedSet, PlaintextAggregated, DecryptionshareCreated, E3id, EnclaveEvent, EventBus, Fhe, GetAggregatePlaintext + ordered_set::OrderedSet, DecryptionshareCreated, E3id, EnclaveEvent, EventBus, Fhe, + GetAggregatePlaintext, GetHasNode, PlaintextAggregated, Sortition, }; use actix::prelude::*; use anyhow::{anyhow, Result}; @@ -9,6 +10,7 @@ pub enum PlaintextAggregatorState { Collecting { nodecount: usize, shares: OrderedSet>, + seed: u64, }, Computing { shares: OrderedSet>, @@ -28,25 +30,38 @@ struct ComputeAggregate { pub struct PlaintextAggregator { fhe: Addr, bus: Addr, + sortition: Addr, e3_id: E3id, state: PlaintextAggregatorState, } impl PlaintextAggregator { - pub fn new(fhe: Addr, bus: Addr, e3_id: E3id, nodecount: usize) -> Self { + pub fn new( + fhe: Addr, + bus: Addr, + sortition: Addr, + e3_id: E3id, + nodecount: usize, + seed: u64, + ) -> Self { PlaintextAggregator { fhe, bus, + sortition, e3_id, state: PlaintextAggregatorState::Collecting { nodecount, shares: OrderedSet::new(), + seed, }, } } pub fn add_share(&mut self, share: Vec) -> Result { - let PlaintextAggregatorState::Collecting { nodecount, shares } = &mut self.state else { + let PlaintextAggregatorState::Collecting { + nodecount, shares, .. + } = &mut self.state + else { return Err(anyhow::anyhow!("Can only add share in Collecting state")); }; @@ -78,44 +93,68 @@ impl Actor for PlaintextAggregator { impl Handler for PlaintextAggregator { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - match msg { - EnclaveEvent::DecryptionshareCreated { data, .. } => ctx.notify(data), - _ => () + if let EnclaveEvent::DecryptionshareCreated { data, .. } = msg { + ctx.notify(data) } } } impl Handler for PlaintextAggregator { - type Result = Result<()>; - fn handle(&mut self, event: DecryptionshareCreated, ctx: &mut Self::Context) -> Self::Result { - if event.e3_id != self.e3_id { - return Err(anyhow!( - "Wrong e3_id sent to aggregator. This should not happen." - )); - } - let PlaintextAggregatorState::Collecting { .. } = self.state else { - return Err(anyhow!( - "Aggregator has been closed for collecting keyshares." - )); + type Result = ResponseActFuture>; + + fn handle(&mut self, event: DecryptionshareCreated, _: &mut Self::Context) -> Self::Result { + let PlaintextAggregatorState::Collecting { + nodecount, seed, .. + } = self.state + else { + println!("Aggregator has been closed for collecting."); + return Box::pin(fut::ready(Ok(()))); }; - // add the keyshare and - self.state = self.add_share(event.decryption_share)?; + let size = nodecount; + let address = event.node; + let e3_id = event.e3_id.clone(); + let decryption_share = event.decryption_share.clone(); - // Check the state and if it has changed to the computing - if let PlaintextAggregatorState::Computing { shares } = &self.state { - ctx.notify(ComputeAggregate { - shares: shares.clone(), - }) - } + Box::pin( + self.sortition + .send(GetHasNode { + address, + size, + seed, + }) + .into_actor(self) + .map(move |res, act, ctx| { + let has_node = res?; + if !has_node { + println!("Node not found in committee"); // TODO: log properly + return Ok(()); + } + + if e3_id != act.e3_id { + println!("Wrong e3_id sent to aggregator. This should not happen."); + return Ok(()); + } + + // add the keyshare and + act.state = act.add_share(decryption_share)?; + + // Check the state and if it has changed to the computing + if let PlaintextAggregatorState::Computing { shares } = &act.state { + ctx.notify(ComputeAggregate { + shares: shares.clone(), + }) + } - Ok(()) + Ok(()) + }), + ) } } impl Handler for PlaintextAggregator { type Result = ResponseActFuture>; - fn handle(&mut self, msg: ComputeAggregate, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ComputeAggregate, _: &mut Self::Context) -> Self::Result { Box::pin( self.fhe .send(GetAggregatePlaintext { @@ -139,5 +178,3 @@ impl Handler for PlaintextAggregator { ) } } - - diff --git a/packages/ciphernode/core/src/plaintext_sequencer.rs b/packages/ciphernode/core/src/plaintext_sequencer.rs index 87de9c5e..bf548e85 100644 --- a/packages/ciphernode/core/src/plaintext_sequencer.rs +++ b/packages/ciphernode/core/src/plaintext_sequencer.rs @@ -4,21 +4,32 @@ use actix::prelude::*; -use crate::{E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator}; +use crate::{E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator, Sortition}; pub struct PlaintextSequencer { fhe: Addr, e3_id: E3id, bus: Addr, + sortition: Addr, nodecount: usize, + seed: u64, child: Option>, } impl PlaintextSequencer { - pub fn new(fhe: Addr, e3_id: E3id, bus: Addr, nodecount: usize) -> Self { + pub fn new( + fhe: Addr, + e3_id: E3id, + bus: Addr, + sortition: Addr, + nodecount: usize, + seed: u64, + ) -> Self { Self { fhe, e3_id, bus, + sortition, + seed, nodecount, child: None, } @@ -30,14 +41,16 @@ impl Actor for PlaintextSequencer { impl Handler for PlaintextSequencer { type Result = (); - fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { let fhe = self.fhe.clone(); let bus = self.bus.clone(); + let sortition = self.sortition.clone(); let nodecount = self.nodecount; let e3_id = self.e3_id.clone(); - let sink = self - .child - .get_or_insert_with(|| PlaintextAggregator::new(fhe, bus, e3_id, nodecount).start()); + let seed = self.seed; + let sink = self.child.get_or_insert_with(|| { + PlaintextAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start() + }); sink.do_send(msg); } } diff --git a/packages/ciphernode/core/src/publickey_aggregator.rs b/packages/ciphernode/core/src/publickey_aggregator.rs index b287309c..f6bc761c 100644 --- a/packages/ciphernode/core/src/publickey_aggregator.rs +++ b/packages/ciphernode/core/src/publickey_aggregator.rs @@ -121,6 +121,8 @@ impl Handler for PublicKeyAggregator { nodecount, seed, .. } = self.state.clone() else { + println!("Aggregator has been closed for collecting keyshares."); // TODO: log properly + return Box::pin(fut::ready(Ok(()))); }; @@ -151,11 +153,6 @@ impl Handler for PublicKeyAggregator { return Ok(()); } - let PublicKeyAggregatorState::Collecting { .. } = act.state else { - println!("Aggregator has been closed for collecting keyshares."); // TODO: log properly - return Ok(()); - }; - // add the keyshare and act.state = act.add_keyshare(pubkey)?; diff --git a/packages/ciphernode/core/src/registry.rs b/packages/ciphernode/core/src/registry.rs index 81ca54fb..440a1306 100644 --- a/packages/ciphernode/core/src/registry.rs +++ b/packages/ciphernode/core/src/registry.rs @@ -16,6 +16,7 @@ use std::{ #[derive(Clone)] struct CommitteeMeta { nodecount: usize, + seed: u64, } pub struct Registry { @@ -95,12 +96,17 @@ impl Handler for Registry { let fhe = store(&e3_id, &mut self.fhes, fhe_factory); let meta = CommitteeMeta { nodecount: data.nodecount, + seed: data.sortition_seed, }; self.meta.entry(e3_id.clone()).or_insert(meta.clone()); - let public_key_sequencer_factory = - self.public_key_sequencer_factory(e3_id.clone(), meta.clone(), fhe.clone(), sortition_seed); + let public_key_sequencer_factory = self.public_key_sequencer_factory( + e3_id.clone(), + meta.clone(), + fhe.clone(), + sortition_seed, + ); store(&e3_id, &mut self.public_keys, public_key_sequencer_factory); let ciphernode_sequencer_factory = self.ciphernode_sequencer_factory(fhe.clone()); @@ -172,8 +178,10 @@ impl Registry { fhe: Addr, ) -> impl FnOnce() -> Addr { let bus = self.bus.clone(); + let sortition = self.sortition.clone(); let nodecount = meta.nodecount; - move || PlaintextSequencer::new(fhe, e3_id, bus, nodecount).start() + let seed = meta.seed; + move || PlaintextSequencer::new(fhe, e3_id, bus, sortition, nodecount, seed).start() } fn forward_message(&self, e3_id: &E3id, msg: EnclaveEvent) {