Skip to content

Commit

Permalink
removed panicking when peer_addr fails (#164)
Browse files Browse the repository at this point in the history
peer_addr() may fail if the network socket is closed. It needs to be
handled gracefully.
I've also refactored some of the error handling to propagate
cancellation better.
  • Loading branch information
pompon0 authored Jul 29, 2024
1 parent 827607a commit b77b8a5
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 84 deletions.
14 changes: 7 additions & 7 deletions node/actors/network/src/consensus/handshake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{frame, noise, proto::consensus as proto};
use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{kB, read_required, ProtoFmt};
Expand Down Expand Up @@ -54,8 +54,8 @@ pub(super) enum Error {
PeerMismatch,
#[error("validator signature {0}")]
Signature(#[from] anyhow::Error),
#[error("stream {0}")]
Stream(#[source] anyhow::Error),
#[error(transparent)]
Stream(#[from] ctx::Error),
}

pub(super) async fn outbound(
Expand All @@ -76,10 +76,10 @@ pub(super) async fn outbound(
},
)
.await
.map_err(Error::Stream)?;
.wrap("send_proto()")?;
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
.wrap("recv_proto()")?;
if h.genesis != genesis {
return Err(Error::GenesisMismatch);
}
Expand All @@ -103,7 +103,7 @@ pub(super) async fn inbound(
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
.wrap("recv_proto()")?;
if h.genesis != genesis {
return Err(Error::GenesisMismatch);
}
Expand All @@ -120,6 +120,6 @@ pub(super) async fn inbound(
},
)
.await
.map_err(Error::Stream)?;
.wrap("send_proto()")?;
Ok(h.session_id.key)
}
2 changes: 1 addition & 1 deletion node/actors/network/src/consensus/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async fn test_invalid_signature() {
frame::send_proto(ctx, &mut s1, &h).await
});
match inbound(ctx, &key0, genesis, &mut s0).await {
Err(Error::Signature(..)) => anyhow::Ok(()),
Err(Error::Signature(..)) => Ok(()),
res => panic!("unexpected res: {res:?}"),
}
})
Expand Down
8 changes: 4 additions & 4 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{io, metrics, preface, rpc, testonly};
use assert_matches::assert_matches;
use rand::Rng;
use std::collections::HashSet;
use zksync_concurrency::{ctx, net, scope, testonly::abort_on_panic};
use zksync_concurrency::{ctx, error::Wrap as _, net, scope, testonly::abort_on_panic};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::testonly::TestMemoryStorage;
use zksync_consensus_utils::enum_util::Variant as _;
Expand Down Expand Up @@ -181,11 +181,11 @@ async fn test_genesis_mismatch() {

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("accept()")?;
.await
.wrap("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
.wrap("preface::accept()")?;
assert_eq!(endpoint, preface::Endpoint::ConsensusNet);
tracing::info!("Expect the handshake to fail");
let res = handshake::inbound(ctx, &setup.validator_keys[1], rng.gen(), &mut stream).await;
Expand Down
38 changes: 26 additions & 12 deletions node/actors/network/src/frame.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Simple frame encoding format (length ++ value) for protobuf messages,
//! since protobuf messages do not have delimiters.
use crate::{mux, noise::bytes};
use anyhow::Context as _;
use zksync_concurrency::{ctx, io};

/// Reads a raw frame of bytes from the stream and interprets it as proto.
Expand Down Expand Up @@ -53,28 +54,41 @@ pub(crate) async fn recv_proto<T: zksync_protobuf::ProtoFmt, S: io::AsyncRead +
ctx: &ctx::Ctx,
stream: &mut S,
max_size: usize,
) -> anyhow::Result<T> {
) -> ctx::Result<T> {
let mut msg_size = [0u8; 4];
io::read_exact(ctx, stream, &mut msg_size).await??;
io::read_exact(ctx, stream, &mut msg_size)
.await?
.context("read_exact(len)")?;
let msg_size = u32::from_le_bytes(msg_size);
anyhow::ensure!(
msg_size as usize <= max_size,
"message too large: max = {max_size}, got {msg_size}",
);
if msg_size as usize > max_size {
return Err(
anyhow::format_err!("message too large: max = {max_size}, got {msg_size}",).into(),
);
}
let mut msg = vec![0u8; msg_size as usize];
io::read_exact(ctx, stream, &mut msg[..]).await??;
zksync_protobuf::decode(&msg)
io::read_exact(ctx, stream, &mut msg[..])
.await?
.context("read_exact(msg)")?;
Ok(zksync_protobuf::decode(&msg).context("decode()")?)
}

/// Sends a proto serialized to a raw frame of bytes to the stream.
pub(crate) async fn send_proto<T: zksync_protobuf::ProtoFmt, S: io::AsyncWrite + Unpin>(
ctx: &ctx::Ctx,
stream: &mut S,
msg: &T,
) -> anyhow::Result<()> {
) -> ctx::Result<()> {
let msg = zksync_protobuf::encode(msg);
io::write_all(ctx, stream, &u32::to_le_bytes(msg.len().try_into()?)).await??;
io::write_all(ctx, stream, &msg).await??;
io::flush(ctx, stream).await??;
io::write_all(
ctx,
stream,
&u32::to_le_bytes(msg.len().try_into().context("msg.len()")?),
)
.await?
.context("write(len)")?;
io::write_all(ctx, stream, &msg)
.await?
.context("write(msg)")?;
io::flush(ctx, stream).await?.context("flush")?;
Ok(())
}
12 changes: 6 additions & 6 deletions node/actors/network/src/gossip/handshake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{frame, noise, proto::gossip as proto, GossipConfig};
use anyhow::Context as _;
use zksync_concurrency::{ctx, time};
use zksync_concurrency::{ctx, error::Wrap as _, time};
use zksync_consensus_crypto::ByteFmt;
use zksync_consensus_roles::{node, validator};
use zksync_protobuf::{kB, read_required, required, ProtoFmt};
Expand Down Expand Up @@ -60,7 +60,7 @@ pub(super) enum Error {
#[error(transparent)]
Signature(#[from] node::InvalidSignatureError),
#[error(transparent)]
Stream(anyhow::Error),
Stream(#[from] ctx::Error),
}

pub(super) async fn outbound(
Expand All @@ -82,10 +82,10 @@ pub(super) async fn outbound(
},
)
.await
.map_err(Error::Stream)?;
.wrap("send_proto()")?;
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
.wrap("recv_proto()")?;
if h.genesis != genesis {
return Err(Error::GenesisMismatch);
}
Expand All @@ -109,7 +109,7 @@ pub(super) async fn inbound(
let session_id = node::SessionId(stream.id().encode());
let h: Handshake = frame::recv_proto(ctx, stream, MAX_FRAME)
.await
.map_err(Error::Stream)?;
.wrap("recv_proto()")?;
if h.session_id.msg != session_id {
return Err(Error::SessionIdMismatch);
}
Expand All @@ -127,6 +127,6 @@ pub(super) async fn inbound(
},
)
.await
.map_err(Error::Stream)?;
.wrap("send_proto()")?;
Ok(h.session_id.key)
}
2 changes: 1 addition & 1 deletion node/actors/network/src/gossip/handshake/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ async fn test_invalid_signature() {
frame::send_proto(ctx, &mut s1, &h).await
});
match inbound(ctx, &cfg0, genesis, &mut s0).await {
Err(Error::Signature(..)) => anyhow::Ok(()),
Err(Error::Signature(..)) => Ok(()),
res => panic!("unexpected res: {res:?}"),
}
})
Expand Down
10 changes: 6 additions & 4 deletions node/actors/network/src/gossip/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::{
};
use tracing::Instrument as _;
use zksync_concurrency::{
ctx, net, scope, sync,
ctx,
error::Wrap as _,
net, scope, sync,
testonly::{abort_on_panic, set_timeout},
time,
};
Expand Down Expand Up @@ -351,11 +353,11 @@ async fn test_genesis_mismatch() {

tracing::info!("Accept a connection with mismatching genesis.");
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("accept()")?;
.await
.wrap("accept()")?;
let (mut stream, endpoint) = preface::accept(ctx, stream)
.await
.context("preface::accept()")?;
.wrap("preface::accept()")?;
assert_eq!(endpoint, preface::Endpoint::GossipNet);
tracing::info!("Expect the handshake to fail");
let res = handshake::inbound(ctx, &cfgs[1].gossip, rng.gen(), &mut stream).await;
Expand Down
7 changes: 4 additions & 3 deletions node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use tracing::Instrument as _;
use zksync_concurrency::{
ctx::{self, channel},
error::Wrap as _,
limiter, scope,
};
use zksync_consensus_storage::{BatchStore, BlockStore};
Expand Down Expand Up @@ -178,10 +179,10 @@ impl Runner {
loop {
accept_limiter.acquire(ctx, 1).await?;
let stream = metrics::MeteredStream::accept(ctx, &mut listener)
.await?
.context("accept()")?;
.await
.wrap("accept()")?;
s.spawn(async {
// This is a syscall which should always succeed on a correctly opened socket.
// May fail if the socket got closed.
let addr = stream.peer_addr().context("peer_addr()")?;
let res = async {
tracing::info!("new connection");
Expand Down
29 changes: 13 additions & 16 deletions node/actors/network/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! General-purpose network metrics.
use crate::Network;
use anyhow::Context as _;
use std::{
net::SocketAddr,
pin::Pin,
Expand Down Expand Up @@ -28,39 +28,36 @@ pub(crate) struct MeteredStream {

impl MeteredStream {
/// Opens a TCP connection to a remote host and returns a metered stream.
pub(crate) async fn connect(
ctx: &ctx::Ctx,
addr: SocketAddr,
) -> ctx::OrCanceled<io::Result<Self>> {
let io_result = net::tcp::connect(ctx, addr).await?;
Ok(io_result.map(|stream| Self::new(stream, Direction::Outbound)))
pub(crate) async fn connect(ctx: &ctx::Ctx, addr: SocketAddr) -> ctx::Result<Self> {
let stream = net::tcp::connect(ctx, addr).await?.context("connect()")?;
Ok(Self::new(stream, Direction::Outbound)?)
}

/// Accepts an inbound connection and returns a metered stream.
pub(crate) async fn accept(
ctx: &ctx::Ctx,
listener: &mut net::tcp::Listener,
) -> ctx::OrCanceled<io::Result<Self>> {
let io_result = net::tcp::accept(ctx, listener).await?;
Ok(io_result.map(|stream| Self::new(stream, Direction::Inbound)))
) -> ctx::Result<Self> {
let stream = net::tcp::accept(ctx, listener).await?.context("accept()")?;
Ok(Self::new(stream, Direction::Inbound)?)
}

#[cfg(test)]
pub(crate) async fn test_pipe(ctx: &ctx::Ctx) -> (Self, Self) {
let (outbound_stream, inbound_stream) = net::tcp::testonly::pipe(ctx).await;
let outbound_stream = Self::new(outbound_stream, Direction::Outbound);
let inbound_stream = Self::new(inbound_stream, Direction::Inbound);
let outbound_stream = Self::new(outbound_stream, Direction::Outbound).unwrap();
let inbound_stream = Self::new(inbound_stream, Direction::Inbound).unwrap();
(outbound_stream, inbound_stream)
}

fn new(stream: net::tcp::Stream, direction: Direction) -> Self {
fn new(stream: net::tcp::Stream, direction: Direction) -> anyhow::Result<Self> {
TCP_METRICS.established[&direction].inc();
let addr = stream.peer_addr().expect("Invalid address");
Self {
let addr = stream.peer_addr().context("peer_addr()")?;
Ok(Self {
stream,
stats: Arc::new(MeteredStreamStats::new(addr)),
_active: TCP_METRICS.active[&direction].inc_guard(1),
}
})
}

/// Returns a reference to the the Stream values for inspection
Expand Down
20 changes: 14 additions & 6 deletions node/actors/network/src/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
use crate::{frame, noise::bytes};
use anyhow::Context as _;
use std::{collections::BTreeMap, sync::Arc};
use zksync_concurrency::{ctx, ctx::channel, io, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, io, scope, sync};

mod config;
mod handshake;
Expand Down Expand Up @@ -282,15 +282,23 @@ impl Mux {
) -> Result<(), RunError> {
self.verify().map_err(RunError::Config)?;
let (mut read, mut write) = io::split(transport);
let handshake: Handshake = scope::run!(ctx, |ctx, s| async {
let res = scope::run!(ctx, |ctx, s| async {
s.spawn(async {
let h = self.handshake();
frame::send_proto(ctx, &mut write, &h).await
frame::send_proto(ctx, &mut write, &h)
.await
.wrap("send_proto()")
});
frame::recv_proto(ctx, &mut read, handshake::MAX_FRAME).await
frame::recv_proto(ctx, &mut read, handshake::MAX_FRAME)
.await
.wrap("recv_proto()")
})
.await
.map_err(RunError::Protocol)?;
.await;

let handshake = res.map_err(|err| match err {
ctx::Error::Canceled(err) => RunError::Canceled(err),
ctx::Error::Internal(err) => RunError::Protocol(err),
})?;

let (write_send, write_recv) = channel::bounded(1);
let flush = Arc::new(sync::Notify::new());
Expand Down
Loading

0 comments on commit b77b8a5

Please sign in to comment.