Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added DNS support for the addresses in the config. #82

Merged
merged 16 commits into from
Mar 27, 2024
3 changes: 3 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ rand04 = { package = "rand", version = "0.4" }
rocksdb = "0.21.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.95"
serde_yaml = "0.9"
sha3 = "0.10.8"
snow = "0.9.3"
syn = "2.0.17"
Expand Down
1 change: 0 additions & 1 deletion node/actors/bft/src/testonly/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, specs: &[Node]) -> anyhow::
let mut nodes = vec![];
for (i, spec) in specs.iter().enumerate() {
let (node, runner) = network::testonly::Instance::new(
ctx,
spec.net.clone(),
spec.block_store.clone(),
);
Expand Down
4 changes: 2 additions & 2 deletions node/actors/bft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_roles::validator;

async fn run_test(behavior: Behavior, network: Network) {
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(20));
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(30));
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);

Expand All @@ -15,7 +15,7 @@ async fn run_test(behavior: Behavior, network: Network) {
Test {
network,
nodes,
blocks_to_finalize: 15,
blocks_to_finalize: 10,
}
.run(ctx)
.await
Expand Down
7 changes: 3 additions & 4 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct Config {
pub server_addr: std::net::SocketAddr,
/// Public TCP address that other nodes are expected to connect to.
/// It is announced over gossip network.
pub public_addr: std::net::SocketAddr,
pub public_addr: net::Host,
/// Maximal size of the block payload.
pub max_payload_size: usize,

Expand All @@ -59,7 +59,7 @@ pub struct Config {
pub gossip_static_inbound: HashSet<node::PublicKey>,
/// Outbound connections that the node should actively try to
/// establish and maintain.
pub gossip_static_outbound: HashMap<node::PublicKey, std::net::SocketAddr>,
pub gossip_static_outbound: HashMap<node::PublicKey, net::Host>,
}

impl Config {
Expand Down Expand Up @@ -90,7 +90,7 @@ impl Executor {
fn network_config(&self) -> network::Config {
network::Config {
server_addr: net::tcp::ListenerAddr::new(self.config.server_addr),
public_addr: self.config.public_addr,
public_addr: self.config.public_addr.clone(),
gossip: self.config.gossip(),
validator_key: self.validator.as_ref().map(|v| v.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
Expand Down Expand Up @@ -136,7 +136,6 @@ impl Executor {
s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped"));
s.spawn(async {
let (net, runner) = network::Network::new(
ctx,
network_config,
self.block_store.clone(),
network_actor_pipe,
Expand Down
2 changes: 1 addition & 1 deletion node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use zksync_consensus_storage::{
fn config(cfg: &network::Config) -> Config {
Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr,
public_addr: cfg.public_addr.clone(),
max_payload_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
Expand Down
7 changes: 5 additions & 2 deletions node/actors/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct GossipConfig {
pub static_inbound: HashSet<node::PublicKey>,
/// Outbound connections that the node should actively try to
/// establish and maintain.
pub static_outbound: HashMap<node::PublicKey, std::net::SocketAddr>,
pub static_outbound: HashMap<node::PublicKey, net::Host>,
}

/// Network actor config.
Expand All @@ -65,7 +65,10 @@ pub struct Config {
pub server_addr: net::tcp::ListenerAddr,
/// Public TCP address that other nodes are expected to connect to.
/// It is announced over gossip network.
pub public_addr: std::net::SocketAddr,
/// If `public_addr` is a domain name rather than an IP address, DNS
/// resolution is performed and a loopback connection is established before
/// the resolved IP address is announced over the gossip network.
pub public_addr: net::Host,
/// Gossip network config.
pub gossip: GossipConfig,
/// Private key of the validator.
Expand Down
134 changes: 77 additions & 57 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
//! BFT consensus messages are exchanged over this network.
use crate::{config, gossip, io, noise, pool::PoolWatch, preface, rpc};
use anyhow::Context as _;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use rand::seq::SliceRandom;
use std::{collections::HashSet, sync::Arc};
use zksync_concurrency::{ctx, oneshot, scope, sync, time};
use zksync_consensus_roles::validator;
use zksync_protobuf::kB;
Expand All @@ -21,18 +19,25 @@ const RESP_MAX_SIZE: usize = kB;
/// is down.
const ADDRESS_ANNOUNCER_INTERVAL: time::Duration = time::Duration::minutes(10);

/// Outbound connection state.
/// One instance is created per established outbound stream and stored in the
/// `outbound` pool, keyed by the peer's validator public key.
pub(crate) struct Connection {
/// Peer's address.
/// This is not used for now, but will be required for the debug page.
#[allow(dead_code)]
addr: std::net::SocketAddr,
/// RPC client used to send consensus messages to the peer over this connection.
consensus: rpc::Client<rpc::consensus::Rpc>,
}

/// Consensus network state.
pub(crate) struct Network {
/// Gossip network state to bootstrap consensus network from.
pub(crate) gossip: Arc<gossip::Network>,
/// This validator's secret key.
pub(crate) key: validator::SecretKey,
/// Set of the currently open inbound connections.
pub(crate) inbound: PoolWatch<validator::PublicKey>,
pub(crate) inbound: PoolWatch<validator::PublicKey, ()>,
/// Set of the currently open outbound connections.
pub(crate) outbound: PoolWatch<validator::PublicKey>,
/// RPC clients for all validators.
pub(crate) clients: HashMap<validator::PublicKey, rpc::Client<rpc::consensus::Rpc>>,
pub(crate) outbound: PoolWatch<validator::PublicKey, Arc<Connection>>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -61,22 +66,13 @@ impl rpc::Handler<rpc::consensus::Rpc> for &Network {

impl Network {
/// Constructs a new consensus network state.
pub(crate) fn new(ctx: &ctx::Ctx, gossip: Arc<gossip::Network>) -> Option<Arc<Self>> {
pub(crate) fn new(gossip: Arc<gossip::Network>) -> Option<Arc<Self>> {
let key = gossip.cfg.validator_key.clone()?;
let validators: HashSet<_> = gossip.genesis().validators.iter().cloned().collect();
Some(Arc::new(Self {
key,
inbound: PoolWatch::new(validators.clone(), 0),
outbound: PoolWatch::new(validators.clone(), 0),
clients: validators
.iter()
.map(|peer| {
(
peer.clone(),
rpc::Client::new(ctx, gossip.cfg.rpc.consensus_rate),
)
})
.collect(),
gossip,
}))
}
Expand All @@ -88,10 +84,11 @@ impl Network {
msg: validator::Signed<validator::ConsensusMsg>,
) -> anyhow::Result<()> {
let req = rpc::consensus::Req(msg);
let outbound = self.outbound.current();
scope::run!(ctx, |ctx, s| async {
for (peer, client) in &self.clients {
for (peer, conn) in &outbound {
s.spawn(async {
if let Err(err) = client.call(ctx, &req, RESP_MAX_SIZE).await {
if let Err(err) = conn.consensus.call(ctx, &req, RESP_MAX_SIZE).await {
tracing::info!("send({:?},<ConsensusMsg>): {err:#}", &*peer);
}
Ok(())
Expand All @@ -109,8 +106,11 @@ impl Network {
key: &validator::PublicKey,
msg: validator::Signed<validator::ConsensusMsg>,
) -> anyhow::Result<()> {
let client = self.clients.get(key).context("not an active validator")?;
client
let outbound = self.outbound.current();
outbound
.get(key)
.context("not an active validator")?
.consensus
.call(ctx, &rpc::consensus::Req(msg), RESP_MAX_SIZE)
.await?;
Ok(())
Expand All @@ -125,7 +125,7 @@ impl Network {
) -> anyhow::Result<()> {
let peer =
handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
self.inbound.insert(peer.clone()).await?;
self.inbound.insert(peer.clone(), ()).await?;
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
Expand All @@ -152,7 +152,6 @@ impl Network {
peer: &validator::PublicKey,
addr: std::net::SocketAddr,
) -> anyhow::Result<()> {
let client = self.clients.get(peer).context("not an active validator")?;
let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?;
handshake::outbound(
ctx,
Expand All @@ -162,11 +161,15 @@ impl Network {
peer,
)
.await?;
self.outbound.insert(peer.clone()).await?;
let conn = Arc::new(Connection {
addr,
consensus: rpc::Client::new(ctx, self.gossip.cfg.rpc.consensus_rate),
});
self.outbound.insert(peer.clone(), conn.clone()).await?;
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
.add_server(rpc::ping::Server, rpc::ping::RATE)
.add_client(client);
.add_client(&conn.consensus);
if let Some(ping_timeout) = &self.gossip.cfg.ping_timeout {
let ping_client = rpc::Client::<rpc::ping::Rpc>::new(ctx, rpc::ping::RATE);
service = service.add_client(&ping_client);
Expand All @@ -175,6 +178,28 @@ impl Network {
ping_client.ping_loop(ctx, *ping_timeout).await
});
}
// If this is a loopback connection, periodically announce this
// validator's address to the network.
// Note that this is executed only for the outbound end of the loopback connection.
// The inbound end does not know its own public address.
if peer == &self.key.public() {
s.spawn(async {
let mut sub = self.gossip.validator_addrs.subscribe();
while ctx.is_active() {
self.gossip
.validator_addrs
.announce(&self.key, addr, ctx.now_utc())
.await;
let _ = sync::wait_for(
&ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL),
&mut sub,
|got| got.get(peer).map(|x| x.msg.addr) != Some(addr),
)
.await;
}
Ok(())
});
}
service.run(ctx, stream).await?;
Ok(())
})
Expand All @@ -183,9 +208,35 @@ impl Network {
res
}

/// Opens an outbound stream from this validator to itself.
///
/// The configured `public_addr` is resolved, one of the resulting socket
/// addresses is picked at random, and `run_outbound_stream` is run against
/// it using this validator's own public key as the peer.
///
/// # Errors
/// Returns an error if resolution fails or is canceled, if the host resolves
/// to no addresses, or if the outbound stream itself terminates with an error.
async fn run_loopback_stream(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
    let host = &self.gossip.cfg.public_addr;
    // `resolve()` may yield several addresses for one host.
    let candidates = host.resolve(ctx).await?.context("resolve()")?;
    // Any of the resolved addresses will do; pick one at random.
    let addr = *candidates
        .choose(&mut ctx.rng())
        .with_context(|| format!("{:?} resolved to no addresses", host))?;
    let own_key = self.key.public();
    self.run_outbound_stream(ctx, &own_key, addr).await
}

/// Maintains a connection to the given validator.
/// If connection breaks, it tries to reconnect periodically.
pub(crate) async fn maintain_connection(&self, ctx: &ctx::Ctx, peer: &validator::PublicKey) {
// Loopback connection is a special case, because the address is taken from
// the config rather than from `gossip.validator_addrs`.
if &self.key.public() == peer {
while ctx.is_active() {
if let Err(err) = self.run_loopback_stream(ctx).await {
tracing::info!("run_loopback_stream(): {err:#}");
}
}
return;
}
let addrs = &mut self.gossip.validator_addrs.subscribe();
let mut addr = None;
while ctx.is_active() {
Expand All @@ -204,35 +255,4 @@ impl Network {
}
}
}

/// Periodically announces this validator's public IP over gossip network,
/// so that other validators can discover and connect to this validator.
///
/// Runs for as long as `ctx` is active. Each iteration waits for the address
/// recorded for this validator in `gossip.validator_addrs` to differ from the
/// configured `public_addr` (bounded by `ADDRESS_ANNOUNCER_INTERVAL`), and then
/// publishes a freshly signed `validator::NetAddress` regardless of how the
/// wait ended.
///
/// # Panics
/// Panics if `validator_addrs.update()` returns an error.
pub(crate) async fn run_address_announcer(&self, ctx: &ctx::Ctx) {
let my_addr = self.gossip.cfg.public_addr;
let mut sub = self.gossip.validator_addrs.subscribe();
while ctx.is_active() {
// Shadows `ctx` for the rest of this iteration with a context limited to the
// announce interval; the loop condition above still checks the outer `ctx`.
let ctx = &ctx.with_timeout(ADDRESS_ANNOUNCER_INTERVAL);
// The outcome of the wait is deliberately ignored: an announcement is sent
// below in either case.
// NOTE(review): presumably `wait_for` returns early with an error once the
// timeout context expires — confirm against `zksync_concurrency::sync`.
let _ = sync::wait_for(ctx, &mut sub, |got| {
got.get(&self.key.public()).map(|x| &x.msg.addr) != Some(&my_addr)
})
.await;
// Next version is one above the currently recorded entry for this validator,
// or 0 if there is no entry yet.
let next_version = sub
.borrow()
.get(&self.key.public())
.map(|x| x.msg.version + 1)
.unwrap_or(0);
self.gossip
.validator_addrs
.update(
&self.gossip.genesis().validators,
&[Arc::new(self.key.sign_msg(validator::NetAddress {
addr: my_addr,
version: next_version,
timestamp: ctx.now_utc(),
}))],
)
.await
.unwrap();
}
}
}
Loading
Loading