diff --git a/Cargo.lock b/Cargo.lock
index 4bb61bf9c9a3..8d578412c3d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4126,6 +4126,18 @@ dependencies = [
  "thiserror",
 ]

+[[package]]
+name = "opentelemetry-appender-tracing"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6"
+dependencies = [
+ "opentelemetry",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "opentelemetry-http"
 version = "0.13.0"
@@ -9669,6 +9681,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "opentelemetry",
+ "opentelemetry-appender-tracing",
  "opentelemetry-otlp",
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
diff --git a/Cargo.toml b/Cargo.toml
index 0e3f7e32e82c..fc08b524d086 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -143,6 +143,7 @@ opentelemetry = "0.24.0"
 opentelemetry_sdk = "0.24.0"
 opentelemetry-otlp = "0.17.0"
 opentelemetry-semantic-conventions = "0.16.0"
+opentelemetry-appender-tracing = "0.5"
 pin-project-lite = "0.2.13"
 pretty_assertions = "1"
 prost = "0.12.1"
diff --git a/core/lib/config/src/configs/observability.rs b/core/lib/config/src/configs/observability.rs
index 0bc61df31197..42363cbcb4ff 100644
--- a/core/lib/config/src/configs/observability.rs
+++ b/core/lib/config/src/configs/observability.rs
@@ -19,6 +19,12 @@ pub struct ObservabilityConfig {
 pub struct OpentelemetryConfig {
     /// Enables export of span data of specified level (and above) using opentelemetry exporters.
     pub level: String,
-    /// Opentelemetry HTTP collector endpoint.
+    /// Opentelemetry HTTP traces collector endpoint.
     pub endpoint: String,
+    /// Opentelemetry HTTP logs collector endpoint.
+    /// This is optional, since right now the primary way to collect logs is via stdout.
+    ///
+    /// Important: sending logs via OTLP has only been tested locally, and the performance may be
+    /// suboptimal in production environments.
+    pub logs_endpoint: Option<String>,
 }
diff --git a/core/lib/config/src/observability_ext.rs b/core/lib/config/src/observability_ext.rs
index 5f8a8927efd5..641b095eb3b9 100644
--- a/core/lib/config/src/observability_ext.rs
+++ b/core/lib/config/src/observability_ext.rs
@@ -46,7 +46,13 @@ impl TryFrom<ObservabilityConfig> for Option<zksync_vlog::OpenTelemetry> {
     fn try_from(config: ObservabilityConfig) -> Result<Self, Self::Error> {
         Ok(config
             .opentelemetry
-            .map(|config| zksync_vlog::OpenTelemetry::new(&config.level, config.endpoint))
+            .map(|config| {
+                zksync_vlog::OpenTelemetry::new(
+                    &config.level,
+                    Some(config.endpoint),
+                    config.logs_endpoint,
+                )
+            })
             .transpose()?)
     }
 }
diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs
index 2c2934859fe5..f77321caadb5 100644
--- a/core/lib/config/src/testonly.rs
+++ b/core/lib/config/src/testonly.rs
@@ -681,6 +681,7 @@ impl Distribution<configs::OpentelemetryConfig> for EncodeDist {
         configs::OpentelemetryConfig {
             level: self.sample(rng),
             endpoint: self.sample(rng),
+            logs_endpoint: self.sample(rng),
         }
     }
 }
diff --git a/core/lib/env_config/src/observability.rs b/core/lib/env_config/src/observability.rs
index 3463ee189957..3376189fa61d 100644
--- a/core/lib/env_config/src/observability.rs
+++ b/core/lib/env_config/src/observability.rs
@@ -35,8 +35,13 @@ impl FromEnv for ObservabilityConfig {
         };
         let opentelemetry_level = std::env::var("OPENTELEMETRY_LEVEL").ok();
         let otlp_endpoint = std::env::var("OTLP_ENDPOINT").ok();
+        let logs_endpoint = std::env::var("OTLP_LOGS_ENDPOINT").ok(); // OK to be absent.
         let opentelemetry = match (opentelemetry_level, otlp_endpoint) {
-            (Some(level), Some(endpoint)) => Some(OpentelemetryConfig { level, endpoint }),
+            (Some(level), Some(endpoint)) => Some(OpentelemetryConfig {
+                level,
+                endpoint,
+                logs_endpoint,
+            }),
             _ => None,
         };
diff --git a/core/lib/protobuf_config/src/observability.rs b/core/lib/protobuf_config/src/observability.rs
index e86894f29583..dcf87771b587 100644
--- a/core/lib/protobuf_config/src/observability.rs
+++ b/core/lib/protobuf_config/src/observability.rs
@@ -66,6 +66,7 @@ impl ProtoRepr for proto::Opentelemetry {
         Ok(Self::Type {
             level: required(&self.level).context("level")?.clone(),
             endpoint: required(&self.endpoint).context("endpoint")?.clone(),
+            logs_endpoint: self.logs_endpoint.clone(),
         })
     }

@@ -73,6 +74,7 @@
         Self {
             level: Some(this.level.clone()),
             endpoint: Some(this.endpoint.clone()),
+            logs_endpoint: this.logs_endpoint.clone(),
         }
     }
 }
diff --git a/core/lib/protobuf_config/src/proto/config/observability.proto b/core/lib/protobuf_config/src/proto/config/observability.proto
index 773ea51a2f34..aa2fbdd4521f 100644
--- a/core/lib/protobuf_config/src/proto/config/observability.proto
+++ b/core/lib/protobuf_config/src/proto/config/observability.proto
@@ -22,4 +22,5 @@ message Observability {
 message Opentelemetry {
   optional string level = 1; // required
   optional string endpoint = 2; // required
+  optional string logs_endpoint = 3; // optional
 }
diff --git a/core/lib/vlog/Cargo.toml b/core/lib/vlog/Cargo.toml
index 3f9ce247442a..c656e52a05a8 100644
--- a/core/lib/vlog/Cargo.toml
+++ b/core/lib/vlog/Cargo.toml
@@ -34,6 +34,7 @@ opentelemetry-otlp = { workspace = true, features = [
     "reqwest-client",
 ] }
 opentelemetry-semantic-conventions.workspace = true
+opentelemetry-appender-tracing.workspace = true
 vise.workspace = true
 vise-exporter.workspace = true
 url.workspace = true
diff --git a/core/lib/vlog/src/lib.rs b/core/lib/vlog/src/lib.rs
index aebd413b749d..48b02c6958a6 100644
--- a/core/lib/vlog/src/lib.rs
+++ b/core/lib/vlog/src/lib.rs
@@ -25,8 +25,11 @@ pub struct ObservabilityBuilder {
 /// Guard for the observability subsystem.
 /// Releases configured integrations upon being dropped.
 pub struct ObservabilityGuard {
-    /// Opentelemetry provider. Can be used to force flush spans.
-    otlp_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
+    /// Opentelemetry traces provider
+    otlp_tracing_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
+    /// Opentelemetry logs provider
+    otlp_logging_provider: Option<opentelemetry_sdk::logs::LoggerProvider>,
+    /// Sentry client guard
     sentry_guard: Option<ClientInitGuard>,
 }

@@ -41,7 +44,15 @@ impl ObservabilityGuard {
             sentry_guard.flush(Some(FLUSH_TIMEOUT));
         }

-        if let Some(provider) = &self.otlp_provider {
+        if let Some(provider) = &self.otlp_tracing_provider {
+            for result in provider.force_flush() {
+                if let Err(err) = result {
+                    tracing::warn!("Flushing the spans failed: {err:?}");
+                }
+            }
+        }
+
+        if let Some(provider) = &self.otlp_logging_provider {
             for result in provider.force_flush() {
                 if let Err(err) = result {
                     tracing::warn!("Flushing the spans failed: {err:?}");
                 }
             }
@@ -59,7 +70,12 @@ impl ObservabilityGuard {
         if let Some(sentry_guard) = &self.sentry_guard {
             sentry_guard.close(Some(SHUTDOWN_TIMEOUT));
         }
-        if let Some(provider) = &self.otlp_provider {
+        if let Some(provider) = &self.otlp_tracing_provider {
+            if let Err(err) = provider.shutdown() {
+                tracing::warn!("Shutting down the provider failed: {err:?}");
+            }
+        }
+        if let Some(provider) = &self.otlp_logging_provider {
             if let Err(err) = provider.shutdown() {
                 tracing::warn!("Shutting down the provider failed: {err:?}");
             }
@@ -111,21 +127,28 @@ impl ObservabilityBuilder {
         let global_filter = logs.build_filter();
         let logs_layer = logs.into_layer();

-        let (otlp_provider, otlp_layer) = self
+        let (otlp_tracing_provider, otlp_tracing_layer) = self
+            .opentelemetry_layer
+            .as_ref()
+            .and_then(|layer| layer.tracing_layer())
+            .unzip();
+        let (otlp_logging_provider, otlp_logging_layer) = self
             .opentelemetry_layer
-            .map(|layer| layer.into_layer())
+            .and_then(|layer| layer.logs_layer())
             .unzip();

         tracing_subscriber::registry()
             .with(global_filter)
             .with(logs_layer)
-            .with(otlp_layer)
+            .with(otlp_tracing_layer)
+            .with(otlp_logging_layer)
             .init();

         let sentry_guard = self.sentry.map(|sentry| sentry.install());

         ObservabilityGuard {
-            otlp_provider,
+            otlp_tracing_provider,
+            otlp_logging_provider,
             sentry_guard,
         }
     }
diff --git a/core/lib/vlog/src/opentelemetry/mod.rs b/core/lib/vlog/src/opentelemetry/mod.rs
index 1085f6c6db06..a5ff6477f738 100644
--- a/core/lib/vlog/src/opentelemetry/mod.rs
+++ b/core/lib/vlog/src/opentelemetry/mod.rs
@@ -14,67 +14,88 @@ use tracing_subscriber::{registry::LookupSpan, EnvFilter, Layer};
 use url::Url;

 /// Information about the service.
-#[derive(Debug, Default)]
+///
+/// This information is initially filled as follows:
+/// - Fields are fetched from environment variables, if set (see the `*_ENV_VAR` constants below).
+/// - If not found, default values are used.
+#[derive(Debug, Clone)]
 #[non_exhaustive]
 pub struct ServiceDescriptor {
     /// Name of the k8s pod.
-    pub k8s_pod_name: Option<String>,
+    /// If not provided directly or through env variable, the default value would be `zksync-0`.
+    pub k8s_pod_name: String,
     /// Name of the k8s namespace.
-    pub k8s_namespace_name: Option<String>,
+    /// If not provided directly or through env variable, the default value would be `local`.
+    pub k8s_namespace_name: String,
     /// Name of the service.
-    pub service_name: Option<String>,
+    /// If not provided directly or through env variable, the default value would be `zksync`.
+    pub service_name: String,
+}
+
+impl Default for ServiceDescriptor {
+    fn default() -> Self {
+        Self::new()
+    }
 }

 impl ServiceDescriptor {
+    /// Environment variable to fetch the k8s pod name.
+    pub const K8S_POD_NAME_ENV_VAR: &'static str = "POD_NAME";
+    /// Environment variable to fetch the k8s namespace name.
+    pub const K8S_NAMESPACE_NAME_ENV_VAR: &'static str = "POD_NAMESPACE";
+    /// Environment variable to fetch the service name.
+    pub const SERVICE_NAME_ENV_VAR: &'static str = "SERVICE_NAME";
+    /// Default value for the k8s pod name.
+    pub const DEFAULT_K8S_POD_NAME: &'static str = "zksync-0";
+    /// Default value for the k8s namespace name.
+    pub const DEFAULT_K8S_NAMESPACE_NAME: &'static str = "local";
+    /// Default value for the service name.
+    pub const DEFAULT_SERVICE_NAME: &'static str = "zksync";
+
+    /// Creates a `ServiceDescriptor` with fields filled from environment variables or defaults.
+    /// Fetched fields can be overridden via the `with_*` methods.
     pub fn new() -> Self {
-        Self::default()
+        // Attempt fetching data from environment variables, and use defaults if not provided.
+        fn env_or(env_var: &str, default: &str) -> String {
+            std::env::var(env_var).unwrap_or_else(|_| default.to_string())
+        }
+        Self {
+            k8s_pod_name: env_or(Self::K8S_POD_NAME_ENV_VAR, Self::DEFAULT_K8S_POD_NAME),
+            k8s_namespace_name: env_or(
+                Self::K8S_NAMESPACE_NAME_ENV_VAR,
+                Self::DEFAULT_K8S_NAMESPACE_NAME,
+            ),
+            service_name: env_or(Self::SERVICE_NAME_ENV_VAR, Self::DEFAULT_SERVICE_NAME),
+        }
     }

     pub fn with_k8s_pod_name(mut self, k8s_pod_name: Option<String>) -> Self {
-        self.k8s_pod_name = k8s_pod_name;
+        if let Some(k8s_pod_name) = k8s_pod_name {
+            self.k8s_pod_name = k8s_pod_name;
+        }
         self
     }

     pub fn with_k8s_namespace_name(mut self, k8s_namespace_name: Option<String>) -> Self {
-        self.k8s_namespace_name = k8s_namespace_name;
+        if let Some(k8s_namespace_name) = k8s_namespace_name {
+            self.k8s_namespace_name = k8s_namespace_name;
+        }
         self
     }

     pub fn with_service_name(mut self, service_name: Option<String>) -> Self {
-        self.service_name = service_name;
-        self
-    }
-
-    /// Tries to fill empty fields from environment variables.
-    ///
-    /// The following environment variables are used:
-    /// - `POD_NAME`
-    /// - `POD_NAMESPACE`
-    /// - `SERVICE_NAME`
-    pub fn fill_from_env(mut self) -> Self {
-        if self.k8s_pod_name.is_none() {
-            self.k8s_pod_name = std::env::var("POD_NAME").ok();
-        }
-        if self.k8s_namespace_name.is_none() {
-            self.k8s_namespace_name = std::env::var("POD_NAMESPACE").ok();
-        }
-        if self.service_name.is_none() {
-            self.service_name = std::env::var("SERVICE_NAME").ok();
+        if let Some(service_name) = service_name {
+            self.service_name = service_name;
         }
         self
     }

     fn into_otlp_resource(self) -> Resource {
-        let mut attributes = vec![];
-        if let Some(pod_name) = self.k8s_pod_name {
-            attributes.push(KeyValue::new(K8S_POD_NAME, pod_name));
-        }
-        if let Some(pod_namespace) = self.k8s_namespace_name {
-            attributes.push(KeyValue::new(K8S_NAMESPACE_NAME, pod_namespace));
-        }
-        if let Some(service_name) = self.service_name {
-            attributes.push(KeyValue::new(SERVICE_NAME, service_name));
-        }
+        let attributes = vec![
+            KeyValue::new(K8S_POD_NAME, self.k8s_pod_name),
+            KeyValue::new(K8S_NAMESPACE_NAME, self.k8s_namespace_name),
+            KeyValue::new(SERVICE_NAME, self.service_name),
+        ];
         Resource::new(attributes)
     }
 }
@@ -83,57 +104,109 @@
 pub struct OpenTelemetry {
     /// Enables export of span data of specified level (and above) using opentelemetry exporters.
     pub opentelemetry_level: OpenTelemetryLevel,
-    /// Opentelemetry HTTP collector endpoint.
-    pub otlp_endpoint: Url,
+    /// Opentelemetry HTTP collector endpoint for traces.
+    pub tracing_endpoint: Option<Url>,
+    /// Opentelemetry HTTP collector endpoint for logs.
+    pub logging_endpoint: Option<Url>,
     /// Information about service
-    pub service: Option<ServiceDescriptor>,
+    pub service: ServiceDescriptor,
 }

 impl OpenTelemetry {
     pub fn new(
         opentelemetry_level: &str,
-        otlp_endpoint: String,
+        tracing_endpoint: Option<String>,
+        logging_endpoint: Option<String>,
     ) -> Result<Self, OpenTelemetryLayerError> {
+        fn parse_url(url: Option<String>) -> Result<Option<Url>, OpenTelemetryLayerError> {
+            url.map(|v| {
+                v.parse()
+                    .map_err(|e| OpenTelemetryLayerError::InvalidUrl(v, e))
+            })
+            .transpose()
+        }
+
         Ok(Self {
             opentelemetry_level: opentelemetry_level.parse()?,
-            otlp_endpoint: otlp_endpoint
-                .parse()
-                .map_err(|e| OpenTelemetryLayerError::InvalidUrl(otlp_endpoint, e))?,
-            service: None,
+            tracing_endpoint: parse_url(tracing_endpoint)?,
+            logging_endpoint: parse_url(logging_endpoint)?,
+            service: ServiceDescriptor::new(),
         })
     }

+    /// Can be used to override the service descriptor used by the layer.
     pub fn with_service_descriptor(mut self, service: ServiceDescriptor) -> Self {
-        self.service = Some(service);
+        self.service = service;
         self
     }

-    pub(super) fn into_layer<S>(self) -> (opentelemetry_sdk::trace::TracerProvider, impl Layer<S>)
+    /// Prepares an exporter for OTLP logs and a layer for the `tracing` library.
+    /// Will return `None` if no logging URL was provided.
+    ///
+    /// *Important*: we use the `tracing` library to generate logs, and convert the logs
+    /// to OTLP format when exporting. However, `tracing` doesn't provide information
+    /// about the timestamp of a log. While this value is optional in OTLP, some
+    /// collectors/processors may ignore logs without a timestamp. Thus, you may need to
+    /// have a proxy collector, like `opentelemetry-collector-contrib` or `vector`, and
+    /// use the functionality there to set the timestamp. Here's an example configuration
+    /// for `opentelemetry-collector-contrib`:
+    ///
+    /// ```text
+    /// processors:
+    ///   transform/set_time_unix_nano:
+    ///     log_statements:
+    ///       - context: log
+    ///         statements:
+    ///           - set(time_unix_nano, observed_time_unix_nano)
+    /// ```
+    pub(super) fn logs_layer<S>(
+        &self,
+    ) -> Option<(opentelemetry_sdk::logs::LoggerProvider, impl Layer<S>)>
     where
         S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
     {
-        let filter = match self.opentelemetry_level {
-            OpenTelemetryLevel::OFF => EnvFilter::new("off"),
-            OpenTelemetryLevel::INFO => EnvFilter::new("info"),
-            OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
-            OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
-        };
+        let logging_endpoint = self.logging_endpoint.clone()?;
+        let resource = self.service.clone().into_otlp_resource();
+
+        let exporter = opentelemetry_otlp::new_exporter()
+            .http()
+            .with_endpoint(logging_endpoint)
+            .build_log_exporter()
+            .expect("Failed to create OTLP exporter"); // URL is validated.
+
+        let provider = opentelemetry_sdk::logs::LoggerProvider::builder()
+            .with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
+            .with_resource(resource)
+            .build();
+
+        let layer =
+            opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new(&provider);
+
+        Some((provider, layer))
+    }
+
+    /// Prepares an exporter for OTLP traces and a layer for the `tracing` library.
+    /// Will return `None` if no tracing URL was provided.
+    pub(super) fn tracing_layer<S>(
+        &self,
+    ) -> Option<(opentelemetry_sdk::trace::TracerProvider, impl Layer<S>)>
+    where
+        S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
+    {
+        let tracing_endpoint = self.tracing_endpoint.clone()?;

         // `otel::tracing` should be a level info to emit opentelemetry trace & span
         // `otel` set to debug to log detected resources, configuration read and inferred
-        let filter = filter
+        let filter = self
+            .filter()
             .add_directive("otel::tracing=trace".parse().unwrap())
             .add_directive("otel=debug".parse().unwrap());

-        let service = self.service.unwrap_or_default().fill_from_env();
-        let service_name = service
-            .service_name
-            .clone()
-            .unwrap_or_else(|| "zksync_vlog".to_string());
-        let resource = service.into_otlp_resource();
+        let service_name = self.service.service_name.clone();
+        let resource = self.service.clone().into_otlp_resource();

         let exporter = opentelemetry_otlp::new_exporter()
             .http()
-            .with_endpoint(self.otlp_endpoint)
+            .with_endpoint(tracing_endpoint)
             .build_span_exporter()
             .expect("Failed to create OTLP exporter"); // URL is validated.

@@ -155,7 +228,19 @@ impl OpenTelemetry {
             .with_tracer(tracer)
             .with_filter(filter);

-        (provider, layer)
+        Some((provider, layer))
+    }
+
+    /// Returns a filter for the opentelemetry layer.
+    /// It's applied to the layer only, but note that there might be a global filter applied to the
+    /// whole subscriber.
+    fn filter(&self) -> EnvFilter {
+        match self.opentelemetry_level {
+            OpenTelemetryLevel::OFF => EnvFilter::new("off"),
+            OpenTelemetryLevel::INFO => EnvFilter::new("info"),
+            OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
+            OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
+        }
+    }
 }
diff --git a/core/node/consensus/Cargo.toml b/core/node/consensus/Cargo.toml
index 68fffa56dcbc..574ae6fdf9f8 100644
--- a/core/node/consensus/Cargo.toml
+++ b/core/node/consensus/Cargo.toml
@@ -39,13 +39,14 @@ secrecy.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 tracing.workspace = true
+tokio.workspace = true

 [dev-dependencies]
 zksync_node_genesis.workspace = true
 zksync_node_test_utils.workspace = true
 zksync_node_api_server.workspace = true
 zksync_test_account.workspace = true
-zksync_contracts.workspace= true
+zksync_contracts.workspace = true

 tokio.workspace = true
 test-casing.workspace = true
diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs
index f6dcd653cc92..fbc3d030d47a 100644
--- a/core/node/consensus/src/en.rs
+++ b/core/node/consensus/src/en.rs
@@ -195,6 +195,7 @@ impl EN {
     }

     /// Fetches genesis from the main node.
+    #[tracing::instrument(skip_all)]
     async fn fetch_genesis(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::Genesis> {
         let genesis = ctx
             .wait(self.client.fetch_consensus_genesis())
diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs
index 07f82ed4fd23..0e2039ae6bc0 100644
--- a/core/node/consensus/src/storage/connection.rs
+++ b/core/node/consensus/src/storage/connection.rs
@@ -106,6 +106,7 @@ impl<'a> Connection<'a> {
     }

     /// Wrapper for `consensus_dal().insert_block_certificate()`.
+    #[tracing::instrument(skip_all, fields(l2_block = %cert.message.proposal.number))]
     pub async fn insert_block_certificate(
         &mut self,
         ctx: &ctx::Ctx,
@@ -118,6 +119,7 @@

     /// Wrapper for `consensus_dal().insert_batch_certificate()`,
     /// which additionally verifies that the batch hash matches the stored batch.
+    #[tracing::instrument(skip_all, fields(l1_batch = %cert.message.number))]
     pub async fn insert_batch_certificate(
         &mut self,
         ctx: &ctx::Ctx,
@@ -223,11 +225,13 @@
     }

     /// Wrapper for `consensus_dal().next_block()`.
+    #[tracing::instrument(skip_all)]
     async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result<validator::BlockNumber> {
         Ok(ctx.wait(self.0.consensus_dal().next_block()).await??)
     }

     /// Wrapper for `consensus_dal().block_certificates_range()`.
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn block_certificates_range(
         &mut self,
         ctx: &ctx::Ctx,
@@ -305,6 +309,7 @@
     }

     /// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`.
+    #[tracing::instrument(skip_all)]
     pub async fn get_last_batch_number(
         &mut self,
         ctx: &ctx::Ctx,
@@ -390,6 +395,7 @@
     }

     /// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch].
+    #[tracing::instrument(skip_all)]
     pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result<BatchStoreState> {
         let first = self
             .0
diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs
index 0e7d403e9c68..0e08811c237f 100644
--- a/core/node/consensus/src/storage/store.rs
+++ b/core/node/consensus/src/storage/store.rs
@@ -1,9 +1,11 @@
 use std::sync::Arc;

 use anyhow::Context as _;
+use tokio::sync::watch::Sender;
+use tracing::Instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time};
 use zksync_consensus_bft::PayloadManager;
-use zksync_consensus_roles::{attester, validator};
+use zksync_consensus_roles::{attester, attester::BatchNumber, validator};
 use zksync_consensus_storage::{self as storage, BatchStoreState};
 use zksync_dal::consensus_dal::{self, Payload};
 use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction};
@@ -129,6 +131,7 @@ impl PersistedBlockState {
     /// If `persisted.first` is moved forward, it means that blocks have been pruned.
     /// If `persisted.last` is moved forward, it means that new blocks with certificates have been
     /// persisted.
+    #[tracing::instrument(skip_all, fields(first = %new.first, last = ?new.last.as_ref().map(|l| l.message.proposal.number)))]
     fn update(&self, new: storage::BlockStoreState) {
         self.0.send_if_modified(|p| {
             if &new == p {
@@ -149,6 +152,7 @@ impl PersistedBlockState {
     }

     /// Appends the `cert` to `persisted` range.
+    #[tracing::instrument(skip_all, fields(block_number = %cert.message.proposal.number))]
     fn advance(&self, cert: validator::CommitQC) {
         self.0.send_if_modified(|p| {
             if p.next() != cert.header().number {
@@ -171,21 +175,61 @@
         } = self;

         let res = scope::run!(ctx, |ctx, s| async {
+            #[tracing::instrument(skip_all)]
+            async fn update_blocks_persisted_iteration(
+                ctx: &ctx::Ctx,
+                pool: &ConnectionPool,
+                blocks_persisted: &PersistedBlockState,
+            ) -> ctx::Result<()> {
+                const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
+
+                let range = pool
+                    .connection(ctx)
+                    .await?
+                    .block_certificates_range(ctx)
+                    .await
+                    .wrap("block_certificates_range()")?;
+                blocks_persisted.update(range);
+                ctx.sleep(POLL_INTERVAL).await?;
+
+                Ok(())
+            }
+
             s.spawn::<()>(async {
                 // Loop updating `blocks_persisted` whenever blocks get pruned.
-                const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
                 loop {
-                    let range = pool
-                        .connection(ctx)
-                        .await?
-                        .block_certificates_range(ctx)
-                        .await
-                        .wrap("block_certificates_range()")?;
-                    blocks_persisted.update(range);
-                    ctx.sleep(POLL_INTERVAL).await?;
+                    update_blocks_persisted_iteration(ctx, &pool, &blocks_persisted).await?;
                 }
             });

+            #[tracing::instrument(skip_all, fields(l1_batch = %next_batch_number))]
+            async fn gossip_sync_batches_iteration(
+                ctx: &ctx::Ctx,
+                pool: &ConnectionPool,
+                next_batch_number: &mut BatchNumber,
+                batches_persisted: &Sender<BatchStoreState>,
+            ) -> ctx::Result<()> {
+                const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
+
+                let mut conn = pool.connection(ctx).await?;
+                if let Some(last_batch_number) = conn
+                    .get_last_batch_number(ctx)
+                    .await
+                    .wrap("last_batch_number()")?
+                {
+                    if last_batch_number >= *next_batch_number {
+                        let range = conn.batches_range(ctx).await.wrap("batches_range()")?;
+                        *next_batch_number = last_batch_number.next();
+                        tracing::info_span!("batches_persisted_send").in_scope(|| {
+                            batches_persisted.send_replace(range);
+                        });
+                    }
+                }
+                ctx.sleep(POLL_INTERVAL).await?;
+
+                Ok(())
+            }
+
             // NOTE: Running this update loop will trigger the gossip of `SyncBatches` which is currently
             // pointless as there is no proof and we have to ignore them. We can disable it, but bear in
             // mind that any node which gossips the availability will cause pushes and pulls in the consensus.
@@ -200,65 +244,85 @@
             // up with L1 batches from peers _without_ the QC, based on L1 inclusion proofs instead.
             // Nevertheless since the `SyncBatch` contains all transactions for all L2 blocks,
             // we can try to make it less frequent by querying just the last batch number first.
-            const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);
             let mut next_batch_number = { batches_persisted.borrow().next() };
             loop {
-                let mut conn = pool.connection(ctx).await?;
-                if let Some(last_batch_number) = conn
-                    .get_last_batch_number(ctx)
-                    .await
-                    .wrap("last_batch_number()")?
-                {
-                    if last_batch_number >= next_batch_number {
-                        let range = conn.batches_range(ctx).await.wrap("batches_range()")?;
-                        next_batch_number = last_batch_number.next();
-                        batches_persisted.send_replace(range);
-                    }
-                }
-                ctx.sleep(POLL_INTERVAL).await?;
+                gossip_sync_batches_iteration(
+                    ctx,
+                    &pool,
+                    &mut next_batch_number,
+                    &batches_persisted,
+                )
+                .await?;
             }
         });

-        s.spawn::<()>(async {
-            // Loop inserting batch certificates into storage
+        #[tracing::instrument(skip_all)]
+        async fn insert_batch_certificates_iteration(
+            ctx: &ctx::Ctx,
+            pool: &ConnectionPool,
+            batch_certificates: &mut ctx::channel::UnboundedReceiver<attester::BatchQC>,
+        ) -> ctx::Result<()> {
             const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
+
+            let cert = batch_certificates
+                .recv(ctx)
+                .instrument(tracing::info_span!("wait_for_batch_certificate"))
+                .await?;
+
             loop {
-                let cert = batch_certificates.recv(ctx).await?;
-
-                loop {
-                    use consensus_dal::InsertCertificateError as E;
-                    // Try to insert the cert.
-                    let res = pool
-                        .connection(ctx)
-                        .await?
-                        .insert_batch_certificate(ctx, &cert)
-                        .await;
-
-                    match res {
-                        Ok(()) => {
-                            break;
-                        }
-                        Err(InsertCertificateError::Inner(E::MissingPayload)) => {
-                            // The L1 batch isn't available yet.
-                            // We can wait until it's produced/received, or we could modify gossip
-                            // so that we don't even accept votes until we have the corresponding batch.
-                            ctx.sleep(POLL_INTERVAL).await?;
-                        }
-                        Err(InsertCertificateError::Inner(err)) => {
-                            return Err(ctx::Error::Internal(anyhow::Error::from(err)))
-                        }
-                        Err(InsertCertificateError::Canceled(err)) => {
-                            return Err(ctx::Error::Canceled(err))
-                        }
+                use consensus_dal::InsertCertificateError as E;
+                // Try to insert the cert.
+                let res = pool
+                    .connection(ctx)
+                    .await?
+                    .insert_batch_certificate(ctx, &cert)
+                    .await;
+
+                match res {
+                    Ok(()) => {
+                        break;
+                    }
+                    Err(InsertCertificateError::Inner(E::MissingPayload)) => {
+                        // The L1 batch isn't available yet.
+                        // We can wait until it's produced/received, or we could modify gossip
+                        // so that we don't even accept votes until we have the corresponding batch.
+                        ctx.sleep(POLL_INTERVAL)
+                            .instrument(tracing::info_span!("wait_for_batch"))
+                            .await?;
+                    }
+                    Err(InsertCertificateError::Inner(err)) => {
+                        return Err(ctx::Error::Internal(anyhow::Error::from(err)))
+                    }
+                    Err(InsertCertificateError::Canceled(err)) => {
+                        return Err(ctx::Error::Canceled(err))
                     }
                 }
             }
+
+            Ok(())
+        }
+
+        s.spawn::<()>(async {
+            // Loop inserting batch certificates into storage
+            loop {
+                insert_batch_certificates_iteration(ctx, &pool, &mut batch_certificates)
+                    .await?;
+            }
+        });

-        // Loop inserting block certs to storage.
-        const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
-        loop {
-            let cert = block_certificates.recv(ctx).await?;
+        #[tracing::instrument(skip_all)]
+        async fn insert_block_certificates_iteration(
+            ctx: &ctx::Ctx,
+            pool: &ConnectionPool,
+            block_certificates: &mut ctx::channel::UnboundedReceiver<validator::CommitQC>,
+            blocks_persisted: &PersistedBlockState,
+        ) -> ctx::Result<()> {
+            const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50);
+
+            let cert = block_certificates
+                .recv(ctx)
+                .instrument(tracing::info_span!("wait_for_block_certificate"))
+                .await?;
             // Wait for the block to be persisted, so that we can attach a cert to it.
             // We may exit this loop without persisting the certificate in case the
             // corresponding block has been pruned in the meantime.
@@ -280,7 +344,9 @@
                 Err(InsertCertificateError::Inner(E::MissingPayload)) => {
                     // the payload is not in storage, it's either not yet persisted
                     // or already pruned. We will retry after a delay.
-                    ctx.sleep(POLL_INTERVAL).await?;
+                    ctx.sleep(POLL_INTERVAL)
+                        .instrument(tracing::info_span!("wait_for_block"))
+                        .await?;
                 }
                 Err(InsertCertificateError::Canceled(err)) => {
                     return Err(ctx::Error::Canceled(err))
@@ -290,6 +356,19 @@
                 }
            }
        }
+
+            Ok(())
+        }
+
+        // Loop inserting block certs to storage.
+        loop {
+            insert_block_certificates_iteration(
+                ctx,
+                &pool,
+                &mut block_certificates,
+                &blocks_persisted,
+            )
+            .await?;
         }
     })
     .await;
diff --git a/docs/announcements/README.md b/docs/announcements/README.md
new file mode 100644
index 000000000000..2e793fba7d38
--- /dev/null
+++ b/docs/announcements/README.md
@@ -0,0 +1,8 @@
+# ZKsync development announcements
+
+This directory will contain announcements that don't necessarily serve as documentation, but still provide valuable
+information to be stored long-term.
+
+Current announcements:
+
+- 01.08.2024 - [Attester committee invitation](./attester_commitee.md)
diff --git a/docs/announcements/attester_commitee.md b/docs/announcements/attester_commitee.md
new file mode 100644
index 000000000000..84ff8aa5be6d
--- /dev/null
+++ b/docs/announcements/attester_commitee.md
@@ -0,0 +1,44 @@
+# Attester Committee
+
+## Overview
+
+The attester committee is a subset of ZKsync nodes.
+After each L1 batch is executed, participating nodes sign its
+execution result and send the signature back to the network.
+
+The ultimate goal is to make the L1 commit operation contingent on such signatures. This will improve the security and
+finality guarantees: having these signatures on L1 shows that additional actors executed the corresponding blocks - and
+ended up with the same state root hash.
+
+## Current State
+
+Before starting the L1 integration, we want to ensure that we can consistently reach a quorum and collect the
+signatures in a timely manner. Currently, the main node just stores the signatures in the local DB
+(`l1_batches_consensus` table).
+
+We run a (temporary) PoA network of attesters where the Main Node administrator defines the committee for every L1
+batch. There is currently no tangible incentive or any kind of actual or implied reward for the participants - we'll be
+developing these and potential tokenomics later on.
+
+We are looking for participants for this network.
+
+## Participating in the Attester Committee
+
+Joining the attester committee imposes no additional computational, operational, security, or business overhead for EN
+operators.
+
+The only difference in behavior is that the node will asynchronously sign batches and send those signatures to the main
+node. The node checks whether its public key is part of the committee for the current L1 batch - if it is, this logic
+kicks in. We expect the participating nodes to have very high uptime, but there are no penalties for not submitting
+votes (and we wouldn't have any way to enforce them).
+
+Participants can leave the committee at any time.
+
+The only action required to participate is to share your attester public key with the Main Node operator (by
+opening an issue in this repo or using any other communication channel). You can find it in the comment in the
+`consensus_secrets.yaml` file (which was - in most cases - generated by the tool described
+[here](https://github.com/matter-labs/zksync-era/blob/main/docs/guides/external-node/09_decentralization.md#generating-secrets)).
+
+> [!WARNING]
+>
+> Never share your **private** keys. Make sure you are only sharing the **public** key. It looks like this:
+> `# attester:public:secp256k1:02e7b1f24fb58b770cb722bf08e9512c7d8667ec0befa37611eddafd0109656132`
diff --git a/prover/Cargo.lock b/prover/Cargo.lock
index 6f039520de08..c46bf4372326 100644
--- a/prover/Cargo.lock
+++ b/prover/Cargo.lock
@@ -3912,6 +3912,18 @@ dependencies = [
  "thiserror",
 ]

+[[package]]
+name = "opentelemetry-appender-tracing"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6"
+dependencies = [
+ "opentelemetry",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "opentelemetry-http"
 version = "0.13.0"
@@ -8415,6 +8427,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "opentelemetry",
+ "opentelemetry-appender-tracing",
  "opentelemetry-otlp",
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
diff --git a/zk_toolbox/Cargo.lock b/zk_toolbox/Cargo.lock
index 4793e9e0a4e3..35e3067482e1 100644
--- a/zk_toolbox/Cargo.lock
+++ b/zk_toolbox/Cargo.lock
@@ -3042,6 +3042,18 @@ dependencies = [
  "thiserror",
 ]

+[[package]]
+name = "opentelemetry-appender-tracing"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b84de945cb3a6f1e0d6317cbd998bbd0519ab00f4b790db67e0ff4fdcf7cedb6"
+dependencies = [
+ "opentelemetry",
+ "tracing",
+ "tracing-core",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "opentelemetry-http"
 version = "0.13.0"
@@ -6535,6 +6547,7 @@ dependencies = [
  "anyhow",
  "chrono",
  "opentelemetry",
+ "opentelemetry-appender-tracing",
  "opentelemetry-otlp",
  "opentelemetry-semantic-conventions",
  "opentelemetry_sdk",
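
---

For reviewers who want to try the new configuration end to end, here is a minimal usage sketch. `OpenTelemetry::new` matches the signature introduced above; the `with_opentelemetry()`/`build()` builder methods and the localhost collector URLs are assumptions for illustration only, not part of this diff.

```rust
use zksync_vlog::{ObservabilityBuilder, OpenTelemetry};

fn main() -> anyhow::Result<()> {
    // The traces endpoint is wrapped in `Some` by the caller (see `observability_ext.rs` above);
    // the logs endpoint may be `None`, in which case logs keep going to stdout only.
    let otlp = OpenTelemetry::new(
        "info",
        Some("http://localhost:4318/v1/traces".to_string()), // hypothetical local collector
        Some("http://localhost:4318/v1/logs".to_string()),   // hypothetical local collector
    )?;

    // `with_opentelemetry()` and `build()` are assumed builder methods not shown in this diff.
    let _guard = ObservabilityBuilder::new()
        .with_opentelemetry(otlp)
        .build();

    // Spans and logs are exported until `_guard` is dropped.
    Ok(())
}
```

Note that per the doc comment on `logs_layer()`, a proxy collector may still be needed to stamp `time_unix_nano` onto exported log records.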