Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Orchestrators #98

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions packages/ciphernode/core/src/ciphernode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::{
eventbus::EventBus,
events::{EnclaveEvent, KeyshareCreated},
fhe::{Fhe, GenerateKeyshare},
CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext, DecryptionshareCreated, Get,
Subscribe,
ActorFactory, CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext,
DecryptionshareCreated, Get,
};
use actix::prelude::*;
use alloy_primitives::Address;
Expand All @@ -30,25 +30,6 @@ impl Ciphernode {
address,
}
}

pub async fn attach(
bus: Addr<EventBus>,
fhe: Addr<Fhe>,
data: Addr<Data>,
address: Address,
) -> Addr<Self> {
let node = Ciphernode::new(bus.clone(), fhe, data, address).start();
let _ = bus
.send(Subscribe::new("CiphernodeSelected", node.clone().into()))
.await;
let _ = bus
.send(Subscribe::new(
"CiphertextOutputPublished",
node.clone().into(),
))
.await;
node
}
}

impl Handler<EnclaveEvent> for Ciphernode {
Expand Down Expand Up @@ -167,3 +148,22 @@ async fn on_decryption_requested(

Ok(())
}

pub struct CiphernodeFactory;
impl CiphernodeFactory {
pub fn create(bus: Addr<EventBus>, data: Addr<Data>, address: Address) -> ActorFactory {
Box::new(move |ctx, evt| {
// Save Ciphernode on CiphernodeSelected
let EnclaveEvent::CiphernodeSelected { .. } = evt else {
return;
};

let Some(ref fhe) = ctx.fhe else {
return;
};

ctx.ciphernode =
Some(Ciphernode::new(bus.clone(), fhe.clone(), data.clone(), address).start())
})
}
}
93 changes: 0 additions & 93 deletions packages/ciphernode/core/src/ciphernode_orchestrator.rs

This file was deleted.

26 changes: 26 additions & 0 deletions packages/ciphernode/core/src/committee_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::{ActorFactory, CommitteeRequested, EnclaveEvent};

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CommitteeMeta {
pub nodecount: usize,
pub seed: u64,
}

pub struct CommitteeMetaFactory;

impl CommitteeMetaFactory {
pub fn create() -> ActorFactory {
Box::new(move |ctx, evt| {
let EnclaveEvent::CommitteeRequested { data, .. }: crate::EnclaveEvent = evt else {
return;
};
let CommitteeRequested {
nodecount,
sortition_seed: seed,
..
} = data;

ctx.meta = Some(CommitteeMeta { nodecount, seed });
})
}
}
145 changes: 145 additions & 0 deletions packages/ciphernode/core/src/e3_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::collections::HashMap;

use actix::{Actor, Addr, Context, Handler, Recipient};

use crate::{
Ciphernode, CommitteeMeta, E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator,
PublicKeyAggregator, Subscribe,
};

#[derive(Default)]
// TODO: Set this up with a Typestate pattern
pub struct E3RequestContext {
pub ciphernode: Option<Addr<Ciphernode>>,
pub fhe: Option<Addr<Fhe>>,
pub plaintext: Option<Addr<PlaintextAggregator>>,
pub publickey: Option<Addr<PublicKeyAggregator>>,
pub meta: Option<CommitteeMeta>,
}

struct EventBuffer {
buffer: HashMap<String, Vec<EnclaveEvent>>,
}

impl Default for EventBuffer {
fn default() -> Self {
Self {
buffer: HashMap::new(),
}
}
}

impl EventBuffer {
pub fn add(&mut self, key: &str, event: EnclaveEvent) {
self.buffer.entry(key.to_string()).or_default().push(event)
}

pub fn take(&mut self, key: &str) -> Vec<EnclaveEvent> {
self.buffer
.get_mut(key)
.map(std::mem::take)
.unwrap_or_default()
}
}

impl E3RequestContext {
fn recipients(&self) -> Vec<(String, Option<Recipient<EnclaveEvent>>)> {
vec![
(
"ciphernode".to_owned(),
self.ciphernode.clone().map(|addr| addr.into()),
),
(
"plaintext".to_owned(),
self.plaintext.clone().map(|addr| addr.into()),
),
(
"publickey".to_owned(),
self.publickey.clone().map(|addr| addr.into()),
),
]
}

fn forward_message(&self, msg: &EnclaveEvent, buffer: &mut EventBuffer) {
self.recipients().into_iter().for_each(|(key, recipient)| {
if let Some(act) = recipient {
act.do_send(msg.clone());
for m in buffer.take(&key) {
act.do_send(m);
}
} else {
buffer.add(&key, msg.clone());
}
});
}
}

struct E3RequestBuffers {
ciphernode: Vec<EnclaveEvent>,
publickey: Vec<EnclaveEvent>,
plaintext: Vec<EnclaveEvent>,
}

pub type ActorFactory = Box<dyn FnMut(&mut E3RequestContext, EnclaveEvent)>;

// TODO: setup typestate pattern so that we have to place factories within correct order of
// dependencies
pub struct E3RequestManager {
contexts: HashMap<E3id, E3RequestContext>,
factories: Vec<ActorFactory>,
buffer: EventBuffer,
}

impl E3RequestManager {
pub fn builder(bus: Addr<EventBus>) -> E3RequestManagerBuilder {
E3RequestManagerBuilder {
bus,
factories: vec![],
}
}
}

pub struct E3RequestManagerBuilder {
pub bus: Addr<EventBus>,
pub factories: Vec<ActorFactory>,
}
impl E3RequestManagerBuilder {
pub fn add_hook(mut self, listener: ActorFactory) -> Self {
self.factories.push(listener);
self
}

pub fn build(self) -> Addr<E3RequestManager> {
let e3r = E3RequestManager {
contexts: HashMap::new(),
factories: self.factories,
buffer: EventBuffer::default(),
};

let addr = e3r.start();
self.bus
.do_send(Subscribe::new("*", addr.clone().recipient()));
addr
}
}

impl Actor for E3RequestManager {
type Context = Context<Self>;
}

impl Handler<EnclaveEvent> for E3RequestManager {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
let Some(e3_id) = msg.get_e3_id() else {
return;
};

let context = self.contexts.entry(e3_id).or_default();

for factory in &mut self.factories {
factory(context, msg.clone());
}

context.forward_message(&msg, &mut self.buffer);
}
}
27 changes: 27 additions & 0 deletions packages/ciphernode/core/src/fhe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
CiphertextSerializer, DecryptionShareSerializer, PublicKeySerializer,
PublicKeyShareSerializer, SecretKeySerializer,
},
ActorFactory, CommitteeRequested, EnclaveEvent,
};
use actix::{Actor, Context, Handler, Message};
use anyhow::*;
Expand Down Expand Up @@ -182,3 +183,29 @@ impl Handler<GetAggregatePlaintext> for Fhe {
Ok(bincode::serialize(&decoded)?)
}
}

pub struct FheFactory;

impl FheFactory {
pub fn create(rng: Arc<Mutex<ChaCha20Rng>>) -> ActorFactory {
Box::new(move |ctx, evt| {
// Saving the fhe on Committee Requested
let EnclaveEvent::CommitteeRequested { data, .. } = evt else {
return;
};
let CommitteeRequested {
degree,
moduli,
plaintext_modulus,
crp,
..
} = data;

ctx.fhe = Some(
Fhe::from_raw_params(&moduli, degree, plaintext_modulus, &crp, rng.clone())
.unwrap()
.start(),
);
})
}
}
Loading
Loading