From bd8de8355e2e639353590411c86853ccb6c8c9fe Mon Sep 17 00:00:00 2001 From: elderhammer <81279860+elderhammer@users.noreply.github.com> Date: Fri, 14 Jun 2024 19:50:07 +0800 Subject: [PATCH 1/5] deserialize big block on rayon thread --- node/bft/src/gateway.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 60af4c788f..8ebe4e827f 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -631,8 +631,21 @@ impl Gateway { if let Some(sync_sender) = self.sync_sender.get() { // Retrieve the block response. let BlockResponse { request, blocks } = block_response; + // Perform the deferred non-blocking deserialization of the blocks. - let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?; + // The deserialization can take a long time (minutes). We should not be running + // this on a blocking task, but on a rayon thread pool. + let (send, recv) = tokio::sync::oneshot::channel(); + rayon::spawn_fifo(move || { + let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}")); + let _ = send.send(blocks); + }); + let blocks = match recv.await { + Ok(Ok(blocks)) => blocks, + Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), + Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), + }; + // Ensure the block response is well-formed. blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?; // Send the blocks to the sync module. From ed53d3e9ad03951254725a4b030d16ccaa9c6f24 Mon Sep 17 00:00:00 2001 From: elderhammer <81279860+elderhammer@users.noreply.github.com> Date: Tue, 18 Jun 2024 10:35:02 +0800 Subject: [PATCH 2/5] handle BlockRequest and BlockResponse messages in a separate task --- node/bft/src/gateway.rs | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 8ebe4e827f..1371e2fb21 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -968,6 +968,23 @@ impl Gateway { } } } + + /// Processes a message received from the network. + async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event) -> io::Result<()> { + // Process the message. Disconnect if the peer violated the protocol. + if let Err(error) = self.inbound(peer_addr, message).await { + if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { + warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}"); + let self_ = self.clone(); + tokio::spawn(async move { + Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await; + // Disconnect from this peer. + self_.disconnect(peer_ip); + }); + } + } + Ok(()) + } } #[async_trait] @@ -1058,17 +1075,15 @@ impl Reading for Gateway { /// Processes a message received from the network. async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { - // Process the message. Disconnect if the peer violated the protocol. - if let Err(error) = self.inbound(peer_addr, message).await { - if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { - warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}"); - let self_ = self.clone(); - tokio::spawn(async move { - Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await; - // Disconnect from this peer. - self_.disconnect(peer_ip); - }); - } + if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) { + let self_ = self.clone(); + // Handle BlockRequest and BlockResponse messages in a separate task to not block the + // inbound queue. + tokio::spawn(async move { + self_.process_message_inner(peer_addr, message).await; + }); + } else { + self.process_message_inner(peer_addr, message).await; } Ok(()) } From eb32416da8899405ed8c964db4c1c4584624b43c Mon Sep 17 00:00:00 2001 From: Haoran <81279860+elderhammer@users.noreply.github.com> Date: Mon, 24 Jun 2024 22:01:44 +0800 Subject: [PATCH 3/5] Update node/bft/src/gateway.rs ignore the result of process_message_inner Co-authored-by: Jos Dehaes --- node/bft/src/gateway.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 1371e2fb21..6928ae044f 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -1080,7 +1080,7 @@ impl Reading for Gateway { // Handle BlockRequest and BlockResponse messages in a separate task to not block the // inbound queue. tokio::spawn(async move { - self_.process_message_inner(peer_addr, message).await; + let _ = self_.process_message_inner(peer_addr, message).await; }); } else { self.process_message_inner(peer_addr, message).await; From 204c1101b58f52ffad41d75791599b5b28c636fe Mon Sep 17 00:00:00 2001 From: elderhammer <81279860+elderhammer@users.noreply.github.com> Date: Wed, 26 Jun 2024 21:26:05 +0800 Subject: [PATCH 4/5] process_message_inner does not need to return Result bcz infallible --- node/bft/src/gateway.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 6928ae044f..8b1c9a663b 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -970,7 +970,7 @@ impl Gateway { } /// Processes a message received from the network. - async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event) -> io::Result<()> { + async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event) { // Process the message. Disconnect if the peer violated the protocol. if let Err(error) = self.inbound(peer_addr, message).await { if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { @@ -983,7 +983,6 @@ impl Gateway { }); } } - Ok(()) } } From db5efd1df5e8682b1914694b4c3d69b0eb300a4d Mon Sep 17 00:00:00 2001 From: elderhammer <81279860+elderhammer@users.noreply.github.com> Date: Wed, 26 Jun 2024 21:37:44 +0800 Subject: [PATCH 5/5] no longer need to ignore the result of process_message_inner --- node/bft/src/gateway.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 8b1c9a663b..a975bb24a6 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -1079,7 +1079,7 @@ impl Reading for Gateway { // Handle BlockRequest and BlockResponse messages in a separate task to not block the // inbound queue. tokio::spawn(async move { - let _ = self_.process_message_inner(peer_addr, message).await; + self_.process_message_inner(peer_addr, message).await; }); } else { self.process_message_inner(peer_addr, message).await;