Skip to content

Commit

Permalink
A0-1672: units returned by run_session (#439)
Browse files Browse the repository at this point in the history
* - added UnitFinalizationHandler: returns unit batches instead of just Data
- added OrderedUnit: unit api for the UnitFinalizationHandler

* updated docs for the new UnitFinalizationHandler

* impl From<ReconstructedUnit<Signed<FullUnit<...>>> for OrderedUnit

* bumped version of mock after adding UnitFinalizationHandler

* simplified constraints in extension/mod.rs using UnitFinalizationHandler's associated types (Data, Hasher)

* `Hasher` constraint in run_session sub by `UnitFinalizationHandler::Hasher`
  • Loading branch information
fixxxedpoint authored May 7, 2024
1 parent 2d6b03a commit 2ebeb30
Show file tree
Hide file tree
Showing 24 changed files with 300 additions and 192 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details].
- Import AlephBFT in your crate
```toml
[dependencies]
aleph-bft = "^0.36"
aleph-bft = "^0.37"
```
- The main entry point is the `run_session` function, which returns a Future that runs the
consensus algorithm.
Expand Down
4 changes: 2 additions & 2 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aleph-bft"
version = "0.36.5"
version = "0.37.0"
edition = "2021"
authors = ["Cardinal Cryptography"]
categories = ["algorithms", "data-structures", "cryptography", "database"]
Expand All @@ -14,7 +14,7 @@ description = "AlephBFT is an asynchronous and Byzantine fault tolerant consensu

[dependencies]
aleph-bft-rmc = { path = "../rmc", version = "0.13" }
aleph-bft-types = { path = "../types", version = "0.13" }
aleph-bft-types = { path = "../types", version = "0.14" }
anyhow = "1.0"
async-trait = "0.1"
codec = { package = "parity-scale-codec", version = "3.0", default-features = false, features = ["derive"] }
Expand Down
23 changes: 9 additions & 14 deletions consensus/src/creation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
config::Config,
units::{PreUnit, SignedUnit, Unit},
Data, DataProvider, MultiKeychain, Receiver, Round, Sender, Terminator,
DataProvider, MultiKeychain, Receiver, Round, Sender, Terminator,
};
use futures::{
channel::{
Expand Down Expand Up @@ -32,9 +32,9 @@ impl<T> From<TrySendError<T>> for CreatorError {
}
}

pub struct IO<U: Unit, D: Data, MK: MultiKeychain, DP: DataProvider<D>> {
pub struct IO<U: Unit, MK: MultiKeychain, DP: DataProvider> {
pub incoming_parents: Receiver<U>,
pub outgoing_units: Sender<SignedUnit<U::Hasher, D, MK>>,
pub outgoing_units: Sender<SignedUnit<U::Hasher, DP::Output, MK>>,
pub data_provider: DP,
}

Expand Down Expand Up @@ -106,9 +106,9 @@ async fn keep_processing_units_until<U: Unit>(
///
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html
/// Section 5.1 for a discussion of this component.
pub async fn run<U: Unit, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
pub async fn run<U: Unit, MK: MultiKeychain, DP: DataProvider>(
conf: Config,
mut io: IO<U, D, MK, DP>,
mut io: IO<U, MK, DP>,
keychain: MK,
mut starting_round: oneshot::Receiver<Option<Round>>,
mut terminator: Terminator,
Expand All @@ -123,14 +123,9 @@ pub async fn run<U: Unit, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
terminator.terminate_sync().await;
}

async fn read_starting_round_and_run_creator<
U: Unit,
D: Data,
MK: MultiKeychain,
DP: DataProvider<D>,
>(
async fn read_starting_round_and_run_creator<U: Unit, MK: MultiKeychain, DP: DataProvider>(
conf: Config,
io: &mut IO<U, D, MK, DP>,
io: &mut IO<U, MK, DP>,
keychain: MK,
starting_round: &mut oneshot::Receiver<Option<Round>>,
) {
Expand Down Expand Up @@ -159,9 +154,9 @@ async fn read_starting_round_and_run_creator<
}
}

async fn run_creator<U: Unit, D: Data, MK: MultiKeychain, DP: DataProvider<D>>(
async fn run_creator<U: Unit, MK: MultiKeychain, DP: DataProvider>(
conf: Config,
io: &mut IO<U, D, MK, DP>,
io: &mut IO<U, MK, DP>,
keychain: MK,
starting_round: Round,
) -> anyhow::Result<(), CreatorError> {
Expand Down
31 changes: 30 additions & 1 deletion consensus/src/dag/reconstruction/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::collections::HashMap;

use crate::{
units::{ControlHash, HashFor, Unit, UnitCoord, UnitWithParents, WrappedUnit},
units::{ControlHash, FullUnit, HashFor, Unit, UnitCoord, UnitWithParents, WrappedUnit},
Hasher, NodeMap, SessionId,
};

mod dag;
mod parents;

use aleph_bft_types::{Data, MultiKeychain, OrderedUnit, Signed};
use dag::Dag;
use parents::Reconstruction as ParentReconstruction;

Expand Down Expand Up @@ -76,6 +77,34 @@ impl<U: Unit> UnitWithParents for ReconstructedUnit<U> {
}
}

impl<D: Data, H: Hasher, K: MultiKeychain> From<ReconstructedUnit<Signed<FullUnit<H, D>, K>>>
for Option<D>
{
fn from(value: ReconstructedUnit<Signed<FullUnit<H, D>, K>>) -> Self {
value.unpack().into_signable().into()
}
}

impl<D: Data, H: Hasher, K: MultiKeychain> From<ReconstructedUnit<Signed<FullUnit<H, D>, K>>>
for OrderedUnit<D, H>
{
fn from(unit: ReconstructedUnit<Signed<FullUnit<H, D>, K>>) -> Self {
let parents = unit.parents().values().cloned().collect();
let unit = unit.unpack();
let creator = unit.creator();
let round = unit.round();
let hash = unit.hash();
let data = unit.into_signable().data().clone();
OrderedUnit {
parents,
creator,
round,
hash,
data,
}
}
}

/// What we need to request to reconstruct units.
#[derive(Debug, PartialEq, Eq)]
pub enum Request<H: Hasher> {
Expand Down
33 changes: 10 additions & 23 deletions consensus/src/extension/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
use crate::{
dag::DagUnit,
units::{Unit, WrappedUnit},
Data, FinalizationHandler, Hasher, MultiKeychain,
};
use crate::{dag::DagUnit, MultiKeychain};

mod election;
mod extender;
mod units;

use aleph_bft_types::UnitFinalizationHandler;
use extender::Extender;

/// A struct responsible for executing the Consensus protocol on a local copy of the Dag.
Expand All @@ -19,34 +16,24 @@ use extender::Extender;
///
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html
/// Section 5.4 for a discussion of this component.
pub struct Ordering<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<D>> {
extender: Extender<DagUnit<H, D, MK>>,
finalization_handler: FH,
pub struct Ordering<MK: MultiKeychain, UFH: UnitFinalizationHandler> {
extender: Extender<DagUnit<UFH::Hasher, UFH::Data, MK>>,
finalization_handler: UFH,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<D>> Ordering<H, D, MK, FH> {
pub fn new(finalization_handler: FH) -> Self {
impl<MK: MultiKeychain, UFH: UnitFinalizationHandler> Ordering<MK, UFH> {
pub fn new(finalization_handler: UFH) -> Self {
let extender = Extender::new();
Ordering {
extender,
finalization_handler,
}
}

fn handle_batch(&mut self, batch: Vec<DagUnit<H, D, MK>>) {
for unit in batch {
let unit = unit.unpack();
self.finalization_handler.unit_finalized(
unit.creator(),
unit.round(),
unit.as_signable().data().clone(),
)
}
}

pub fn add_unit(&mut self, unit: DagUnit<H, D, MK>) {
pub fn add_unit(&mut self, unit: DagUnit<UFH::Hasher, UFH::Data, MK>) {
for batch in self.extender.add_unit(unit) {
self.handle_batch(batch);
self.finalization_handler
.batch_finalized(batch.into_iter().map(|unit| unit.into()).collect());
}
}
}
4 changes: 2 additions & 2 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ mod task_queue;
mod testing;

pub use aleph_bft_types::{
Data, DataProvider, FinalizationHandler, Hasher, IncompleteMultisignatureError, Index, Indexed,
Keychain, MultiKeychain, Multisigned, Network, NodeCount, NodeIndex, NodeMap, NodeSubset,
Data, DataProvider, Hasher, IncompleteMultisignatureError, Index, Indexed, Keychain,
MultiKeychain, Multisigned, Network, NodeCount, NodeIndex, NodeMap, NodeSubset,
PartialMultisignature, PartiallyMultisigned, Recipient, Round, SessionId, Signable, Signature,
SignatureError, SignatureSet, Signed, SpawnHandle, TaskHandle, UncheckedSigned,
};
Expand Down
97 changes: 74 additions & 23 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use crate::{
},
task_queue::TaskQueue,
units::{UncheckedSignedUnit, Unit, UnitCoord},
Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex,
Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
Config, Data, DataProvider, Hasher, MultiKeychain, Network, NodeIndex, Receiver, Recipient,
Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::NodeMap;
use aleph_bft_types::{FinalizationHandler, NodeMap, OrderedUnit, UnitFinalizationHandler};
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures_timer::Delay;
Expand Down Expand Up @@ -106,36 +106,81 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
},
}

/// This adapter allows to map an implementation of [`FinalizationHandler`] onto implementation of [`UnitFinalizationHandler`].
pub struct FinalizationHandlerAdapter<FH, D, H> {
finalization_handler: FH,
_phantom: PhantomData<(D, H)>,
}

impl<FH, D, H> From<FH> for FinalizationHandlerAdapter<FH, D, H> {
fn from(value: FH) -> Self {
Self {
finalization_handler: value,
_phantom: PhantomData,
}
}
}

impl<D: Data, H: Hasher, FH: FinalizationHandler<D>> UnitFinalizationHandler
for FinalizationHandlerAdapter<FH, D, H>
{
type Data = D;
type Hasher = H;

fn batch_finalized(&mut self, batch: Vec<OrderedUnit<Self::Data, Self::Hasher>>) {
for unit in batch {
if let Some(data) = unit.data {
self.finalization_handler.data_finalized(data)
}
}
}
}

#[derive(Clone)]
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: AsyncWrite,
UL: AsyncRead,
> {
pub struct LocalIO<DP: DataProvider, UFH: UnitFinalizationHandler, US: AsyncWrite, UL: AsyncRead> {
data_provider: DP,
finalization_handler: FH,
finalization_handler: UFH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: AsyncWrite, UL: AsyncRead>
LocalIO<D, DP, FH, US, UL>
impl<
H: Hasher,
DP: DataProvider,
FH: FinalizationHandler<DP::Output>,
US: AsyncWrite,
UL: AsyncRead,
> LocalIO<DP, FinalizationHandlerAdapter<FH, DP::Output, H>, US, UL>
{
pub fn new(
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
) -> LocalIO<D, DP, FH, US, UL> {
LocalIO {
) -> Self {
Self {
data_provider,
finalization_handler: finalization_handler.into(),
unit_saver,
unit_loader,
}
}
}

impl<DP: DataProvider, UFH: UnitFinalizationHandler, US: AsyncWrite, UL: AsyncRead>
LocalIO<DP, UFH, US, UL>
{
pub fn new_with_unit_finalization_handler(
data_provider: DP,
finalization_handler: UFH,
unit_saver: US,
unit_loader: UL,
) -> Self {
Self {
data_provider,
finalization_handler,
unit_saver,
unit_loader,
_phantom: PhantomData,
}
}
}
Expand Down Expand Up @@ -573,19 +618,25 @@ where
/// For a detailed description of the consensus implemented by `run_session` see
/// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html)
/// or the [original paper](https://arxiv.org/abs/1908.05156).
///
/// Please note that in order to fulfill the constraint [`UnitFinalizationHandler<Data = DP::Output, Hasher
/// = H>`] it is enough to provide implementation of [`FinalizationHandler<DP::Output>`]. We provide
/// implementation of [`UnitFinalizationHandler<Data = DP::Output, Hasher = H>`] for anything that satisfies
/// the trait [`FinalizationHandler<DP::Output>`] (by means of [`FinalizationHandlerAdapter`]). Implementing
/// [`UnitFinalizationHandler`] directly is considered less stable since it exposes intrisics which might be
/// subject to change. Implement [`FinalizationHandler<DP::Output>`] instead, unless you absolutely know
/// what you are doing.
pub async fn run_session<
H: Hasher,
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
DP: DataProvider,
UFH: UnitFinalizationHandler<Data = DP::Output>,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, D, MK::Signature, MK::PartialMultisignature>> + 'static,
N: Network<NetworkData<UFH::Hasher, DP::Output, MK::Signature, MK::PartialMultisignature>>,
SH: SpawnHandle,
MK: MultiKeychain,
>(
config: Config,
local_io: LocalIO<D, DP, FH, US, UL>,
local_io: LocalIO<DP, UFH, US, UL>,
network: N,
keychain: MK,
spawn_handle: SH,
Expand Down
Loading

0 comments on commit 2ebeb30

Please sign in to comment.