diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index c677735a..efa74e1d 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -19,6 +19,7 @@ use crate::io::{InputMessage, OutputMessage}; use anyhow::Context; pub use config::Config; use std::sync::Arc; +use tracing::Instrument; use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope}; use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; @@ -90,24 +91,34 @@ impl Config { // This is the infinite loop where the consensus actually runs. The validator waits for either // a message from the network or for a timeout, and processes each accordingly. loop { - let InputMessage::Network(req) = pipe.recv.recv(ctx).await?; - use validator::ConsensusMsg as M; - match &req.msg.msg { - M::ReplicaPrepare(_) => { - // This is a hacky way to do a clone. This is necessary since we don't want to derive - // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway. - let (ack, _) = oneshot::channel(); - let new_req = ConsensusReq { - msg: req.msg.clone(), - ack, - }; + async { + let InputMessage::Network(req) = pipe + .recv + .recv(ctx) + .instrument(tracing::info_span!("wait_for_message")) + .await?; + use validator::ConsensusMsg as M; + match &req.msg.msg { + M::ReplicaPrepare(_) => { + // This is a hacky way to do a clone. This is necessary since we don't want to derive + // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway. + let (ack, _) = oneshot::channel(); + let new_req = ConsensusReq { + msg: req.msg.clone(), + ack, + }; - replica_send.send(new_req); - leader_send.send(req); + replica_send.send(new_req); + leader_send.send(req); + } + M::ReplicaCommit(_) => leader_send.send(req), + M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req), } - M::ReplicaCommit(_) => leader_send.send(req), - M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req), + + ctx::Ok(()) } + .instrument(tracing::info_span!("bft_iter")) + .await?; } }) .await; diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index 8ccee89c..e0bd63d5 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -3,6 +3,7 @@ use crate::Attester; use anyhow::Context; use std::sync::Arc; +use tracing::Instrument; use zksync_concurrency::{ctx, sync, time}; use zksync_consensus_network::gossip::{ AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, @@ -58,56 +59,70 @@ impl AttesterRunner { self.status.mark_changed(); loop { - let Some(batch_number) = sync::changed(ctx, &mut self.status) - .await? - .next_batch_to_attest - else { - continue; - }; - - tracing::info!(%batch_number, "attestation status"); - - // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence - // to be indicated in memory (which itself relies on polling). This happens once we have the commitment, - // which for nodes that get the blocks through BFT should happen after execution. Nodes which - // rely on batch sync don't participate in attestations as they need the batch on L1 first. - self.batch_store - .wait_until_persisted(ctx, batch_number) + async { + let Some(batch_number) = sync::changed(ctx, &mut self.status) + .instrument(tracing::info_span!("wait_for_attestation_status")) + .await? 
+                .next_batch_to_attest
+            else {
+                return Ok(());
+            };
+
+            tracing::info!(%batch_number, "attestation status");
+
+            // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting for its existence
+            // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
+            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
+            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
+            self.batch_store
+                .wait_until_persisted(ctx, batch_number)
+                .await?;
+
+            // Try to get the next batch to sign; the commitment might not be available just yet.
+            let batch = AttesterRunner::wait_for_batch_to_sign(
+                ctx,
+                batch_number,
+                &self.batch_store,
+                self.poll_interval,
+            )
             .await?;

-            // Try to get the next batch to sign; the commitment might not be available just yet.
-            let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
+            // The certificates might be collected out of order because of how gossip works;
+            // we could query the DB to see if we already have a QC, or we can just go ahead
+            // and publish our vote, and let others ignore it.

-            // The certificates might be collected out of order because of how gossip works;
-            // we could query the DB to see if we already have a QC, or we can just go ahead
-            // and publish our vote, and let others ignore it.
+            tracing::info!(%batch_number, "publishing attestation");

-            tracing::info!(%batch_number, "publishing attestation");
+            // We only have to publish a vote once; future peers can pull it from the register.
+            self.publisher
+                .publish(attesters, &genesis, &self.attester.key, batch)
+                .await
+                .context("publish")?;

-            // We only have to publish a vote once; future peers can pull it from the register.
-            self.publisher
-                .publish(attesters, &genesis, &self.attester.key, batch)
-                .await
-                .context("publish")?;
+            ctx::Ok(())
+        }
+        .instrument(tracing::info_span!("attestation_iter"))
+        .await?;
        }
    }

    /// Wait for the batch commitment to become available.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
    async fn wait_for_batch_to_sign(
-        &self,
        ctx: &ctx::Ctx,
        number: attester::BatchNumber,
+        batch_store: &BatchStore,
+        poll_interval: time::Duration,
    ) -> ctx::Result<attester::Batch> {
        loop {
-            if let Some(batch) = self
-                .batch_store
+            if let Some(batch) = batch_store
                .batch_to_sign(ctx, number)
                .await
                .context("batch_to_sign")?
            {
                return Ok(batch);
            } else {
-                ctx.sleep(self.poll_interval).await?;
+                ctx.sleep(poll_interval).await?;
            }
        }
    }
diff --git a/node/actors/network/src/consensus/handshake/mod.rs b/node/actors/network/src/consensus/handshake/mod.rs
index 9dc6bdb6..5b0e7561 100644
--- a/node/actors/network/src/consensus/handshake/mod.rs
+++ b/node/actors/network/src/consensus/handshake/mod.rs
@@ -58,6 +58,7 @@ pub(super) enum Error {
    Stream(#[from] ctx::Error),
}

+#[tracing::instrument(name = "handshake::outbound", skip_all)]
pub(super) async fn outbound(
    ctx: &ctx::Ctx,
    me: &validator::SecretKey,
@@ -93,6 +94,7 @@ pub(super) async fn outbound(
    Ok(())
}

+#[tracing::instrument(name = "handshake::inbound", skip_all)]
pub(super) async fn inbound(
    ctx: &ctx::Ctx,
    me: &validator::SecretKey,
diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs
index c7acea92..3d4f5ad9 100644
--- a/node/actors/network/src/consensus/mod.rs
+++ b/node/actors/network/src/consensus/mod.rs
@@ -163,7 +163,7 @@ impl Network {
    /// Performs handshake of an inbound stream.
/// Closes the stream if there is another inbound stream opened from the same validator. - #[tracing::instrument(level = "info", name = "consensus", skip_all)] + #[tracing::instrument(name = "consensus::run_inbound_stream", skip_all)] pub(crate) async fn run_inbound_stream( &self, ctx: &ctx::Ctx, @@ -195,7 +195,7 @@ impl Network { res } - #[tracing::instrument(level = "info", name = "consensus", skip_all)] + #[tracing::instrument(name = "consensus::run_outbound_stream", skip_all, fields(?peer, %addr))] async fn run_outbound_stream( &self, ctx: &ctx::Ctx, @@ -282,6 +282,7 @@ impl Network { res } + #[tracing::instrument(skip_all)] async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let addr = *self .gossip @@ -295,7 +296,6 @@ impl Network { format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr) })?; self.run_outbound_stream(ctx, &self.key.public(), addr) - .instrument(tracing::info_span!("loopback", ?addr)) .await } @@ -314,24 +314,26 @@ impl Network { } let addrs = &mut self.gossip.validator_addrs.subscribe(); let mut addr = None; + while ctx.is_active() { - // Wait for a new address, or retry with the old one after timeout. - if let Ok(new) = - sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| { - addrs.get(peer).map(|x| x.msg.addr) != addr - }) - .await - { - addr = new.get(peer).map(|x| x.msg.addr); - } - let Some(addr) = addr else { continue }; - if let Err(err) = self - .run_outbound_stream(ctx, peer, addr) - .instrument(tracing::info_span!("out", ?addr)) - .await - { - tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}"); + async { + // Wait for a new address, or retry with the old one after timeout. + if let Ok(new) = + sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| { + addrs.get(peer).map(|x| x.msg.addr) != addr + }) + .instrument(tracing::info_span!("wait_for_address")) + .await + { + addr = new.get(peer).map(|x| x.msg.addr); + } + let Some(addr) = addr else { return }; + if let Err(err) = self.run_outbound_stream(ctx, peer, addr).await { + tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}"); + } } + .instrument(tracing::info_span!("maintain_connection_iter")) + .await; } } } diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index abe8d969..b48fb205 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -286,6 +286,7 @@ impl BatchVotesWatch { } /// Set the minimum batch number on the votes and discard old data. + #[tracing::instrument(skip_all, fields(%min_batch_number))] pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { let this = self.0.lock().await; this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); @@ -308,6 +309,7 @@ impl fmt::Debug for BatchVotesPublisher { impl BatchVotesPublisher { /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network. 
+    #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))]
    pub async fn publish(
        &self,
        attesters: &attester::Committee,
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 4e72acd0..7d22062d 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -21,6 +21,7 @@ pub use self::{
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use fetch::RequestItem;
use std::sync::{atomic::AtomicUsize, Arc};
+use tracing::Instrument;
pub(crate) use validator_addrs::*;
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
use zksync_consensus_roles::{node, validator};
@@ -109,20 +110,28 @@ impl Network {
            let number = ctx::NoCopy(next);
            next = next + 1;
            // Fetch a block asynchronously.
-            s.spawn(async {
-                let _permit = permit;
-                let number = number.into();
-                let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
-                    s.spawn_bg(self.fetch_queue.request(ctx, RequestItem::Block(number)));
-                    // Cancel fetching as soon as block is queued for storage.
-                    self.block_store.wait_until_queued(ctx, number).await?;
-                    Err(ctx::Canceled)
-                })
-                .await;
-                // Wait until the block is actually persisted, so that the amount of blocks
-                // stored in memory is bounded.
-                self.block_store.wait_until_persisted(ctx, number).await
-            });
+            s.spawn(
+                async {
+                    let _permit = permit;
+                    let number = number.into();
+                    let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
+                        s.spawn_bg(
+                            self.fetch_queue
+                                .request(ctx, RequestItem::Block(number))
+                                .instrument(tracing::info_span!("fetch_block_request")),
+                        );
+                        // Cancel fetching as soon as block is queued for storage.
+                        self.block_store.wait_until_queued(ctx, number).await?;
+                        Err(ctx::Canceled)
+                    })
+                    .instrument(tracing::info_span!("wait_for_block_to_queue"))
+                    .await;
+                    // Wait until the block is actually persisted, so that the amount of blocks
+                    // stored in memory is bounded.
+                    self.block_store.wait_until_persisted(ctx, number).await
+                }
+                .instrument(tracing::info_span!("fetch_block_from_peer", l2_block = %next)),
+            );
        }
    })
    .await;
@@ -138,20 +147,28 @@ impl Network {
            let number = ctx::NoCopy(next);
            next = next + 1;
            // Fetch a batch asynchronously.
-            s.spawn(async {
-                let _permit = permit;
-                let number = number.into();
-                let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
-                    s.spawn_bg(self.fetch_queue.request(ctx, RequestItem::Batch(number)));
-                    // Cancel fetching as soon as batch is queued for storage.
-                    self.batch_store.wait_until_queued(ctx, number).await?;
-                    Err(ctx::Canceled)
-                })
-                .await;
-                // Wait until the batch is actually persisted, so that the amount of batches
-                // stored in memory is bounded.
-                self.batch_store.wait_until_persisted(ctx, number).await
-            });
+            s.spawn(
+                async {
+                    let _permit = permit;
+                    let number = number.into();
+                    let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async {
+                        s.spawn_bg(
+                            self.fetch_queue
+                                .request(ctx, RequestItem::Batch(number))
+                                .instrument(tracing::info_span!("fetch_batch_request")),
+                        );
+                        // Cancel fetching as soon as batch is queued for storage.
+                        self.batch_store.wait_until_queued(ctx, number).await?;
+                        Err(ctx::Canceled)
+                    })
+                    .instrument(tracing::info_span!("wait_for_batch_to_queue"))
+                    .await;
+                    // Wait until the batch is actually persisted, so that the amount of batches
+                    // stored in memory is bounded.
+                    self.batch_store.wait_until_persisted(ctx, number).await
+                }
+                .instrument(tracing::info_span!("fetch_batch_from_peer", l1_batch = %next)),
+            );
        }
    })
    .await;
@@ -173,32 +190,40 @@ impl Network {
        recv_status.mark_changed();

        loop {
-            // Wait until the status indicates that we're ready to sign the next batch.
-            let Some(batch_number) = sync::changed(ctx, &mut recv_status)
-                .await?
-                .next_batch_to_attest
-            else {
-                continue;
-            };
+            async {
+                // Wait until the status indicates that we're ready to sign the next batch.
+                let Some(batch_number) = sync::changed(ctx, &mut recv_status)
+                    .instrument(tracing::info_span!("wait_for_attestation_status"))
+                    .await?
+                    .next_batch_to_attest
+                else {
+                    return Ok(());
+                };

-            // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
-            self.batch_votes.set_min_batch_number(batch_number).await;
+                // Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
+                self.batch_votes.set_min_batch_number(batch_number).await;

-            // Now wait until we find the next quorum, whatever it is:
-            // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
-            // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
-            // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
-            // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
-            // The possibility of this will be fixed by deterministally picking a start batch number based on fork indicated by genesis.
-            let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| {
-                votes.find_quorum(attesters, &genesis)
-            })
-            .await?;
+                // Now wait until we find the next quorum, whatever it is:
+                // * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
+                // * on external nodes the votes might be affected by changes in the value returned by the API, and there might be gaps
+                // What is important, though, is that the batch number does not move backwards while we look for a quorum, because attesters
+                // (re)casting earlier votes will go ignored by those fixed on a higher min_batch_number, and gossip will only be attempted once.
+                // The possibility of this will be fixed by deterministically picking a start batch number based on the fork indicated by genesis.
+                let qc = sync::wait_for_some(ctx, &mut recv_votes, |votes| {
+                    votes.find_quorum(attesters, &genesis)
+                })
+                .instrument(tracing::info_span!("wait_for_quorum"))
+                .await?;
+
+                self.batch_store
+                    .persist_batch_qc(ctx, qc)
+                    .await
+                    .wrap("persist_batch_qc")?;

-            self.batch_store
-                .persist_batch_qc(ctx, qc)
-                .await
-                .wrap("persist_batch_qc")?;
+                ctx::Ok(())
+            }
+            .instrument(tracing::info_span!("new_votes_iter"))
+            .await?;
        }
    }
}
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index 2596ccd7..effd0466 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -366,7 +366,7 @@ impl Network {
    /// Handles an inbound stream.
    /// Closes the stream if there is another inbound stream opened from the same peer.
-    #[tracing::instrument(level = "info", name = "gossip", skip_all)]
+    #[tracing::instrument(name = "gossip::run_inbound_stream", skip_all)]
    pub(crate) async fn run_inbound_stream(
        &self,
        ctx: &ctx::Ctx,
@@ -384,7 +384,7 @@
    }

    /// Connects to a peer and handles the resulting stream.
-    #[tracing::instrument(level = "info", name = "gossip", skip_all)]
+    #[tracing::instrument(name = "gossip::run_outbound_stream", skip_all)]
    pub(crate) async fn run_outbound_stream(
        &self,
        ctx: &ctx::Ctx,
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index 1d5f3cbd..94b692c6 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -151,7 +151,6 @@ impl Runner {
                        .net
                        .gossip
                        .run_outbound_stream(ctx, peer, addr.clone())
-                        .instrument(tracing::info_span!("out", ?addr))
                        .await;
                    if let Err(err) = res {
                        tracing::info!("gossip.run_outbound_stream({addr:?}): {err:#}");
@@ -209,7 +208,7 @@ impl Runner {
                    }
                    anyhow::Ok(())
                }
-                .instrument(tracing::info_span!("in", ?addr))
+                .instrument(tracing::info_span!("accept_connection", ?addr))
                .await;
                if let Err(err) = res {
                    tracing::info!("{addr}: {err:#}");
diff --git a/node/libs/concurrency/src/ctx/mod.rs b/node/libs/concurrency/src/ctx/mod.rs
index bf4f01d5..58ac2d54 100644
--- a/node/libs/concurrency/src/ctx/mod.rs
+++ b/node/libs/concurrency/src/ctx/mod.rs
@@ -185,8 +185,8 @@ impl Ctx {
    ) -> CtxAware<impl 'a + Send + Future<Output = OrCanceled<F::Output>>> {
        CtxAware(async {
            tokio::select! {
-                output = fut => Ok(output),
-                () = self.0.canceled.cancel_safe_recv() => Err(Canceled),
+                output = fut => OrCanceled::Ok(output),
+                () = self.0.canceled.cancel_safe_recv() => OrCanceled::Err(Canceled),
            }
        })
    }
@@ -271,6 +271,16 @@ pub enum Error {
/// Alias for Result with `ctx::Error`.
pub type Result<T> = std::result::Result<T, Error>;

+/// Equivalent to Ok::<_, ctx::Error>(value).
+///
+/// This simplifies creation of a ctx::Result in places where type inference
+/// cannot deduce the `E` type of the result, without needing to write
+/// `Ok::<_, ctx::Error>(value)`.
+#[allow(non_snake_case)]
+pub fn Ok<T>(t: T) -> Result<T> {
+    Result::Ok(t)
+}
+
impl crate::error::Wrap for Error {
    fn with_wrap<C: fmt::Display, F: FnOnce() -> C>(self, f: F) -> Self {
        match self {
diff --git a/node/libs/concurrency/src/ctx/tests.rs b/node/libs/concurrency/src/ctx/tests.rs
index cf24ac50..41f33cd3 100644
--- a/node/libs/concurrency/src/ctx/tests.rs
+++ b/node/libs/concurrency/src/ctx/tests.rs
@@ -9,7 +9,7 @@ async fn test_run_canceled() {
    scope::run!(ctx, |ctx, s| async {
        s.cancel();
        assert!(!ctx.is_active());
-        Ok::<(), ()>(())
+        std::result::Result::Ok::<(), ()>(())
    })
    .await
    .unwrap();
@@ -46,7 +46,7 @@ async fn test_sleep_until() {
        });
        clock.advance(1001 * sec);
        tracing::info!("root task terminating");
-        Ok(())
+        std::result::Result::Ok(())
    })
    .await;
    assert_eq!(Err(9), res);
@@ -55,7 +55,7 @@ async fn test_sleep_until() {
    let res = scope::run!(ctx, |ctx, s| async {
        s.spawn(async {
            assert!(ctx.sleep_until(t).await.is_err());
-            Ok(())
+            std::result::Result::Ok(())
        });
        clock.advance_until(t - sec);
        R::Err(1)
diff --git a/node/libs/storage/src/batch_store/mod.rs b/node/libs/storage/src/batch_store/mod.rs
index 889014f1..43d060a0 100644
--- a/node/libs/storage/src/batch_store/mod.rs
+++ b/node/libs/storage/src/batch_store/mod.rs
@@ -1,6 +1,7 @@
//! Defines storage layer for batches of blocks.
use anyhow::Context as _; use std::{collections::VecDeque, fmt, sync::Arc}; +use tracing::Instrument; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; use zksync_consensus_roles::{attester, validator}; @@ -148,6 +149,7 @@ impl Inner { true } + #[tracing::instrument(skip_all)] fn truncate_cache(&mut self) { while self.cache.len() > Self::CACHE_CAPACITY && self.persisted.contains(self.cache[0].number) @@ -192,24 +194,41 @@ impl BatchStoreRunner { s.spawn::<()>(async { let mut persisted = persisted; loop { - let persisted = sync::changed(ctx, &mut persisted).await?.clone(); - self.0.inner.send_modify(|inner| { - // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here. - inner.persisted = persisted; - inner.truncate_cache(); - }); + async { + let persisted = sync::changed(ctx, &mut persisted) + .instrument(tracing::info_span!("wait_for_batch_store_change")) + .await? + .clone(); + self.0.inner.send_modify(|inner| { + // XXX: In `BlockStoreRunner` update both the `queued` and the `persisted` here. + inner.persisted = persisted; + inner.truncate_cache(); + }); + + ctx::Ok(()) + } + .instrument(tracing::info_span!("truncate_batch_cache_iter")) + .await?; } }); let inner = &mut self.0.inner.subscribe(); loop { - let batch = sync::wait_for(ctx, inner, |inner| inner.queued.contains(queue_next)) - .await? - .batch(queue_next) - .unwrap(); + async { + let batch = + sync::wait_for(ctx, inner, |inner| inner.queued.contains(queue_next)) + .instrument(tracing::info_span!("wait_for_next_batch")) + .await? + .batch(queue_next) + .unwrap(); - queue_next = queue_next.next(); + queue_next = queue_next.next(); - self.0.persistent.queue_next_batch(ctx, batch).await?; + self.0.persistent.queue_next_batch(ctx, batch).await?; + + ctx::Ok(()) + } + .instrument(tracing::info_span!("queue_persist_batch_iter")) + .await?; } }) .await; @@ -347,6 +366,7 @@ impl BatchStore { } /// Wait until the database has a batch, then attach the corresponding QC. + #[tracing::instrument(skip_all, fields(l1_batch = %qc.message.number))] pub async fn persist_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { let t = metrics::BATCH_STORE.persist_batch_qc.start(); // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload @@ -368,6 +388,7 @@ impl BatchStore { /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. + #[tracing::instrument(skip_all, fields(l1_batch = %number))] pub async fn wait_until_queued( &self, ctx: &ctx::Ctx, @@ -388,6 +409,7 @@ impl BatchStore { /// Waits until the given batch is stored persistently. /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. + #[tracing::instrument(skip_all, fields(l1_batch = %number))] pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs index a901c0fe..14397c19 100644 --- a/node/libs/storage/src/block_store/mod.rs +++ b/node/libs/storage/src/block_store/mod.rs @@ -1,6 +1,7 @@ //! Defines storage layer for finalized blocks. use anyhow::Context as _; use std::{collections::VecDeque, fmt, sync::Arc}; +use tracing::Instrument; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; use zksync_consensus_roles::validator; @@ -117,6 +118,13 @@ impl Inner { } /// Updates `persisted` field. 
+ #[tracing::instrument( + skip_all, + fields( + first_l2_block = %persisted.first, + last_l2_block = ?persisted.last.as_ref().map(|l| l.message.proposal.number) + ) + )] fn update_persisted(&mut self, persisted: BlockStoreState) -> anyhow::Result<()> { if persisted.next() < self.persisted.next() { anyhow::bail!("head block has been removed from storage, this is not supported"); @@ -181,25 +189,41 @@ impl BlockStoreRunner { let mut persisted = self.0.persistent.persisted(); persisted.mark_changed(); loop { - let new = sync::changed(ctx, &mut persisted).await?.clone(); - sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?; + async { + let new = sync::changed(ctx, &mut persisted) + .instrument(tracing::info_span!("wait_for_block_store_change")) + .await? + .clone(); + sync::try_send_modify(&self.0.inner, |inner| inner.update_persisted(new))?; + + ctx::Ok(()) + } + .instrument(tracing::info_span!("watch_persistent_state_iteration")) + .await?; } }); // Task queueing blocks to be persisted. let inner = &mut self.0.inner.subscribe(); let mut queue_next = validator::BlockNumber(0); loop { - let block = sync::wait_for_some(ctx, inner, |inner| { - inner.block(queue_next.max(inner.persisted.next())) - }) + async { + let block = sync::wait_for_some(ctx, inner, |inner| { + inner.block(queue_next.max(inner.persisted.next())) + }) + .instrument(tracing::info_span!("wait_for_next_block")) + .await?; + queue_next = block.number().next(); + // TODO: monitor errors as well. + let t = metrics::PERSISTENT_BLOCK_STORE + .queue_next_block_latency + .start(); + self.0.persistent.queue_next_block(ctx, block).await?; + t.observe(); + + ctx::Ok(()) + } + .instrument(tracing::info_span!("queue_persist_block_iteration")) .await?; - queue_next = block.number().next(); - // TODO: monitor errors as well. - let t = metrics::PERSISTENT_BLOCK_STORE - .queue_next_block_latency - .start(); - self.0.persistent.queue_next_block(ctx, block).await?; - t.observe(); } }) .await; @@ -309,6 +333,7 @@ impl BlockStore { /// Waits until the given block is queued (in memory, or persisted). /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned. + #[tracing::instrument(skip_all, fields(l2_block = %number))] pub async fn wait_until_queued( &self, ctx: &ctx::Ctx, @@ -324,6 +349,7 @@ impl BlockStore { /// Waits until the given block is stored persistently. /// Note that it doesn't mean that the block is actually available, as old blocks might get pruned. + #[tracing::instrument(skip_all, fields(l2_block = %number))] pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 79a21207..2ca8858e 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -202,6 +202,7 @@ impl PersistentBatchStore for BatchStore { Ok(batches.get(idx as usize).cloned()) } + #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))] async fn queue_next_batch( &self, _ctx: &ctx::Ctx, diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index 8847a27c..c6622b75 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -137,7 +137,7 @@ impl PersistentBlockStore for RocksDB { .wrap(number) } - #[tracing::instrument(level = "debug", skip(self))] + #[tracing::instrument(skip_all, fields(l2_block = %block.justification.message.proposal.number))] async fn queue_next_block( &self, _ctx: &ctx::Ctx,
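The recurring pattern in this patch is to wrap the body of each long-running loop in its own `async` block, attach a per-iteration span via `tracing::Instrument`, and close the block with `ctx::Ok(())` so that type inference can pin the error type of the `?` operators inside. Below is a minimal standalone sketch of that pattern; the `run_loop` name is hypothetical, and `anyhow::Ok` stands in for this repo's `ctx::Ok` helper so the snippet compiles outside `zksync_concurrency`:

```rust
use tracing::Instrument as _;

// Hypothetical loop illustrating the per-iteration instrumentation applied
// throughout this patch. `anyhow::Ok(())` plays the role of `ctx::Ok(())`:
// both exist only to fix the error type of the `?` operators in the block.
async fn run_loop() -> anyhow::Result<()> {
    loop {
        async {
            // One iteration of work; individual awaits can carry nested spans,
            // e.g. `fut.instrument(tracing::info_span!("wait_for_message")).await?`.
            anyhow::Ok(())
        }
        // A fresh span per iteration, instead of one span spanning the whole
        // (potentially infinite) loop.
        .instrument(tracing::info_span!("loop_iter"))
        .await?;
    }
}
```

Spans such as `bft_iter`, `attestation_iter`, `new_votes_iter`, and `queue_persist_batch_iter` in the diff correspond to `loop_iter` here; the design keeps each iteration's events grouped under a short-lived span rather than accumulating everything under one unbounded span.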