From 35a31b2797c31b076cc8846b49658f9d46254f34 Mon Sep 17 00:00:00 2001 From: Ash Beitz <8304894+ashbeitz@users.noreply.github.com> Date: Tue, 5 Dec 2023 10:01:03 -0800 Subject: [PATCH] Bandaid for intermittent hanging when hyper::body::to_bytes is called --- core/common/src/grpc_interceptor.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/core/common/src/grpc_interceptor.rs b/core/common/src/grpc_interceptor.rs index b5178924..54db1e09 100644 --- a/core/common/src/grpc_interceptor.rs +++ b/core/common/src/grpc_interceptor.rs @@ -155,7 +155,8 @@ where let interceptor = self.interceptor.clone(); let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri()); - let is_applicable = interceptor.is_applicable(&service_name, &method_name) && (request.method() == Method::POST); + let is_applicable = interceptor.is_applicable(&service_name, &method_name) + && (request.method() == Method::POST); if is_applicable && interceptor.must_handle_request() { let (parts, body) = request.into_parts(); @@ -163,24 +164,31 @@ where // Rust requires that we initilaize body_bytes_timeout_result. // Note: We will never use the initilaized value, so using an Ok value is fine. let mut body_bytes_timeout_result = Ok(Ok(Bytes::new())); + // There is a known issue where hyper::body::to_bytes sometimes hangs in the code below. - // We will use a timeout to break out when this happens. + // We will use a timeout to break out when this happens. This fix is a bandaid. We will + // implement a better fix after we have upgraded to the latest major version of the hyper crate. futures::executor::block_on(async { - body_bytes_timeout_result = async_std::future::timeout(core::time::Duration::from_secs(5), hyper::body::to_bytes(body)).await; + body_bytes_timeout_result = async_std::future::timeout( + core::time::Duration::from_secs(5), + hyper::body::to_bytes(body), + ) + .await; }); + let mut body_bytes: Bytes = match body_bytes_timeout_result { Ok(Ok(bytes)) => bytes, Ok(Err(err)) => { return Box::pin(async move { Err(Box::new(err) as Box) }); - }, + } Err(err) => { return Box::pin(async move { Err(Box::new(err) as Box) }); } - }; + }; let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH); let grpc_header_bytes = body_bytes; @@ -226,7 +234,9 @@ where vec![Ok(grpc_header_bytes), Ok(new_protobuf_message_bytes)]; let stream = futures_util::stream::iter(new_body_chunks); let new_body = tonic::transport::Body::wrap_stream(stream); - let new_box_body = HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e))).boxed_unsync(); + let new_box_body = + HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e))) + .boxed_unsync(); response = http::response::Response::from_parts(parts, new_box_body); }