Skip to content

Commit

Permalink
Bandaid for intermittent hanging when hyper::body::to_bytes is called
Browse files Browse the repository at this point in the history
  • Loading branch information
ashbeitz committed Dec 5, 2023
1 parent 79efe56 commit 368ac4e
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,16 @@ Once you have installed the prerequisites, go to your enlistment's root director

This should build all of the libraries and executables.

### Tokio Console Support
### <a name="tokio-console-support">Tokio Console Support</a>

Ibeji has support for using the [tokio console](https://github.com/tokio-rs/console) for advanced debugging. To enable this support, you need to build with the `tokio_console` feature enabled and with the `tokio_unstable` config flag for the rust compiler:

```bash
RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio_console
```

Read the [tokio console documentation](https://github.com/tokio-rs/console) to learn how to install tokio console and how to run it.

Note that the tokio console will intercept trace-level logs, so these will not be visible when debugging with the tokio console.

## <a name="running-the-tests">Running the Tests</a>
Expand Down
1 change: 1 addition & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
license = "MIT"

[dependencies]
async-std = { workspace = true }
bytes = { workspace = true }
config = { workspace = true }
core-protobuf-data-access = { path = "../protobuf_data_access" }
Expand Down
38 changes: 26 additions & 12 deletions core/common/src/grpc_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dyn_clone::DynClone;
use futures_core::task::{Context, Poll};
use http::uri::Uri;
use http_body::Body;
use hyper::{body::HttpBody, Method};
use log::warn;
use regex::Regex;
use std::error::Error;
Expand Down Expand Up @@ -154,19 +155,33 @@ where
let interceptor = self.interceptor.clone();

let (service_name, method_name) = Self::retrieve_grpc_names_from_uri(request.uri());
let is_applicable = interceptor.is_applicable(&service_name, &method_name);
let is_applicable = interceptor.is_applicable(&service_name, &method_name) && (request.method() == Method::POST);

if is_applicable && interceptor.must_handle_request() {
let (parts, body) = request.into_parts();
let mut body_bytes: Bytes =
match futures::executor::block_on(hyper::body::to_bytes(body)) {
Ok(bytes) => bytes,
Err(err) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
})
}
};

// Rust requires that we initilaize body_bytes_timeout_result.
// Note: We will never use the initilaized value, so using an Ok value is fine.
let mut body_bytes_timeout_result = Ok(Ok(Bytes::new()));
// There is a known issue where hyper::body::to_bytes sometimes hangs in the code below.
// We will use a timeout to break out when this happens.
futures::executor::block_on(async {
body_bytes_timeout_result = async_std::future::timeout(core::time::Duration::from_secs(5), hyper::body::to_bytes(body)).await;
});
let mut body_bytes: Bytes = match body_bytes_timeout_result {
Ok(Ok(bytes)) => bytes,
Ok(Err(err)) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
},
Err(err) => {
return Box::pin(async move {
Err(Box::new(err) as Box<dyn std::error::Error + Sync + Send>)
});
}
};

let protobuf_message_bytes: Bytes = body_bytes.split_off(GRPC_HEADER_LENGTH);
let grpc_header_bytes = body_bytes;
let new_protobuf_message_bytes: Bytes = match interceptor.handle_request(
Expand Down Expand Up @@ -211,8 +226,7 @@ where
vec![Ok(grpc_header_bytes), Ok(new_protobuf_message_bytes)];
let stream = futures_util::stream::iter(new_body_chunks);
let new_body = tonic::transport::Body::wrap_stream(stream);
let new_box_body =
new_body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync();
let new_box_body = HttpBody::map_err(new_body, |e| tonic::Status::from_error(Box::new(e))).boxed_unsync();
response = http::response::Response::from_parts(parts, new_box_body);
}

Expand Down

0 comments on commit 368ac4e

Please sign in to comment.