feat: instrument code with tracing spans #178

Merged Aug 8, 2024 · 7 commits · Showing changes from 4 commits

node/actors/bft/src/lib.rs (+26 −15)

```diff
@@ -19,6 +19,7 @@ use crate::io::{InputMessage, OutputMessage};
 use anyhow::Context;
 pub use config::Config;
 use std::sync::Arc;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, oneshot, scope};
 use zksync_consensus_network::io::ConsensusReq;
 use zksync_consensus_roles::validator;
@@ -90,24 +91,34 @@ impl Config {
             // This is the infinite loop where the consensus actually runs. The validator waits for either
             // a message from the network or for a timeout, and processes each accordingly.
             loop {
-                let InputMessage::Network(req) = pipe.recv.recv(ctx).await?;
-                use validator::ConsensusMsg as M;
-                match &req.msg.msg {
-                    M::ReplicaPrepare(_) => {
-                        // This is a hacky way to do a clone. This is necessary since we don't want to derive
-                        // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
-                        let (ack, _) = oneshot::channel();
-                        let new_req = ConsensusReq {
-                            msg: req.msg.clone(),
-                            ack,
-                        };
+                async {
+                    let InputMessage::Network(req) = pipe
+                        .recv
+                        .recv(ctx)
+                        .instrument(tracing::info_span!("wait_for_message"))
+                        .await?;
+                    use validator::ConsensusMsg as M;
+                    match &req.msg.msg {
+                        M::ReplicaPrepare(_) => {
+                            // This is a hacky way to do a clone. This is necessary since we don't want to derive
+                            // Clone for ConsensusReq. When we change to ChonkyBFT this will be removed anyway.
+                            let (ack, _) = oneshot::channel();
+                            let new_req = ConsensusReq {
+                                msg: req.msg.clone(),
+                                ack,
+                            };
 
-                        replica_send.send(new_req);
-                        leader_send.send(req);
+                            replica_send.send(new_req);
+                            leader_send.send(req);
+                        }
+                        M::ReplicaCommit(_) => leader_send.send(req),
+                        M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
                     }
-                    M::ReplicaCommit(_) => leader_send.send(req),
-                    M::LeaderPrepare(_) | M::LeaderCommit(_) => replica_send.send(req),
+
+                    anyhow::Ok(())
                 }
+                .instrument(tracing::info_span!("bft_iter"))
+                .await?;
             }
         })
         .await;
```
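
Note: a span can't be attached to a `loop` as a whole without covering every iteration at once, so the diff wraps each iteration's body in an `async` block and instruments that. A minimal self-contained sketch of the same pattern, assuming tokio, anyhow, and tracing; the channel and all names here are illustrative, not part of this PR:

```rust
use tracing::Instrument as _;

// Hypothetical stand-in for the consensus input pipe.
async fn run(mut rx: tokio::sync::mpsc::Receiver<u64>) -> anyhow::Result<()> {
    loop {
        let closed = async {
            // Everything awaited or logged in this block is attached to the
            // per-iteration span, so events group by message rather than by task.
            let Some(msg) = rx.recv().await else {
                return anyhow::Ok(true); // channel closed: stop looping
            };
            tracing::info!(msg, "processing message");
            anyhow::Ok(false)
        }
        .instrument(tracing::info_span!("iter"))
        .await?;
        if closed {
            return Ok(());
        }
    }
}
```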

node/actors/executor/src/attestation.rs (+45 −30)

```diff
@@ -3,6 +3,7 @@
 use crate::Attester;
 use anyhow::Context;
 use std::sync::Arc;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, sync, time};
 use zksync_consensus_network::gossip::{
     AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
@@ -58,56 +59,70 @@ impl AttesterRunner {
         self.status.mark_changed();
 
         loop {
-            let Some(batch_number) = sync::changed(ctx, &mut self.status)
-                .await?
-                .next_batch_to_attest
-            else {
-                continue;
-            };
-
-            tracing::info!(%batch_number, "attestation status");
-
-            // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
-            // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
-            // which for nodes that get the blocks through BFT should happen after execution. Nodes which
-            // rely on batch sync don't participate in attestations as they need the batch on L1 first.
-            self.batch_store
-                .wait_until_persisted(ctx, batch_number)
+            async {
+                let Some(batch_number) = sync::changed(ctx, &mut self.status)
+                    .instrument(tracing::info_span!("wait_for_attestation_status"))
+                    .await?
+                    .next_batch_to_attest
+                else {
+                    return Ok(());
+                };
+
+                tracing::info!(%batch_number, "attestation status");
+
+                // We can avoid actively polling the database in `wait_for_batch_to_sign` by waiting its existence
+                // to be indicated in memory (which itself relies on polling). This happens once we have the commitment,
+                // which for nodes that get the blocks through BFT should happen after execution. Nodes which
+                // rely on batch sync don't participate in attestations as they need the batch on L1 first.
+                self.batch_store
+                    .wait_until_persisted(ctx, batch_number)
                     .await?;
 
-            // Try to get the next batch to sign; the commitment might not be available just yet.
-            let batch = self.wait_for_batch_to_sign(ctx, batch_number).await?;
+                // Try to get the next batch to sign; the commitment might not be available just yet.
+                let batch = AttesterRunner::wait_for_batch_to_sign(
+                    ctx,
+                    batch_number,
+                    &self.batch_store,
+                    self.poll_interval,
+                )
+                .await?;
 
-            // The certificates might be collected out of order because of how gossip works;
-            // we could query the DB to see if we already have a QC, or we can just go ahead
-            // and publish our vote, and let others ignore it.
+                // The certificates might be collected out of order because of how gossip works;
+                // we could query the DB to see if we already have a QC, or we can just go ahead
+                // and publish our vote, and let others ignore it.
 
-            tracing::info!(%batch_number, "publishing attestation");
+                tracing::info!(%batch_number, "publishing attestation");
 
-            // We only have to publish a vote once; future peers can pull it from the register.
-            self.publisher
-                .publish(attesters, &genesis, &self.attester.key, batch)
-                .await
-                .context("publish")?;
+                // We only have to publish a vote once; future peers can pull it from the register.
+                self.publisher
+                    .publish(attesters, &genesis, &self.attester.key, batch)
+                    .await
+                    .context("publish")?;
+
+                ctx::Ok(())
+            }
+            .instrument(tracing::info_span!("attestation_iter"))
+            .await?;
         }
     }
 
     /// Wait for the batch commitment to become available.
+    #[tracing::instrument(skip_all, fields(l1_batch = %number))]
     async fn wait_for_batch_to_sign(
-        &self,
         ctx: &ctx::Ctx,
         number: attester::BatchNumber,
+        batch_store: &BatchStore,
+        poll_interval: time::Duration,
     ) -> ctx::Result<attester::Batch> {
         loop {
-            if let Some(batch) = self
-                .batch_store
+            if let Some(batch) = batch_store
                 .batch_to_sign(ctx, number)
                 .await
                 .context("batch_to_sign")?
             {
                 return Ok(batch);
             } else {
-                ctx.sleep(self.poll_interval).await?;
+                ctx.sleep(poll_interval).await?;
             }
         }
     }
```
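
The refactor above also turns `wait_for_batch_to_sign` from a method into an associated function taking `batch_store` and `poll_interval` explicitly, presumably to avoid borrow conflicts now that the loop body is a single `async` block capturing `&mut self.status`. A hedged sketch of the `skip_all` + `fields(...)` combination it uses (this `BatchNumber` is an illustrative stand-in, not the real `attester::BatchNumber`):

```rust
use std::fmt;

// Illustrative stand-in type.
#[derive(Clone, Copy, Debug)]
struct BatchNumber(u64);

impl fmt::Display for BatchNumber {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// `skip_all` suppresses automatic capture of all arguments; `fields` then
// records exactly one span field, `l1_batch`, rendered via Display (`%`).
#[tracing::instrument(skip_all, fields(l1_batch = %number))]
async fn wait_for_batch_to_sign(number: BatchNumber) {
    // Any event emitted here carries l1_batch=<number> from the enclosing span.
    tracing::info!("polling for batch commitment");
}
```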

node/actors/network/src/consensus/handshake/mod.rs (+2 −0)

```diff
@@ -58,6 +58,7 @@ pub(super) enum Error {
     Stream(#[from] ctx::Error),
 }
 
+#[tracing::instrument(name = "handshake::outbound", skip_all)]
 pub(super) async fn outbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
@@ -93,6 +94,7 @@ pub(super) async fn outbound(
     Ok(())
 }
 
+#[tracing::instrument(name = "handshake::inbound", skip_all)]
 pub(super) async fn inbound(
     ctx: &ctx::Ctx,
     me: &validator::SecretKey,
```
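
By default `#[tracing::instrument]` names the span after the bare function name, so these spans would show up as just `outbound`/`inbound` and could collide with other instrumented functions of the same name elsewhere in the crate; the explicit `name = "handshake::..."` keeps the traces unambiguous. Sketch (signature elided):

```rust
// Without `name`, the span would be called "outbound" regardless of module.
#[tracing::instrument(name = "handshake::outbound", skip_all)]
async fn outbound() {
    tracing::debug!("sending handshake");
}
```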

node/actors/network/src/consensus/mod.rs (+21 −19)

```diff
@@ -163,7 +163,7 @@ impl Network {
 
     /// Performs handshake of an inbound stream.
     /// Closes the stream if there is another inbound stream opened from the same validator.
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_inbound_stream", skip_all)]
     pub(crate) async fn run_inbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -195,7 +195,7 @@ impl Network {
         res
     }
 
-    #[tracing::instrument(level = "info", name = "consensus", skip_all)]
+    #[tracing::instrument(name = "consensus::run_outbound_stream", skip_all, fields(?peer, %addr))]
     async fn run_outbound_stream(
         &self,
         ctx: &ctx::Ctx,
@@ -282,6 +282,7 @@ impl Network {
         res
     }
 
+    #[tracing::instrument(skip_all)]
     async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let addr = *self
             .gossip
@@ -295,7 +296,6 @@ impl Network {
             format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
         })?;
         self.run_outbound_stream(ctx, &self.key.public(), addr)
-            .instrument(tracing::info_span!("loopback", ?addr))
             .await
     }
 
@@ -314,24 +314,26 @@ impl Network {
         }
         let addrs = &mut self.gossip.validator_addrs.subscribe();
         let mut addr = None;
 
         while ctx.is_active() {
-            // Wait for a new address, or retry with the old one after timeout.
-            if let Ok(new) =
-                sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
-                    addrs.get(peer).map(|x| x.msg.addr) != addr
-                })
-                .await
-            {
-                addr = new.get(peer).map(|x| x.msg.addr);
-            }
-            let Some(addr) = addr else { continue };
-            if let Err(err) = self
-                .run_outbound_stream(ctx, peer, addr)
-                .instrument(tracing::info_span!("out", ?addr))
-                .await
-            {
-                tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
+            async {
+                // Wait for a new address, or retry with the old one after timeout.
+                if let Ok(new) =
+                    sync::wait_for(&ctx.with_timeout(config::CONNECT_RETRY), addrs, |addrs| {
+                        addrs.get(peer).map(|x| x.msg.addr) != addr
+                    })
+                    .instrument(tracing::info_span!("wait_for_address"))
+                    .await
+                {
+                    addr = new.get(peer).map(|x| x.msg.addr);
+                }
+                let Some(addr) = addr else { return };
+                if let Err(err) = self.run_outbound_stream(ctx, peer, addr).await {
+                    tracing::info!("run_outbound_stream({peer:?},{addr}): {err:#}");
+                }
+            }
+            .instrument(tracing::info_span!("maintain_connection_iter"))
+            .await;
         }
     }
 }
```
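
The `fields(?peer, %addr)` clause on `run_outbound_stream` uses tracing's sigil shorthand: `?peer` records the argument via its `Debug` impl and `%addr` via `Display`, even though `skip_all` disables automatic argument capture. A standalone sketch with illustrative types:

```rust
use std::net::SocketAddr;

// Illustrative peer identifier; Debug is all that `?peer` requires.
#[derive(Debug)]
struct PeerId(u32);

// SocketAddr implements Display, which is what `%addr` requires.
#[tracing::instrument(skip_all, fields(?peer, %addr))]
async fn connect(peer: PeerId, addr: SocketAddr) {
    tracing::info!("dialing");
}
```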

node/actors/network/src/gossip/batch_votes.rs (+2 −0)

```diff
@@ -286,6 +286,7 @@ impl BatchVotesWatch {
     }
 
     /// Set the minimum batch number on the votes and discard old data.
+    #[tracing::instrument(skip_all, fields(%min_batch_number))]
     pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) {
         let this = self.0.lock().await;
         this.send_modify(|votes| votes.set_min_batch_number(min_batch_number));
@@ -308,6 +309,7 @@ impl fmt::Debug for BatchVotesPublisher {
 
 impl BatchVotesPublisher {
     /// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network.
+    #[tracing::instrument(skip_all, fields(l1_batch = %batch.number))]
     pub async fn publish(
         &self,
         attesters: &attester::Committee,
```
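
Attribute-style spans matter for async fns in particular: a hand-entered guard from `Span::enter()` held across an `.await` would attach the span to whatever the executor polls next, while `#[tracing::instrument]` (like `.instrument()`) exits and re-enters the span at every poll. A sketch of the safe shape, with illustrative names:

```rust
#[tracing::instrument(skip_all, fields(l1_batch = %batch_number))]
async fn publish(batch_number: u64) {
    // The span is exited at this await and correctly re-entered afterwards.
    tokio::task::yield_now().await;
    tracing::info!("vote published");
}
```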