diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 60af4c788f..a975bb24a6 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -631,8 +631,21 @@ impl Gateway { if let Some(sync_sender) = self.sync_sender.get() { // Retrieve the block response. let BlockResponse { request, blocks } = block_response; + // Perform the deferred non-blocking deserialization of the blocks. - let blocks = blocks.deserialize().await.map_err(|error| anyhow!("[BlockResponse] {error}"))?; + // The deserialization can take a long time (minutes). We should not be running + // this on a blocking task, but on a rayon thread pool. + let (send, recv) = tokio::sync::oneshot::channel(); + rayon::spawn_fifo(move || { + let blocks = blocks.deserialize_blocking().map_err(|error| anyhow!("[BlockResponse] {error}")); + let _ = send.send(blocks); + }); + let blocks = match recv.await { + Ok(Ok(blocks)) => blocks, + Ok(Err(error)) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), + Err(error) => bail!("Peer '{peer_ip}' sent an invalid block response - {error}"), + }; + // Ensure the block response is well-formed. blocks.ensure_response_is_well_formed(peer_ip, request.start_height, request.end_height)?; // Send the blocks to the sync module. @@ -955,6 +968,22 @@ impl Gateway { } } } + + /// Processes a message received from the network. + async fn process_message_inner(&self, peer_addr: SocketAddr, message: Event) { + // Process the message. Disconnect if the peer violated the protocol. + if let Err(error) = self.inbound(peer_addr, message).await { + if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { + warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}"); + let self_ = self.clone(); + tokio::spawn(async move { + Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await; + // Disconnect from this peer. + self_.disconnect(peer_ip); + }); + } + } + } } #[async_trait] @@ -1045,17 +1074,15 @@ impl Reading for Gateway { /// Processes a message received from the network. async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { - // Process the message. Disconnect if the peer violated the protocol. - if let Err(error) = self.inbound(peer_addr, message).await { - if let Some(peer_ip) = self.resolver.get_listener(peer_addr) { - warn!("{CONTEXT} Disconnecting from '{peer_ip}' - {error}"); - let self_ = self.clone(); - tokio::spawn(async move { - Transport::send(&self_, peer_ip, DisconnectReason::ProtocolViolation.into()).await; - // Disconnect from this peer. - self_.disconnect(peer_ip); - }); - } + if matches!(message, Event::BlockRequest(_) | Event::BlockResponse(_)) { + let self_ = self.clone(); + // Handle BlockRequest and BlockResponse messages in a separate task to not block the + // inbound queue. + tokio::spawn(async move { + self_.process_message_inner(peer_addr, message).await; + }); + } else { + self.process_message_inner(peer_addr, message).await; } Ok(()) }