From 4795e1094454540fb8d81287d6093575e4e65618 Mon Sep 17 00:00:00 2001 From: timorleph <145755355+timorleph@users.noreply.github.com> Date: Tue, 26 Mar 2024 10:56:36 +0100 Subject: [PATCH] A0-4160: Make packing part of unit creation (#425) * Make packing part of unit creation * Better variable name from review and doc update * Small review change --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/consensus.rs | 34 ++-- consensus/src/creation/mod.rs | 120 +++++++------- consensus/src/creation/packer.rs | 31 ++++ consensus/src/member.rs | 2 +- consensus/src/runway/mod.rs | 109 +++++-------- consensus/src/runway/packer.rs | 242 ----------------------------- consensus/src/testing/consensus.rs | 115 +++++++++----- consensus/src/testing/creation.rs | 46 +++--- consensus/src/testing/dag.rs | 53 +++++-- docs/src/internals.md | 14 +- 12 files changed, 292 insertions(+), 478 deletions(-) create mode 100644 consensus/src/creation/packer.rs delete mode 100644 consensus/src/runway/packer.rs diff --git a/Cargo.lock b/Cargo.lock index ec54630..b93ed13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.36.0" +version = "0.36.1" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index b2edd4d..36234e0 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.36.0" +version = "0.36.1" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index f3c4cc1..fc7dfac 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -7,24 +7,39 @@ use log::{debug, error}; use crate::{ config::Config, - creation, + creation::{self, SignedUnitWithParents}, extension::Service as Extender, handle_task_termination, reconstruction::Service as ReconstructionService, runway::{NotificationIn, NotificationOut}, - Hasher, Receiver, Round, Sender, SpawnHandle, Terminator, + Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, SpawnHandle, Terminator, }; -pub(crate) async fn run( +pub struct IO> { + pub incoming_notifications: Receiver>, + pub outgoing_notifications: Sender>, + pub units_for_runway: Sender>, + pub data_provider: DP, + pub ordered_batch_tx: Sender>, + pub starting_round: oneshot::Receiver>, +} + +pub async fn run>( conf: Config, - incoming_notifications: Receiver>, - outgoing_notifications: Sender>, - ordered_batch_tx: Sender>, + io: IO, + keychain: MK, spawn_handle: impl SpawnHandle, - starting_round: oneshot::Receiver>, mut terminator: Terminator, ) { debug!(target: "AlephBFT", "{:?} Starting all services...", conf.node_ix()); + let IO { + incoming_notifications, + outgoing_notifications, + units_for_runway, + data_provider, + ordered_batch_tx, + starting_round, + } = io; let index = conf.node_ix(); @@ -41,13 +56,14 @@ pub(crate) async fn run( let creator_terminator = terminator.add_offspring_connection("creator"); let io = creation::IO { - outgoing_units: outgoing_notifications.clone(), + outgoing_units: units_for_runway, incoming_parents: parents_from_dag, + data_provider, }; let creator_handle = spawn_handle .spawn_essential( "consensus/creation", - creation::run(conf.into(), io, starting_round, creator_terminator), + creation::run(conf, io, keychain, starting_round, creator_terminator), ) .shared(); let creator_handle_for_panic = 
creator_handle.clone(); diff --git a/consensus/src/creation/mod.rs b/consensus/src/creation/mod.rs index c0c1fd9..24125b3 100644 --- a/consensus/src/creation/mod.rs +++ b/consensus/src/creation/mod.rs @@ -1,8 +1,7 @@ use crate::{ - config::{Config as GeneralConfig, DelaySchedule}, - runway::NotificationOut, - units::{PreUnit, Unit}, - Hasher, NodeCount, NodeIndex, Receiver, Round, Sender, Terminator, + config::Config, + units::{PreUnit, SignedUnit, Unit}, + Data, DataProvider, Hasher, MultiKeychain, Receiver, Round, Sender, Terminator, }; use futures::{ channel::{ @@ -13,41 +12,14 @@ use futures::{ }; use futures_timer::Delay; use log::{debug, error, trace, warn}; -use std::fmt::{Debug, Formatter}; mod creator; +mod packer; pub use creator::Creator; +use packer::Packer; -/// The configuration needed for the process creating new units. -#[derive(Clone)] -pub struct Config { - node_id: NodeIndex, - n_members: NodeCount, - create_lag: DelaySchedule, - max_round: Round, -} - -impl Debug for Config { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Config") - .field("node id", &self.node_id) - .field("member count", &self.n_members) - .field("max round", &self.max_round) - .finish() - } -} - -impl From for Config { - fn from(conf: GeneralConfig) -> Self { - Config { - node_id: conf.node_ix(), - n_members: conf.n_members(), - create_lag: conf.delay_config().unit_creation_delay.clone(), - max_round: conf.max_round(), - } - } -} +const LOG_TARGET: &str = "AlephBFT-creator"; enum CreatorError { OutChannelClosed(SendError), @@ -60,9 +32,12 @@ impl From> for CreatorError { } } -pub struct IO { - pub(crate) incoming_parents: Receiver>, - pub(crate) outgoing_units: Sender>, +pub type SignedUnitWithParents = (SignedUnit, Vec<::Hash>); + +pub struct IO> { + pub incoming_parents: Receiver>, + pub outgoing_units: Sender>, + pub data_provider: DP, } async fn create_unit( @@ -74,7 +49,7 @@ async fn create_unit( match creator.create_unit(round) { Ok(unit) => return Ok(unit), Err(err) => { - trace!(target: "AlephBFT-creator", "Creator unable to create a new unit at round {}: {}.", round, err) + trace!(target: LOG_TARGET, "Creator unable to create a new unit at round {}: {}.", round, err) } } process_unit(creator, incoming_parents).await?; @@ -114,7 +89,7 @@ async fn keep_processing_units_until( result? }, _ = until.fuse() => { - debug!(target: "AlephBFT-creator", "Delay passed."); + debug!(target: LOG_TARGET, "Delay passed."); }, } Ok(()) @@ -133,86 +108,97 @@ async fn keep_processing_units_until( /// /// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html /// Section 5.1 for a discussion of this component. -pub async fn run( +pub async fn run>( conf: Config, - mut io: IO, + mut io: IO, + keychain: MK, mut starting_round: oneshot::Receiver>, mut terminator: Terminator, ) { futures::select! 
{ - _ = read_starting_round_and_run_creator(conf, &mut io, &mut starting_round).fuse() => - debug!(target: "AlephBFT-creator", "Creator is about to finish."), + _ = read_starting_round_and_run_creator(conf, &mut io, keychain, &mut starting_round).fuse() => + debug!(target: LOG_TARGET, "Creator is about to finish."), _ = terminator.get_exit().fuse() => - debug!(target: "AlephBFT-creator", "Received an exit signal."), + debug!(target: LOG_TARGET, "Received an exit signal."), } terminator.terminate_sync().await; } -async fn read_starting_round_and_run_creator( +async fn read_starting_round_and_run_creator< + H: Hasher, + D: Data, + MK: MultiKeychain, + DP: DataProvider, +>( conf: Config, - io: &mut IO, + io: &mut IO, + keychain: MK, starting_round: &mut oneshot::Receiver>, ) { let maybe_round = starting_round.await; let starting_round = match maybe_round { Ok(Some(round)) => round, Ok(None) => { - warn!(target: "AlephBFT-creator", "None starting round provided. Exiting."); + warn!(target: LOG_TARGET, "None starting round provided. Exiting."); return; } Err(e) => { - error!(target: "AlephBFT-creator", "Starting round not provided: {}", e); + error!(target: LOG_TARGET, "Starting round not provided: {}", e); return; } }; - if let Err(err) = run_creator(conf, io, starting_round).await { + if let Err(err) = run_creator(conf, io, keychain, starting_round).await { match err { CreatorError::OutChannelClosed(e) => { - warn!(target: "AlephBFT-creator", "Notification send error: {}. Exiting.", e) + warn!(target: LOG_TARGET, "Notification send error: {}. Exiting.", e) } CreatorError::ParentsChannelClosed => { - debug!(target: "AlephBFT-creator", "Incoming parent channel closed, exiting.") + debug!(target: LOG_TARGET, "Incoming parent channel closed, exiting.") } } } } -async fn run_creator( +async fn run_creator>( conf: Config, - io: &mut IO, + io: &mut IO, + keychain: MK, starting_round: Round, ) -> anyhow::Result<(), CreatorError> { - let Config { - node_id, - n_members, - create_lag, - max_round, - } = conf; + let node_id = conf.node_ix(); + let n_members = conf.n_members(); + let create_delay = conf.delay_config().unit_creation_delay.clone(); + let max_round = conf.max_round(); + let session_id = conf.session_id(); let mut creator = Creator::new(node_id, n_members); + let packer = Packer::new(keychain, session_id); let incoming_parents = &mut io.incoming_parents; let outgoing_units = &io.outgoing_units; + let data_provider = &mut io.data_provider; - debug!(target: "AlephBFT-creator", "Creator starting from round {}", starting_round); + debug!(target: LOG_TARGET, "Creator starting from round {}", starting_round); for round in starting_round..max_round { // Skip waiting if someone created a unit of a higher round. // In such a case at least 2/3 nodes created units from this round so we aren't skipping a // delay we should observe. 
let skip_delay = creator.current_round() > round; if !skip_delay { - let lag = Delay::new(create_lag(round.into())); + let delay = Delay::new(create_delay(round.into())); - keep_processing_units_until(&mut creator, incoming_parents, lag).await?; + keep_processing_units_until(&mut creator, incoming_parents, delay).await?; } - let (unit, parent_hashes) = create_unit(round, &mut creator, incoming_parents).await?; - - trace!(target: "AlephBFT-creator", "Created a new unit {:?} at round {:?}.", unit, round); + let (preunit, parent_hashes) = create_unit(round, &mut creator, incoming_parents).await?; + trace!(target: LOG_TARGET, "Created a new preunit {:?} at round {:?}.", preunit, round); + let data = data_provider.get_data().await; + trace!(target: LOG_TARGET, "Received data: {:?}.", data); + let unit = packer.pack(preunit, data); - outgoing_units.unbounded_send(NotificationOut::CreatedPreUnit(unit, parent_hashes))?; + outgoing_units.unbounded_send((unit, parent_hashes))?; } - warn!(target: "AlephBFT-creator", "Maximum round reached. Not creating another unit."); + warn!(target: LOG_TARGET, "Maximum round reached. Not creating another unit."); Ok(()) } diff --git a/consensus/src/creation/packer.rs b/consensus/src/creation/packer.rs new file mode 100644 index 0000000..a7ad362 --- /dev/null +++ b/consensus/src/creation/packer.rs @@ -0,0 +1,31 @@ +use crate::{ + units::{FullUnit, PreUnit, SignedUnit}, + Data, Hasher, MultiKeychain, SessionId, Signed, +}; + +/// The component responsible for packing Data into PreUnits, +/// and signing the outcome, thus creating SignedUnits that are sent back to Runway. +pub struct Packer { + keychain: MK, + session_id: SessionId, +} + +impl Packer { + pub fn new(keychain: MK, session_id: SessionId) -> Self { + Packer { + keychain, + session_id, + } + } + + pub fn pack( + &self, + preunit: PreUnit, + data: Option, + ) -> SignedUnit { + Signed::sign( + FullUnit::new(preunit, data, self.session_id), + &self.keychain, + ) + } +} diff --git a/consensus/src/member.rs b/consensus/src/member.rs index defba2b..4ff66d6 100644 --- a/consensus/src/member.rs +++ b/consensus/src/member.rs @@ -644,7 +644,7 @@ pub async fn run_session< runway::run( config_copy, runway_io, - &keychain, + keychain.clone(), spawn_copy, network_io, runway_terminator, diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 048d4e6..7374088 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -1,10 +1,12 @@ use crate::{ alerts::{Alert, ForkProof, ForkingNotification, NetworkMessage}, - consensus, handle_task_termination, + consensus, + creation::SignedUnitWithParents, + handle_task_termination, member::UnitMessage, units::{ - ControlHash, PreUnit, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, - UnitStoreStatus, Validator, + ControlHash, SignedUnit, UncheckedSignedUnit, Unit, UnitCoord, UnitStore, UnitStoreStatus, + Validator, }, Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain, NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle, @@ -22,13 +24,11 @@ use log::{debug, error, info, trace, warn}; use std::{collections::HashSet, convert::TryFrom, fmt, marker::PhantomData, time::Duration}; mod collection; -mod packer; use crate::backup::{BackupLoader, BackupSaver}; #[cfg(feature = "initial_unit_collection")] use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; -use packer::Packer; /// Type for incoming notifications: Runway to 
Consensus. #[derive(Clone, Eq, PartialEq)] @@ -43,9 +43,6 @@ pub(crate) enum NotificationIn { /// Type for outgoing notifications: Consensus to Runway. #[derive(Debug, Eq, PartialEq)] pub(crate) enum NotificationOut { - /// Notification about a preunit created by this Consensus Node. Member is meant to - /// disseminate this preunit among other nodes. - CreatedPreUnit(PreUnit, Vec), /// Notification that some units are needed but missing. The role of the Member /// is to fetch these unit (somehow). MissingUnits(Vec), @@ -141,8 +138,7 @@ where finalization_handler: FH, backup_units_for_saver: Sender>, backup_units_from_saver: Receiver>, - preunits_for_packer: Sender>, - signed_units_from_packer: Receiver>, + signed_units_from_creation: Receiver>, exiting: bool, } @@ -253,8 +249,7 @@ struct RunwayConfig, MK: MultiKey responses_for_collection: Sender>, ordered_batch_rx: Receiver>, resolved_requests: Sender>, - preunits_for_packer: Sender>, - signed_units_from_packer: Receiver>, + signed_units_from_creation: Receiver>, } impl Runway @@ -280,8 +275,7 @@ where responses_for_collection, ordered_batch_rx, resolved_requests, - preunits_for_packer, - signed_units_from_packer, + signed_units_from_creation, } = config; let store = UnitStore::new(n_members, max_round); @@ -303,8 +297,7 @@ where backup_units_for_saver, backup_units_from_saver, responses_for_collection, - preunits_for_packer, - signed_units_from_packer, + signed_units_from_creation, exiting: false, } } @@ -558,7 +551,7 @@ where } } - fn on_packed(&mut self, signed_unit: SignedUnit) { + fn on_created(&mut self, signed_unit: SignedUnit) { debug!(target: "AlephBFT-runway", "{:?} On create notification.", self.index()); self.store.add_unit(signed_unit, false); } @@ -583,12 +576,6 @@ where fn on_consensus_notification(&mut self, notification: NotificationOut) { match notification { - NotificationOut::CreatedPreUnit(pu, _) => { - if self.preunits_for_packer.unbounded_send(pu).is_err() { - warn!(target: "AlephBFT-runway", "{:?} preunits_for_packer channel should be open", self.index()); - self.exiting = true; - } - } NotificationOut::MissingUnits(coords) => { self.on_missing_coords(coords); } @@ -746,10 +733,10 @@ where } }, - signed_unit = self.signed_units_from_packer.next() => match signed_unit { - Some(signed_unit) => self.on_packed(signed_unit), + signed_unit = self.signed_units_from_creation.next() => match signed_unit { + Some((signed_unit, _)) => self.on_created(signed_unit), None => { - error!(target: "AlephBFT-runway", "{:?} Packer stream closed.", index); + error!(target: "AlephBFT-runway", "{:?} Creation stream closed.", index); break; } }, @@ -900,7 +887,7 @@ impl< pub(crate) async fn run( config: Config, runway_io: RunwayIO, - keychain: &MK, + keychain: MK, spawn_handle: SH, network_io: NetworkIO, mut terminator: Terminator, @@ -914,23 +901,38 @@ pub(crate) async fn run( MK: MultiKeychain, SH: SpawnHandle, { + let RunwayIO { + data_provider, + finalization_handler, + backup_write, + backup_read, + _phantom: _, + } = runway_io; + let (tx_consensus, consensus_stream) = mpsc::unbounded(); let (consensus_sink, rx_consensus) = mpsc::unbounded(); let (ordered_batch_tx, ordered_batch_rx) = mpsc::unbounded(); + let (signed_units_for_runway, signed_units_from_creation) = mpsc::unbounded(); let consensus_terminator = terminator.add_offspring_connection("AlephBFT-consensus"); let consensus_config = config.clone(); let consensus_spawner = spawn_handle.clone(); let (starting_round_sender, starting_round) = oneshot::channel(); + let 
consensus_keychain = keychain.clone(); let consensus_handle = spawn_handle.spawn_essential("runway/consensus", async move { consensus::run( consensus_config, - consensus_stream, - consensus_sink, - ordered_batch_tx, + consensus::IO { + incoming_notifications: consensus_stream, + outgoing_notifications: consensus_sink, + units_for_runway: signed_units_for_runway, + data_provider, + ordered_batch_tx, + starting_round, + }, + consensus_keychain, consensus_spawner, - starting_round, consensus_terminator, ) .await @@ -945,7 +947,7 @@ pub(crate) async fn run( let mut backup_saver = BackupSaver::new( backup_units_from_runway, backup_units_for_runway, - runway_io.backup_write, + backup_write, ); async move { backup_saver.run(backup_saver_terminator).await; @@ -989,7 +991,7 @@ pub(crate) async fn run( let backup_loading_handle = spawn_handle .spawn_essential("runway/loading", { - let mut backup_loader = BackupLoader::new(runway_io.backup_read, index, session_id); + let mut backup_loader = BackupLoader::new(backup_read, index, session_id); async move { backup_loader .run( @@ -1005,7 +1007,7 @@ pub(crate) async fn run( #[cfg(feature = "initial_unit_collection")] let starting_round_handle = match initial_unit_collection( - keychain, + &keychain, &validator, &network_io.unit_messages_for_network, unit_collections_sender, @@ -1022,14 +1024,6 @@ pub(crate) async fn run( }; pin_mut!(starting_round_handle); - let RunwayIO { - data_provider, - finalization_handler, - .. - } = runway_io; - let (preunits_for_packer, preunits_from_runway) = mpsc::unbounded(); - let (signed_units_for_runway, signed_units_from_packer) = mpsc::unbounded(); - let runway_handle = spawn_handle .spawn_essential("runway", { let runway_config = RunwayConfig { @@ -1046,8 +1040,7 @@ pub(crate) async fn run( responses_for_collection, resolved_requests: network_io.resolved_requests, max_round: config.max_round(), - preunits_for_packer, - signed_units_from_packer, + signed_units_from_creation, }; let runway_terminator = terminator.add_offspring_connection("AlephBFT-runway"); let validator = validator.clone(); @@ -1059,29 +1052,6 @@ pub(crate) async fn run( .fuse(); pin_mut!(runway_handle); - let packer_handle = spawn_handle - .spawn_essential("runway/packer", { - let packer_terminator = terminator.add_offspring_connection("AlephBFT-packer"); - let mut packer = Packer::new( - data_provider, - preunits_from_runway, - signed_units_for_runway, - keychain.clone(), - config.session_id(), - ); - - async move { - match packer.run(packer_terminator).await { - Ok(()) => (), - Err(()) => { - debug!(target: "AlephBFT-runway", "{:?} Packer task terminated abnormally", index) - } - } - } - }) - .fuse(); - pin_mut!(packer_handle); - loop { futures::select! 
{ _ = runway_handle => { @@ -1096,10 +1066,6 @@ pub(crate) async fn run( debug!(target: "AlephBFT-runway", "{:?} Consensus task terminated early.", index); break; }, - _ = packer_handle => { - debug!(target: "AlephBFT-runway", "{:?} Packer task terminated early.", index); - break; - }, _ = backup_saver_handle => { debug!(target: "AlephBFT-runway", "{:?} Backup saving task terminated early.", index); break; @@ -1122,7 +1088,6 @@ pub(crate) async fn run( handle_task_termination(consensus_handle, "AlephBFT-runway", "Consensus", index).await; handle_task_termination(alerter_handle, "AlephBFT-runway", "Alerter", index).await; handle_task_termination(runway_handle, "AlephBFT-runway", "Runway", index).await; - handle_task_termination(packer_handle, "AlephBFT-runway", "Packer", index).await; handle_task_termination(backup_saver_handle, "AlephBFT-runway", "BackupSaver", index).await; debug!(target: "AlephBFT-runway", "{:?} Runway ended.", index); diff --git a/consensus/src/runway/packer.rs b/consensus/src/runway/packer.rs deleted file mode 100644 index fdd3d21..0000000 --- a/consensus/src/runway/packer.rs +++ /dev/null @@ -1,242 +0,0 @@ -use crate::{ - units::{FullUnit, PreUnit, SignedUnit}, - Data, DataProvider, Hasher, MultiKeychain, NodeIndex, Receiver, Sender, SessionId, Signed, - Terminator, -}; -use futures::{pin_mut, FutureExt, StreamExt}; -use log::{debug, error}; -use std::marker::PhantomData; - -/// The component responsible for packing Data from DataProvider into received PreUnits, -/// and signing the outcome, thus creating SignedUnits that are sent back to Runway. -pub struct Packer -where - H: Hasher, - D: Data, - DP: DataProvider, - MK: MultiKeychain, -{ - data_provider: DP, - preunits_from_runway: Receiver>, - signed_units_for_runway: Sender>, - keychain: MK, - session_id: SessionId, - _phantom: PhantomData, -} - -impl Packer -where - H: Hasher, - D: Data, - DP: DataProvider, - MK: MultiKeychain, -{ - pub fn new( - data_provider: DP, - preunits_from_runway: Receiver>, - signed_units_for_runway: Sender>, - keychain: MK, - session_id: SessionId, - ) -> Self { - Self { - data_provider, - preunits_from_runway, - signed_units_for_runway, - keychain, - session_id, - _phantom: PhantomData, - } - } - - fn index(&self) -> NodeIndex { - self.keychain.index() - } - - /// The main loop. - async fn pack(&mut self) { - loop { - // the order is important: first wait for a PreUnit, then ask for fresh Data - let preunit = match self.preunits_from_runway.next().await { - Some(preunit) => preunit, - None => { - error!(target: "AlephBFT-packer", "{:?} Runway PreUnit stream closed.", self.index()); - break; - } - }; - debug!(target: "AlephBFT-packer", "{:?} Received PreUnit.", self.index()); - let data = self.data_provider.get_data().await; - debug!(target: "AlephBFT-packer", "{:?} Received data.", self.index()); - let full_unit = FullUnit::new(preunit, data, self.session_id); - let signed_unit = Signed::sign(full_unit, &self.keychain); - if self - .signed_units_for_runway - .unbounded_send(signed_unit) - .is_err() - { - error!(target: "AlephBFT-packer", "{:?} Could not send SignedUnit to Runway.", self.index()); - break; - } - } - } - - /// Run the main loop until receiving a signal to exit. - pub async fn run(&mut self, mut terminator: Terminator) -> Result<(), ()> { - debug!(target: "AlephBFT-packer", "{:?} Packer started.", self.index()); - let pack = self.pack().fuse(); - pin_mut!(pack); - - futures::select! 
{ - _ = pack => Err(()), - _ = terminator.get_exit().fuse() => { - terminator.terminate_sync().await; - Ok(()) - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::Packer; - use crate::{ - units::{ControlHash, PreUnit, SignedUnit}, - NodeCount, NodeIndex, Receiver, Sender, SessionId, Terminator, - }; - use aleph_bft_mock::{Data, DataProvider, Hasher64, Keychain, StalledDataProvider}; - use aleph_bft_types::NodeMap; - use futures::{ - channel::{mpsc, oneshot}, - pin_mut, FutureExt, StreamExt, - }; - - const SESSION_ID: SessionId = 43; - const NODE_ID: NodeIndex = NodeIndex(0); - const N_MEMBERS: NodeCount = NodeCount(4); - - struct Preliminaries { - preunits_channel: Sender>, - signed_units_channel: Receiver>, - packer: Packer, - terminator: (Terminator, oneshot::Sender<()>), - preunit: PreUnit, - } - - fn prepare(keychain: Keychain) -> Preliminaries { - let data_provider = DataProvider::new(); - let (preunits_channel, preunits_from_runway) = mpsc::unbounded(); - let (signed_units_for_runway, signed_units_channel) = mpsc::unbounded(); - let packer = Packer::new( - data_provider, - preunits_from_runway, - signed_units_for_runway, - keychain, - SESSION_ID, - ); - let (_exit_tx, exit_rx) = oneshot::channel(); - let parent_map = NodeMap::with_size(N_MEMBERS); - let control_hash = ControlHash::new(&parent_map); - let terminator = Terminator::create_root(exit_rx, "AlephBFT-packer"); - let preunit = PreUnit::new(NODE_ID, 0, control_hash); - Preliminaries { - preunits_channel, - signed_units_channel, - packer, - terminator: (terminator, _exit_tx), - preunit, - } - } - - #[tokio::test] - async fn unit_packed() { - let keychain = Keychain::new(N_MEMBERS, NODE_ID); - let Preliminaries { - preunits_channel, - signed_units_channel, - mut packer, - terminator, - preunit, - .. - } = prepare(keychain); - let packer_handle = packer.run(terminator.0).fuse(); - preunits_channel - .unbounded_send(preunit.clone()) - .expect("Packer PreUnit channel closed"); - pin_mut!(packer_handle); - pin_mut!(signed_units_channel); - let unit = futures::select! { - unit = signed_units_channel.next() => match unit { - Some(unit) => unit, - None => panic!("Packer SignedUnit channel closed"), - }, - _ = packer_handle => panic!("Packer terminated early"), - } - .into_unchecked() - .into_signable(); - assert_eq!(SESSION_ID, unit.session_id()); - assert_eq!(unit.as_pre_unit(), &preunit); - } - - #[tokio::test] - async fn preunits_channel_closed() { - let keychain = Keychain::new(N_MEMBERS, NODE_ID); - let Preliminaries { - mut packer, - terminator, - .. - } = prepare(keychain); - assert_eq!(packer.run(terminator.0).await, Err(())); - } - - #[tokio::test] - async fn signed_units_channel_closed() { - let keychain = Keychain::new(N_MEMBERS, NODE_ID); - let Preliminaries { - preunits_channel, - mut packer, - terminator, - preunit, - .. 
- } = prepare(keychain); - preunits_channel - .unbounded_send(preunit) - .expect("Packer PreUnit channel closed"); - assert_eq!(packer.run(terminator.0).await, Err(())); - } - - #[tokio::test] - async fn handles_requests_concurrently() { - let keychain = Keychain::new(N_MEMBERS, NODE_ID); - let data_provider = StalledDataProvider::new(); - let (preunits_channel, preunits_from_runway) = mpsc::unbounded::>(); - let (signed_units_for_runway, _signed_units_channel) = mpsc::unbounded(); - let mut packer = Packer::new( - data_provider, - preunits_from_runway, - signed_units_for_runway, - keychain, - SESSION_ID, - ); - let (exit_tx, exit_rx) = oneshot::channel(); - let parent_map = NodeMap::with_size(N_MEMBERS); - let control_hash = ControlHash::new(&parent_map); - let preunit = PreUnit::new(NODE_ID, 0, control_hash); - let packer_handle = packer.run(Terminator::create_root(exit_rx, "AlephBFT-packer")); - for _ in 0..3 { - preunits_channel - .unbounded_send(preunit.clone()) - .expect("Packer PreUnit channel closed"); - } - // in spite of StalledDataProvider halting Provider's pack loop, - // we expect the component to handle the exit request immediately - exit_tx.send(()).expect("Packer exit channel closed"); - // let's send more PreUnits just to be sure that we can - for _ in 0..3 { - preunits_channel - .unbounded_send(preunit.clone()) - .expect("Packer PreUnit channel closed"); - } - packer_handle - .await - .expect("Packer terminated with an error"); - } -} diff --git a/consensus/src/testing/consensus.rs b/consensus/src/testing/consensus.rs index ee68c73..269838b 100644 --- a/consensus/src/testing/consensus.rs +++ b/consensus/src/testing/consensus.rs @@ -1,12 +1,12 @@ use crate::{ consensus, + creation::SignedUnitWithParents as GenericSignedUnitWithParents, runway::{NotificationIn, NotificationOut}, testing::{complete_oneshot, gen_config, gen_delay_config, init_log}, units::{ControlHash, PreUnit, Unit, UnitCoord}, Hasher, NodeCount, NodeIndex, NodeMap, SpawnHandle, Terminator, }; -use aleph_bft_mock::{Hasher64, Spawner}; -use codec::Encode; +use aleph_bft_mock::{Data, DataProvider, Hasher64, Keychain, Spawner}; use futures::{ channel::{ mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, @@ -23,6 +23,8 @@ use std::{ task::{Context, Poll}, }; +type SignedUnitWithParents = GenericSignedUnitWithParents; + // This struct allows to create a Hub to interconnect several instances of the Consensus engine, without // requiring the Member wrapper. The Hub notifies all connected instances about newly created units and // is able to answer unit requests as well. WrongControlHashes are not supported, which means that this @@ -30,18 +32,20 @@ use std::{ // Usage: 1) create an instance using new(n_members), 2) connect all n_members instances, 0, 1, 2, ..., n_members - 1. // 3) run the HonestHub instance as a Future. 
pub(crate) struct HonestHub { - n_members: usize, + n_members: NodeCount, ntfct_out_rxs: HashMap>>, ntfct_in_txs: HashMap>>, + units_from_consensus: HashMap>, units_by_coord: HashMap>, } impl HonestHub { - pub(crate) fn new(n_members: usize) -> Self { + pub(crate) fn new(n_members: NodeCount) -> Self { HonestHub { n_members, ntfct_out_rxs: HashMap::new(), ntfct_in_txs: HashMap::new(), + units_from_consensus: HashMap::new(), units_by_coord: HashMap::new(), } } @@ -52,17 +56,21 @@ impl HonestHub { ) -> ( UnboundedSender>, UnboundedReceiver>, + UnboundedSender, ) { let (tx_in, rx_in) = unbounded(); let (tx_out, rx_out) = unbounded(); + let (units_for_hub, units_from_consensus) = unbounded(); self.ntfct_in_txs.insert(node_ix, tx_in); self.ntfct_out_rxs.insert(node_ix, rx_out); - (tx_out, rx_in) + self.units_from_consensus + .insert(node_ix, units_from_consensus); + (tx_out, rx_in, units_for_hub) } fn send_to_all(&mut self, ntfct: NotificationIn) { assert!( - self.ntfct_in_txs.len() == self.n_members, + self.ntfct_in_txs.len() == self.n_members.0, "Must connect to all nodes before running the hub." ); for (_ix, tx) in self.ntfct_in_txs.iter() { @@ -80,13 +88,6 @@ impl HonestHub { fn on_notification(&mut self, node_ix: NodeIndex, ntfct: NotificationOut) { match ntfct { - NotificationOut::CreatedPreUnit(pu, _parent_hashes) => { - let hash = pu.using_encoded(Hasher64::hash); - let u = Unit::new(pu, hash); - let coord = UnitCoord::new(u.round(), u.creator()); - self.units_by_coord.insert(coord, u.clone()); - self.send_to_all(NotificationIn::NewUnits(vec![u])); - } NotificationOut::MissingUnits(coords) => { let mut response_units = Vec::new(); for coord in coords { @@ -111,13 +112,22 @@ impl HonestHub { } } } + + fn on_unit(&mut self, (unit, _): SignedUnitWithParents) { + let u = unit.into_unchecked().as_signable().unit(); + let coord = UnitCoord::new(u.round(), u.creator()); + self.units_by_coord.insert(coord, u.clone()); + self.send_to_all(NotificationIn::NewUnits(vec![u])); + } } impl Future for HonestHub { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut ready_ixs: Vec = Vec::new(); + let mut ready_unit_ixs: Vec = Vec::new(); let mut buffer = Vec::new(); + let mut unit_buffer = Vec::new(); for (ix, rx) in self.ntfct_out_rxs.iter_mut() { loop { match rx.poll_next_unpin(cx) { @@ -134,6 +144,25 @@ impl Future for HonestHub { } } } + for (ix, units_from_consensus) in self.units_from_consensus.iter_mut() { + loop { + match units_from_consensus.poll_next_unpin(cx) { + Poll::Ready(Some(unit)) => { + unit_buffer.push(unit); + } + Poll::Ready(None) => { + ready_unit_ixs.push(*ix); + break; + } + Poll::Pending => { + break; + } + } + } + } + for unit in unit_buffer { + self.on_unit(unit); + } for (ix, ntfct) in buffer { self.on_notification(ix, ntfct); } @@ -150,7 +179,7 @@ impl Future for HonestHub { #[tokio::test] async fn agree_on_first_batch() { init_log(); - let n_members: usize = 16; + let n_members = NodeCount(16); let mut hub = HonestHub::new(n_members); let mut exits = vec![]; @@ -159,23 +188,30 @@ async fn agree_on_first_batch() { let mut handles = vec![]; - for node_ix in 0..n_members { - let (tx, rx) = hub.connect(NodeIndex(node_ix)); - let conf = gen_config(NodeIndex(node_ix), n_members.into(), gen_delay_config()); + for node_ix in n_members.into_iterator() { + let (tx, rx, units_for_hub) = hub.connect(node_ix); + let conf = gen_config(node_ix, n_members, gen_delay_config()); let (exit_tx, exit_rx) = oneshot::channel(); exits.push(exit_tx); let 
(batch_tx, batch_rx) = unbounded(); batch_rxs.push(batch_rx); let starting_round = complete_oneshot(Some(0)); + let keychain = Keychain::new(n_members, node_ix); + let data_provider = DataProvider::new(); handles.push(spawner.spawn_essential( "consensus", consensus::run( conf, - rx, - tx, - batch_tx, + consensus::IO { + incoming_notifications: rx, + outgoing_notifications: tx, + units_for_runway: units_for_hub, + data_provider, + ordered_batch_tx: batch_tx, + starting_round, + }, + keychain, spawner, - starting_round, Terminator::create_root(exit_rx, "AlephBFT-consensus"), ), )); @@ -190,7 +226,7 @@ async fn agree_on_first_batch() { batches.push(batch); } - for node_ix in 1..n_members { + for node_ix in 1..n_members.0 { assert_eq!(batches[0], batches[node_ix]); } } @@ -198,41 +234,50 @@ async fn agree_on_first_batch() { #[tokio::test] async fn catches_wrong_control_hash() { init_log(); - let n_nodes = 4; + let n_nodes = NodeCount(4); let spawner = Spawner::new(); - let node_ix = 0; + let node_ix = NodeIndex(0); let (mut tx_in, rx_in) = unbounded(); let (tx_out, mut rx_out) = unbounded(); + let (units_for_us, _units_from_consensus) = unbounded(); - let conf = gen_config(NodeIndex(node_ix), n_nodes.into(), gen_delay_config()); + let conf = gen_config(node_ix, n_nodes, gen_delay_config()); let (exit_tx, exit_rx) = oneshot::channel(); let (batch_tx, _batch_rx) = unbounded(); let starting_round = complete_oneshot(Some(0)); + let keychain = Keychain::new(n_nodes, node_ix); + let data_provider = DataProvider::new(); let consensus_handle = spawner.spawn_essential( "consensus", consensus::run( conf, - rx_in, - tx_out, - batch_tx, + consensus::IO { + incoming_notifications: rx_in, + outgoing_notifications: tx_out, + units_for_runway: units_for_us, + data_provider, + ordered_batch_tx: batch_tx, + starting_round, + }, + keychain, spawner, - starting_round, Terminator::create_root(exit_rx, "AlephBFT-consensus"), ), ); - let empty_control_hash = ControlHash::new(&(vec![None; n_nodes]).into()); - let other_initial_units: Vec<_> = (1..n_nodes) - .map(NodeIndex) + let empty_control_hash = ControlHash::new(&(vec![None; n_nodes.0]).into()); + let other_initial_units: Vec<_> = n_nodes + .into_iterator() + .skip(1) .map(|creator| PreUnit::::new(creator, 0, empty_control_hash.clone())) .map(|pu| Unit::new(pu, rand::random())) .collect(); let _ = tx_in .send(NotificationIn::NewUnits(other_initial_units.clone())) .await; - let mut parent_hashes = NodeMap::with_size(NodeCount(n_nodes)); - for (id, unit) in other_initial_units.into_iter().enumerate() { - parent_hashes.insert(NodeIndex(id + 1), unit.hash()); + let mut parent_hashes = NodeMap::with_size(n_nodes); + for unit in other_initial_units.into_iter() { + parent_hashes.insert(unit.creator(), unit.hash()); } let bad_pu = PreUnit::::new(1.into(), 1, ControlHash::new(&parent_hashes)); let bad_control_hash: ::Hash = [0, 1, 0, 1, 0, 1, 0, 1]; diff --git a/consensus/src/testing/creation.rs b/consensus/src/testing/creation.rs index 152c43d..6ffe3c6 100644 --- a/consensus/src/testing/creation.rs +++ b/consensus/src/testing/creation.rs @@ -1,35 +1,28 @@ use crate::{ - creation::{run, IO}, - runway::NotificationOut as GenericNotificationOut, + creation::{run, SignedUnitWithParents as GenericSignedUnitWithParents, IO}, testing::{gen_config, gen_delay_config}, - units::{FullUnit as GenericFullUnit, PreUnit as GenericPreUnit, Unit as GenericUnit}, + units::Unit as GenericUnit, NodeCount, Receiver, Round, Sender, Terminator, }; -use aleph_bft_mock::{Data, Hasher64}; +use 
aleph_bft_mock::{Data, DataProvider, Hasher64, Keychain}; use futures::{ channel::{mpsc, oneshot}, FutureExt, StreamExt, }; -type PreUnit = GenericPreUnit; type Unit = GenericUnit; -type FullUnit = GenericFullUnit; -type NotificationOut = GenericNotificationOut; - -fn preunit_to_unit(preunit: PreUnit) -> Unit { - FullUnit::new(preunit, Some(0), 0).unit() -} +type SignedUnitWithParents = GenericSignedUnitWithParents; struct TestController { max_round_per_creator: Vec, parents_for_creators: Sender, - units_from_creators: Receiver, + units_from_creators: Receiver, } impl TestController { fn new( parents_for_creators: Sender, - units_from_creators: Receiver, + units_from_creators: Receiver, n_members: NodeCount, ) -> Self { TestController { @@ -42,22 +35,18 @@ impl TestController { async fn control_until(&mut self, max_round: Round) { let mut round_reached = 0; while round_reached < max_round { - let notification = self + let (unit, _) = self .units_from_creators .next() .await .expect("Creator output channel isn't closed."); - let preunit = match notification { - NotificationOut::CreatedPreUnit(preunit, _) => preunit, - _ => panic!("Unexpected notification from creator."), - }; - let unit = preunit_to_unit(preunit); + let unit = unit.into_unchecked().as_signable().unit(); if unit.round() > round_reached { round_reached = unit.round(); } self.max_round_per_creator[unit.creator().0] += 1; self.parents_for_creators - .unbounded_send(unit.clone()) + .unbounded_send(unit) .expect("Creator input channel isn't closed."); } } @@ -72,34 +61,37 @@ struct TestSetup { } fn setup_test(n_members: NodeCount) -> TestSetup { - let (notifications_for_controller, notifications_from_creators) = mpsc::unbounded(); + let (units_for_controller, units_from_creators) = mpsc::unbounded(); let (units_for_creators, units_from_controller) = mpsc::unbounded(); - let test_controller = - TestController::new(units_for_creators, notifications_from_creators, n_members); + let test_controller = TestController::new(units_for_creators, units_from_creators, n_members); let mut handles = Vec::new(); let mut killers = Vec::new(); let mut units_for_creators = Vec::new(); - for node_ix in 0..n_members.0 { + for node_ix in n_members.into_iterator() { let (parents_for_creator, parents_from_controller) = mpsc::unbounded(); let io = IO { incoming_parents: parents_from_controller, - outgoing_units: notifications_for_controller.clone(), + outgoing_units: units_for_controller.clone(), + data_provider: DataProvider::new(), }; - let config = gen_config(node_ix.into(), n_members, gen_delay_config()); + let config = gen_config(node_ix, n_members, gen_delay_config()); let (starting_round_for_consensus, starting_round) = oneshot::channel(); units_for_creators.push(parents_for_creator); + let keychain = Keychain::new(n_members, node_ix); + let (killer, exit) = oneshot::channel::<()>(); let handle = tokio::spawn(async move { run( - config.into(), + config, io, + keychain, starting_round, Terminator::create_root(exit, "AlephBFT-creator"), ) diff --git a/consensus/src/testing/dag.rs b/consensus/src/testing/dag.rs index 764dcc0..8de2a1c 100644 --- a/consensus/src/testing/dag.rs +++ b/consensus/src/testing/dag.rs @@ -1,11 +1,12 @@ use crate::{ consensus, - runway::{NotificationIn, NotificationOut}, + creation::SignedUnitWithParents as GenericSignedUnitWithParents, + runway::{NotificationIn as GenericNotificationIn, NotificationOut as GenericNotificationOut}, testing::{complete_oneshot, gen_config, gen_delay_config}, units::{ControlHash, PreUnit, Unit}, 
NodeCount, NodeIndex, NodeMap, NodeSubset, Receiver, Round, Sender, SpawnHandle, Terminator, }; -use aleph_bft_mock::{Hash64, Hasher64, Spawner}; +use aleph_bft_mock::{Data, DataProvider, Hash64, Hasher64, Keychain, Spawner}; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, @@ -16,6 +17,10 @@ use log::{debug, error, trace}; use rand::{distributions::Open01, prelude::*}; use std::{cmp, collections::HashMap, time::Duration}; +type NotificationIn = GenericNotificationIn; +type NotificationOut = GenericNotificationOut; +type SignedUnitWithParents = GenericSignedUnitWithParents; + #[derive(Clone)] struct UnitWithParents { unit: Unit, @@ -56,8 +61,9 @@ impl UnitWithParents { } struct ConsensusDagFeeder { - tx_in: Sender>, - rx_out: Receiver>, + tx_in: Sender, + rx_out: Receiver, + units_from_creator: Receiver, units: Vec, units_map: HashMap, } @@ -67,22 +73,25 @@ impl ConsensusDagFeeder { units: Vec, ) -> ( Self, - Receiver>, - Sender>, + Receiver, + Sender, + Sender, ) { let units_map = units.iter().map(|u| (u.hash(), u.clone())).collect(); let (tx_in, rx_in) = mpsc::unbounded(); let (tx_out, rx_out) = mpsc::unbounded(); + let (units_for_feeder, units_from_creator) = mpsc::unbounded(); let cdf = ConsensusDagFeeder { tx_in, rx_out, + units_from_creator, units, units_map, }; - (cdf, rx_in, tx_out) + (cdf, rx_in, tx_out, units_for_feeder) } - fn on_consensus_notification(&self, notification: NotificationOut) { + fn on_consensus_notification(&self, notification: NotificationOut) { match notification { NotificationOut::WrongControlHash(h) => { // We need to answer these requests as otherwise reconstruction cannot make progress @@ -107,7 +116,14 @@ impl ConsensusDagFeeder { } loop { - let notification = self.rx_out.next().await; + let notification = loop { + futures::select! { + notification = self.rx_out.next() => { + break notification; + }, + _ = self.units_from_creator.next() => continue, + } + }; match notification { Some(notification) => self.on_consensus_notification(notification), None => { @@ -124,8 +140,10 @@ async fn run_consensus_on_dag( n_members: NodeCount, deadline_ms: u64, ) -> Vec> { - let (feeder, rx_in, tx_out) = ConsensusDagFeeder::new(units); - let conf = gen_config(NodeIndex(0), n_members, gen_delay_config()); + let node_id = NodeIndex(0); + let (feeder, rx_in, tx_out, units_for_feeder) = ConsensusDagFeeder::new(units); + let conf = gen_config(node_id, n_members, gen_delay_config()); + let keychain = Keychain::new(n_members, node_id); let (_exit_tx, exit_rx) = oneshot::channel(); let (batch_tx, mut batch_rx) = mpsc::unbounded(); let spawner = Spawner::new(); @@ -134,11 +152,16 @@ async fn run_consensus_on_dag( "consensus", consensus::run( conf, - rx_in, - tx_out, - batch_tx, + consensus::IO { + incoming_notifications: rx_in, + outgoing_notifications: tx_out, + units_for_runway: units_for_feeder, + data_provider: DataProvider::new(), + ordered_batch_tx: batch_tx, + starting_round, + }, + keychain, spawner, - starting_round, Terminator::create_root(exit_rx, "AlephBFT-consensus"), ), ); diff --git a/docs/src/internals.md b/docs/src/internals.md index 2c2561a..9801bf5 100644 --- a/docs/src/internals.md +++ b/docs/src/internals.md @@ -2,21 +2,19 @@ To explain the inner workings of AlephBFT it is instructive to follow the path of a unit: from the very start when it is created to the moment when its round is decided and it's data is placed in one of the output batches. 
 Here we give a brief overview and subsequently go more into details of specific components in dedicated subsections.

-1. The unit is created by one of the node's `Creator` component -- implemented in `src/creation/`. Creator sends a notification to an outer component.
-2. The newly created unit is filled with data, session information and a signature. This is done in `src/runway.rs` which then sends it to `src/member.rs` Subsequently a recurring task of broadcasting this unit is put in the task queue. The unit will be broadcast to all other nodes a few times (with some delays in between).
-3. The unit is received by another node -- happens in `src/member.rs` and immediately send to `src/runway.rs` where it passes some validation (signature checks etc.). If all these checks pass and the unit is not detected to be a fork, then it is placed in the `UnitStore` -- the `store` field of the `Runway` struct.
+1. The unit is created by the node's `Creator` component -- implemented in `creation/`. Creator sends the produced unit to `runway/`, which then sends it to `member.rs`.
+2. A recurring task of broadcasting this unit is put in the task queue. The unit will be broadcast to all other nodes a few times (with some delays in between).
+3. The unit is received by another node -- happens in `member.rs` and immediately sent to `runway/` where it passes some validation (signature checks etc.). If all these checks pass and the unit is not detected to be a fork, then it is placed in the `UnitStore` -- the `store` field of the `Runway` struct.
 4. The idea is that this store keeps only **legit units** in the sense defined in [the section on alerts](how_alephbft_does_it.md#25-alerts----dealing-with-fork-spam). Thus no fork is ever be put there unless coming from an alert.
-5. At a suitable moment the units from the store are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `src/reconstruction/parents.rs`.
-6. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag. This is ensured by the implementation in `src/reconstruction/dag.rs`.
-7. Dag units are passed to a component called the `Extender` -- see the files in `src/extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md).
+5. At a suitable moment the units from the store are further moved to a component responsible for reconstructing the explicit parents for these units -- implemented in `reconstruction/parents.rs`.
+6. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag. This is ensured by the implementation in `reconstruction/dag.rs`.
+7. Dag units are passed to a component called the `Extender` -- see the files in `extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md).
 8. Once a unit's data is placed in one of batches by the `Extender` then its path is over and can be safely discarded.

 ### 5.1 Creator

 The creator produces units according to the AlephBFT protocol rules. It will wait until the prespecified delay has passed and attempt to create a unit using a maximal number of parents. If it is not possible yet, it will wait till the first moment enough parents are available.
 After creating the last unit, the creator stops producing new ones, although this is never expected to happen during correct execution.
-Since the creator does not have access to the `DataIO` object and to the `Keychain` it is not able to create the unit "fully", for this reason it only chooses parents, the rest is filled by the `Runway`.
-
 ### 5.2 Unit Store in Runway

 As mentioned, the idea is that this stores only legit units and passes them to the reconstructing component. In case a fork is detected by a node `i`, all `i`'s units are attached to the appropriate alert.
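
To make the data flow this patch introduces more concrete, here is a minimal, self-contained Rust sketch of the creation-side pipeline: the creator assembles a preunit (parents only), the data provider is queried only once the preunit exists, and the new in-creation `Packer` combines the two and signs the result before it is sent to the runway. All types below are simplified stand-ins for illustration, not the crate's real `PreUnit`/`FullUnit`/`Signed`/`MultiKeychain` API.

```rust
// Simplified stand-ins for the crate's types; the real code uses
// PreUnit, FullUnit, SignedUnit, MultiKeychain and DataProvider.
type Data = Option<u32>;
type Hash = u64;

/// A preunit: round and chosen parents, no data or signature yet.
struct PreUnit {
    round: u16,
    parent_hashes: Vec<Hash>,
}

/// What the packer produces and the creation task now sends to the runway.
struct SignedUnit {
    preunit: PreUnit,
    data: Data,
    session_id: u64,
    signature: u64, // placeholder for a real signature
}

/// Stand-in for the keychain used to sign full units.
struct Keychain {
    secret: u64,
}

impl Keychain {
    fn sign(&self, payload: u64) -> u64 {
        // Not real cryptography; just enough to show where signing happens.
        payload ^ self.secret
    }
}

/// Mirrors the role of creation/packer.rs: owns the keychain and session id,
/// and turns (PreUnit, Data) into a SignedUnit.
struct Packer {
    keychain: Keychain,
    session_id: u64,
}

impl Packer {
    fn pack(&self, preunit: PreUnit, data: Data) -> SignedUnit {
        let payload = preunit.round as u64 ^ self.session_id ^ data.unwrap_or(0) as u64;
        SignedUnit {
            signature: self.keychain.sign(payload),
            preunit,
            data,
            session_id: self.session_id,
        }
    }
}

fn main() {
    let packer = Packer { keychain: Keychain { secret: 7 }, session_id: 43 };
    // The creator produces this once it has enough parents for the round.
    let preunit = PreUnit { round: 1, parent_hashes: vec![0xAA, 0xBB, 0xCC] };
    // Data is requested only after the preunit exists, as in run_creator.
    let data = Some(5);
    let signed = packer.pack(preunit, data);
    println!(
        "round {} unit with {} parents, data {:?}, signature {:#x}",
        signed.preunit.round,
        signed.preunit.parent_hashes.len(),
        signed.data,
        signed.signature
    );
}
```

The ordering shown in `main` reflects the design choice visible in `run_creator`: the preunit is built first and the data provider is asked for data only afterwards, so the data included in a unit is as fresh as possible at the moment the unit is actually created.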