From ce9279566ae9653c6da61b499e89c0779d601484 Mon Sep 17 00:00:00 2001 From: pompon0 Date: Wed, 27 Mar 2024 17:41:35 +0100 Subject: [PATCH] Added dns support for the addresses in the config. (#82) Added dns resolution support for the addresses in the config. Now public_addr and gossip peers' addrs can be specified in `:` format. Validator address discovery still uses `:` and validators announce their IP address only after verifying it (by establishing a loopback connection using the public_addr). DNS ecosystem is slightly more centralized (requires authoritative DNS servers) than IP assignment, but not that much. It is useful in case of systems which have problem with providing stable IPs (like kubernetes). We need dns support at the very least for the non-public (stage) environment of zksync-era. This pr also implements some minor related upgrades (like merged ArcMap functionality into Pool, and moved the sampling logic from zksync-era). --- node/Cargo.lock | 3 + node/Cargo.toml | 1 + node/actors/bft/src/testonly/run.rs | 1 - node/actors/bft/src/tests.rs | 4 +- node/actors/executor/src/lib.rs | 7 +- node/actors/executor/src/tests.rs | 2 +- node/actors/network/src/config.rs | 7 +- node/actors/network/src/consensus/mod.rs | 134 ++++++++++-------- node/actors/network/src/consensus/tests.rs | 16 +-- node/actors/network/src/gossip/arcmap.rs | 42 ------ node/actors/network/src/gossip/mod.rs | 31 ++-- node/actors/network/src/gossip/runner.rs | 48 ++++--- node/actors/network/src/gossip/tests.rs | 42 +++--- .../network/src/gossip/validator_addrs.rs | 26 +++- node/actors/network/src/lib.rs | 20 ++- node/actors/network/src/metrics.rs | 9 +- node/actors/network/src/pool.rs | 47 +++--- node/actors/network/src/rpc/mod.rs | 2 + node/actors/network/src/testonly.rs | 30 ++-- node/actors/network/src/tests.rs | 2 +- .../sync_blocks/src/tests/end_to_end.rs | 2 +- node/libs/concurrency/src/net/mod.rs | 40 ++++++ node/libs/concurrency/src/net/tests.rs | 36 +++++ node/libs/concurrency/src/time.rs | 6 + node/libs/protobuf/Cargo.toml | 4 +- node/libs/protobuf/src/testonly/gen_value.rs | 27 ---- node/libs/protobuf/src/testonly/mod.rs | 108 +++++++++++++- node/libs/utils/Cargo.toml | 3 +- node/libs/utils/src/encode.rs | 123 ++++++++++++++++ node/libs/utils/src/lib.rs | 3 + node/tools/src/bin/deployer.rs | 6 +- node/tools/src/bin/localnet_config.rs | 6 +- node/tools/src/config.rs | 33 ++--- node/tools/src/k8s.rs | 2 +- node/tools/src/main.rs | 7 +- node/tools/src/tests.rs | 33 ++--- 36 files changed, 595 insertions(+), 318 deletions(-) delete mode 100644 node/actors/network/src/gossip/arcmap.rs create mode 100644 node/libs/concurrency/src/net/tests.rs delete mode 100644 node/libs/protobuf/src/testonly/gen_value.rs create mode 100644 node/libs/utils/src/encode.rs diff --git a/node/Cargo.lock b/node/Cargo.lock index de720d3b..aec35e5e 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3894,6 +3894,7 @@ dependencies = [ name = "zksync_consensus_utils" version = "0.1.0" dependencies = [ + "rand 0.8.5", "thiserror", "zksync_concurrency", ] @@ -3911,10 +3912,12 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "serde_yaml", "tokio", "tracing", "tracing-subscriber", "zksync_concurrency", + "zksync_consensus_utils", "zksync_protobuf_build", ] diff --git a/node/Cargo.toml b/node/Cargo.toml index 3f9f8496..1daf85f2 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -73,6 +73,7 @@ rand04 = { package = "rand", version = "0.4" } rocksdb = "0.21.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.95" +serde_yaml = "0.9" sha3 = "0.10.8" snow = "0.9.3" syn = "2.0.17" diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 693f85ca..af06ea04 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -77,7 +77,6 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow:: let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { let (node, runner) = network::testonly::Instance::new( - ctx, spec.net.clone(), spec.block_store.clone(), ); diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index e6bb83b5..c73cf233 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -3,7 +3,7 @@ use zksync_concurrency::{ctx, scope, time}; use zksync_consensus_roles::validator; async fn run_test(behavior: Behavior, network: Network) { - let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20)); + let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30)); zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); @@ -15,7 +15,7 @@ async fn run_test(behavior: Behavior, network: Network) { Test { network, nodes, - blocks_to_finalize: 15, + blocks_to_finalize: 10, } .run(ctx) .await diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 2e0fad85..cec596d8 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -45,7 +45,7 @@ pub struct Config { pub server_addr: std::net::SocketAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + pub public_addr: net::Host, /// Maximal size of the block payload. pub max_payload_size: usize, @@ -59,7 +59,7 @@ pub struct Config { pub gossip_static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl Config { @@ -90,7 +90,7 @@ impl Executor { fn network_config(&self) -> network::Config { network::Config { server_addr: net::tcp::ListenerAddr::new(self.config.server_addr), - public_addr: self.config.public_addr, + public_addr: self.config.public_addr.clone(), gossip: self.config.gossip(), validator_key: self.validator.as_ref().map(|v| v.key.clone()), ping_timeout: Some(time::Duration::seconds(10)), @@ -136,7 +136,6 @@ impl Executor { s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); s.spawn(async { let (net, runner) = network::Network::new( - ctx, network_config, self.block_store.clone(), network_actor_pipe, diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 2b34e7a0..7ee8394d 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -12,7 +12,7 @@ use zksync_consensus_storage::{ fn config(cfg: &network::Config) -> Config { Config { server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, + public_addr: cfg.public_addr.clone(), max_payload_size: usize::MAX, node_key: cfg.gossip.key.clone(), gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, diff --git a/node/actors/network/src/config.rs b/node/actors/network/src/config.rs index 27d6c498..66e56f51 100644 --- a/node/actors/network/src/config.rs +++ b/node/actors/network/src/config.rs @@ -55,7 +55,7 @@ pub struct GossipConfig { pub static_inbound: HashSet, /// Outbound connections that the node should actively try to /// establish and maintain. - pub static_outbound: HashMap, + pub static_outbound: HashMap, } /// Network actor config. @@ -65,7 +65,10 @@ pub struct Config { pub server_addr: net::tcp::ListenerAddr, /// Public TCP address that other nodes are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, + /// In case public_addr is a domain instead of ip, DNS resolution is + /// performed and a loopback connection is established before announcing + /// the IP address over the gossip network. + pub public_addr: net::Host, /// Gossip network config. pub gossip: GossipConfig, /// Private key of the validator. diff --git a/node/actors/network/src/consensus/mod.rs b/node/actors/network/src/consensus/mod.rs index 32a90540..621acdda 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/actors/network/src/consensus/mod.rs @@ -2,10 +2,8 @@ //! BFT consensus messages are exchanged over this network. use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc}; use anyhow::Context as _; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use rand::seq::SliceRandom; +use std::{collections::HashSet, sync::Arc}; use zksync_concurrency::{ctx, oneshot, scope, sync, time}; use zksync_consensus_roles::validator; use zksync_protobuf::kB; @@ -21,6 +19,15 @@ const RESP_MAX_SIZE: usize = kB; /// is down. const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10); +/// Outbound connection state. +pub(crate) struct Connection { + /// Peer's address. + /// This is not used for now, but will be required for the debug page. + #[allow(dead_code)] + addr: std::net::SocketAddr, + consensus: rpc::Client, +} + /// Consensus network state. pub(crate) struct Network { /// Gossip network state to bootstrap consensus network from. @@ -28,11 +35,9 @@ pub(crate) struct Network { /// This validator's secret key. pub(crate) key: validator::SecretKey, /// Set of the currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch, /// Set of the currently open outbound connections. - pub(crate) outbound: PoolWatch, - /// RPC clients for all validators. - pub(crate) clients: HashMap>, + pub(crate) outbound: PoolWatch>, } #[async_trait::async_trait] @@ -61,22 +66,13 @@ impl rpc::Handler for &Network { impl Network { /// Constructs a new consensus network state. - pub(crate) fn new(ctx: &ctx::Ctx, gossip: Arc) -> Option> { + pub(crate) fn new(gossip: Arc) -> Option> { let key = gossip.cfg.validator_key.clone()?; let validators: HashSet<_> = gossip.genesis().validators.iter().cloned().collect(); Some(Arc::new(Self { key, inbound: PoolWatch::new(validators.clone(), 0), outbound: PoolWatch::new(validators.clone(), 0), - clients: validators - .iter() - .map(|peer| { - ( - peer.clone(), - rpc::Client::new(ctx, gossip.cfg.rpc.consensus_rate), - ) - }) - .collect(), gossip, })) } @@ -88,10 +84,11 @@ impl Network { msg: validator::Signed, ) -> anyhow::Result<()> { let req = rpc::consensus::Req(msg); + let outbound = self.outbound.current(); scope::run!(ctx, |ctx, s| async { - for (peer, client) in &self.clients { + for (peer, conn) in &outbound { s.spawn(async { - if let Err(err) = client.call(ctx, &req, RESP_MAX_SIZE).await { + if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await { tracing::info!("send({:?},): {err:#}", &*peer); } Ok(()) @@ -109,8 +106,11 @@ impl Network { key: &validator::PublicKey, msg: validator::Signed, ) -> anyhow::Result<()> { - let client = self.clients.get(key).context("not an active validator")?; - client + let outbound = self.outbound.current(); + outbound + .get(key) + .context("not an active validator")? + .consensus .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE) .await?; Ok(()) @@ -125,7 +125,7 @@ impl Network { ) -> anyhow::Result<()> { let peer = handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?; - self.inbound.insert(peer.clone()).await?; + self.inbound.insert(peer.clone(), ()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) @@ -152,7 +152,6 @@ impl Network { peer: &validator::PublicKey, addr: std::net::SocketAddr, ) -> anyhow::Result<()> { - let client = self.clients.get(peer).context("not an active validator")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?; handshake::outbound( ctx, @@ -162,11 +161,15 @@ impl Network { peer, ) .await?; - self.outbound.insert(peer.clone()).await?; + let conn = Arc::new(Connection { + addr, + consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate), + }); + self.outbound.insert(peer.clone(), conn.clone()).await?; let res = scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_server(rpc::ping::Server, rpc::ping::RATE) - .add_client(client); + .add_client(&conn.consensus); if let Some(ping_timeout) = &self.gossip.cfg.ping_timeout { let ping_client = rpc::Client::::new(ctx, rpc::ping::RATE); service = service.add_client(&ping_client); @@ -175,6 +178,28 @@ impl Network { ping_client.ping_loop(ctx, *ping_timeout).await }); } + // If this is a loopback connection, announce periodically the address of this + // validator to the network. + // Note that this is executed only for outbound end of the loopback connection. + // Inbound end doesn't know the public address of itself. + if peer == &self.key.public() { + s.spawn(async { + let mut sub = self.gossip.validator_addrs.subscribe(); + while ctx.is_active() { + self.gossip + .validator_addrs + .announce(&self.key, addr, ctx.now_utc()) + .await; + let _ = sync::wait_for( + &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL), + &mut sub, + |got| got.get(peer).map(|x| x.msg.addr) != Some(addr), + ) + .await; + } + Ok(()) + }); + } service.run(ctx, stream).await?; Ok(()) }) @@ -183,9 +208,35 @@ impl Network { res } + async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let addr = *self + .gossip + .cfg + .public_addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| { + format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr) + })?; + self.run_outbound_stream(ctx, &self.key.public(), addr) + .await + } + /// Maintains a connection to the given validator. /// If connection breaks, it tries to reconnect periodically. pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) { + // Loopback connection is a special case, because the address is taken from + // the config rather than from `gossip.validator_addrs`. + if &self.key.public() == peer { + while ctx.is_active() { + if let Err(err) = self.run_loopback_stream(ctx).await { + tracing::info!("run_loopback_stream(): {err:#}"); + } + } + return; + } let addrs = &mut self.gossip.validator_addrs.subscribe(); let mut addr = None; while ctx.is_active() { @@ -204,35 +255,4 @@ impl Network { } } } - - /// Periodically announces this validator's public IP over gossip network, - /// so that other validators can discover and connect to this validator. - pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) { - let my_addr = self.gossip.cfg.public_addr; - let mut sub = self.gossip.validator_addrs.subscribe(); - while ctx.is_active() { - let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL); - let _ = sync::wait_for(ctx, &mut sub, |got| { - got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr) - }) - .await; - let next_version = sub - .borrow() - .get(&self.key.public()) - .map(|x| x.msg.version + 1) - .unwrap_or(0); - self.gossip - .validator_addrs - .update( - &self.gossip.genesis().validators, - &[Arc::new(self.key.sign_msg(validator::NetAddress { - addr: my_addr, - version: next_version, - timestamp: ctx.now_utc(), - }))], - ) - .await - .unwrap(); - } - } } diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index f2adc113..6e23a5a3 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -19,7 +19,7 @@ async fn test_one_connection_per_validator() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let nodes : Vec<_> = nodes.into_iter().enumerate().map(|(i,node)| { - let (node,runner) = testonly::Instance::new(ctx, node, store.clone()); + let (node,runner) = testonly::Instance::new(node, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -77,7 +77,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Populate the validator_addrs of the running node."); @@ -87,7 +87,7 @@ async fn test_genesis_mismatch() { .update( &setup.genesis.validators, &[Arc::new(setup.keys[1].sign_msg(validator::NetAddress { - addr: cfgs[1].public_addr, + addr: *cfgs[1].server_addr, version: 0, timestamp: ctx.now_utc(), }))], @@ -109,7 +109,7 @@ async fn test_genesis_mismatch() { tracing::info!("Try to connect to a node with a mismatching genesis."); let mut stream = - preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::ConsensusNet) + preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::ConsensusNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -145,7 +145,7 @@ async fn test_address_change() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -167,8 +167,8 @@ async fn test_address_change() { // Then it should broadcast its new address and the consensus network // should get reconstructed. cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); - cfgs[0].public_addr = *cfgs[0].server_addr; - let (node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); + let (node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); nodes[0] = node0; for n in &nodes { @@ -198,7 +198,7 @@ async fn test_transmission() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); let i = ctx::NoCopy(i); s.spawn_bg(async { let i = i; diff --git a/node/actors/network/src/gossip/arcmap.rs b/node/actors/network/src/gossip/arcmap.rs deleted file mode 100644 index 2fe3307d..00000000 --- a/node/actors/network/src/gossip/arcmap.rs +++ /dev/null @@ -1,42 +0,0 @@ -//! Multimap of pointers indexed by `node::PublicKey`. -//! Used to maintain a collection GetBlock rpc clients. -//! TODO(gprusak): consider upgrading PoolWatch instead. -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; -use zksync_consensus_roles::node; - -/// ArcMap -pub(crate) struct ArcMap(Mutex>>>); - -impl Default for ArcMap { - fn default() -> Self { - Self(Mutex::default()) - } -} - -impl ArcMap { - /// Fetches any pointer for the given key. - pub(crate) fn get_any(&self, key: &node::PublicKey) -> Option> { - self.0.lock().unwrap().get(key)?.first().cloned() - } - - /// Insert a pointer. - pub(crate) fn insert(&self, key: node::PublicKey, p: Arc) { - self.0.lock().unwrap().entry(key).or_default().push(p); - } - - /// Removes a pointer. - pub(crate) fn remove(&self, key: node::PublicKey, p: Arc) { - let mut this = self.0.lock().unwrap(); - use std::collections::hash_map::Entry; - let Entry::Occupied(mut e) = this.entry(key) else { - return; - }; - e.get_mut().retain(|c| !Arc::ptr_eq(&p, c)); - if e.get_mut().is_empty() { - e.remove(); - } - } -} diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index e20cf7e8..2b88dbb2 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -12,43 +12,40 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). -use crate::{ - gossip::{ArcMap, ValidatorAddrsWatch}, - io, - pool::PoolWatch, - rpc, Config, -}; +use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, rpc, Config}; use anyhow::Context as _; use std::sync::{atomic::AtomicUsize, Arc}; -mod arcmap; mod handshake; mod runner; #[cfg(test)] mod tests; mod validator_addrs; -pub(crate) use arcmap::*; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; +/// State of the gossip connection. +pub(crate) struct Connection { + /// `get_block` rpc client. + pub(crate) get_block: rpc::Client, +} + /// Gossip network state. pub(crate) struct Network { /// Gossip network configuration. pub(crate) cfg: Config, /// Currently open inbound connections. - pub(crate) inbound: PoolWatch, + pub(crate) inbound: PoolWatch>, /// Currently open outbound connections. - pub(crate) outbound: PoolWatch, + pub(crate) outbound: PoolWatch>, /// Current state of knowledge about validators' endpoints. pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, - /// Clients for `get_block` requests for each currently active peer. - pub(crate) get_block_clients: ArcMap>, /// Output pipe of the network actor. pub(crate) sender: channel::UnboundedSender, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. @@ -71,7 +68,6 @@ impl Network { outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), block_store, - get_block_clients: ArcMap::default(), cfg, push_validator_addrs_calls: 0.into(), }) @@ -89,10 +85,13 @@ impl Network { recipient: &node::PublicKey, number: validator::BlockNumber, ) -> anyhow::Result> { - Ok(self - .get_block_clients - .get_any(recipient) + let outbound = self.outbound.current(); + let inbound = self.inbound.current(); + Ok(outbound + .get(recipient) + .or(inbound.get(recipient)) .context("recipient is unreachable")? + .get_block .call( ctx, &rpc::get_block::Req(number), diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index b84ac131..e3568b13 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,8 +1,10 @@ -use super::{handshake, Network, ValidatorAddrs}; +use super::{handshake, Connection, Network, ValidatorAddrs}; use crate::{io, noise, preface, rpc}; +use anyhow::Context as _; use async_trait::async_trait; +use rand::seq::SliceRandom; use std::sync::{atomic::Ordering, Arc}; -use zksync_concurrency::{ctx, oneshot, scope, sync}; +use zksync_concurrency::{ctx, net, oneshot, scope, sync}; use zksync_consensus_roles::node; use zksync_consensus_storage::BlockStore; use zksync_protobuf::kB; @@ -79,6 +81,7 @@ impl Network { ctx: &ctx::Ctx, peer: &node::PublicKey, stream: noise::Stream, + conn: &Connection, ) -> anyhow::Result<()> { let push_validator_addrs_client = rpc::Client::::new( ctx, @@ -90,15 +93,7 @@ impl Network { self.cfg.rpc.push_block_store_state_rate, ); let push_block_store_state_server = PushBlockStoreStateServer { peer, net: self }; - - let get_block_client = Arc::new(rpc::Client::::new( - ctx, - self.cfg.rpc.get_block_rate, - )); - self.get_block_clients - .insert(peer.clone(), get_block_client.clone()); - - let res = scope::run!(ctx, |ctx, s| async { + scope::run!(ctx, |ctx, s| async { let mut service = rpc::Service::new() .add_client(&push_validator_addrs_client) .add_server( @@ -110,7 +105,7 @@ impl Network { push_block_store_state_server, self.cfg.rpc.push_block_store_state_rate, ) - .add_client(&get_block_client) + .add_client(&conn.get_block) .add_server(&*self.block_store, self.cfg.rpc.get_block_rate) .add_server(rpc::ping::Server, rpc::ping::RATE); @@ -154,11 +149,7 @@ impl Network { service.run(ctx, stream).await?; Ok(()) }) - .await; - - self.get_block_clients - .remove(peer.clone(), get_block_client); - res + .await } /// Handles an inbound stream. @@ -171,8 +162,11 @@ impl Network { let peer = handshake::inbound(ctx, &self.cfg.gossip, self.genesis().hash(), &mut stream).await?; tracing::Span::current().record("peer", tracing::field::debug(&peer)); - self.inbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, &peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.inbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, &peer, stream, &conn).await; self.inbound.remove(&peer).await; res } @@ -182,8 +176,14 @@ impl Network { &self, ctx: &ctx::Ctx, peer: &node::PublicKey, - addr: std::net::SocketAddr, + addr: net::Host, ) -> anyhow::Result<()> { + let addr = *addr + .resolve(ctx) + .await? + .context("resolve()")? + .choose(&mut ctx.rng()) + .with_context(|| "{addr:?} resolved to empty address set")?; let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; handshake::outbound( ctx, @@ -193,9 +193,11 @@ impl Network { peer, ) .await?; - - self.outbound.insert(peer.clone()).await?; - let res = self.run_stream(ctx, peer, stream).await; + let conn = Arc::new(Connection { + get_block: rpc::Client::::new(ctx, self.cfg.rpc.get_block_rate), + }); + self.outbound.insert(peer.clone(), conn.clone()).await?; + let res = self.run_stream(ctx, peer, stream, &conn).await; self.outbound.remove(peer).await; res } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 575a8492..85e4d0d1 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -10,7 +10,7 @@ use std::{ use test_casing::{test_casing, Product}; use tracing::Instrument as _; use zksync_concurrency::{ - ctx, oneshot, scope, sync, + ctx, net, oneshot, scope, sync, testonly::{abort_on_panic, set_timeout}, time, }; @@ -30,7 +30,7 @@ async fn test_one_connection_per_node() { let (store,runner) = new_store(ctx,&setup.genesis).await; s.spawn_bg(runner.run(ctx)); let mut nodes : Vec<_> = cfgs.iter().enumerate().map(|(i,cfg)| { - let (node,runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node,runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }).collect(); @@ -46,7 +46,7 @@ async fn test_one_connection_per_node() { let (peer, addr) = cfgs[0].gossip.static_outbound.iter().next().unwrap(); let mut stream = preface::connect( ctx, - *addr, + addr.resolve(ctx).await.unwrap().unwrap()[0], preface::Endpoint::GossipNet, ) .await @@ -235,7 +235,7 @@ async fn test_validator_addrs_propagation() { .iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -245,7 +245,7 @@ async fn test_validator_addrs_propagation() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); @@ -274,7 +274,7 @@ async fn test_genesis_mismatch() { tracing::info!("Start one node, we will simulate the other one."); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (_node, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); tracing::info!("Accept a connection with mismatching genesis."); @@ -290,7 +290,7 @@ async fn test_genesis_mismatch() { assert_matches!(res, Err(handshake::Error::GenesisMismatch)); tracing::info!("Try to connect to a node with a mismatching genesis."); - let mut stream = preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::GossipNet) + let mut stream = preface::connect(ctx, *cfgs[0].server_addr, preface::Endpoint::GossipNet) .await .context("preface::connect")?; let res = handshake::outbound( @@ -333,7 +333,7 @@ async fn syncing_blocks(node_count: usize, gossip_peers: usize) { for (i, cfg) in cfgs.into_iter().enumerate() { let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store); + let (node, runner) = testonly::Instance::new(cfg, store); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } @@ -408,7 +408,7 @@ async fn uncoordinated_block_syncing( let i = i; let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); s.spawn(async { let store = store; @@ -455,7 +455,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) @@ -536,7 +536,7 @@ async fn validator_node_restart() { cfg.rpc.push_validator_addrs_rate.refresh = time::Duration::ZERO; } let (store, store_runner) = new_store(ctx, &setup.genesis).await; - let (node1, node1_runner) = testonly::Instance::new(ctx, cfgs[1].clone(), store.clone()); + let (node1, node1_runner) = testonly::Instance::new(cfgs[1].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(store_runner.run(ctx)); s.spawn_bg( @@ -552,8 +552,8 @@ async fn validator_node_restart() { let start = ctx.now_utc(); for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] { // Set the new addr to broadcast. - let addr0 = mk_addr(rng); - cfgs[0].public_addr = addr0; + cfgs[0].server_addr = net::tcp::testonly::reserve_listener(); + cfgs[0].public_addr = (*cfgs[0].server_addr).into(); // Shift the UTC clock. let now = start + clock_shift; assert!( @@ -565,16 +565,14 @@ async fn validator_node_restart() { // _node0 contains pipe, which has to exist to prevent the connection from dying // early. - let (_node0, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (_node0, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node0"))); tracing::info!("wait for the update to arrive to node1"); let sub = &mut node1.net.gossip.validator_addrs.subscribe(); + let want = Some(*cfgs[0].server_addr); sync::wait_for(ctx, sub, |got| { - let Some(got) = got.get(&setup.keys[0].public()) else { - return false; - }; - got.msg.addr == addr0 + got.get(&setup.keys[0].public()).map(|x| x.msg.addr) == want }) .await?; Ok(()) @@ -607,13 +605,13 @@ async fn rate_limiting() { .map(|cfg| { ( cfg.validator_key.as_ref().unwrap().public(), - cfg.public_addr, + *cfg.server_addr, ) }) .collect(); for i in 1..n { let key = cfgs[i].gossip.key.public().clone(); - let public_addr = cfgs[i].public_addr; + let public_addr = cfgs[i].public_addr.clone(); cfgs[0].gossip.static_outbound.insert(key, public_addr); } let mut nodes = vec![]; @@ -623,7 +621,7 @@ async fn rate_limiting() { // Spawn the satellite nodes and wait until they register // their own address. for (i, cfg) in cfgs[1..].iter().enumerate() { - let (node, runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone()); + let (node, runner) = testonly::Instance::new(cfg.clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); let sub = &mut node.net.gossip.validator_addrs.subscribe(); sync::wait_for(ctx, sub, |got| { @@ -636,7 +634,7 @@ async fn rate_limiting() { } // Spawn the center node. - let (center, runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + let (center, runner) = testonly::Instance::new(cfgs[0].clone(), store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node[0]"))); // Await for the center to receive all validator addrs. let sub = &mut center.net.gossip.validator_addrs.subscribe(); diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/actors/network/src/gossip/validator_addrs.rs index ed176bf5..8fd21b63 100644 --- a/node/actors/network/src/gossip/validator_addrs.rs +++ b/node/actors/network/src/gossip/validator_addrs.rs @@ -1,7 +1,7 @@ //! Global state distributed by active validators, observed by all the nodes in the network. use crate::watch::Watch; use std::{collections::HashSet, sync::Arc}; -use zksync_concurrency::sync; +use zksync_concurrency::{sync, time}; use zksync_consensus_roles::validator; /// Mapping from validator::PublicKey to a signed validator::NetAddress. @@ -90,6 +90,28 @@ impl ValidatorAddrsWatch { self.0.subscribe() } + /// Inserts a new version of the announcement signed with the given key. + pub(crate) async fn announce( + &self, + key: &validator::SecretKey, + addr: std::net::SocketAddr, + timestamp: time::Utc, + ) { + let this = self.0.lock().await; + let mut validator_addrs = this.borrow().clone(); + let version = validator_addrs + .get(&key.public()) + .map(|x| x.msg.version + 1) + .unwrap_or(0); + let d = Arc::new(key.sign_msg(validator::NetAddress { + addr, + version, + timestamp, + })); + validator_addrs.0.insert(d.key.clone(), d); + this.send_replace(validator_addrs); + } + /// Inserts data to ValidatorAddrs. /// Subscribers are notified iff at least 1 new entry has /// been inserted. Returns an error iff an invalid @@ -103,7 +125,7 @@ impl ValidatorAddrsWatch { let this = self.0.lock().await; let mut validator_addrs = this.borrow().clone(); if validator_addrs.update(validators, data)? { - this.send(validator_addrs).ok().unwrap(); + this.send_replace(validator_addrs); } Ok(()) } diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 73c25cb4..c8d3e70d 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -46,13 +46,12 @@ impl Network { /// Constructs a new network actor state. /// Call `run_network` to run the actor. pub fn new( - ctx: &ctx::Ctx, cfg: Config, block_store: Arc, pipe: ActorPipe, ) -> (Arc, Runner) { let gossip = gossip::Network::new(cfg, block_store, pipe.send); - let consensus = consensus::Network::new(ctx, gossip.clone()); + let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( net.clone(), @@ -137,8 +136,11 @@ impl Runner { for (peer, addr) in &self.net.gossip.cfg.gossip.static_outbound { s.spawn(async { loop { - let run_result = - self.net.gossip.run_outbound_stream(ctx, peer, *addr).await; + let run_result = self + .net + .gossip + .run_outbound_stream(ctx, peer, addr.clone()) + .await; if let Err(err) = run_result { tracing::info!("gossip.run_outbound_stream(): {err:#}"); } @@ -150,20 +152,16 @@ impl Runner { } if let Some(c) = &self.net.consensus { + let validators = &c.gossip.genesis().validators; // If we are active validator ... - if c.gossip.genesis().validators.contains(&c.key.public()) { + if validators.contains(&c.key.public()) { // Maintain outbound connections. - for peer in c.clients.keys() { + for peer in validators.iter() { s.spawn(async { c.maintain_connection(ctx, peer).await; Ok(()) }); } - // Announce IP periodically. - s.spawn(async { - c.run_address_announcer(ctx).await; - Ok(()) - }); } } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 86fa1cf5..5cf1c52f 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -145,15 +145,14 @@ impl NetworkGauges { let register_result = COLLECTOR.before_scrape(move || { state_ref.upgrade().map(|state| { let gauges = NetworkGauges::default(); - let len = state.gossip.inbound.subscribe().borrow().current().len(); + let len = state.gossip.inbound.current().len(); gauges.gossip_inbound_connections.set(len); - let len = state.gossip.outbound.subscribe().borrow().current().len(); + let len = state.gossip.outbound.current().len(); gauges.gossip_outbound_connections.set(len); if let Some(consensus_state) = &state.consensus { - let len = consensus_state.inbound.subscribe().borrow().current().len(); + let len = consensus_state.inbound.current().len(); gauges.consensus_inbound_connections.set(len); - let subscriber = consensus_state.outbound.subscribe(); - let len = subscriber.borrow().current().len(); + let len = consensus_state.outbound.current().len(); gauges.consensus_outbound_connections.set(len); } gauges diff --git a/node/actors/network/src/pool.rs b/node/actors/network/src/pool.rs index 92257993..a5e7375f 100644 --- a/node/actors/network/src/pool.rs +++ b/node/actors/network/src/pool.rs @@ -4,37 +4,36 @@ use crate::watch::Watch; use std::collections::HashSet; use zksync_concurrency::sync; -/// Set of elements of type T. +/// Map with restrictions on the allowed keys. /// This set consists of an arbitrary subset of `allowed` + up to `extra_limit` elements outside of /// `allowed`. #[derive(Clone)] -pub(crate) struct Pool { +pub(crate) struct Pool { extra_limit: usize, extra_count: usize, - allowed: HashSet, - current: HashSet, + allowed: HashSet, + current: im::HashMap, } -impl Pool { - /// Returns a reference to the underlying set. - #[allow(dead_code)] - pub(crate) fn current(&self) -> &HashSet { +impl Pool { + /// Current pool state. + pub(crate) fn current(&self) -> &im::HashMap { &self.current } } /// Watch wrapper of the Pool. -/// Supports subscribing to the Pool updates. -pub(crate) struct PoolWatch(Watch>); +/// Supports subscribing to the Pool membership changes. +pub(crate) struct PoolWatch(Watch>); -impl PoolWatch { +impl PoolWatch { /// Constructs a new pool. - pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { + pub(crate) fn new(allowed: HashSet, extra_limit: usize) -> Self { Self(Watch::new(Pool { extra_limit, extra_count: 0, allowed, - current: HashSet::new(), + current: im::HashMap::new(), })) } @@ -42,40 +41,44 @@ impl PoolWatch { /// Returns an error if /// * `v` is already in the set /// * `v` cannot be added due to size restrictions - pub(crate) async fn insert(&self, v: T) -> anyhow::Result<()> { + pub(crate) async fn insert(&self, k: K, v: V) -> anyhow::Result<()> { self.0 .send_if_ok(|pool| { - if pool.current.contains(&v) { + if pool.current.contains_key(&k) { anyhow::bail!("already exists"); } - if !pool.allowed.contains(&v) { + if !pool.allowed.contains(&k) { if pool.extra_count >= pool.extra_limit { anyhow::bail!("limit exceeded"); } pool.extra_count += 1; } - pool.current.insert(v); + pool.current.insert(k, v); Ok(()) }) .await } /// Removes an element from the set. - pub(crate) async fn remove(&self, v: &T) { + pub(crate) async fn remove(&self, k: &K) { self.0.lock().await.send_if_modified(|pool| { - if !pool.current.remove(v) { + if pool.current.remove(k).is_none() { return false; } - if !pool.allowed.contains(v) { + if !pool.allowed.contains(k) { pool.extra_count -= 1; } true }); } + /// Copy of the current pool state. + pub(crate) fn current(&self) -> im::HashMap { + self.0.subscribe().borrow().current.clone() + } + /// Subscribes to the set changes. - #[allow(dead_code)] - pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { + pub(crate) fn subscribe(&self) -> sync::watch::Receiver> { self.0.subscribe() } } diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index 184326cd..1e58ad17 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -119,6 +119,8 @@ pub(crate) struct Client { impl Client { /// Constructs a new client. + // TODO(gprusak): at this point we don't need the clients to be reusable, + // so perhaps they should be constructed by `Service::add_client` instead? pub(crate) fn new(ctx: &ctx::Ctx, rate: limiter::Rate) -> Self { Client { limiter: limiter::Limiter::new(ctx, rate), diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index 022bb747..3a55c324 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -57,7 +57,7 @@ pub fn new_configs( let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -79,7 +79,7 @@ pub fn new_configs( for j in 0..gossip_peers { let j = (i + j + 1) % n; let peer = cfgs[j].gossip.key.public(); - let addr = *cfgs[j].server_addr; + let addr = cfgs[j].public_addr.clone(); cfgs[i].gossip.static_outbound.insert(peer, addr); } } @@ -92,7 +92,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - public_addr: *addr, + public_addr: (*addr).into(), // Pings are disabled in tests by default to avoid dropping connections // due to timeouts. ping_timeout: None, @@ -101,7 +101,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { key: rng.gen(), dynamic_inbound_limit: usize::MAX, static_inbound: HashSet::default(), - static_outbound: [(peer.gossip.key.public(), peer.public_addr)].into(), + static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(), }, max_block_size: usize::MAX, rpc: RpcConfig::default(), @@ -130,13 +130,9 @@ impl InstanceRunner { impl Instance { /// Construct an instance for a given config. - pub fn new( - ctx: &ctx::Ctx, - cfg: Config, - block_store: Arc, - ) -> (Self, InstanceRunner) { + pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { let (actor_pipe, dispatcher_pipe) = pipe::new(); - let (net, runner) = Network::new(ctx, cfg, block_store, actor_pipe); + let (net, runner) = Network::new(cfg, block_store, actor_pipe); let (terminate_send, terminate_recv) = channel::bounded(1); ( Self { @@ -187,7 +183,7 @@ impl Instance { .gossip .outbound .subscribe() - .wait_for(|got| want.is_subset(got.current())) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } @@ -200,13 +196,13 @@ impl Instance { consensus_state .inbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); consensus_state .outbound .subscribe() - .wait_for(|got| got.current() == &want) + .wait_for(|got| want.iter().all(|k| got.current().contains_key(k))) .await .unwrap(); } @@ -219,11 +215,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = &self.net.gossip; sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) @@ -237,11 +233,11 @@ impl Instance { ) -> ctx::OrCanceled<()> { let state = self.net.consensus.as_ref().unwrap(); sync::wait_for(ctx, &mut state.inbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; sync::wait_for(ctx, &mut state.outbound.subscribe(), |got| { - !got.current().contains(peer) + !got.current().contains_key(peer) }) .await?; Ok(()) diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index c1cde500..718ac5e6 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -20,7 +20,7 @@ async fn test_metrics() { .into_iter() .enumerate() .map(|(i, cfg)| { - let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone()); + let (node, runner) = testonly::Instance::new(cfg, store.clone()); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); node }) diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index b078a804..77e3affd 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -80,7 +80,7 @@ impl NodeRunner { let key = self.network.gossip.key.public(); let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); let (mut network, network_runner) = - network::testonly::Instance::new(ctx, self.network.clone(), self.store.clone()); + network::testonly::Instance::new(self.network.clone(), self.store.clone()); let sync_blocks_config = Config::new(); let res = scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.store_runner.run(ctx)); diff --git a/node/libs/concurrency/src/net/mod.rs b/node/libs/concurrency/src/net/mod.rs index 12731c90..8a507737 100644 --- a/node/libs/concurrency/src/net/mod.rs +++ b/node/libs/concurrency/src/net/mod.rs @@ -1,3 +1,43 @@ //! Context-aware network utilities. //! Built on top of `tokio::net`. +use crate::ctx; + pub mod tcp; + +#[cfg(test)] +mod tests; + +/// Network host address in the format ":". +/// NOT VALIDATED, validation happens at `Host::resolve()` call. +// TODO: for better type safety consider verifying host to be in the valid +// format in constructor. +#[derive(Debug, Clone, PartialEq)] +pub struct Host(pub String); + +impl From for Host { + fn from(addr: std::net::SocketAddr) -> Self { + Self(addr.to_string()) + } +} + +impl Host { + /// If host is of the form ":", performs DNS resolution. + /// If host is of the form ":", just parses the SocketAddr. + pub async fn resolve( + &self, + ctx: &ctx::Ctx, + ) -> ctx::OrCanceled>> { + let host = self.0.clone(); + // Note that we may orphan a task executing the underlying `getnameinfo` call + // if the ctx gets cancelled. This should be fine given that it is expected to finish + // after a timeout and it doesn't affect the state of the application. + // We don't use `tokio::net::lookup_host`, because it is not documented to be cancel-safe. + Ok(ctx + .wait(tokio::task::spawn_blocking(move || { + // This should never panic, so unwrapping the task result is ok. + Ok(std::net::ToSocketAddrs::to_socket_addrs(&host)?.collect()) + })) + .await? + .unwrap()) + } +} diff --git a/node/libs/concurrency/src/net/tests.rs b/node/libs/concurrency/src/net/tests.rs new file mode 100644 index 00000000..69c7a1e7 --- /dev/null +++ b/node/libs/concurrency/src/net/tests.rs @@ -0,0 +1,36 @@ +use super::*; + +#[tokio::test] +async fn test_resolve() { + let ctx = &ctx::test_root(&ctx::RealClock); + let addrs = Host("localhost:1234".into()) + .resolve(ctx) + .await + .unwrap() + .unwrap(); + // We assume here that loopback host "localhost" is always configured here. + // If that turns out to be problematic we should use a more robust DNS resolution + // library, so that we can run our own DNS server in tests. + assert!(!addrs.is_empty()); + for addr in addrs { + // Verify the port and that the returned address is actually a loopback. + assert_eq!(addr.port(), 1234); + assert!(addr.ip().is_loopback()); + + // Verify that resolving the serialized address returns the same address. + let got = Host(addr.to_string()).resolve(ctx).await.unwrap().unwrap(); + assert!(got.len() == 1); + assert_eq!(got[0], addr); + } + + // Host without port should not resolve. + assert!(Host("localhost".into()) + .resolve(ctx) + .await + .unwrap() + .is_err()); + // Invalid host name should not resolve. + assert!(Host("$#$:6746".into()).resolve(ctx).await.unwrap().is_err()); + // Empty host name should not resolve. + assert!(Host(String::new()).resolve(ctx).await.unwrap().is_err()); +} diff --git a/node/libs/concurrency/src/time.rs b/node/libs/concurrency/src/time.rs index b46cca63..718317c6 100644 --- a/node/libs/concurrency/src/time.rs +++ b/node/libs/concurrency/src/time.rs @@ -18,6 +18,12 @@ impl std::fmt::Debug for Utc { } } +impl std::fmt::Display for Utc { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + (time::OffsetDateTime::UNIX_EPOCH + self.0).fmt(f) + } +} + /// Start of the unix epoch. pub const UNIX_EPOCH: Utc = Utc(Duration::ZERO); diff --git a/node/libs/protobuf/Cargo.toml b/node/libs/protobuf/Cargo.toml index e7302137..889b531c 100644 --- a/node/libs/protobuf/Cargo.toml +++ b/node/libs/protobuf/Cargo.toml @@ -9,6 +9,7 @@ links = "zksync_protobuf_proto" [dependencies] zksync_concurrency.workspace = true +zksync_consensus_utils.workspace = true anyhow.workspace = true bit-vec.workspace = true @@ -19,6 +20,7 @@ quick-protobuf.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true +serde_yaml.workspace = true [dev-dependencies] tokio.workspace = true @@ -29,4 +31,4 @@ tracing-subscriber.workspace = true zksync_protobuf_build.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/protobuf/src/testonly/gen_value.rs b/node/libs/protobuf/src/testonly/gen_value.rs deleted file mode 100644 index 4c091e37..00000000 --- a/node/libs/protobuf/src/testonly/gen_value.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! Util types for generating random values in tests. - -use rand::Rng; - -/// Generator of random values. -pub struct GenValue<'a, R: Rng> { - /// Underlying RNG. - pub rng: &'a mut R, - /// Generate values with only required fields. - pub required_only: bool, - /// Generate decimal fractions for f64 - /// to avoid rounding errors of decimal encodings. - pub decimal_fractions: bool, -} - -impl<'a, R: Rng> GenValue<'a, R> { - /// Generates a random value of type `C`. - pub fn gen(&mut self) -> C { - C::sample(self) - } -} - -/// Types that can be used to generate a random instance. -pub trait RandomValue { - /// Generates a random value. - fn sample(g: &mut GenValue) -> Self; -} diff --git a/node/libs/protobuf/src/testonly/mod.rs b/node/libs/protobuf/src/testonly/mod.rs index e86a6479..051db885 100644 --- a/node/libs/protobuf/src/testonly/mod.rs +++ b/node/libs/protobuf/src/testonly/mod.rs @@ -1,14 +1,12 @@ //! Testonly utilities. - -pub mod gen_value; -use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, Wire}; -pub use gen_value::*; +use super::{canonical, canonical_raw, decode, encode, read_fields, ProtoFmt, ProtoRepr, Wire}; use prost::Message as _; -use prost_reflect::ReflectMessage as _; +use prost_reflect::ReflectMessage; use rand::{ distributions::{Distribution, Standard}, Rng, }; +use zksync_consensus_utils::EncodeDist; /// Test encoding and canonical encoding properties. #[track_caller] @@ -27,7 +25,7 @@ pub fn test_encode(rng: &mut } /// Syntax sugar for `test_encode`, -/// because `test_encode(rng,&rng::gen())` doesn't compile. +/// because `test_encode(rng,&rng.gen())` doesn't compile. #[track_caller] pub fn test_encode_random(rng: &mut impl Rng) where @@ -90,3 +88,101 @@ pub(crate) fn encode_shuffled(rng: &mut R, x: &T) -> Vec anyhow::Result; + /// build. + fn build(this: &Self::Type) -> Self::Proto; +} + +fn encode_proto(msg: &X::Type) -> Vec { + let msg = X::build(msg); + canonical_raw(&msg.encode_to_vec(), &msg.descriptor()).unwrap() +} + +fn decode_proto(bytes: &[u8]) -> anyhow::Result { + X::read(&X::Proto::decode(bytes)?) +} + +fn encode_json(msg: &X::Type) -> String { + let mut s = serde_json::Serializer::pretty(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner()).unwrap() +} + +fn decode_json(json: &str) -> anyhow::Result { + let mut d = serde_json::Deserializer::from_str(json); + X::read(&crate::serde::deserialize_proto(&mut d)?) +} + +fn encode_yaml(msg: &X::Type) -> String { + let mut s = serde_yaml::Serializer::new(vec![]); + crate::serde::serialize_proto(&X::build(msg), &mut s).unwrap(); + String::from_utf8(s.into_inner().unwrap()).unwrap() +} + +fn decode_yaml(yaml: &str) -> anyhow::Result { + let d = serde_yaml::Deserializer::from_str(yaml); + X::read(&crate::serde::deserialize_proto(d)?) +} + +/// Wrapper for `ProtoRepr`, implementing ProtoConv; +pub struct ReprConv(std::marker::PhantomData

); +/// Wrapper for `ProtoFmt`, implementing ProtoConv; +pub struct FmtConv(std::marker::PhantomData); + +impl ProtoConv for FmtConv { + type Type = T; + type Proto = T::Proto; + fn read(r: &T::Proto) -> anyhow::Result { + ProtoFmt::read(r) + } + fn build(this: &T) -> T::Proto { + ProtoFmt::build(this) + } +} + +impl ProtoConv for ReprConv

{ + type Type = P::Type; + type Proto = P; + fn read(r: &P) -> anyhow::Result { + ProtoRepr::read(r) + } + fn build(this: &P::Type) -> P { + ProtoRepr::build(this) + } +} + +/// Test reencoding random values in various formats. +#[track_caller] +pub fn test_encode_all_formats(rng: &mut impl Rng) +where + X::Type: std::fmt::Debug + PartialEq, + EncodeDist: Distribution, +{ + for required_only in [false, true] { + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: false, + } + .sample(rng); + let got = decode_proto::(&encode_proto::(&want)).unwrap(); + assert_eq!(&want, &got, "binary encoding"); + let got = decode_yaml::(&encode_yaml::(&want)).unwrap(); + assert_eq!(&want, &got, "yaml encoding"); + + let want: X::Type = EncodeDist { + required_only, + decimal_fractions: true, + } + .sample(rng); + let got = decode_json::(&encode_json::(&want)).unwrap(); + assert_eq!(&want, &got, "json encoding"); + } +} diff --git a/node/libs/utils/Cargo.toml b/node/libs/utils/Cargo.toml index 07ac636b..b6a0d6a9 100644 --- a/node/libs/utils/Cargo.toml +++ b/node/libs/utils/Cargo.toml @@ -9,7 +9,8 @@ license.workspace = true [dependencies] zksync_concurrency.workspace = true +rand.workspace = true thiserror.workspace = true [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/node/libs/utils/src/encode.rs b/node/libs/utils/src/encode.rs new file mode 100644 index 00000000..621c017f --- /dev/null +++ b/node/libs/utils/src/encode.rs @@ -0,0 +1,123 @@ +//! Utilities for testing encodings. +use rand::{ + distributions::{Alphanumeric, DistString, Distribution}, + Rng, +}; +use zksync_concurrency::net; + +/// Distribution for testing encodings. +pub struct EncodeDist { + /// Generate configs with only required fields. + pub required_only: bool, + /// Generate decimal fractions for f64 + /// to avoid rounding errors of decimal encodings. + pub decimal_fractions: bool, +} + +impl EncodeDist { + /// Returns a small non-empty range if `required_only` is false. + /// Returns an empty range otherwise. + pub fn sample_range(&self, rng: &mut (impl Rng + ?Sized)) -> std::ops::Range { + if self.required_only { + 0..0 + } else { + 0..rng.gen_range(5..10) + } + } + + /// Returns `Some(f())` if `required_only` is false. + /// Returns `None otherwise. + pub fn sample_opt(&self, f: impl FnOnce() -> T) -> Option { + if self.required_only { + None + } else { + Some(f()) + } + } + + /// Samples a collection of type T. + pub fn sample_collect>( + &self, + rng: &mut (impl Rng + ?Sized), + ) -> T + where + EncodeDist: Distribution, + { + self.sample_range(rng).map(|_| self.sample(rng)).collect() + } +} + +impl Distribution> for EncodeDist +where + EncodeDist: Distribution, +{ + fn sample(&self, rng: &mut R) -> Option { + self.sample_opt(|| self.sample(rng)) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::net::SocketAddr { + std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> net::Host { + Distribution::::sample(self, rng).into() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> String { + let n = self.sample_range(rng).len(); + Alphanumeric.sample_string(rng, n) + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> bool { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u8 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u16 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u32 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> u64 { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> usize { + rng.gen() + } +} +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> std::num::NonZeroU32 { + rng.gen() + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> f64 { + if self.decimal_fractions { + const PRECISION: usize = 1000000; + #[allow(clippy::float_arithmetic)] + return rng.gen_range(0..PRECISION) as f64 / PRECISION as f64; + } + rng.gen() + } +} diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 31e43ba1..71fc6af1 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -1,4 +1,7 @@ //! Crate that holds several small utilities and primitives. +mod encode; pub mod enum_util; pub mod pipe; + +pub use encode::*; diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index f8863a03..c5d2557e 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -1,10 +1,8 @@ //! Deployer for the kubernetes cluster. use clap::Parser; use std::collections::HashMap; -use zksync_consensus_roles::node::SecretKey; -use zksync_consensus_roles::validator; -use zksync_consensus_tools::k8s::ConsensusNode; -use zksync_consensus_tools::{k8s, AppConfig}; +use zksync_consensus_roles::{node::SecretKey, validator}; +use zksync_consensus_tools::{k8s, k8s::ConsensusNode, AppConfig}; /// Command line arguments. #[derive(Debug, Parser)] diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index e5ef14bb..a13db3a6 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -61,21 +61,21 @@ fn main() -> anyhow::Result<()> { default_config.with_metrics_server_addr(metrics_server_addr); } let mut cfgs: Vec<_> = (0..nodes) - .map(|i| default_config.with_public_addr(addrs[i]).clone()) + .map(|i| default_config.with_public_addr(addrs[i].into()).clone()) .collect(); // Construct a gossip network with optimal diameter. for i in 0..nodes { for j in 0..peers { let next = (i * peers + j + 1) % nodes; - cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next]); + cfgs[i].add_gossip_static_outbound(node_keys[next].public(), addrs[next].into()); cfgs[next].add_gossip_static_inbound(node_keys[i].public()); } } for (i, cfg) in cfgs.into_iter().enumerate() { // Recreate the directory for the node's config. - let root = args.output_dir.join(cfg.public_addr.to_string()); + let root = args.output_dir.join(&cfg.public_addr.0); let _ = fs::remove_dir_all(&root); fs::create_dir_all(&root).with_context(|| format!("create_dir_all({:?})", root))?; cfg.write_to_file(&root)?; diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index ffb8410c..944c39b5 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -9,7 +9,7 @@ use std::{ path::{Path, PathBuf}, str::FromStr, }; -use zksync_concurrency::ctx; +use zksync_concurrency::{ctx, net}; use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; use zksync_consensus_executor as executor; @@ -43,11 +43,11 @@ pub(crate) fn encode_with_serializer( String::from_utf8(serializer.into_inner()).unwrap() } -/// Pair of (public key, ip address) for a gossip network node. +/// Pair of (public key, host addr) for a gossip network node. #[derive(Debug, Clone)] pub struct NodeAddr { pub key: node::PublicKey, - pub addr: SocketAddr, + pub addr: net::Host, } impl ProtoFmt for NodeAddr { @@ -55,14 +55,14 @@ impl ProtoFmt for NodeAddr { fn read(r: &Self::Proto) -> anyhow::Result { let key = read_required_text(&r.key).context("key")?; - let addr = read_required_text(&r.addr).context("addr")?; + let addr = net::Host(required(&r.addr).context("addr")?.clone()); Ok(Self { addr, key }) } fn build(&self) -> Self::Proto { Self::Proto { key: Some(TextFmt::encode(&self.key)), - addr: Some(TextFmt::encode(&self.addr)), + addr: Some(self.addr.0.clone()), } } } @@ -72,7 +72,7 @@ impl ProtoFmt for NodeAddr { #[derive(Debug, PartialEq, Clone)] pub struct AppConfig { pub server_addr: SocketAddr, - pub public_addr: SocketAddr, + pub public_addr: net::Host, pub metrics_server_addr: Option, pub genesis: validator::Genesis, @@ -80,7 +80,7 @@ pub struct AppConfig { pub gossip_dynamic_inbound_limit: usize, pub gossip_static_inbound: HashSet, - pub gossip_static_outbound: HashMap, + pub gossip_static_outbound: HashMap, } impl ProtoFmt for AppConfig { @@ -104,7 +104,7 @@ impl ProtoFmt for AppConfig { } Ok(Self { server_addr: read_required_text(&r.server_addr).context("server_addr")?, - public_addr: read_required_text(&r.public_addr).context("public_addr")?, + public_addr: net::Host(required(&r.public_addr).context("public_addr")?.clone()), metrics_server_addr: read_optional_text(&r.metrics_server_addr) .context("metrics_server_addr")?, @@ -124,7 +124,7 @@ impl ProtoFmt for AppConfig { fn build(&self) -> Self::Proto { Self::Proto { server_addr: Some(self.server_addr.encode()), - public_addr: Some(self.public_addr.encode()), + public_addr: Some(self.public_addr.0.clone()), metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), genesis: Some(self.genesis.build()), @@ -143,7 +143,7 @@ impl ProtoFmt for AppConfig { .iter() .map(|(key, addr)| proto::NodeAddr { key: Some(TextFmt::encode(key)), - addr: Some(TextFmt::encode(addr)), + addr: Some(addr.0.clone()), }) .collect(), } @@ -234,7 +234,7 @@ impl AppConfig { pub fn default_for(genesis: validator::Genesis) -> AppConfig { Self { server_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), - public_addr: SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), NODES_PORT), + public_addr: SocketAddr::new(Ipv4Addr::LOCALHOST.into(), NODES_PORT).into(), metrics_server_addr: None, genesis, @@ -251,7 +251,7 @@ impl AppConfig { self } - pub fn with_public_addr(&mut self, public_addr: SocketAddr) -> &mut Self { + pub fn with_public_addr(&mut self, public_addr: net::Host) -> &mut Self { self.public_addr = public_addr; self } @@ -272,7 +272,7 @@ impl AppConfig { pub fn add_gossip_static_outbound( &mut self, key: node::PublicKey, - addr: SocketAddr, + addr: net::Host, ) -> &mut Self { self.gossip_static_outbound.insert(key, addr); self @@ -292,9 +292,10 @@ impl AppConfig { self } - pub fn check_public_addr(&mut self) -> anyhow::Result<()> { + /// Tries to load the public_addr IP from the env var. + pub fn try_load_public_addr(&mut self) -> anyhow::Result<()> { if let Ok(public_addr) = std::env::var("PUBLIC_ADDR") { - self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?; + self.public_addr = SocketAddr::from_str(&format!("{public_addr}:{NODES_PORT}"))?.into(); } Ok(()) } @@ -310,7 +311,7 @@ impl Configs { let e = executor::Executor { config: executor::Config { server_addr: self.app.server_addr, - public_addr: self.app.public_addr, + public_addr: self.app.public_addr.clone(), node_key: self.node_key.clone(), gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit, gossip_static_inbound: self.app.gossip_static_inbound.clone(), diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs index 50cb012b..40c1936f 100644 --- a/node/tools/src/k8s.rs +++ b/node/tools/src/k8s.rs @@ -75,7 +75,7 @@ impl ConsensusNode { .context("Pod IP address not present")?; self.node_addr = Some(NodeAddr { key: self.key.public(), - addr: SocketAddr::new(ip.parse()?, config::NODES_PORT), + addr: SocketAddr::new(ip.parse()?, config::NODES_PORT).into(), }); Ok(()) } diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index a17efa32..7f272131 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -130,14 +130,17 @@ async fn main() -> anyhow::Result<()> { let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config - configs.app.check_public_addr().context("Public Address")?; + configs + .app + .try_load_public_addr() + .context("Public Address")?; let (executor, runner) = configs .make_executor(ctx) .await .context("configs.into_executor()")?; - let mut rpc_addr = configs.app.public_addr; + let mut rpc_addr = configs.app.server_addr; if let Some(port) = args.rpc_port { rpc_addr.set_port(port); } else { diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index 15d2f840..95f6c489 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -1,33 +1,26 @@ use crate::{store, AppConfig}; -use rand::{ - distributions::{Distribution, Standard}, - Rng, -}; +use rand::{distributions::Distribution, Rng}; use tempfile::TempDir; use zksync_concurrency::ctx; -use zksync_consensus_roles::{node, validator::testonly::Setup}; +use zksync_consensus_roles::validator::testonly::Setup; use zksync_consensus_storage::{testonly, PersistentBlockStore}; -use zksync_protobuf::testonly::test_encode_random; +use zksync_consensus_utils::EncodeDist; +use zksync_protobuf::testonly::{test_encode_all_formats, FmtConv}; -fn make_addr(rng: &mut R) -> std::net::SocketAddr { - std::net::SocketAddr::new(std::net::IpAddr::from(rng.gen::<[u8; 16]>()), rng.gen()) -} - -impl Distribution for Standard { +impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> AppConfig { AppConfig { - server_addr: make_addr(rng), - public_addr: make_addr(rng), - metrics_server_addr: Some(make_addr(rng)), + server_addr: self.sample(rng), + public_addr: self.sample(rng), + metrics_server_addr: Some(self.sample(rng)), genesis: rng.gen(), gossip_dynamic_inbound_limit: rng.gen(), - gossip_static_inbound: (0..5) - .map(|_| rng.gen::().public()) - .collect(), - gossip_static_outbound: (0..6) - .map(|_| (rng.gen::().public(), make_addr(rng))) + gossip_static_inbound: self.sample_range(rng).map(|_| rng.gen()).collect(), + gossip_static_outbound: self + .sample_range(rng) + .map(|_| (rng.gen(), self.sample(rng))) .collect(), max_payload_size: rng.gen(), } @@ -38,7 +31,7 @@ impl Distribution for Standard { fn test_schema_encoding() { let ctx = ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - test_encode_random::(rng); + test_encode_all_formats::>(rng); } #[tokio::test]