Skip to content

Commit

Permalink
BFT-496: Use just sync::changed
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Jul 30, 2024
1 parent b3d75bf commit f566938
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 20 deletions.
18 changes: 9 additions & 9 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ impl AttesterRunner {

let genesis = self.block_store.genesis().hash();

let mut prev = None;
// Subscribe starts as seen but we don't want to miss the first item.
self.status.mark_changed();

loop {
let batch_number =
sync::wait_for_some(ctx, &mut self.status, |s| match s.next_batch_to_attest {
next if next == prev => None,
next => next,
})
.await?;
let batch_number = match sync::changed(ctx, &mut self.status)
.await?
.next_batch_to_attest
{
Some(next_batch_number) => next_batch_number,
None => continue,
};

tracing::info!(%batch_number, "attestation status");

Expand All @@ -85,8 +87,6 @@ impl AttesterRunner {
.publish(attesters, &genesis, &self.attester.key, batch)
.await
.context("publish")?;

prev = Some(batch_number);
}
}

Expand Down
21 changes: 10 additions & 11 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use fetch::RequestItem;
use std::ops::Deref;
use std::sync::{atomic::AtomicUsize, Arc};
pub(crate) use validator_addrs::*;
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
Expand Down Expand Up @@ -169,20 +170,18 @@ impl Network {
let mut recv_votes = self.batch_votes.subscribe();
let mut recv_status = self.attestation_status.subscribe();

let mut prev_batch_number = None;
// Subscribe starts as seen but we don't want to miss the first item.
recv_status.mark_changed();

loop {
// Wait until the status indicates that we're ready to sign the next batch.
// This is not strictly necessary but avoids repeatedly finding the same quorum, or having to skip it until it changes.
let next_batch_number =
sync::wait_for_some(ctx, &mut recv_status, |s| match s.next_batch_to_attest {
next if next == prev_batch_number => None,
next => next,
})
.await?;

// Next time we'll look for something new.
prev_batch_number = Some(next_batch_number);
let next_batch_number = match sync::changed(ctx, &mut recv_status)
.await?
.next_batch_to_attest
{
Some(next_batch_number) => next_batch_number,
None => continue,
};

// Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
self.batch_votes
Expand Down

0 comments on commit f566938

Please sign in to comment.