From 4beebe7a2ec30220be602bedf26fff3c161c4880 Mon Sep 17 00:00:00 2001 From: ktdlr Date: Mon, 21 Oct 2024 20:45:06 +1100 Subject: [PATCH] Add test for hydration --- packages/ciphernode/Cargo.lock | 1 + packages/ciphernode/core/src/events.rs | 36 ++++- packages/ciphernode/data/src/data_store.rs | 4 +- .../ciphernode/enclave_node/src/aggregator.rs | 2 +- .../ciphernode/enclave_node/src/ciphernode.rs | 2 +- packages/ciphernode/keyshare/Cargo.toml | 1 + packages/ciphernode/keyshare/src/keyshare.rs | 3 + .../router/src/ciphernode_selector.rs | 28 +++- packages/ciphernode/router/src/context.rs | 8 + .../router/src/e3_request_router.rs | 20 ++- .../tests/test_aggregation_and_decryption.rs | 139 +++++++++++++++--- 11 files changed, 212 insertions(+), 32 deletions(-) diff --git a/packages/ciphernode/Cargo.lock b/packages/ciphernode/Cargo.lock index 2a795236..efc250b2 100644 --- a/packages/ciphernode/Cargo.lock +++ b/packages/ciphernode/Cargo.lock @@ -3194,6 +3194,7 @@ dependencies = [ "enclave-core", "fhe 0.1.0", "serde", + "tracing", ] [[package]] diff --git a/packages/ciphernode/core/src/events.rs b/packages/ciphernode/core/src/events.rs index 527ad643..925028f3 100644 --- a/packages/ciphernode/core/src/events.rs +++ b/packages/ciphernode/core/src/events.rs @@ -120,10 +120,13 @@ pub enum EnclaveEvent { id: EventId, data: E3RequestComplete, }, - // CommitteeSelected, - // OutputDecrypted, - // CiphernodeRegistered, - // CiphernodeDeregistered, + Shutdown { + id: EventId, + data: Shutdown, + }, // CommitteeSelected, + // OutputDecrypted, + // CiphernodeRegistered, + // CiphernodeDeregistered, } impl EnclaveEvent { @@ -147,6 +150,7 @@ impl EnclaveEvent { EnclaveEvent::CiphernodeAdded { .. } => true, EnclaveEvent::CiphernodeRemoved { .. } => true, EnclaveEvent::E3RequestComplete { .. } => true, + EnclaveEvent::Shutdown { .. } => true, _ => false, } } @@ -166,6 +170,7 @@ impl From for EventId { EnclaveEvent::CiphernodeRemoved { id, .. } => id, EnclaveEvent::EnclaveError { id, .. } => id, EnclaveEvent::E3RequestComplete { id, .. } => id, + EnclaveEvent::Shutdown { id, .. } => id, } } } @@ -196,6 +201,7 @@ impl EnclaveEvent { EnclaveEvent::CiphernodeRemoved { data, .. } => format!("{}", data), EnclaveEvent::E3RequestComplete { data, .. } => format!("{}", data), EnclaveEvent::EnclaveError { data, .. } => format!("{:?}", data), + EnclaveEvent::Shutdown { data, .. } => format!("{:?}", data), // _ => "".to_string(), } } @@ -206,6 +212,8 @@ pub trait FromError { fn from_error(err_type: EnclaveErrorType, error: Self::Error) -> Self; } + +// TODO: These From traits should be handled by a macro impl From for EnclaveEvent { fn from(data: KeyshareCreated) -> Self { EnclaveEvent::KeyshareCreated { @@ -305,6 +313,15 @@ impl From for EnclaveEvent { } } +impl From for EnclaveEvent { + fn from(data: Shutdown) -> Self { + EnclaveEvent::Shutdown { + id: EventId::from(data.clone()), + data: data.clone(), + } + } +} + impl FromError for EnclaveEvent { type Error = anyhow::Error; fn from_error(err_type: EnclaveErrorType, error: Self::Error) -> Self { @@ -505,6 +522,17 @@ impl Display for Die { } } + +/// Represents a shutdown event triggered by SIG TERM +#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[rtype(result = "()")] +pub struct Shutdown; +impl Display for Shutdown { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Shutdown",) + } +} + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Seed(pub [u8; 32]); impl From for u64 { diff --git a/packages/ciphernode/data/src/data_store.rs b/packages/ciphernode/data/src/data_store.rs index a8c8f1d6..4ba59060 100644 --- a/packages/ciphernode/data/src/data_store.rs +++ b/packages/ciphernode/data/src/data_store.rs @@ -64,7 +64,7 @@ impl DataStore { } /// Construct a data store from an InMemStore actor - pub fn from_in_mem(addr: Addr) -> Self { + pub fn from_in_mem(addr: &Addr) -> Self { Self { get: addr.clone().recipient(), insert: addr.clone().recipient(), @@ -88,7 +88,7 @@ impl DataStore { /// #[actix_rt::main] /// async fn main() -> Result<()>{ /// let addr = InMemStore::new(false).start(); - /// let store = DataStore::from_in_mem(addr); + /// let store = DataStore::from_in_mem(&addr); /// assert_eq!(store.base("//foo") /// .scope("bar") /// .scope("/baz") diff --git a/packages/ciphernode/enclave_node/src/aggregator.rs b/packages/ciphernode/enclave_node/src/aggregator.rs index d3307b78..328ed40d 100644 --- a/packages/ciphernode/enclave_node/src/aggregator.rs +++ b/packages/ciphernode/enclave_node/src/aggregator.rs @@ -55,7 +55,7 @@ impl MainAggregator { rand_chacha::ChaCha20Rng::from_rng(OsRng).expect("Failed to create RNG"), )); - let store = DataStore::from_in_mem(InMemStore::new(true).start()); + let store = DataStore::from_in_mem(&InMemStore::new(true).start()); let repositories = store.repositories(); let sortition = Sortition::attach(bus.clone(), repositories.sortition()); let signer = pull_eth_signer_from_env("PRIVATE_KEY").await?; diff --git a/packages/ciphernode/enclave_node/src/ciphernode.rs b/packages/ciphernode/enclave_node/src/ciphernode.rs index bac04dde..2f74f37b 100644 --- a/packages/ciphernode/enclave_node/src/ciphernode.rs +++ b/packages/ciphernode/enclave_node/src/ciphernode.rs @@ -61,7 +61,7 @@ impl MainCiphernode { )); let bus = EventBus::new(true).start(); // TODO: switch to Sled actor - let store = DataStore::from_in_mem(InMemStore::new(true).start()); + let store = DataStore::from_in_mem(&InMemStore::new(true).start()); let repositories = store.repositories(); let sortition = Sortition::attach(bus.clone(), repositories.sortition()); diff --git a/packages/ciphernode/keyshare/Cargo.toml b/packages/ciphernode/keyshare/Cargo.toml index b25e6bad..f4084acd 100644 --- a/packages/ciphernode/keyshare/Cargo.toml +++ b/packages/ciphernode/keyshare/Cargo.toml @@ -11,3 +11,4 @@ actix = { workspace = true } anyhow = { workspace = true } serde = { workspace = true } async-trait = { workspace = true } +tracing = { workspace = true } diff --git a/packages/ciphernode/keyshare/src/keyshare.rs b/packages/ciphernode/keyshare/src/keyshare.rs index f30c89d3..bca9c958 100644 --- a/packages/ciphernode/keyshare/src/keyshare.rs +++ b/packages/ciphernode/keyshare/src/keyshare.rs @@ -9,6 +9,7 @@ use enclave_core::{ use fhe::{DecryptCiphertext, Fhe}; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tracing::warn; pub struct Keyshare { fhe: Arc, @@ -84,6 +85,7 @@ impl Handler for Keyshare { EnclaveEvent::CiphernodeSelected { data, .. } => ctx.notify(data), EnclaveEvent::CiphertextOutputPublished { data, .. } => ctx.notify(data), EnclaveEvent::E3RequestComplete { .. } => ctx.notify(Die), + EnclaveEvent::Shutdown { .. } => ctx.notify(Die), _ => (), } } @@ -162,6 +164,7 @@ impl Handler for Keyshare { impl Handler for Keyshare { type Result = (); fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { + warn!("Keyshare is shutting down"); ctx.stop() } } diff --git a/packages/ciphernode/router/src/ciphernode_selector.rs b/packages/ciphernode/router/src/ciphernode_selector.rs index 236ff0fe..b0dbd08c 100644 --- a/packages/ciphernode/router/src/ciphernode_selector.rs +++ b/packages/ciphernode/router/src/ciphernode_selector.rs @@ -1,7 +1,7 @@ /// CiphernodeSelector is an actor that determines if a ciphernode is part of a committee and if so /// forwards a CiphernodeSelected event to the event bus use actix::prelude::*; -use enclave_core::{CiphernodeSelected, EnclaveEvent, EventBus, Subscribe}; +use enclave_core::{CiphernodeSelected, E3Requested, EnclaveEvent, EventBus, Shutdown, Subscribe}; use sortition::{GetHasNode, Sortition}; use tracing::info; @@ -28,24 +28,32 @@ impl CiphernodeSelector { let addr = CiphernodeSelector::new(bus.clone(), sortition, address).start(); bus.do_send(Subscribe::new("E3Requested", addr.clone().recipient())); + bus.do_send(Subscribe::new("Shutdown", addr.clone().recipient())); addr } } impl Handler for CiphernodeSelector { + type Result = (); + fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { + match msg { + EnclaveEvent::E3Requested { data, .. } => ctx.notify(data), + EnclaveEvent::Shutdown { data, .. } => ctx.notify(data), + _ => (), + } + } +} + +impl Handler for CiphernodeSelector { type Result = ResponseFuture<()>; - fn handle(&mut self, event: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, data: E3Requested, _ctx: &mut Self::Context) -> Self::Result { let address = self.address.clone(); let sortition = self.sortition.clone(); let bus = self.bus.clone(); Box::pin(async move { - let EnclaveEvent::E3Requested { data, .. } = event else { - return; - }; - let seed = data.seed; let size = data.threshold_m; @@ -70,3 +78,11 @@ impl Handler for CiphernodeSelector { }) } } + +impl Handler for CiphernodeSelector { + type Result = (); + fn handle(&mut self, _msg: Shutdown, ctx: &mut Self::Context) -> Self::Result { + tracing::info!("Killing CiphernodeSelector"); + ctx.stop(); + } +} diff --git a/packages/ciphernode/router/src/context.rs b/packages/ciphernode/router/src/context.rs index 22d2e4d1..c3643132 100644 --- a/packages/ciphernode/router/src/context.rs +++ b/packages/ciphernode/router/src/context.rs @@ -82,6 +82,14 @@ impl E3RequestContext { }); } + pub fn forward_message_now(&self, msg: &EnclaveEvent) { + self.recipients().into_iter().for_each(|(_, recipient)| { + if let Some(act) = recipient { + act.do_send(msg.clone()); + } + }); + } + /// Accept a DataStore ID and a Keystore actor address pub fn set_keyshare(&mut self, value: Addr) { self.keyshare = Some(value); diff --git a/packages/ciphernode/router/src/e3_request_router.rs b/packages/ciphernode/router/src/e3_request_router.rs index daf57787..35d45c32 100644 --- a/packages/ciphernode/router/src/e3_request_router.rs +++ b/packages/ciphernode/router/src/e3_request_router.rs @@ -3,6 +3,7 @@ use crate::E3RequestContext; use crate::E3RequestContextParams; use crate::E3RequestContextSnapshot; use crate::RepositoriesFactory; +use actix::AsyncContext; use actix::{Actor, Addr, Context, Handler}; use anyhow::*; use async_trait::async_trait; @@ -12,6 +13,7 @@ use data::FromSnapshotWithParams; use data::Repository; use data::Snapshot; use enclave_core::E3RequestComplete; +use enclave_core::Shutdown; use enclave_core::{E3id, EnclaveEvent, EventBus, Subscribe}; use serde::Deserialize; use serde::Serialize; @@ -105,7 +107,13 @@ impl Actor for E3RequestRouter { impl Handler for E3RequestRouter { type Result = (); - fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { + // If we are shuttomg down then bail on anything else + if let EnclaveEvent::Shutdown { data, .. } = msg { + ctx.notify(data); + return; + } + let Some(e3_id) = msg.get_e3_id() else { return; }; @@ -155,6 +163,16 @@ impl Handler for E3RequestRouter { } } +impl Handler for E3RequestRouter { + type Result = (); + fn handle(&mut self, msg: Shutdown, _ctx: &mut Self::Context) -> Self::Result { + let shutdown_evt = EnclaveEvent::from(msg); + for (_, ctx) in self.contexts.iter() { + ctx.forward_message_now(&shutdown_evt) + } + } +} + #[derive(Serialize, Deserialize)] pub struct E3RequestRouterSnapshot { contexts: Vec, diff --git a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs index 4c92d461..30e4c1bf 100644 --- a/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs +++ b/packages/ciphernode/tests/tests/test_aggregation_and_decryption.rs @@ -2,7 +2,7 @@ use data::{DataStore, InMemStore}; use enclave_core::{ CiphernodeAdded, CiphernodeSelected, CiphertextOutputPublished, DecryptionshareCreated, E3RequestComplete, E3Requested, E3id, EnclaveEvent, EventBus, GetHistory, KeyshareCreated, - OrderedSet, PlaintextAggregated, PublicKeyAggregated, ResetHistory, Seed, + OrderedSet, PlaintextAggregated, PublicKeyAggregated, ResetHistory, Seed, Shutdown, }; use fhe::{setup_crp_params, ParamsWithCrp, SharedRng}; use logger::SimpleLogger; @@ -14,7 +14,7 @@ use router::{ use sortition::Sortition; use actix::prelude::*; -use alloy::primitives::Address; +use alloy::{primitives::Address, signers::k256::sha2::digest::Reset}; use anyhow::*; use fhe_rs::{ bfv::{BfvParameters, Ciphertext, Encoding, Plaintext, PublicKey, SecretKey}, @@ -29,22 +29,31 @@ use tokio::sync::Mutex; use tokio::{sync::mpsc::channel, time::sleep}; // Simulating a local node +type LocalCiphernodeTuple = ( + String, // Address + Addr, + Addr, + Addr, + Addr, +); + async fn setup_local_ciphernode( bus: &Addr, rng: &SharedRng, logging: bool, addr: &str, -) -> Result<()> { + data: Option>, +) -> Result { // create data actor for saving data - let data_actor = InMemStore::new(logging).start(); // TODO: Use a sled backed Data Actor - let store = DataStore::from_in_mem(data_actor); + let data_actor = data.unwrap_or_else(|| InMemStore::new(logging).start()); + let store = DataStore::from_in_mem(&data_actor); let repositories = store.repositories(); // create ciphernode actor for managing ciphernode flow let sortition = Sortition::attach(bus.clone(), repositories.sortition()); CiphernodeSelector::attach(bus.clone(), sortition.clone(), addr); - E3RequestRouter::builder(bus.clone(), store) + let router = E3RequestRouter::builder(bus.clone(), store) .add_feature(FheFeature::create(rng.clone())) .add_feature(PublicKeyAggregatorFeature::create( bus.clone(), @@ -58,8 +67,8 @@ async fn setup_local_ciphernode( .build() .await?; - SimpleLogger::attach(addr, bus.clone()); - Ok(()) + let logger = SimpleLogger::attach(addr, bus.clone()); + Ok((addr.to_owned(), data_actor, sortition, router, logger)) } fn generate_pk_share( @@ -151,14 +160,15 @@ async fn create_local_ciphernodes( bus: &Addr, rng: &SharedRng, count: u32, -) -> Result> { +) -> Result> { let eth_addrs = create_random_eth_addrs(count); - + let mut result = vec![]; for addr in ð_addrs { - setup_local_ciphernode(&bus, &rng, true, addr).await?; + let tuple = setup_local_ciphernode(&bus, &rng, true, addr, None).await?; + result.push(tuple); } - Ok(eth_addrs) + Ok(result) } fn encrypt_ciphertext( @@ -246,9 +256,14 @@ fn to_decryptionshare_events( result } -#[actix::test] -async fn test_public_key_aggregation_and_decryption() -> Result<()> { - // Setup +fn get_common_setup() -> Result<( + Addr, + SharedRng, + Seed, + Arc, + CommonRandomPoly, + E3id, +)> { let bus = EventBus::new(true).start(); let rng = create_shared_rng_from_u64(42); let seed = create_seed_from_u64(123); @@ -256,9 +271,20 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> { let crpoly = CommonRandomPoly::deserialize(&crp_bytes.clone(), ¶ms)?; let e3_id = E3id::new("1234"); + Ok((bus, rng, seed, params, crpoly, e3_id)) +} + +#[actix::test] +async fn test_public_key_aggregation_and_decryption() -> Result<()> { + // Setup + let (bus, rng, seed, params, crpoly, e3_id) = get_common_setup()?; // Setup actual ciphernodes and dispatch add events - let eth_addrs = create_local_ciphernodes(&bus, &rng, 3).await?; + let ciphernode_addrs = create_local_ciphernodes(&bus, &rng, 3).await?; + let eth_addrs = ciphernode_addrs + .iter() + .map(|tup| tup.0.to_owned()) + .collect(); let add_events = add_ciphernodes(&bus, ð_addrs).await?; let e3_request_event = EnclaveEvent::from(E3Requested { e3_id: e3_id.clone(), @@ -355,7 +381,86 @@ async fn test_public_key_aggregation_and_decryption() -> Result<()> { } #[actix::test] -async fn test_actors_all_die_when_requested() {} +async fn test_stopped_keyshares_retain_state() -> Result<()> { + let (bus, rng, seed, params, crpoly, e3_id) = get_common_setup()?; + + let eth_addrs = create_random_eth_addrs(2); + + let cn1 = setup_local_ciphernode(&bus, &rng, true, ð_addrs[0], None).await?; + let cn2 = setup_local_ciphernode(&bus, &rng, true, ð_addrs[1], None).await?; + add_ciphernodes(&bus, ð_addrs).await?; + + sleep(Duration::from_millis(1)).await; + + // Send e3request + let e3_request_event = EnclaveEvent::from(E3Requested { + e3_id: e3_id.clone(), + threshold_m: 2, + seed: seed.clone(), + params: params.to_bytes(), + src_chain_id: 1, + }); + bus.send(e3_request_event.clone()).await?; + + sleep(Duration::from_millis(1)).await; + + let history = bus.send(GetHistory).await?; + + // SEND SHUTDOWN! + bus.send(EnclaveEvent::from(Shutdown)).await?; + + // Reset history + bus.send(ResetHistory).await?; + sleep(Duration::from_millis(1)).await; + + // Check event count is correct + assert_eq!(history.len(), 7); + + // Get the address and the data actor from the two ciphernodes + // and rehydrate them to new actors + let (addr1, data1, ..) = cn1; + let (addr2, data2, ..) = cn2; + + // Apply the address and data node to two new actors + // Here we test that hydration occurred sucessfully + setup_local_ciphernode(&bus, &rng, true, &addr1, Some(data1)).await?; + setup_local_ciphernode(&bus, &rng, true, &addr2, Some(data2)).await?; + // get the public key from history. + let pubkey: PublicKey = history + .iter() + .filter_map(|evt| match evt { + EnclaveEvent::KeyshareCreated { data, .. } => { + PublicKeyShare::deserialize(&data.pubkey, ¶ms, crpoly.clone()).ok() + } + _ => None, + }) + .aggregate()?; + + let raw_plaintext = vec![1234u64, 873827u64]; + let (ciphertext, expected) = encrypt_ciphertext(¶ms, pubkey, raw_plaintext)?; + + bus.send( + EnclaveEvent::from(CiphertextOutputPublished { + ciphertext_output: ciphertext.to_bytes(), + e3_id: e3_id.clone(), + }) + .clone(), + ) + .await?; + + sleep(Duration::from_millis(1)).await; + + let history = bus.send(GetHistory).await?; + + let actual = history.iter().find_map(|evt| match evt { + EnclaveEvent::PlaintextAggregated { data, .. } => Some(data.decrypted_output.clone()), + _ => None, + }); + + assert_eq!(actual, Some(expected)); + + Ok(()) +} #[actix::test] async fn test_p2p_actor_forwards_events_to_network() -> Result<()> {