diff --git a/packages/ciphernode/core/src/ciphernode.rs b/packages/ciphernode/core/src/ciphernode.rs index 99ebd6a3..2e7bc0bf 100644 --- a/packages/ciphernode/core/src/ciphernode.rs +++ b/packages/ciphernode/core/src/ciphernode.rs @@ -7,12 +7,14 @@ use crate::{ Subscribe, }; use actix::prelude::*; +use alloy_primitives::Address; use anyhow::Result; pub struct Ciphernode { fhe: Addr, data: Addr, bus: Addr, + address: Address } impl Actor for Ciphernode { @@ -20,12 +22,12 @@ impl Actor for Ciphernode { } impl Ciphernode { - pub fn new(bus: Addr, fhe: Addr, data: Addr) -> Self { - Self { bus, fhe, data } + pub fn new(bus: Addr, fhe: Addr, data: Addr, address: Address) -> Self { + Self { bus, fhe, data, address } } - pub async fn attach(bus: Addr, fhe: Addr, data: Addr) -> Addr { - let node = Ciphernode::new(bus.clone(), fhe, data).start(); + pub async fn attach(bus: Addr, fhe: Addr, data: Addr, address: Address) -> Addr { + let node = Ciphernode::new(bus.clone(), fhe, data, address).start(); let _ = bus .send(Subscribe::new("CiphernodeSelected", node.clone().into())) .await; @@ -58,7 +60,8 @@ impl Handler for Ciphernode { let fhe = self.fhe.clone(); let data = self.data.clone(); let bus = self.bus.clone(); - Box::pin(async { on_ciphernode_selected(fhe, data, bus, event).await.unwrap() }) + let address = self.address; + Box::pin(async move { on_ciphernode_selected(fhe, data, bus, event, address).await.unwrap() }) } } @@ -69,8 +72,9 @@ impl Handler for Ciphernode { let fhe = self.fhe.clone(); let data = self.data.clone(); let bus = self.bus.clone(); - Box::pin(async { - on_decryption_requested(fhe, data, bus, event) + let address = self.address; + Box::pin(async move { + on_decryption_requested(fhe, data, bus, event, address) .await .unwrap() }) @@ -82,6 +86,7 @@ async fn on_ciphernode_selected( data: Addr, bus: Addr, event: CiphernodeSelected, + address: Address ) -> Result<()> { let CiphernodeSelected { e3_id, .. } = event; // generate keyshare @@ -98,7 +103,7 @@ async fn on_ciphernode_selected( data.do_send(Insert(format!("{}/pk", e3_id).into(), pubkey.clone())); // broadcast the KeyshareCreated message - let event = EnclaveEvent::from(KeyshareCreated { pubkey, e3_id }); + let event = EnclaveEvent::from(KeyshareCreated { pubkey, e3_id, node: address }); bus.do_send(event); Ok(()) @@ -109,6 +114,7 @@ async fn on_decryption_requested( data: Addr, bus: Addr, event: CiphertextOutputPublished, + address: Address ) -> Result<()> { let CiphertextOutputPublished { e3_id, @@ -130,6 +136,7 @@ async fn on_decryption_requested( let event = EnclaveEvent::from(DecryptionshareCreated { e3_id, decryption_share, + node: address }); bus.do_send(event); diff --git a/packages/ciphernode/core/src/ciphernode_sequencer.rs b/packages/ciphernode/core/src/ciphernode_sequencer.rs index ae3b48c9..591e955d 100644 --- a/packages/ciphernode/core/src/ciphernode_sequencer.rs +++ b/packages/ciphernode/core/src/ciphernode_sequencer.rs @@ -3,6 +3,7 @@ // TODO: if the ciphernode fails restart the node by replaying all stored events back to it use actix::prelude::*; +use alloy_primitives::Address; use crate::{Ciphernode, Data, EnclaveEvent, EventBus, Fhe}; @@ -11,14 +12,16 @@ pub struct CiphernodeSequencer { data: Addr, bus: Addr, child: Option>, + address: Address, } impl CiphernodeSequencer { - pub fn new(fhe: Addr, data: Addr, bus: Addr) -> Self { + pub fn new(fhe: Addr, data: Addr, bus: Addr, address:Address) -> Self { Self { fhe, bus, data, child: None, + address } } } @@ -32,9 +35,10 @@ impl Handler for CiphernodeSequencer { let bus = self.bus.clone(); let fhe = self.fhe.clone(); let data = self.data.clone(); + let address = self.address; let sink = self .child - .get_or_insert_with(|| Ciphernode::new(bus, fhe, data).start()); + .get_or_insert_with(|| Ciphernode::new(bus, fhe, data, address).start()); sink.do_send(msg); } } diff --git a/packages/ciphernode/core/src/events.rs b/packages/ciphernode/core/src/events.rs index 09b66e35..eb6ef6bd 100644 --- a/packages/ciphernode/core/src/events.rs +++ b/packages/ciphernode/core/src/events.rs @@ -246,6 +246,7 @@ impl fmt::Display for EnclaveEvent { pub struct KeyshareCreated { pub pubkey: Vec, pub e3_id: E3id, + pub node: Address } #[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -253,6 +254,7 @@ pub struct KeyshareCreated { pub struct DecryptionshareCreated { pub decryption_share: Vec, pub e3_id: E3id, + pub node: Address } #[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -343,6 +345,7 @@ mod tests { events::extract_enclave_event_name, serializers::PublicKeyShareSerializer, E3id, KeyshareCreated, }; + use alloy_primitives::address; use fhe::{ bfv::{BfvParametersBuilder, SecretKey}, mbfv::{CommonRandomPoly, PublicKeyShare}, @@ -381,6 +384,7 @@ mod tests { let kse = EnclaveEvent::from(KeyshareCreated { e3_id: E3id::from(1001), pubkey, + node: address!("d8dA6BF26964aF9D7eEd9e03E53415D37aA96045") }); let kse_bytes = kse.to_bytes()?; let _ = EnclaveEvent::from_bytes(&kse_bytes.clone()); diff --git a/packages/ciphernode/core/src/lib.rs b/packages/ciphernode/core/src/lib.rs index bf603062..96d3e1f7 100644 --- a/packages/ciphernode/core/src/lib.rs +++ b/packages/ciphernode/core/src/lib.rs @@ -82,8 +82,8 @@ mod tests { // create ciphernode actor for managing ciphernode flow let sortition = Sortition::attach(bus.clone()); - CiphernodeSelector::attach(bus.clone(), sortition, addr); - Registry::attach(bus.clone(), data.clone(), rng).await; + CiphernodeSelector::attach(bus.clone(), sortition.clone(), addr); + Registry::attach(bus.clone(), data.clone(), sortition, rng, addr).await; } fn setup_bfv_params( @@ -253,14 +253,17 @@ mod tests { EnclaveEvent::from(KeyshareCreated { pubkey: p1.clone(), e3_id: e3_id.clone(), + node: eth_addrs[0] }), EnclaveEvent::from(KeyshareCreated { pubkey: p2.clone(), e3_id: e3_id.clone(), + node: eth_addrs[1] }), EnclaveEvent::from(KeyshareCreated { pubkey: p3.clone(), - e3_id: e3_id.clone() + e3_id: e3_id.clone(), + node: eth_addrs[2] }), EnclaveEvent::from(PublicKeyAggregated { pubkey: PublicKeySerializer::to_bytes(pubkey.clone(), params.clone())?, @@ -321,14 +324,17 @@ mod tests { EnclaveEvent::from(DecryptionshareCreated { decryption_share: ds1.clone(), e3_id: e3_id.clone(), + node: eth_addrs[0] }), EnclaveEvent::from(DecryptionshareCreated { decryption_share: ds2.clone(), e3_id: e3_id.clone(), + node: eth_addrs[1] }), EnclaveEvent::from(DecryptionshareCreated { decryption_share: ds3.clone(), e3_id: e3_id.clone(), + node: eth_addrs[2] }), EnclaveEvent::from(PlaintextAggregated { e3_id: e3_id.clone(), diff --git a/packages/ciphernode/core/src/publickey_aggregator.rs b/packages/ciphernode/core/src/publickey_aggregator.rs index bc8733dd..b287309c 100644 --- a/packages/ciphernode/core/src/publickey_aggregator.rs +++ b/packages/ciphernode/core/src/publickey_aggregator.rs @@ -3,15 +3,17 @@ use crate::{ events::{E3id, EnclaveEvent, KeyshareCreated, PublicKeyAggregated}, fhe::{Fhe, GetAggregatePublicKey}, ordered_set::OrderedSet, + GetHasNode, Sortition, }; use actix::prelude::*; -use anyhow::{anyhow, Result}; +use anyhow::Result; #[derive(Debug, Clone)] pub enum PublicKeyAggregatorState { Collecting { nodecount: usize, keyshares: OrderedSet>, + seed: u64, }, Computing { keyshares: OrderedSet>, @@ -31,6 +33,7 @@ struct ComputeAggregate { pub struct PublicKeyAggregator { fhe: Addr, bus: Addr, + sortition: Addr, e3_id: E3id, state: PublicKeyAggregatorState, } @@ -42,14 +45,23 @@ pub struct PublicKeyAggregator { /// It is expected to change this mechanism as we work through adversarial scenarios and write tests /// for them. impl PublicKeyAggregator { - pub fn new(fhe: Addr, bus: Addr, e3_id: E3id, nodecount: usize) -> Self { + pub fn new( + fhe: Addr, + bus: Addr, + sortition: Addr, + e3_id: E3id, + nodecount: usize, + seed: u64, + ) -> Self { PublicKeyAggregator { fhe, bus, e3_id, + sortition, state: PublicKeyAggregatorState::Collecting { nodecount, keyshares: OrderedSet::new(), + seed, }, } } @@ -58,6 +70,7 @@ impl PublicKeyAggregator { let PublicKeyAggregatorState::Collecting { nodecount, keyshares, + .. } = &mut self.state else { return Err(anyhow::anyhow!("Can only add keyshare in Collecting state")); @@ -94,41 +107,68 @@ impl Actor for PublicKeyAggregator { impl Handler for PublicKeyAggregator { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - match msg { - EnclaveEvent::KeyshareCreated { data, .. } => ctx.notify(data), - _ => () + if let EnclaveEvent::KeyshareCreated { data, .. } = msg { + ctx.notify(data) } } } impl Handler for PublicKeyAggregator { - type Result = Result<()>; - - fn handle(&mut self, event: KeyshareCreated, ctx: &mut Self::Context) -> Self::Result { - - if event.e3_id != self.e3_id { - return Err(anyhow!( - "Wrong e3_id sent to aggregator. This should not happen." - )); - } + type Result = ResponseActFuture>; - let PublicKeyAggregatorState::Collecting { .. } = self.state else { - return Err(anyhow!( - "Aggregator has been closed for collecting keyshares." - )); + fn handle(&mut self, event: KeyshareCreated, _: &mut Self::Context) -> Self::Result { + let PublicKeyAggregatorState::Collecting { + nodecount, seed, .. + } = self.state.clone() + else { + return Box::pin(fut::ready(Ok(()))); }; - // add the keyshare and - self.state = self.add_keyshare(event.pubkey)?; + let size = nodecount; + let address = event.node; + let e3_id = event.e3_id.clone(); + let pubkey = event.pubkey.clone(); - // Check the state and if it has changed to the computing - if let PublicKeyAggregatorState::Computing { keyshares } = &self.state { - ctx.notify(ComputeAggregate { - keyshares: keyshares.clone(), - }) - } + Box::pin( + self.sortition + .send(GetHasNode { + address, + size, + seed, + }) + .into_actor(self) + .map(move |res, act, ctx| { + // NOTE: Returning Ok(()) on errors as we probably dont need a result type here since + // we will not be doing a send + let has_node = res?; + if !has_node { + println!("Node not found in committee"); // TODO: log properly + return Ok(()); + } + + if e3_id != act.e3_id { + println!("Wrong e3_id sent to aggregator. This should not happen."); + return Ok(()); + } + + let PublicKeyAggregatorState::Collecting { .. } = act.state else { + println!("Aggregator has been closed for collecting keyshares."); // TODO: log properly + return Ok(()); + }; + + // add the keyshare and + act.state = act.add_keyshare(pubkey)?; + + // Check the state and if it has changed to the computing + if let PublicKeyAggregatorState::Computing { keyshares } = &act.state { + ctx.notify(ComputeAggregate { + keyshares: keyshares.clone(), + }) + } - Ok(()) + Ok(()) + }), + ) } } @@ -170,5 +210,3 @@ impl Handler for PublicKeyAggregator { ) } } - - diff --git a/packages/ciphernode/core/src/publickey_sequencer.rs b/packages/ciphernode/core/src/publickey_sequencer.rs index f5db6a47..e43aaf51 100644 --- a/packages/ciphernode/core/src/publickey_sequencer.rs +++ b/packages/ciphernode/core/src/publickey_sequencer.rs @@ -3,22 +3,26 @@ use actix::prelude::*; -use crate::{E3id, EnclaveEvent, EventBus, Fhe, PublicKeyAggregator}; +use crate::{E3id, EnclaveEvent, EventBus, Fhe, PublicKeyAggregator, Sortition}; pub struct PublicKeySequencer { fhe: Addr, e3_id: E3id, bus: Addr, + sortition:Addr, nodecount: usize, + seed: u64, child: Option>, } impl PublicKeySequencer { - pub fn new(fhe: Addr, e3_id: E3id, bus: Addr, nodecount: usize) -> Self { + pub fn new(fhe: Addr, e3_id: E3id, sortition:Addr,bus: Addr, nodecount: usize, seed:u64) -> Self { Self { fhe, e3_id, bus, + sortition, + seed, nodecount, child: None, } @@ -34,11 +38,13 @@ impl Handler for PublicKeySequencer { fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { let fhe = self.fhe.clone(); let bus = self.bus.clone(); + let sortition = self.sortition.clone(); let nodecount = self.nodecount; let e3_id = self.e3_id.clone(); + let seed = self.seed; let dest = self .child - .get_or_insert_with(|| PublicKeyAggregator::new(fhe, bus, e3_id, nodecount).start()); + .get_or_insert_with(|| PublicKeyAggregator::new(fhe, bus, sortition, e3_id, nodecount, seed).start()); dest.do_send(msg); } } diff --git a/packages/ciphernode/core/src/registry.rs b/packages/ciphernode/core/src/registry.rs index 26ec9b1a..81ca54fb 100644 --- a/packages/ciphernode/core/src/registry.rs +++ b/packages/ciphernode/core/src/registry.rs @@ -3,9 +3,10 @@ // on config use crate::{ CiphernodeSequencer, CommitteeRequested, Data, E3id, EnclaveEvent, EventBus, Fhe, - PlaintextSequencer, PublicKeySequencer, Subscribe, + PlaintextSequencer, PublicKeySequencer, Sortition, Subscribe, }; use actix::prelude::*; +use alloy_primitives::Address; use rand_chacha::ChaCha20Rng; use std::{ collections::HashMap, @@ -21,6 +22,8 @@ pub struct Registry { bus: Addr, ciphernodes: HashMap>, data: Addr, + sortition: Addr, + address: Address, fhes: HashMap>, plaintexts: HashMap>, meta: HashMap, @@ -29,11 +32,19 @@ pub struct Registry { } impl Registry { - pub fn new(bus: Addr, data: Addr, rng: Arc>) -> Self { + pub fn new( + bus: Addr, + data: Addr, + sortition: Addr, + rng: Arc>, + address: Address, + ) -> Self { Self { bus, data, + sortition, rng, + address, ciphernodes: HashMap::new(), plaintexts: HashMap::new(), public_keys: HashMap::new(), @@ -45,9 +56,11 @@ impl Registry { pub async fn attach( bus: Addr, data: Addr, + sortition: Addr, rng: Arc>, + address: Address, ) -> Addr { - let addr = Registry::new(bus.clone(), data, rng).start(); + let addr = Registry::new(bus.clone(), data, sortition, rng, address).start(); bus.send(Subscribe::new("*", addr.clone().into())) .await .unwrap(); @@ -74,6 +87,7 @@ impl Handler for Registry { moduli, plaintext_modulus, crp, + sortition_seed, .. } = data; @@ -86,7 +100,7 @@ impl Handler for Registry { self.meta.entry(e3_id.clone()).or_insert(meta.clone()); let public_key_sequencer_factory = - self.public_key_sequencer_factory(e3_id.clone(), meta.clone(), fhe.clone()); + self.public_key_sequencer_factory(e3_id.clone(), meta.clone(), fhe.clone(), sortition_seed); store(&e3_id, &mut self.public_keys, public_key_sequencer_factory); let ciphernode_sequencer_factory = self.ciphernode_sequencer_factory(fhe.clone()); @@ -133,10 +147,12 @@ impl Registry { e3_id: E3id, meta: CommitteeMeta, fhe: Addr, + seed: u64, ) -> impl FnOnce() -> Addr { let bus = self.bus.clone(); let nodecount = meta.nodecount; - move || PublicKeySequencer::new(fhe, e3_id, bus, nodecount).start() + let sortition = self.sortition.clone(); + move || PublicKeySequencer::new(fhe, e3_id, sortition, bus, nodecount, seed).start() } fn ciphernode_sequencer_factory( @@ -145,7 +161,8 @@ impl Registry { ) -> impl FnOnce() -> Addr { let data = self.data.clone(); let bus = self.bus.clone(); - move || CiphernodeSequencer::new(fhe, data, bus).start() + let address = self.address; + move || CiphernodeSequencer::new(fhe, data, bus, address).start() } fn plaintext_sequencer_factory( @@ -160,15 +177,15 @@ impl Registry { } fn forward_message(&self, e3_id: &E3id, msg: EnclaveEvent) { - if let Some(act) = self.public_keys.get(&e3_id) { + if let Some(act) = self.public_keys.get(e3_id) { act.clone().recipient().do_send(msg.clone()); } - if let Some(act) = self.plaintexts.get(&e3_id) { + if let Some(act) = self.plaintexts.get(e3_id) { act.do_send(msg.clone()); } - if let Some(act) = self.ciphernodes.get(&e3_id) { + if let Some(act) = self.ciphernodes.get(e3_id) { act.do_send(msg.clone()); } }