Skip to content

Commit

Permalink
Remove Orchestrators (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
ryardley authored Sep 19, 2024
1 parent b45043c commit 8365531
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 585 deletions.
42 changes: 21 additions & 21 deletions packages/ciphernode/core/src/ciphernode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use crate::{
eventbus::EventBus,
events::{EnclaveEvent, KeyshareCreated},
fhe::{Fhe, GenerateKeyshare},
CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext, DecryptionshareCreated, Get,
Subscribe,
ActorFactory, CiphernodeSelected, CiphertextOutputPublished, DecryptCiphertext,
DecryptionshareCreated, Get,
};
use actix::prelude::*;
use alloy_primitives::Address;
Expand All @@ -30,25 +30,6 @@ impl Ciphernode {
address,
}
}

pub async fn attach(
bus: Addr<EventBus>,
fhe: Addr<Fhe>,
data: Addr<Data>,
address: Address,
) -> Addr<Self> {
let node = Ciphernode::new(bus.clone(), fhe, data, address).start();
let _ = bus
.send(Subscribe::new("CiphernodeSelected", node.clone().into()))
.await;
let _ = bus
.send(Subscribe::new(
"CiphertextOutputPublished",
node.clone().into(),
))
.await;
node
}
}

impl Handler<EnclaveEvent> for Ciphernode {
Expand Down Expand Up @@ -167,3 +148,22 @@ async fn on_decryption_requested(

Ok(())
}

pub struct CiphernodeFactory;
impl CiphernodeFactory {
pub fn create(bus: Addr<EventBus>, data: Addr<Data>, address: Address) -> ActorFactory {
Box::new(move |ctx, evt| {
// Save Ciphernode on CiphernodeSelected
let EnclaveEvent::CiphernodeSelected { .. } = evt else {
return;
};

let Some(ref fhe) = ctx.fhe else {
return;
};

ctx.ciphernode =
Some(Ciphernode::new(bus.clone(), fhe.clone(), data.clone(), address).start())
})
}
}
93 changes: 0 additions & 93 deletions packages/ciphernode/core/src/ciphernode_orchestrator.rs

This file was deleted.

26 changes: 26 additions & 0 deletions packages/ciphernode/core/src/committee_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::{ActorFactory, CommitteeRequested, EnclaveEvent};

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CommitteeMeta {
pub nodecount: usize,
pub seed: u64,
}

pub struct CommitteeMetaFactory;

impl CommitteeMetaFactory {
pub fn create() -> ActorFactory {
Box::new(move |ctx, evt| {
let EnclaveEvent::CommitteeRequested { data, .. }: crate::EnclaveEvent = evt else {
return;
};
let CommitteeRequested {
nodecount,
sortition_seed: seed,
..
} = data;

ctx.meta = Some(CommitteeMeta { nodecount, seed });
})
}
}
145 changes: 145 additions & 0 deletions packages/ciphernode/core/src/e3_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use std::collections::HashMap;

use actix::{Actor, Addr, Context, Handler, Recipient};

use crate::{
Ciphernode, CommitteeMeta, E3id, EnclaveEvent, EventBus, Fhe, PlaintextAggregator,
PublicKeyAggregator, Subscribe,
};

#[derive(Default)]
// TODO: Set this up with a Typestate pattern
pub struct E3RequestContext {
pub ciphernode: Option<Addr<Ciphernode>>,
pub fhe: Option<Addr<Fhe>>,
pub plaintext: Option<Addr<PlaintextAggregator>>,
pub publickey: Option<Addr<PublicKeyAggregator>>,
pub meta: Option<CommitteeMeta>,
}

struct EventBuffer {
buffer: HashMap<String, Vec<EnclaveEvent>>,
}

impl Default for EventBuffer {
fn default() -> Self {
Self {
buffer: HashMap::new(),
}
}
}

impl EventBuffer {
pub fn add(&mut self, key: &str, event: EnclaveEvent) {
self.buffer.entry(key.to_string()).or_default().push(event)
}

pub fn take(&mut self, key: &str) -> Vec<EnclaveEvent> {
self.buffer
.get_mut(key)
.map(std::mem::take)
.unwrap_or_default()
}
}

impl E3RequestContext {
fn recipients(&self) -> Vec<(String, Option<Recipient<EnclaveEvent>>)> {
vec![
(
"ciphernode".to_owned(),
self.ciphernode.clone().map(|addr| addr.into()),
),
(
"plaintext".to_owned(),
self.plaintext.clone().map(|addr| addr.into()),
),
(
"publickey".to_owned(),
self.publickey.clone().map(|addr| addr.into()),
),
]
}

fn forward_message(&self, msg: &EnclaveEvent, buffer: &mut EventBuffer) {
self.recipients().into_iter().for_each(|(key, recipient)| {
if let Some(act) = recipient {
act.do_send(msg.clone());
for m in buffer.take(&key) {
act.do_send(m);
}
} else {
buffer.add(&key, msg.clone());
}
});
}
}

struct E3RequestBuffers {
ciphernode: Vec<EnclaveEvent>,
publickey: Vec<EnclaveEvent>,
plaintext: Vec<EnclaveEvent>,
}

pub type ActorFactory = Box<dyn FnMut(&mut E3RequestContext, EnclaveEvent)>;

// TODO: setup typestate pattern so that we have to place factories within correct order of
// dependencies
pub struct E3RequestManager {
contexts: HashMap<E3id, E3RequestContext>,
factories: Vec<ActorFactory>,
buffer: EventBuffer,
}

impl E3RequestManager {
pub fn builder(bus: Addr<EventBus>) -> E3RequestManagerBuilder {
E3RequestManagerBuilder {
bus,
factories: vec![],
}
}
}

pub struct E3RequestManagerBuilder {
pub bus: Addr<EventBus>,
pub factories: Vec<ActorFactory>,
}
impl E3RequestManagerBuilder {
pub fn add_hook(mut self, listener: ActorFactory) -> Self {
self.factories.push(listener);
self
}

pub fn build(self) -> Addr<E3RequestManager> {
let e3r = E3RequestManager {
contexts: HashMap::new(),
factories: self.factories,
buffer: EventBuffer::default(),
};

let addr = e3r.start();
self.bus
.do_send(Subscribe::new("*", addr.clone().recipient()));
addr
}
}

impl Actor for E3RequestManager {
type Context = Context<Self>;
}

impl Handler<EnclaveEvent> for E3RequestManager {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result {
let Some(e3_id) = msg.get_e3_id() else {
return;
};

let context = self.contexts.entry(e3_id).or_default();

for factory in &mut self.factories {
factory(context, msg.clone());
}

context.forward_message(&msg, &mut self.buffer);
}
}
27 changes: 27 additions & 0 deletions packages/ciphernode/core/src/fhe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
CiphertextSerializer, DecryptionShareSerializer, PublicKeySerializer,
PublicKeyShareSerializer, SecretKeySerializer,
},
ActorFactory, CommitteeRequested, EnclaveEvent,
};
use actix::{Actor, Context, Handler, Message};
use anyhow::*;
Expand Down Expand Up @@ -182,3 +183,29 @@ impl Handler<GetAggregatePlaintext> for Fhe {
Ok(bincode::serialize(&decoded)?)
}
}

pub struct FheFactory;

impl FheFactory {
pub fn create(rng: Arc<Mutex<ChaCha20Rng>>) -> ActorFactory {
Box::new(move |ctx, evt| {
// Saving the fhe on Committee Requested
let EnclaveEvent::CommitteeRequested { data, .. } = evt else {
return;
};
let CommitteeRequested {
degree,
moduli,
plaintext_modulus,
crp,
..
} = data;

ctx.fhe = Some(
Fhe::from_raw_params(&moduli, degree, plaintext_modulus, &crp, rng.clone())
.unwrap()
.start(),
);
})
}
}
Loading

0 comments on commit 8365531

Please sign in to comment.