diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..dbfceb6b1f --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "http-body"] + path = http-body + url = git@github.com:olix0r/http-body.git +[submodule "hyper"] + path = hyper + url = git@github.com:olix0r/hyper.git diff --git a/Cargo.lock b/Cargo.lock index 5028f764c8..d28aa85573 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -830,8 +830,6 @@ dependencies = [ [[package]] name = "http-body" version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", "http", @@ -859,8 +857,6 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" version = "0.14.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" dependencies = [ "bytes", "futures-channel", diff --git a/Cargo.toml b/Cargo.toml index cf64691988..e6622eefeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,15 @@ members = [ "tools", ] +exclude = [ + "http-body", + "hyper", +] + [profile.release] debug = 1 lto = true + +[patch.crates-io] +http-body = { path = "http-body" } +hyper = { path = "hyper" } diff --git a/http-body b/http-body new file mode 160000 index 0000000000..30db493e9d --- /dev/null +++ b/http-body @@ -0,0 +1 @@ +Subproject commit 30db493e9d25bb576a978ddef7c0c728bdc01071 diff --git a/hyper b/hyper new file mode 160000 index 0000000000..542b48da09 --- /dev/null +++ b/hyper @@ -0,0 +1 @@ +Subproject commit 542b48da0906139744be3c8434927e9178839f46 diff --git a/hyper-balance/src/lib.rs b/hyper-balance/src/lib.rs index 4531212e32..01e5b67826 100644 --- a/hyper-balance/src/lib.rs +++ b/hyper-balance/src/lib.rs @@ -116,6 +116,11 @@ where Poll::Ready(ret) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().body.poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/app/core/src/errors/respond.rs b/linkerd/app/core/src/errors/respond.rs index 0b57fb36a6..a4abc30288 100644 --- a/linkerd/app/core/src/errors/respond.rs +++ b/linkerd/app/core/src/errors/respond.rs @@ -469,6 +469,13 @@ where } } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + ResponseBodyProj::Passthru(inner) => inner.poll_progress(cx), + ResponseBodyProj::GrpcRescue { inner, .. } => inner.poll_progress(cx), + } + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, diff --git a/linkerd/http/box/src/body.rs b/linkerd/http/box/src/body.rs index 16f2ff4181..e149b5158d 100644 --- a/linkerd/http/box/src/body.rs +++ b/linkerd/http/box/src/body.rs @@ -58,6 +58,14 @@ impl Body for BoxBody { self.as_mut().inner.as_mut().poll_data(cx) } + #[inline] + fn poll_progress( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.as_mut().inner.as_mut().poll_progress(cx) + } + #[inline] fn poll_trailers( mut self: Pin<&mut Self>, @@ -116,12 +124,17 @@ where })) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_progress(cx).map_err(Into::into) + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>, Self::Error>> { - Poll::Ready(futures::ready!(self.project().0.poll_trailers(cx)).map_err(Into::into)) + self.project().0.poll_trailers(cx).map_err(Into::into) } #[inline] diff --git a/linkerd/http/metrics/src/requests/service.rs b/linkerd/http/metrics/src/requests/service.rs index ad1af7ecbc..9935c5fb88 100644 --- a/linkerd/http/metrics/src/requests/service.rs +++ b/linkerd/http/metrics/src/requests/service.rs @@ -283,6 +283,12 @@ where Poll::Ready(frame) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -422,6 +428,11 @@ where Poll::Ready(frame) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx).map_err(Into::into) + } + fn poll_trailers( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 662ec68429..96256c1b2e 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -251,6 +251,21 @@ where Poll::Ready(Some(Ok(Data::Initial(chunk)))) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let state = Self::acquire_state(&mut this.state, &this.shared.body); + tracing::trace!("ReplayBody::poll_progress"); + + if let Some(rest) = state.rest.as_mut() { + // If the inner body has previously ended, don't poll it again. + if !rest.is_end_stream() { + return Pin::new(rest).poll_progress(cx).map_err(Into::into); + } + } + + Poll::Ready(Ok(())) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/http/retry/src/with_trailers.rs b/linkerd/http/retry/src/with_trailers.rs index eb92c855f1..39b62d6f54 100644 --- a/linkerd/http/retry/src/with_trailers.rs +++ b/linkerd/http/retry/src/with_trailers.rs @@ -140,6 +140,11 @@ where Pin::new(&mut this.inner).poll_data(cx) } + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.inner).poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/classify/channel.rs b/linkerd/proxy/http/src/classify/channel.rs index 413887345b..3bb6bf87ae 100644 --- a/linkerd/proxy/http/src/classify/channel.rs +++ b/linkerd/proxy/http/src/classify/channel.rs @@ -232,6 +232,11 @@ where } } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/glue.rs b/linkerd/proxy/http/src/glue.rs index 16c4dde464..352e0421fe 100644 --- a/linkerd/proxy/http/src/glue.rs +++ b/linkerd/proxy/http/src/glue.rs @@ -49,6 +49,7 @@ pub struct HyperConnectFuture { } // === impl UpgradeBody === +// Note: There's no poll_progress implementation in hyper::Body. impl HttpBody for UpgradeBody { type Data = Bytes; type Error = hyper::Error; diff --git a/linkerd/proxy/http/src/orig_proto.rs b/linkerd/proxy/http/src/orig_proto.rs index 858b7a8431..cc45444b00 100644 --- a/linkerd/proxy/http/src/orig_proto.rs +++ b/linkerd/proxy/http/src/orig_proto.rs @@ -203,6 +203,7 @@ impl HttpBody for UpgradeResponseBody { self.inner.is_end_stream() } + #[inline] fn poll_data( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -212,6 +213,14 @@ impl HttpBody for UpgradeResponseBody { .map_err(downgrade_h2_error) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.project().inner) + .poll_progress(cx) + .map_err(downgrade_h2_error) + } + + #[inline] fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/linkerd/proxy/http/src/retain.rs b/linkerd/proxy/http/src/retain.rs index e5ed0f9deb..78be057058 100644 --- a/linkerd/proxy/http/src/retain.rs +++ b/linkerd/proxy/http/src/retain.rs @@ -113,6 +113,11 @@ impl http_body::Body for RetainBody { self.project().inner.poll_data(cx) } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + #[inline] fn poll_trailers( self: Pin<&mut Self>, diff --git a/linkerd/proxy/tap/src/service.rs b/linkerd/proxy/tap/src/service.rs index fc70d5eee1..0013ce1cee 100644 --- a/linkerd/proxy/tap/src/service.rs +++ b/linkerd/proxy/tap/src/service.rs @@ -190,6 +190,11 @@ where } } + #[inline] + fn poll_progress(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_progress(cx) + } + fn poll_trailers( mut self: Pin<&mut Self>, cx: &mut Context<'_>,