Skip to content

Commit

Permalink
fix(consensus): better logging of errors (#3170)
Browse files Browse the repository at this point in the history
Added more debug information in case of payload mismatch.
Improved formatting of errors in the node_framework.
  • Loading branch information
pompon0 authored Oct 24, 2024
1 parent ffa18e1 commit a5028da
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 23 deletions.
12 changes: 5 additions & 7 deletions core/lib/dal/src/consensus_dal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ pub struct ConsensusDal<'a, 'c> {
pub enum InsertCertificateError {
#[error("corresponding payload is missing")]
MissingPayload,
#[error("certificate doesn't match the payload")]
PayloadMismatch,
#[error("certificate doesn't match the payload, payload = {0:?}")]
PayloadMismatch(Payload),
#[error(transparent)]
Dal(#[from] DalError),
#[error(transparent)]
Expand Down Expand Up @@ -528,7 +528,7 @@ impl ConsensusDal<'_, '_> {
.await?
.ok_or(E::MissingPayload)?;
if header.payload != want_payload.encode().hash() {
return Err(E::PayloadMismatch);
return Err(E::PayloadMismatch(want_payload));
}
sqlx::query!(
r#"
Expand Down Expand Up @@ -634,7 +634,7 @@ impl ConsensusDal<'_, '_> {
pub async fn insert_batch_certificate(
&mut self,
cert: &attester::BatchQC,
) -> Result<(), InsertCertificateError> {
) -> anyhow::Result<()> {
let cfg = self
.global_config()
.await
Expand All @@ -652,9 +652,7 @@ impl ConsensusDal<'_, '_> {
.context("batch()")?
.context("batch is missing")?,
);
if cert.message.hash != hash {
return Err(InsertCertificateError::PayloadMismatch);
}
anyhow::ensure!(cert.message.hash == hash, "hash mismatch");
cert.verify(cfg.genesis.hash(), &committee)
.context("cert.verify()")?;
sqlx::query!(
Expand Down
11 changes: 8 additions & 3 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,11 @@ impl EN {
let mut next = attester::BatchNumber(0);
loop {
let status = loop {
match self.fetch_attestation_status(ctx).await {
match self
.fetch_attestation_status(ctx)
.await
.wrap("fetch_attestation_status()")
{
Err(err) => tracing::warn!("{err:#}"),
Ok(status) => {
if status.genesis != cfg.genesis.hash() {
Expand Down Expand Up @@ -439,7 +443,7 @@ impl EN {
});
while end.map_or(true, |end| queue.next() < end) {
let block = recv.recv(ctx).await?.join(ctx).await?;
queue.send(block).await?;
queue.send(block).await.context("queue.send()")?;
}
Ok(())
})
Expand All @@ -448,7 +452,8 @@ impl EN {
if first < queue.next() {
self.pool
.wait_for_payload(ctx, queue.next().prev().unwrap())
.await?;
.await
.wrap("wait_for_payload()")?;
}
Ok(())
}
Expand Down
7 changes: 2 additions & 5 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use zksync_dal::consensus_dal;

use crate::{
config, registry,
storage::{ConnectionPool, InsertCertificateError, Store},
storage::{ConnectionPool, Store},
};

/// Task running a consensus validator for the main node.
Expand Down Expand Up @@ -179,10 +179,7 @@ async fn run_attestation_controller(
.wrap("connection()")?
.insert_batch_certificate(ctx, &qc)
.await
.map_err(|err| match err {
InsertCertificateError::Canceled(err) => ctx::Error::Canceled(err),
InsertCertificateError::Inner(err) => ctx::Error::Internal(err.into()),
})?;
.wrap("insert_batch_certificate()")?;
}
}
.await;
Expand Down
2 changes: 1 addition & 1 deletion core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl<'a> Connection<'a> {
&mut self,
ctx: &ctx::Ctx,
cert: &attester::BatchQC,
) -> Result<(), super::InsertCertificateError> {
) -> ctx::Result<()> {
Ok(ctx
.wait(self.0.consensus_dal().insert_batch_certificate(cert))
.await??)
Expand Down
4 changes: 1 addition & 3 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,7 @@ impl StoreRunner {
Err(InsertCertificateError::Canceled(err)) => {
return Err(ctx::Error::Canceled(err))
}
Err(InsertCertificateError::Inner(err)) => {
return Err(ctx::Error::Internal(anyhow::Error::from(err)))
}
Err(err) => Err(err).context("insert_block_certificate()")?,
}
}

Expand Down
27 changes: 24 additions & 3 deletions core/node/node_framework/src/service/error.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
use std::fmt;

use crate::{task::TaskId, wiring_layer::WiringError};

/// An error that can occur during the task lifecycle.
#[derive(Debug, thiserror::Error)]
pub enum TaskError {
#[error("Task {0} failed: {1}")]
#[error("Task {0} failed: {1:#}")]
TaskFailed(TaskId, anyhow::Error),
#[error("Task {0} panicked: {1}")]
TaskPanicked(TaskId, String),
#[error("Shutdown for task {0} timed out")]
TaskShutdownTimedOut(TaskId),
#[error("Shutdown hook {0} failed: {1}")]
#[error("Shutdown hook {0} failed: {1:#}")]
ShutdownHookFailed(TaskId, anyhow::Error),
#[error("Shutdown hook {0} timed out")]
ShutdownHookTimedOut(TaskId),
}

/// Newtype around a list of [`TaskError`]s.
///
/// Exists so that the list can carry its own `Debug` implementation: each
/// error is rendered through its alternate `Display` form (`{:#}`) instead of
/// the derived `Debug` output of `TaskError`.
pub struct TaskErrors(pub Vec<TaskError>);

impl From<Vec<TaskError>> for TaskErrors {
fn from(errs: Vec<TaskError>) -> Self {
Self(errs)
}
}

/// Prints the wrapped errors as a list of strings, where every string is the
/// error's alternate `Display` rendering (`{:#}`).
impl fmt::Debug for TaskErrors {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Each entry is a `String`, so it is emitted through `String`'s `Debug`
        // (quoted and escaped), exactly as a `Vec<String>` would print it.
        let rendered = self.0.iter().map(|err| format!("{err:#}"));
        f.debug_list().entries(rendered).finish()
    }
}

/// An error that can occur during the service lifecycle.
#[derive(Debug, thiserror::Error)]
pub enum ZkStackServiceError {
Expand All @@ -25,5 +46,5 @@ pub enum ZkStackServiceError {
#[error("One or more wiring layers failed to initialize: {0:?}")]
Wiring(Vec<(String, WiringError)>),
#[error("One or more tasks failed: {0:?}")]
Task(Vec<TaskError>),
Task(TaskErrors),
}
2 changes: 1 addition & 1 deletion core/node/node_framework/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ZkStackService {
if self.errors.is_empty() {
Ok(())
} else {
Err(ZkStackServiceError::Task(self.errors))
Err(ZkStackServiceError::Task(self.errors.into()))
}
}

Expand Down

0 comments on commit a5028da

Please sign in to comment.