diff --git a/node/Cargo.lock b/node/Cargo.lock index 44d8996a..191676da 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -4077,6 +4077,7 @@ dependencies = [ "anyhow", "async-trait", "rand 0.8.5", + "semver", "test-casing", "tokio", "tracing", @@ -4110,6 +4111,7 @@ dependencies = [ "pretty_assertions", "prost", "rand 0.8.5", + "semver", "snow", "test-casing", "thiserror", diff --git a/node/Cargo.toml b/node/Cargo.toml index f0dde15e..294093de 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -86,6 +86,7 @@ quote = "1.0.33" rand = "0.8.0" rand04 = { package = "rand", version = "0.4" } rocksdb = "0.21.0" +semver = "1.0.23" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.95" serde_yaml = "0.9" diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index 56696efc..128d9167 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -22,6 +22,7 @@ zksync_protobuf.workspace = true anyhow.workspace = true async-trait.workspace = true rand.workspace = true +semver.workspace = true tracing.workspace = true vise.workspace = true diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 2cd86e34..3cbed6e6 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -33,6 +33,8 @@ pub struct Validator { /// Config of the node executor. #[derive(Debug)] pub struct Config { + /// Label identifying the build version of the binary that this node is running. + pub build_version: Option, /// IP:port to listen on, for incoming TCP connections. /// Use `0.0.0.0:` to listen on all network interfaces (i.e. on all IPs exposed by this VM). pub server_addr: std::net::SocketAddr, @@ -99,6 +101,7 @@ impl Executor { /// Extracts a network crate config. fn network_config(&self) -> network::Config { network::Config { + build_version: self.config.build_version.clone(), server_addr: net::tcp::ListenerAddr::new(self.config.server_addr), public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 4e88f5ef..7fbde0ea 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -14,6 +14,7 @@ use zksync_consensus_storage::{ fn config(cfg: &network::Config) -> Config { Config { + build_version: None, server_addr: *cfg.server_addr, public_addr: cfg.public_addr.clone(), max_payload_size: usize::MAX, diff --git a/node/actors/network/Cargo.toml b/node/actors/network/Cargo.toml index 06151132..322bf743 100644 --- a/node/actors/network/Cargo.toml +++ b/node/actors/network/Cargo.toml @@ -37,6 +37,7 @@ tls-listener.workspace = true base64.workspace = true build_html.workspace = true bytesize.workspace = true +semver.workspace = true [dev-dependencies] assert_matches.workspace = true diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 03e5020b..1f723b31 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -85,6 +85,9 @@ pub struct GossipConfig { /// Network actor config. #[derive(Debug, Clone)] pub struct Config { + /// Label identifying the build version of the binary that this node is running. + /// There is no specific semantics assigned to it. + pub build_version: Option, /// TCP socket address to listen for inbound connections at. pub server_addr: net::tcp::ListenerAddr, /// Public TCP address that other nodes are expected to connect to. diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 3d4f5ad9..b63a3248 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -171,9 +171,7 @@ impl Network { ) -> anyhow::Result<()> { let peer = handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?; - self.inbound - .insert(peer.clone(), stream.get_values()) - .await?; + self.inbound.insert(peer.clone(), stream.stats()).await?; tracing::info!("peer = {peer:?}"); let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() @@ -211,9 +209,7 @@ impl Network { peer, ) .await?; - self.outbound - .insert(peer.clone(), stream.get_values()) - .await?; + self.outbound.insert(peer.clone(), stream.stats()).await?; tracing::info!("peer = {peer:?}"); let consensus_cli = rpc::Client::::new(ctx, self.gossip.cfg.rpc.consensus_rate); diff --git a/node/actors/network/src/gossip/handshake/mod.rs b/node/actors/network/src/gossip/handshake/mod.rs index 8df23110..bdd02d4c 100644 --- a/node/actors/network/src/gossip/handshake/mod.rs +++ b/node/actors/network/src/gossip/handshake/mod.rs @@ -1,5 +1,7 @@ -use crate::{frame, noise, proto::gossip as proto, GossipConfig}; +use super::Connection; +use crate::{frame, noise, proto::gossip as proto, Config}; use anyhow::Context as _; +use std::sync::Arc; use zksync_concurrency::{ctx, error::Wrap as _, time}; use zksync_consensus_crypto::ByteFmt; use zksync_consensus_roles::{node, validator}; @@ -28,6 +30,9 @@ pub(crate) struct Handshake { /// Information whether the peer treats this connection as static. /// It is informational only, it doesn't affect the logic of the node. pub(crate) is_static: bool, + /// Version at which peer's binary has been built. + /// It is declared by peer (i.e. not verified in any way). + pub(crate) build_version: Option, } impl ProtoFmt for Handshake { @@ -37,6 +42,12 @@ impl ProtoFmt for Handshake { session_id: read_required(&r.session_id).context("session_id")?, genesis: read_required(&r.genesis).context("genesis")?, is_static: *required(&r.is_static).context("is_static")?, + build_version: r + .build_version + .as_ref() + .map(|x| x.parse()) + .transpose() + .context("build_version")?, }) } fn build(&self) -> Self::Proto { @@ -44,6 +55,7 @@ impl ProtoFmt for Handshake { session_id: Some(self.session_id.build()), genesis: Some(self.genesis.build()), is_static: Some(self.is_static), + build_version: self.build_version.as_ref().map(|x| x.to_string()), } } } @@ -65,20 +77,21 @@ pub(super) enum Error { pub(super) async fn outbound( ctx: &ctx::Ctx, - cfg: &GossipConfig, + cfg: &Config, genesis: validator::GenesisHash, stream: &mut noise::Stream, peer: &node::PublicKey, -) -> Result<(), Error> { +) -> Result { let ctx = &ctx.with_timeout(TIMEOUT); let session_id = node::SessionId(stream.id().encode()); frame::send_proto( ctx, stream, &Handshake { - session_id: cfg.key.sign_msg(session_id.clone()), + session_id: cfg.gossip.key.sign_msg(session_id.clone()), genesis, - is_static: cfg.static_outbound.contains_key(peer), + is_static: cfg.gossip.static_outbound.contains_key(peer), + build_version: cfg.build_version.clone(), }, ) .await @@ -96,15 +109,19 @@ pub(super) async fn outbound( return Err(Error::PeerMismatch); } h.session_id.verify()?; - Ok(()) + Ok(Connection { + key: h.session_id.key, + build_version: h.build_version, + stats: stream.stats(), + }) } pub(super) async fn inbound( ctx: &ctx::Ctx, - cfg: &GossipConfig, + cfg: &Config, genesis: validator::GenesisHash, stream: &mut noise::Stream, -) -> Result { +) -> Result, Error> { let ctx = &ctx.with_timeout(TIMEOUT); let session_id = node::SessionId(stream.id().encode()); let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME) @@ -121,12 +138,18 @@ pub(super) async fn inbound( ctx, stream, &Handshake { - session_id: cfg.key.sign_msg(session_id.clone()), + build_version: cfg.build_version.clone(), + session_id: cfg.gossip.key.sign_msg(session_id.clone()), genesis, - is_static: cfg.static_inbound.contains(&h.session_id.key), + is_static: cfg.gossip.static_inbound.contains(&h.session_id.key), }, ) .await .wrap("send_proto()")?; - Ok(h.session_id.key) + Ok(Connection { + key: h.session_id.key, + build_version: h.build_version, + stats: stream.stats(), + } + .into()) } diff --git a/node/actors/network/src/gossip/handshake/testonly.rs b/node/actors/network/src/gossip/handshake/testonly.rs index c0894adf..27417d6f 100644 --- a/node/actors/network/src/gossip/handshake/testonly.rs +++ b/node/actors/network/src/gossip/handshake/testonly.rs @@ -9,6 +9,23 @@ use rand::{ }; use zksync_consensus_roles::node; +/// Semver has specific restrictions on how an identifier +/// should look like, so we play it safe and generate +/// a letter-only string of fixed length. +fn gen_semver_identifier(rng: &mut R) -> String { + (0..10).map(|_| rng.gen_range('a'..='z')).collect() +} + +fn gen_semver(rng: &mut R) -> semver::Version { + semver::Version { + major: rng.gen(), + minor: rng.gen(), + patch: rng.gen(), + pre: semver::Prerelease::new(&gen_semver_identifier(rng)).unwrap(), + build: semver::BuildMetadata::new(&gen_semver_identifier(rng)).unwrap(), + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Handshake { let key: node::SecretKey = rng.gen(); @@ -17,6 +34,7 @@ impl Distribution for Standard { session_id: key.sign_msg(session_id), genesis: rng.gen(), is_static: rng.gen(), + build_version: Some(gen_semver(rng)), } } } diff --git a/node/actors/network/src/gossip/handshake/tests.rs b/node/actors/network/src/gossip/handshake/tests.rs index 0cd7ede8..9dd92b05 100644 --- a/node/actors/network/src/gossip/handshake/tests.rs +++ b/node/actors/network/src/gossip/handshake/tests.rs @@ -1,8 +1,7 @@ use super::*; -use crate::{frame, noise, testonly, GossipConfig}; +use crate::{frame, noise, testonly}; use assert_matches::assert_matches; use rand::Rng; -use std::collections::{HashMap, HashSet}; use zksync_concurrency::{ctx, io, scope, testonly::abort_on_panic}; use zksync_consensus_roles::node; use zksync_protobuf::testonly::test_encode_random; @@ -13,23 +12,14 @@ fn test_schema_encode_decode() { test_encode_random::(rng); } -fn make_cfg(rng: &mut R) -> GossipConfig { - GossipConfig { - key: rng.gen(), - dynamic_inbound_limit: 0, - static_inbound: HashSet::default(), - static_outbound: HashMap::default(), - } -} - #[tokio::test] async fn test_session_id_mismatch() { abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let cfg0 = make_cfg(rng); - let cfg1 = make_cfg(rng); + let cfg0 = testonly::make_config(rng.gen()); + let cfg1 = testonly::make_config(rng.gen()); let genesis: validator::GenesisHash = rng.gen(); // MitM attempt detected on the inbound end. @@ -55,7 +45,7 @@ async fn test_session_id_mismatch() { }); s.spawn(async { let mut s1 = s1; - match outbound(ctx, &cfg1, genesis, &mut s1, &cfg0.key.public()).await { + match outbound(ctx, &cfg1, genesis, &mut s1, &cfg0.gossip.key.public()).await { Err(Error::Stream(..)) => Ok(()), res => panic!("unexpected res: {res:?}"), } @@ -75,15 +65,16 @@ async fn test_session_id_mismatch() { ctx, &mut s2, &Handshake { - session_id: cfg1.key.sign_msg(rng.gen::()), + session_id: cfg1.gossip.key.sign_msg(rng.gen::()), genesis, is_static: false, + build_version: None, }, ) .await?; Ok(()) }); - match outbound(ctx, &cfg0, genesis, &mut s1, &cfg1.key.public()).await { + match outbound(ctx, &cfg0, genesis, &mut s1, &cfg1.gossip.key.public()).await { Err(Error::SessionIdMismatch) => anyhow::Ok(()), res => panic!("unexpected res: {res:?}"), } @@ -98,9 +89,9 @@ async fn test_peer_mismatch() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let cfg0 = make_cfg(rng); - let cfg1 = make_cfg(rng); - let cfg2 = make_cfg(rng); + let cfg0 = testonly::make_config(rng.gen()); + let cfg1 = testonly::make_config(rng.gen()); + let cfg2 = testonly::make_config(rng.gen()); let genesis: validator::GenesisHash = rng.gen(); @@ -109,14 +100,14 @@ async fn test_peer_mismatch() { s.spawn(async { let mut s0 = s0; assert_eq!( - cfg1.key.public(), - inbound(ctx, &cfg0, genesis, &mut s0).await? + cfg1.gossip.key.public(), + inbound(ctx, &cfg0, genesis, &mut s0).await?.key ); Ok(()) }); s.spawn(async { let mut s1 = s1; - match outbound(ctx, &cfg1, genesis, &mut s1, &cfg2.key.public()).await { + match outbound(ctx, &cfg1, genesis, &mut s1, &cfg2.gossip.key.public()).await { Err(Error::PeerMismatch) => Ok(()), res => panic!("unexpected res: {res:?}"), } @@ -133,15 +124,22 @@ async fn test_genesis_mismatch() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let cfg0 = make_cfg(rng); - let cfg1 = make_cfg(rng); + let cfg0 = testonly::make_config(rng.gen()); + let cfg1 = testonly::make_config(rng.gen()); tracing::info!("test that inbound handshake rejects mismatching genesis"); scope::run!(ctx, |ctx, s| async { let (s0, mut s1) = noise::testonly::pipe(ctx).await; s.spawn(async { let mut s0 = s0; - let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await; + let res = outbound( + ctx, + &cfg0, + ctx.rng().gen(), + &mut s0, + &cfg1.gossip.key.public(), + ) + .await; assert_matches!(res, Err(Error::Stream(_))); Ok(()) }); @@ -157,7 +155,14 @@ async fn test_genesis_mismatch() { let (s0, mut s1) = noise::testonly::pipe(ctx).await; s.spawn(async { let mut s0 = s0; - let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await; + let res = outbound( + ctx, + &cfg0, + ctx.rng().gen(), + &mut s0, + &cfg1.gossip.key.public(), + ) + .await; assert_matches!(res, Err(Error::GenesisMismatch)); Ok(()) }); @@ -167,9 +172,10 @@ async fn test_genesis_mismatch() { ctx, &mut s1, &Handshake { - session_id: cfg1.key.sign_msg(session_id), + session_id: cfg1.gossip.key.sign_msg(session_id), genesis: rng.gen(), is_static: false, + build_version: None, }, ) .await @@ -186,8 +192,8 @@ async fn test_invalid_signature() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let cfg0 = make_cfg(rng); - let cfg1 = make_cfg(rng); + let cfg0 = testonly::make_config(rng.gen()); + let cfg1 = testonly::make_config(rng.gen()); let genesis: validator::GenesisHash = rng.gen(); @@ -197,11 +203,11 @@ async fn test_invalid_signature() { s.spawn_bg(async { let mut s1 = s1; let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?; - h.session_id.key = cfg1.key.public(); + h.session_id.key = cfg1.gossip.key.public(); frame::send_proto(ctx, &mut s1, &h).await?; Ok(()) }); - match outbound(ctx, &cfg0, genesis, &mut s0, &cfg1.key.public()).await { + match outbound(ctx, &cfg0, genesis, &mut s0, &cfg1.gossip.key.public()).await { Err(Error::Signature(..)) => anyhow::Ok(()), res => panic!("unexpected res: {res:?}"), } @@ -215,11 +221,12 @@ async fn test_invalid_signature() { s.spawn_bg(async { let mut s1 = s1; let mut h = Handshake { - session_id: cfg0.key.sign_msg(node::SessionId(s1.id().encode())), + session_id: cfg0.gossip.key.sign_msg(node::SessionId(s1.id().encode())), genesis, is_static: true, + build_version: None, }; - h.session_id.key = cfg1.key.public(); + h.session_id.key = cfg1.gossip.key.public(); frame::send_proto(ctx, &mut s1, &h).await }); match inbound(ctx, &cfg0, genesis, &mut s0).await { diff --git a/node/actors/network/src/gossip/loadtest/mod.rs b/node/actors/network/src/gossip/loadtest/mod.rs index b54098ce..4caca21b 100644 --- a/node/actors/network/src/gossip/loadtest/mod.rs +++ b/node/actors/network/src/gossip/loadtest/mod.rs @@ -1,5 +1,5 @@ //! Loadtest of the gossip endpoint of a node. -use crate::{gossip, mux, noise, preface, rpc, GossipConfig}; +use crate::{gossip, mux, noise, preface, rpc, testonly}; use anyhow::Context as _; use async_trait::async_trait; use rand::Rng; @@ -91,12 +91,7 @@ impl Loadtest { let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet) .await .context("connect()")?; - let cfg = GossipConfig { - key: node::SecretKey::generate(), - dynamic_inbound_limit: 0, - static_inbound: [].into(), - static_outbound: [].into(), - }; + let cfg = testonly::make_config(ctx.rng().gen()); gossip::handshake::outbound(ctx, &cfg, self.genesis.hash(), &mut stream, &self.peer) .await .context("handshake")?; diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 52512425..126d887c 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -33,14 +33,25 @@ mod testonly; mod tests; mod validator_addrs; +/// Info about a gossip connection. +#[derive(Debug)] +pub(crate) struct Connection { + /// Peer's public key. + pub(crate) key: node::PublicKey, + /// Build version of peer's binary (not verified). + pub(crate) build_version: Option, + /// TCP connection stats. + pub(crate) stats: Arc, +} + /// Gossip network state. pub(crate) struct Network { /// Gossip network configuration. pub(crate) cfg: Config, /// Currently open inbound connections. - pub(crate) inbound: PoolWatch>, + pub(crate) inbound: PoolWatch>, /// Currently open outbound connections. - pub(crate) outbound: PoolWatch>, + pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 23ccf009..66cf8fd8 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -395,14 +395,11 @@ impl Network { ctx: &ctx::Ctx, mut stream: noise::Stream, ) -> anyhow::Result<()> { - let peer = - handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; - tracing::info!("peer = {peer:?}"); - self.inbound - .insert(peer.clone(), stream.get_values()) - .await?; + let conn = handshake::inbound(ctx, &self.cfg, self.genesis().hash(), &mut stream).await?; + tracing::info!("peer = {:?}", conn.key); + self.inbound.insert(conn.key.clone(), conn.clone()).await?; let res = self.run_stream(ctx, stream).await; - self.inbound.remove(&peer).await; + self.inbound.remove(&conn.key).await; res } @@ -422,18 +419,10 @@ impl Network { .with_context(|| "{addr:?} resolved to empty address set")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; - handshake::outbound( - ctx, - &self.cfg.gossip, - self.genesis().hash(), - &mut stream, - peer, - ) - .await?; + let conn = + handshake::outbound(ctx, &self.cfg, self.genesis().hash(), &mut stream, peer).await?; tracing::info!("peer = {peer:?}"); - self.outbound - .insert(peer.clone(), stream.get_values()) - .await?; + self.outbound.insert(peer.clone(), conn.into()).await?; let res = self.run_stream(ctx, stream).await; self.outbound.remove(peer).await; res diff --git a/node/actors/network/src/gossip/testonly.rs b/node/actors/network/src/gossip/testonly.rs index 114a838e..a2745d25 100644 --- a/node/actors/network/src/gossip/testonly.rs +++ b/node/actors/network/src/gossip/testonly.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] use super::*; -use crate::{frame, mux, noise, preface, rpc, Config, GossipConfig}; +use crate::{frame, mux, noise, preface, rpc, testonly::make_config, Config}; use anyhow::Context as _; use rand::Rng; use std::collections::BTreeMap; @@ -48,12 +48,7 @@ pub(super) async fn connect( let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet) .await .context("preface::connect()")?; - let cfg = GossipConfig { - key: ctx.rng().gen(), - dynamic_inbound_limit: 0, - static_outbound: [].into(), - static_inbound: [].into(), - }; + let cfg = make_config(ctx.rng().gen()); handshake::outbound(ctx, &cfg, genesis, &mut stream, &peer.gossip.key.public()) .await .context("handshake::outbound()")?; diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index 7ab311bb..c72343c1 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -61,7 +61,7 @@ async fn test_one_connection_per_node() { .await .context("preface::connect")?; - handshake::outbound(ctx, &cfgs[0].gossip, setup.genesis.hash(), &mut stream, peer) + handshake::outbound(ctx, &cfgs[0], setup.genesis.hash(), &mut stream, peer) .await .context("handshake::outbound")?; tracing::info!("The connection is expected to be closed automatically by peer."); @@ -304,7 +304,7 @@ async fn test_genesis_mismatch() { .wrap("preface::accept()")?; assert_eq!(endpoint, preface::Endpoint::GossipNet); tracing::info!("Expect the handshake to fail"); - let res = handshake::inbound(ctx, &cfgs[1].gossip, rng.gen(), &mut stream).await; + let res = handshake::inbound(ctx, &cfgs[1], rng.gen(), &mut stream).await; assert_matches!(res, Err(handshake::Error::GenesisMismatch)); tracing::info!("Try to connect to a node with a mismatching genesis."); @@ -313,7 +313,7 @@ async fn test_genesis_mismatch() { .context("preface::connect")?; let res = handshake::outbound( ctx, - &cfgs[1].gossip, + &cfgs[1], rng.gen(), &mut stream, &cfgs[0].gossip.key.public(), diff --git a/node/actors/network/src/http/mod.rs b/node/actors/network/src/http/mod.rs index bb796389..748e027f 100644 --- a/node/actors/network/src/http/mod.rs +++ b/node/actors/network/src/http/mod.rs @@ -1,5 +1,5 @@ //! Http Server to export debug information -use crate::{consensus, MeteredStreamStats, Network}; +use crate::{MeteredStreamStats, Network}; use anyhow::Context as _; use base64::Engine; use build_html::{Html, HtmlContainer, HtmlPage, Table, TableCell, TableCellType, TableRow}; @@ -12,10 +12,9 @@ use hyper::{ HeaderMap, Request, Response, StatusCode, }; use hyper_util::rt::tokio::TokioIo; -use im::HashMap; use std::{ net::SocketAddr, - sync::Arc, + sync::{atomic::Ordering, Arc}, time::{Duration, SystemTime}, }; use tls_listener::TlsListener; @@ -28,6 +27,7 @@ use tokio_rustls::{ TlsAcceptor, }; use zksync_concurrency::{ctx, scope}; +use zksync_consensus_crypto::TextFmt as _; use zksync_consensus_utils::debug_page; const STYLE: &str = include_str!("style.css"); @@ -149,86 +149,112 @@ impl DebugPageServer { } fn serve(&self, _request: Request) -> Full { - let html: String = HtmlPage::new() + let mut html = HtmlPage::new() .with_title("Node debug page") .with_style(STYLE) - .with_header(1, "Active connections") - .with_header(2, "Validator network") + .with_header(1, "Active connections"); + if let Some(consensus) = self.network.consensus.as_ref() { + html = html + .with_header(2, "Validator network") + .with_header(3, "Incoming connections") + .with_paragraph( + self.connections_html( + consensus + .inbound + .current() + .iter() + .map(|(k, v)| (k.encode(), v, None)), + ), + ) + .with_header(3, "Outgoing connections") + .with_paragraph( + self.connections_html( + consensus + .outbound + .current() + .iter() + .map(|(k, v)| (k.encode(), v, None)), + ), + ); + } + html = html + .with_header(2, "Gossip network") .with_header(3, "Incoming connections") .with_paragraph( self.connections_html( - > as Clone>::clone(&self.network.consensus) - .unwrap() + self.network + .gossip .inbound - .current(), + .current() + .values() + .map(|c| (c.key.encode(), &c.stats, c.build_version.clone())), ), ) .with_header(3, "Outgoing connections") .with_paragraph( self.connections_html( - > as Clone>::clone(&self.network.consensus) - .unwrap() + self.network + .gossip .outbound - .current(), + .current() + .values() + .map(|c| (c.key.encode(), &c.stats, c.build_version.clone())), ), - ) - .with_header(2, "Gossip network") - .with_header(3, "Incoming connections") - .with_paragraph(self.connections_html(self.network.gossip.inbound.current())) - .with_header(3, "Outgoing connections") - .with_paragraph(self.connections_html(self.network.gossip.outbound.current())) - .to_html_string(); - Full::new(Bytes::from(html)) + ); + Full::new(Bytes::from(html.to_html_string())) } - fn connections_html(&self, connections: HashMap>) -> String - where - K: std::hash::Hash + Eq + Clone + std::fmt::Debug + zksync_consensus_crypto::TextFmt, - { + fn connections_html<'a>( + &self, + connections: impl Iterator< + Item = (String, &'a Arc, Option), + >, + ) -> String { let mut table = Table::new() .with_custom_header_row( TableRow::new() .with_cell(TableCell::new(TableCellType::Header).with_raw("Public key")) .with_cell(TableCell::new(TableCellType::Header).with_raw("Address")) + .with_cell(TableCell::new(TableCellType::Header).with_raw("Build version")) .with_cell( TableCell::new(TableCellType::Header) .with_attributes([("colspan", "2")]) - .with_raw("Incoming"), + .with_raw("received [B]"), ) .with_cell( TableCell::new(TableCellType::Header) .with_attributes([("colspan", "2")]) - .with_raw("Outgoing"), + .with_raw("sent [B]"), ) .with_cell(TableCell::new(TableCellType::Header).with_raw("Age")), ) - .with_header_row(vec!["", "", "size", "bandwidth", "size", "bandwidth", ""]); - for (key, values) in connections { + .with_header_row(vec!["", "", "", "total", "avg", "total", "avg", ""]); + for (key, stats, build_version) in connections { let age = SystemTime::now() - .duration_since(values.established) + .duration_since(stats.established) .ok() .unwrap_or_else(|| Duration::new(1, 0)) .max(Duration::new(1, 0)); // Ensure Duration is not 0 to prevent division by zero - let received = values.received.load(std::sync::atomic::Ordering::Relaxed); - let sent = values.sent.load(std::sync::atomic::Ordering::Relaxed); + let received = stats.received.load(Ordering::Relaxed); + let sent = stats.sent.load(Ordering::Relaxed); table.add_body_row(vec![ - self.shorten(key), - values.peer_addr.to_string(), + Self::shorten(key), + stats.peer_addr.to_string(), + build_version.map(|v| v.to_string()).unwrap_or_default(), bytesize::to_string(received, false), + // TODO: this is not useful - we should display avg from the last ~1min instead. bytesize::to_string(received / age.as_secs(), false) + "/s", bytesize::to_string(sent, false), bytesize::to_string(sent / age.as_secs(), false) + "/s", + // TODO: this is not a human-friendly format, use days + hours + minutes + seconds, + // or similar. format!("{}s", age.as_secs()), ]) } table.to_html_string() } - fn shorten(&self, key: K) -> String - where - K: std::fmt::Debug + zksync_consensus_crypto::TextFmt, - { - let key = key.encode(); + fn shorten(key: String) -> String { key.strip_prefix("validator:public:bls12_381:") .or(key.strip_prefix("node:public:ed25519:")) .map_or("-".to_string(), |key| { diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 840e0a36..3ecbd60e 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -2,6 +2,7 @@ use crate::Network; use anyhow::Context as _; use std::{ + fmt, net::SocketAddr, pin::Pin, sync::{ @@ -61,7 +62,7 @@ impl MeteredStream { } /// Returns a reference to the the Stream values for inspection - pub(crate) fn get_values(&self) -> Arc { + pub(crate) fn stats(&self) -> Arc { self.stats.clone() } } @@ -141,6 +142,18 @@ struct TcpMetrics { #[vise::register] static TCP_METRICS: vise::Global = vise::Global::new(); +/// `build_version` label. +#[derive(Clone, Debug, PartialEq, Eq, Hash, EncodeLabelSet, EncodeLabelValue)] +#[metrics(label = "build_version")] +pub(crate) struct BuildVersion(String); + +// For the isolated metric label to work, you should implement `Display` for it: +impl fmt::Display for BuildVersion { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(fmt) + } +} + /// General-purpose network metrics exposed via a collector. #[derive(Debug, Metrics)] #[metrics(prefix = "network")] @@ -153,6 +166,12 @@ pub(crate) struct NetworkGauges { consensus_inbound_connections: Gauge, /// Number of active outbound consensus connections. consensus_outbound_connections: Gauge, + /// Number of peers (both inbound and outbound) with the given `build_version`. + /// Label "" corresponds to peers with build_version not set. + /// WARNING: with the current implementation we do not bound + /// the allowed set of BuildVersion values. This is not a threat + /// to the node itself, but the prometheus scraper might be. + gossip_peers_by_build_version: Family>, } impl NetworkGauges { @@ -164,10 +183,28 @@ impl NetworkGauges { let register_result = COLLECTOR.before_scrape(move || { state_ref.upgrade().map(|state| { let gauges = NetworkGauges::default(); - let len = state.gossip.inbound.current().len(); - gauges.gossip_inbound_connections.set(len); - let len = state.gossip.outbound.current().len(); - gauges.gossip_outbound_connections.set(len); + let inbound = state.gossip.inbound.current(); + gauges.gossip_inbound_connections.set(inbound.len()); + for conn in inbound.values() { + let v = BuildVersion( + conn.build_version + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(), + ); + gauges.gossip_peers_by_build_version[&v].inc_by(1); + } + let outbound = state.gossip.outbound.current(); + gauges.gossip_outbound_connections.set(outbound.len()); + for conn in outbound.values() { + let v = BuildVersion( + conn.build_version + .as_ref() + .map(|v| v.to_string()) + .unwrap_or_default(), + ); + gauges.gossip_peers_by_build_version[&v].inc_by(1); + } if let Some(consensus_state) = &state.consensus { let len = consensus_state.inbound.current().len(); gauges.consensus_inbound_connections.set(len); diff --git a/node/actors/network/src/proto/gossip.proto b/node/actors/network/src/proto/gossip.proto index 8f8c1327..a5ff96b7 100644 --- a/node/actors/network/src/proto/gossip.proto +++ b/node/actors/network/src/proto/gossip.proto @@ -11,6 +11,9 @@ message Handshake { optional roles.node.Signed session_id = 1; // required optional roles.validator.GenesisHash genesis = 3; // required optional bool is_static = 2; // required + + // Optional information about the node. + optional string build_version = 4; // SemVer; optional } message PushValidatorAddrs { diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 909e7248..dac7ba76 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -39,6 +39,33 @@ impl Distribution for Standard { } } +/// Generates a new default testonly config for a node +/// with a fresh reserved local port as `server_addr`. +/// Output can be customized afterwards, depending on the test's needs. +pub(crate) fn make_config(key: node::SecretKey) -> Config { + let addr = net::tcp::testonly::reserve_listener(); + Config { + build_version: None, + server_addr: addr, + public_addr: (*addr).into(), + // Pings are disabled in tests by default to avoid dropping connections + // due to timeouts. + ping_timeout: None, + validator_key: None, + gossip: GossipConfig { + key, + dynamic_inbound_limit: usize::MAX, + static_inbound: HashSet::default(), + static_outbound: HashMap::default(), + }, + max_block_size: usize::MAX, + max_batch_size: usize::MAX, + tcp_accept_rate: limiter::Rate::INF, + rpc: RpcConfig::default(), + max_block_queue_size: 10, + } +} + /// Synchronously forwards data from one stream to another. pub(crate) async fn forward( ctx: &ctx::Ctx, @@ -96,26 +123,9 @@ where I: Iterator, { let configs = validator_keys.map(|validator_key| { - let addr = net::tcp::testonly::reserve_listener(); - Config { - server_addr: addr, - public_addr: (*addr).into(), - // Pings are disabled in tests by default to avoid dropping connections - // due to timeouts. - ping_timeout: None, - validator_key: Some(validator_key.clone()), - gossip: GossipConfig { - key: rng.gen(), - dynamic_inbound_limit: usize::MAX, - static_inbound: HashSet::default(), - static_outbound: HashMap::default(), - }, - max_block_size: usize::MAX, - max_batch_size: usize::MAX, - tcp_accept_rate: limiter::Rate::INF, - rpc: RpcConfig::default(), - max_block_queue_size: 10, - } + let mut cfg = make_config(rng.gen()); + cfg.validator_key = Some(validator_key.clone()); + cfg }); let mut cfgs: Vec<_> = configs.collect(); @@ -136,6 +146,7 @@ where pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { let addr = net::tcp::testonly::reserve_listener(); Config { + build_version: None, server_addr: addr, public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections diff --git a/node/libs/roles/src/attester/messages/msg.rs b/node/libs/roles/src/attester/messages/msg.rs index cdf8b1e6..bb8d0c7e 100644 --- a/node/libs/roles/src/attester/messages/msg.rs +++ b/node/libs/roles/src/attester/messages/msg.rs @@ -74,7 +74,7 @@ impl Signers { /// A struct that represents a set of attesters. It is used to store the current attester set. /// We represent each attester by its attester public key. -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Committee { vec: Vec, indexes: BTreeMap, diff --git a/node/libs/roles/src/validator/keys/mod.rs b/node/libs/roles/src/validator/keys/mod.rs index 44c56289..f8d18c94 100644 --- a/node/libs/roles/src/validator/keys/mod.rs +++ b/node/libs/roles/src/validator/keys/mod.rs @@ -8,4 +8,4 @@ mod signature; pub use aggregate_signature::AggregateSignature; pub use public_key::PublicKey; pub use secret_key::SecretKey; -pub use signature::Signature; +pub use signature::{ProofOfPossession, Signature}; diff --git a/node/libs/roles/src/validator/keys/signature.rs b/node/libs/roles/src/validator/keys/signature.rs index 967ae149..a84d55be 100644 --- a/node/libs/roles/src/validator/keys/signature.rs +++ b/node/libs/roles/src/validator/keys/signature.rs @@ -19,6 +19,17 @@ impl Signature { } } +/// Proof of possession of a validator secret key. +#[derive(Clone, PartialEq, Eq)] +pub struct ProofOfPossession(pub(crate) bls12_381::ProofOfPossession); + +impl ProofOfPossession { + /// Verifies the proof against the public key. + pub fn verify(&self, pk: PublicKey) -> anyhow::Result<()> { + self.0.verify(&pk.0) + } +} + impl ByteFmt for Signature { fn encode(&self) -> Vec { ByteFmt::encode(&self.0) @@ -28,6 +39,15 @@ impl ByteFmt for Signature { } } +impl ByteFmt for ProofOfPossession { + fn encode(&self) -> Vec { + ByteFmt::encode(&self.0) + } + fn decode(bytes: &[u8]) -> anyhow::Result { + ByteFmt::decode(bytes).map(Self) + } +} + impl TextFmt for Signature { fn encode(&self) -> String { format!( @@ -42,6 +62,26 @@ impl TextFmt for Signature { } } +impl TextFmt for ProofOfPossession { + fn encode(&self) -> String { + format!( + "validator:pop:bls12_381:{}", + hex::encode(ByteFmt::encode(&self.0)) + ) + } + fn decode(text: Text) -> anyhow::Result { + text.strip("validator:pop:bls12_381:")? + .decode_hex() + .map(Self) + } +} + +impl fmt::Debug for ProofOfPossession { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.write_str(&TextFmt::encode(self)) + } +} + impl fmt::Debug for Signature { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.write_str(&TextFmt::encode(self)) diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 71738790..f9895f72 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -82,7 +82,7 @@ pub(crate) fn leader_weighted_eligibility(input: u64, total_weight: u64) -> u64 /// A struct that represents a set of validators. It is used to store the current validator set. /// We represent each validator by its validator public key. -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Committee { vec: Vec, indexes: BTreeMap, diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index c9a3a314..f4fdea28 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -2,9 +2,9 @@ use super::{ AggregateSignature, BlockHeader, BlockNumber, ChainId, CommitQC, Committee, ConsensusMsg, FinalBlock, ForkNumber, Genesis, GenesisHash, GenesisRaw, LeaderCommit, LeaderPrepare, Msg, - MsgHash, NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, - ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, View, ViewNumber, - WeightedValidator, + MsgHash, NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProofOfPossession, + ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, + Signers, View, ViewNumber, WeightedValidator, }; use crate::{attester, validator::LeaderSelectionMode}; use bit_vec::BitVec; @@ -243,6 +243,12 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> ProofOfPossession { + ProofOfPossession(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { SecretKey(Arc::new(rng.gen())) diff --git a/node/libs/roles/src/validator/tests.rs b/node/libs/roles/src/validator/tests.rs index ec6aa1e1..b1c77204 100644 --- a/node/libs/roles/src/validator/tests.rs +++ b/node/libs/roles/src/validator/tests.rs @@ -27,6 +27,9 @@ fn test_byte_encoding() { ByteFmt::decode(&ByteFmt::encode(&agg_sig)).unwrap() ); + let pop: ProofOfPossession = rng.gen(); + assert_eq!(pop, ByteFmt::decode(&ByteFmt::encode(&pop)).unwrap()); + let final_block: FinalBlock = rng.gen(); assert_eq!( final_block, @@ -57,6 +60,10 @@ fn test_text_encoding() { let t = TextFmt::encode(&sig); assert_eq!(sig, Text::new(&t).decode::().unwrap()); + let pop: ProofOfPossession = rng.gen(); + let t = TextFmt::encode(&pop); + assert_eq!(pop, Text::new(&t).decode::().unwrap()); + let agg_sig: AggregateSignature = rng.gen(); let t = TextFmt::encode(&agg_sig); assert_eq!( diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index cb84e48d..75fa8243 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -20,6 +20,8 @@ use zksync_consensus_storage::testonly::{TestMemoryStorage, TestMemoryStorageRun use zksync_consensus_utils::debug_page; use zksync_protobuf::{kB, read_required, required, ProtoFmt}; +const CRATE_VERSION: &str = env!("CARGO_PKG_VERSION"); + fn read_required_secret_text(text: &Option) -> anyhow::Result { Text::new( text.as_ref() @@ -269,6 +271,7 @@ impl Configs { let e = executor::Executor { config: executor::Config { + build_version: Some(CRATE_VERSION.parse().context("CRATE_VERSION.parse()")?), server_addr: self.app.server_addr, public_addr: self.app.public_addr.clone(), node_key: self.app.node_key.clone(),