From 3a2bafc33a5165411c39096c28af274d1e0524f9 Mon Sep 17 00:00:00 2001 From: timorleph <145755355+timorleph@users.noreply.github.com> Date: Fri, 12 Apr 2024 09:01:35 +0200 Subject: [PATCH] A0-4155: Unit saving pipeline (#432) * Unit saving pipeline * Do not save parents after all In particular this means that this doesn't change the API, so only patch version bump needed. --- Cargo.lock | 2 +- consensus/Cargo.toml | 2 +- consensus/src/backup/saver.rs | 62 ++++++++++++++----------- consensus/src/dag/mod.rs | 24 ++++------ consensus/src/runway/mod.rs | 41 ++++++++-------- consensus/src/testing/crash_recovery.rs | 2 +- consensus/src/units/mod.rs | 5 +- consensus/src/units/testing.rs | 16 ++++++- 8 files changed, 84 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e4909d6..d23cb39e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,7 +28,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.36.3" +version = "0.36.4" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 01645bac..5bd5a906 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.36.3" +version = "0.36.4" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index 7e37fefd..252c0f9b 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -1,6 +1,10 @@ use std::pin::Pin; -use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator}; +use crate::{ + dag::DagUnit, + units::{UncheckedSignedUnit, WrappedUnit}, + Data, Hasher, MultiKeychain, Receiver, Sender, Terminator, +}; use codec::Encode; use futures::{AsyncWrite, AsyncWriteExt, FutureExt, StreamExt}; use log::{debug, error}; @@ -10,18 +14,18 @@ const LOG_TARGET: &str = "AlephBFT-backup-saver"; /// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { - units_from_runway: Receiver>, - responses_for_runway: Sender>, +pub struct BackupSaver { + units_from_runway: Receiver>, + responses_for_runway: Sender>, backup: Pin>, } -impl BackupSaver { +impl BackupSaver { pub fn new( - units_from_runway: Receiver>, - responses_for_runway: Sender>, + units_from_runway: Receiver>, + responses_for_runway: Sender>, backup: W, - ) -> BackupSaver { + ) -> BackupSaver { BackupSaver { units_from_runway, responses_for_runway, @@ -29,11 +33,9 @@ impl BackupSaver { } } - pub async fn save_item( - &mut self, - item: &UncheckedSignedUnit, - ) -> Result<(), std::io::Error> { - self.backup.write_all(&item.encode()).await?; + pub async fn save_unit(&mut self, unit: &DagUnit) -> Result<(), std::io::Error> { + let unit: UncheckedSignedUnit<_, _, _> = unit.clone().unpack().into(); + self.backup.write_all(&unit.encode()).await?; self.backup.flush().await } @@ -49,7 +51,7 @@ impl BackupSaver { break; }, }; - if let Err(e) = self.save_item(&item).await { + if let Err(e) = self.save_unit(&item).await { error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); break; } @@ -80,16 +82,17 @@ mod tests { StreamExt, }; - use aleph_bft_mock::{Data, Hasher64, Keychain, Saver, Signature}; + use aleph_bft_mock::{Data, Hasher64, Keychain, Saver}; use crate::{ backup::BackupSaver, - units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit}, - NodeCount, NodeIndex, Terminator, + dag::ReconstructedUnit, + units::{creator_set, preunit_to_signed_unit, TestingSignedUnit}, + NodeCount, Terminator, }; - type TestBackupSaver = BackupSaver; - type TestUnit = UncheckedSignedUnit; + type TestUnit = ReconstructedUnit; + type TestBackupSaver = BackupSaver; struct PrepareSaverResponse { task: F, units_for_saver: mpsc::UnboundedSender, @@ -122,6 +125,7 @@ mod tests { #[tokio::test] async fn test_proper_relative_responses_ordering() { + let node_count = NodeCount(5); let PrepareSaverResponse { task, units_for_saver, @@ -133,17 +137,19 @@ mod tests { task.await; }); - let creators = creator_set(NodeCount(5)); - let keychains: Vec<_> = (0..5) - .map(|id| Keychain::new(NodeCount(5), NodeIndex(id))) + let creators = creator_set(node_count); + let keychains: Vec<_> = node_count + .into_iterator() + .map(|id| Keychain::new(node_count, id)) .collect(); - let units: Vec = (0..5) - .map(|k| { - preunit_to_unchecked_signed_unit( - creators[k].create_unit(0).unwrap(), + let units: Vec = node_count + .into_iterator() + .map(|id| { + ReconstructedUnit::initial(preunit_to_signed_unit( + creators[id.0].create_unit(0).unwrap(), 0, - &keychains[k], - ) + &keychains[id.0], + )) }) .collect(); diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index d97a89e5..106c1c9c 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -20,10 +20,12 @@ use validation::{Error as ValidationError, Validator}; const LOG_TARGET: &str = "AlephBFT-dag"; +pub type DagUnit = ReconstructedUnit>; + /// The result of sending some information to the Dag. pub struct DagResult { /// Units added to the dag. - pub units: Vec>>, + pub units: Vec>, /// Requests for more information. pub requests: Vec>, /// Alerts raised due to encountered forks. @@ -114,25 +116,16 @@ impl Dag { } } - fn handle_result(&mut self, result: &DagResult) { - // just clean the validator cache of units that we are returning - for unit in &result.units { - self.validator.finished_processing(&unit.hash()); - } - } - /// Add a unit to the Dag. pub fn add_unit>>( &mut self, unit: UncheckedSignedUnit, store: &UnitStore, ) -> DagResult { - let result = match self.validator.validate(unit, store) { + match self.validator.validate(unit, store) { Ok(unit) => self.reconstruction.add_unit(unit).into(), Err(e) => Self::handle_validation_error(e), - }; - self.handle_result(&result); - result + } } /// Add parents of a unit to the Dag. @@ -180,7 +173,6 @@ impl Dag { .add_parents(unit_hash, parent_hashes) .into(), ); - self.handle_result(&result); result } @@ -208,10 +200,14 @@ impl Dag { } } } - self.handle_result(&result); result } + /// Notify the dag that a unit has finished processing and can be cleared from the cache. + pub fn finished_processing(&mut self, hash: &H::Hash) { + self.validator.finished_processing(hash); + } + pub fn status(&self) -> DagStatus { self.validator.status() } diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 13e6a49c..b21d3e2c 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -1,7 +1,7 @@ use crate::{ alerts::{Alert, ForkingNotification, NetworkMessage}, creation, - dag::{Dag, DagResult, DagStatus, ReconstructedUnit, Request as ReconstructionRequest}, + dag::{Dag, DagResult, DagStatus, DagUnit, Request as ReconstructionRequest}, extension::{ExtenderUnit, Service as Extender}, handle_task_termination, member::UnitMessage, @@ -109,7 +109,7 @@ where { missing_coords: HashSet, missing_parents: HashSet, - store: UnitStore>>, + store: UnitStore>, keychain: MK, dag: Dag, alerts_for_alerter: Sender>, @@ -118,12 +118,12 @@ where unit_messages_for_network: Sender>, responses_for_collection: Sender>, resolved_requests: Sender>, - parents_for_creator: Sender>>, + parents_for_creator: Sender>, ordered_batch_rx: Receiver>, finalization_handler: FH, - backup_units_for_saver: Sender>, + backup_units_for_saver: Sender>, units_for_extender: Sender>, - backup_units_from_saver: Receiver>, + backup_units_from_saver: Receiver>, new_units_from_creation: Receiver>, exiting: bool, } @@ -210,15 +210,15 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> { struct RunwayConfig, MK: MultiKeychain> { finalization_handler: FH, - backup_units_for_saver: Sender>, + backup_units_for_saver: Sender>, units_for_extender: Sender>, - backup_units_from_saver: Receiver>, + backup_units_from_saver: Receiver>, alerts_for_alerter: Sender>, notifications_from_alerter: Receiver>, unit_messages_from_network: Receiver>, unit_messages_for_network: Sender>, responses_for_collection: Sender>, - parents_for_creator: Sender>>, + parents_for_creator: Sender>, ordered_batch_rx: Receiver>, resolved_requests: Sender>, new_units_from_creation: Receiver>, @@ -455,10 +455,18 @@ where } } - fn on_unit_reconstructed(&mut self, unit: ReconstructedUnit>) { + fn on_unit_reconstructed(&mut self, unit: DagUnit) { let unit_hash = unit.hash(); trace!(target: "AlephBFT-runway", "Unit {:?} {} reconstructed.", unit_hash, unit.coord()); + if self.backup_units_for_saver.unbounded_send(unit).is_err() { + error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash); + } + } + + fn on_unit_backup_saved(&mut self, unit: DagUnit) { + let unit_hash = unit.hash(); self.store.insert(unit.clone()); + self.dag.finished_processing(&unit_hash); self.resolve_missing_parents(&unit_hash); self.resolve_missing_coord(&unit.coord()); if self @@ -477,21 +485,12 @@ where warn!(target: "AlephBFT-runway", "Creator channel should be open."); self.exiting = true; } - if self - .backup_units_for_saver - .unbounded_send(unit.unpack().into()) - .is_err() - { - error!(target: "AlephBFT-runway", "{:?} A unit couldn't be sent to backup: {:?}.", self.index(), unit_hash); - } - } - - fn on_unit_backup_saved(&mut self, unit: UncheckedSignedUnit) { - self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone())); + let unit = unit.unpack(); + self.send_message_for_network(RunwayNotificationOut::NewAnyUnit(unit.clone().into())); if unit.as_signable().creator() == self.index() { trace!(target: "AlephBFT-runway", "{:?} Sending a unit {:?}.", self.index(), unit.as_signable().hash()); - self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit)); + self.send_message_for_network(RunwayNotificationOut::NewSelfUnit(unit.into())); } } diff --git a/consensus/src/testing/crash_recovery.rs b/consensus/src/testing/crash_recovery.rs index 9c531dab..89bf8815 100644 --- a/consensus/src/testing/crash_recovery.rs +++ b/consensus/src/testing/crash_recovery.rs @@ -129,7 +129,7 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet { let mut already_saved = HashSet::new(); while !buf.is_empty() { - let unit = UncheckedSignedUnit::::decode(buf).unwrap(); + let unit = >::decode(buf).unwrap(); let full_unit = unit.as_signable(); let coord = full_unit.coord(); let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask; diff --git a/consensus/src/units/mod.rs b/consensus/src/units/mod.rs index ba811187..218d2248 100644 --- a/consensus/src/units/mod.rs +++ b/consensus/src/units/mod.rs @@ -16,8 +16,9 @@ pub(crate) use store::*; #[cfg(test)] pub use testing::{ create_preunits, creator_set, full_unit_to_unchecked_signed_unit, preunit_to_full_unit, - preunit_to_unchecked_signed_unit, random_full_parent_units_up_to, random_unit_with_parents, - FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, WrappedSignedUnit, + preunit_to_signed_unit, preunit_to_unchecked_signed_unit, random_full_parent_units_up_to, + random_unit_with_parents, FullUnit as TestingFullUnit, SignedUnit as TestingSignedUnit, + WrappedSignedUnit, }; pub use validator::{ValidationError, Validator}; diff --git a/consensus/src/units/testing.rs b/consensus/src/units/testing.rs index 99fa56b5..3aafe119 100644 --- a/consensus/src/units/testing.rs +++ b/consensus/src/units/testing.rs @@ -70,11 +70,23 @@ impl Creator { } } +pub fn full_unit_to_signed_unit(full_unit: FullUnit, keychain: &Keychain) -> SignedUnit { + Signed::sign(full_unit, keychain) +} + +pub fn preunit_to_signed_unit( + pu: PreUnit, + session_id: SessionId, + keychain: &Keychain, +) -> SignedUnit { + full_unit_to_signed_unit(preunit_to_full_unit(pu, session_id), keychain) +} + pub fn full_unit_to_unchecked_signed_unit( full_unit: FullUnit, keychain: &Keychain, ) -> UncheckedSignedUnit { - Signed::sign(full_unit, keychain).into() + full_unit_to_signed_unit(full_unit, keychain).into() } pub fn preunit_to_unchecked_signed_unit( @@ -82,7 +94,7 @@ pub fn preunit_to_unchecked_signed_unit( session_id: SessionId, keychain: &Keychain, ) -> UncheckedSignedUnit { - full_unit_to_unchecked_signed_unit(preunit_to_full_unit(pu, session_id), keychain) + preunit_to_signed_unit(pu, session_id, keychain).into() } fn initial_preunit(n_members: NodeCount, node_id: NodeIndex) -> PreUnit {