Skip to content

Commit

Permalink
refactor(http/retry): PeekTrailersBody<B> uses BodyExt::frame()
Browse files Browse the repository at this point in the history
this commit reworks `PeekTrailersBody<B>`.

the most important goal here is replacing the control flow of
`read_body()`, which polls a body using `BodyExt` future combinators
`data()` and `frame()` for up to two frames, with varying levels of
persistence depending on outcomes.

to quote #3556:

> the intent of this type is to only yield the asynchronous task
> responsible for reading the body once. depending on what the inner
> body yields, and when, this can result in combinations of: no data
> frames and no trailers, no data frames with trailers, one data frame
> and no trailers, one data frame with trailers, or two data frames.
> moreover, depending on which of these are yielded, the body will call
> .await some scenarios, and only poll functions once in others.
>
> migrating this to the Frame<T> and poll_frame() style of the 1.0 Body
> interface, away from the 0.4 interface that provides distinct
> poll_data() and poll_trailers() methods, is fundamentally tricky.

this means that `PeekTrailersBody<B>` is notably difficult to port
across the http-body 0.4/1.0 upgrade boundary.

this body middleware must navigate a number of edge conditions, and once
it _has_ obtained a `Frame<T>`, make use of conversion methods to
ascertain whether it is a data or trailers frame, due to the fact that
its internal enum representation is not exposed publicly. one it has
done all of that, it must do the same thing once more to examine the
second frame.

this commit uses the compatibility facilities and backported `Frame<T>`
introduced in the previous commit, and rewrites this control flow using
a form of the `BodyExt::frame()` combinator.

this means that this middleware is forward-compatible with http-body
1.0, which will dramatically simplify the remaining migration work to be
done in #3504.

see linkerd/linkerd2#8733 for more information
and other links related to this ongoing migration work.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Jan 23, 2025
1 parent 8551d5a commit a29df3f
Showing 1 changed file with 193 additions and 104 deletions.
297 changes: 193 additions & 104 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::{
future::{self, Either},
FutureExt,
};
use http::HeaderMap;
use http_body::Body;
use linkerd_http_box::BoxBody;
use pin_project::pin_project;
Expand All @@ -17,31 +18,34 @@ use std::{
/// If the first frame of the body stream was *not* a `TRAILERS` frame, this
/// behaves identically to a normal body.
#[pin_project(project = Projection)]
pub struct PeekTrailersBody<B: Body = BoxBody> {
/// The inner [`Body`].
pub enum PeekTrailersBody<B: Body = BoxBody> {
/// An empty body.
Empty,
/// A body that contains zero or one DATA frame.
///
/// This is the request or response body whose trailers are being peeked.
#[pin]
inner: B,

/// The first DATA frame received from the inner body, or an error that
/// occurred while polling for data.
/// This variant MAY have trailers that can be peeked.
Unary {
data: Option<Result<B::Data, B::Error>>,
trailers: Option<Result<HeaderMap, B::Error>>,
},
/// A body that (potentially) contains more than one DATA frame.
///
/// If this is `None`, then the body has completed without any DATA frames.
first_data: Option<Result<B::Data, B::Error>>,

/// The inner body's trailers, if it was terminated by a `TRAILERS` frame
/// after 0-1 DATA frames, or an error if polling for trailers failed.
/// This variant indicates that the inner body's trailers could not be observed.
Unknown {
first: Option<Result<B::Data, B::Error>>,
second: Option<Result<B::Data, B::Error>>,
/// The inner [`Body`].
#[pin]
inner: B,
},
/// A transparent, inert body.
///
/// Yes, this is a bit of a complex type, so let's break it down:
/// - the outer `Option` indicates whether any trailers were received by
/// `WithTrailers`; if it's `None`, then we don't *know* if the response
/// had trailers, as it is not yet complete.
/// - the inner `Result` and `Option` are the `Result` and `Option` returned
/// by `HttpBody::trailers` on the inner body. If this is `Ok(None)`, then
/// the body has terminated without trailers --- it is *known* to not have
/// trailers.
trailers: Option<Result<Option<http::HeaderMap>, B::Error>>,
/// This variant will not attempt to peek the inner body's trailers.
Passthru {
/// The inner [`Body`].
#[pin]
inner: B,
},
}

/// A future that yields a response instrumented with [`PeekTrailersBody<B>`].
Expand All @@ -60,9 +64,19 @@ impl<B: Body> PeekTrailersBody<B> {
/// This function will return `None` if the body's trailers could not be peeked, or if there
/// were no trailers included.
pub fn peek_trailers(&self) -> Option<&http::HeaderMap> {
self.trailers
.as_ref()
.and_then(|trls| trls.as_ref().ok()?.as_ref())
match self {
Self::Unary {
trailers: Some(Ok(trailers)),
..
} => Some(trailers),
Self::Unary {
trailers: None | Some(Err(_)),
..
}
| Self::Empty
| Self::Unknown { .. }
| Self::Passthru { .. } => None,
}
}

pub fn map_response(rsp: http::Response<B>) -> WithPeekTrailersBody<B>
Expand All @@ -76,14 +90,14 @@ impl<B: Body> PeekTrailersBody<B> {
// If the response isn't an HTTP version that has trailers, skip trying
// to read a trailers frame.
if let Version::HTTP_09 | Version::HTTP_10 | Version::HTTP_11 = rsp.version() {
return Either::Left(future::ready(Self::no_trailers(rsp)));
return Either::Left(future::ready(rsp.map(|inner| Self::Passthru { inner })));
}

// If the response doesn't have a body stream, also skip trying to read
// a trailers frame.
if rsp.is_end_stream() {
tracing::debug!("Skipping trailers for empty body");
return Either::Left(future::ready(Self::no_trailers(rsp)));
return Either::Left(future::ready(rsp.map(|_| Self::Empty)));
}

// Otherwise, return a future that tries to read the next frame.
Expand All @@ -94,55 +108,103 @@ impl<B: Body> PeekTrailersBody<B> {
}))
}

async fn read_body(mut body: B) -> Self
async fn read_body(body: B) -> Self
where
B: Send + Unpin,
B::Data: Send + Unpin,
B::Error: Send,
{
// XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame<T>`s.
// this can be removed when we upgrade to http-body 1.0.
use crate::compat::ForwardCompatibleBody;
let mut body = ForwardCompatibleBody::new(body);

// First, poll the body for its first frame.
tracing::debug!("Buffering first data frame");
let first_data = body.data().await;

// Now, inspect the frame yielded. If the body yielded a data frame, we will only peek
// the trailers if they are immediately available. If the body did not yield a data frame,
// we will poll a future to yield the trailers.
let trailers = if first_data.is_some() {
// The body has data; stop waiting for trailers. Peek to see if there's immediately a
// trailers frame, and grab it if so. Otherwise, bail.
//
// XXX(eliza): the documentation for the `http::Body` trait says that `poll_trailers`
// should only be called after `poll_data` returns `None`...but, in practice, I'm
// fairly sure that this just means that it *will not return `Ready`* until there are
// no data frames left, which is fine for us here, because we `now_or_never` it.
body.trailers().now_or_never()
} else {
// Okay, `poll_data` has returned `None`, so there are no data frames left. Let's see
// if there's trailers...
let trls = body.trailers().await;
Some(trls)
let first_frame = body
.frame()
.map(|f| f.map(|r| r.map(Self::split_frame)))
.await;
let body: Self = match first_frame {
// The body has no frames. It is empty.
None => Self::Empty,
// The body yielded an error. We are done.
Some(Err(error)) => Self::Unary {
data: Some(Err(error)),
trailers: None,
},
// The body yielded a TRAILERS frame. We are done.
Some(Ok(Err(trailers))) => Self::Unary {
data: None,
trailers: Some(Ok(trailers)),
},
// The body yielded a DATA frame. Check for a second frame, without yielding again.
Some(Ok(Ok(first))) => {
if let Some(second) = body
.frame()
.map(|f| f.map(|r| r.map(Self::split_frame)))
.now_or_never()
{
// The second frame is available. Let's inspect it and determine what to do.
match second {
// The body is finished. There is not a TRAILERS frame.
None => Self::Unary {
data: Some(Ok(first)),
trailers: None,
},
// We immediately yielded a result, but it was an error. Alas!
Some(Err(error)) => Self::Unary {
data: Some(Ok(first)),
trailers: Some(Err(error)),
},
// We immediately yielded another frame, but it was a second DATA frame.
// We hold on to each frame, but we cannot wait for the TRAILERS.
Some(Ok(Ok(second))) => Self::Unknown {
first: Some(Ok(first)),
second: Some(Ok(second)),
inner: body.into_inner(),
},
// The body immediately yielded a second TRAILERS frame. Nice!
Some(Ok(Err(trailers))) => Self::Unary {
data: Some(Ok(first)),
trailers: Some(Ok(trailers)),
},
}
} else {
// If we are here, the second frame is not yet available. We cannot be sure
// that a second DATA frame is on the way, and we are no longer willing to
// await additional frames. There are no trailers to peek.
Self::Unknown {
first: None,
second: None,
inner: body.into_inner(),
}
}
}
};

if trailers.is_some() {
if body.peek_trailers().is_some() {
tracing::debug!("Buffered trailers frame");
}

Self {
inner: body,
first_data,
trailers,
}
body
}

/// Returns a response with an inert [`PeekTrailersBody<B>`].
/// Splits a `Frame<T>` into a `Result<T, E>`.
///
/// This will not peek the inner body's trailers.
fn no_trailers(rsp: http::Response<B>) -> http::Response<Self> {
rsp.map(|inner| Self {
inner,
first_data: None,
trailers: None,
})
/// Frames do not expose their inner enums, and instead expose `into_data()` and
/// `into_trailers()` methods. This function will return `Ok(data)` if it is given a DATA
/// frame, and `Err(trailers)` if it is given a TRAILERS frame.
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(frame: crate::compat::Frame<B::Data>) -> Result<B::Data, HeaderMap> {
frame
.into_data()
.map_err(|frame| match frame.into_trailers() {
Ok(trls) => trls,
// Safety: this is not reachable, we called `into_data()` above.
Err(_) => unreachable!("into_data() and `into_trailers()` both returned `Err(_)`"),
})
}
}

Expand All @@ -159,68 +221,95 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let Projection {
inner,
first_data,
trailers: _,
} = self.project();

if let Some(first_data) = first_data.take() {
return Poll::Ready(Some(first_data));
let this = self.project();
match this {
Projection::Empty => Poll::Ready(None),
Projection::Passthru { inner } => inner.poll_data(cx),
Projection::Unary { data, .. } => Poll::Ready(data.take()),
Projection::Unknown {
first,
second,
inner,
} => {
if let data @ Some(_) = first.take().or_else(|| second.take()) {
Poll::Ready(data)
} else {
inner.poll_data(cx)
}
}
}

inner.poll_data(cx)
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Projection {
inner,
first_data: _,
trailers,
} = self.project();

if let Some(trailers) = trailers.take() {
return Poll::Ready(trailers);
let this = self.project();
match this {
Projection::Empty => Poll::Ready(Ok(None)),
Projection::Passthru { inner } => inner.poll_trailers(cx),
Projection::Unary { trailers, .. } => Poll::Ready(trailers.take().transpose()),
Projection::Unknown { inner, .. } => inner.poll_trailers(cx),
}

inner.poll_trailers(cx)
}

#[inline]
fn is_end_stream(&self) -> bool {
let Self {
inner,
first_data,
trailers,
} = self;

let trailers_finished = match trailers {
Some(Ok(Some(_)) | Err(_)) => false,
None | Some(Ok(None)) => true,
};

first_data.is_none() && trailers_finished && inner.is_end_stream()
match self {
Self::Empty => true,
Self::Passthru { inner } => inner.is_end_stream(),
Self::Unary {
data: None,
trailers: None,
} => true,
Self::Unary { .. } => false,
Self::Unknown {
inner,
first: None,
second: None,
} => inner.is_end_stream(),
Self::Unknown { .. } => false,
}
}

#[inline]
fn size_hint(&self) -> http_body::SizeHint {
use bytes::Buf;

let mut hint = self.inner.size_hint();
// If we're holding onto a chunk of data, add its length to the inner
// `Body`'s size hint.
if let Some(Ok(chunk)) = self.first_data.as_ref() {
let buffered = chunk.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + buffered);
match self {
Self::Empty => http_body::SizeHint::new(),
Self::Passthru { inner } => inner.size_hint(),
Self::Unary {
data: Some(Ok(data)),
..
} => {
let size = data.remaining() as u64;
http_body::SizeHint::with_exact(size)
}
Self::Unary {
data: None | Some(Err(_)),
..
} => http_body::SizeHint::new(),
Self::Unknown {
first,
second,
inner,
} => {
// Add any frames we've buffered to the inner `Body`'s size hint.
let mut hint = inner.size_hint();
let mut add_to_hint = |frame: &Option<Result<B::Data, B::Error>>| {
if let Some(Ok(buf)) = frame {
let size = buf.remaining() as u64;
if let Some(upper) = hint.upper() {
hint.set_upper(upper + size);
}
hint.set_lower(hint.lower() + size);
}
};
add_to_hint(first);
add_to_hint(second);
hint
}
hint.set_lower(hint.lower() + buffered);
}

hint
}
}

Expand Down

0 comments on commit a29df3f

Please sign in to comment.