From 43cef2e1d136537c7be211729a22db160977ebaf Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 26 Mar 2024 11:01:07 +0100 Subject: [PATCH 01/15] added Host resolution --- node/libs/concurrency/src/net/mod.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 12731c90..16457cc6 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -1,3 +1,28 @@ //! Context-aware network utilities. //! Built on top of `tokio::net`. +use crate::{ctx}; + pub mod tcp; + +#[cfg(test)] +mod tests; + +/// Network host address in the format ":". +/// NOT VALIDATED, validation happens at `Host::resolve()` call. +#[derive(Debug,Clone,PartialEq)] +pub struct Host(pub String); + +impl Host { + /// If host is of the form ":", performs DNS resolution. + /// If host is of the form ":", just parses the SocketAddr. + pub async fn resolve(&self, ctx: &ctx::Ctx) -> ctx::OrCanceled>> { + let host = self.0.clone(); + // Note that we may orphan a task executing the underlying `getnameinfo` call + // if the ctx gets cancelled. This should be fine given that it is expected to finish + // after a timeout and it doesn't affect the state of the application. + Ok(ctx.wait(tokio::task::spawn_blocking(move || { + // This should never panic, so unwrapping the task result is ok. + Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) + })).await?.unwrap()) + } +} From d4da144d94029b81218527f5509031e5027c843c Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 26 Mar 2024 14:09:00 +0100 Subject: [PATCH 02/15] before upgrading Pool --- node/actors/network/src/config.rs | 7 ++++-- node/actors/network/src/consensus/mod.rs | 27 +++++++++++++++++++---- node/actors/network/src/gossip/runner.rs | 8 +++++-- node/libs/concurrency/src/net/mod.rs | 3 +++ node/libs/concurrency/src/net/tests.rs | 28 ++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 8 deletions(-) create mode 100644 node/libs/concurrency/src/net/tests.rs diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 27d6c498..66e56f51 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -55,7 +55,7 @@ pub struct GossipConfig { pub static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub static_outbound: HashMap, + pub static_outbound: HashMap, } /// Network actor config. @@ -65,7 +65,10 @@ pub struct Config { pub server_addr: net::tcp::ListenerAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + /// In case public_addr is a domain instead of ip, DNS resolution is + /// performed and a loopback connection is established before announcing + /// the IP address over the gossip network. + pub public_addr: net::Host, /// Gossip network config. pub gossip: GossipConfig, /// Private key of the validator. diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 32a90540..90ccf6d0 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -21,6 +21,10 @@ const RESP_MAX_SIZE: usize = kB; /// is down. const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10); +struct Client { + consensus_rpc: rpc::Client, +} + /// Consensus network state. pub(crate) struct Network { /// Gossip network state to bootstrap consensus network from. @@ -32,7 +36,7 @@ pub(crate) struct Network { /// Set of the currently open outbound connections. pub(crate) outbound: PoolWatch, /// RPC clients for all validators. - pub(crate) clients: HashMap>, + pub(crate) clients: HashMap, } #[async_trait::async_trait] @@ -183,9 +187,23 @@ impl Network { res } + async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let Some(addr) = self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")? + .choose(&mut ctx.rng()).with_context(||format!("{:?} resolved to no addresses",self.gossip.cfg.public_addr))?; + self.run_outbound_stream(ctx, self.key.public(), addr).await + } + /// Maintains a connection to the given validator. /// If connection breaks, it tries to reconnect periodically. - pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) { + pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) -> ctx::OrCanceled<()> { + if self.key.public() == peer { + while ctx.is_active() { + if let Err(err) = self.run_loopback_stream(ctx).await { + tracing::info!("run_loopback_stream(): {err:#}"); + } + } + return; + } let addrs = &mut self.gossip.validator_addrs.subscribe(); let mut addr = None; while ctx.is_active() { @@ -207,13 +225,14 @@ impl Network { /// Periodically announces this validator's public IP over gossip network, /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { - let my_addr = self.gossip.cfg.public_addr; + pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let mut sub = self.gossip.validator_addrs.subscribe(); while ctx.is_active() { + let my_addrs = self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")?; let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL); let _ = sync::wait_for(ctx, &mut sub, |got| { got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr) + // TODO: got does match the loopback connection addr }) .await; let next_version = sub diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index b84ac131..83f320b5 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -2,10 +2,12 @@ use super::{handshake, Network, ValidatorAddrs}; use crate::{io, noise, preface, rpc}; use async_trait::async_trait; use std::sync::{atomic::Ordering, Arc}; -use zksync_concurrency::{ctx, oneshot, scope, sync}; +use zksync_concurrency::{net, ctx, oneshot, scope, sync}; use zksync_consensus_roles::node; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; +use rand::seq::SliceRandom; +use anyhow::Context as _; struct PushValidatorAddrsServer<'a>(&'a Network); @@ -182,8 +184,10 @@ impl Network { &self, ctx: &ctx::Ctx, peer: &node::PublicKey, - addr: std::net::SocketAddr, + addr: net::Host, ) -> anyhow::Result<()> { + let addr = *addr.resolve(ctx).await?.context("resolve()")? + .choose(&mut ctx.rng()).with_context(||"{addr:?} resolved to empty address set")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; handshake::outbound( ctx, diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 16457cc6..5d446038 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -9,6 +9,8 @@ mod tests; /// Network host address in the format ":". /// NOT VALIDATED, validation happens at `Host::resolve()` call. +// TODO: for better type safety consider verifying host to be in the valid +// format in constructor. #[derive(Debug,Clone,PartialEq)] pub struct Host(pub String); @@ -20,6 +22,7 @@ impl Host { // Note that we may orphan a task executing the underlying `getnameinfo` call // if the ctx gets cancelled. This should be fine given that it is expected to finish // after a timeout and it doesn't affect the state of the application. + // We don't use `tokio::net::lookup_host`, because it is not documented to be cancel-safe. Ok(ctx.wait(tokio::task::spawn_blocking(move || { // This should never panic, so unwrapping the task result is ok. Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs new file mode 100644 index 00000000..6b16a39d --- /dev/null +++ b/node/libs/concurrency/src/net/tests.rs @@ -0,0 +1,28 @@ +use super::*; + +#[tokio::test] +async fn test_resolve() { + let ctx = &ctx::test_root(&ctx::RealClock); + let addrs = Host("localhost:1234".into()).resolve(ctx).await.unwrap().unwrap(); + // We assume here that loopback host "localhost" is always configured here. + // If that turns out to be problematic we should use a more robust DNS resulution + // library, so that we can run our own DNS server in tests. + assert!(addrs.len()>0); + for addr in addrs { + // Verify the port and that the returned address is actually a loopback. + assert_eq!(addr.port(),1234); + assert!(addr.ip().is_loopback()); + + // Verify that resolving the serialized address returns the same address. + let got = Host(addr.to_string()).resolve(ctx).await.unwrap().unwrap(); + assert!(got.len()==1); + assert_eq!(got[0],addr); + } + + // Host without port should not resolve. + assert!(Host("localhost".into()).resolve(ctx).await.unwrap().is_err()); + // Invalid host name should not resolve. + assert!(Host("$#$:6746".into()).resolve(ctx).await.unwrap().is_err()); + // Empty host name should not resolve. + assert!(Host("".into()).resolve(ctx).await.unwrap().is_err()); +} From ae381b879dcb6cc95d00a8d0d00056f11e9d0735 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 26 Mar 2024 16:50:58 +0100 Subject: [PATCH 03/15] snapshot --- node/actors/network/src/consensus/mod.rs | 101 ++++++++---------- node/actors/network/src/consensus/tests.rs | 16 +-- node/actors/network/src/gossip/arcmap.rs | 42 -------- node/actors/network/src/gossip/mod.rs | 28 +++-- node/actors/network/src/gossip/runner.rs | 39 +++---- node/actors/network/src/gossip/tests.rs | 32 +++--- .../network/src/gossip/validator_addrs.rs | 22 +++- node/actors/network/src/lib.rs | 10 +- node/actors/network/src/pool.rs | 46 ++++---- node/actors/network/src/testonly.rs | 25 +++-- node/actors/network/src/tests.rs | 2 +- node/libs/concurrency/src/net/mod.rs | 9 ++ 12 files changed, 169 insertions(+), 203 deletions(-) delete mode 100644 node/actors/network/src/gossip/arcmap.rs diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 90ccf6d0..a3a42728 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -3,9 +3,10 @@ use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc}; use anyhow::Context as _; use std::{ - collections::{HashMap, HashSet}, + collections::{HashSet}, sync::Arc, }; +use rand::seq::SliceRandom; use zksync_concurrency::{ctx, oneshot, scope, sync, time}; use zksync_consensus_roles::validator; use zksync_protobuf::kB; @@ -21,8 +22,9 @@ const RESP_MAX_SIZE: usize = kB; /// is down. const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10); -struct Client { - consensus_rpc: rpc::Client, +pub(crate) struct Connection { + addr: std::net::SocketAddr, + consensus: rpc::Client, } /// Consensus network state. @@ -32,11 +34,9 @@ pub(crate) struct Network { /// This validator's secret key. pub(crate) key: validator::SecretKey, /// Set of the currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch, /// Set of the currently open outbound connections. - pub(crate) outbound: PoolWatch, - /// RPC clients for all validators. - pub(crate) clients: HashMap, + pub(crate) outbound: PoolWatch>, } #[async_trait::async_trait] @@ -65,22 +65,13 @@ impl rpc::Handler for &Network { impl Network { /// Constructs a new consensus network state. - pub(crate) fn new(ctx: &ctx::Ctx, gossip: Arc) -> Option> { + pub(crate) fn new(gossip: Arc) -> Option> { let key = gossip.cfg.validator_key.clone()?; let validators: HashSet<_> = gossip.genesis().validators.iter().cloned().collect(); Some(Arc::new(Self { key, inbound: PoolWatch::new(validators.clone(), 0), outbound: PoolWatch::new(validators.clone(), 0), - clients: validators - .iter() - .map(|peer| { - ( - peer.clone(), - rpc::Client::new(ctx, gossip.cfg.rpc.consensus_rate), - ) - }) - .collect(), gossip, })) } @@ -92,10 +83,11 @@ impl Network { msg: validator::Signed, ) -> anyhow::Result<()> { let req = rpc::consensus::Req(msg); + let outbound = self.outbound.current(); scope::run!(ctx, |ctx, s| async { - for (peer, client) in &self.clients { + for (peer, conn) in &outbound { s.spawn(async { - if let Err(err) = client.call(ctx, &req, RESP_MAX_SIZE).await { + if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await { tracing::info!("send({:?},): {err:#}", &*peer); } Ok(()) @@ -113,8 +105,9 @@ impl Network { key: &validator::PublicKey, msg: validator::Signed, ) -> anyhow::Result<()> { - let client = self.clients.get(key).context("not an active validator")?; - client + let outbound = self.outbound.current(); + outbound.get(key).context("not an active validator")? + .consensus .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE) .await?; Ok(()) @@ -129,7 +122,7 @@ impl Network { ) -> anyhow::Result<()> { let peer = handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?; - self.inbound.insert(peer.clone()).await?; + self.inbound.insert(peer.clone(),()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) @@ -156,7 +149,6 @@ impl Network { peer: &validator::PublicKey, addr: std::net::SocketAddr, ) -> anyhow::Result<()> { - let client = self.clients.get(peer).context("not an active validator")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?; handshake::outbound( ctx, @@ -166,11 +158,15 @@ impl Network { peer, ) .await?; - self.outbound.insert(peer.clone()).await?; + let conn = Arc::new(Connection { + addr, + consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate), + }); + self.outbound.insert(peer.clone(),conn.clone()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) - .add_client(client); + .add_client(&conn.consensus); if let Some(ping_timeout) = &self.gossip.cfg.ping_timeout { let ping_client = rpc::Client::::new(ctx, rpc::ping::RATE); service = service.add_client(&ping_client); @@ -188,15 +184,17 @@ impl Network { } async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let Some(addr) = self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")? + let addr = *self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")? .choose(&mut ctx.rng()).with_context(||format!("{:?} resolved to no addresses",self.gossip.cfg.public_addr))?; - self.run_outbound_stream(ctx, self.key.public(), addr).await + self.run_outbound_stream(ctx, &self.key.public(), addr).await } /// Maintains a connection to the given validator. /// If connection breaks, it tries to reconnect periodically. - pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) -> ctx::OrCanceled<()> { - if self.key.public() == peer { + pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) { + // Loopback connection is a special case, because the address is taken from + // the config rather than from `gossip.validator_addrs`. + if &self.key.public() == peer { while ctx.is_active() { if let Err(err) = self.run_loopback_stream(ctx).await { tracing::info!("run_loopback_stream(): {err:#}"); @@ -225,33 +223,26 @@ impl Network { /// Periodically announces this validator's public IP over gossip network, /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let mut sub = self.gossip.validator_addrs.subscribe(); + pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { + let key = self.key.public(); + let mut outbound = self.outbound.subscribe(); + let mut addrs = self.gossip.validator_addrs.subscribe(); + let mut my_addr = None; while ctx.is_active() { - let my_addrs = self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")?; - let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL); - let _ = sync::wait_for(ctx, &mut sub, |got| { - got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr) - // TODO: got does match the loopback connection addr - }) - .await; - let next_version = sub - .borrow() - .get(&self.key.public()) - .map(|x| x.msg.version + 1) - .unwrap_or(0); - self.gossip - .validator_addrs - .update( - &self.gossip.genesis().validators, - &[Arc::new(self.key.sign_msg(validator::NetAddress { - addr: my_addr, - version: next_version, - timestamp: ctx.now_utc(), - }))], - ) - .await - .unwrap(); + let _ : Result<(),_> = scope::run!(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),|ctx,s| async { + s.spawn::<()>(async { + sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?; + Err(ctx::Canceled) + }); + sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?; + Err(ctx::Canceled) + }).await; + if let Some(conn) = outbound.borrow().current().get(&key) { + my_addr = Some(conn.addr); + } + if let Some(addr) = my_addr { + self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; + } } } } diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index f2adc113..6e23a5a3 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -19,7 +19,7 @@ async fn test_one_connection_per_validator() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let nodes : Vec<_> = nodes.into_iter().enumerate().map(|(i,node)| { - let (node,runner) = testonly::Instance::new(ctx, node, store.clone()); + let (node,runner) = testonly::Instance::new(node, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -77,7 +77,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Populate the validator_addrs of the running node."); @@ -87,7 +87,7 @@ async fn test_genesis_mismatch() { .update( &setup.genesis.validators, &[Arc::new(setup.keys[1].sign_msg(validator::NetAddress { - addr: cfgs[1].public_addr, + addr: *cfgs[1].server_addr, version: 0, timestamp: ctx.now_utc(), }))], @@ -109,7 +109,7 @@ async fn test_genesis_mismatch() { tracing::info!("Try to connect to a node with a mismatching genesis."); let mut stream = - preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::ConsensusNet) + preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::ConsensusNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -145,7 +145,7 @@ async fn test_address_change() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -167,8 +167,8 @@ async fn test_address_change() { // Then it should broadcast its new address and the consensus network // should get reconstructed. cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); - cfgs[0].public_addr = *cfgs[0].server_addr; - let (node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); + let (node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); nodes[0] = node0; for n in &nodes { @@ -198,7 +198,7 @@ async fn test_transmission() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); let i = ctx::NoCopy(i); s.spawn_bg(async { let i = i; diff --git a/node/actors/network/src/gossip/arcmap.rs b/node/actors/network/src/gossip/arcmap.rs deleted file mode 100644 index 2fe3307d..00000000 --- a/node/actors/network/src/gossip/arcmap.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Multimap of pointers indexed by `node::PublicKey`. -//! Used to maintain a collection GetBlock rpc clients. -//! TODO(gprusak): consider upgrading PoolWatch instead. -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; -use zksync_consensus_roles::node; - -/// ArcMap -pub(crate) struct ArcMap(Mutex>>>); - -impl Default for ArcMap { - fn default() -> Self { - Self(Mutex::default()) - } -} - -impl ArcMap { - /// Fetches any pointer for the given key. - pub(crate) fn get_any(&self, key: &node::PublicKey) -> Option> { - self.0.lock().unwrap().get(key)?.first().cloned() - } - - /// Insert a pointer. - pub(crate) fn insert(&self, key: node::PublicKey, p: Arc) { - self.0.lock().unwrap().entry(key).or_default().push(p); - } - - /// Removes a pointer. - pub(crate) fn remove(&self, key: node::PublicKey, p: Arc) { - let mut this = self.0.lock().unwrap(); - use std::collections::hash_map::Entry; - let Entry::Occupied(mut e) = this.entry(key) else { - return; - }; - e.get_mut().retain(|c| !Arc::ptr_eq(&p, c)); - if e.get_mut().is_empty() { - e.remove(); - } - } -} diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index e20cf7e8..74a69c19 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -13,7 +13,7 @@ //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). use crate::{ - gossip::{ArcMap, ValidatorAddrsWatch}, + gossip::{ValidatorAddrsWatch}, io, pool::PoolWatch, rpc, Config, @@ -21,34 +21,34 @@ use crate::{ use anyhow::Context as _; use std::sync::{atomic::AtomicUsize, Arc}; -mod arcmap; mod handshake; mod runner; #[cfg(test)] mod tests; mod validator_addrs; -pub(crate) use arcmap::*; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; +pub(crate) struct Connection { + pub(crate) get_block: rpc::Client, +} + /// Gossip network state. pub(crate) struct Network { /// Gossip network configuration. pub(crate) cfg: Config, /// Currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch>, /// Currently open outbound connections. - pub(crate) outbound: PoolWatch, + pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, - /// Clients for `get_block` requests for each currently active peer. - pub(crate) get_block_clients: ArcMap>, /// Output pipe of the network actor. pub(crate) sender: channel::UnboundedSender, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. @@ -71,7 +71,6 @@ impl Network { outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), block_store, - get_block_clients: ArcMap::default(), cfg, push_validator_addrs_calls: 0.into(), }) @@ -89,15 +88,12 @@ impl Network { recipient: &node::PublicKey, number: validator::BlockNumber, ) -> anyhow::Result> { - Ok(self - .get_block_clients - .get_any(recipient) + let outbound = self.outbound.current(); + let inbound = self.inbound.current(); + Ok(outbound.get(recipient).or(inbound.get(recipient)) .context("recipient is unreachable")? - .call( - ctx, - &rpc::get_block::Req(number), - self.cfg.max_block_size.saturating_add(kB), - ) + .get_block + .call(ctx, &rpc::get_block::Req(number), self.cfg.max_block_size.saturating_add(kB)) .await? .0) } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 83f320b5..3cc4c445 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,4 +1,4 @@ -use super::{handshake, Network, ValidatorAddrs}; +use super::{handshake, Network, ValidatorAddrs, Connection}; use crate::{io, noise, preface, rpc}; use async_trait::async_trait; use std::sync::{atomic::Ordering, Arc}; @@ -81,6 +81,7 @@ impl Network { ctx: &ctx::Ctx, peer: &node::PublicKey, stream: noise::Stream, + conn: &Connection, ) -> anyhow::Result<()> { let push_validator_addrs_client = rpc::Client::::new( ctx, @@ -92,15 +93,7 @@ impl Network { self.cfg.rpc.push_block_store_state_rate, ); let push_block_store_state_server = PushBlockStoreStateServer { peer, net: self }; - - let get_block_client = Arc::new(rpc::Client::::new( - ctx, - self.cfg.rpc.get_block_rate, - )); - self.get_block_clients - .insert(peer.clone(), get_block_client.clone()); - - let res = scope::run!(ctx, |ctx, s| async { + scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_client(&push_validator_addrs_client) .add_server( @@ -112,7 +105,7 @@ impl Network { push_block_store_state_server, self.cfg.rpc.push_block_store_state_rate, ) - .add_client(&get_block_client) + .add_client(&conn.get_block) .add_server(&*self.block_store, self.cfg.rpc.get_block_rate) .add_server(rpc::ping::Server, rpc::ping::RATE); @@ -156,11 +149,7 @@ impl Network { service.run(ctx, stream).await?; Ok(()) }) - .await; - - self.get_block_clients - .remove(peer.clone(), get_block_client); - res + .await } /// Handles an inbound stream. @@ -170,11 +159,13 @@ impl Network { ctx: &ctx::Ctx, mut stream: noise::Stream, ) -> anyhow::Result<()> { - let peer = - handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; + let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; tracing::Span::current().record("peer", tracing::field::debug(&peer)); - self.inbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, &peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.inbound.insert(peer.clone(),conn.clone()).await?; + let res = self.run_stream(ctx, &peer, stream, &*conn).await; self.inbound.remove(&peer).await; res } @@ -197,9 +188,11 @@ impl Network { peer, ) .await?; - - self.outbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.outbound.insert(peer.clone(),conn.clone()).await?; + let res = self.run_stream(ctx, peer, stream, &*conn).await; self.outbound.remove(peer).await; res } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 575a8492..836430ad 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -30,7 +30,7 @@ async fn test_one_connection_per_node() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let mut nodes : Vec<_> = cfgs.iter().enumerate().map(|(i,cfg)| { - let (node,runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node,runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -46,7 +46,7 @@ async fn test_one_connection_per_node() { let (peer, addr) = cfgs[0].gossip.static_outbound.iter().next().unwrap(); let mut stream = preface::connect( ctx, - *addr, + addr.resolve(ctx).await.unwrap().unwrap()[0], preface::Endpoint::GossipNet, ) .await @@ -235,7 +235,7 @@ async fn test_validator_addrs_propagation() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -245,7 +245,7 @@ async fn test_validator_addrs_propagation() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); @@ -274,7 +274,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (_node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Accept a connection with mismatching genesis."); @@ -290,7 +290,7 @@ async fn test_genesis_mismatch() { assert_matches!(res, Err(handshake::Error::GenesisMismatch)); tracing::info!("Try to connect to a node with a mismatching genesis."); - let mut stream = preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::GossipNet) + let mut stream = preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::GossipNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -333,7 +333,7 @@ async fn syncing_blocks(node_count: usize, gossip_peers: usize) { for (i, cfg) in cfgs.into_iter().enumerate() { let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store); + let (node, runner) = testonly::Instance::new(cfg, store); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } @@ -408,7 +408,7 @@ async fn uncoordinated_block_syncing( let i = i; let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); s.spawn(async { let store = store; @@ -455,7 +455,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -536,7 +536,7 @@ async fn validator_node_restart() { cfg.rpc.push_validator_addrs_rate.refresh = time::Duration::ZERO; } let (store, store_runner) = new_store(ctx, &setup.genesis).await; - let (node1, node1_runner) = testonly::Instance::new(ctx, cfgs[1].clone(), store.clone()); + let (node1, node1_runner) = testonly::Instance::new(cfgs[1].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(store_runner.run(ctx)); s.spawn_bg( @@ -553,7 +553,7 @@ async fn validator_node_restart() { for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] { // Set the new addr to broadcast. let addr0 = mk_addr(rng); - cfgs[0].public_addr = addr0; + cfgs[0].public_addr = addr0.into(); // Shift the UTC clock. let now = start + clock_shift; assert!( @@ -565,7 +565,7 @@ async fn validator_node_restart() { // _node0 contains pipe, which has to exist to prevent the connection from dying // early. - let (_node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); tracing::info!("wait for the update to arrive to node1"); @@ -607,13 +607,13 @@ async fn rate_limiting() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); for i in 1..n { let key = cfgs[i].gossip.key.public().clone(); - let public_addr = cfgs[i].public_addr; + let public_addr = cfgs[i].public_addr.clone(); cfgs[0].gossip.static_outbound.insert(key, public_addr); } let mut nodes = vec![]; @@ -623,7 +623,7 @@ async fn rate_limiting() { // Spawn the satellite nodes and wait until they register // their own address. for (i, cfg) in cfgs[1..].iter().enumerate() { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); let sub = &mut node.net.gossip.validator_addrs.subscribe(); sync::wait_for(ctx, sub, |got| { @@ -636,7 +636,7 @@ async fn rate_limiting() { } // Spawn the center node. - let (center, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (center, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node[0]"))); // Await for the center to receive all validator addrs. let sub = &mut center.net.gossip.validator_addrs.subscribe(); diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/actors/network/src/gossip/validator_addrs.rs index ed176bf5..09f3602a 100644 --- a/node/actors/network/src/gossip/validator_addrs.rs +++ b/node/actors/network/src/gossip/validator_addrs.rs @@ -1,7 +1,7 @@ //! Global state distributed by active validators, observed by all the nodes in the network. use crate::watch::Watch; use std::{collections::HashSet, sync::Arc}; -use zksync_concurrency::sync; +use zksync_concurrency::{time,sync}; use zksync_consensus_roles::validator; /// Mapping from validator::PublicKey to a signed validator::NetAddress. @@ -90,6 +90,24 @@ impl ValidatorAddrsWatch { self.0.subscribe() } + pub(crate) async fn announce( + &self, + key: &validator::SecretKey, + addr: std::net::SocketAddr, + timestamp: time::Utc, + ) { + let this = self.0.lock().await; + let mut validator_addrs = this.borrow().clone(); + let version = validator_addrs.get(&key.public()).map(|x| x.msg.version + 1).unwrap_or(0); + let d = Arc::new(key.sign_msg(validator::NetAddress { + addr, + version, + timestamp, + })); + validator_addrs.0.insert(d.key.clone(), d); + this.send_replace(validator_addrs); + } + /// Inserts data to ValidatorAddrs. /// Subscribers are notified iff at least 1 new entry has /// been inserted. Returns an error iff an invalid @@ -103,7 +121,7 @@ impl ValidatorAddrsWatch { let this = self.0.lock().await; let mut validator_addrs = this.borrow().clone(); if validator_addrs.update(validators, data)? { - this.send(validator_addrs).ok().unwrap(); + this.send_replace(validator_addrs); } Ok(()) } diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 73c25cb4..e8315e45 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -46,13 +46,12 @@ impl Network { /// Constructs a new network actor state. /// Call `run_network` to run the actor. pub fn new( - ctx: &ctx::Ctx, cfg: Config, block_store: Arc, pipe: ActorPipe, ) -> (Arc, Runner) { let gossip = gossip::Network::new(cfg, block_store, pipe.send); - let consensus = consensus::Network::new(ctx, gossip.clone()); + let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( net.clone(), @@ -138,7 +137,7 @@ impl Runner { s.spawn(async { loop { let run_result = - self.net.gossip.run_outbound_stream(ctx, peer, *addr).await; + self.net.gossip.run_outbound_stream(ctx, peer, addr.clone()).await; if let Err(err) = run_result { tracing::info!("gossip.run_outbound_stream(): {err:#}"); } @@ -150,10 +149,11 @@ impl Runner { } if let Some(c) = &self.net.consensus { + let validators = &c.gossip.genesis().validators; // If we are active validator ... - if c.gossip.genesis().validators.contains(&c.key.public()) { + if validators.contains(&c.key.public()) { // Maintain outbound connections. - for peer in c.clients.keys() { + for peer in validators.iter() { s.spawn(async { c.maintain_connection(ctx, peer).await; Ok(()) diff --git a/node/actors/network/src/pool.rs b/node/actors/network/src/pool.rs index 92257993..53dba32a 100644 --- a/node/actors/network/src/pool.rs +++ b/node/actors/network/src/pool.rs @@ -1,40 +1,38 @@ //! An abstraction for a set of "connections" which constraints //! which peers are allowed to connect. use crate::watch::Watch; -use std::collections::HashSet; +use std::collections::{HashSet}; use zksync_concurrency::sync; -/// Set of elements of type T. +/// Map with restrictions on the allowed keys. /// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of /// `allowed`. #[derive(Clone)] -pub(crate) struct Pool { +pub(crate) struct Pool { extra_limit: usize, extra_count: usize, - allowed: HashSet, - current: HashSet, + allowed: HashSet, + current: im::HashMap, } -impl Pool { - /// Returns a reference to the underlying set. - #[allow(dead_code)] - pub(crate) fn current(&self) -> &HashSet { +impl Pool { + pub(crate) fn current(&self) -> &im::HashMap { &self.current } } /// Watch wrapper of the Pool. -/// Supports subscribing to the Pool updates. -pub(crate) struct PoolWatch(Watch>); +/// Supports subscribing to the Pool membership changes. +pub(crate) struct PoolWatch(Watch>); -impl PoolWatch { +impl PoolWatch { /// Constructs a new pool. - pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { + pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { Self(Watch::new(Pool { extra_limit, extra_count: 0, allowed, - current: HashSet::new(), + current: im::HashMap::new(), })) } @@ -42,40 +40,44 @@ impl PoolWatch { /// Returns an error if /// * `v` is already in the set /// * `v` cannot be added due to size restrictions - pub(crate) async fn insert(&self, v: T) -> anyhow::Result<()> { + pub(crate) async fn insert(&self, k:K,v:V) -> anyhow::Result<()> { self.0 .send_if_ok(|pool| { - if pool.current.contains(&v) { + if pool.current.contains_key(&k) { anyhow::bail!("already exists"); } - if !pool.allowed.contains(&v) { + if !pool.allowed.contains(&k) { if pool.extra_count >= pool.extra_limit { anyhow::bail!("limit exceeded"); } pool.extra_count += 1; } - pool.current.insert(v); + pool.current.insert(k,v); Ok(()) }) .await } /// Removes an element from the set. - pub(crate) async fn remove(&self, v: &T) { + pub(crate) async fn remove(&self, k: &K) { self.0.lock().await.send_if_modified(|pool| { - if !pool.current.remove(v) { + if pool.current.remove(k).is_none() { return false; } - if !pool.allowed.contains(v) { + if !pool.allowed.contains(k) { pool.extra_count -= 1; } true }); } + pub(crate) fn current(&self) -> im::HashMap { + self.0.subscribe().borrow().current.clone() + } + /// Subscribes to the set changes. #[allow(dead_code)] - pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { + pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { self.0.subscribe() } } diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 022bb747..d18bdddf 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -57,7 +57,7 @@ pub fn new_configs( let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -79,7 +79,7 @@ pub fn new_configs( for j in 0..gossip_peers { let j = (i + j + 1) % n; let peer = cfgs[j].gossip.key.public(); - let addr = *cfgs[j].server_addr; + let addr = cfgs[j].public_addr.clone(); cfgs[i].gossip.static_outbound.insert(peer, addr); } } @@ -92,7 +92,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -101,7 +101,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { key: rng.gen(), dynamic_inbound_limit: usize::MAX, static_inbound: HashSet::default(), - static_outbound: [(peer.gossip.key.public(), peer.public_addr)].into(), + static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(), }, max_block_size: usize::MAX, rpc: RpcConfig::default(), @@ -131,12 +131,11 @@ impl InstanceRunner { impl Instance { /// Construct an instance for a given config. pub fn new( - ctx: &ctx::Ctx, cfg: Config, block_store: Arc, ) -> (Self, InstanceRunner) { let (actor_pipe, dispatcher_pipe) = pipe::new(); - let (net, runner) = Network::new(ctx, cfg, block_store, actor_pipe); + let (net, runner) = Network::new(cfg, block_store, actor_pipe); let (terminate_send, terminate_recv) = channel::bounded(1); ( Self { @@ -187,7 +186,7 @@ impl Instance { .gossip .outbound .subscribe() - .wait_for(|got| want.is_subset(got.current())) + .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) .await .unwrap(); } @@ -200,13 +199,13 @@ impl Instance { consensus_state .inbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) .await .unwrap(); consensus_state .outbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) .await .unwrap(); } @@ -219,11 +218,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = &self.net.gossip; sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) @@ -237,11 +236,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = self.net.consensus.as_ref().unwrap(); sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index c1cde500..718ac5e6 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -20,7 +20,7 @@ async fn test_metrics() { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 5d446038..68d36f96 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -14,6 +14,10 @@ mod tests; #[derive(Debug,Clone,PartialEq)] pub struct Host(pub String); +impl From for Host { + fn from(addr: std::net::SocketAddr) -> Self { Self(addr.to_string()) } +} + impl Host { /// If host is of the form ":", performs DNS resolution. /// If host is of the form ":", just parses the SocketAddr. @@ -28,4 +32,9 @@ impl Host { Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) })).await?.unwrap()) } + + /*/// Assumes host to be of the form ":" and parses it. + pub fn parse_socket_addr(&self) -> Result { + self.0.parse() + }*/ } From 5b58d5e7b85841e1a87fee3b81ac9816a1293ec0 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Tue, 26 Mar 2024 17:03:38 +0100 Subject: [PATCH 04/15] everything compiles --- node/actors/bft/src/testonly/run.rs | 1 - node/actors/executor/src/lib.rs | 7 ++-- node/actors/executor/src/tests.rs | 2 +- .../sync_blocks/src/tests/end_to_end.rs | 2 +- node/tools/src/bin/localnet_config.rs | 6 ++-- node/tools/src/config.rs | 33 ++++++++++--------- node/tools/src/k8s.rs | 2 +- node/tools/src/main.rs | 4 +-- node/tools/src/tests.rs | 4 +-- 9 files changed, 30 insertions(+), 31 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 693f85ca..af06ea04 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -77,7 +77,6 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow:: let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { let (node, runner) = network::testonly::Instance::new( - ctx, spec.net.clone(), spec.block_store.clone(), ); diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 2e0fad85..cec596d8 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -45,7 +45,7 @@ pub struct Config { pub server_addr: std::net::SocketAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + pub public_addr: net::Host, /// Maximal size of the block payload. pub max_payload_size: usize, @@ -59,7 +59,7 @@ pub struct Config { pub gossip_static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl Config { @@ -90,7 +90,7 @@ impl Executor { fn network_config(&self) -> network::Config { network::Config { server_addr: net::tcp::ListenerAddr::new(self.config.server_addr), - public_addr: self.config.public_addr, + public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), validator_key: self.validator.as_ref().map(|v| v.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), @@ -136,7 +136,6 @@ impl Executor { s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); s.spawn(async { let (net, runner) = network::Network::new( - ctx, network_config, self.block_store.clone(), network_actor_pipe, diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 2b34e7a0..7ee8394d 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -12,7 +12,7 @@ use zksync_consensus_storage::{ fn config(cfg: &network::Config) -> Config { Config { server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, + public_addr: cfg.public_addr.clone(), max_payload_size: usize::MAX, node_key: cfg.gossip.key.clone(), gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index b078a804..77e3affd 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -80,7 +80,7 @@ impl NodeRunner { let key = self.network.gossip.key.public(); let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (mut network, network_runner) = - network::testonly::Instance::new(ctx, self.network.clone(), self.store.clone()); + network::testonly::Instance::new(self.network.clone(), self.store.clone()); let sync_blocks_config = Config::new(); let res = scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.store_runner.run(ctx)); diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index e5ef14bb..a13db3a6 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -61,21 +61,21 @@ fn main() -> anyhow::Result<()> { default_config.with_metrics_server_addr(metrics_server_addr); } let mut cfgs: Vec<_> = (0..nodes) - .map(|i| default_config.with_public_addr(addrs[i]).clone()) + .map(|i| default_config.with_public_addr(addrs[i].into()).clone()) .collect(); // Construct a gossip network with optimal diameter. for i in 0..nodes { for j in 0..peers { let next = (i * peers + j + 1) % nodes; - cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next]); + cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next].into()); cfgs[next].add_gossip_static_inbound(node_keys[i].public()); } } for (i, cfg) in cfgs.into_iter().enumerate() { // Recreate the directory for the node's config. - let root = args.output_dir.join(cfg.public_addr.to_string()); + let root = args.output_dir.join(&cfg.public_addr.0); let _ = fs::remove_dir_all(&root); fs::create_dir_all(&root).with_context(|| format!("create_dir_all({:?})", root))?; cfg.write_to_file(&root)?; diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index ffb8410c..08faec36 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -9,7 +9,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, }; -use zksync_concurrency::ctx; +use zksync_concurrency::{ctx,net}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; use zksync_consensus_executor as executor; @@ -43,11 +43,11 @@ pub(crate) fn encode_with_serializer( String::from_utf8(serializer.into_inner()).unwrap() } -/// Pair of (public key, ip address) for a gossip network node. +/// Pair of (public key, host addr) for a gossip network node. #[derive(Debug, Clone)] pub struct NodeAddr { pub key: node::PublicKey, - pub addr: SocketAddr, + pub addr: net::Host, } impl ProtoFmt for NodeAddr { @@ -55,14 +55,14 @@ impl ProtoFmt for NodeAddr { fn read(r: &Self::Proto) -> anyhow::Result { let key = read_required_text(&r.key).context("key")?; - let addr = read_required_text(&r.addr).context("addr")?; + let addr = net::Host(required(&r.addr).context("addr")?.clone()); Ok(Self { addr, key }) } fn build(&self) -> Self::Proto { Self::Proto { key: Some(TextFmt::encode(&self.key)), - addr: Some(TextFmt::encode(&self.addr)), + addr: Some(self.addr.0.clone()), } } } @@ -72,7 +72,7 @@ impl ProtoFmt for NodeAddr { #[derive(Debug, PartialEq, Clone)] pub struct AppConfig { pub server_addr: SocketAddr, - pub public_addr: SocketAddr, + pub public_addr: net::Host, pub metrics_server_addr: Option, pub genesis: validator::Genesis, @@ -80,7 +80,7 @@ pub struct AppConfig { pub gossip_dynamic_inbound_limit: usize, pub gossip_static_inbound: HashSet, - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl ProtoFmt for AppConfig { @@ -104,7 +104,7 @@ impl ProtoFmt for AppConfig { } Ok(Self { server_addr: read_required_text(&r.server_addr).context("server_addr")?, - public_addr: read_required_text(&r.public_addr).context("public_addr")?, + public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()), metrics_server_addr: read_optional_text(&r.metrics_server_addr) .context("metrics_server_addr")?, @@ -124,7 +124,7 @@ impl ProtoFmt for AppConfig { fn build(&self) -> Self::Proto { Self::Proto { server_addr: Some(self.server_addr.encode()), - public_addr: Some(self.public_addr.encode()), + public_addr: Some(self.public_addr.0.clone()), metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), genesis: Some(self.genesis.build()), @@ -143,7 +143,7 @@ impl ProtoFmt for AppConfig { .iter() .map(|(key, addr)| proto::NodeAddr { key: Some(TextFmt::encode(key)), - addr: Some(TextFmt::encode(addr)), + addr: Some(addr.0.clone()), }) .collect(), } @@ -234,7 +234,7 @@ impl AppConfig { pub fn default_for(genesis: validator::Genesis) -> AppConfig { Self { server_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), - public_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), + public_addr: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), NODES_PORT).into(), metrics_server_addr: None, genesis, @@ -251,7 +251,7 @@ impl AppConfig { self } - pub fn with_public_addr(&mut self, public_addr: SocketAddr) -> &mut Self { + pub fn with_public_addr(&mut self, public_addr: net::Host) -> &mut Self { self.public_addr = public_addr; self } @@ -272,7 +272,7 @@ impl AppConfig { pub fn add_gossip_static_outbound( &mut self, key: node::PublicKey, - addr: SocketAddr, + addr: net::Host, ) -> &mut Self { self.gossip_static_outbound.insert(key, addr); self @@ -292,9 +292,10 @@ impl AppConfig { self } - pub fn check_public_addr(&mut self) -> anyhow::Result<()> { + /// Tries to load the public_addr IP from the env var. + pub fn try_load_public_addr(&mut self) -> anyhow::Result<()> { if let Ok(public_addr) = std::env::var("PUBLIC_ADDR") { - self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?; + self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?.into(); } Ok(()) } @@ -310,7 +311,7 @@ impl Configs { let e = executor::Executor { config: executor::Config { server_addr: self.app.server_addr, - public_addr: self.app.public_addr, + public_addr: self.app.public_addr.clone(), node_key: self.node_key.clone(), gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit, gossip_static_inbound: self.app.gossip_static_inbound.clone(), diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs index 50cb012b..40c1936f 100644 --- a/node/tools/src/k8s.rs +++ b/node/tools/src/k8s.rs @@ -75,7 +75,7 @@ impl ConsensusNode { .context("Pod IP address not present")?; self.node_addr = Some(NodeAddr { key: self.key.public(), - addr: SocketAddr::new(ip.parse()?, config::NODES_PORT), + addr: SocketAddr::new(ip.parse()?, config::NODES_PORT).into(), }); Ok(()) } diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index a17efa32..b98faba5 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -130,14 +130,14 @@ async fn main() -> anyhow::Result<()> { let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config - configs.app.check_public_addr().context("Public Address")?; + configs.app.try_load_public_addr().context("Public Address")?; let (executor, runner) = configs .make_executor(ctx) .await .context("configs.into_executor()")?; - let mut rpc_addr = configs.app.public_addr; + let mut rpc_addr = configs.app.server_addr; if let Some(port) = args.rpc_port { rpc_addr.set_port(port); } else { diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index 15d2f840..f8fe9e26 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -17,7 +17,7 @@ impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AppConfig { AppConfig { server_addr: make_addr(rng), - public_addr: make_addr(rng), + public_addr: make_addr(rng).into(), metrics_server_addr: Some(make_addr(rng)), genesis: rng.gen(), @@ -27,7 +27,7 @@ impl Distribution for Standard { .map(|_| rng.gen::().public()) .collect(), gossip_static_outbound: (0..6) - .map(|_| (rng.gen::().public(), make_addr(rng))) + .map(|_| (rng.gen::().public(), make_addr(rng).into())) .collect(), max_payload_size: rng.gen(), } From 03f3137ec337986f61586cf9143bb854c94a48ce Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 09:39:35 +0100 Subject: [PATCH 05/15] snapshot --- node/actors/network/src/consensus/mod.rs | 36 ++++++++++++++++++++---- node/actors/network/src/gossip/tests.rs | 15 ++++------ node/libs/concurrency/src/time.rs | 6 ++++ 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index a3a42728..3cd2e735 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -174,6 +174,9 @@ impl Network { let ping_client = ping_client; ping_client.ping_loop(ctx, *ping_timeout).await }); + } + if peer == &self.key.public() { + } service.run(ctx, stream).await?; Ok(()) @@ -227,20 +230,41 @@ impl Network { let key = self.key.public(); let mut outbound = self.outbound.subscribe(); let mut addrs = self.gossip.validator_addrs.subscribe(); + // Current address of this node. let mut my_addr = None; - while ctx.is_active() { - let _ : Result<(),_> = scope::run!(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),|ctx,s| async { - s.spawn::<()>(async { + loop { + // Wait for one of the following: + let _ : ctx::OrCanceled<()> = scope::run!(ctx,|ctx,s| async { + // loopback connection was established to a different address (this node's address has changed) + s.spawn_bg::<()>(async { sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?; - Err(ctx::Canceled) + tracing::info!("loopback conn addr"); + s.cancel(); + Ok(()) }); - sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?; - Err(ctx::Canceled) + // an announcement from the node's previous execution has been received which + // overrides our announcement (reannouncement is needed). + s.spawn_bg::<()>(async { + sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?; + tracing::info!("reannouncement"); + s.cancel(); + Ok(()) + }); + // timeout has passed. + ctx.sleep(ADDRESS_ANNOUNCER_INTERVAL).await?; + tracing::info!("timeout announcement"); + Ok(()) }).await; + if !ctx.is_active() { + return; + } + // If a loopback connection exists, update the current address. if let Some(conn) = outbound.borrow().current().get(&key) { my_addr = Some(conn.addr); } + // If address of this node is known, announce it. if let Some(addr) = my_addr { + tracing::debug!("announcing validator address {addr}"); self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; } } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 836430ad..44839a2a 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -10,7 +10,7 @@ use std::{ use test_casing::{test_casing, Product}; use tracing::Instrument as _; use zksync_concurrency::{ - ctx, oneshot, scope, sync, + ctx, net, oneshot, scope, sync, testonly::{abort_on_panic, set_timeout}, time, }; @@ -552,8 +552,8 @@ async fn validator_node_restart() { let start = ctx.now_utc(); for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] { // Set the new addr to broadcast. - let addr0 = mk_addr(rng); - cfgs[0].public_addr = addr0.into(); + cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); // Shift the UTC clock. let now = start + clock_shift; assert!( @@ -570,13 +570,8 @@ async fn validator_node_restart() { s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); tracing::info!("wait for the update to arrive to node1"); let sub = &mut node1.net.gossip.validator_addrs.subscribe(); - sync::wait_for(ctx, sub, |got| { - let Some(got) = got.get(&setup.keys[0].public()) else { - return false; - }; - got.msg.addr == addr0 - }) - .await?; + let want = Some(*cfgs[0].server_addr); + sync::wait_for(ctx, sub, |got| got.get(&setup.keys[0].public()).map(|x|x.msg.addr) == want).await?; Ok(()) }) .await?; diff --git a/node/libs/concurrency/src/time.rs b/node/libs/concurrency/src/time.rs index b46cca63..718317c6 100644 --- a/node/libs/concurrency/src/time.rs +++ b/node/libs/concurrency/src/time.rs @@ -18,6 +18,12 @@ impl std::fmt::Debug for Utc { } } +impl std::fmt::Display for Utc { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + (time::OffsetDateTime::UNIX_EPOCH + self.0).fmt(f) + } +} + /// Start of the unix epoch. pub const UNIX_EPOCH: Utc = Utc(Duration::ZERO); From dc6ec39fb95b0ff87421f6c233aecca2588138b8 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 09:45:54 +0100 Subject: [PATCH 06/15] snapshot --- node/actors/network/src/consensus/mod.rs | 59 +++++------------------- 1 file changed, 12 insertions(+), 47 deletions(-) diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 3cd2e735..1b41bba6 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -175,8 +175,19 @@ impl Network { ping_client.ping_loop(ctx, *ping_timeout).await }); } + // If this is a loopback connection, announce periodically the address of this + // validator to the network. + // Note that this is executed only for outbound end of the loopback connection. + // Inbound end doesn't know the public address of itself. if peer == &self.key.public() { - + let mut sub = self.gossip.validator_addrs.subscribe(); + s.spawn(async { + while ctx.is_active() { + self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; + let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await; + } + Ok(()) + }); } service.run(ctx, stream).await?; Ok(()) @@ -223,50 +234,4 @@ impl Network { } } } - - /// Periodically announces this validator's public IP over gossip network, - /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { - let key = self.key.public(); - let mut outbound = self.outbound.subscribe(); - let mut addrs = self.gossip.validator_addrs.subscribe(); - // Current address of this node. - let mut my_addr = None; - loop { - // Wait for one of the following: - let _ : ctx::OrCanceled<()> = scope::run!(ctx,|ctx,s| async { - // loopback connection was established to a different address (this node's address has changed) - s.spawn_bg::<()>(async { - sync::wait_for(ctx, &mut outbound, |x| x.current().get(&key).map(|x|x.addr) != my_addr).await?; - tracing::info!("loopback conn addr"); - s.cancel(); - Ok(()) - }); - // an announcement from the node's previous execution has been received which - // overrides our announcement (reannouncement is needed). - s.spawn_bg::<()>(async { - sync::wait_for(ctx, &mut addrs, |got| got.get(&key).map(|x| x.msg.addr) != my_addr).await?; - tracing::info!("reannouncement"); - s.cancel(); - Ok(()) - }); - // timeout has passed. - ctx.sleep(ADDRESS_ANNOUNCER_INTERVAL).await?; - tracing::info!("timeout announcement"); - Ok(()) - }).await; - if !ctx.is_active() { - return; - } - // If a loopback connection exists, update the current address. - if let Some(conn) = outbound.borrow().current().get(&key) { - my_addr = Some(conn.addr); - } - // If address of this node is known, announce it. - if let Some(addr) = my_addr { - tracing::debug!("announcing validator address {addr}"); - self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; - } - } - } } From aec5777124a036bec25607a308355d664ed7c82b Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 10:00:54 +0100 Subject: [PATCH 07/15] tests pass --- node/actors/network/src/consensus/mod.rs | 47 ++++++++++++++----- node/actors/network/src/gossip/mod.rs | 23 +++++---- node/actors/network/src/gossip/runner.rs | 27 ++++++----- node/actors/network/src/gossip/tests.rs | 5 +- .../network/src/gossip/validator_addrs.rs | 8 +++- node/actors/network/src/lib.rs | 12 ++--- node/actors/network/src/metrics.rs | 9 ++-- node/actors/network/src/pool.rs | 25 +++++----- node/actors/network/src/rpc/mod.rs | 2 + node/actors/network/src/testonly.rs | 11 ++--- node/libs/concurrency/src/net/mod.rs | 26 ++++++---- node/libs/concurrency/src/net/tests.rs | 22 ++++++--- node/tools/src/bin/deployer.rs | 6 +-- node/tools/src/config.rs | 2 +- node/tools/src/main.rs | 5 +- 15 files changed, 140 insertions(+), 90 deletions(-) diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 1b41bba6..621acdda 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -2,11 +2,8 @@ //! BFT consensus messages are exchanged over this network. use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc}; use anyhow::Context as _; -use std::{ - collections::{HashSet}, - sync::Arc, -}; use rand::seq::SliceRandom; +use std::{collections::HashSet, sync::Arc}; use zksync_concurrency::{ctx, oneshot, scope, sync, time}; use zksync_consensus_roles::validator; use zksync_protobuf::kB; @@ -22,7 +19,11 @@ const RESP_MAX_SIZE: usize = kB; /// is down. const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10); +/// Outbound connection state. pub(crate) struct Connection { + /// Peer's address. + /// This is not used for now, but will be required for the debug page. + #[allow(dead_code)] addr: std::net::SocketAddr, consensus: rpc::Client, } @@ -106,7 +107,9 @@ impl Network { msg: validator::Signed, ) -> anyhow::Result<()> { let outbound = self.outbound.current(); - outbound.get(key).context("not an active validator")? + outbound + .get(key) + .context("not an active validator")? .consensus .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE) .await?; @@ -122,7 +125,7 @@ impl Network { ) -> anyhow::Result<()> { let peer = handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?; - self.inbound.insert(peer.clone(),()).await?; + self.inbound.insert(peer.clone(), ()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) @@ -162,7 +165,7 @@ impl Network { addr, consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate), }); - self.outbound.insert(peer.clone(),conn.clone()).await?; + self.outbound.insert(peer.clone(), conn.clone()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) @@ -180,11 +183,19 @@ impl Network { // Note that this is executed only for outbound end of the loopback connection. // Inbound end doesn't know the public address of itself. if peer == &self.key.public() { - let mut sub = self.gossip.validator_addrs.subscribe(); s.spawn(async { + let mut sub = self.gossip.validator_addrs.subscribe(); while ctx.is_active() { - self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await; - let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await; + self.gossip + .validator_addrs + .announce(&self.key, addr, ctx.now_utc()) + .await; + let _ = sync::wait_for( + &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), + &mut sub, + |got| got.get(peer).map(|x| x.msg.addr) != Some(addr), + ) + .await; } Ok(()) }); @@ -198,9 +209,19 @@ impl Network { } async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let addr = *self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")? - .choose(&mut ctx.rng()).with_context(||format!("{:?} resolved to no addresses",self.gossip.cfg.public_addr))?; - self.run_outbound_stream(ctx, &self.key.public(), addr).await + let addr = *self + .gossip + .cfg + .public_addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| { + format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr) + })?; + self.run_outbound_stream(ctx, &self.key.public(), addr) + .await } /// Maintains a connection to the given validator. diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 74a69c19..2b88dbb2 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -12,12 +12,7 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). -use crate::{ - gossip::{ValidatorAddrsWatch}, - io, - pool::PoolWatch, - rpc, Config, -}; +use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, rpc, Config}; use anyhow::Context as _; use std::sync::{atomic::AtomicUsize, Arc}; @@ -33,7 +28,9 @@ use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; +/// State of the gossip connection. pub(crate) struct Connection { + /// `get_block` rpc client. pub(crate) get_block: rpc::Client, } @@ -42,9 +39,9 @@ pub(crate) struct Network { /// Gossip network configuration. pub(crate) cfg: Config, /// Currently open inbound connections. - pub(crate) inbound: PoolWatch>, + pub(crate) inbound: PoolWatch>, /// Currently open outbound connections. - pub(crate) outbound: PoolWatch>, + pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. @@ -90,10 +87,16 @@ impl Network { ) -> anyhow::Result> { let outbound = self.outbound.current(); let inbound = self.inbound.current(); - Ok(outbound.get(recipient).or(inbound.get(recipient)) + Ok(outbound + .get(recipient) + .or(inbound.get(recipient)) .context("recipient is unreachable")? .get_block - .call(ctx, &rpc::get_block::Req(number), self.cfg.max_block_size.saturating_add(kB)) + .call( + ctx, + &rpc::get_block::Req(number), + self.cfg.max_block_size.saturating_add(kB), + ) .await? .0) } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 3cc4c445..e3568b13 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,13 +1,13 @@ -use super::{handshake, Network, ValidatorAddrs, Connection}; +use super::{handshake, Connection, Network, ValidatorAddrs}; use crate::{io, noise, preface, rpc}; +use anyhow::Context as _; use async_trait::async_trait; +use rand::seq::SliceRandom; use std::sync::{atomic::Ordering, Arc}; -use zksync_concurrency::{net, ctx, oneshot, scope, sync}; +use zksync_concurrency::{ctx, net, oneshot, scope, sync}; use zksync_consensus_roles::node; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; -use rand::seq::SliceRandom; -use anyhow::Context as _; struct PushValidatorAddrsServer<'a>(&'a Network); @@ -159,13 +159,14 @@ impl Network { ctx: &ctx::Ctx, mut stream: noise::Stream, ) -> anyhow::Result<()> { - let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; + let peer = + handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; tracing::Span::current().record("peer", tracing::field::debug(&peer)); let conn = Arc::new(Connection { get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), }); - self.inbound.insert(peer.clone(),conn.clone()).await?; - let res = self.run_stream(ctx, &peer, stream, &*conn).await; + self.inbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, &peer, stream, &conn).await; self.inbound.remove(&peer).await; res } @@ -177,8 +178,12 @@ impl Network { peer: &node::PublicKey, addr: net::Host, ) -> anyhow::Result<()> { - let addr = *addr.resolve(ctx).await?.context("resolve()")? - .choose(&mut ctx.rng()).with_context(||"{addr:?} resolved to empty address set")?; + let addr = *addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| "{addr:?} resolved to empty address set")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; handshake::outbound( ctx, @@ -191,8 +196,8 @@ impl Network { let conn = Arc::new(Connection { get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), }); - self.outbound.insert(peer.clone(),conn.clone()).await?; - let res = self.run_stream(ctx, peer, stream, &*conn).await; + self.outbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, peer, stream, &conn).await; self.outbound.remove(peer).await; res } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 44839a2a..85e4d0d1 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -571,7 +571,10 @@ async fn validator_node_restart() { tracing::info!("wait for the update to arrive to node1"); let sub = &mut node1.net.gossip.validator_addrs.subscribe(); let want = Some(*cfgs[0].server_addr); - sync::wait_for(ctx, sub, |got| got.get(&setup.keys[0].public()).map(|x|x.msg.addr) == want).await?; + sync::wait_for(ctx, sub, |got| { + got.get(&setup.keys[0].public()).map(|x| x.msg.addr) == want + }) + .await?; Ok(()) }) .await?; diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/actors/network/src/gossip/validator_addrs.rs index 09f3602a..8fd21b63 100644 --- a/node/actors/network/src/gossip/validator_addrs.rs +++ b/node/actors/network/src/gossip/validator_addrs.rs @@ -1,7 +1,7 @@ //! Global state distributed by active validators, observed by all the nodes in the network. use crate::watch::Watch; use std::{collections::HashSet, sync::Arc}; -use zksync_concurrency::{time,sync}; +use zksync_concurrency::{sync, time}; use zksync_consensus_roles::validator; /// Mapping from validator::PublicKey to a signed validator::NetAddress. @@ -90,6 +90,7 @@ impl ValidatorAddrsWatch { self.0.subscribe() } + /// Inserts a new version of the announcement signed with the given key. pub(crate) async fn announce( &self, key: &validator::SecretKey, @@ -98,7 +99,10 @@ impl ValidatorAddrsWatch { ) { let this = self.0.lock().await; let mut validator_addrs = this.borrow().clone(); - let version = validator_addrs.get(&key.public()).map(|x| x.msg.version + 1).unwrap_or(0); + let version = validator_addrs + .get(&key.public()) + .map(|x| x.msg.version + 1) + .unwrap_or(0); let d = Arc::new(key.sign_msg(validator::NetAddress { addr, version, diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index e8315e45..c8d3e70d 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -136,8 +136,11 @@ impl Runner { for (peer, addr) in &self.net.gossip.cfg.gossip.static_outbound { s.spawn(async { loop { - let run_result = - self.net.gossip.run_outbound_stream(ctx, peer, addr.clone()).await; + let run_result = self + .net + .gossip + .run_outbound_stream(ctx, peer, addr.clone()) + .await; if let Err(err) = run_result { tracing::info!("gossip.run_outbound_stream(): {err:#}"); } @@ -159,11 +162,6 @@ impl Runner { Ok(()) }); } - // Announce IP periodically. - s.spawn(async { - c.run_address_announcer(ctx).await; - Ok(()) - }); } } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 86fa1cf5..5cf1c52f 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -145,15 +145,14 @@ impl NetworkGauges { let register_result = COLLECTOR.before_scrape(move || { state_ref.upgrade().map(|state| { let gauges = NetworkGauges::default(); - let len = state.gossip.inbound.subscribe().borrow().current().len(); + let len = state.gossip.inbound.current().len(); gauges.gossip_inbound_connections.set(len); - let len = state.gossip.outbound.subscribe().borrow().current().len(); + let len = state.gossip.outbound.current().len(); gauges.gossip_outbound_connections.set(len); if let Some(consensus_state) = &state.consensus { - let len = consensus_state.inbound.subscribe().borrow().current().len(); + let len = consensus_state.inbound.current().len(); gauges.consensus_inbound_connections.set(len); - let subscriber = consensus_state.outbound.subscribe(); - let len = subscriber.borrow().current().len(); + let len = consensus_state.outbound.current().len(); gauges.consensus_outbound_connections.set(len); } gauges diff --git a/node/actors/network/src/pool.rs b/node/actors/network/src/pool.rs index 53dba32a..a5e7375f 100644 --- a/node/actors/network/src/pool.rs +++ b/node/actors/network/src/pool.rs @@ -1,31 +1,32 @@ //! An abstraction for a set of "connections" which constraints //! which peers are allowed to connect. use crate::watch::Watch; -use std::collections::{HashSet}; +use std::collections::HashSet; use zksync_concurrency::sync; /// Map with restrictions on the allowed keys. /// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of /// `allowed`. #[derive(Clone)] -pub(crate) struct Pool { +pub(crate) struct Pool { extra_limit: usize, extra_count: usize, allowed: HashSet, - current: im::HashMap, + current: im::HashMap, } -impl Pool { - pub(crate) fn current(&self) -> &im::HashMap { +impl Pool { + /// Current pool state. + pub(crate) fn current(&self) -> &im::HashMap { &self.current } } /// Watch wrapper of the Pool. /// Supports subscribing to the Pool membership changes. -pub(crate) struct PoolWatch(Watch>); +pub(crate) struct PoolWatch(Watch>); -impl PoolWatch { +impl PoolWatch { /// Constructs a new pool. pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { Self(Watch::new(Pool { @@ -40,7 +41,7 @@ impl PoolWatch { /// Returns an error if /// * `v` is already in the set /// * `v` cannot be added due to size restrictions - pub(crate) async fn insert(&self, k:K,v:V) -> anyhow::Result<()> { + pub(crate) async fn insert(&self, k: K, v: V) -> anyhow::Result<()> { self.0 .send_if_ok(|pool| { if pool.current.contains_key(&k) { @@ -52,7 +53,7 @@ impl PoolWatch { } pool.extra_count += 1; } - pool.current.insert(k,v); + pool.current.insert(k, v); Ok(()) }) .await @@ -71,13 +72,13 @@ impl PoolWatch { }); } - pub(crate) fn current(&self) -> im::HashMap { + /// Copy of the current pool state. + pub(crate) fn current(&self) -> im::HashMap { self.0.subscribe().borrow().current.clone() } /// Subscribes to the set changes. - #[allow(dead_code)] - pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { + pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { self.0.subscribe() } } diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index 184326cd..7028987b 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -119,6 +119,8 @@ pub(crate) struct Client { impl Client { /// Constructs a new client. + // TODO(gprusak): at this point we don't need the clients to be reusable, + // so perhaps they should be constructed by `Servive::add_client` instead? pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self { Client { limiter: limiter::Limiter::new(ctx, rate), diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index d18bdddf..3a55c324 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -130,10 +130,7 @@ impl InstanceRunner { impl Instance { /// Construct an instance for a given config. - pub fn new( - cfg: Config, - block_store: Arc, - ) -> (Self, InstanceRunner) { + pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { let (actor_pipe, dispatcher_pipe) = pipe::new(); let (net, runner) = Network::new(cfg, block_store, actor_pipe); let (terminate_send, terminate_recv) = channel::bounded(1); @@ -186,7 +183,7 @@ impl Instance { .gossip .outbound .subscribe() - .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } @@ -199,13 +196,13 @@ impl Instance { consensus_state .inbound .subscribe() - .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); consensus_state .outbound .subscribe() - .wait_for(|got| want.iter().all(|k|got.current().contains_key(k))) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 68d36f96..a5c04a37 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -1,6 +1,6 @@ //! Context-aware network utilities. //! Built on top of `tokio::net`. -use crate::{ctx}; +use crate::ctx; pub mod tcp; @@ -11,26 +11,34 @@ mod tests; /// NOT VALIDATED, validation happens at `Host::resolve()` call. // TODO: for better type safety consider verifying host to be in the valid // format in constructor. -#[derive(Debug,Clone,PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct Host(pub String); impl From for Host { - fn from(addr: std::net::SocketAddr) -> Self { Self(addr.to_string()) } + fn from(addr: std::net::SocketAddr) -> Self { + Self(addr.to_string()) + } } impl Host { /// If host is of the form ":", performs DNS resolution. - /// If host is of the form ":", just parses the SocketAddr. - pub async fn resolve(&self, ctx: &ctx::Ctx) -> ctx::OrCanceled>> { + /// If host is of the form ":", just parses the SocketAddr. + pub async fn resolve( + &self, + ctx: &ctx::Ctx, + ) -> ctx::OrCanceled>> { let host = self.0.clone(); // Note that we may orphan a task executing the underlying `getnameinfo` call // if the ctx gets cancelled. This should be fine given that it is expected to finish // after a timeout and it doesn't affect the state of the application. // We don't use `tokio::net::lookup_host`, because it is not documented to be cancel-safe. - Ok(ctx.wait(tokio::task::spawn_blocking(move || { - // This should never panic, so unwrapping the task result is ok. - Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) - })).await?.unwrap()) + Ok(ctx + .wait(tokio::task::spawn_blocking(move || { + // This should never panic, so unwrapping the task result is ok. + Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) + })) + .await? + .unwrap()) } /*/// Assumes host to be of the form ":" and parses it. diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs index 6b16a39d..17f2dc4b 100644 --- a/node/libs/concurrency/src/net/tests.rs +++ b/node/libs/concurrency/src/net/tests.rs @@ -3,26 +3,34 @@ use super::*; #[tokio::test] async fn test_resolve() { let ctx = &ctx::test_root(&ctx::RealClock); - let addrs = Host("localhost:1234".into()).resolve(ctx).await.unwrap().unwrap(); + let addrs = Host("localhost:1234".into()) + .resolve(ctx) + .await + .unwrap() + .unwrap(); // We assume here that loopback host "localhost" is always configured here. // If that turns out to be problematic we should use a more robust DNS resulution // library, so that we can run our own DNS server in tests. - assert!(addrs.len()>0); + assert!(!addrs.is_empty()); for addr in addrs { // Verify the port and that the returned address is actually a loopback. - assert_eq!(addr.port(),1234); + assert_eq!(addr.port(), 1234); assert!(addr.ip().is_loopback()); // Verify that resolving the serialized address returns the same address. let got = Host(addr.to_string()).resolve(ctx).await.unwrap().unwrap(); - assert!(got.len()==1); - assert_eq!(got[0],addr); + assert!(got.len() == 1); + assert_eq!(got[0], addr); } // Host without port should not resolve. - assert!(Host("localhost".into()).resolve(ctx).await.unwrap().is_err()); + assert!(Host("localhost".into()) + .resolve(ctx) + .await + .unwrap() + .is_err()); // Invalid host name should not resolve. assert!(Host("$#$:6746".into()).resolve(ctx).await.unwrap().is_err()); // Empty host name should not resolve. - assert!(Host("".into()).resolve(ctx).await.unwrap().is_err()); + assert!(Host(String::new()).resolve(ctx).await.unwrap().is_err()); } diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index f8863a03..c5d2557e 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -1,10 +1,8 @@ //! Deployer for the kubernetes cluster. use clap::Parser; use std::collections::HashMap; -use zksync_consensus_roles::node::SecretKey; -use zksync_consensus_roles::validator; -use zksync_consensus_tools::k8s::ConsensusNode; -use zksync_consensus_tools::{k8s, AppConfig}; +use zksync_consensus_roles::{node::SecretKey, validator}; +use zksync_consensus_tools::{k8s, k8s::ConsensusNode, AppConfig}; /// Command line arguments. #[derive(Debug, Parser)] diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 08faec36..944c39b5 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -9,7 +9,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, }; -use zksync_concurrency::{ctx,net}; +use zksync_concurrency::{ctx, net}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; use zksync_consensus_executor as executor; diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index b98faba5..7f272131 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -130,7 +130,10 @@ async fn main() -> anyhow::Result<()> { let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config - configs.app.try_load_public_addr().context("Public Address")?; + configs + .app + .try_load_public_addr() + .context("Public Address")?; let (executor, runner) = configs .make_executor(ctx) From 9bb51064e2d05922294dd7339b6a783eb793581b Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 10:14:32 +0100 Subject: [PATCH 08/15] typos --- node/actors/network/src/rpc/mod.rs | 2 +- node/libs/concurrency/src/net/tests.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index 7028987b..1e58ad17 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -120,7 +120,7 @@ pub(crate) struct Client { impl Client { /// Constructs a new client. // TODO(gprusak): at this point we don't need the clients to be reusable, - // so perhaps they should be constructed by `Servive::add_client` instead? + // so perhaps they should be constructed by `Service::add_client` instead? pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self { Client { limiter: limiter::Limiter::new(ctx, rate), diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs index 17f2dc4b..69c7a1e7 100644 --- a/node/libs/concurrency/src/net/tests.rs +++ b/node/libs/concurrency/src/net/tests.rs @@ -9,7 +9,7 @@ async fn test_resolve() { .unwrap() .unwrap(); // We assume here that loopback host "localhost" is always configured here. - // If that turns out to be problematic we should use a more robust DNS resulution + // If that turns out to be problematic we should use a more robust DNS resolution // library, so that we can run our own DNS server in tests. assert!(!addrs.is_empty()); for addr in addrs { From 1c16675da85207908efe012f9a97f0070a08b143 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 14:20:44 +0100 Subject: [PATCH 09/15] randomization wip --- node/Cargo.lock | 3 + node/Cargo.toml | 1 + node/libs/protobuf/Cargo.toml | 4 +- node/libs/protobuf/src/testonly/gen_value.rs | 26 ----- node/libs/protobuf/src/testonly/mod.rs | 101 +++++++++++++++++-- node/libs/utils/Cargo.toml | 3 +- node/libs/utils/src/lib.rs | 3 + node/tools/src/tests.rs | 29 ++---- 8 files changed, 118 insertions(+), 52 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index de720d3b..aec35e5e 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3894,6 +3894,7 @@ dependencies = [ name = "zksync_consensus_utils" version = "0.1.0" dependencies = [ + "rand 0.8.5", "thiserror", "zksync_concurrency", ] @@ -3911,10 +3912,12 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "serde_yaml", "tokio", "tracing", "tracing-subscriber", "zksync_concurrency", + "zksync_consensus_utils", "zksync_protobuf_build", ] diff --git a/node/Cargo.toml b/node/Cargo.toml index 3f9f8496..1daf85f2 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -73,6 +73,7 @@ rand04 = { package = "rand", version = "0.4" } rocksdb = "0.21.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.95" +serde_yaml = "0.9" sha3 = "0.10.8" snow = "0.9.3" syn = "2.0.17" diff --git a/node/libs/protobuf/Cargo.toml b/node/libs/protobuf/Cargo.toml index e7302137..889b531c 100644 --- a/node/libs/protobuf/Cargo.toml +++ b/node/libs/protobuf/Cargo.toml @@ -9,6 +9,7 @@ links = "zksync_protobuf_proto" [dependencies] zksync_concurrency.workspace = true +zksync_consensus_utils.workspace = true anyhow.workspace = true bit-vec.workspace = true @@ -19,6 +20,7 @@ quick-protobuf.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true +serde_yaml.workspace = true [dev-dependencies] tokio.workspace = true @@ -29,4 +31,4 @@ tracing-subscriber.workspace = true zksync_protobuf_build.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/protobuf/src/testonly/gen_value.rs b/node/libs/protobuf/src/testonly/gen_value.rs index 4c091e37..8b137891 100644 --- a/node/libs/protobuf/src/testonly/gen_value.rs +++ b/node/libs/protobuf/src/testonly/gen_value.rs @@ -1,27 +1 @@ -//! Util types for generating random values in tests. -use rand::Rng; - -/// Generator of random values. -pub struct GenValue<'a, R: Rng> { - /// Underlying RNG. - pub rng: &'a mut R, - /// Generate values with only required fields. - pub required_only: bool, - /// Generate decimal fractions for f64 - /// to avoid rounding errors of decimal encodings. - pub decimal_fractions: bool, -} - -impl<'a, R: Rng> GenValue<'a, R> { - /// Generates a random value of type `C`. - pub fn gen(&mut self) -> C { - C::sample(self) - } -} - -/// Types that can be used to generate a random instance. -pub trait RandomValue { - /// Generates a random value. - fn sample(g: &mut GenValue) -> Self; -} diff --git a/node/libs/protobuf/src/testonly/mod.rs b/node/libs/protobuf/src/testonly/mod.rs index e86a6479..2853d804 100644 --- a/node/libs/protobuf/src/testonly/mod.rs +++ b/node/libs/protobuf/src/testonly/mod.rs @@ -1,10 +1,8 @@ //! Testonly utilities. - -pub mod gen_value; -use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, Wire}; -pub use gen_value::*; +use zksync_consensus_utils::EncodeDist; +use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, Wire, ProtoRepr}; use prost::Message as _; -use prost_reflect::ReflectMessage as _; +use prost_reflect::ReflectMessage; use rand::{ distributions::{Distribution, Standard}, Rng, @@ -26,8 +24,9 @@ pub fn test_encode(rng: &mut } } + /// Syntax sugar for `test_encode`, -/// because `test_encode(rng,&rng::gen())` doesn't compile. +/// because `test_encode(rng,&rng.gen())` doesn't compile. #[track_caller] pub fn test_encode_random(rng: &mut impl Rng) where @@ -90,3 +89,93 @@ pub(crate) fn encode_shuffled(rng: &mut R, x: &T) -> Vec anyhow::Result; + /// build. + fn build(this: &Self::Type) -> Self::Proto; +} + +fn encode_proto(msg: &X::Type) -> Vec { + let msg = X::build(msg); + canonical_raw(&msg.encode_to_vec(), &msg.descriptor()).unwrap() +} + +fn decode_proto(bytes: &[u8]) -> anyhow::Result { + X::read(&X::Proto::decode(bytes)?) +} + +fn encode_json(msg: &X::Type) -> String { + let mut s = serde_json::Serializer::pretty(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner()).unwrap() +} + +fn decode_json(json: &str) -> anyhow::Result { + let mut d = serde_json::Deserializer::from_str(json); + X::read(&crate::serde::deserialize_proto(&mut d)?) +} + +fn encode_yaml(msg: &X::Type) -> String { + let mut s = serde_yaml::Serializer::new(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner().unwrap()).unwrap() +} + +fn decode_yaml(yaml: &str) -> anyhow::Result { + let d = serde_yaml::Deserializer::from_str(yaml); + X::read(&crate::serde::deserialize_proto(d)?) +} + +/// Wrapper for `ProtoRepr`, implementing ProtoConv; +pub struct ReprConv(std::marker::PhantomData

); +/// Wrapper for `ProtoFmt`, implementing ProtoConv; +pub struct FmtConv(std::marker::PhantomData); + +impl ProtoConv for FmtConv { + type Type = T; + type Proto = T::Proto; + fn read(r: &T::Proto) -> anyhow::Result { + ProtoFmt::read(r) + } + fn build(this: &T) -> T::Proto { + ProtoFmt::build(this) + } +} + +impl ProtoConv for ReprConv

{ + type Type = P::Type; + type Proto = P; + fn read(r: &P) -> anyhow::Result { + ProtoRepr::read(r) + } + fn build(this: &P::Type) -> P { + ProtoRepr::build(this) + } +} + +/// Test reencoding random values in various formats. +#[track_caller] +pub fn test_encode_all_formats(rng: &mut impl Rng) +where + X::Type: std::fmt::Debug + PartialEq, + EncodeDist: Distribution, +{ + for required_only in [false, true] { + let want: X::Type = EncodeDist { required_only, decimal_fractions: false }.sample(rng); + let got = decode_proto::(&encode_proto::(&want)).unwrap(); + assert_eq!(&want, &got, "binary encoding"); + let got = decode_yaml::(&encode_yaml::(&want)).unwrap(); + assert_eq!(&want, &got, "yaml encoding"); + + let want: X::Type = EncodeDist { required_only, decimal_fractions: true }.sample(rng); + let got = decode_json::(&encode_json::(&want)).unwrap(); + assert_eq!(&want, &got, "json encoding"); + } +} diff --git a/node/libs/utils/Cargo.toml b/node/libs/utils/Cargo.toml index 07ac636b..b6a0d6a9 100644 --- a/node/libs/utils/Cargo.toml +++ b/node/libs/utils/Cargo.toml @@ -9,7 +9,8 @@ license.workspace = true [dependencies] zksync_concurrency.workspace = true +rand.workspace = true thiserror.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 31e43ba1..65dcb369 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -2,3 +2,6 @@ pub mod enum_util; pub mod pipe; +pub mod encode; + +pub use encode::*; diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index f8fe9e26..4d3694f2 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -1,34 +1,27 @@ use crate::{store, AppConfig}; use rand::{ - distributions::{Distribution, Standard}, + distributions::{Distribution}, Rng, }; use tempfile::TempDir; use zksync_concurrency::ctx; -use zksync_consensus_roles::{node, validator::testonly::Setup}; +use zksync_consensus_roles::{validator::testonly::Setup}; use zksync_consensus_storage::{testonly, PersistentBlockStore}; -use zksync_protobuf::testonly::test_encode_random; +use zksync_protobuf::testonly::{FmtConv,test_encode_all_formats}; +use zksync_consensus_utils::EncodeDist; -fn make_addr(rng: &mut R) -> std::net::SocketAddr { - std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) -} - -impl Distribution for Standard { +impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> AppConfig { AppConfig { - server_addr: make_addr(rng), - public_addr: make_addr(rng).into(), - metrics_server_addr: Some(make_addr(rng)), + server_addr: self.sample(rng), + public_addr: self.sample(rng), + metrics_server_addr: Some(self.sample(rng)), genesis: rng.gen(), gossip_dynamic_inbound_limit: rng.gen(), - gossip_static_inbound: (0..5) - .map(|_| rng.gen::().public()) - .collect(), - gossip_static_outbound: (0..6) - .map(|_| (rng.gen::().public(), make_addr(rng).into())) - .collect(), + gossip_static_inbound: self.sample_range(rng).map(|_|rng.gen()).collect(), + gossip_static_outbound: self.sample_range(rng).map(|_|(rng.gen(),self.sample(rng))).collect(), max_payload_size: rng.gen(), } } @@ -38,7 +31,7 @@ impl Distribution for Standard { fn test_schema_encoding() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - test_encode_random::(rng); + test_encode_all_formats::>(rng); } #[tokio::test] From f19b230385ab5c859690a849633a2865936af468 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 14:33:22 +0100 Subject: [PATCH 10/15] updated test limits --- node/actors/bft/src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e6bb83b5..c73cf233 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -3,7 +3,7 @@ use zksync_concurrency::{ctx, scope, time}; use zksync_consensus_roles::validator; async fn run_test(behavior: Behavior, network: Network) { - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); @@ -15,7 +15,7 @@ async fn run_test(behavior: Behavior, network: Network) { Test { network, nodes, - blocks_to_finalize: 15, + blocks_to_finalize: 10, } .run(ctx) .await From c59f57fe48e030a85c89d288c72d006e70b097c7 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 14:38:54 +0100 Subject: [PATCH 11/15] stuff --- node/libs/protobuf/src/testonly/mod.rs | 19 +++++++++++++------ node/libs/utils/src/lib.rs | 2 +- node/tools/src/tests.rs | 16 ++++++++-------- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/node/libs/protobuf/src/testonly/mod.rs b/node/libs/protobuf/src/testonly/mod.rs index 2853d804..051db885 100644 --- a/node/libs/protobuf/src/testonly/mod.rs +++ b/node/libs/protobuf/src/testonly/mod.rs @@ -1,12 +1,12 @@ //! Testonly utilities. -use zksync_consensus_utils::EncodeDist; -use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, Wire, ProtoRepr}; +use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, ProtoRepr, Wire}; use prost::Message as _; use prost_reflect::ReflectMessage; use rand::{ distributions::{Distribution, Standard}, Rng, }; +use zksync_consensus_utils::EncodeDist; /// Test encoding and canonical encoding properties. #[track_caller] @@ -24,7 +24,6 @@ pub fn test_encode(rng: &mut } } - /// Syntax sugar for `test_encode`, /// because `test_encode(rng,&rng.gen())` doesn't compile. #[track_caller] @@ -165,16 +164,24 @@ impl ProtoConv for ReprConv

{ pub fn test_encode_all_formats(rng: &mut impl Rng) where X::Type: std::fmt::Debug + PartialEq, - EncodeDist: Distribution, + EncodeDist: Distribution, { for required_only in [false, true] { - let want: X::Type = EncodeDist { required_only, decimal_fractions: false }.sample(rng); + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: false, + } + .sample(rng); let got = decode_proto::(&encode_proto::(&want)).unwrap(); assert_eq!(&want, &got, "binary encoding"); let got = decode_yaml::(&encode_yaml::(&want)).unwrap(); assert_eq!(&want, &got, "yaml encoding"); - let want: X::Type = EncodeDist { required_only, decimal_fractions: true }.sample(rng); + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: true, + } + .sample(rng); let got = decode_json::(&encode_json::(&want)).unwrap(); assert_eq!(&want, &got, "json encoding"); } diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 65dcb369..7cff4361 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -2,6 +2,6 @@ pub mod enum_util; pub mod pipe; -pub mod encode; +mod encode; pub use encode::*; diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index 4d3694f2..95f6c489 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -1,14 +1,11 @@ use crate::{store, AppConfig}; -use rand::{ - distributions::{Distribution}, - Rng, -}; +use rand::{distributions::Distribution, Rng}; use tempfile::TempDir; use zksync_concurrency::ctx; -use zksync_consensus_roles::{validator::testonly::Setup}; +use zksync_consensus_roles::validator::testonly::Setup; use zksync_consensus_storage::{testonly, PersistentBlockStore}; -use zksync_protobuf::testonly::{FmtConv,test_encode_all_formats}; use zksync_consensus_utils::EncodeDist; +use zksync_protobuf::testonly::{test_encode_all_formats, FmtConv}; impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> AppConfig { @@ -20,8 +17,11 @@ impl Distribution for EncodeDist { genesis: rng.gen(), gossip_dynamic_inbound_limit: rng.gen(), - gossip_static_inbound: self.sample_range(rng).map(|_|rng.gen()).collect(), - gossip_static_outbound: self.sample_range(rng).map(|_|(rng.gen(),self.sample(rng))).collect(), + gossip_static_inbound: self.sample_range(rng).map(|_| rng.gen()).collect(), + gossip_static_outbound: self + .sample_range(rng) + .map(|_| (rng.gen(), self.sample(rng))) + .collect(), max_payload_size: rng.gen(), } } From 0d0a762a047c4173c9ccfcc78fab2c0d263e5ef7 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 14:48:39 +0100 Subject: [PATCH 12/15] retrieved encode.rs --- node/libs/utils/src/encode.rs | 66 +++++++++++++++++++++++++++++++++++ node/libs/utils/src/lib.rs | 2 +- 2 files changed, 67 insertions(+), 1 deletion(-) create mode 100644 node/libs/utils/src/encode.rs diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs new file mode 100644 index 00000000..8f691790 --- /dev/null +++ b/node/libs/utils/src/encode.rs @@ -0,0 +1,66 @@ +//! Utilites for testing encodings. +use rand::{ + distributions::{Alphanumeric, DistString, Distribution}, + Rng, +}; +use zksync_concurrency::net; + +/// Distribution for testing encodings. +pub struct EncodeDist { + /// Generate configs with only required fields. + pub required_only: bool, + /// Generate decimal fractions for f64 + /// to avoid rounding errors of decimal encodings. + pub decimal_fractions: bool, +} + +impl EncodeDist { + /// Returns a small non-empty range if `required_only` is false. + /// Returns an empty range otherwise. + pub fn sample_range(&self, rng: &mut (impl Rng + ?Sized)) -> std::ops::Range { + if self.required_only { + 0..0 + } else { + 0..rng.gen_range(5..10) + } + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::net::SocketAddr { + std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> net::Host { + Distribution::::sample(self, rng).into() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> String { + let n = self.sample_range(rng).len(); + Alphanumeric.sample_string(rng, n) + } +} + +impl Distribution> for EncodeDist +where + EncodeDist: Distribution, +{ + fn sample(&self, rng: &mut R) -> Option { + self.sample_range(rng).map(|_| self.sample(rng)).next() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> f64 { + if self.decimal_fractions { + const PRECISION: usize = 1000000; + #[allow(clippy::float_arithmetic)] + return rng.gen_range(0..PRECISION) as f64 / PRECISION as f64; + } + rng.gen() + } +} diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 7cff4361..71fc6af1 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -1,7 +1,7 @@ //! Crate that holds several small utilities and primitives. +mod encode; pub mod enum_util; pub mod pipe; -mod encode; pub use encode::*; From d8fe5374126fa1976953cba1cee1874e90a8b5d0 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 16:37:05 +0100 Subject: [PATCH 13/15] sample_opt --- node/libs/utils/src/encode.rs | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs index 8f691790..ad51d2ac 100644 --- a/node/libs/utils/src/encode.rs +++ b/node/libs/utils/src/encode.rs @@ -18,11 +18,13 @@ impl EncodeDist { /// Returns a small non-empty range if `required_only` is false. /// Returns an empty range otherwise. pub fn sample_range(&self, rng: &mut (impl Rng + ?Sized)) -> std::ops::Range { - if self.required_only { - 0..0 - } else { - 0..rng.gen_range(5..10) - } + if self.required_only { 0..0 } else { 0..rng.gen_range(5..10) } + } + + /// Returns `Some(f())` if `required_only` is false. + /// Returns `None otherwise. + pub fn sample_opt(&self, f: impl FnOnce()->T) -> Option { + if self.required_only { None } else { Some(f()) } } } @@ -45,15 +47,6 @@ impl Distribution for EncodeDist { } } -impl Distribution> for EncodeDist -where - EncodeDist: Distribution, -{ - fn sample(&self, rng: &mut R) -> Option { - self.sample_range(rng).map(|_| self.sample(rng)).next() - } -} - impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> f64 { if self.decimal_fractions { From 6e0d98147cdcb3d9bcb2b3b64c7fabbd9da112c7 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 17:22:06 +0100 Subject: [PATCH 14/15] more sampling methods --- node/libs/utils/src/encode.rs | 70 +++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs index ad51d2ac..7eae9855 100644 --- a/node/libs/utils/src/encode.rs +++ b/node/libs/utils/src/encode.rs @@ -18,13 +18,41 @@ impl EncodeDist { /// Returns a small non-empty range if `required_only` is false. /// Returns an empty range otherwise. pub fn sample_range(&self, rng: &mut (impl Rng + ?Sized)) -> std::ops::Range { - if self.required_only { 0..0 } else { 0..rng.gen_range(5..10) } + if self.required_only { + 0..0 + } else { + 0..rng.gen_range(5..10) + } } /// Returns `Some(f())` if `required_only` is false. /// Returns `None otherwise. - pub fn sample_opt(&self, f: impl FnOnce()->T) -> Option { - if self.required_only { None } else { Some(f()) } + pub fn sample_opt(&self, f: impl FnOnce() -> T) -> Option { + if self.required_only { + None + } else { + Some(f()) + } + } + + /// Samples a collection of type T. + pub fn sample_collect>( + &self, + rng: &mut (impl Rng + ?Sized), + ) -> T + where + EncodeDist: Distribution, + { + self.sample_range(rng).map(|_| self.sample(rng)).collect() + } +} + +impl Distribution> for EncodeDist +where + EncodeDist: Distribution, +{ + fn sample(&self, rng: &mut R) -> Option { + self.sample_opt(|| self.sample(rng)) } } @@ -47,6 +75,42 @@ impl Distribution for EncodeDist { } } +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> bool { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u8 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u16 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u32 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u64 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> usize { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::num::NonZeroU32 { + rng.gen() + } +} + impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> f64 { if self.decimal_fractions { From 8c64769fb7786483b3cc2f1c98637c3bae964100 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 27 Mar 2024 17:33:50 +0100 Subject: [PATCH 15/15] applied comments --- node/libs/concurrency/src/net/mod.rs | 5 ----- node/libs/protobuf/src/testonly/gen_value.rs | 1 - node/libs/utils/src/encode.rs | 2 +- 3 files changed, 1 insertion(+), 7 deletions(-) delete mode 100644 node/libs/protobuf/src/testonly/gen_value.rs diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index a5c04a37..8a507737 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -40,9 +40,4 @@ impl Host { .await? .unwrap()) } - - /*/// Assumes host to be of the form ":" and parses it. - pub fn parse_socket_addr(&self) -> Result { - self.0.parse() - }*/ } diff --git a/node/libs/protobuf/src/testonly/gen_value.rs b/node/libs/protobuf/src/testonly/gen_value.rs deleted file mode 100644 index 8b137891..00000000 --- a/node/libs/protobuf/src/testonly/gen_value.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs index 7eae9855..621c017f 100644 --- a/node/libs/utils/src/encode.rs +++ b/node/libs/utils/src/encode.rs @@ -1,4 +1,4 @@ -//! Utilites for testing encodings. +//! Utilities for testing encodings. use rand::{ distributions::{Alphanumeric, DistString, Distribution}, Rng,