Skip to content

Commit

Permalink
Bandaid for intermittent hanging when hyper::body::to_bytes is called
Browse files Browse the repository at this point in the history
  • Loading branch information
ashbeitz committed Dec 5, 2023
1 parent 368ac4e commit 35a31b2
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions core/common/src/grpc_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,40 @@ where
let interceptor = self.interceptor.clone();

let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri());
let is_applicable = interceptor.is_applicable(&service_name, &method_name) && (request.method() == Method::POST);
let is_applicable = interceptor.is_applicable(&service_name, &method_name)
&& (request.method() == Method::POST);

if is_applicable && interceptor.must_handle_request() {
let (parts, body) = request.into_parts();

// Rust requires that we initilaize body_bytes_timeout_result.
// Note: We will never use the initilaized value, so using an Ok value is fine.
let mut body_bytes_timeout_result = Ok(Ok(Bytes::new()));

// There is a known issue where hyper::body::to_bytes sometimes hangs in the code below.
// We will use a timeout to break out when this happens.
// We will use a timeout to break out when this happens. This fix is a bandaid. We will
// implement a better fix after we have upgraded to the latest major version of the hyper crate.
futures::executor::block_on(async {
body_bytes_timeout_result = async_std::future::timeout(core::time::Duration::from_secs(5), hyper::body::to_bytes(body)).await;
body_bytes_timeout_result = async_std::future::timeout(
core::time::Duration::from_secs(5),
hyper::body::to_bytes(body),
)
.await;
});

let mut body_bytes: Bytes = match body_bytes_timeout_result {
Ok(Ok(bytes)) => bytes,
Ok(Err(err)) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
},
}
Err(err) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
}
};
};

let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH);
let grpc_header_bytes = body_bytes;
Expand Down Expand Up @@ -226,7 +234,9 @@ where
vec![Ok(grpc_header_bytes), Ok(new_protobuf_message_bytes)];
let stream = futures_util::stream::iter(new_body_chunks);
let new_body = tonic::transport::Body::wrap_stream(stream);
let new_box_body = HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e))).boxed_unsync();
let new_box_body =
HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e)))
.boxed_unsync();
response = http::response::Response::from_parts(parts, new_box_body);
}

Expand Down

0 comments on commit 35a31b2

Please sign in to comment.