From 23bf5ab119b2bd72e92a2a8fdb23e2cd6decef13 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 19 Nov 2023 17:49:23 +0100 Subject: [PATCH 1/5] Add `BodyExt::with_trailers` --- http-body-util/Cargo.toml | 2 +- http-body-util/src/combinators/mod.rs | 2 + .../src/combinators/with_trailers.rs | 158 ++++++++++++++++++ http-body-util/src/lib.rs | 44 +++++ 4 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 http-body-util/src/combinators/with_trailers.rs diff --git a/http-body-util/Cargo.toml b/http-body-util/Cargo.toml index c9d36aa..e4110b6 100644 --- a/http-body-util/Cargo.toml +++ b/http-body-util/Cargo.toml @@ -33,4 +33,4 @@ http-body = { version = "1", path = "../http-body" } pin-project-lite = "0.2" [dev-dependencies] -tokio = { version = "1", features = ["macros", "rt"] } +tokio = { version = "1", features = ["macros", "rt", "sync"] } diff --git a/http-body-util/src/combinators/mod.rs b/http-body-util/src/combinators/mod.rs index 0ecdb0b..38d2637 100644 --- a/http-body-util/src/combinators/mod.rs +++ b/http-body-util/src/combinators/mod.rs @@ -5,6 +5,7 @@ mod collect; mod frame; mod map_err; mod map_frame; +mod with_trailers; pub use self::{ box_body::{BoxBody, UnsyncBoxBody}, @@ -12,4 +13,5 @@ pub use self::{ frame::Frame, map_err::MapErr, map_frame::MapFrame, + with_trailers::WithTrailers, }; diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs new file mode 100644 index 0000000..9a9e525 --- /dev/null +++ b/http-body-util/src/combinators/with_trailers.rs @@ -0,0 +1,158 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::ready; +use http::HeaderMap; +use http_body::{Body, Frame}; +use pin_project_lite::pin_project; + +pin_project! { + /// Adds trailers to a body. + /// + /// See [`BodyExt::with_trailers`] for more details. + pub struct WithTrailers { + #[pin] + state: State, + } +} + +impl WithTrailers { + pub(crate) fn new(body: T, trailers: F) -> Self { + Self { + state: State::PollBody { + body, + trailers: Some(trailers), + }, + } + } +} + +pin_project! { + #[project = StateProj] + enum State { + PollBody { + #[pin] + body: T, + trailers: Option, + }, + PollTrailers { + #[pin] + trailers: F, + }, + Trailers { + trailers: Option, + } + } +} + +impl Body for WithTrailers +where + T: Body, + F: Future>>, +{ + type Data = T::Data; + type Error = T::Error; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + loop { + let mut this = self.as_mut().project(); + + let new_state: State<_, _> = match this.state.as_mut().project() { + StateProj::PollBody { body, trailers } => match ready!(body.poll_frame(cx)?) { + Some(frame) => { + return Poll::Ready(Some(Ok(frame))); + } + None => { + let trailers = trailers.take().unwrap(); + State::PollTrailers { trailers } + } + }, + StateProj::PollTrailers { trailers } => { + let trailers = ready!(trailers.poll(cx)?); + State::Trailers { trailers } + } + StateProj::Trailers { trailers } => { + return Poll::Ready(trailers.take().map(Frame::trailers).map(Ok)); + } + }; + + this.state.set(new_state); + } + } + + #[inline] + fn is_end_stream(&self) -> bool { + match &self.state { + State::PollBody { body, .. } => body.is_end_stream(), + State::PollTrailers { .. } | State::Trailers { .. } => true, + } + } + + #[inline] + fn size_hint(&self) -> http_body::SizeHint { + match &self.state { + State::PollBody { body, .. } => body.size_hint(), + State::PollTrailers { .. } | State::Trailers { .. } => Default::default(), + } + } +} + +#[cfg(test)] +mod tests { + use std::convert::Infallible; + + use bytes::Bytes; + use http::{HeaderMap, HeaderName, HeaderValue}; + + use crate::{BodyExt, Full}; + + #[allow(unused_imports)] + use super::*; + + #[tokio::test] + async fn works() { + let mut trailers = HeaderMap::new(); + trailers.insert( + HeaderName::from_static("foo"), + HeaderValue::from_static("bar"), + ); + + let body = + Full::::from("hello").with_trailers(std::future::ready(Some( + Ok::<_, Infallible>(trailers.clone()), + ))); + + futures_util::pin_mut!(body); + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let data = unwrap_ready(body.as_mut().poll_frame(&mut cx)) + .unwrap() + .unwrap() + .into_data() + .unwrap(); + assert_eq!(data, "hello"); + + let body_trailers = unwrap_ready(body.as_mut().poll_frame(&mut cx)) + .unwrap() + .unwrap() + .into_trailers() + .unwrap(); + assert_eq!(body_trailers, trailers); + + assert!(unwrap_ready(body.as_mut().poll_frame(&mut cx)).is_none()); + } + + fn unwrap_ready(poll: Poll) -> T { + match poll { + Poll::Ready(t) => t, + Poll::Pending => panic!("pending"), + } + } +} diff --git a/http-body-util/src/lib.rs b/http-body-util/src/lib.rs index 059ada6..1b715b3 100644 --- a/http-body-util/src/lib.rs +++ b/http-body-util/src/lib.rs @@ -89,6 +89,50 @@ pub trait BodyExt: http_body::Body { collected: Some(crate::Collected::default()), } } + + /// Add trailers to the body. + /// + /// The trailers will be sent when all previous frames have been sent and the `trailers` future + /// resolves. + /// + /// # Example + /// + /// ``` + /// use http::HeaderMap; + /// use http_body_util::{Full, BodyExt}; + /// use bytes::Bytes; + /// + /// # #[tokio::main] + /// async fn main() { + /// let (tx, rx) = tokio::sync::oneshot::channel::(); + /// + /// let body = Full::::from("Hello, World!") + /// // add trailers via a future + /// .with_trailers(async move { + /// match rx.await { + /// Ok(trailers) => Some(Ok(trailers)), + /// Err(_err) => None, + /// } + /// }); + /// + /// // compute the trailers in the background + /// tokio::spawn(async move { + /// let _ = tx.send(compute_trailers().await); + /// }); + /// + /// async fn compute_trailers() -> HeaderMap { + /// // ... + /// # unimplemented!() + /// } + /// # } + /// ``` + fn with_trailers(self, trailers: F) -> combinators::WithTrailers + where + Self: Sized, + F: std::future::Future>>, + { + combinators::WithTrailers::new(self, trailers) + } } impl BodyExt for T where T: http_body::Body {} From b29f730dc676f330e8b898bf92e353039e2b782b Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 19 Nov 2023 19:23:31 +0100 Subject: [PATCH 2/5] fix tests --- http-body-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http-body-util/Cargo.toml b/http-body-util/Cargo.toml index e4110b6..4b733e4 100644 --- a/http-body-util/Cargo.toml +++ b/http-body-util/Cargo.toml @@ -33,4 +33,4 @@ http-body = { version = "1", path = "../http-body" } pin-project-lite = "0.2" [dev-dependencies] -tokio = { version = "1", features = ["macros", "rt", "sync"] } +tokio = { version = "1", features = ["macros", "rt", "sync", "rt-multi-thread"] } From 1fd5c8b5b1c6639a098da56c6233c99e11e2f6e9 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 20 Nov 2023 15:07:33 +0100 Subject: [PATCH 3/5] merge with trailers from inner body --- .../src/combinators/with_trailers.rs | 81 +++++++++++++++++-- 1 file changed, 74 insertions(+), 7 deletions(-) diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index 9a9e525..fbaa1da 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -41,6 +41,7 @@ pin_project! { PollTrailers { #[pin] trailers: F, + prev_trailers: Option, }, Trailers { trailers: Option, @@ -65,17 +66,43 @@ where let new_state: State<_, _> = match this.state.as_mut().project() { StateProj::PollBody { body, trailers } => match ready!(body.poll_frame(cx)?) { - Some(frame) => { - return Poll::Ready(Some(Ok(frame))); - } + Some(frame) => match frame.into_trailers() { + Ok(prev_trailers) => { + let trailers = trailers.take().unwrap(); + State::PollTrailers { + trailers, + prev_trailers: Some(prev_trailers), + } + } + Err(frame) => { + return Poll::Ready(Some(Ok(frame))); + } + }, None => { let trailers = trailers.take().unwrap(); - State::PollTrailers { trailers } + State::PollTrailers { + trailers, + prev_trailers: None, + } } }, - StateProj::PollTrailers { trailers } => { + StateProj::PollTrailers { + trailers, + prev_trailers, + } => { let trailers = ready!(trailers.poll(cx)?); - State::Trailers { trailers } + match (trailers, prev_trailers.take()) { + (None, None) => return Poll::Ready(None), + (None, Some(trailers)) | (Some(trailers), None) => State::Trailers { + trailers: Some(trailers), + }, + (Some(new_trailers), Some(mut prev_trailers)) => { + prev_trailers.extend(new_trailers); + State::Trailers { + trailers: Some(prev_trailers), + } + } + } } StateProj::Trailers { trailers } => { return Poll::Ready(trailers.take().map(Frame::trailers).map(Ok)); @@ -110,7 +137,7 @@ mod tests { use bytes::Bytes; use http::{HeaderMap, HeaderName, HeaderValue}; - use crate::{BodyExt, Full}; + use crate::{BodyExt, Empty, Full}; #[allow(unused_imports)] use super::*; @@ -149,6 +176,46 @@ mod tests { assert!(unwrap_ready(body.as_mut().poll_frame(&mut cx)).is_none()); } + #[tokio::test] + async fn merges_trailers() { + let mut trailers_1 = HeaderMap::new(); + trailers_1.insert( + HeaderName::from_static("foo"), + HeaderValue::from_static("bar"), + ); + + let mut trailers_2 = HeaderMap::new(); + trailers_2.insert( + HeaderName::from_static("baz"), + HeaderValue::from_static("qux"), + ); + + let body = Empty::::new() + .with_trailers(std::future::ready(Some(Ok::<_, Infallible>( + trailers_1.clone(), + )))) + .with_trailers(std::future::ready(Some(Ok::<_, Infallible>( + trailers_2.clone(), + )))); + + futures_util::pin_mut!(body); + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + let body_trailers = unwrap_ready(body.as_mut().poll_frame(&mut cx)) + .unwrap() + .unwrap() + .into_trailers() + .unwrap(); + + let mut all_trailers = HeaderMap::new(); + all_trailers.extend(trailers_1); + all_trailers.extend(trailers_2); + assert_eq!(body_trailers, all_trailers); + + assert!(unwrap_ready(body.as_mut().poll_frame(&mut cx)).is_none()); + } + fn unwrap_ready(poll: Poll) -> T { match poll { Poll::Ready(t) => t, From b7dc13a0cdbc57ccf8c2f92a082d8d45e0ccfbaa Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 20 Nov 2023 15:12:07 +0100 Subject: [PATCH 4/5] clean up setting state --- .../src/combinators/with_trailers.rs | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index fbaa1da..10f699a 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -43,9 +43,7 @@ pin_project! { trailers: F, prev_trailers: Option, }, - Trailers { - trailers: Option, - } + Done, } } @@ -64,15 +62,15 @@ where loop { let mut this = self.as_mut().project(); - let new_state: State<_, _> = match this.state.as_mut().project() { + match this.state.as_mut().project() { StateProj::PollBody { body, trailers } => match ready!(body.poll_frame(cx)?) { Some(frame) => match frame.into_trailers() { Ok(prev_trailers) => { let trailers = trailers.take().unwrap(); - State::PollTrailers { + this.state.set(State::PollTrailers { trailers, prev_trailers: Some(prev_trailers), - } + }); } Err(frame) => { return Poll::Ready(Some(Ok(frame))); @@ -80,10 +78,10 @@ where }, None => { let trailers = trailers.take().unwrap(); - State::PollTrailers { + this.state.set(State::PollTrailers { trailers, prev_trailers: None, - } + }); } }, StateProj::PollTrailers { @@ -93,23 +91,21 @@ where let trailers = ready!(trailers.poll(cx)?); match (trailers, prev_trailers.take()) { (None, None) => return Poll::Ready(None), - (None, Some(trailers)) | (Some(trailers), None) => State::Trailers { - trailers: Some(trailers), - }, + (None, Some(trailers)) | (Some(trailers), None) => { + this.state.set(State::Done); + return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); + } (Some(new_trailers), Some(mut prev_trailers)) => { prev_trailers.extend(new_trailers); - State::Trailers { - trailers: Some(prev_trailers), - } + this.state.set(State::Done); + return Poll::Ready(Some(Ok(Frame::trailers(prev_trailers)))); } } } - StateProj::Trailers { trailers } => { - return Poll::Ready(trailers.take().map(Frame::trailers).map(Ok)); + StateProj::Done => { + return Poll::Ready(None); } - }; - - this.state.set(new_state); + } } } @@ -117,7 +113,7 @@ where fn is_end_stream(&self) -> bool { match &self.state { State::PollBody { body, .. } => body.is_end_stream(), - State::PollTrailers { .. } | State::Trailers { .. } => true, + State::PollTrailers { .. } | State::Done => true, } } @@ -125,7 +121,7 @@ where fn size_hint(&self) -> http_body::SizeHint { match &self.state { State::PollBody { body, .. } => body.size_hint(), - State::PollTrailers { .. } | State::Trailers { .. } => Default::default(), + State::PollTrailers { .. } | State::Done => Default::default(), } } } From 4e6c15f79526ac73c65fdcc559a5ce9a6b59bbd9 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Mon, 20 Nov 2023 15:51:02 +0100 Subject: [PATCH 5/5] fix `is_end_stream` --- http-body-util/src/combinators/with_trailers.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/http-body-util/src/combinators/with_trailers.rs b/http-body-util/src/combinators/with_trailers.rs index 10f699a..383e1ec 100644 --- a/http-body-util/src/combinators/with_trailers.rs +++ b/http-body-util/src/combinators/with_trailers.rs @@ -109,14 +109,6 @@ where } } - #[inline] - fn is_end_stream(&self) -> bool { - match &self.state { - State::PollBody { body, .. } => body.is_end_stream(), - State::PollTrailers { .. } | State::Done => true, - } - } - #[inline] fn size_hint(&self) -> http_body::SizeHint { match &self.state {