diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs
index 1b41bba6..621acdda 100644
--- a/node/actors/network/src/consensus/mod.rs
+++ b/node/actors/network/src/consensus/mod.rs
@@ -2,11 +2,8 @@
 //! BFT consensus messages are exchanged over this network.
 use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc};
 use anyhow::Context as _;
-use std::{
-    collections::{HashSet},
-    sync::Arc,
-};
 use rand::seq::SliceRandom;
+use std::{collections::HashSet, sync::Arc};
 use zksync_concurrency::{ctx, oneshot, scope, sync, time};
 use zksync_consensus_roles::validator;
 use zksync_protobuf::kB;
@@ -22,7 +19,11 @@ const RESP_MAX_SIZE: usize = kB;
 /// is down.
 const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10);
 
+/// Outbound connection state.
 pub(crate) struct Connection {
+    /// Peer's address.
+    /// This is not used for now, but will be required for the debug page.
+    #[allow(dead_code)]
     addr: std::net::SocketAddr,
     consensus: rpc::Client<rpc::consensus::Rpc>,
 }
@@ -106,7 +107,9 @@ impl Network {
         msg: validator::Signed<validator::ConsensusMsg>,
     ) -> anyhow::Result<()> {
         let outbound = self.outbound.current();
-        outbound.get(key).context("not an active validator")?
+        outbound
+            .get(key)
+            .context("not an active validator")?
             .consensus
             .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE)
             .await?;
@@ -122,7 +125,7 @@ impl Network {
     ) -> anyhow::Result<()> {
         let peer =
             handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
-        self.inbound.insert(peer.clone(),()).await?;
+        self.inbound.insert(peer.clone(), ()).await?;
         let res = scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
                 .add_server(rpc::ping::Server, rpc::ping::RATE)
@@ -162,7 +165,7 @@ impl Network {
             addr,
             consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate),
         });
-        self.outbound.insert(peer.clone(),conn.clone()).await?;
+        self.outbound.insert(peer.clone(), conn.clone()).await?;
         let res = scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
                 .add_server(rpc::ping::Server, rpc::ping::RATE)
@@ -180,11 +183,19 @@ impl Network {
         // Note that this is executed only for outbound end of the loopback connection.
         // Inbound end doesn't know the public address of itself.
         if peer == &self.key.public() {
-            let mut sub = self.gossip.validator_addrs.subscribe();
             s.spawn(async {
+                let mut sub = self.gossip.validator_addrs.subscribe();
                 while ctx.is_active() {
-                    self.gossip.validator_addrs.announce(&self.key,addr,ctx.now_utc()).await;
-                    let _ = sync::wait_for(&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), &mut sub, |got| got.get(peer).map(|x| x.msg.addr) != addr).await;
+                    self.gossip
+                        .validator_addrs
+                        .announce(&self.key, addr, ctx.now_utc())
+                        .await;
+                    let _ = sync::wait_for(
+                        &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),
+                        &mut sub,
+                        |got| got.get(peer).map(|x| x.msg.addr) != Some(addr),
+                    )
+                    .await;
                 }
                 Ok(())
             });
@@ -198,9 +209,19 @@ impl Network {
     }
 
     async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
-        let addr = *self.gossip.cfg.public_addr.resolve(ctx).await?.context("resolve()")?
-            .choose(&mut ctx.rng()).with_context(||format!("{:?} resolved to no addresses",self.gossip.cfg.public_addr))?;
-        self.run_outbound_stream(ctx, &self.key.public(), addr).await
+        let addr = *self
+            .gossip
+            .cfg
+            .public_addr
+            .resolve(ctx)
+            .await?
+            .context("resolve()")?
+            .choose(&mut ctx.rng())
+            .with_context(|| {
+                format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
+            })?;
+        self.run_outbound_stream(ctx, &self.key.public(), addr)
+            .await
     }
 
     /// Maintains a connection to the given validator.
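[Reviewer note] The announcer loop above re-announces the node's own address every ADDRESS_ANNOUNCER_INTERVAL, waking early if the gossiped entry for our key stops matching what we announced; the `!= Some(addr)` change also fixes the comparison, since `map` yields an `Option`. A minimal sketch of the same sleep-or-wake pattern with plain tokio primitives (all names here are illustrative, not part of this codebase):

```rust
use std::{net::SocketAddr, time::Duration};
use tokio::sync::watch;

/// Re-announce `my_addr` every `interval`, waking early whenever the
/// observed address for our key diverges from what we announced.
async fn announce_loop(
    mut observed: watch::Receiver<Option<SocketAddr>>,
    my_addr: SocketAddr,
    interval: Duration,
) {
    loop {
        // Stand-in for `validator_addrs.announce(..)`.
        println!("announcing {my_addr}");
        // Sleep for `interval`, but return early if the observed entry
        // stops matching the announced address.
        let _ = tokio::time::timeout(interval, async {
            while *observed.borrow_and_update() == Some(my_addr) {
                if observed.changed().await.is_err() {
                    return; // sender dropped
                }
            }
        })
        .await;
    }
}
```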
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs
index 74a69c19..2b88dbb2 100644
--- a/node/actors/network/src/gossip/mod.rs
+++ b/node/actors/network/src/gossip/mod.rs
@@ -12,12 +12,7 @@
 //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
 //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
 //! network graph (minimize its diameter, increase connectedness).
-use crate::{
-    gossip::{ValidatorAddrsWatch},
-    io,
-    pool::PoolWatch,
-    rpc, Config,
-};
+use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, rpc, Config};
 use anyhow::Context as _;
 use std::sync::{atomic::AtomicUsize, Arc};
@@ -33,7 +28,9 @@
 use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::BlockStore;
 use zksync_protobuf::kB;
 
+/// State of the gossip connection.
 pub(crate) struct Connection {
+    /// `get_block` rpc client.
     pub(crate) get_block: rpc::Client<rpc::get_block::Rpc>,
 }
@@ -42,9 +39,9 @@ pub(crate) struct Network {
     /// Gossip network configuration.
     pub(crate) cfg: Config,
     /// Currently open inbound connections.
-    pub(crate) inbound: PoolWatch<node::PublicKey,Arc<Connection>>,
+    pub(crate) inbound: PoolWatch<node::PublicKey, Arc<Connection>>,
     /// Currently open outbound connections.
-    pub(crate) outbound: PoolWatch<node::PublicKey,Arc<Connection>>,
+    pub(crate) outbound: PoolWatch<node::PublicKey, Arc<Connection>>,
     /// Current state of knowledge about validators' endpoints.
     pub(crate) validator_addrs: ValidatorAddrsWatch,
     /// Block store to serve `get_block` requests from.
@@ -90,10 +87,16 @@
     ) -> anyhow::Result<Option<validator::FinalBlock>> {
         let outbound = self.outbound.current();
         let inbound = self.inbound.current();
-        Ok(outbound.get(recipient).or(inbound.get(recipient))
+        Ok(outbound
+            .get(recipient)
+            .or(inbound.get(recipient))
             .context("recipient is unreachable")?
             .get_block
-            .call(ctx, &rpc::get_block::Req(number), self.cfg.max_block_size.saturating_add(kB))
+            .call(
+                ctx,
+                &rpc::get_block::Req(number),
+                self.cfg.max_block_size.saturating_add(kB),
+            )
             .await?
             .0)
     }
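[Reviewer note] `get_block` prefers the outbound connection and falls back to the inbound one; `Option::or` evaluates its argument eagerly, which is fine for a cheap map lookup (`or_else` would be the lazy variant). The response cap is `max_block_size` plus 1 kB, presumably headroom for the RPC envelope. The lookup shape as a standalone sketch, with hypothetical placeholder types:

```rust
use anyhow::Context as _;
use std::collections::HashMap;

// Hypothetical stand-ins for the two connection pools.
fn pick_conn<'a, V>(
    outbound: &'a HashMap<String, V>,
    inbound: &'a HashMap<String, V>,
    recipient: &str,
) -> anyhow::Result<&'a V> {
    outbound
        .get(recipient)
        // `or` is eager; acceptable here since it is just a map lookup.
        .or(inbound.get(recipient))
        .context("recipient is unreachable")
}
```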
diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs
index 3cc4c445..e3568b13 100644
--- a/node/actors/network/src/gossip/runner.rs
+++ b/node/actors/network/src/gossip/runner.rs
@@ -1,13 +1,13 @@
-use super::{handshake, Network, ValidatorAddrs, Connection};
+use super::{handshake, Connection, Network, ValidatorAddrs};
 use crate::{io, noise, preface, rpc};
+use anyhow::Context as _;
 use async_trait::async_trait;
+use rand::seq::SliceRandom;
 use std::sync::{atomic::Ordering, Arc};
-use zksync_concurrency::{net, ctx, oneshot, scope, sync};
+use zksync_concurrency::{ctx, net, oneshot, scope, sync};
 use zksync_consensus_roles::node;
 use zksync_consensus_storage::BlockStore;
 use zksync_protobuf::kB;
-use rand::seq::SliceRandom;
-use anyhow::Context as _;
 
 struct PushValidatorAddrsServer<'a>(&'a Network);
 
@@ -159,13 +159,14 @@ impl Network {
         ctx: &ctx::Ctx,
         mut stream: noise::Stream,
     ) -> anyhow::Result<()> {
-        let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?;
+        let peer =
+            handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?;
         tracing::Span::current().record("peer", tracing::field::debug(&peer));
         let conn = Arc::new(Connection {
             get_block: rpc::Client::<rpc::get_block::Rpc>::new(ctx, self.cfg.rpc.get_block_rate),
         });
-        self.inbound.insert(peer.clone(),conn.clone()).await?;
-        let res = self.run_stream(ctx, &peer, stream, &*conn).await;
+        self.inbound.insert(peer.clone(), conn.clone()).await?;
+        let res = self.run_stream(ctx, &peer, stream, &conn).await;
         self.inbound.remove(&peer).await;
         res
     }
@@ -177,8 +178,12 @@ impl Network {
         peer: &node::PublicKey,
         addr: net::Host,
     ) -> anyhow::Result<()> {
-        let addr = *addr.resolve(ctx).await?.context("resolve()")?
-            .choose(&mut ctx.rng()).with_context(||"{addr:?} resolved to empty address set")?;
+        let addr = *addr
+            .resolve(ctx)
+            .await?
+            .context("resolve()")?
+            .choose(&mut ctx.rng())
+            .with_context(|| format!("{addr:?} resolved to empty address set"))?;
         let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?;
         handshake::outbound(
             ctx,
@@ -191,8 +196,8 @@ impl Network {
         let conn = Arc::new(Connection {
             get_block: rpc::Client::<rpc::get_block::Rpc>::new(ctx, self.cfg.rpc.get_block_rate),
         });
-        self.outbound.insert(peer.clone(),conn.clone()).await?;
-        let res = self.run_stream(ctx, peer, stream, &*conn).await;
+        self.outbound.insert(peer.clone(), conn.clone()).await?;
+        let res = self.run_stream(ctx, peer, stream, &conn).await;
         self.outbound.remove(peer).await;
         res
     }
diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs
index 44839a2a..85e4d0d1 100644
--- a/node/actors/network/src/gossip/tests.rs
+++ b/node/actors/network/src/gossip/tests.rs
@@ -571,7 +571,10 @@ async fn validator_node_restart() {
     tracing::info!("wait for the update to arrive to node1");
     let sub = &mut node1.net.gossip.validator_addrs.subscribe();
     let want = Some(*cfgs[0].server_addr);
-    sync::wait_for(ctx, sub, |got| got.get(&setup.keys[0].public()).map(|x|x.msg.addr) == want).await?;
+    sync::wait_for(ctx, sub, |got| {
+        got.get(&setup.keys[0].public()).map(|x| x.msg.addr) == want
+    })
+    .await?;
     Ok(())
 })
 .await?;
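[Reviewer note] `run_outbound_stream` resolves the configured host and picks one returned address at random. Note the `with_context` closure must build its message with `format!`: a bare string literal is not interpolated, so the original `"{addr:?} resolved to empty address set"` message would have been emitted verbatim (adjusted above). A self-contained sketch of the idiom, assuming only std, rand and anyhow:

```rust
use anyhow::Context as _;
use rand::seq::SliceRandom;
use std::net::{SocketAddr, ToSocketAddrs as _};

fn pick_addr(host: &str) -> anyhow::Result<SocketAddr> {
    // Resolve the host to a list of socket addresses.
    let addrs: Vec<SocketAddr> = host.to_socket_addrs().context("resolve()")?.collect();
    // Pick one at random; `format!` (not a bare literal) interpolates `host`.
    Ok(*addrs
        .choose(&mut rand::thread_rng())
        .with_context(|| format!("{host:?} resolved to no addresses"))?)
}
```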
diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/actors/network/src/gossip/validator_addrs.rs
index 09f3602a..8fd21b63 100644
--- a/node/actors/network/src/gossip/validator_addrs.rs
+++ b/node/actors/network/src/gossip/validator_addrs.rs
@@ -1,7 +1,7 @@
 //! Global state distributed by active validators, observed by all the nodes in the network.
 use crate::watch::Watch;
 use std::{collections::HashSet, sync::Arc};
-use zksync_concurrency::{time,sync};
+use zksync_concurrency::{sync, time};
 use zksync_consensus_roles::validator;
 
 /// Mapping from validator::PublicKey to a signed validator::NetAddress.
@@ -90,6 +90,7 @@ impl ValidatorAddrsWatch {
         self.0.subscribe()
     }
 
+    /// Inserts a new version of the announcement signed with the given key.
     pub(crate) async fn announce(
         &self,
         key: &validator::SecretKey,
@@ -98,7 +99,10 @@ impl ValidatorAddrsWatch {
     ) {
         let this = self.0.lock().await;
         let mut validator_addrs = this.borrow().clone();
-        let version = validator_addrs.get(&key.public()).map(|x| x.msg.version + 1).unwrap_or(0);
+        let version = validator_addrs
+            .get(&key.public())
+            .map(|x| x.msg.version + 1)
+            .unwrap_or(0);
         let d = Arc::new(key.sign_msg(validator::NetAddress {
             addr,
             version,
diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs
index e8315e45..c8d3e70d 100644
--- a/node/actors/network/src/lib.rs
+++ b/node/actors/network/src/lib.rs
@@ -136,8 +136,11 @@ impl Runner {
         for (peer, addr) in &self.net.gossip.cfg.gossip.static_outbound {
             s.spawn(async {
                 loop {
-                    let run_result =
-                        self.net.gossip.run_outbound_stream(ctx, peer, addr.clone()).await;
+                    let run_result = self
+                        .net
+                        .gossip
+                        .run_outbound_stream(ctx, peer, addr.clone())
+                        .await;
                     if let Err(err) = run_result {
                         tracing::info!("gossip.run_outbound_stream(): {err:#}");
                     }
@@ -159,11 +162,6 @@ impl Runner {
                     Ok(())
                 });
             }
-            // Announce IP periodically.
-            s.spawn(async {
-                c.run_address_announcer(ctx).await;
-                Ok(())
-            });
         }
     }
diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs
index 86fa1cf5..5cf1c52f 100644
--- a/node/actors/network/src/metrics.rs
+++ b/node/actors/network/src/metrics.rs
@@ -145,15 +145,14 @@ impl NetworkGauges {
         let register_result = COLLECTOR.before_scrape(move || {
             state_ref.upgrade().map(|state| {
                 let gauges = NetworkGauges::default();
-                let len = state.gossip.inbound.subscribe().borrow().current().len();
+                let len = state.gossip.inbound.current().len();
                 gauges.gossip_inbound_connections.set(len);
-                let len = state.gossip.outbound.subscribe().borrow().current().len();
+                let len = state.gossip.outbound.current().len();
                 gauges.gossip_outbound_connections.set(len);
                 if let Some(consensus_state) = &state.consensus {
-                    let len = consensus_state.inbound.subscribe().borrow().current().len();
+                    let len = consensus_state.inbound.current().len();
                     gauges.consensus_inbound_connections.set(len);
-                    let subscriber = consensus_state.outbound.subscribe();
-                    let len = subscriber.borrow().current().len();
+                    let len = consensus_state.outbound.current().len();
                     gauges.consensus_outbound_connections.set(len);
                 }
                 gauges
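[Reviewer note] `announce` bumps the version monotonically: the first announcement gets version 0 and each later one gets `previous + 1`, letting receivers discard stale updates. The rule in isolation (simplified stand-in types, no signing):

```rust
use std::{collections::HashMap, net::SocketAddr};

#[derive(Clone, Debug)]
struct NetAddress {
    addr: SocketAddr,
    version: u64,
}

/// Insert a new announcement, superseding the previous one for `key`.
fn announce(book: &mut HashMap<String, NetAddress>, key: &str, addr: SocketAddr) {
    let version = book.get(key).map(|x| x.version + 1).unwrap_or(0);
    book.insert(key.to_string(), NetAddress { addr, version });
}
```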
diff --git a/node/actors/network/src/pool.rs b/node/actors/network/src/pool.rs
index 53dba32a..a5e7375f 100644
--- a/node/actors/network/src/pool.rs
+++ b/node/actors/network/src/pool.rs
@@ -1,31 +1,32 @@
 //! An abstraction for a set of "connections" which constraints
 //! which peers are allowed to connect.
 use crate::watch::Watch;
-use std::collections::{HashSet};
+use std::collections::HashSet;
 use zksync_concurrency::sync;
 
 /// Map with restrictions on the allowed keys.
 /// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of
 /// `allowed`.
 #[derive(Clone)]
-pub(crate) struct Pool<K:std::hash::Hash+Eq+Clone,V:Clone> {
+pub(crate) struct Pool<K: std::hash::Hash + Eq + Clone, V: Clone> {
     extra_limit: usize,
     extra_count: usize,
     allowed: HashSet<K>,
-    current: im::HashMap<K,V>,
+    current: im::HashMap<K, V>,
 }
 
-impl<K:std::hash::Hash+Eq+Clone,V:Clone> Pool<K,V> {
-    pub(crate) fn current(&self) -> &im::HashMap<K,V> {
+impl<K: std::hash::Hash + Eq + Clone, V: Clone> Pool<K, V> {
+    /// Current pool state.
+    pub(crate) fn current(&self) -> &im::HashMap<K, V> {
         &self.current
     }
 }
 
 /// Watch wrapper of the Pool.
 /// Supports subscribing to the Pool membership changes.
-pub(crate) struct PoolWatch<K:std::hash::Hash+Eq+Clone,V:Clone>(Watch<Pool<K,V>>);
+pub(crate) struct PoolWatch<K: std::hash::Hash + Eq + Clone, V: Clone>(Watch<Pool<K, V>>);
 
-impl<K:std::hash::Hash+Eq+Clone,V:Clone> PoolWatch<K,V> {
+impl<K: std::hash::Hash + Eq + Clone, V: Clone> PoolWatch<K, V> {
     /// Constructs a new pool.
     pub(crate) fn new(allowed: HashSet<K>, extra_limit: usize) -> Self {
         Self(Watch::new(Pool {
@@ -40,7 +41,7 @@
     /// Returns an error if
     /// * `v` is already in the set
     /// * `v` cannot be added due to size restrictions
-    pub(crate) async fn insert(&self, k:K,v:V) -> anyhow::Result<()> {
+    pub(crate) async fn insert(&self, k: K, v: V) -> anyhow::Result<()> {
         self.0
             .send_if_ok(|pool| {
                 if pool.current.contains_key(&k) {
@@ -52,7 +53,7 @@
                     }
                     pool.extra_count += 1;
                 }
-                pool.current.insert(k,v);
+                pool.current.insert(k, v);
                 Ok(())
             })
             .await
@@ -71,13 +72,13 @@
         });
     }
 
-    pub(crate) fn current(&self) -> im::HashMap<K,V> {
+    /// Copy of the current pool state.
+    pub(crate) fn current(&self) -> im::HashMap<K, V> {
         self.0.subscribe().borrow().current.clone()
     }
 
     /// Subscribes to the set changes.
-    #[allow(dead_code)]
-    pub(crate) fn subscribe(&self) -> sync::watch::Receiver<Pool<K,V>> {
+    pub(crate) fn subscribe(&self) -> sync::watch::Receiver<Pool<K, V>> {
         self.0.subscribe()
     }
 }
diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs
index 184326cd..7028987b 100644
--- a/node/actors/network/src/rpc/mod.rs
+++ b/node/actors/network/src/rpc/mod.rs
@@ -119,6 +119,8 @@ pub(crate) struct Client<R: Rpc> {
 
 impl<R: Rpc> Client<R> {
     /// Constructs a new client.
+    // TODO(gprusak): at this point we don't need the clients to be reusable,
+    // so perhaps they should be constructed by `Service::add_client` instead?
     pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self {
         Client {
             limiter: limiter::Limiter::new(ctx, rate),
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index d18bdddf..3a55c324 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -130,10 +130,7 @@ impl InstanceRunner {
 
 impl Instance {
     /// Construct an instance for a given config.
-    pub fn new(
-        cfg: Config,
-        block_store: Arc<BlockStore>,
-    ) -> (Self, InstanceRunner) {
+    pub fn new(cfg: Config, block_store: Arc<BlockStore>) -> (Self, InstanceRunner) {
         let (actor_pipe, dispatcher_pipe) = pipe::new();
         let (net, runner) = Network::new(cfg, block_store, actor_pipe);
         let (terminate_send, terminate_recv) = channel::bounded(1);
@@ -186,7 +183,7 @@ impl Instance {
             .gossip
             .outbound
             .subscribe()
-            .wait_for(|got| want.iter().all(|k|got.current().contains_key(k)))
+            .wait_for(|got| want.iter().all(|k| got.current().contains_key(k)))
             .await
             .unwrap();
     }
@@ -199,13 +196,13 @@ impl Instance {
         consensus_state
             .inbound
             .subscribe()
-            .wait_for(|got| want.iter().all(|k|got.current().contains_key(k)))
+            .wait_for(|got| want.iter().all(|k| got.current().contains_key(k)))
             .await
             .unwrap();
         consensus_state
             .outbound
             .subscribe()
-            .wait_for(|got| want.iter().all(|k|got.current().contains_key(k)))
+            .wait_for(|got| want.iter().all(|k| got.current().contains_key(k)))
             .await
             .unwrap();
     }
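[Reviewer note] On the `pool.rs` change above: `Pool` is a keyed map with an admission rule, where keys in `allowed` always fit, any other key consumes one of `extra_limit` slots, and duplicates are rejected. A condensed sketch of that rule (bounds and errors simplified):

```rust
use std::collections::{HashMap, HashSet};

struct Pool<K: std::hash::Hash + Eq, V> {
    extra_limit: usize,
    extra_count: usize,
    allowed: HashSet<K>,
    current: HashMap<K, V>,
}

impl<K: std::hash::Hash + Eq, V> Pool<K, V> {
    fn insert(&mut self, k: K, v: V) -> anyhow::Result<()> {
        anyhow::ensure!(!self.current.contains_key(&k), "already inserted");
        if !self.allowed.contains(&k) {
            // Non-allowed peers compete for a bounded number of extra slots.
            anyhow::ensure!(self.extra_count < self.extra_limit, "limit exceeded");
            self.extra_count += 1;
        }
        self.current.insert(k, v);
        Ok(())
    }
}
```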
diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs
index 68d36f96..a5c04a37 100644
--- a/node/libs/concurrency/src/net/mod.rs
+++ b/node/libs/concurrency/src/net/mod.rs
@@ -1,6 +1,6 @@
 //! Context-aware network utilities.
 //! Built on top of `tokio::net`.
-use crate::{ctx};
+use crate::ctx;
 
 pub mod tcp;
 
@@ -11,26 +11,34 @@ mod tests;
 /// NOT VALIDATED, validation happens at `Host::resolve()` call.
 // TODO: for better type safety consider verifying host to be in the valid
 // format in constructor.
-#[derive(Debug,Clone,PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct Host(pub String);
 
 impl From<std::net::SocketAddr> for Host {
-    fn from(addr: std::net::SocketAddr) -> Self { Self(addr.to_string()) }
+    fn from(addr: std::net::SocketAddr) -> Self {
+        Self(addr.to_string())
+    }
 }
 
 impl Host {
     /// If host is of the form "<domain>:<port>", performs DNS resolution.
     /// If host is of the form "<ip>:<port>", just parses the SocketAddr.
-    pub async fn resolve(&self, ctx: &ctx::Ctx) -> ctx::OrCanceled<std::io::Result<Vec<std::net::SocketAddr>>> {
+    pub async fn resolve(
+        &self,
+        ctx: &ctx::Ctx,
+    ) -> ctx::OrCanceled<std::io::Result<Vec<std::net::SocketAddr>>> {
         let host = self.0.clone();
         // Note that we may orphan a task executing the underlying `getnameinfo` call
         // if the ctx gets cancelled. This should be fine given that it is expected to finish
         // after a timeout and it doesn't affect the state of the application.
         // We don't use `tokio::net::lookup_host`, because it is not documented to be cancel-safe.
-        Ok(ctx.wait(tokio::task::spawn_blocking(move || {
-            // This should never panic, so unwrapping the task result is ok.
-            Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect())
-        })).await?.unwrap())
+        Ok(ctx
+            .wait(tokio::task::spawn_blocking(move || {
+                // This should never panic, so unwrapping the task result is ok.
+                Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect())
+            }))
+            .await?
+            .unwrap())
     }
 
     /*/// Assumes host to be of the form "<ip>:<port>" and parses it.
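[Reviewer note] `Host::resolve` runs the blocking `getaddrinfo` lookup via `spawn_blocking`, so cancelling the ctx merely orphans the lookup task rather than blocking the runtime. The same shape with plain tokio and no ctx, as a sketch under those assumptions:

```rust
use std::net::{SocketAddr, ToSocketAddrs as _};

/// Run the blocking DNS lookup on tokio's blocking pool.
/// Dropping the returned future orphans (does not corrupt) the lookup.
async fn resolve(host: String) -> std::io::Result<Vec<SocketAddr>> {
    tokio::task::spawn_blocking(move || host.to_socket_addrs().map(|it| it.collect()))
        .await
        .expect("the lookup closure does not panic")
}
```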
diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs
index 6b16a39d..17f2dc4b 100644
--- a/node/libs/concurrency/src/net/tests.rs
+++ b/node/libs/concurrency/src/net/tests.rs
@@ -3,26 +3,34 @@ use super::*;
 
 #[tokio::test]
 async fn test_resolve() {
     let ctx = &ctx::test_root(&ctx::RealClock);
-    let addrs = Host("localhost:1234".into()).resolve(ctx).await.unwrap().unwrap();
+    let addrs = Host("localhost:1234".into())
+        .resolve(ctx)
+        .await
+        .unwrap()
+        .unwrap();
     // We assume here that loopback host "localhost" is always configured.
     // If that turns out to be problematic we should use a more robust DNS resolution
     // library, so that we can run our own DNS server in tests.
-    assert!(addrs.len()>0);
+    assert!(!addrs.is_empty());
     for addr in addrs {
         // Verify the port and that the returned address is actually a loopback.
-        assert_eq!(addr.port(),1234);
+        assert_eq!(addr.port(), 1234);
         assert!(addr.ip().is_loopback());
         // Verify that resolving the serialized address returns the same address.
         let got = Host(addr.to_string()).resolve(ctx).await.unwrap().unwrap();
-        assert!(got.len()==1);
-        assert_eq!(got[0],addr);
+        assert!(got.len() == 1);
+        assert_eq!(got[0], addr);
     }
     // Host without port should not resolve.
-    assert!(Host("localhost".into()).resolve(ctx).await.unwrap().is_err());
+    assert!(Host("localhost".into())
+        .resolve(ctx)
+        .await
+        .unwrap()
+        .is_err());
     // Invalid host name should not resolve.
     assert!(Host("$#$:6746".into()).resolve(ctx).await.unwrap().is_err());
     // Empty host name should not resolve.
-    assert!(Host("".into()).resolve(ctx).await.unwrap().is_err());
+    assert!(Host(String::new()).resolve(ctx).await.unwrap().is_err());
 }
diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs
index f8863a03..c5d2557e 100644
--- a/node/tools/src/bin/deployer.rs
+++ b/node/tools/src/bin/deployer.rs
@@ -1,10 +1,8 @@
 //! Deployer for the kubernetes cluster.
 use clap::Parser;
 use std::collections::HashMap;
-use zksync_consensus_roles::node::SecretKey;
-use zksync_consensus_roles::validator;
-use zksync_consensus_tools::k8s::ConsensusNode;
-use zksync_consensus_tools::{k8s, AppConfig};
+use zksync_consensus_roles::{node::SecretKey, validator};
+use zksync_consensus_tools::{k8s, k8s::ConsensusNode, AppConfig};
 
 /// Command line arguments.
 #[derive(Debug, Parser)]
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 08faec36..944c39b5 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -9,7 +9,7 @@ use std::{
     path::{Path, PathBuf},
     str::FromStr,
 };
-use zksync_concurrency::{ctx,net};
+use zksync_concurrency::{ctx, net};
 use zksync_consensus_bft as bft;
 use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
 use zksync_consensus_executor as executor;
diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs
index b98faba5..7f272131 100644
--- a/node/tools/src/main.rs
+++ b/node/tools/src/main.rs
@@ -130,7 +130,10 @@ async fn main() -> anyhow::Result<()> {
     let mut configs = args.config_args().load().context("config_args().load()")?;
 
     // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config
-    configs.app.try_load_public_addr().context("Public Address")?;
+    configs
+        .app
+        .try_load_public_addr()
+        .context("Public Address")?;
 
     let (executor, runner) = configs
         .make_executor(ctx)
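[Reviewer note] `test_resolve` also checks a round-trip property: serializing any resolved `SocketAddr` and resolving the string again yields exactly that address (pure parsing, no DNS). The property as a standalone predicate using only std:

```rust
use std::net::{SocketAddr, ToSocketAddrs as _};

/// True iff `addr` survives a to_string -> resolve round trip unchanged.
fn roundtrip(addr: SocketAddr) -> bool {
    match addr.to_string().to_socket_addrs() {
        Ok(got) => got.collect::<Vec<_>>() == vec![addr],
        Err(_) => false,
    }
}
```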