Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into gprusak-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Apr 4, 2024
2 parents 1b72d63 + a7f3ca1 commit 7974a8b
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 61 deletions.
6 changes: 5 additions & 1 deletion node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
fmt,
sync::Arc,
};
use zksync_concurrency::{ctx, net, scope, time};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network as network;
use zksync_consensus_roles::{node, validator};
Expand Down Expand Up @@ -95,6 +95,10 @@ impl Executor {
validator_key: self.validator.as_ref().map(|v| v.key.clone()),
ping_timeout: Some(time::Duration::seconds(10)),
max_block_size: self.config.max_payload_size.saturating_add(kB),
tcp_accept_rate: limiter::Rate {
burst: 10,
refresh: time::Duration::milliseconds(100),
},
rpc: network::RpcConfig::default(),
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/actors/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub struct Config {
/// the connection is dropped.
/// `None` disables sending ping messages (useful for tests).
pub ping_timeout: Option<time::Duration>,
/// Max rate at which inbound TCP connections should be accepted.
pub tcp_accept_rate: limiter::Rate,
/// Rate limiting config for RPCs.
pub rpc: RpcConfig,
}
9 changes: 6 additions & 3 deletions node/actors/network/src/consensus/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{read_required, ProtoFmt};
use zksync_protobuf::{kB, read_required, ProtoFmt};

#[cfg(test)]
mod testonly;
Expand All @@ -13,6 +13,9 @@ mod tests;
/// Timeout on performing a handshake.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of a handshake frame.
const MAX_FRAME: usize = 10 * kB;

/// First message exchanged by nodes after establishing e2e encryption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Handshake {
Expand Down Expand Up @@ -74,7 +77,7 @@ pub(super) async fn outbound(
)
.await
.map_err(Error::Stream)?;
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand All @@ -98,7 +101,7 @@ pub(super) async fn inbound(
) -> Result<validator::PublicKey, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/consensus/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn test_session_id_mismatch() {
let (mut s1, s2) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s2 = s2;
let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?;
let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?;
frame::send_proto(
ctx,
&mut s2,
Expand Down Expand Up @@ -149,9 +149,7 @@ async fn test_genesis_mismatch() {
Ok(())
});
let session_id = node::SessionId(s1.id().encode());
let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size())
.await
.unwrap();
let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap();
frame::send_proto(
ctx,
&mut s1,
Expand Down Expand Up @@ -184,7 +182,7 @@ async fn test_invalid_signature() {
let (mut s0, s1) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s1 = s1;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?;
h.session_id.key = key1.public();
frame::send_proto(ctx, &mut s1, &h).await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ async fn test_genesis_mismatch() {
.unwrap();

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::listen(ctx, &mut listener)
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("listen()")?;
.context("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
Expand Down
9 changes: 6 additions & 3 deletions node/actors/network/src/gossip/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{read_required, required, ProtoFmt};
use zksync_protobuf::{kB, read_required, required, ProtoFmt};

#[cfg(test)]
mod testonly;
Expand All @@ -13,6 +13,9 @@ mod tests;
/// Timeout on performing a handshake.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of a handshake frame.
const MAX_FRAME: usize = 10 * kB;

/// First message exchanged by nodes after establishing e2e encryption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct Handshake {
Expand Down Expand Up @@ -80,7 +83,7 @@ pub(super) async fn outbound(
)
.await
.map_err(Error::Stream)?;
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.genesis != genesis {
Expand All @@ -104,7 +107,7 @@ pub(super) async fn inbound(
) -> Result<node::PublicKey, Error> {
let ctx = &ctx.with_timeout(TIMEOUT);
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, Handshake::max_size())
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
if h.session_id.msg != session_id {
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/gossip/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn test_session_id_mismatch() {
let (mut s1, s2) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s2 = s2;
let _: Handshake = frame::recv_proto(ctx, &mut s2, Handshake::max_size()).await?;
let _: Handshake = frame::recv_proto(ctx, &mut s2, MAX_FRAME).await?;
frame::send_proto(
ctx,
&mut s2,
Expand Down Expand Up @@ -162,9 +162,7 @@ async fn test_genesis_mismatch() {
Ok(())
});
let session_id = node::SessionId(s1.id().encode());
let _: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size())
.await
.unwrap();
let _: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await.unwrap();
frame::send_proto(
ctx,
&mut s1,
Expand Down Expand Up @@ -198,7 +196,7 @@ async fn test_invalid_signature() {
let (mut s0, s1) = noise::testonly::pipe(ctx).await;
s.spawn_bg(async {
let mut s1 = s1;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await?;
let mut h: Handshake = frame::recv_proto(ctx, &mut s1, MAX_FRAME).await?;
h.session_id.key = cfg1.key.public();
frame::send_proto(ctx, &mut s1, &h).await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,9 @@ async fn test_genesis_mismatch() {
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::listen(ctx, &mut listener)
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("listen()")?;
.context("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
Expand Down
34 changes: 20 additions & 14 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Network actor maintaining a pool of outbound and inbound connections to other nodes.
use anyhow::Context as _;
use std::sync::Arc;
use zksync_concurrency::{ctx, ctx::channel, scope, time};
use zksync_concurrency::{ctx, ctx::channel, limiter, scope, time};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe::ActorPipe;

Expand Down Expand Up @@ -109,15 +109,15 @@ impl Network {
impl Runner {
/// Runs the network actor.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let mut listener = self
.net
.gossip
.cfg
.server_addr
.bind()
.context("server_addr.bind()")?;
let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async {
let mut listener = self
.net
.gossip
.cfg
.server_addr
.bind()
.context("server_addr.bind()")?;

scope::run!(ctx, |ctx, s| async {
// Handle incoming messages.
s.spawn(async {
// We don't propagate cancellation errors
Expand Down Expand Up @@ -165,9 +165,12 @@ impl Runner {
}
}

// TODO(gprusak): add rate limit and inflight limit for inbound handshakes.
while let Ok(stream) = metrics::MeteredStream::listen(ctx, &mut listener).await {
let stream = stream.context("listener.accept()")?;
let accept_limiter = limiter::Limiter::new(ctx, self.net.gossip.cfg.tcp_accept_rate);
loop {
accept_limiter.acquire(ctx, 1).await?;
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("accept()")?;
s.spawn(async {
let res = async {
let (stream, endpoint) = preface::accept(ctx, stream)
Expand Down Expand Up @@ -198,8 +201,11 @@ impl Runner {
Ok(())
});
}
Ok(())
})
.await
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}
}
2 changes: 1 addition & 1 deletion node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl MeteredStream {
}

/// Accepts an inbound connection and returns a metered stream.
pub(crate) async fn listen(
pub(crate) async fn accept(
ctx: &ctx::Ctx,
listener: &mut net::tcp::Listener,
) -> ctx::OrCanceled<io::Result<Self>> {
Expand Down
9 changes: 4 additions & 5 deletions node/actors/network/src/mux/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use super::CapabilityId;
use crate::proto::mux as proto;
use anyhow::Context as _;
use std::collections::HashMap;
use zksync_protobuf::required;
use zksync_protobuf::{kB, required};

/// Max size of a handshkake frame.
pub(super) const MAX_FRAME: usize = 10 * kB;

pub(super) struct Handshake {
/// Maximal supported number of the accept streams per capability.
Expand Down Expand Up @@ -40,10 +43,6 @@ fn build_capabilities(
impl zksync_protobuf::ProtoFmt for Handshake {
type Proto = proto::Handshake;

fn max_size() -> usize {
zksync_protobuf::kB
}

fn read(r: &Self::Proto) -> anyhow::Result<Self> {
Ok(Self {
accept_max_streams: read_max_streams(&r.accept).context("accept")?,
Expand Down
3 changes: 1 addition & 2 deletions node/actors/network/src/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ use crate::{frame, noise::bytes};
use anyhow::Context as _;
use std::{collections::BTreeMap, sync::Arc};
use zksync_concurrency::{ctx, ctx::channel, io, scope, sync};
use zksync_protobuf::ProtoFmt as _;

mod config;
mod handshake;
Expand Down Expand Up @@ -279,7 +278,7 @@ impl Mux {
let h = self.handshake();
frame::send_proto(ctx, &mut write, &h).await
});
frame::recv_proto(ctx, &mut read, Handshake::max_size()).await
frame::recv_proto(ctx, &mut read, handshake::MAX_FRAME).await
})
.await
.map_err(RunError::Protocol)?;
Expand Down
6 changes: 2 additions & 4 deletions node/actors/network/src/mux/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::{
},
};
use zksync_concurrency::{ctx, scope, testonly::abort_on_panic};
use zksync_protobuf::ProtoFmt as _;

mod proto;

Expand Down Expand Up @@ -122,7 +121,7 @@ async fn run_server(
s.spawn(async {
let mut stream = stream;
while let Ok((req, _)) =
frame::mux_recv_proto::<Req>(ctx, &mut stream.read, Req::max_size()).await
frame::mux_recv_proto::<Req>(ctx, &mut stream.read, usize::MAX).await
{
let resp = Resp {
output: rpc_handler(req.0),
Expand Down Expand Up @@ -164,8 +163,7 @@ async fn run_client(
frame::mux_send_proto(ctx, &mut stream.write, &Req(req.clone())).await?;
stream.write.flush(ctx).await?;
let (resp, _) =
frame::mux_recv_proto::<Resp>(ctx, &mut stream.read, Resp::max_size())
.await?;
frame::mux_recv_proto::<Resp>(ctx, &mut stream.read, usize::MAX).await?;
assert_eq!(resp.output, rpc_handler(req));
assert_eq!(resp.capability_id, cap);
}
Expand Down
18 changes: 9 additions & 9 deletions node/actors/network/src/preface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
//! and multiplex between multiple endpoints available on the same TCP port.
use crate::{frame, metrics, noise, proto::preface as proto};
use zksync_concurrency::{ctx, time};
use zksync_protobuf::{required, ProtoFmt};
use zksync_protobuf::{kB, required, ProtoFmt};

/// Timeout on executing the preface protocol.
const TIMEOUT: time::Duration = time::Duration::seconds(5);

/// Max size of the frames exchanged during preface.
const MAX_FRAME: usize = 10 * kB;

/// E2E encryption protocol to use on a TCP connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Encryption {
Expand All @@ -32,9 +35,6 @@ pub(crate) enum Endpoint {

impl ProtoFmt for Encryption {
type Proto = proto::Encryption;
fn max_size() -> usize {
10 * zksync_protobuf::kB
}
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
use proto::encryption::T;
Ok(match required(&r.t)? {
Expand All @@ -52,9 +52,6 @@ impl ProtoFmt for Encryption {

impl ProtoFmt for Endpoint {
type Proto = proto::Endpoint;
fn max_size() -> usize {
10 * zksync_protobuf::kB
}
fn read(r: &Self::Proto) -> anyhow::Result<Self> {
use proto::endpoint::T;
Ok(match required(&r.t)? {
Expand Down Expand Up @@ -92,8 +89,11 @@ pub(crate) async fn accept(
mut stream: metrics::MeteredStream,
) -> anyhow::Result<(noise::Stream, Endpoint)> {
let ctx = &ctx.with_timeout(TIMEOUT);
let _: Encryption = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?;
let encryption: Encryption = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?;
if encryption != Encryption::NoiseNN {
anyhow::bail!("unsupported encryption protocol: {encryption:?}");
}
let mut stream = noise::Stream::server_handshake(ctx, stream).await?;
let endpoint = frame::recv_proto(ctx, &mut stream, Encryption::max_size()).await?;
let endpoint = frame::recv_proto(ctx, &mut stream, MAX_FRAME).await?;
Ok((stream, endpoint))
}
4 changes: 3 additions & 1 deletion node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use zksync_concurrency::{ctx, ctx::channel, io, net, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -69,6 +69,7 @@ pub fn new_configs(
static_outbound: HashMap::default(),
},
max_block_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
}
});
Expand Down Expand Up @@ -104,6 +105,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config {
static_outbound: [(peer.gossip.key.public(), peer.public_addr.clone())].into(),
},
max_block_size: usize::MAX,
tcp_accept_rate: limiter::Rate::INF,
rpc: RpcConfig::default(),
}
}
Expand Down
Loading

0 comments on commit 7974a8b

Please sign in to comment.