Skip to content

Commit

Permalink
Merge branch 'main' into gprusak-scrape
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 authored Aug 21, 2024
2 parents 11e84bc + 5de3ba3 commit 24ba031
Show file tree
Hide file tree
Showing 27 changed files with 335 additions and 156 deletions.
2 changes: 2 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ quote = "1.0.33"
rand = "0.8.0"
rand04 = { package = "rand", version = "0.4" }
rocksdb = "0.21.0"
semver = "1.0.23"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.95"
serde_yaml = "0.9"
Expand Down
1 change: 1 addition & 0 deletions node/actors/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ zksync_protobuf.workspace = true
anyhow.workspace = true
async-trait.workspace = true
rand.workspace = true
semver.workspace = true
tracing.workspace = true
vise.workspace = true

Expand Down
3 changes: 3 additions & 0 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct Validator {
/// Config of the node executor.
#[derive(Debug)]
pub struct Config {
/// Label identifying the build version of the binary that this node is running.
pub build_version: Option<semver::Version>,
/// IP:port to listen on, for incoming TCP connections.
/// Use `0.0.0.0:<port>` to listen on all network interfaces (i.e. on all IPs exposed by this VM).
pub server_addr: std::net::SocketAddr,
Expand Down Expand Up @@ -99,6 +101,7 @@ impl Executor {
/// Extracts a network crate config.
fn network_config(&self) -> network::Config {
network::Config {
build_version: self.config.build_version.clone(),
server_addr: net::tcp::ListenerAddr::new(self.config.server_addr),
public_addr: self.config.public_addr.clone(),
gossip: self.config.gossip(),
Expand Down
1 change: 1 addition & 0 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use zksync_consensus_storage::{

fn config(cfg: &network::Config) -> Config {
Config {
build_version: None,
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr.clone(),
max_payload_size: usize::MAX,
Expand Down
1 change: 1 addition & 0 deletions node/actors/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ tls-listener.workspace = true
base64.workspace = true
build_html.workspace = true
bytesize.workspace = true
semver.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions node/actors/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ pub struct GossipConfig {
/// Network actor config.
#[derive(Debug, Clone)]
pub struct Config {
/// Label identifying the build version of the binary that this node is running.
/// There is no specific semantics assigned to it.
pub build_version: Option<semver::Version>,
/// TCP socket address to listen for inbound connections at.
pub server_addr: net::tcp::ListenerAddr,
/// Public TCP address that other nodes are expected to connect to.
Expand Down
8 changes: 2 additions & 6 deletions node/actors/network/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,7 @@ impl Network {
) -> anyhow::Result<()> {
let peer =
handshake::inbound(ctx, &self.key, self.gossip.genesis().hash(), &mut stream).await?;
self.inbound
.insert(peer.clone(), stream.get_values())
.await?;
self.inbound.insert(peer.clone(), stream.stats()).await?;
tracing::info!("peer = {peer:?}");
let res = scope::run!(ctx, |ctx, s| async {
let mut service = rpc::Service::new()
Expand Down Expand Up @@ -211,9 +209,7 @@ impl Network {
peer,
)
.await?;
self.outbound
.insert(peer.clone(), stream.get_values())
.await?;
self.outbound.insert(peer.clone(), stream.stats()).await?;
tracing::info!("peer = {peer:?}");
let consensus_cli =
rpc::Client::<rpc::consensus::Rpc>::new(ctx, self.gossip.cfg.rpc.consensus_rate);
Expand Down
45 changes: 34 additions & 11 deletions node/actors/network/src/gossip/handshake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::{frame, noise, proto::gossip as proto, GossipConfig};
use super::Connection;
use crate::{frame, noise, proto::gossip as proto, Config};
use anyhow::Context as _;
use std::sync::Arc;
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
Expand Down Expand Up @@ -28,6 +30,9 @@ pub(crate) struct Handshake {
/// Information whether the peer treats this connection as static.
/// It is informational only, it doesn't affect the logic of the node.
pub(crate) is_static: bool,
/// Version at which peer's binary has been built.
/// It is declared by peer (i.e. not verified in any way).
pub(crate) build_version: Option<semver::Version>,
}

impl ProtoFmt for Handshake {
Expand All @@ -37,13 +42,20 @@ impl ProtoFmt for Handshake {
session_id: read_required(&r.session_id).context("session_id")?,
genesis: read_required(&r.genesis).context("genesis")?,
is_static: *required(&r.is_static).context("is_static")?,
build_version: r
.build_version
.as_ref()
.map(|x| x.parse())
.transpose()
.context("build_version")?,
})
}
fn build(&self) -> Self::Proto {
Self::Proto {
session_id: Some(self.session_id.build()),
genesis: Some(self.genesis.build()),
is_static: Some(self.is_static),
build_version: self.build_version.as_ref().map(|x| x.to_string()),
}
}
}
Expand All @@ -65,20 +77,21 @@ pub(super) enum Error {

pub(super) async fn outbound(
ctx: &ctx::Ctx,
cfg: &GossipConfig,
cfg: &Config,
genesis: validator::GenesisHash,
stream: &mut noise::Stream,
peer: &node::PublicKey,
) -> Result<(), Error> {
) -> Result<Connection, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
frame::send_proto(
ctx,
stream,
&Handshake {
session_id: cfg.key.sign_msg(session_id.clone()),
session_id: cfg.gossip.key.sign_msg(session_id.clone()),
genesis,
is_static: cfg.static_outbound.contains_key(peer),
is_static: cfg.gossip.static_outbound.contains_key(peer),
build_version: cfg.build_version.clone(),
},
)
.await
Expand All @@ -96,15 +109,19 @@ pub(super) async fn outbound(
return Err(Error::PeerMismatch);
}
h.session_id.verify()?;
Ok(())
Ok(Connection {
key: h.session_id.key,
build_version: h.build_version,
stats: stream.stats(),
})
}

pub(super) async fn inbound(
ctx: &ctx::Ctx,
cfg: &GossipConfig,
cfg: &Config,
genesis: validator::GenesisHash,
stream: &mut noise::Stream,
) -> Result<node::PublicKey, Error> {
) -> Result<Arc<Connection>, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
Expand All @@ -121,12 +138,18 @@ pub(super) async fn inbound(
ctx,
stream,
&Handshake {
session_id: cfg.key.sign_msg(session_id.clone()),
build_version: cfg.build_version.clone(),
session_id: cfg.gossip.key.sign_msg(session_id.clone()),
genesis,
is_static: cfg.static_inbound.contains(&h.session_id.key),
is_static: cfg.gossip.static_inbound.contains(&h.session_id.key),
},
)
.await
.wrap("send_proto()")?;
Ok(h.session_id.key)
Ok(Connection {
key: h.session_id.key,
build_version: h.build_version,
stats: stream.stats(),
}
.into())
}
18 changes: 18 additions & 0 deletions node/actors/network/src/gossip/handshake/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@ use rand::{
};
use zksync_consensus_roles::node;

/// Semver has specific restrictions on how an identifier
/// should look like, so we play it safe and generate
/// a letter-only string of fixed length.
fn gen_semver_identifier<R: Rng + ?Sized>(rng: &mut R) -> String {
(0..10).map(|_| rng.gen_range('a'..='z')).collect()
}

fn gen_semver<R: Rng + ?Sized>(rng: &mut R) -> semver::Version {
semver::Version {
major: rng.gen(),
minor: rng.gen(),
patch: rng.gen(),
pre: semver::Prerelease::new(&gen_semver_identifier(rng)).unwrap(),
build: semver::BuildMetadata::new(&gen_semver_identifier(rng)).unwrap(),
}
}

impl Distribution<Handshake> for Standard {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> Handshake {
let key: node::SecretKey = rng.gen();
Expand All @@ -17,6 +34,7 @@ impl Distribution<Handshake> for Standard {
session_id: key.sign_msg(session_id),
genesis: rng.gen(),
is_static: rng.gen(),
build_version: Some(gen_semver(rng)),
}
}
}
Loading

0 comments on commit 24ba031

Please sign in to comment.