From 4a1ba260dd87a8090fa8dd5a9007aa0cecf7eab2 Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Tue, 6 Aug 2024 20:21:14 +1000
Subject: [PATCH 1/5] cover code with spans

---
 node/actors/bft/src/lib.rs                    |  28 +++-
 node/actors/executor/src/attestation.rs       |  61 +++++++--
 .../network/src/consensus/handshake/mod.rs    |   2 +
 node/actors/network/src/consensus/mod.rs      |  50 ++++---
 node/actors/network/src/gossip/batch_votes.rs |   2 +
 node/actors/network/src/gossip/mod.rs         | 124 +++++++++++++-----
 node/actors/network/src/gossip/runner.rs      |   4 +-
 node/actors/network/src/lib.rs                |   3 +-
 node/libs/storage/src/batch_store/mod.rs      |  67 ++++++++--
 node/libs/storage/src/block_store/mod.rs      |  68 ++++++++--
 node/libs/storage/src/testonly/in_memory.rs   |   1 +
 node/tools/src/store.rs                       |   2 +-
 12 files changed, 306 insertions(+), 106 deletions(-)

diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs
index c677735a..3af76fb7 100644
--- a/node/actors/bft/src/lib.rs
+++ b/node/actors/bft/src/lib.rs
@@ -19,7 +19,8 @@ use crate::io::{InputMessage, OutputMessage};
 use anyhow::Context;
 pub use config::Config;
 use std::sync::Arc;
-use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope};
+use tracing::Instrument;
+use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope, sync};
 use zksync_consensus_network::io::ConsensusReq;
 use zksync_consensus_roles::validator;
 use zksync_consensus_utils::pipe::ActorPipe;
@@ -56,6 +57,8 @@ pub trait PayloadManager: std::fmt::Debug + Send + Sync {
 
 /// Channel through which bft actor sends network messages.
 pub(crate) type OutputSender = ctx::channel::UnboundedSender<OutputMessage>;
+/// Channel through which bft actor receives network messages.
+pub(crate) type InputReceiver = ctx::channel::UnboundedReceiver<InputMessage>;
 
 impl Config {
     /// Starts the bft actor. It will start running, processing incoming messages and
@@ -87,10 +90,17 @@ impl Config {
 
         tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public());
 
-        // This is the infinite loop where the consensus actually runs. The validator waits for either
-        // a message from the network or for a timeout, and processes each accordingly.
-        loop {
-            let InputMessage::Network(req) = pipe.recv.recv(ctx).await?;
+        #[tracing::instrument(skip_all)]
+        async fn bft_iter(
+            ctx: &ctx::Ctx,
+            pipe_recv: &mut InputReceiver,
+            leader_send: &sync::prunable_mpsc::Sender<ConsensusReq>,
+            replica_send: &sync::prunable_mpsc::Sender<ConsensusReq>,
+        ) -> anyhow::Result<()> {
+            let InputMessage::Network(req) = pipe_recv
+                .recv(ctx)
+                .instrument(tracing::info_span!("wait_for_message"))
+                .await?;
             use validator::ConsensusMsg as M;
             match &req.msg.msg {
                 M::ReplicaPrepare(_) => {
@@ -108,6 +118,14 @@ impl Config {
                 M::ReplicaCommit(_) => leader_send.send(req),
                 M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
             }
+
+            Ok(())
+        }
+
+        // This is the infinite loop where the consensus actually runs. The validator waits for either
+        // a message from the network or for a timeout, and processes each accordingly.
+        loop {
+            bft_iter(ctx, &mut pipe.recv, &leader_send, &replica_send).await?;
         }
     })
     .await;
diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 8ccee89c..5fe669f9 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -3,11 +3,12 @@
 use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_network::gossip::{
     AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
 };
-use zksync_consensus_roles::attester;
+use zksync_consensus_roles::{attester, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
 /// Polls the database for new batches to be signed and publishes them to the gossip channel.
@@ -57,12 +58,23 @@ impl AttesterRunner {
         // Subscribe starts as seen but we don't want to miss the first item.
         self.status.mark_changed();
 
-        loop {
-            let Some(batch_number) = sync::changed(ctx, &mut self.status)
+        #[tracing::instrument(skip_all)]
+        async fn attestation_iter(
+            ctx: &ctx::Ctx,
+            status: &mut AttestationStatusReceiver,
+            batch_store: &BatchStore,
+            publisher: &BatchVotesPublisher,
+            attester: &Attester,
+            poll_interval: time::Duration,
+            attesters: &attester::Committee,
+            genesis: &validator::GenesisHash,
+        ) -> anyhow::Result<()> {
+            let Some(batch_number) = sync::changed(ctx, status)
+                .instrument(tracing::info_span!("wait_for_attestation_status"))
                 .await?
                 .next_batch_to_attest
             else {
-                continue;
+                return Ok(());
             };
 
             tracing::info!(%batch_number, "attestation status");
@@ -71,12 +83,16 @@ impl AttesterRunner {
             // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
             // which for nodes that get the blocks through BFT should happen after execution. Nodes which
             // rely on batch sync don't participate in attestations as they need the batch on L1 first.
-            self.batch_store
-                .wait_until_persisted(ctx, batch_number)
-                .await?;
+            batch_store.wait_until_persisted(ctx, batch_number).await?;
 
             // Try to get the next batch to sign; the commitment might not be available just yet.
-            let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
+            let batch = AttesterRunner::wait_for_batch_to_sign(
+                ctx,
+                batch_number,
+                batch_store,
+                poll_interval,
+            )
+            .await?;
 
             // The certificates might be collected out of order because of how gossip works;
             // we could query the DB to see if we already have a QC, or we can just go ahead
@@ -85,29 +101,46 @@ impl AttesterRunner {
             tracing::info!(%batch_number, "publishing attestation");
 
             // We only have to publish a vote once; future peers can pull it from the register.
-            self.publisher
-                .publish(attesters, &genesis, &self.attester.key, batch)
+            publisher
+                .publish(attesters, &genesis, &attester.key, batch)
                 .await
                 .context("publish")?;
+
+            Ok(())
+        }
+
+        loop {
+            attestation_iter(
+                ctx,
+                &mut self.status,
+                &self.batch_store,
+                &self.publisher,
+                &self.attester,
+                self.poll_interval,
+                &attesters,
+                &genesis,
+            )
+            .await?;
         }
     }
 
     /// Wait for the batch commitment to become available.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
     async fn wait_for_batch_to_sign(
-        &self,
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
+        batch_store: &BatchStore,
+        poll_interval: time::Duration,
     ) -> ctx::Result<attester::Batch> {
         loop {
-            if let Some(batch) = self
-                .batch_store
+            if let Some(batch) = batch_store
                 .batch_to_sign(ctx, number)
                 .await
                 .context("batch_to_sign")?
             {
                 return Ok(batch);
             } else {
-                ctx.sleep(self.poll_interval).await?;
+                ctx.sleep(poll_interval).await?;
             }
         }
     }
diff --git a/node/actors/network/src/consensus/handshake/mod.rs b/node/actors/network/src/consensus/handshake/mod.rs
index 9dc6bdb6..5b0e7561 100644
--- a/node/actors/network/src/consensus/handshake/mod.rs
+++ b/node/actors/network/src/consensus/handshake/mod.rs
@@ -58,6 +58,7 @@ pub(super) enum Error {
     Stream(#[from] ctx::Error),
 }
 
+#[tracing::instrument(name = "handshake::outbound", skip_all)]
 pub(super) async fn outbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
@@ -93,6 +94,7 @@ pub(super) async fn outbound(
     Ok(())
 }
 
+#[tracing::instrument(name = "handshake::inbound", skip_all)]
 pub(super) async fn inbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs
index c7acea92..d94de6c3 100644
--- a/node/actors/network/src/consensus/mod.rs
+++ b/node/actors/network/src/consensus/mod.rs
@@ -1,5 +1,6 @@
 //! Consensus network is a full graph of connections between all validators.
 //! BFT consensus messages are exchanged over this network.
+use crate::gossip::ValidatorAddrs;
 use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc, MeteredStreamStats};
 use anyhow::Context as _;
 use rand::seq::SliceRandom;
@@ -163,7 +164,7 @@ impl Network {
 
     /// Performs handshake of an inbound stream.
     /// Closes the stream if there is another inbound stream opened from the same validator.
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_inbound_stream", skip_all)]
     pub(crate) async fn run_inbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -195,7 +196,7 @@ impl Network {
         res
     }
 
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_outbound_stream", skip_all, fields(?peer, %addr))]
     async fn run_outbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -282,6 +283,7 @@ impl Network {
         res
     }
 
+    #[tracing::instrument(skip_all)]
     async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let addr = *self
             .gossip
@@ -295,10 +297,32 @@ impl Network {
                 format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
             })?;
         self.run_outbound_stream(ctx, &self.key.public(), addr)
-            .instrument(tracing::info_span!("loopback", ?addr))
             .await
     }
 
+    #[tracing::instrument(skip_all, fields(?peer, ?addr))]
+    async fn maintain_connection_iter(
+        &self,
+        ctx: &ctx::Ctx,
+        peer: &validator::PublicKey,
+        addrs: &mut sync::watch::Receiver<ValidatorAddrs>,
+        addr: &mut Option<std::net::SocketAddr>,
+    ) {
+        // Wait for a new address, or retry with the old one after timeout.
+        if let Ok(new) = sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
+            &addrs.get(peer).map(|x| x.msg.addr) != addr
+        })
+        .instrument(tracing::info_span!("wait_for_address"))
+        .await
+        {
+            *addr = new.get(peer).map(|x| x.msg.addr);
+        }
+        let Some(addr) = addr else { return };
+        if let Err(err) = self.run_outbound_stream(ctx, peer, *addr).await {
+            tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
+        }
+    }
+
     /// Maintains a connection to the given validator.
     /// If connection breaks, it tries to reconnect periodically.
     pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) {
@@ -314,24 +338,10 @@ impl Network {
         }
         let addrs = &mut self.gossip.validator_addrs.subscribe();
         let mut addr = None;
+
         while ctx.is_active() {
-            // Wait for a new address, or retry with the old one after timeout.
- if let Ok(new) = - sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| { - addrs.get(peer).map(|x| x.msg.addr) != addr - }) - .await - { - addr = new.get(peer).map(|x| x.msg.addr); - } - let Some(addr) = addr else { continue }; - if let Err(err) = self - .run_outbound_stream(ctx, peer, addr) - .instrument(tracing::info_span!("out", ?addr)) - .await - { - tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}"); - } + self.maintain_connection_iter(ctx, peer, addrs, &mut addr) + .await; } } } diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index abe8d969..b48fb205 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -286,6 +286,7 @@ impl BatchVotesWatch { } /// Set the minimum batch number on the votes and discard old data. + #[tracing::instrument(skip_all, fields(%min_batch_number))] pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { let this = self.0.lock().await; this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); @@ -308,6 +309,7 @@ impl fmt::Debug for BatchVotesPublisher { impl BatchVotesPublisher { /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network. + #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))] pub async fn publish( &self, attesters: &attester::Committee, diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 88663383..3c3a15d2 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -17,12 +17,14 @@ pub use self::attestation_status::{ }; pub use self::batch_votes::BatchVotesPublisher; use self::batch_votes::BatchVotesWatch; +use crate::gossip::batch_votes::BatchVotes; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; +use tracing::Instrument; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync}; -use zksync_consensus_roles::{node, validator}; +use zksync_consensus_roles::{attester, node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; mod attestation_status; @@ -108,20 +110,32 @@ impl Network { let number = ctx::NoCopy(next); next = next + 1; // Fetch a block asynchronously. - s.spawn(async { - let _permit = permit; - let number = number.into(); - let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { - s.spawn_bg(self.fetch_queue.request(ctx, RequestItem::Block(number))); - // Cancel fetching as soon as block is queued for storage. - self.block_store.wait_until_queued(ctx, number).await?; - Err(ctx::Canceled) - }) - .await; - // Wait until the block is actually persisted, so that the amount of blocks - // stored in memory is bounded. - self.block_store.wait_until_persisted(ctx, number).await - }); + s.spawn( + async { + let _permit = permit; + let number = number.into(); + let queue_span = tracing::info_span!("wait_for_block_to_queue"); + let queue_span_copy = queue_span.clone(); + let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { + let request_span = tracing::info_span!("fetch_block_request"); + request_span.follows_from(queue_span_copy.id()); + s.spawn_bg( + self.fetch_queue + .request(ctx, RequestItem::Block(number)) + .instrument(request_span), + ); + // Cancel fetching as soon as block is queued for storage. 
+                        self.block_store.wait_until_queued(ctx, number).await?;
+                        Err(ctx::Canceled)
+                    })
+                    .instrument(queue_span)
+                    .await;
+                    // Wait until the block is actually persisted, so that the amount of blocks
+                    // stored in memory is bounded.
+                    self.block_store.wait_until_persisted(ctx, number).await
+                }
+                .instrument(tracing::info_span!("fetch_block_from_peer", l2_block = %next)),
+            );
         }
     })
     .await;
@@ -137,20 +151,32 @@ impl Network {
             let number = ctx::NoCopy(next);
             next = next + 1;
             // Fetch a batch asynchronously.
-            s.spawn(async {
-                let _permit = permit;
-                let number = number.into();
-                let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
-                    s.spawn_bg(self.fetch_queue.request(ctx, RequestItem::Batch(number)));
-                    // Cancel fetching as soon as batch is queued for storage.
-                    self.batch_store.wait_until_queued(ctx, number).await?;
-                    Err(ctx::Canceled)
-                })
-                .await;
-                // Wait until the batch is actually persisted, so that the amount of batches
-                // stored in memory is bounded.
-                self.batch_store.wait_until_persisted(ctx, number).await
-            });
+            s.spawn(
+                async {
+                    let _permit = permit;
+                    let number = number.into();
+                    let queue_span = tracing::info_span!("wait_for_batch_to_queue");
+                    let queue_span_copy = queue_span.clone();
+                    let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
+                        let request_span = tracing::info_span!("fetch_batch_request");
+                        request_span.follows_from(queue_span_copy.id());
+                        s.spawn_bg(
+                            self.fetch_queue
+                                .request(ctx, RequestItem::Batch(number))
+                                .instrument(request_span),
+                        );
+                        // Cancel fetching as soon as batch is queued for storage.
+                        self.batch_store.wait_until_queued(ctx, number).await?;
+                        Err(ctx::Canceled)
+                    })
+                    .instrument(queue_span)
+                    .await;
+                    // Wait until the batch is actually persisted, so that the amount of batches
+                    // stored in memory is bounded.
+                    self.batch_store.wait_until_persisted(ctx, number).await
+                }
+                .instrument(tracing::info_span!("fetch_batch_from_peer", l1_batch = %next)),
+            );
         }
     })
     .await;
@@ -171,17 +197,27 @@ impl Network {
         // Subscribe starts as seen but we don't want to miss the first item.
         recv_status.mark_changed();
 
-        loop {
+        #[tracing::instrument(skip_all)]
+        async fn new_votes_iter(
+            ctx: &ctx::Ctx,
+            attesters: &attester::Committee,
+            genesis: &validator::GenesisHash,
+            recv_votes: &mut sync::watch::Receiver<BatchVotes>,
+            recv_status: &mut AttestationStatusReceiver,
+            batch_votes: &BatchVotesWatch,
+            batch_store: &BatchStore,
+        ) -> ctx::Result<()> {
             // Wait until the status indicates that we're ready to sign the next batch.
-            let Some(batch_number) = sync::changed(ctx, &mut recv_status)
+            let Some(batch_number) = sync::changed(ctx, recv_status)
+                .instrument(tracing::info_span!("wait_for_attestation_status"))
                 .await?
                 .next_batch_to_attest
             else {
-                continue;
+                return Ok(());
             };
 
             // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
-            self.batch_votes.set_min_batch_number(batch_number).await;
+            batch_votes.set_min_batch_number(batch_number).await;
 
             // Now wait until we find the next quorum, whatever it is:
             // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
             // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
             // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
             // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
             // The possibility of this will be fixed by deterministically picking a start batch number based on fork indicated by genesis.
-            let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| {
-                votes.find_quorum(attesters, &genesis)
+            let qc = sync::wait_for_some(ctx, recv_votes, |votes| {
+                votes.find_quorum(attesters, genesis)
             })
+            .instrument(tracing::info_span!("wait_for_quorum"))
             .await?;
 
-            self.batch_store
+            batch_store
                 .persist_batch_qc(ctx, qc)
                 .await
                 .wrap("persist_batch_qc")?;
+
+            Ok(())
+        }
+
+        loop {
+            new_votes_iter(
+                ctx,
+                attesters,
+                &genesis,
+                &mut recv_votes,
+                &mut recv_status,
+                &self.batch_votes,
+                &self.batch_store,
+            )
+            .await?;
         }
     }
 }
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index 2596ccd7..effd0466 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -366,7 +366,7 @@ impl Network {
 
     /// Handles an inbound stream.
     /// Closes the stream if there is another inbound stream opened from the same peer.
-    #[tracing::instrument(level = "info", name = "gossip", skip_all)]
+    #[tracing::instrument(name = "gossip::run_inbound_stream", skip_all)]
     pub(crate) async fn run_inbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -384,7 +384,7 @@ impl Network {
     }
 
     /// Connects to a peer and handles the resulting stream.
-    #[tracing::instrument(level = "info", name = "gossip", skip_all)]
+    #[tracing::instrument(name = "gossip::run_outbound_stream", skip_all)]
     pub(crate) async fn run_outbound_stream(
         &self,
         ctx: &ctx::Ctx,
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index 1d5f3cbd..94b692c6 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -151,7 +151,6 @@ impl Runner {
                         .net
                         .gossip
                         .run_outbound_stream(ctx, peer, addr.clone())
-                        .instrument(tracing::info_span!("out", ?addr))
                         .await;
                     if let Err(err) = res {
                         tracing::info!("gossip.run_outbound_stream({addr:?}): {err:#}");
@@ -209,7 +208,7 @@ impl Runner {
                     }
                     anyhow::Ok(())
                 }
-                .instrument(tracing::info_span!("in", ?addr))
+                .instrument(tracing::info_span!("accept_connection", ?addr))
                 .await;
                 if let Err(err) = res {
                     tracing::info!("{addr}: {err:#}");
diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs
index 889014f1..e6f0ac53 100644
--- a/node/libs/storage/src/batch_store/mod.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -1,6 +1,8 @@
 //! Defines storage layer for batches of blocks.
 use anyhow::Context as _;
+use std::borrow::Borrow;
 use std::{collections::VecDeque, fmt, sync::Arc};
+use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::{attester, validator};
 
@@ -148,6 +150,7 @@ impl Inner {
         true
     }
 
+    #[tracing::instrument(skip_all)]
     fn truncate_cache(&mut self) {
         while self.cache.len() > Self::CACHE_CAPACITY
             && self.persisted.contains(self.cache[0].number)
@@ -185,6 +188,45 @@ impl BatchStoreRunner {
         let store_ref = Arc::downgrade(&self.0);
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
+        #[tracing::instrument(skip_all)]
+        async fn truncate_batch_cache_iteration(
+            ctx: &ctx::Ctx,
+            persisted: &mut sync::watch::Receiver<BatchStoreState>,
+            inner: &sync::watch::Sender<Inner>,
+        ) -> ctx::Result<()> {
+            let persisted = sync::changed(ctx, persisted)
+                .instrument(tracing::info_span!("wait_for_batch_store_change"))
+                .await?
+                .clone();
+            inner.send_modify(|inner| {
+                // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here.
+                inner.persisted = persisted;
+                inner.truncate_cache();
+            });
+
+            Ok(())
+        }
+
+        #[tracing::instrument(skip_all)]
+        async fn queue_persist_batch_iteration(
+            ctx: &ctx::Ctx,
+            queue_next: &mut attester::BatchNumber,
+            inner: &mut sync::watch::Receiver<Inner>,
+            persistent: &dyn PersistentBatchStore,
+        ) -> ctx::Result<()> {
+            let batch = sync::wait_for(ctx, inner, |inner| inner.queued.contains(*queue_next))
+                .instrument(tracing::info_span!("wait_for_next_batch"))
+                .await?
+                .batch(*queue_next)
+                .unwrap();
+
+            *queue_next = queue_next.next();
+
+            persistent.queue_next_batch(ctx, batch).await?;
+
+            Ok(())
+        }
+
         let res = scope::run!(ctx, |ctx, s| async {
             let persisted = self.0.persistent.persisted();
             let mut queue_next = persisted.borrow().next();
@@ -192,24 +234,18 @@ impl BatchStoreRunner {
             s.spawn::<()>(async {
                 let mut persisted = persisted;
                 loop {
-                    let persisted = sync::changed(ctx, &mut persisted).await?.clone();
-                    self.0.inner.send_modify(|inner| {
-                        // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here.
-                        inner.persisted = persisted;
-                        inner.truncate_cache();
-                    });
+                    truncate_batch_cache_iteration(ctx, &mut persisted, &self.0.inner).await?;
                 }
             });
             let inner = &mut self.0.inner.subscribe();
             loop {
-                let batch = sync::wait_for(ctx, inner, |inner| inner.queued.contains(queue_next))
-                    .await?
-                    .batch(queue_next)
-                    .unwrap();
-
-                queue_next = queue_next.next();
-
-                self.0.persistent.queue_next_batch(ctx, batch).await?;
+                queue_persist_batch_iteration(
+                    ctx,
+                    &mut queue_next,
+                    inner,
+                    self.0.persistent.borrow(),
+                )
+                .await?;
             }
         })
         .await;
@@ -347,6 +383,7 @@ impl BatchStore {
     }
 
     /// Wait until the database has a batch, then attach the corresponding QC.
+    #[tracing::instrument(skip_all, fields(l1_batch = %qc.message.number))]
    pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> {
         let t = metrics::BATCH_STORE.persist_batch_qc.start();
         // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload
@@ -368,6 +405,7 @@ impl BatchStore {
 
     /// Waits until the given batch is queued (in memory, or persisted).
     /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
     pub async fn wait_until_queued(
         &self,
         ctx: &ctx::Ctx,
@@ -388,6 +426,7 @@ impl BatchStore {
 
     /// Waits until the given batch is stored persistently.
     /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
     pub async fn wait_until_persisted(
         &self,
         ctx: &ctx::Ctx,
diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs
index a901c0fe..0dcc0a45 100644
--- a/node/libs/storage/src/block_store/mod.rs
+++ b/node/libs/storage/src/block_store/mod.rs
@@ -1,6 +1,8 @@
 //! Defines storage layer for finalized blocks.
 use anyhow::Context as _;
+use std::borrow::Borrow;
 use std::{collections::VecDeque, fmt, sync::Arc};
+use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::validator;
 
@@ -117,6 +119,13 @@ impl Inner {
     }
 
     /// Updates `persisted` field.
+    #[tracing::instrument(
+        skip_all,
+        fields(
+            first_l2_block = %persisted.first,
+            last_l2_block = ?persisted.last.as_ref().map(|l| l.message.proposal.number)
+        )
+    )]
     fn update_persisted(&mut self, persisted: BlockStoreState) -> anyhow::Result<()> {
         if persisted.next() < self.persisted.next() {
             anyhow::bail!("head block has been removed from storage, this is not supported");
@@ -175,31 +184,64 @@ impl BlockStoreRunner {
         let store_ref = Arc::downgrade(&self.0);
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
+        #[tracing::instrument(skip_all)]
+        async fn watch_persistent_state_iteration(
+            ctx: &ctx::Ctx,
+            persisted: &mut sync::watch::Receiver<BlockStoreState>,
+            inner: &sync::watch::Sender<Inner>,
+        ) -> ctx::Result<()> {
+            let new = sync::changed(ctx, persisted)
+                .instrument(tracing::info_span!("wait_for_block_store_change"))
+                .await?
+                .clone();
+            sync::try_send_modify(inner, |inner| inner.update_persisted(new))?;
+
+            Ok(())
+        }
+
+        #[tracing::instrument(skip_all)]
+        async fn queue_persist_block_iteration(
+            ctx: &ctx::Ctx,
+            queue_next: &mut validator::BlockNumber,
+            inner: &mut sync::watch::Receiver<Inner>,
+            persistent: &dyn PersistentBlockStore,
+        ) -> ctx::Result<()> {
+            let block = sync::wait_for_some(ctx, inner, |inner| {
+                inner.block((*queue_next).max(inner.persisted.next()))
+            })
+            .instrument(tracing::info_span!("wait_for_next_block"))
+            .await?;
+            *queue_next = block.number().next();
+            // TODO: monitor errors as well.
+            let t = metrics::PERSISTENT_BLOCK_STORE
+                .queue_next_block_latency
+                .start();
+            persistent.queue_next_block(ctx, block).await?;
+            t.observe();
+
+            Ok(())
+        }
+
         let res = scope::run!(ctx, |ctx, s| async {
             s.spawn::<()>(async {
                 // Task watching the persisted state.
                 let mut persisted = self.0.persistent.persisted();
                 persisted.mark_changed();
                 loop {
-                    let new = sync::changed(ctx, &mut persisted).await?.clone();
-                    sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?;
+                    watch_persistent_state_iteration(ctx, &mut persisted, &self.0.inner).await?;
                 }
             });
             // Task queueing blocks to be persisted.
             let inner = &mut self.0.inner.subscribe();
             let mut queue_next = validator::BlockNumber(0);
             loop {
-                let block = sync::wait_for_some(ctx, inner, |inner| {
-                    inner.block(queue_next.max(inner.persisted.next()))
-                })
+                queue_persist_block_iteration(
+                    ctx,
+                    &mut queue_next,
+                    inner,
+                    self.0.persistent.borrow(),
+                )
                 .await?;
-                queue_next = block.number().next();
-                // TODO: monitor errors as well.
-                let t = metrics::PERSISTENT_BLOCK_STORE
-                    .queue_next_block_latency
-                    .start();
-                self.0.persistent.queue_next_block(ctx, block).await?;
-                t.observe();
             }
         })
         .await;
@@ -309,6 +351,7 @@ impl BlockStore {
 
     /// Waits until the given block is queued (in memory, or persisted).
     /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned.
+    #[tracing::instrument(skip_all, fields(l2_block = %number))]
     pub async fn wait_until_queued(
         &self,
         ctx: &ctx::Ctx,
@@ -324,6 +367,7 @@ impl BlockStore {
 
     /// Waits until the given block is stored persistently.
     /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned.
+ #[tracing::instrument(skip_all, fields(l2_block = %number))] pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 79a21207..2ca8858e 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -202,6 +202,7 @@ impl PersistentBatchStore for BatchStore { Ok(batches.get(idx as usize).cloned()) } + #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))] async fn queue_next_batch( &self, _ctx: &ctx::Ctx, diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index 8847a27c..c6622b75 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -137,7 +137,7 @@ impl PersistentBlockStore for RocksDB { .wrap(number) } - #[tracing::instrument(level = "debug", skip(self))] + #[tracing::instrument(skip_all, fields(l2_block = %block.justification.message.proposal.number))] async fn queue_next_block( &self, _ctx: &ctx::Ctx, From 75189d84bbb922a528660e7545f69abd2f69510d Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 6 Aug 2024 20:32:21 +1000 Subject: [PATCH 2/5] fmt --- node/actors/network/src/gossip/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 7ceb3e98..c5e07d39 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -14,11 +14,11 @@ //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). use self::batch_votes::BatchVotesWatch; -use crate::gossip::batch_votes::BatchVotes; pub use self::{ attestation_status::{AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch}, batch_votes::BatchVotesPublisher, }; +use crate::gossip::batch_votes::BatchVotes; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; From 857c3bd9c06fd0ac418a08ff182b2eb5c27de2da Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Tue, 6 Aug 2024 21:04:04 +1000 Subject: [PATCH 3/5] get rid of `_iter` functions --- node/actors/bft/src/lib.rs | 65 ++++++++-------- node/actors/executor/src/attestation.rs | 94 ++++++++++-------------- node/actors/network/src/consensus/mod.rs | 44 +++++------ node/actors/network/src/gossip/mod.rs | 81 ++++++++------------ node/libs/concurrency/src/ctx/mod.rs | 14 +++- node/libs/concurrency/src/ctx/tests.rs | 6 +- node/libs/storage/src/batch_store/mod.rs | 77 ++++++++----------- node/libs/storage/src/block_store/mod.rs | 74 +++++++------------ 8 files changed, 189 insertions(+), 266 deletions(-) diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index 3af76fb7..12b0810c 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -20,7 +20,7 @@ use anyhow::Context; pub use config::Config; use std::sync::Arc; use tracing::Instrument; -use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope, sync}; +use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope}; use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; use zksync_consensus_utils::pipe::ActorPipe; @@ -57,8 +57,6 @@ pub trait PayloadManager: std::fmt::Debug + Send + Sync { /// Channel through which bft actor sends network messages. 
 pub(crate) type OutputSender = ctx::channel::UnboundedSender<OutputMessage>;
-/// Channel through which bft actor receives network messages.
-pub(crate) type InputReceiver = ctx::channel::UnboundedReceiver<InputMessage>;
 
 impl Config {
     /// Starts the bft actor. It will start running, processing incoming messages and
@@ -90,42 +88,37 @@ impl Config {
 
         tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public());
 
-        #[tracing::instrument(skip_all)]
-        async fn bft_iter(
-            ctx: &ctx::Ctx,
-            pipe_recv: &mut InputReceiver,
-            leader_send: &sync::prunable_mpsc::Sender<ConsensusReq>,
-            replica_send: &sync::prunable_mpsc::Sender<ConsensusReq>,
-        ) -> anyhow::Result<()> {
-            let InputMessage::Network(req) = pipe_recv
-                .recv(ctx)
-                .instrument(tracing::info_span!("wait_for_message"))
-                .await?;
-            use validator::ConsensusMsg as M;
-            match &req.msg.msg {
-                M::ReplicaPrepare(_) => {
-                    // This is a hacky way to do a clone. This is necessary since we don't want to derive
-                    // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
-                    let (ack, _) = oneshot::channel();
-                    let new_req = ConsensusReq {
-                        msg: req.msg.clone(),
-                        ack,
-                    };
-
-                    replica_send.send(new_req);
-                    leader_send.send(req);
-                }
-                M::ReplicaCommit(_) => leader_send.send(req),
-                M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
-            }
-
-            Ok(())
-        }
-
         // This is the infinite loop where the consensus actually runs. The validator waits for either
         // a message from the network or for a timeout, and processes each accordingly.
         loop {
-            bft_iter(ctx, &mut pipe.recv, &leader_send, &replica_send).await?;
+            async {
+                let InputMessage::Network(req) = pipe
+                    .recv
+                    .recv(ctx)
+                    .instrument(tracing::info_span!("wait_for_message"))
+                    .await?;
+                use validator::ConsensusMsg as M;
+                match &req.msg.msg {
+                    M::ReplicaPrepare(_) => {
+                        // This is a hacky way to do a clone. This is necessary since we don't want to derive
+                        // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
+                        let (ack, _) = oneshot::channel();
+                        let new_req = ConsensusReq {
+                            msg: req.msg.clone(),
+                            ack,
+                        };
+
+                        replica_send.send(new_req);
+                        leader_send.send(req);
+                    }
+                    M::ReplicaCommit(_) => leader_send.send(req),
+                    M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
+                }
+
+                anyhow::Ok(())
+            }
+            .instrument(tracing::info_span!("bft_iter"))
+            .await?;
         }
     })
     .await;
diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs
index 5fe669f9..e0bd63d5 100644
--- a/node/actors/executor/src/attestation.rs
+++ b/node/actors/executor/src/attestation.rs
@@ -8,7 +8,7 @@ use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_network::gossip::{
     AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
 };
-use zksync_consensus_roles::{attester, validator};
+use zksync_consensus_roles::attester;
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
 /// Polls the database for new batches to be signed and publishes them to the gossip channel.
@@ -58,68 +58,50 @@ impl AttesterRunner {
         // Subscribe starts as seen but we don't want to miss the first item.
self.status.mark_changed(); - #[tracing::instrument(skip_all)] - async fn attestation_iter( - ctx: &ctx::Ctx, - status: &mut AttestationStatusReceiver, - batch_store: &BatchStore, - publisher: &BatchVotesPublisher, - attester: &Attester, - poll_interval: time::Duration, - attesters: &attester::Committee, - genesis: &validator::GenesisHash, - ) -> anyhow::Result<()> { - let Some(batch_number) = sync::changed(ctx, status) - .instrument(tracing::info_span!("wait_for_attestation_status")) - .await? - .next_batch_to_attest - else { - return Ok(()); - }; + loop { + async { + let Some(batch_number) = sync::changed(ctx, &mut self.status) + .instrument(tracing::info_span!("wait_for_attestation_status")) + .await? + .next_batch_to_attest + else { + return Ok(()); + }; - tracing::info!(%batch_number, "attestation status"); + tracing::info!(%batch_number, "attestation status"); - // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence - // to be indicated in memory (which itself relies on polling). This happens once we have the commitment, - // which for nodes that get the blocks through BFT should happen after execution. Nodes which - // rely on batch sync don't participate in attestations as they need the batch on L1 first. - batch_store.wait_until_persisted(ctx, batch_number).await?; + // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence + // to be indicated in memory (which itself relies on polling). This happens once we have the commitment, + // which for nodes that get the blocks through BFT should happen after execution. Nodes which + // rely on batch sync don't participate in attestations as they need the batch on L1 first. + self.batch_store + .wait_until_persisted(ctx, batch_number) + .await?; - // Try to get the next batch to sign; the commitment might not be available just yet. - let batch = AttesterRunner::wait_for_batch_to_sign( - ctx, - batch_number, - batch_store, - poll_interval, - ) - .await?; + // Try to get the next batch to sign; the commitment might not be available just yet. + let batch = AttesterRunner::wait_for_batch_to_sign( + ctx, + batch_number, + &self.batch_store, + self.poll_interval, + ) + .await?; - // The certificates might be collected out of order because of how gossip works; - // we could query the DB to see if we already have a QC, or we can just go ahead - // and publish our vote, and let others ignore it. + // The certificates might be collected out of order because of how gossip works; + // we could query the DB to see if we already have a QC, or we can just go ahead + // and publish our vote, and let others ignore it. - tracing::info!(%batch_number, "publishing attestation"); + tracing::info!(%batch_number, "publishing attestation"); - // We only have to publish a vote once; future peers can pull it from the register. - publisher - .publish(attesters, &genesis, &attester.key, batch) - .await - .context("publish")?; + // We only have to publish a vote once; future peers can pull it from the register. 
+                self.publisher
+                    .publish(attesters, &genesis, &self.attester.key, batch)
+                    .await
+                    .context("publish")?;
 
-            Ok(())
-        }
-
-        loop {
-            attestation_iter(
-                ctx,
-                &mut self.status,
-                &self.batch_store,
-                &self.publisher,
-                &self.attester,
-                self.poll_interval,
-                &attesters,
-                &genesis,
-            )
+                ctx::Ok(())
+            }
+            .instrument(tracing::info_span!("attestation_iter"))
             .await?;
         }
     }
diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs
index d94de6c3..3d4f5ad9 100644
--- a/node/actors/network/src/consensus/mod.rs
+++ b/node/actors/network/src/consensus/mod.rs
@@ -1,6 +1,5 @@
 //! Consensus network is a full graph of connections between all validators.
 //! BFT consensus messages are exchanged over this network.
-use crate::gossip::ValidatorAddrs;
 use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc, MeteredStreamStats};
 use anyhow::Context as _;
 use rand::seq::SliceRandom;
@@ -300,29 +299,6 @@ impl Network {
             .await
     }
 
-    #[tracing::instrument(skip_all, fields(?peer, ?addr))]
-    async fn maintain_connection_iter(
-        &self,
-        ctx: &ctx::Ctx,
-        peer: &validator::PublicKey,
-        addrs: &mut sync::watch::Receiver<ValidatorAddrs>,
-        addr: &mut Option<std::net::SocketAddr>,
-    ) {
-        // Wait for a new address, or retry with the old one after timeout.
-        if let Ok(new) = sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
-            &addrs.get(peer).map(|x| x.msg.addr) != addr
-        })
-        .instrument(tracing::info_span!("wait_for_address"))
-        .await
-        {
-            *addr = new.get(peer).map(|x| x.msg.addr);
-        }
-        let Some(addr) = addr else { return };
-        if let Err(err) = self.run_outbound_stream(ctx, peer, *addr).await {
-            tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
-        }
-    }
-
     /// Maintains a connection to the given validator.
     /// If connection breaks, it tries to reconnect periodically.
     pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) {
@@ -338,10 +314,24 @@ impl Network {
         let addrs = &mut self.gossip.validator_addrs.subscribe();
         let mut addr = None;
 
         while ctx.is_active() {
-            self.maintain_connection_iter(ctx, peer, addrs, &mut addr)
-                .await;
+            async {
+                // Wait for a new address, or retry with the old one after timeout.
+                if let Ok(new) =
+                    sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
+                        addrs.get(peer).map(|x| x.msg.addr) != addr
+                    })
+                    .instrument(tracing::info_span!("wait_for_address"))
+                    .await
+                {
+                    addr = new.get(peer).map(|x| x.msg.addr);
+                }
+                let Some(addr) = addr else { return };
+                if let Err(err) = self.run_outbound_stream(ctx, peer, addr).await {
+                    tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
+                }
+            }
+            .instrument(tracing::info_span!("maintain_connection_iter"))
+            .await;
         }
     }
 }
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index c5e07d39..3d2a13e1 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -18,14 +18,13 @@ pub use self::{
     attestation_status::{AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch},
     batch_votes::BatchVotesPublisher,
 };
-use crate::gossip::batch_votes::BatchVotes;
 use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
 use fetch::RequestItem;
 use std::sync::{atomic::AtomicUsize, Arc};
 use tracing::Instrument;
 pub(crate) use validator_addrs::*;
 use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
-use zksync_consensus_roles::{attester, node, validator};
+use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::{BatchStore, BlockStore};
 
 mod attestation_status;
@@ -198,58 +197,40 @@ impl Network {
         // Subscribe starts as seen but we don't want to miss the first item.
         recv_status.mark_changed();
 
-        #[tracing::instrument(skip_all)]
-        async fn new_votes_iter(
-            ctx: &ctx::Ctx,
-            attesters: &attester::Committee,
-            genesis: &validator::GenesisHash,
-            recv_votes: &mut sync::watch::Receiver<BatchVotes>,
-            recv_status: &mut AttestationStatusReceiver,
-            batch_votes: &BatchVotesWatch,
-            batch_store: &BatchStore,
-        ) -> ctx::Result<()> {
-            // Wait until the status indicates that we're ready to sign the next batch.
-            let Some(batch_number) = sync::changed(ctx, recv_status)
-                .instrument(tracing::info_span!("wait_for_attestation_status"))
-                .await?
-                .next_batch_to_attest
-            else {
-                return Ok(());
-            };
-
-            // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
-            batch_votes.set_min_batch_number(batch_number).await;
+        loop {
+            async {
+                // Wait until the status indicates that we're ready to sign the next batch.
+                let Some(batch_number) = sync::changed(ctx, &mut recv_status)
+                    .instrument(tracing::info_span!("wait_for_attestation_status"))
+                    .await?
+                    .next_batch_to_attest
+                else {
+                    return Ok(());
+                };
+
+                // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
+                self.batch_votes.set_min_batch_number(batch_number).await;
 
+                // Now wait until we find the next quorum, whatever it is:
+                // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
+                // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
+                // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
+                // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
+                // The possibility of this will be fixed by deterministically picking a start batch number based on fork indicated by genesis.
+                let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| {
+                    votes.find_quorum(attesters, &genesis)
+                })
+                .instrument(tracing::info_span!("wait_for_quorum"))
+                .await?;
 
+                self.batch_store
+                    .persist_batch_qc(ctx, qc)
+                    .await
+                    .wrap("persist_batch_qc")?;
 
+                ctx::Ok(())
+            }
+            .instrument(tracing::info_span!("new_votes_iter"))
             .await?;
         }
     }
 }
diff --git a/node/libs/concurrency/src/ctx/mod.rs b/node/libs/concurrency/src/ctx/mod.rs
index bf4f01d5..58ac2d54 100644
--- a/node/libs/concurrency/src/ctx/mod.rs
+++ b/node/libs/concurrency/src/ctx/mod.rs
@@ -185,8 +185,8 @@ impl Ctx {
     ) -> CtxAware<impl 'a + Future<Output = OrCanceled<F::Output>>> {
         CtxAware(async {
             tokio::select! {
-                output = fut => Ok(output),
-                () = self.0.canceled.cancel_safe_recv() => Err(Canceled),
+                output = fut => OrCanceled::Ok(output),
+                () = self.0.canceled.cancel_safe_recv() => OrCanceled::Err(Canceled),
             }
         })
     }
@@ -271,6 +271,16 @@ pub enum Error {
 /// Alias for Result with `ctx::Error`.
 pub type Result<T> = std::result::Result<T, Error>;
 
+/// Equivalent to Ok::<_, ctx::Error>(value).
+///
+/// This simplifies creation of a ctx::Result in places where type inference
+/// cannot deduce the `E` type of the result, without needing to write
+/// `Ok::<_, ctx::Error>(value)` explicitly.
+#[allow(non_snake_case)]
+pub fn Ok<T>(t: T) -> Result<T> {
+    Result::Ok(t)
+}
+
 impl crate::error::Wrap for Error {
     fn with_wrap<C: fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(self, f: F) -> Self {
         match self {
diff --git a/node/libs/concurrency/src/ctx/tests.rs b/node/libs/concurrency/src/ctx/tests.rs
index cf24ac50..41f33cd3 100644
--- a/node/libs/concurrency/src/ctx/tests.rs
+++ b/node/libs/concurrency/src/ctx/tests.rs
@@ -9,7 +9,7 @@ async fn test_run_canceled() {
     scope::run!(ctx, |ctx, s| async {
         s.cancel();
         assert!(!ctx.is_active());
-        Ok::<(), ()>(())
+        std::result::Result::Ok::<(), ()>(())
     })
     .await
     .unwrap();
@@ -46,7 +46,7 @@ async fn test_sleep_until() {
         });
         clock.advance(1001 * sec);
         tracing::info!("root task terminating");
-        Ok(())
+        std::result::Result::Ok(())
     })
     .await;
     assert_eq!(Err(9), res);
@@ -55,7 +55,7 @@ async fn test_sleep_until() {
     let res = scope::run!(ctx, |ctx, s| async {
         s.spawn(async {
             assert!(ctx.sleep_until(t).await.is_err());
-            Ok(())
+            std::result::Result::Ok(())
         });
         clock.advance_until(t - sec);
         R::Err(1)
     })
diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs
index e6f0ac53..43d060a0 100644
--- a/node/libs/storage/src/batch_store/mod.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -1,6 +1,5 @@
 //! Defines storage layer for batches of blocks.
 use anyhow::Context as _;
-use std::borrow::Borrow;
 use std::{collections::VecDeque, fmt, sync::Arc};
 use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
@@ -188,45 +187,6 @@ impl BatchStoreRunner {
         let store_ref = Arc::downgrade(&self.0);
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
-        #[tracing::instrument(skip_all)]
-        async fn truncate_batch_cache_iteration(
-            ctx: &ctx::Ctx,
-            persisted: &mut sync::watch::Receiver<BatchStoreState>,
-            inner: &sync::watch::Sender<Inner>,
-        ) -> ctx::Result<()> {
-            let persisted = sync::changed(ctx, persisted)
-                .instrument(tracing::info_span!("wait_for_batch_store_change"))
-                .await?
-                .clone();
-            inner.send_modify(|inner| {
-                // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here.
-                inner.persisted = persisted;
-                inner.truncate_cache();
-            });
-
-            Ok(())
-        }
-
-        #[tracing::instrument(skip_all)]
-        async fn queue_persist_batch_iteration(
-            ctx: &ctx::Ctx,
-            queue_next: &mut attester::BatchNumber,
-            inner: &mut sync::watch::Receiver<Inner>,
-            persistent: &dyn PersistentBatchStore,
-        ) -> ctx::Result<()> {
-            let batch = sync::wait_for(ctx, inner, |inner| inner.queued.contains(*queue_next))
-                .instrument(tracing::info_span!("wait_for_next_batch"))
-                .await?
-                .batch(*queue_next)
-                .unwrap();
-
-            *queue_next = queue_next.next();
-
-            persistent.queue_next_batch(ctx, batch).await?;
-
-            Ok(())
-        }
-
         let res = scope::run!(ctx, |ctx, s| async {
             let persisted = self.0.persistent.persisted();
             let mut queue_next = persisted.borrow().next();
@@ -234,18 +194,40 @@ impl BatchStoreRunner {
             s.spawn::<()>(async {
                 let mut persisted = persisted;
                 loop {
-                    truncate_batch_cache_iteration(ctx, &mut persisted, &self.0.inner).await?;
+                    async {
+                        let persisted = sync::changed(ctx, &mut persisted)
+                            .instrument(tracing::info_span!("wait_for_batch_store_change"))
+                            .await?
+                            .clone();
+                        self.0.inner.send_modify(|inner| {
+                            // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here.
+                            inner.persisted = persisted;
+                            inner.truncate_cache();
+                        });
+
+                        ctx::Ok(())
+                    }
+                    .instrument(tracing::info_span!("truncate_batch_cache_iter"))
+                    .await?;
                 }
             });
             let inner = &mut self.0.inner.subscribe();
             loop {
-                queue_persist_batch_iteration(
-                    ctx,
-                    &mut queue_next,
-                    inner,
-                    self.0.persistent.borrow(),
-                )
+                async {
+                    let batch =
+                        sync::wait_for(ctx, inner, |inner| inner.queued.contains(queue_next))
+                            .instrument(tracing::info_span!("wait_for_next_batch"))
+                            .await?
+                            .batch(queue_next)
+                            .unwrap();
+
+                    queue_next = queue_next.next();
+
+                    self.0.persistent.queue_next_batch(ctx, batch).await?;
+
+                    ctx::Ok(())
+                }
+                .instrument(tracing::info_span!("queue_persist_batch_iter"))
                 .await?;
             }
         })
diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs
index 0dcc0a45..14397c19 100644
--- a/node/libs/storage/src/block_store/mod.rs
+++ b/node/libs/storage/src/block_store/mod.rs
@@ -1,6 +1,5 @@
 //! Defines storage layer for finalized blocks.
 use anyhow::Context as _;
-use std::borrow::Borrow;
 use std::{collections::VecDeque, fmt, sync::Arc};
 use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync};
 use zksync_consensus_roles::validator;
@@ -184,64 +183,46 @@ impl BlockStoreRunner {
         let store_ref = Arc::downgrade(&self.0);
         let _ = COLLECTOR.before_scrape(move || Some(store_ref.upgrade()?.scrape_metrics()));
 
-        #[tracing::instrument(skip_all)]
-        async fn watch_persistent_state_iteration(
-            ctx: &ctx::Ctx,
-            persisted: &mut sync::watch::Receiver<BlockStoreState>,
-            inner: &sync::watch::Sender<Inner>,
-        ) -> ctx::Result<()> {
-            let new = sync::changed(ctx, persisted)
-                .instrument(tracing::info_span!("wait_for_block_store_change"))
-                .await?
-                .clone();
-            sync::try_send_modify(inner, |inner| inner.update_persisted(new))?;
-
-            Ok(())
-        }
-
-        #[tracing::instrument(skip_all)]
-        async fn queue_persist_block_iteration(
-            ctx: &ctx::Ctx,
-            queue_next: &mut validator::BlockNumber,
-            inner: &mut sync::watch::Receiver<Inner>,
-            persistent: &dyn PersistentBlockStore,
-        ) -> ctx::Result<()> {
-            let block = sync::wait_for_some(ctx, inner, |inner| {
-                inner.block((*queue_next).max(inner.persisted.next()))
-            })
-            .instrument(tracing::info_span!("wait_for_next_block"))
-            .await?;
-            *queue_next = block.number().next();
-            // TODO: monitor errors as well.
-            let t = metrics::PERSISTENT_BLOCK_STORE
-                .queue_next_block_latency
-                .start();
-            persistent.queue_next_block(ctx, block).await?;
-            t.observe();
-
-            Ok(())
-        }
-
         let res = scope::run!(ctx, |ctx, s| async {
             s.spawn::<()>(async {
                 // Task watching the persisted state.
                 let mut persisted = self.0.persistent.persisted();
                 persisted.mark_changed();
                 loop {
-                    watch_persistent_state_iteration(ctx, &mut persisted, &self.0.inner).await?;
+                    async {
+                        let new = sync::changed(ctx, &mut persisted)
+                            .instrument(tracing::info_span!("wait_for_block_store_change"))
+                            .await?
+                            .clone();
+                        sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?;
+
+                        ctx::Ok(())
+                    }
+                    .instrument(tracing::info_span!("watch_persistent_state_iteration"))
+                    .await?;
                 }
             });
             // Task queueing blocks to be persisted.
             let inner = &mut self.0.inner.subscribe();
             let mut queue_next = validator::BlockNumber(0);
             loop {
-                queue_persist_block_iteration(
-                    ctx,
-                    &mut queue_next,
-                    inner,
-                    self.0.persistent.borrow(),
-                )
+                async {
+                    let block = sync::wait_for_some(ctx, inner, |inner| {
+                        inner.block(queue_next.max(inner.persisted.next()))
+                    })
+                    .instrument(tracing::info_span!("wait_for_next_block"))
+                    .await?;
+                    queue_next = block.number().next();
+                    // TODO: monitor errors as well.
+                    let t = metrics::PERSISTENT_BLOCK_STORE
+                        .queue_next_block_latency
+                        .start();
+                    self.0.persistent.queue_next_block(ctx, block).await?;
+                    t.observe();
+
+                    ctx::Ok(())
+                }
+                .instrument(tracing::info_span!("queue_persist_block_iteration"))
                 .await?;
             }
         })
         .await;

From ed2eceddeb1c04ea87b88276ebf91613a1e2d535 Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Thu, 8 Aug 2024 17:07:33 +1000
Subject: [PATCH 4/5] do not flatten error

---
 node/actors/bft/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs
index 12b0810c..efa74e1d 100644
--- a/node/actors/bft/src/lib.rs
+++ b/node/actors/bft/src/lib.rs
@@ -115,7 +115,7 @@ impl Config {
                     M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
                 }
 
-                anyhow::Ok(())
+                ctx::Ok(())
             }
             .instrument(tracing::info_span!("bft_iter"))
             .await?;

From 6c66669999ad0738e6c3e55513fa81047d3b9266 Mon Sep 17 00:00:00 2001
From: Daniyar Itegulov
Date: Thu, 8 Aug 2024 18:11:50 +1000
Subject: [PATCH 5/5] plain nested spans instead of follows_from

---
 node/actors/network/src/gossip/mod.rs | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 3d2a13e1..7d22062d 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -114,21 +110,17 @@ impl Network {
                 async {
                     let _permit = permit;
                     let number = number.into();
-                    let queue_span = tracing::info_span!("wait_for_block_to_queue");
-                    let queue_span_copy = queue_span.clone();
                     let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
-                        let request_span = tracing::info_span!("fetch_block_request");
-                        request_span.follows_from(queue_span_copy.id());
                         s.spawn_bg(
                             self.fetch_queue
                                 .request(ctx, RequestItem::Block(number))
-                                .instrument(request_span),
+                                .instrument(tracing::info_span!("fetch_block_request")),
                         );
                         // Cancel fetching as soon as block is queued for storage.
                         self.block_store.wait_until_queued(ctx, number).await?;
                         Err(ctx::Canceled)
                     })
-                    .instrument(queue_span)
+                    .instrument(tracing::info_span!("wait_for_block_to_queue"))
                     .await;
                     // Wait until the block is actually persisted, so that the amount of blocks
                     // stored in memory is bounded.
@@ -155,21 +151,17 @@ impl Network {
                 async {
                     let _permit = permit;
                     let number = number.into();
-                    let queue_span = tracing::info_span!("wait_for_batch_to_queue");
-                    let queue_span_copy = queue_span.clone();
                     let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
-                        let request_span = tracing::info_span!("fetch_batch_request");
-                        request_span.follows_from(queue_span_copy.id());
                         s.spawn_bg(
                             self.fetch_queue
                                 .request(ctx, RequestItem::Batch(number))
-                                .instrument(request_span),
+                                .instrument(tracing::info_span!("fetch_batch_request")),
                         );
                         // Cancel fetching as soon as batch is queued for storage.
                         self.batch_store.wait_until_queued(ctx, number).await?;
                         Err(ctx::Canceled)
                     })
-                    .instrument(queue_span)
+                    .instrument(tracing::info_span!("wait_for_batch_to_queue"))
                     .await;
                     // Wait until the batch is actually persisted, so that the amount of batches
                     // stored in memory is bounded.
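A note on the final patch, for illustration only. The sketch below is not part of the series; the fetch/request function names are made up, and only the public `tracing` crate API is assumed. `Span::follows_from` records a causal link between two otherwise independent spans, which many subscribers render poorly, while creating the inner span inside an already-instrumented future makes it an ordinary child span that nests naturally in the trace tree:

    use tracing::Instrument as _;

    async fn fetch_block(number: u64) {
        // Before PATCH 5/5: two sibling spans, linked explicitly.
        // The link exists, but the spans are not parent/child.
        let queue_span = tracing::info_span!("wait_for_block_to_queue");
        let request_span = tracing::info_span!("fetch_block_request");
        request_span.follows_from(queue_span.id());

        // After PATCH 5/5: the request span is created while the outer
        // instrumented future is being polled, so it picks up
        // "wait_for_block_to_queue" as its contextual parent.
        async {
            request(number)
                .instrument(tracing::info_span!("fetch_block_request"))
                .await;
        }
        .instrument(tracing::info_span!("wait_for_block_to_queue"))
        .await;
    }

    async fn request(_number: u64) { /* placeholder for the actual RPC call */ }

The trade-off is that a nested span ties the child's lifetime to the parent's scope, which is exactly what the patch wants here: the request should be reported as part of waiting for the block (or batch) to be queued.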