From 89497b4c24b133d37ce68ec919b79fd73c4e2f45 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Mon, 13 Jan 2025 00:00:00 +0000 Subject: [PATCH] refactor(http/retry): refactor `PeekTrailersBody` logic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit this is a squashed commit containing the following: --- refactor(http/retry): decompose `WithPeekTrailersBody` type alias this commit breaks this large type out into two halves. this is a purely cosmetic change. Signed-off-by: katelyn martin refactor(http/retry): `PeekTrailersBody` is pin projected we must pass our `Pin`'edness down to the inner `B`-typed body for `PeekTrailersBody` to itself implement `http_body::Body`. this commit tweaks the existing code to rely on the `pin-project` library. this generates a `project()` method to pin inner fields whose `poll_data()` and `poll_trailers()` functions we delegate to. this is a noöp change. Signed-off-by: katelyn martin refactor(http/retry): defer construction of `PeekTrailersBody` this commit refactors the polling logic in `PeekTrailersBody::read_response`. this commit makes some subtle changes with the migration to hyper 1.0 in mind, to make this function more easily portable to the new signature of `http_body::Body`. see https://github.com/linkerd/linkerd2/issues/8733 for more information. this commit defers the `Self` construction of the `PeekTrailersBody` body. this means that the control flow does not need to reach through to e.g. `body.inner` to poll the inner body being peeked. additionally, it provides us with `let` bindings for the first data frame yielded, and the trailers frame yielded thereafter. this is largely cosmetic, but will make it easier to deal with the additional matching we'll need to do when there is a single polling function that yields us `Frame` objects. Signed-off-by: katelyn martin refactor(http/retry): `PeekTrailersBody` transforms `B` this is a small structural change to the `PeekTrailersBody::read_response()` function to facilitate writing some unit tests. rather than transforming a `Response` into a `Response>`, we hoist the `Response::into_parts()` and `Response::from_parts()` calls up. `read_response()` is renamed to `read_body()`. Signed-off-by: katelyn martin --- Cargo.lock | 1 + linkerd/http/retry/Cargo.toml | 1 + linkerd/http/retry/src/peek_trailers.rs | 97 +++++++++++++++---------- 3 files changed, 61 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a855ad89a..b5dbbdb225 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1806,6 +1806,7 @@ dependencies = [ "linkerd-stack", "linkerd-tracing", "parking_lot", + "pin-project", "thiserror 2.0.11", "tokio", "tower", diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index a59da0d9ab..09e15bd356 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false } http-body = { workspace = true } http = { workspace = true } parking_lot = "0.12" +pin-project = "1" tokio = { version = "1", features = ["macros", "rt"] } tower = { version = "0.4", features = ["retry"] } tracing = "0.1" diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index a2a362d537..c221fa4e83 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -4,6 +4,7 @@ use futures::{ }; use http_body::Body; use linkerd_http_box::BoxBody; +use pin_project::pin_project; use std::{ future::Future, pin::Pin, @@ -15,10 +16,12 @@ use std::{ /// /// If the first frame of the body stream was *not* a `TRAILERS` frame, this /// behaves identically to a normal body. +#[pin_project(project = Projection)] pub struct PeekTrailersBody { /// The inner [`Body`]. /// /// This is the request or response body whose trailers are being peeked. + #[pin] inner: B, /// The first DATA frame received from the inner body, or an error that @@ -41,10 +44,13 @@ pub struct PeekTrailersBody { trailers: Option, B::Error>>, } -pub type WithPeekTrailersBody = Either< - futures::future::Ready>>, - Pin>> + Send + 'static>>, ->; +/// A future that yields a response instrumented with [`PeekTrailersBody`]. +pub type WithPeekTrailersBody = Either, ReadingResponse>; +/// A future that immediately yields a response. +type ReadyResponse = future::Ready>>; +/// A boxed future that must poll a body before yielding a response. +type ReadingResponse = + Pin>> + Send + 'static>>; // === impl WithTrailers === @@ -81,46 +87,51 @@ impl PeekTrailersBody { } // Otherwise, return a future that tries to read the next frame. - Either::Right(Box::pin(Self::read_response(rsp))) + Either::Right(Box::pin(async move { + let (parts, body) = rsp.into_parts(); + let body = Self::read_body(body).await; + http::Response::from_parts(parts, body) + })) } - async fn read_response(rsp: http::Response) -> http::Response + async fn read_body(mut body: B) -> Self where B: Send + Unpin, B::Data: Send + Unpin, B::Error: Send, { - let (parts, body) = rsp.into_parts(); - let mut body = Self { - inner: body, - first_data: None, - trailers: None, - }; - + // First, poll the body for its first frame. tracing::debug!("Buffering first data frame"); - if let Some(data) = body.inner.data().await { - // The body has data; stop waiting for trailers. - body.first_data = Some(data); - - // Peek to see if there's immediately a trailers frame, and grab - // it if so. Otherwise, bail. - // XXX(eliza): the documentation for the `http::Body` trait says - // that `poll_trailers` should only be called after `poll_data` - // returns `None`...but, in practice, I'm fairly sure that this just - // means that it *will not return `Ready`* until there are no data - // frames left, which is fine for us here, because we `now_or_never` - // it. - body.trailers = body.inner.trailers().now_or_never(); + let first_data = body.data().await; + + // Now, inspect the frame yielded. If the body yielded a data frame, we will only peek + // the trailers if they are immediately available. If the body did not yield a data frame, + // we will poll a future to yield the trailers. + let trailers = if first_data.is_some() { + // The body has data; stop waiting for trailers. Peek to see if there's immediately a + // trailers frame, and grab it if so. Otherwise, bail. + // + // XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers` + // should only be called after `poll_data` returns `None`...but, in practice, I'm + // fairly sure that this just means that it *will not return `Ready`* until there are + // no data frames left, which is fine for us here, because we `now_or_never` it. + body.trailers().now_or_never() } else { - // Okay, `poll_data` has returned `None`, so there are no data - // frames left. Let's see if there's trailers... - body.trailers = Some(body.inner.trailers().await); - } - if body.trailers.is_some() { + // Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see + // if there's trailers... + let trls = body.trailers().await; + Some(trls) + }; + + if trailers.is_some() { tracing::debug!("Buffered trailers frame"); } - http::Response::from_parts(parts, body) + Self { + inner: body, + first_data, + trailers, + } } /// Returns a response with an inert [`PeekTrailersBody`]. @@ -148,24 +159,34 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let this = self.get_mut(); - if let Some(first_data) = this.first_data.take() { + let Projection { + inner, + first_data, + trailers: _, + } = self.project(); + + if let Some(first_data) = first_data.take() { return Poll::Ready(Some(first_data)); } - Pin::new(&mut this.inner).poll_data(cx) + inner.poll_data(cx) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let this = self.get_mut(); - if let Some(trailers) = this.trailers.take() { + let Projection { + inner, + first_data: _, + trailers, + } = self.project(); + + if let Some(trailers) = trailers.take() { return Poll::Ready(trailers); } - Pin::new(&mut this.inner).poll_trailers(cx) + inner.poll_trailers(cx) } #[inline]