Skip to content

Commit

Permalink
refactor(http/retry): refactor PeekTrailersBody<B> logic
Browse files Browse the repository at this point in the history
this is a squashed commit containing the following:

---

refactor(http/retry): decompose `WithPeekTrailersBody<B>` type alias

this commit breaks this large type out into two halves.

this is a purely cosmetic change.

Signed-off-by: katelyn martin <[email protected]>

refactor(http/retry): `PeekTrailersBody` is pin projected

we must pass our `Pin<T>`'edness down to the inner `B`-typed body for
`PeekTrailersBody` to itself implement `http_body::Body`.

this commit tweaks the existing code to rely on the `pin-project`
library. this generates a `project()` method to pin inner fields whose
`poll_data()` and `poll_trailers()` functions we delegate to.

this is a noöp change.

Signed-off-by: katelyn martin <[email protected]>

refactor(http/retry): defer construction of `PeekTrailersBody<B>`

this commit refactors the polling logic in
`PeekTrailersBody<B>::read_response`.

this commit makes some subtle changes with the migration to hyper 1.0 in
mind, to make this function more easily portable to the new signature of
`http_body::Body`.

see linkerd/linkerd2#8733 for more
information.

this commit defers the `Self` construction of the `PeekTrailersBody`
body. this means that the control flow does not need to reach through to
e.g. `body.inner` to poll the inner body being peeked. additionally, it
provides us with `let` bindings for the first data frame yielded, and
the trailers frame yielded thereafter.

this is largely cosmetic, but will make it easier to deal with the
additional matching we'll need to do when there is a single polling
function that yields us `Frame<D>` objects.

Signed-off-by: katelyn martin <[email protected]>

refactor(http/retry): `PeekTrailersBody` transforms `B`

this is a small structural change to the
`PeekTrailersBody::read_response()` function to facilitate writing some
unit tests.

rather than transforming a `Response<B>` into a
`Response<PeekTrailersBody<B>>`, we hoist the `Response::into_parts()`
and `Response::from_parts()` calls up. `read_response()` is renamed to
`read_body()`.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 22, 2025
1 parent 8169b3e commit 89497b4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 38 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1806,6 +1806,7 @@ dependencies = [
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
"pin-project",
"thiserror 2.0.11",
"tokio",
"tower",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ futures = { version = "0.3", default-features = false }
http-body = { workspace = true }
http = { workspace = true }
parking_lot = "0.12"
pin-project = "1"
tokio = { version = "1", features = ["macros", "rt"] }
tower = { version = "0.4", features = ["retry"] }
tracing = "0.1"
Expand Down
97 changes: 59 additions & 38 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures::{
};
use http_body::Body;
use linkerd_http_box::BoxBody;
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
Expand All @@ -15,10 +16,12 @@ use std::{
///
/// If the first frame of the body stream was *not* a `TRAILERS` frame, this
/// behaves identically to a normal body.
#[pin_project(project = Projection)]
pub struct PeekTrailersBody<B: Body = BoxBody> {
/// The inner [`Body`].
///
/// This is the request or response body whose trailers are being peeked.
#[pin]
inner: B,

/// The first DATA frame received from the inner body, or an error that
Expand All @@ -41,10 +44,13 @@ pub struct PeekTrailersBody<B: Body = BoxBody> {
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
}

pub type WithPeekTrailersBody<B> = Either<
futures::future::Ready<http::Response<PeekTrailersBody<B>>>,
Pin<Box<dyn Future<Output = http::Response<PeekTrailersBody<B>>> + Send + 'static>>,
>;
/// A future that yields a response instrumented with [`PeekTrailersBody<B>`].
pub type WithPeekTrailersBody<B> = Either<ReadyResponse<B>, ReadingResponse<B>>;
/// A future that immediately yields a response.
type ReadyResponse<B> = future::Ready<http::Response<PeekTrailersBody<B>>>;
/// A boxed future that must poll a body before yielding a response.
type ReadingResponse<B> =
Pin<Box<dyn Future<Output = http::Response<PeekTrailersBody<B>>> + Send + 'static>>;

// === impl WithTrailers ===

Expand Down Expand Up @@ -81,46 +87,51 @@ impl<B: Body> PeekTrailersBody<B> {
}

// Otherwise, return a future that tries to read the next frame.
Either::Right(Box::pin(Self::read_response(rsp)))
Either::Right(Box::pin(async move {
let (parts, body) = rsp.into_parts();
let body = Self::read_body(body).await;
http::Response::from_parts(parts, body)
}))
}

async fn read_response(rsp: http::Response<B>) -> http::Response<Self>
async fn read_body(mut body: B) -> Self
where
B: Send + Unpin,
B::Data: Send + Unpin,
B::Error: Send,
{
let (parts, body) = rsp.into_parts();
let mut body = Self {
inner: body,
first_data: None,
trailers: None,
};

// First, poll the body for its first frame.
tracing::debug!("Buffering first data frame");
if let Some(data) = body.inner.data().await {
// The body has data; stop waiting for trailers.
body.first_data = Some(data);

// Peek to see if there's immediately a trailers frame, and grab
// it if so. Otherwise, bail.
// XXX(eliza): the documentation for the `http::Body` trait says
// that `poll_trailers` should only be called after `poll_data`
// returns `None`...but, in practice, I'm fairly sure that this just
// means that it *will not return `Ready`* until there are no data
// frames left, which is fine for us here, because we `now_or_never`
// it.
body.trailers = body.inner.trailers().now_or_never();
let first_data = body.data().await;

// Now, inspect the frame yielded. If the body yielded a data frame, we will only peek
// the trailers if they are immediately available. If the body did not yield a data frame,
// we will poll a future to yield the trailers.
let trailers = if first_data.is_some() {
// The body has data; stop waiting for trailers. Peek to see if there's immediately a
// trailers frame, and grab it if so. Otherwise, bail.
//
// XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers`
// should only be called after `poll_data` returns `None`...but, in practice, I'm
// fairly sure that this just means that it *will not return `Ready`* until there are
// no data frames left, which is fine for us here, because we `now_or_never` it.
body.trailers().now_or_never()
} else {
// Okay, `poll_data` has returned `None`, so there are no data
// frames left. Let's see if there's trailers...
body.trailers = Some(body.inner.trailers().await);
}
if body.trailers.is_some() {
// Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see
// if there's trailers...
let trls = body.trailers().await;
Some(trls)
};

if trailers.is_some() {
tracing::debug!("Buffered trailers frame");
}

http::Response::from_parts(parts, body)
Self {
inner: body,
first_data,
trailers,
}
}

/// Returns a response with an inert [`PeekTrailersBody<B>`].
Expand Down Expand Up @@ -148,24 +159,34 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let this = self.get_mut();
if let Some(first_data) = this.first_data.take() {
let Projection {
inner,
first_data,
trailers: _,
} = self.project();

if let Some(first_data) = first_data.take() {
return Poll::Ready(Some(first_data));
}

Pin::new(&mut this.inner).poll_data(cx)
inner.poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.get_mut();
if let Some(trailers) = this.trailers.take() {
let Projection {
inner,
first_data: _,
trailers,
} = self.project();

if let Some(trailers) = trailers.take() {
return Poll::Ready(trailers);
}

Pin::new(&mut this.inner).poll_trailers(cx)
inner.poll_trailers(cx)
}

#[inline]
Expand Down

0 comments on commit 89497b4

Please sign in to comment.