Added DNS support for the addresses in the config. (#82)
Added DNS resolution support for the addresses in the config. Now
`public_addr` and gossip peers' addresses can be specified in
`<domain>:<port>` format. Validator address discovery still uses
`<IP>:<port>`, and validators announce their IP address only after
verifying it (by establishing a loopback connection using the
`public_addr`).

The DNS ecosystem is slightly more centralized than IP assignment (it
requires authoritative DNS servers), but not by much. It is useful for
systems that have trouble providing stable IPs (such as Kubernetes). We
need DNS support at the very least for the non-public (stage)
environment of zksync-era.

This PR also implements some minor related upgrades (such as merging
the ArcMap functionality into Pool and moving the sampling logic over
from zksync-era).
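
For illustration only: the address formats accepted after this change can be exercised with the standard library resolver, which performs a DNS lookup whenever the host part is not an IP literal. This is a minimal sketch approximating what the new `net::Host` type does, not its exact API; the domain below is hypothetical.

use std::net::{SocketAddr, ToSocketAddrs as _};

/// Resolves a "<domain>:<port>" or "<IP>:<port>" string to socket addresses.
fn resolve(host: &str) -> std::io::Result<Vec<SocketAddr>> {
    // For an IP literal this is a no-op parse; otherwise std queries DNS.
    Ok(host.to_socket_addrs()?.collect())
}

fn main() -> std::io::Result<()> {
    // Hypothetical domain, used only to demonstrate the accepted format.
    for addr in resolve("consensus.example.com:3054")? {
        println!("resolved: {addr}");
    }
    Ok(())
}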
pompon0 authored Mar 27, 2024
1 parent f35f2bc commit ce92795
Showing 36 changed files with 595 additions and 318 deletions.
3 changes: 3 additions & 0 deletions node/Cargo.lock

Generated lockfile; diff not rendered by default.

1 change: 1 addition & 0 deletions node/Cargo.toml
@@ -73,6 +73,7 @@ rand04 = { package = "rand", version = "0.4" }
 rocksdb = "0.21.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0.95"
+serde_yaml = "0.9"
 sha3 = "0.10.8"
 snow = "0.9.3"
 syn = "2.0.17"
1 change: 0 additions & 1 deletion node/actors/bft/src/testonly/run.rs
@@ -77,7 +77,6 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::
     let mut nodes = vec![];
     for (i, spec) in specs.iter().enumerate() {
         let (node, runner) = network::testonly::Instance::new(
-            ctx,
             spec.net.clone(),
             spec.block_store.clone(),
         );
4 changes: 2 additions & 2 deletions node/actors/bft/src/tests.rs
@@ -3,7 +3,7 @@ use zksync_concurrency::{ctx, scope, time};
 use zksync_consensus_roles::validator;

 async fn run_test(behavior: Behavior, network: Network) {
-    let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20));
+    let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
     zksync_concurrency::testonly::abort_on_panic();
     let ctx = &ctx::test_root(&ctx::RealClock);

@@ -15,7 +15,7 @@ async fn run_test(behavior: Behavior, network: Network) {
     Test {
         network,
         nodes,
-        blocks_to_finalize: 15,
+        blocks_to_finalize: 10,
     }
     .run(ctx)
     .await
7 changes: 3 additions & 4 deletions node/actors/executor/src/lib.rs
@@ -45,7 +45,7 @@ pub struct Config {
     pub server_addr: std::net::SocketAddr,
     /// Public TCP address that other nodes are expected to connect to.
     /// It is announced over gossip network.
-    pub public_addr: std::net::SocketAddr,
+    pub public_addr: net::Host,
     /// Maximal size of the block payload.
     pub max_payload_size: usize,

@@ -59,7 +59,7 @@ pub struct Config {
     pub gossip_static_inbound: HashSet<node::PublicKey>,
     /// Outbound connections that the node should actively try to
     /// establish and maintain.
-    pub gossip_static_outbound: HashMap<node::PublicKey, std::net::SocketAddr>,
+    pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
 }

 impl Config {
@@ -90,7 +90,7 @@ impl Executor {
     fn network_config(&self) -> network::Config {
         network::Config {
             server_addr: net::tcp::ListenerAddr::new(self.config.server_addr),
-            public_addr: self.config.public_addr,
+            public_addr: self.config.public_addr.clone(),
             gossip: self.config.gossip(),
             validator_key: self.validator.as_ref().map(|v| v.key.clone()),
             ping_timeout: Some(time::Duration::seconds(10)),
@@ -136,7 +136,6 @@ impl Executor {
             s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped"));
             s.spawn(async {
                 let (net, runner) = network::Network::new(
-                    ctx,
                     network_config,
                     self.block_store.clone(),
                     network_actor_pipe,
2 changes: 1 addition & 1 deletion node/actors/executor/src/tests.rs
@@ -12,7 +12,7 @@ use zksync_consensus_storage::{
 fn config(cfg: &network::Config) -> Config {
     Config {
         server_addr: *cfg.server_addr,
-        public_addr: cfg.public_addr,
+        public_addr: cfg.public_addr.clone(),
         max_payload_size: usize::MAX,
         node_key: cfg.gossip.key.clone(),
         gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
7 changes: 5 additions & 2 deletions node/actors/network/src/config.rs
@@ -55,7 +55,7 @@ pub struct GossipConfig {
     pub static_inbound: HashSet<node::PublicKey>,
     /// Outbound connections that the node should actively try to
     /// establish and maintain.
-    pub static_outbound: HashMap<node::PublicKey, std::net::SocketAddr>,
+    pub static_outbound: HashMap<node::PublicKey, net::Host>,
 }

 /// Network actor config.
@@ -65,7 +65,10 @@ pub struct Config {
     pub server_addr: net::tcp::ListenerAddr,
     /// Public TCP address that other nodes are expected to connect to.
     /// It is announced over gossip network.
-    pub public_addr: std::net::SocketAddr,
+    /// In case public_addr is a domain instead of ip, DNS resolution is
+    /// performed and a loopback connection is established before announcing
+    /// the IP address over the gossip network.
+    pub public_addr: net::Host,
     /// Gossip network config.
     pub gossip: GossipConfig,
     /// Private key of the validator.
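
As a rough mental model of the config change above: a `Host` is just the raw `<host>:<port>` string, resolved on demand. The sketch below is an approximation of the real type in `zksync_concurrency::net` under that assumption (the real `resolve` takes a `ctx` and its signature may differ), using only std:

use std::net::{SocketAddr, ToSocketAddrs as _};

/// A "<domain>:<port>" or "<IP>:<port>" address, kept as a string and
/// resolved via DNS only when a connection is about to be made.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct Host(pub String);

impl From<SocketAddr> for Host {
    /// Existing IP-based configs stay valid: a socket address is a host string.
    fn from(addr: SocketAddr) -> Self {
        Self(addr.to_string())
    }
}

impl Host {
    /// Resolves the host string; may return multiple addresses for a domain.
    pub fn resolve(&self) -> std::io::Result<Vec<SocketAddr>> {
        Ok(self.0.to_socket_addrs()?.collect())
    }
}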
134 changes: 77 additions & 57 deletions node/actors/network/src/consensus/mod.rs
@@ -2,10 +2,8 @@
 //! BFT consensus messages are exchanged over this network.
 use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc};
 use anyhow::Context as _;
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
+use rand::seq::SliceRandom;
+use std::{collections::HashSet, sync::Arc};
 use zksync_concurrency::{ctx, oneshot, scope, sync, time};
 use zksync_consensus_roles::validator;
 use zksync_protobuf::kB;
@@ -21,18 +19,25 @@ const RESP_MAX_SIZE: usize = kB;
 /// is down.
 const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10);

+/// Outbound connection state.
+pub(crate) struct Connection {
+    /// Peer's address.
+    /// This is not used for now, but will be required for the debug page.
+    #[allow(dead_code)]
+    addr: std::net::SocketAddr,
+    consensus: rpc::Client<rpc::consensus::Rpc>,
+}
+
 /// Consensus network state.
 pub(crate) struct Network {
     /// Gossip network state to bootstrap consensus network from.
     pub(crate) gossip: Arc<gossip::Network>,
     /// This validator's secret key.
     pub(crate) key: validator::SecretKey,
     /// Set of the currently open inbound connections.
-    pub(crate) inbound: PoolWatch<validator::PublicKey>,
+    pub(crate) inbound: PoolWatch<validator::PublicKey, ()>,
     /// Set of the currently open outbound connections.
-    pub(crate) outbound: PoolWatch<validator::PublicKey>,
-    /// RPC clients for all validators.
-    pub(crate) clients: HashMap<validator::PublicKey, rpc::Client<rpc::consensus::Rpc>>,
+    pub(crate) outbound: PoolWatch<validator::PublicKey, Arc<Connection>>,
 }

 #[async_trait::async_trait]
@@ -61,22 +66,13 @@ impl rpc::Handler<rpc::consensus::Rpc> for &Network {

 impl Network {
     /// Constructs a new consensus network state.
-    pub(crate) fn new(ctx: &ctx::Ctx, gossip: Arc<gossip::Network>) -> Option<Arc<Self>> {
+    pub(crate) fn new(gossip: Arc<gossip::Network>) -> Option<Arc<Self>> {
         let key = gossip.cfg.validator_key.clone()?;
         let validators: HashSet<_> = gossip.genesis().validators.iter().cloned().collect();
         Some(Arc::new(Self {
             key,
             inbound: PoolWatch::new(validators.clone(), 0),
             outbound: PoolWatch::new(validators.clone(), 0),
-            clients: validators
-                .iter()
-                .map(|peer| {
-                    (
-                        peer.clone(),
-                        rpc::Client::new(ctx, gossip.cfg.rpc.consensus_rate),
-                    )
-                })
-                .collect(),
             gossip,
         }))
     }
@@ -88,10 +84,11 @@ impl Network {
         msg: validator::Signed<validator::ConsensusMsg>,
     ) -> anyhow::Result<()> {
         let req = rpc::consensus::Req(msg);
+        let outbound = self.outbound.current();
         scope::run!(ctx, |ctx, s| async {
-            for (peer, client) in &self.clients {
+            for (peer, conn) in &outbound {
                 s.spawn(async {
-                    if let Err(err) = client.call(ctx, &req, RESP_MAX_SIZE).await {
+                    if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await {
                         tracing::info!("send({:?},<ConsensusMsg>): {err:#}", &*peer);
                     }
                     Ok(())
@@ -109,8 +106,11 @@ impl Network {
         key: &validator::PublicKey,
         msg: validator::Signed<validator::ConsensusMsg>,
     ) -> anyhow::Result<()> {
-        let client = self.clients.get(key).context("not an active validator")?;
-        client
+        let outbound = self.outbound.current();
+        outbound
+            .get(key)
+            .context("not an active validator")?
+            .consensus
             .call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE)
             .await?;
         Ok(())
@@ -125,7 +125,7 @@
     ) -> anyhow::Result<()> {
         let peer =
             handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
-        self.inbound.insert(peer.clone()).await?;
+        self.inbound.insert(peer.clone(), ()).await?;
         let res = scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
                 .add_server(rpc::ping::Server, rpc::ping::RATE)
@@ -152,7 +152,6 @@
         peer: &validator::PublicKey,
         addr: std::net::SocketAddr,
     ) -> anyhow::Result<()> {
-        let client = self.clients.get(peer).context("not an active validator")?;
         let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?;
         handshake::outbound(
             ctx,
@@ -162,11 +161,15 @@
             peer,
         )
         .await?;
-        self.outbound.insert(peer.clone()).await?;
+        let conn = Arc::new(Connection {
+            addr,
+            consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate),
+        });
+        self.outbound.insert(peer.clone(), conn.clone()).await?;
         let res = scope::run!(ctx, |ctx, s| async {
             let mut service = rpc::Service::new()
                 .add_server(rpc::ping::Server, rpc::ping::RATE)
-                .add_client(client);
+                .add_client(&conn.consensus);
             if let Some(ping_timeout) = &self.gossip.cfg.ping_timeout {
                 let ping_client = rpc::Client::<rpc::ping::Rpc>::new(ctx, rpc::ping::RATE);
                 service = service.add_client(&ping_client);
@@ -175,6 +178,28 @@
                     ping_client.ping_loop(ctx, *ping_timeout).await
                 });
             }
+            // If this is a loopback connection, announce periodically the address of this
+            // validator to the network.
+            // Note that this is executed only for outbound end of the loopback connection.
+            // Inbound end doesn't know the public address of itself.
+            if peer == &self.key.public() {
+                s.spawn(async {
+                    let mut sub = self.gossip.validator_addrs.subscribe();
+                    while ctx.is_active() {
+                        self.gossip
+                            .validator_addrs
+                            .announce(&self.key, addr, ctx.now_utc())
+                            .await;
+                        let _ = sync::wait_for(
+                            &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),
+                            &mut sub,
+                            |got| got.get(peer).map(|x| x.msg.addr) != Some(addr),
+                        )
+                        .await;
+                    }
+                    Ok(())
+                });
+            }
             service.run(ctx, stream).await?;
             Ok(())
         })
@@ -183,9 +208,35 @@
         res
     }

+    async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+        let addr = *self
+            .gossip
+            .cfg
+            .public_addr
+            .resolve(ctx)
+            .await?
+            .context("resolve()")?
+            .choose(&mut ctx.rng())
+            .with_context(|| {
+                format!("{:?} resolved to no addresses", self.gossip.cfg.public_addr)
+            })?;
+        self.run_outbound_stream(ctx, &self.key.public(), addr)
+            .await
+    }
+
     /// Maintains a connection to the given validator.
     /// If connection breaks, it tries to reconnect periodically.
     pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) {
+        // Loopback connection is a special case, because the address is taken from
+        // the config rather than from `gossip.validator_addrs`.
+        if &self.key.public() == peer {
+            while ctx.is_active() {
+                if let Err(err) = self.run_loopback_stream(ctx).await {
+                    tracing::info!("run_loopback_stream(): {err:#}");
+                }
+            }
+            return;
+        }
         let addrs = &mut self.gossip.validator_addrs.subscribe();
         let mut addr = None;
         while ctx.is_active() {
@@ -204,35 +255,4 @@
             }
         }
     }
-
-    /// Periodically announces this validator's public IP over gossip network,
-    /// so that other validators can discover and connect to this validator.
-    pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) {
-        let my_addr = self.gossip.cfg.public_addr;
-        let mut sub = self.gossip.validator_addrs.subscribe();
-        while ctx.is_active() {
-            let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL);
-            let _ = sync::wait_for(ctx, &mut sub, |got| {
-                got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr)
-            })
-            .await;
-            let next_version = sub
-                .borrow()
-                .get(&self.key.public())
-                .map(|x| x.msg.version + 1)
-                .unwrap_or(0);
-            self.gossip
-                .validator_addrs
-                .update(
-                    &self.gossip.genesis().validators,
-                    &[Arc::new(self.key.sign_msg(validator::NetAddress {
-                        addr: my_addr,
-                        version: next_version,
-                        timestamp: ctx.now_utc(),
-                    }))],
-                )
-                .await
-                .unwrap();
-        }
-    }
 }
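
The new `run_loopback_stream` above dials one of the resolved addresses chosen at random (hence the new `rand::seq::SliceRandom` import), and only the outbound end of that loopback connection announces the verified address over gossip. The selection step in isolation, with hard-coded stand-in addresses (203.0.113.0/24 is a reserved documentation range):

use rand::seq::SliceRandom as _;
use std::net::SocketAddr;

fn main() {
    // Stand-ins for whatever public_addr resolved to.
    let resolved: Vec<SocketAddr> = vec![
        "203.0.113.1:3054".parse().unwrap(),
        "203.0.113.2:3054".parse().unwrap(),
    ];
    // `choose` returns None only for an empty slice; the real code surfaces
    // that as a "resolved to no addresses" error.
    let addr = resolved
        .choose(&mut rand::thread_rng())
        .expect("resolved to no addresses");
    println!("dialing loopback via {addr}");
}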