From 368ac4e64fb50b3071d2dde7248a051d57a0bfc9 Mon Sep 17 00:00:00 2001 From: Ash Beitz <8304894+ashbeitz@users.noreply.github.com> Date: Tue, 5 Dec 2023 09:55:30 -0800 Subject: [PATCH] Bandaid for intermittent hanging when hyper::body::to_bytes is called --- README.md | 4 ++- core/common/Cargo.toml | 1 + core/common/src/grpc_interceptor.rs | 38 ++++++++++++++++++++--------- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 49fff3ec..75ce3775 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,7 @@ Once you have installed the prerequisites, go to your enlistment's root director This should build all of the libraries and executables. -### Tokio Console Support +### Tokio Console Support Ibeji has support for using the [tokio console](https://github.com/tokio-rs/console) for advanced debugging. To enable this support, you need to build with the `tokio_console` feature enabled and with the `tokio_unstable` config flag for the rust compiler: @@ -116,6 +116,8 @@ Ibeji has support for using the [tokio console](https://github.com/tokio-rs/cons RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console ``` +Read the [tokio console documentation](https://github.com/tokio-rs/console) to learn how to install tokio console and how to run it. + Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging with the tokio console. ## Running the Tests diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index fa15c191..8fa85569 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" license = "MIT" [dependencies] +async-std = { workspace = true } bytes = { workspace = true } config = { workspace = true } core-protobuf-data-access = { path = "../protobuf_data_access" } diff --git a/core/common/src/grpc_interceptor.rs b/core/common/src/grpc_interceptor.rs index f8f2d72d..b5178924 100644 --- a/core/common/src/grpc_interceptor.rs +++ b/core/common/src/grpc_interceptor.rs @@ -8,6 +8,7 @@ use dyn_clone::DynClone; use futures_core::task::{Context, Poll}; use http::uri::Uri; use http_body::Body; +use hyper::{body::HttpBody, Method}; use log::warn; use regex::Regex; use std::error::Error; @@ -154,19 +155,33 @@ where let interceptor = self.interceptor.clone(); let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri()); - let is_applicable = interceptor.is_applicable(&service_name, &method_name); + let is_applicable = interceptor.is_applicable(&service_name, &method_name) && (request.method() == Method::POST); if is_applicable && interceptor.must_handle_request() { let (parts, body) = request.into_parts(); - let mut body_bytes: Bytes = - match futures::executor::block_on(hyper::body::to_bytes(body)) { - Ok(bytes) => bytes, - Err(err) => { - return Box::pin(async move { - Err(Box::new(err) as Box) - }) - } - }; + + // Rust requires that we initilaize body_bytes_timeout_result. + // Note: We will never use the initilaized value, so using an Ok value is fine. + let mut body_bytes_timeout_result = Ok(Ok(Bytes::new())); + // There is a known issue where hyper::body::to_bytes sometimes hangs in the code below. + // We will use a timeout to break out when this happens. + futures::executor::block_on(async { + body_bytes_timeout_result = async_std::future::timeout(core::time::Duration::from_secs(5), hyper::body::to_bytes(body)).await; + }); + let mut body_bytes: Bytes = match body_bytes_timeout_result { + Ok(Ok(bytes)) => bytes, + Ok(Err(err)) => { + return Box::pin(async move { + Err(Box::new(err) as Box) + }); + }, + Err(err) => { + return Box::pin(async move { + Err(Box::new(err) as Box) + }); + } + }; + let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH); let grpc_header_bytes = body_bytes; let new_protobuf_message_bytes: Bytes = match interceptor.handle_request( @@ -211,8 +226,7 @@ where vec![Ok(grpc_header_bytes), Ok(new_protobuf_message_bytes)]; let stream = futures_util::stream::iter(new_body_chunks); let new_body = tonic::transport::Body::wrap_stream(stream); - let new_box_body = - new_body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync(); + let new_box_body = HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e))).boxed_unsync(); response = http::response::Response::from_parts(parts, new_box_body); }