From d7fa0eede0bf971e350f3f81cbe1c0a2afe34a08 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Thu, 16 May 2024 13:29:58 -0400 Subject: [PATCH 1/9] Adds channel for communicating recently committed certificates in bft --- node/bft/src/helpers/channels.rs | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 70c6ad617b..df89f839b6 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -65,6 +65,7 @@ pub struct BFTSender { pub tx_primary_certificate: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, pub tx_sync_bft_dag_at_bootup: mpsc::Sender>>, pub tx_sync_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, + pub tx_is_recently_committed: mpsc::Sender<(BatchCertificate, oneshot::Sender)>, } impl BFTSender { @@ -97,6 +98,16 @@ impl BFTSender { // Await the callback to continue. callback_receiver.await? } + + /// Sends the batch certificate to the BFT to receive a callback on whether the certificate was recently committed. + pub async fn send_sync_certificate_to_check_commit_bft(&self, certificate: BatchCertificate) -> Result { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the certificate to the BFT. + self.tx_is_recently_committed.send((certificate, callback_sender)).await?; + // Await the callback to continue. + Ok(callback_receiver.await?) + } } #[derive(Debug)] @@ -105,6 +116,7 @@ pub struct BFTReceiver { pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, pub rx_sync_bft_dag_at_bootup: mpsc::Receiver>>, pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, + pub rx_is_recently_committed: mpsc::Receiver<(BatchCertificate, oneshot::Sender)>, } /// Initializes the BFT channels. @@ -113,9 +125,10 @@ pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE); - let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft }; - let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft }; + let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_is_recently_committed }; + let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_is_recently_committed }; (sender, receiver) } From 396b47d9b21a92d12b5fdaabd29bac827df004a0 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Thu, 16 May 2024 13:35:09 -0400 Subject: [PATCH 2/9] Adds to bft start handlers --- node/bft/src/bft.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index fe3937e89e..a24dcb92e4 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -778,6 +778,7 @@ impl BFT { mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup, mut rx_sync_bft, + mut rx_is_recently_committed, } = bft_receiver; // Process the current round from the primary. @@ -819,6 +820,18 @@ impl BFT { callback.send(result).ok(); } }); + + // Process the request to check if the batch certificate was recently committed. + let self_ = self.clone(); + self.spawn(async move { + while let Some((certificate, callback)) = rx_is_recently_committed.recv().await { + // Check if the certificate was recently committed. + let is_committed = self_.dag.read().is_recently_committed(certificate.round(), certificate.id()); + // Send the callback **after** updating the DAG. + // Note: We must await the DAG update before proceeding. + callback.send(is_committed).ok(); + } + }); } /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must** From eee9aeede7383d41f9df586caf074cd3ca137274 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Thu, 16 May 2024 13:50:22 -0400 Subject: [PATCH 3/9] Couples replicating DAG state to advancing with sync blocks --- node/bft/src/sync/mod.rs | 59 ++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index e5dc84731b..d5b1fbf286 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -380,23 +380,12 @@ impl Sync { }) .collect::>(); - // Iterate over the certificates. + // Sync the storage with the certificates. for certificates in subdag.values().cloned() { cfg_into_iter!(certificates.clone()).for_each(|certificate| { // Sync the batch certificate with the block. self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions); }); - - // Sync the BFT DAG with the certificates. - for certificate in certificates { - // If a BFT sender was provided, send the certificate to the BFT. - if let Some(bft_sender) = self.bft_sender.get() { - // Await the callback to continue. - if let Err(e) = bft_sender.send_sync_bft(certificate).await { - bail!("Sync - {e}"); - }; - } - } } } @@ -474,15 +463,56 @@ impl Sync { } } - // Add the blocks to the ledger. - for block in blocks_to_add { + // Sync the BFT DAG with the blocks. + for block in blocks_to_add.clone(){ // Check that the blocks are sequential and can be added to the ledger. let block_height = block.height(); if block_height != self.ledger.latest_block_height().saturating_add(1) { warn!("Skipping block {block_height} from the latest block responses - not sequential."); continue; } + if let Authority::Quorum(subdag) = block.authority() { + // Iterate over the certificates. + for certificates in subdag.values().cloned() { + // Sync the BFT DAG with the certificates. + for certificate in certificates { + // If a BFT sender was provided, send the certificate to the BFT. + if let Some(bft_sender) = self.bft_sender.get() { + // Await the callback to continue. + if let Err(e) = bft_sender.send_sync_bft(certificate).await { + bail!("Sync - {e}"); + }; + } + } + } + } + } + // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. + // This ensures consistency between block sync and the BFT DAG state. + for block in blocks_to_add { + // Retrieve the block height. + let block_height = block.height(); + if let Authority::Quorum(subdag) = block.authority() { + // Retrieve the leader certificate of the subdag. + let leader_certificate = subdag.leader_certificate(); + if let Some(bft_sender) = self.bft_sender.get() { + // Await the callback to continue. + match bft_sender.send_sync_certificate_to_check_commit_bft(leader_certificate.clone()).await { + Ok(is_recently_committed) => { + if !is_recently_committed{ + bail!("Sync - Failed to advance blocks - leader certificate with author {} from round {} was not recently committed.", leader_certificate.author(), leader_certificate.round()); + } + info!("Sync - leader certificate with author {} from round {} was recently committed.", leader_certificate.author(), leader_certificate.round()); + }, + Err(e) => { + bail!("Sync - Failed to check if leader certificate was recently committed - {e}"); + } + }; + } + } + // Add the block to the ledger. + info!("Proceeding to advance to sync block at height {}. ", block_height); let self_ = self.clone(); tokio::task::spawn_blocking(move || { // Check the next block. @@ -501,6 +531,7 @@ impl Sync { // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); } + } else { debug!( "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..." From b7392543137698d954f39c3083443036e28e4a83 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Thu, 16 May 2024 14:20:54 -0400 Subject: [PATCH 4/9] Adds comment --- node/bft/src/sync/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index d5b1fbf286..9b07591901 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -463,8 +463,10 @@ impl Sync { } } - // Sync the BFT DAG with the blocks. - for block in blocks_to_add.clone(){ + // Sync the BFT DAG with the blocks. Adding the certificates for the sync blocks must occur before checking if the leader certificates have been committed. + // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability + // threshold, was committed in the BFT. + for block in blocks_to_add.clone() { // Check that the blocks are sequential and can be added to the ledger. let block_height = block.height(); if block_height != self.ledger.latest_block_height().saturating_add(1) { From 873c5e00cf129464239ab26a8fc70db42d6bda3d Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Fri, 17 May 2024 13:30:09 -0700 Subject: [PATCH 5/9] Clippy and add minor optimizations --- node/bft/src/bft.rs | 6 ++--- node/bft/src/helpers/channels.rs | 24 ++++++++++++----- node/bft/src/sync/mod.rs | 44 +++++++++++++++++++------------- 3 files changed, 47 insertions(+), 27 deletions(-) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index a24dcb92e4..8a950f0b67 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -821,12 +821,12 @@ impl BFT { } }); - // Process the request to check if the batch certificate was recently committed. + // Process the request to check if the batch certificate was recently committed. let self_ = self.clone(); self.spawn(async move { while let Some((certificate, callback)) = rx_is_recently_committed.recv().await { - // Check if the certificate was recently committed. - let is_committed = self_.dag.read().is_recently_committed(certificate.round(), certificate.id()); + // Check if the certificate was recently committed. + let is_committed = self_.dag.read().is_recently_committed(certificate.round(), certificate.id()); // Send the callback **after** updating the DAG. // Note: We must await the DAG update before proceeding. callback.send(is_committed).ok(); diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index df89f839b6..44827332c5 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -99,11 +99,11 @@ impl BFTSender { callback_receiver.await? } - /// Sends the batch certificate to the BFT to receive a callback on whether the certificate was recently committed. + /// Sends the batch certificate to the BFT to receive a callback on whether the certificate was recently committed. pub async fn send_sync_certificate_to_check_commit_bft(&self, certificate: BatchCertificate) -> Result { // Initialize a callback sender and receiver. let (callback_sender, callback_receiver) = oneshot::channel(); - // Send the certificate to the BFT. + // Send the certificate to the BFT. self.tx_is_recently_committed.send((certificate, callback_sender)).await?; // Await the callback to continue. Ok(callback_receiver.await?) @@ -125,10 +125,22 @@ pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); - let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE); - - let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_is_recently_committed }; - let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_is_recently_committed }; + let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE); + + let sender = BFTSender { + tx_primary_round, + tx_primary_certificate, + tx_sync_bft_dag_at_bootup, + tx_sync_bft, + tx_is_recently_committed, + }; + let receiver = BFTReceiver { + rx_primary_round, + rx_primary_certificate, + rx_sync_bft_dag_at_bootup, + rx_sync_bft, + rx_is_recently_committed, + }; (sender, receiver) } diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 9b07591901..01edac1401 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -382,9 +382,9 @@ impl Sync { // Sync the storage with the certificates. for certificates in subdag.values().cloned() { - cfg_into_iter!(certificates.clone()).for_each(|certificate| { + cfg_into_iter!(certificates).for_each(|certificate| { // Sync the batch certificate with the block. - self.storage.sync_certificate_with_block(&block, certificate.clone(), &unconfirmed_transactions); + self.storage.sync_certificate_with_block(&block, certificate, &unconfirmed_transactions); }); } } @@ -464,9 +464,9 @@ impl Sync { } // Sync the BFT DAG with the blocks. Adding the certificates for the sync blocks must occur before checking if the leader certificates have been committed. - // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability - // threshold, was committed in the BFT. - for block in blocks_to_add.clone() { + // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability + // threshold, was committed in the BFT. + for block in blocks_to_add.iter() { // Check that the blocks are sequential and can be added to the ledger. let block_height = block.height(); if block_height != self.ledger.latest_block_height().saturating_add(1) { @@ -490,31 +490,40 @@ impl Sync { } } - // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. - // This ensures consistency between block sync and the BFT DAG state. + // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. + // This ensures consistency between block sync and the BFT DAG state. for block in blocks_to_add { // Retrieve the block height. let block_height = block.height(); if let Authority::Quorum(subdag) = block.authority() { - // Retrieve the leader certificate of the subdag. - let leader_certificate = subdag.leader_certificate(); + // Retrieve the leader certificate of the subdag. + let leader_certificate = subdag.leader_certificate(); if let Some(bft_sender) = self.bft_sender.get() { // Await the callback to continue. - match bft_sender.send_sync_certificate_to_check_commit_bft(leader_certificate.clone()).await { + match bft_sender.send_sync_certificate_to_check_commit_bft(leader_certificate.clone()).await + { Ok(is_recently_committed) => { - if !is_recently_committed{ - bail!("Sync - Failed to advance blocks - leader certificate with author {} from round {} was not recently committed.", leader_certificate.author(), leader_certificate.round()); + if !is_recently_committed { + bail!( + "Sync - Failed to advance blocks - leader certificate with author {} from round {} was not recently committed.", + leader_certificate.author(), + leader_certificate.round() + ); } - info!("Sync - leader certificate with author {} from round {} was recently committed.", leader_certificate.author(), leader_certificate.round()); - }, + info!( + "Sync - leader certificate with author {} from round {} was recently committed.", + leader_certificate.author(), + leader_certificate.round() + ); + } Err(e) => { bail!("Sync - Failed to check if leader certificate was recently committed - {e}"); } - }; + }; } } - // Add the block to the ledger. - info!("Proceeding to advance to sync block at height {}. ", block_height); + // Add the block to the ledger. + info!("Proceeding to advance to sync block at height {}. ", block_height); let self_ = self.clone(); tokio::task::spawn_blocking(move || { // Check the next block. @@ -533,7 +542,6 @@ impl Sync { // Remove the block height from the latest block responses. latest_block_responses.remove(&block_height); } - } else { debug!( "Availability threshold was not reached for block {next_block_height} at round {commit_round}. Checking next block..." From 6ddd756ab29387542403a2398bd3f709a216aff5 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Wed, 22 May 2024 13:20:50 -0400 Subject: [PATCH 6/9] Revert change to sync certificates with BFT at start --- node/bft/src/sync/mod.rs | 43 +++++++++++++--------------------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 01edac1401..901d5dba05 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -382,16 +382,28 @@ impl Sync { // Sync the storage with the certificates. for certificates in subdag.values().cloned() { - cfg_into_iter!(certificates).for_each(|certificate| { + cfg_into_iter!(certificates.clone()).for_each(|certificate| { // Sync the batch certificate with the block. self.storage.sync_certificate_with_block(&block, certificate, &unconfirmed_transactions); }); + + // Sync the BFT DAG with the blocks. + // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability + // threshold, was committed in the BFT. + for certificate in certificates{ + // If a BFT sender was provided, send the certificate to the BFT. + if let Some(bft_sender) = self.bft_sender.get() { + // Await the callback to continue. + if let Err(e) = bft_sender.send_sync_bft(certificate).await { + bail!("Sync - {e}"); + }; + } + } } } // Fetch the latest block height. let latest_block_height = self.ledger.latest_block_height(); - // Insert the latest block response. latest_block_responses.insert(block.height(), block); // Clear the latest block responses of older blocks. @@ -463,33 +475,6 @@ impl Sync { } } - // Sync the BFT DAG with the blocks. Adding the certificates for the sync blocks must occur before checking if the leader certificates have been committed. - // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability - // threshold, was committed in the BFT. - for block in blocks_to_add.iter() { - // Check that the blocks are sequential and can be added to the ledger. - let block_height = block.height(); - if block_height != self.ledger.latest_block_height().saturating_add(1) { - warn!("Skipping block {block_height} from the latest block responses - not sequential."); - continue; - } - if let Authority::Quorum(subdag) = block.authority() { - // Iterate over the certificates. - for certificates in subdag.values().cloned() { - // Sync the BFT DAG with the certificates. - for certificate in certificates { - // If a BFT sender was provided, send the certificate to the BFT. - if let Some(bft_sender) = self.bft_sender.get() { - // Await the callback to continue. - if let Err(e) = bft_sender.send_sync_bft(certificate).await { - bail!("Sync - {e}"); - }; - } - } - } - } - } - // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. // This ensures consistency between block sync and the BFT DAG state. for block in blocks_to_add { From 9d4c85a401b1b1e53bb0f63a4e7ddf2d1cd1b547 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Wed, 22 May 2024 15:28:52 -0400 Subject: [PATCH 7/9] Coupled committing certificates in BFT with Sync --- node/bft/src/sync/mod.rs | 64 +++++++++++++++++++++++++++++----------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 901d5dba05..b28b27fb64 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -386,19 +386,6 @@ impl Sync { // Sync the batch certificate with the block. self.storage.sync_certificate_with_block(&block, certificate, &unconfirmed_transactions); }); - - // Sync the BFT DAG with the blocks. - // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability - // threshold, was committed in the BFT. - for certificate in certificates{ - // If a BFT sender was provided, send the certificate to the BFT. - if let Some(bft_sender) = self.bft_sender.get() { - // Await the callback to continue. - if let Err(e) = bft_sender.send_sync_bft(certificate).await { - bail!("Sync - {e}"); - }; - } - } } } @@ -437,18 +424,23 @@ impl Sync { let committee_lookback = self.ledger.get_committee_lookback_for_round(commit_round)?; // Retrieve all of the certificates for the **certificate** round. let certificates = self.storage.get_certificates_for_round(certificate_round); - // Construct a set over the authors who included the leader's certificate in the certificate round. - let authors = certificates + // Construct a set over the certificates that included the leader's certificate in the certificate round. + let election_certificates: Vec<_> = certificates .iter() .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) { - true => Some(c.author()), + true => Some(c), false => None, }) .collect(); + // Construct a set over the authors who included the leader's certificate in the certificate round. + let election_certificate_authors = election_certificates + .iter() + .map(|c| c.author()) + .collect(); debug!("Validating sync block {next_block_height} at round {commit_round}..."); // Check if the leader is ready to be committed. - if committee_lookback.is_availability_threshold_reached(&authors) { + if committee_lookback.is_availability_threshold_reached(&election_certificate_authors) { // Initialize the current certificate. let mut current_certificate = leader_certificate; // Check if there are any linked blocks that need to be added. @@ -475,6 +467,44 @@ impl Sync { } } + // Sync the BFT DAG with the blocks. + // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability + // threshold, was committed in the BFT. + for block in blocks_to_add.iter(){ + // Check that the blocks are sequential and can be added to the ledger. + let block_height = block.height(); + if block_height != self.ledger.latest_block_height().saturating_add(1) { + warn!("Skipping block {block_height} from the latest block responses - not sequential."); + continue; + } + if let Authority::Quorum(subdag) = block.authority() { + // Iterate over the certificates. + for certificates in subdag.values().cloned() { + // Sync the BFT DAG with the certificates. + for certificate in certificates { + // If a BFT sender was provided, send the certificate to the BFT. + if let Some(bft_sender) = self.bft_sender.get() { + // Await the callback to continue. + if let Err(e) = bft_sender.send_sync_bft(certificate).await { + bail!("Sync - {e}"); + }; + } + } + } + } + } + + // Sync the election certificates with the BFT DAG. + for election_certificate in election_certificates{ + // If a BFT sender was provided, send the certificate to the BFT. + if let Some(bft_sender) = self.bft_sender.get() { + // Await the callback to continue. + if let Err(e) = bft_sender.send_sync_bft(election_certificate.clone()).await { + bail!("Sync - {e}"); + }; + } + } + // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. // This ensures consistency between block sync and the BFT DAG state. for block in blocks_to_add { From 690a8e3b0262ba58b4ebf8040ba424669b4072ff Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 22 May 2024 17:21:34 -0700 Subject: [PATCH 8/9] Minor nits and update comments --- node/bft/src/sync/mod.rs | 41 ++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index b28b27fb64..905ca96ebd 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -32,7 +32,13 @@ use snarkvm::{ use anyhow::{bail, Result}; use parking_lot::Mutex; use rayon::prelude::*; -use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + future::Future, + net::SocketAddr, + sync::Arc, + time::Duration, +}; use tokio::{ sync::{oneshot, Mutex as TMutex, OnceCell}, task::JoinHandle, @@ -382,7 +388,7 @@ impl Sync { // Sync the storage with the certificates. for certificates in subdag.values().cloned() { - cfg_into_iter!(certificates.clone()).for_each(|certificate| { + cfg_into_iter!(certificates).for_each(|certificate| { // Sync the batch certificate with the block. self.storage.sync_certificate_with_block(&block, certificate, &unconfirmed_transactions); }); @@ -425,22 +431,17 @@ impl Sync { // Retrieve all of the certificates for the **certificate** round. let certificates = self.storage.get_certificates_for_round(certificate_round); // Construct a set over the certificates that included the leader's certificate in the certificate round. - let election_certificates: Vec<_> = certificates + let (election_authors, election_certificates): (HashSet<_>, HashSet<_>) = certificates .iter() .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) { - true => Some(c), + true => Some((c.author(), c)), false => None, }) - .collect(); - // Construct a set over the authors who included the leader's certificate in the certificate round. - let election_certificate_authors = election_certificates - .iter() - .map(|c| c.author()) - .collect(); + .unzip(); debug!("Validating sync block {next_block_height} at round {commit_round}..."); // Check if the leader is ready to be committed. - if committee_lookback.is_availability_threshold_reached(&election_certificate_authors) { + if committee_lookback.is_availability_threshold_reached(&election_authors) { // Initialize the current certificate. let mut current_certificate = leader_certificate; // Check if there are any linked blocks that need to be added. @@ -468,9 +469,9 @@ impl Sync { } // Sync the BFT DAG with the blocks. - // Note subdags can be committed by the linking rule and so checking recent commits should only occur after the root subdag, that reached availability - // threshold, was committed in the BFT. - for block in blocks_to_add.iter(){ + // Note: Subdags are committed by the linking rule. So, it is essential to check recent commits + // only after the root subdag, which has reached the availability threshold, has been committed in the BFT. + for block in blocks_to_add.iter() { // Check that the blocks are sequential and can be added to the ledger. let block_height = block.height(); if block_height != self.ledger.latest_block_height().saturating_add(1) { @@ -493,9 +494,9 @@ impl Sync { } } } - - // Sync the election certificates with the BFT DAG. - for election_certificate in election_certificates{ + + // Sync the election certificates with the BFT DAG. This ensures that the root subdag is committed. + for election_certificate in election_certificates { // If a BFT sender was provided, send the certificate to the BFT. if let Some(bft_sender) = self.bft_sender.get() { // Await the callback to continue. @@ -505,8 +506,7 @@ impl Sync { } } - // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. - // This ensures consistency between block sync and the BFT DAG state. + // Add the blocks to the ledger. for block in blocks_to_add { // Retrieve the block height. let block_height = block.height(); @@ -514,7 +514,8 @@ impl Sync { // Retrieve the leader certificate of the subdag. let leader_certificate = subdag.leader_certificate(); if let Some(bft_sender) = self.bft_sender.get() { - // Await the callback to continue. + // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. + // This ensures consistency between block sync and the BFT DAG state. match bft_sender.send_sync_certificate_to_check_commit_bft(leader_certificate.clone()).await { Ok(is_recently_committed) => { From e91ef6fd20f70b05098e0519d8b1eb455d66ac9e Mon Sep 17 00:00:00 2001 From: raychu86 <14917648+raychu86@users.noreply.github.com> Date: Wed, 22 May 2024 17:33:50 -0700 Subject: [PATCH 9/9] Cleanup and optimize --- node/bft/src/bft.rs | 4 ++-- node/bft/src/helpers/channels.rs | 14 +++++++------- node/bft/src/sync/mod.rs | 16 +++++++--------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 8a950f0b67..5849b18015 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -824,9 +824,9 @@ impl BFT { // Process the request to check if the batch certificate was recently committed. let self_ = self.clone(); self.spawn(async move { - while let Some((certificate, callback)) = rx_is_recently_committed.recv().await { + while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await { // Check if the certificate was recently committed. - let is_committed = self_.dag.read().is_recently_committed(certificate.round(), certificate.id()); + let is_committed = self_.dag.read().is_recently_committed(round, certificate_id); // Send the callback **after** updating the DAG. // Note: We must await the DAG update before proceeding. callback.send(is_committed).ok(); diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 44827332c5..a7af3defbc 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -28,7 +28,7 @@ use snarkvm::{ narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID}, puzzle::{Solution, SolutionID}, }, - prelude::Result, + prelude::{Field, Result}, }; use indexmap::IndexMap; @@ -65,7 +65,7 @@ pub struct BFTSender { pub tx_primary_certificate: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, pub tx_sync_bft_dag_at_bootup: mpsc::Sender>>, pub tx_sync_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, - pub tx_is_recently_committed: mpsc::Sender<(BatchCertificate, oneshot::Sender)>, + pub tx_is_recently_committed: mpsc::Sender<((u64, Field), oneshot::Sender)>, } impl BFTSender { @@ -99,12 +99,12 @@ impl BFTSender { callback_receiver.await? } - /// Sends the batch certificate to the BFT to receive a callback on whether the certificate was recently committed. - pub async fn send_sync_certificate_to_check_commit_bft(&self, certificate: BatchCertificate) -> Result { + /// Sends the certificate round and ID to the BFT to receive a callback on whether the certificate was recently committed. + pub async fn send_sync_is_recently_committed(&self, round: u64, certificate_id: Field) -> Result { // Initialize a callback sender and receiver. let (callback_sender, callback_receiver) = oneshot::channel(); - // Send the certificate to the BFT. - self.tx_is_recently_committed.send((certificate, callback_sender)).await?; + // Send the round and certificate ID to the BFT. + self.tx_is_recently_committed.send(((round, certificate_id), callback_sender)).await?; // Await the callback to continue. Ok(callback_receiver.await?) } @@ -116,7 +116,7 @@ pub struct BFTReceiver { pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, pub rx_sync_bft_dag_at_bootup: mpsc::Receiver>>, pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, - pub rx_is_recently_committed: mpsc::Receiver<(BatchCertificate, oneshot::Sender)>, + pub rx_is_recently_committed: mpsc::Receiver<((u64, Field), oneshot::Sender)>, } /// Initializes the BFT channels. diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 905ca96ebd..72f82dd823 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -513,23 +513,21 @@ impl Sync { if let Authority::Quorum(subdag) = block.authority() { // Retrieve the leader certificate of the subdag. let leader_certificate = subdag.leader_certificate(); + let leader_round = leader_certificate.round(); + let leader_author = leader_certificate.author(); + let leader_id = leader_certificate.id(); if let Some(bft_sender) = self.bft_sender.get() { // Check if the leader certificate of the block has recently been committed in the replicated DAG state above. // This ensures consistency between block sync and the BFT DAG state. - match bft_sender.send_sync_certificate_to_check_commit_bft(leader_certificate.clone()).await - { + match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await { Ok(is_recently_committed) => { if !is_recently_committed { bail!( - "Sync - Failed to advance blocks - leader certificate with author {} from round {} was not recently committed.", - leader_certificate.author(), - leader_certificate.round() + "Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.", ); } - info!( - "Sync - leader certificate with author {} from round {} was recently committed.", - leader_certificate.author(), - leader_certificate.round() + debug!( + "Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.", ); } Err(e) => {