diff --git a/node/Cargo.lock b/node/Cargo.lock index de720d3b..aec35e5e 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3894,6 +3894,7 @@ dependencies = [ name = "zksync_consensus_utils" version = "0.1.0" dependencies = [ + "rand 0.8.5", "thiserror", "zksync_concurrency", ] @@ -3911,10 +3912,12 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "serde_yaml", "tokio", "tracing", "tracing-subscriber", "zksync_concurrency", + "zksync_consensus_utils", "zksync_protobuf_build", ] diff --git a/node/Cargo.toml b/node/Cargo.toml index 3f9f8496..1daf85f2 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -73,6 +73,7 @@ rand04 = { package = "rand", version = "0.4" } rocksdb = "0.21.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.95" +serde_yaml = "0.9" sha3 = "0.10.8" snow = "0.9.3" syn = "2.0.17" diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 693f85ca..af06ea04 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -77,7 +77,6 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow:: let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { let (node, runner) = network::testonly::Instance::new( - ctx, spec.net.clone(), spec.block_store.clone(), ); diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e6bb83b5..c73cf233 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -3,7 +3,7 @@ use zksync_concurrency::{ctx, scope, time}; use zksync_consensus_roles::validator; async fn run_test(behavior: Behavior, network: Network) { - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); @@ -15,7 +15,7 @@ async fn run_test(behavior: Behavior, network: Network) { Test { network, nodes, - blocks_to_finalize: 15, + blocks_to_finalize: 10, } .run(ctx) .await diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 2e0fad85..cec596d8 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -45,7 +45,7 @@ pub struct Config { pub server_addr: std::net::SocketAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + pub public_addr: net::Host, /// Maximal size of the block payload. pub max_payload_size: usize, @@ -59,7 +59,7 @@ pub struct Config { pub gossip_static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl Config { @@ -90,7 +90,7 @@ impl Executor { fn network_config(&self) -> network::Config { network::Config { server_addr: net::tcp::ListenerAddr::new(self.config.server_addr), - public_addr: self.config.public_addr, + public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), validator_key: self.validator.as_ref().map(|v| v.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), @@ -136,7 +136,6 @@ impl Executor { s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); s.spawn(async { let (net, runner) = network::Network::new( - ctx, network_config, self.block_store.clone(), network_actor_pipe, diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 2b34e7a0..7ee8394d 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -12,7 +12,7 @@ use zksync_consensus_storage::{ fn config(cfg: &network::Config) -> Config { Config { server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, + public_addr: cfg.public_addr.clone(), max_payload_size: usize::MAX, node_key: cfg.gossip.key.clone(), gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 27d6c498..66e56f51 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -55,7 +55,7 @@ pub struct GossipConfig { pub static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub static_outbound: HashMap, + pub static_outbound: HashMap, } /// Network actor config. @@ -65,7 +65,10 @@ pub struct Config { pub server_addr: net::tcp::ListenerAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + /// In case public_addr is a domain instead of ip, DNS resolution is + /// performed and a loopback connection is established before announcing + /// the IP address over the gossip network. + pub public_addr: net::Host, /// Gossip network config. pub gossip: GossipConfig, /// Private key of the validator. diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 32a90540..621acdda 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -2,10 +2,8 @@ //! BFT consensus messages are exchanged over this network. use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc}; use anyhow::Context as _; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use rand::seq::SliceRandom; +use std::{collections::HashSet, sync::Arc}; use zksync_concurrency::{ctx, oneshot, scope, sync, time}; use zksync_consensus_roles::validator; use zksync_protobuf::kB; @@ -21,6 +19,15 @@ const RESP_MAX_SIZE: usize = kB; /// is down. const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10); +/// Outbound connection state. +pub(crate) struct Connection { + /// Peer's address. + /// This is not used for now, but will be required for the debug page. + #[allow(dead_code)] + addr: std::net::SocketAddr, + consensus: rpc::Client, +} + /// Consensus network state. pub(crate) struct Network { /// Gossip network state to bootstrap consensus network from. @@ -28,11 +35,9 @@ pub(crate) struct Network { /// This validator's secret key. pub(crate) key: validator::SecretKey, /// Set of the currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch, /// Set of the currently open outbound connections. - pub(crate) outbound: PoolWatch, - /// RPC clients for all validators. - pub(crate) clients: HashMap>, + pub(crate) outbound: PoolWatch>, } #[async_trait::async_trait] @@ -61,22 +66,13 @@ impl rpc::Handler for &Network { impl Network { /// Constructs a new consensus network state. - pub(crate) fn new(ctx: &ctx::Ctx, gossip: Arc) -> Option> { + pub(crate) fn new(gossip: Arc) -> Option> { let key = gossip.cfg.validator_key.clone()?; let validators: HashSet<_> = gossip.genesis().validators.iter().cloned().collect(); Some(Arc::new(Self { key, inbound: PoolWatch::new(validators.clone(), 0), outbound: PoolWatch::new(validators.clone(), 0), - clients: validators - .iter() - .map(|peer| { - ( - peer.clone(), - rpc::Client::new(ctx, gossip.cfg.rpc.consensus_rate), - ) - }) - .collect(), gossip, })) } @@ -88,10 +84,11 @@ impl Network { msg: validator::Signed, ) -> anyhow::Result<()> { let req = rpc::consensus::Req(msg); + let outbound = self.outbound.current(); scope::run!(ctx, |ctx, s| async { - for (peer, client) in &self.clients { + for (peer, conn) in &outbound { s.spawn(async { - if let Err(err) = client.call(ctx, &req, RESP_MAX_SIZE).await { + if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await { tracing::info!("send({:?},): {err:#}", &*peer); } Ok(()) @@ -109,8 +106,11 @@ impl Network { key: &validator::PublicKey, msg: validator::Signed, ) -> anyhow::Result<()> { - let client = self.clients.get(key).context("not an active validator")?; - client + let outbound = self.outbound.current(); + outbound + .get(key) + .context("not an active validator")? + .consensus .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE) .await?; Ok(()) @@ -125,7 +125,7 @@ impl Network { ) -> anyhow::Result<()> { let peer = handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?; - self.inbound.insert(peer.clone()).await?; + self.inbound.insert(peer.clone(), ()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) @@ -152,7 +152,6 @@ impl Network { peer: &validator::PublicKey, addr: std::net::SocketAddr, ) -> anyhow::Result<()> { - let client = self.clients.get(peer).context("not an active validator")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?; handshake::outbound( ctx, @@ -162,11 +161,15 @@ impl Network { peer, ) .await?; - self.outbound.insert(peer.clone()).await?; + let conn = Arc::new(Connection { + addr, + consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate), + }); + self.outbound.insert(peer.clone(), conn.clone()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) - .add_client(client); + .add_client(&conn.consensus); if let Some(ping_timeout) = &self.gossip.cfg.ping_timeout { let ping_client = rpc::Client::::new(ctx, rpc::ping::RATE); service = service.add_client(&ping_client); @@ -175,6 +178,28 @@ impl Network { ping_client.ping_loop(ctx, *ping_timeout).await }); } + // If this is a loopback connection, announce periodically the address of this + // validator to the network. + // Note that this is executed only for outbound end of the loopback connection. + // Inbound end doesn't know the public address of itself. + if peer == &self.key.public() { + s.spawn(async { + let mut sub = self.gossip.validator_addrs.subscribe(); + while ctx.is_active() { + self.gossip + .validator_addrs + .announce(&self.key, addr, ctx.now_utc()) + .await; + let _ = sync::wait_for( + &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), + &mut sub, + |got| got.get(peer).map(|x| x.msg.addr) != Some(addr), + ) + .await; + } + Ok(()) + }); + } service.run(ctx, stream).await?; Ok(()) }) @@ -183,9 +208,35 @@ impl Network { res } + async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let addr = *self + .gossip + .cfg + .public_addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| { + format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr) + })?; + self.run_outbound_stream(ctx, &self.key.public(), addr) + .await + } + /// Maintains a connection to the given validator. /// If connection breaks, it tries to reconnect periodically. pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) { + // Loopback connection is a special case, because the address is taken from + // the config rather than from `gossip.validator_addrs`. + if &self.key.public() == peer { + while ctx.is_active() { + if let Err(err) = self.run_loopback_stream(ctx).await { + tracing::info!("run_loopback_stream(): {err:#}"); + } + } + return; + } let addrs = &mut self.gossip.validator_addrs.subscribe(); let mut addr = None; while ctx.is_active() { @@ -204,35 +255,4 @@ impl Network { } } } - - /// Periodically announces this validator's public IP over gossip network, - /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { - let my_addr = self.gossip.cfg.public_addr; - let mut sub = self.gossip.validator_addrs.subscribe(); - while ctx.is_active() { - let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL); - let _ = sync::wait_for(ctx, &mut sub, |got| { - got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr) - }) - .await; - let next_version = sub - .borrow() - .get(&self.key.public()) - .map(|x| x.msg.version + 1) - .unwrap_or(0); - self.gossip - .validator_addrs - .update( - &self.gossip.genesis().validators, - &[Arc::new(self.key.sign_msg(validator::NetAddress { - addr: my_addr, - version: next_version, - timestamp: ctx.now_utc(), - }))], - ) - .await - .unwrap(); - } - } } diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index f2adc113..6e23a5a3 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -19,7 +19,7 @@ async fn test_one_connection_per_validator() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let nodes : Vec<_> = nodes.into_iter().enumerate().map(|(i,node)| { - let (node,runner) = testonly::Instance::new(ctx, node, store.clone()); + let (node,runner) = testonly::Instance::new(node, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -77,7 +77,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Populate the validator_addrs of the running node."); @@ -87,7 +87,7 @@ async fn test_genesis_mismatch() { .update( &setup.genesis.validators, &[Arc::new(setup.keys[1].sign_msg(validator::NetAddress { - addr: cfgs[1].public_addr, + addr: *cfgs[1].server_addr, version: 0, timestamp: ctx.now_utc(), }))], @@ -109,7 +109,7 @@ async fn test_genesis_mismatch() { tracing::info!("Try to connect to a node with a mismatching genesis."); let mut stream = - preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::ConsensusNet) + preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::ConsensusNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -145,7 +145,7 @@ async fn test_address_change() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -167,8 +167,8 @@ async fn test_address_change() { // Then it should broadcast its new address and the consensus network // should get reconstructed. cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); - cfgs[0].public_addr = *cfgs[0].server_addr; - let (node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); + let (node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); nodes[0] = node0; for n in &nodes { @@ -198,7 +198,7 @@ async fn test_transmission() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); let i = ctx::NoCopy(i); s.spawn_bg(async { let i = i; diff --git a/node/actors/network/src/gossip/arcmap.rs b/node/actors/network/src/gossip/arcmap.rs deleted file mode 100644 index 2fe3307d..00000000 --- a/node/actors/network/src/gossip/arcmap.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Multimap of pointers indexed by `node::PublicKey`. -//! Used to maintain a collection GetBlock rpc clients. -//! TODO(gprusak): consider upgrading PoolWatch instead. -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; -use zksync_consensus_roles::node; - -/// ArcMap -pub(crate) struct ArcMap(Mutex>>>); - -impl Default for ArcMap { - fn default() -> Self { - Self(Mutex::default()) - } -} - -impl ArcMap { - /// Fetches any pointer for the given key. - pub(crate) fn get_any(&self, key: &node::PublicKey) -> Option> { - self.0.lock().unwrap().get(key)?.first().cloned() - } - - /// Insert a pointer. - pub(crate) fn insert(&self, key: node::PublicKey, p: Arc) { - self.0.lock().unwrap().entry(key).or_default().push(p); - } - - /// Removes a pointer. - pub(crate) fn remove(&self, key: node::PublicKey, p: Arc) { - let mut this = self.0.lock().unwrap(); - use std::collections::hash_map::Entry; - let Entry::Occupied(mut e) = this.entry(key) else { - return; - }; - e.get_mut().retain(|c| !Arc::ptr_eq(&p, c)); - if e.get_mut().is_empty() { - e.remove(); - } - } -} diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index e20cf7e8..2b88dbb2 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -12,43 +12,40 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). -use crate::{ - gossip::{ArcMap, ValidatorAddrsWatch}, - io, - pool::PoolWatch, - rpc, Config, -}; +use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, rpc, Config}; use anyhow::Context as _; use std::sync::{atomic::AtomicUsize, Arc}; -mod arcmap; mod handshake; mod runner; #[cfg(test)] mod tests; mod validator_addrs; -pub(crate) use arcmap::*; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; +/// State of the gossip connection. +pub(crate) struct Connection { + /// `get_block` rpc client. + pub(crate) get_block: rpc::Client, +} + /// Gossip network state. pub(crate) struct Network { /// Gossip network configuration. pub(crate) cfg: Config, /// Currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch>, /// Currently open outbound connections. - pub(crate) outbound: PoolWatch, + pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, - /// Clients for `get_block` requests for each currently active peer. - pub(crate) get_block_clients: ArcMap>, /// Output pipe of the network actor. pub(crate) sender: channel::UnboundedSender, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. @@ -71,7 +68,6 @@ impl Network { outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), block_store, - get_block_clients: ArcMap::default(), cfg, push_validator_addrs_calls: 0.into(), }) @@ -89,10 +85,13 @@ impl Network { recipient: &node::PublicKey, number: validator::BlockNumber, ) -> anyhow::Result> { - Ok(self - .get_block_clients - .get_any(recipient) + let outbound = self.outbound.current(); + let inbound = self.inbound.current(); + Ok(outbound + .get(recipient) + .or(inbound.get(recipient)) .context("recipient is unreachable")? + .get_block .call( ctx, &rpc::get_block::Req(number), diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index b84ac131..e3568b13 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,8 +1,10 @@ -use super::{handshake, Network, ValidatorAddrs}; +use super::{handshake, Connection, Network, ValidatorAddrs}; use crate::{io, noise, preface, rpc}; +use anyhow::Context as _; use async_trait::async_trait; +use rand::seq::SliceRandom; use std::sync::{atomic::Ordering, Arc}; -use zksync_concurrency::{ctx, oneshot, scope, sync}; +use zksync_concurrency::{ctx, net, oneshot, scope, sync}; use zksync_consensus_roles::node; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; @@ -79,6 +81,7 @@ impl Network { ctx: &ctx::Ctx, peer: &node::PublicKey, stream: noise::Stream, + conn: &Connection, ) -> anyhow::Result<()> { let push_validator_addrs_client = rpc::Client::::new( ctx, @@ -90,15 +93,7 @@ impl Network { self.cfg.rpc.push_block_store_state_rate, ); let push_block_store_state_server = PushBlockStoreStateServer { peer, net: self }; - - let get_block_client = Arc::new(rpc::Client::::new( - ctx, - self.cfg.rpc.get_block_rate, - )); - self.get_block_clients - .insert(peer.clone(), get_block_client.clone()); - - let res = scope::run!(ctx, |ctx, s| async { + scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_client(&push_validator_addrs_client) .add_server( @@ -110,7 +105,7 @@ impl Network { push_block_store_state_server, self.cfg.rpc.push_block_store_state_rate, ) - .add_client(&get_block_client) + .add_client(&conn.get_block) .add_server(&*self.block_store, self.cfg.rpc.get_block_rate) .add_server(rpc::ping::Server, rpc::ping::RATE); @@ -154,11 +149,7 @@ impl Network { service.run(ctx, stream).await?; Ok(()) }) - .await; - - self.get_block_clients - .remove(peer.clone(), get_block_client); - res + .await } /// Handles an inbound stream. @@ -171,8 +162,11 @@ impl Network { let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; tracing::Span::current().record("peer", tracing::field::debug(&peer)); - self.inbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, &peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.inbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, &peer, stream, &conn).await; self.inbound.remove(&peer).await; res } @@ -182,8 +176,14 @@ impl Network { &self, ctx: &ctx::Ctx, peer: &node::PublicKey, - addr: std::net::SocketAddr, + addr: net::Host, ) -> anyhow::Result<()> { + let addr = *addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| "{addr:?} resolved to empty address set")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; handshake::outbound( ctx, @@ -193,9 +193,11 @@ impl Network { peer, ) .await?; - - self.outbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.outbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, peer, stream, &conn).await; self.outbound.remove(peer).await; res } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 575a8492..85e4d0d1 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -10,7 +10,7 @@ use std::{ use test_casing::{test_casing, Product}; use tracing::Instrument as _; use zksync_concurrency::{ - ctx, oneshot, scope, sync, + ctx, net, oneshot, scope, sync, testonly::{abort_on_panic, set_timeout}, time, }; @@ -30,7 +30,7 @@ async fn test_one_connection_per_node() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let mut nodes : Vec<_> = cfgs.iter().enumerate().map(|(i,cfg)| { - let (node,runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node,runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -46,7 +46,7 @@ async fn test_one_connection_per_node() { let (peer, addr) = cfgs[0].gossip.static_outbound.iter().next().unwrap(); let mut stream = preface::connect( ctx, - *addr, + addr.resolve(ctx).await.unwrap().unwrap()[0], preface::Endpoint::GossipNet, ) .await @@ -235,7 +235,7 @@ async fn test_validator_addrs_propagation() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -245,7 +245,7 @@ async fn test_validator_addrs_propagation() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); @@ -274,7 +274,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (_node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Accept a connection with mismatching genesis."); @@ -290,7 +290,7 @@ async fn test_genesis_mismatch() { assert_matches!(res, Err(handshake::Error::GenesisMismatch)); tracing::info!("Try to connect to a node with a mismatching genesis."); - let mut stream = preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::GossipNet) + let mut stream = preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::GossipNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -333,7 +333,7 @@ async fn syncing_blocks(node_count: usize, gossip_peers: usize) { for (i, cfg) in cfgs.into_iter().enumerate() { let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store); + let (node, runner) = testonly::Instance::new(cfg, store); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } @@ -408,7 +408,7 @@ async fn uncoordinated_block_syncing( let i = i; let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); s.spawn(async { let store = store; @@ -455,7 +455,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -536,7 +536,7 @@ async fn validator_node_restart() { cfg.rpc.push_validator_addrs_rate.refresh = time::Duration::ZERO; } let (store, store_runner) = new_store(ctx, &setup.genesis).await; - let (node1, node1_runner) = testonly::Instance::new(ctx, cfgs[1].clone(), store.clone()); + let (node1, node1_runner) = testonly::Instance::new(cfgs[1].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(store_runner.run(ctx)); s.spawn_bg( @@ -552,8 +552,8 @@ async fn validator_node_restart() { let start = ctx.now_utc(); for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] { // Set the new addr to broadcast. - let addr0 = mk_addr(rng); - cfgs[0].public_addr = addr0; + cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); // Shift the UTC clock. let now = start + clock_shift; assert!( @@ -565,16 +565,14 @@ async fn validator_node_restart() { // _node0 contains pipe, which has to exist to prevent the connection from dying // early. - let (_node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); tracing::info!("wait for the update to arrive to node1"); let sub = &mut node1.net.gossip.validator_addrs.subscribe(); + let want = Some(*cfgs[0].server_addr); sync::wait_for(ctx, sub, |got| { - let Some(got) = got.get(&setup.keys[0].public()) else { - return false; - }; - got.msg.addr == addr0 + got.get(&setup.keys[0].public()).map(|x| x.msg.addr) == want }) .await?; Ok(()) @@ -607,13 +605,13 @@ async fn rate_limiting() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); for i in 1..n { let key = cfgs[i].gossip.key.public().clone(); - let public_addr = cfgs[i].public_addr; + let public_addr = cfgs[i].public_addr.clone(); cfgs[0].gossip.static_outbound.insert(key, public_addr); } let mut nodes = vec![]; @@ -623,7 +621,7 @@ async fn rate_limiting() { // Spawn the satellite nodes and wait until they register // their own address. for (i, cfg) in cfgs[1..].iter().enumerate() { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); let sub = &mut node.net.gossip.validator_addrs.subscribe(); sync::wait_for(ctx, sub, |got| { @@ -636,7 +634,7 @@ async fn rate_limiting() { } // Spawn the center node. - let (center, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (center, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node[0]"))); // Await for the center to receive all validator addrs. let sub = &mut center.net.gossip.validator_addrs.subscribe(); diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/actors/network/src/gossip/validator_addrs.rs index ed176bf5..8fd21b63 100644 --- a/node/actors/network/src/gossip/validator_addrs.rs +++ b/node/actors/network/src/gossip/validator_addrs.rs @@ -1,7 +1,7 @@ //! Global state distributed by active validators, observed by all the nodes in the network. use crate::watch::Watch; use std::{collections::HashSet, sync::Arc}; -use zksync_concurrency::sync; +use zksync_concurrency::{sync, time}; use zksync_consensus_roles::validator; /// Mapping from validator::PublicKey to a signed validator::NetAddress. @@ -90,6 +90,28 @@ impl ValidatorAddrsWatch { self.0.subscribe() } + /// Inserts a new version of the announcement signed with the given key. + pub(crate) async fn announce( + &self, + key: &validator::SecretKey, + addr: std::net::SocketAddr, + timestamp: time::Utc, + ) { + let this = self.0.lock().await; + let mut validator_addrs = this.borrow().clone(); + let version = validator_addrs + .get(&key.public()) + .map(|x| x.msg.version + 1) + .unwrap_or(0); + let d = Arc::new(key.sign_msg(validator::NetAddress { + addr, + version, + timestamp, + })); + validator_addrs.0.insert(d.key.clone(), d); + this.send_replace(validator_addrs); + } + /// Inserts data to ValidatorAddrs. /// Subscribers are notified iff at least 1 new entry has /// been inserted. Returns an error iff an invalid @@ -103,7 +125,7 @@ impl ValidatorAddrsWatch { let this = self.0.lock().await; let mut validator_addrs = this.borrow().clone(); if validator_addrs.update(validators, data)? { - this.send(validator_addrs).ok().unwrap(); + this.send_replace(validator_addrs); } Ok(()) } diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 73c25cb4..c8d3e70d 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -46,13 +46,12 @@ impl Network { /// Constructs a new network actor state. /// Call `run_network` to run the actor. pub fn new( - ctx: &ctx::Ctx, cfg: Config, block_store: Arc, pipe: ActorPipe, ) -> (Arc, Runner) { let gossip = gossip::Network::new(cfg, block_store, pipe.send); - let consensus = consensus::Network::new(ctx, gossip.clone()); + let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( net.clone(), @@ -137,8 +136,11 @@ impl Runner { for (peer, addr) in &self.net.gossip.cfg.gossip.static_outbound { s.spawn(async { loop { - let run_result = - self.net.gossip.run_outbound_stream(ctx, peer, *addr).await; + let run_result = self + .net + .gossip + .run_outbound_stream(ctx, peer, addr.clone()) + .await; if let Err(err) = run_result { tracing::info!("gossip.run_outbound_stream(): {err:#}"); } @@ -150,20 +152,16 @@ impl Runner { } if let Some(c) = &self.net.consensus { + let validators = &c.gossip.genesis().validators; // If we are active validator ... - if c.gossip.genesis().validators.contains(&c.key.public()) { + if validators.contains(&c.key.public()) { // Maintain outbound connections. - for peer in c.clients.keys() { + for peer in validators.iter() { s.spawn(async { c.maintain_connection(ctx, peer).await; Ok(()) }); } - // Announce IP periodically. - s.spawn(async { - c.run_address_announcer(ctx).await; - Ok(()) - }); } } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 86fa1cf5..5cf1c52f 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -145,15 +145,14 @@ impl NetworkGauges { let register_result = COLLECTOR.before_scrape(move || { state_ref.upgrade().map(|state| { let gauges = NetworkGauges::default(); - let len = state.gossip.inbound.subscribe().borrow().current().len(); + let len = state.gossip.inbound.current().len(); gauges.gossip_inbound_connections.set(len); - let len = state.gossip.outbound.subscribe().borrow().current().len(); + let len = state.gossip.outbound.current().len(); gauges.gossip_outbound_connections.set(len); if let Some(consensus_state) = &state.consensus { - let len = consensus_state.inbound.subscribe().borrow().current().len(); + let len = consensus_state.inbound.current().len(); gauges.consensus_inbound_connections.set(len); - let subscriber = consensus_state.outbound.subscribe(); - let len = subscriber.borrow().current().len(); + let len = consensus_state.outbound.current().len(); gauges.consensus_outbound_connections.set(len); } gauges diff --git a/node/actors/network/src/pool.rs b/node/actors/network/src/pool.rs index 92257993..a5e7375f 100644 --- a/node/actors/network/src/pool.rs +++ b/node/actors/network/src/pool.rs @@ -4,37 +4,36 @@ use crate::watch::Watch; use std::collections::HashSet; use zksync_concurrency::sync; -/// Set of elements of type T. +/// Map with restrictions on the allowed keys. /// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of /// `allowed`. #[derive(Clone)] -pub(crate) struct Pool { +pub(crate) struct Pool { extra_limit: usize, extra_count: usize, - allowed: HashSet, - current: HashSet, + allowed: HashSet, + current: im::HashMap, } -impl Pool { - /// Returns a reference to the underlying set. - #[allow(dead_code)] - pub(crate) fn current(&self) -> &HashSet { +impl Pool { + /// Current pool state. + pub(crate) fn current(&self) -> &im::HashMap { &self.current } } /// Watch wrapper of the Pool. -/// Supports subscribing to the Pool updates. -pub(crate) struct PoolWatch(Watch>); +/// Supports subscribing to the Pool membership changes. +pub(crate) struct PoolWatch(Watch>); -impl PoolWatch { +impl PoolWatch { /// Constructs a new pool. - pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { + pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { Self(Watch::new(Pool { extra_limit, extra_count: 0, allowed, - current: HashSet::new(), + current: im::HashMap::new(), })) } @@ -42,40 +41,44 @@ impl PoolWatch { /// Returns an error if /// * `v` is already in the set /// * `v` cannot be added due to size restrictions - pub(crate) async fn insert(&self, v: T) -> anyhow::Result<()> { + pub(crate) async fn insert(&self, k: K, v: V) -> anyhow::Result<()> { self.0 .send_if_ok(|pool| { - if pool.current.contains(&v) { + if pool.current.contains_key(&k) { anyhow::bail!("already exists"); } - if !pool.allowed.contains(&v) { + if !pool.allowed.contains(&k) { if pool.extra_count >= pool.extra_limit { anyhow::bail!("limit exceeded"); } pool.extra_count += 1; } - pool.current.insert(v); + pool.current.insert(k, v); Ok(()) }) .await } /// Removes an element from the set. - pub(crate) async fn remove(&self, v: &T) { + pub(crate) async fn remove(&self, k: &K) { self.0.lock().await.send_if_modified(|pool| { - if !pool.current.remove(v) { + if pool.current.remove(k).is_none() { return false; } - if !pool.allowed.contains(v) { + if !pool.allowed.contains(k) { pool.extra_count -= 1; } true }); } + /// Copy of the current pool state. + pub(crate) fn current(&self) -> im::HashMap { + self.0.subscribe().borrow().current.clone() + } + /// Subscribes to the set changes. - #[allow(dead_code)] - pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { + pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { self.0.subscribe() } } diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index 184326cd..1e58ad17 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -119,6 +119,8 @@ pub(crate) struct Client { impl Client { /// Constructs a new client. + // TODO(gprusak): at this point we don't need the clients to be reusable, + // so perhaps they should be constructed by `Service::add_client` instead? pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self { Client { limiter: limiter::Limiter::new(ctx, rate), diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 022bb747..3a55c324 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -57,7 +57,7 @@ pub fn new_configs( let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -79,7 +79,7 @@ pub fn new_configs( for j in 0..gossip_peers { let j = (i + j + 1) % n; let peer = cfgs[j].gossip.key.public(); - let addr = *cfgs[j].server_addr; + let addr = cfgs[j].public_addr.clone(); cfgs[i].gossip.static_outbound.insert(peer, addr); } } @@ -92,7 +92,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -101,7 +101,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { key: rng.gen(), dynamic_inbound_limit: usize::MAX, static_inbound: HashSet::default(), - static_outbound: [(peer.gossip.key.public(), peer.public_addr)].into(), + static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(), }, max_block_size: usize::MAX, rpc: RpcConfig::default(), @@ -130,13 +130,9 @@ impl InstanceRunner { impl Instance { /// Construct an instance for a given config. - pub fn new( - ctx: &ctx::Ctx, - cfg: Config, - block_store: Arc, - ) -> (Self, InstanceRunner) { + pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { let (actor_pipe, dispatcher_pipe) = pipe::new(); - let (net, runner) = Network::new(ctx, cfg, block_store, actor_pipe); + let (net, runner) = Network::new(cfg, block_store, actor_pipe); let (terminate_send, terminate_recv) = channel::bounded(1); ( Self { @@ -187,7 +183,7 @@ impl Instance { .gossip .outbound .subscribe() - .wait_for(|got| want.is_subset(got.current())) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } @@ -200,13 +196,13 @@ impl Instance { consensus_state .inbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); consensus_state .outbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } @@ -219,11 +215,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = &self.net.gossip; sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) @@ -237,11 +233,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = self.net.consensus.as_ref().unwrap(); sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index c1cde500..718ac5e6 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -20,7 +20,7 @@ async fn test_metrics() { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index b078a804..77e3affd 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -80,7 +80,7 @@ impl NodeRunner { let key = self.network.gossip.key.public(); let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (mut network, network_runner) = - network::testonly::Instance::new(ctx, self.network.clone(), self.store.clone()); + network::testonly::Instance::new(self.network.clone(), self.store.clone()); let sync_blocks_config = Config::new(); let res = scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.store_runner.run(ctx)); diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 12731c90..8a507737 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -1,3 +1,43 @@ //! Context-aware network utilities. //! Built on top of `tokio::net`. +use crate::ctx; + pub mod tcp; + +#[cfg(test)] +mod tests; + +/// Network host address in the format ":". +/// NOT VALIDATED, validation happens at `Host::resolve()` call. +// TODO: for better type safety consider verifying host to be in the valid +// format in constructor. +#[derive(Debug, Clone, PartialEq)] +pub struct Host(pub String); + +impl From for Host { + fn from(addr: std::net::SocketAddr) -> Self { + Self(addr.to_string()) + } +} + +impl Host { + /// If host is of the form ":", performs DNS resolution. + /// If host is of the form ":", just parses the SocketAddr. + pub async fn resolve( + &self, + ctx: &ctx::Ctx, + ) -> ctx::OrCanceled>> { + let host = self.0.clone(); + // Note that we may orphan a task executing the underlying `getnameinfo` call + // if the ctx gets cancelled. This should be fine given that it is expected to finish + // after a timeout and it doesn't affect the state of the application. + // We don't use `tokio::net::lookup_host`, because it is not documented to be cancel-safe. + Ok(ctx + .wait(tokio::task::spawn_blocking(move || { + // This should never panic, so unwrapping the task result is ok. + Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) + })) + .await? + .unwrap()) + } +} diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs new file mode 100644 index 00000000..69c7a1e7 --- /dev/null +++ b/node/libs/concurrency/src/net/tests.rs @@ -0,0 +1,36 @@ +use super::*; + +#[tokio::test] +async fn test_resolve() { + let ctx = &ctx::test_root(&ctx::RealClock); + let addrs = Host("localhost:1234".into()) + .resolve(ctx) + .await + .unwrap() + .unwrap(); + // We assume here that loopback host "localhost" is always configured here. + // If that turns out to be problematic we should use a more robust DNS resolution + // library, so that we can run our own DNS server in tests. + assert!(!addrs.is_empty()); + for addr in addrs { + // Verify the port and that the returned address is actually a loopback. + assert_eq!(addr.port(), 1234); + assert!(addr.ip().is_loopback()); + + // Verify that resolving the serialized address returns the same address. + let got = Host(addr.to_string()).resolve(ctx).await.unwrap().unwrap(); + assert!(got.len() == 1); + assert_eq!(got[0], addr); + } + + // Host without port should not resolve. + assert!(Host("localhost".into()) + .resolve(ctx) + .await + .unwrap() + .is_err()); + // Invalid host name should not resolve. + assert!(Host("$#$:6746".into()).resolve(ctx).await.unwrap().is_err()); + // Empty host name should not resolve. + assert!(Host(String::new()).resolve(ctx).await.unwrap().is_err()); +} diff --git a/node/libs/concurrency/src/time.rs b/node/libs/concurrency/src/time.rs index b46cca63..718317c6 100644 --- a/node/libs/concurrency/src/time.rs +++ b/node/libs/concurrency/src/time.rs @@ -18,6 +18,12 @@ impl std::fmt::Debug for Utc { } } +impl std::fmt::Display for Utc { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + (time::OffsetDateTime::UNIX_EPOCH + self.0).fmt(f) + } +} + /// Start of the unix epoch. pub const UNIX_EPOCH: Utc = Utc(Duration::ZERO); diff --git a/node/libs/protobuf/Cargo.toml b/node/libs/protobuf/Cargo.toml index e7302137..889b531c 100644 --- a/node/libs/protobuf/Cargo.toml +++ b/node/libs/protobuf/Cargo.toml @@ -9,6 +9,7 @@ links = "zksync_protobuf_proto" [dependencies] zksync_concurrency.workspace = true +zksync_consensus_utils.workspace = true anyhow.workspace = true bit-vec.workspace = true @@ -19,6 +20,7 @@ quick-protobuf.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true +serde_yaml.workspace = true [dev-dependencies] tokio.workspace = true @@ -29,4 +31,4 @@ tracing-subscriber.workspace = true zksync_protobuf_build.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/protobuf/src/testonly/gen_value.rs b/node/libs/protobuf/src/testonly/gen_value.rs deleted file mode 100644 index 4c091e37..00000000 --- a/node/libs/protobuf/src/testonly/gen_value.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! Util types for generating random values in tests. - -use rand::Rng; - -/// Generator of random values. -pub struct GenValue<'a, R: Rng> { - /// Underlying RNG. - pub rng: &'a mut R, - /// Generate values with only required fields. - pub required_only: bool, - /// Generate decimal fractions for f64 - /// to avoid rounding errors of decimal encodings. - pub decimal_fractions: bool, -} - -impl<'a, R: Rng> GenValue<'a, R> { - /// Generates a random value of type `C`. - pub fn gen(&mut self) -> C { - C::sample(self) - } -} - -/// Types that can be used to generate a random instance. -pub trait RandomValue { - /// Generates a random value. - fn sample(g: &mut GenValue) -> Self; -} diff --git a/node/libs/protobuf/src/testonly/mod.rs b/node/libs/protobuf/src/testonly/mod.rs index e86a6479..051db885 100644 --- a/node/libs/protobuf/src/testonly/mod.rs +++ b/node/libs/protobuf/src/testonly/mod.rs @@ -1,14 +1,12 @@ //! Testonly utilities. - -pub mod gen_value; -use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, Wire}; -pub use gen_value::*; +use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, ProtoRepr, Wire}; use prost::Message as _; -use prost_reflect::ReflectMessage as _; +use prost_reflect::ReflectMessage; use rand::{ distributions::{Distribution, Standard}, Rng, }; +use zksync_consensus_utils::EncodeDist; /// Test encoding and canonical encoding properties. #[track_caller] @@ -27,7 +25,7 @@ pub fn test_encode(rng: &mut } /// Syntax sugar for `test_encode`, -/// because `test_encode(rng,&rng::gen())` doesn't compile. +/// because `test_encode(rng,&rng.gen())` doesn't compile. #[track_caller] pub fn test_encode_random(rng: &mut impl Rng) where @@ -90,3 +88,101 @@ pub(crate) fn encode_shuffled(rng: &mut R, x: &T) -> Vec anyhow::Result; + /// build. + fn build(this: &Self::Type) -> Self::Proto; +} + +fn encode_proto(msg: &X::Type) -> Vec { + let msg = X::build(msg); + canonical_raw(&msg.encode_to_vec(), &msg.descriptor()).unwrap() +} + +fn decode_proto(bytes: &[u8]) -> anyhow::Result { + X::read(&X::Proto::decode(bytes)?) +} + +fn encode_json(msg: &X::Type) -> String { + let mut s = serde_json::Serializer::pretty(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner()).unwrap() +} + +fn decode_json(json: &str) -> anyhow::Result { + let mut d = serde_json::Deserializer::from_str(json); + X::read(&crate::serde::deserialize_proto(&mut d)?) +} + +fn encode_yaml(msg: &X::Type) -> String { + let mut s = serde_yaml::Serializer::new(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner().unwrap()).unwrap() +} + +fn decode_yaml(yaml: &str) -> anyhow::Result { + let d = serde_yaml::Deserializer::from_str(yaml); + X::read(&crate::serde::deserialize_proto(d)?) +} + +/// Wrapper for `ProtoRepr`, implementing ProtoConv; +pub struct ReprConv(std::marker::PhantomData

); +/// Wrapper for `ProtoFmt`, implementing ProtoConv; +pub struct FmtConv(std::marker::PhantomData); + +impl ProtoConv for FmtConv { + type Type = T; + type Proto = T::Proto; + fn read(r: &T::Proto) -> anyhow::Result { + ProtoFmt::read(r) + } + fn build(this: &T) -> T::Proto { + ProtoFmt::build(this) + } +} + +impl ProtoConv for ReprConv

{ + type Type = P::Type; + type Proto = P; + fn read(r: &P) -> anyhow::Result { + ProtoRepr::read(r) + } + fn build(this: &P::Type) -> P { + ProtoRepr::build(this) + } +} + +/// Test reencoding random values in various formats. +#[track_caller] +pub fn test_encode_all_formats(rng: &mut impl Rng) +where + X::Type: std::fmt::Debug + PartialEq, + EncodeDist: Distribution, +{ + for required_only in [false, true] { + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: false, + } + .sample(rng); + let got = decode_proto::(&encode_proto::(&want)).unwrap(); + assert_eq!(&want, &got, "binary encoding"); + let got = decode_yaml::(&encode_yaml::(&want)).unwrap(); + assert_eq!(&want, &got, "yaml encoding"); + + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: true, + } + .sample(rng); + let got = decode_json::(&encode_json::(&want)).unwrap(); + assert_eq!(&want, &got, "json encoding"); + } +} diff --git a/node/libs/utils/Cargo.toml b/node/libs/utils/Cargo.toml index 07ac636b..b6a0d6a9 100644 --- a/node/libs/utils/Cargo.toml +++ b/node/libs/utils/Cargo.toml @@ -9,7 +9,8 @@ license.workspace = true [dependencies] zksync_concurrency.workspace = true +rand.workspace = true thiserror.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs new file mode 100644 index 00000000..621c017f --- /dev/null +++ b/node/libs/utils/src/encode.rs @@ -0,0 +1,123 @@ +//! Utilities for testing encodings. +use rand::{ + distributions::{Alphanumeric, DistString, Distribution}, + Rng, +}; +use zksync_concurrency::net; + +/// Distribution for testing encodings. +pub struct EncodeDist { + /// Generate configs with only required fields. + pub required_only: bool, + /// Generate decimal fractions for f64 + /// to avoid rounding errors of decimal encodings. + pub decimal_fractions: bool, +} + +impl EncodeDist { + /// Returns a small non-empty range if `required_only` is false. + /// Returns an empty range otherwise. + pub fn sample_range(&self, rng: &mut (impl Rng + ?Sized)) -> std::ops::Range { + if self.required_only { + 0..0 + } else { + 0..rng.gen_range(5..10) + } + } + + /// Returns `Some(f())` if `required_only` is false. + /// Returns `None otherwise. + pub fn sample_opt(&self, f: impl FnOnce() -> T) -> Option { + if self.required_only { + None + } else { + Some(f()) + } + } + + /// Samples a collection of type T. + pub fn sample_collect>( + &self, + rng: &mut (impl Rng + ?Sized), + ) -> T + where + EncodeDist: Distribution, + { + self.sample_range(rng).map(|_| self.sample(rng)).collect() + } +} + +impl Distribution> for EncodeDist +where + EncodeDist: Distribution, +{ + fn sample(&self, rng: &mut R) -> Option { + self.sample_opt(|| self.sample(rng)) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::net::SocketAddr { + std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> net::Host { + Distribution::::sample(self, rng).into() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> String { + let n = self.sample_range(rng).len(); + Alphanumeric.sample_string(rng, n) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> bool { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u8 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u16 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u32 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u64 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> usize { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::num::NonZeroU32 { + rng.gen() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> f64 { + if self.decimal_fractions { + const PRECISION: usize = 1000000; + #[allow(clippy::float_arithmetic)] + return rng.gen_range(0..PRECISION) as f64 / PRECISION as f64; + } + rng.gen() + } +} diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 31e43ba1..71fc6af1 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -1,4 +1,7 @@ //! Crate that holds several small utilities and primitives. +mod encode; pub mod enum_util; pub mod pipe; + +pub use encode::*; diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index f8863a03..c5d2557e 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -1,10 +1,8 @@ //! Deployer for the kubernetes cluster. use clap::Parser; use std::collections::HashMap; -use zksync_consensus_roles::node::SecretKey; -use zksync_consensus_roles::validator; -use zksync_consensus_tools::k8s::ConsensusNode; -use zksync_consensus_tools::{k8s, AppConfig}; +use zksync_consensus_roles::{node::SecretKey, validator}; +use zksync_consensus_tools::{k8s, k8s::ConsensusNode, AppConfig}; /// Command line arguments. #[derive(Debug, Parser)] diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index e5ef14bb..a13db3a6 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -61,21 +61,21 @@ fn main() -> anyhow::Result<()> { default_config.with_metrics_server_addr(metrics_server_addr); } let mut cfgs: Vec<_> = (0..nodes) - .map(|i| default_config.with_public_addr(addrs[i]).clone()) + .map(|i| default_config.with_public_addr(addrs[i].into()).clone()) .collect(); // Construct a gossip network with optimal diameter. for i in 0..nodes { for j in 0..peers { let next = (i * peers + j + 1) % nodes; - cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next]); + cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next].into()); cfgs[next].add_gossip_static_inbound(node_keys[i].public()); } } for (i, cfg) in cfgs.into_iter().enumerate() { // Recreate the directory for the node's config. - let root = args.output_dir.join(cfg.public_addr.to_string()); + let root = args.output_dir.join(&cfg.public_addr.0); let _ = fs::remove_dir_all(&root); fs::create_dir_all(&root).with_context(|| format!("create_dir_all({:?})", root))?; cfg.write_to_file(&root)?; diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index ffb8410c..944c39b5 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -9,7 +9,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, }; -use zksync_concurrency::ctx; +use zksync_concurrency::{ctx, net}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; use zksync_consensus_executor as executor; @@ -43,11 +43,11 @@ pub(crate) fn encode_with_serializer( String::from_utf8(serializer.into_inner()).unwrap() } -/// Pair of (public key, ip address) for a gossip network node. +/// Pair of (public key, host addr) for a gossip network node. #[derive(Debug, Clone)] pub struct NodeAddr { pub key: node::PublicKey, - pub addr: SocketAddr, + pub addr: net::Host, } impl ProtoFmt for NodeAddr { @@ -55,14 +55,14 @@ impl ProtoFmt for NodeAddr { fn read(r: &Self::Proto) -> anyhow::Result { let key = read_required_text(&r.key).context("key")?; - let addr = read_required_text(&r.addr).context("addr")?; + let addr = net::Host(required(&r.addr).context("addr")?.clone()); Ok(Self { addr, key }) } fn build(&self) -> Self::Proto { Self::Proto { key: Some(TextFmt::encode(&self.key)), - addr: Some(TextFmt::encode(&self.addr)), + addr: Some(self.addr.0.clone()), } } } @@ -72,7 +72,7 @@ impl ProtoFmt for NodeAddr { #[derive(Debug, PartialEq, Clone)] pub struct AppConfig { pub server_addr: SocketAddr, - pub public_addr: SocketAddr, + pub public_addr: net::Host, pub metrics_server_addr: Option, pub genesis: validator::Genesis, @@ -80,7 +80,7 @@ pub struct AppConfig { pub gossip_dynamic_inbound_limit: usize, pub gossip_static_inbound: HashSet, - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl ProtoFmt for AppConfig { @@ -104,7 +104,7 @@ impl ProtoFmt for AppConfig { } Ok(Self { server_addr: read_required_text(&r.server_addr).context("server_addr")?, - public_addr: read_required_text(&r.public_addr).context("public_addr")?, + public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()), metrics_server_addr: read_optional_text(&r.metrics_server_addr) .context("metrics_server_addr")?, @@ -124,7 +124,7 @@ impl ProtoFmt for AppConfig { fn build(&self) -> Self::Proto { Self::Proto { server_addr: Some(self.server_addr.encode()), - public_addr: Some(self.public_addr.encode()), + public_addr: Some(self.public_addr.0.clone()), metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), genesis: Some(self.genesis.build()), @@ -143,7 +143,7 @@ impl ProtoFmt for AppConfig { .iter() .map(|(key, addr)| proto::NodeAddr { key: Some(TextFmt::encode(key)), - addr: Some(TextFmt::encode(addr)), + addr: Some(addr.0.clone()), }) .collect(), } @@ -234,7 +234,7 @@ impl AppConfig { pub fn default_for(genesis: validator::Genesis) -> AppConfig { Self { server_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), - public_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), + public_addr: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), NODES_PORT).into(), metrics_server_addr: None, genesis, @@ -251,7 +251,7 @@ impl AppConfig { self } - pub fn with_public_addr(&mut self, public_addr: SocketAddr) -> &mut Self { + pub fn with_public_addr(&mut self, public_addr: net::Host) -> &mut Self { self.public_addr = public_addr; self } @@ -272,7 +272,7 @@ impl AppConfig { pub fn add_gossip_static_outbound( &mut self, key: node::PublicKey, - addr: SocketAddr, + addr: net::Host, ) -> &mut Self { self.gossip_static_outbound.insert(key, addr); self @@ -292,9 +292,10 @@ impl AppConfig { self } - pub fn check_public_addr(&mut self) -> anyhow::Result<()> { + /// Tries to load the public_addr IP from the env var. + pub fn try_load_public_addr(&mut self) -> anyhow::Result<()> { if let Ok(public_addr) = std::env::var("PUBLIC_ADDR") { - self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?; + self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?.into(); } Ok(()) } @@ -310,7 +311,7 @@ impl Configs { let e = executor::Executor { config: executor::Config { server_addr: self.app.server_addr, - public_addr: self.app.public_addr, + public_addr: self.app.public_addr.clone(), node_key: self.node_key.clone(), gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit, gossip_static_inbound: self.app.gossip_static_inbound.clone(), diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs index 50cb012b..40c1936f 100644 --- a/node/tools/src/k8s.rs +++ b/node/tools/src/k8s.rs @@ -75,7 +75,7 @@ impl ConsensusNode { .context("Pod IP address not present")?; self.node_addr = Some(NodeAddr { key: self.key.public(), - addr: SocketAddr::new(ip.parse()?, config::NODES_PORT), + addr: SocketAddr::new(ip.parse()?, config::NODES_PORT).into(), }); Ok(()) } diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index a17efa32..7f272131 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -130,14 +130,17 @@ async fn main() -> anyhow::Result<()> { let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config - configs.app.check_public_addr().context("Public Address")?; + configs + .app + .try_load_public_addr() + .context("Public Address")?; let (executor, runner) = configs .make_executor(ctx) .await .context("configs.into_executor()")?; - let mut rpc_addr = configs.app.public_addr; + let mut rpc_addr = configs.app.server_addr; if let Some(port) = args.rpc_port { rpc_addr.set_port(port); } else { diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index 15d2f840..95f6c489 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -1,33 +1,26 @@ use crate::{store, AppConfig}; -use rand::{ - distributions::{Distribution, Standard}, - Rng, -}; +use rand::{distributions::Distribution, Rng}; use tempfile::TempDir; use zksync_concurrency::ctx; -use zksync_consensus_roles::{node, validator::testonly::Setup}; +use zksync_consensus_roles::validator::testonly::Setup; use zksync_consensus_storage::{testonly, PersistentBlockStore}; -use zksync_protobuf::testonly::test_encode_random; +use zksync_consensus_utils::EncodeDist; +use zksync_protobuf::testonly::{test_encode_all_formats, FmtConv}; -fn make_addr(rng: &mut R) -> std::net::SocketAddr { - std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) -} - -impl Distribution for Standard { +impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> AppConfig { AppConfig { - server_addr: make_addr(rng), - public_addr: make_addr(rng), - metrics_server_addr: Some(make_addr(rng)), + server_addr: self.sample(rng), + public_addr: self.sample(rng), + metrics_server_addr: Some(self.sample(rng)), genesis: rng.gen(), gossip_dynamic_inbound_limit: rng.gen(), - gossip_static_inbound: (0..5) - .map(|_| rng.gen::().public()) - .collect(), - gossip_static_outbound: (0..6) - .map(|_| (rng.gen::().public(), make_addr(rng))) + gossip_static_inbound: self.sample_range(rng).map(|_| rng.gen()).collect(), + gossip_static_outbound: self + .sample_range(rng) + .map(|_| (rng.gen(), self.sample(rng))) .collect(), max_payload_size: rng.gen(), } @@ -38,7 +31,7 @@ impl Distribution for Standard { fn test_schema_encoding() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - test_encode_random::(rng); + test_encode_all_formats::>(rng); } #[tokio::test]