Skip to content

Commit

Permalink
Merge branch 'master' into hyper/test-h2-connect
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn authored May 16, 2024
2 parents 17353c0 + ac84af6 commit 2c24305
Show file tree
Hide file tree
Showing 9 changed files with 669 additions and 81 deletions.
7 changes: 7 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ impl Timer for TokioTimer {
}
}

impl TokioTimer {
/// Create a new TokioTimer
pub fn new() -> Self {
Self {}
}
}

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pin_project! {
Expand Down
2 changes: 1 addition & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Handle the connection from the client using HTTP1 and pass any
// HTTP requests received on that connection to the `hello` function
if let Err(err) = http1::Builder::new()
.timer(TokioTimer)
.timer(TokioTimer::new())
.serve_connection(io, service_fn(hello))
.await
{
Expand Down
13 changes: 13 additions & 0 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,19 @@ impl Sender {
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

#[cfg(feature = "http1")]
pub(crate) fn try_send_trailers(
&mut self,
trailers: HeaderMap,
) -> Result<(), Option<HeaderMap>> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(None),
};

tx.send(trailers).map_err(|err| Some(err))
}

#[cfg(test)]
pub(crate) fn abort(mut self) {
self.send_error(crate::Error::new_body_write_aborted());
Expand Down
65 changes: 42 additions & 23 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bytes::{Buf, Bytes};
use futures_util::ready;
use http::header::{HeaderValue, CONNECTION, TE};
use http::{HeaderMap, Method, Version};
use http_body::Frame;
use httparse::ParserConfig;

use super::io::Buffered;
Expand Down Expand Up @@ -268,10 +269,20 @@ where
self.try_keep_alive(cx);
}
} else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
self.state.reading = Reading::Continue(Decoder::new(msg.decode));
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
self.state.reading = Reading::Continue(Decoder::new(
msg.decode,
self.state.h1_max_headers,
h1_max_header_size,
));
wants = wants.add(Wants::EXPECT);
} else {
self.state.reading = Reading::Body(Decoder::new(msg.decode));
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
self.state.reading = Reading::Body(Decoder::new(
msg.decode,
self.state.h1_max_headers,
h1_max_header_size,
));
}

self.state.allow_trailer_fields = msg
Expand Down Expand Up @@ -312,33 +323,41 @@ where
pub(crate) fn poll_read_body(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Bytes>>> {
) -> Poll<Option<io::Result<Frame<Bytes>>>> {
debug_assert!(self.can_read_body());

let (reading, ret) = match self.state.reading {
Reading::Body(ref mut decoder) => {
match ready!(decoder.decode(cx, &mut self.io)) {
Ok(slice) => {
let (reading, chunk) = if decoder.is_eof() {
debug!("incoming body completed");
(
Reading::KeepAlive,
if !slice.is_empty() {
Some(Ok(slice))
} else {
None
},
)
} else if slice.is_empty() {
error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading
// an empty slice...
(Reading::Closed, None)
Ok(frame) => {
if frame.is_data() {
let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
let (reading, maybe_frame) = if decoder.is_eof() {
debug!("incoming body completed");
(
Reading::KeepAlive,
if !slice.is_empty() {
Some(Ok(frame))
} else {
None
},
)
} else if slice.is_empty() {
error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading
// an empty slice...
(Reading::Closed, None)
} else {
return Poll::Ready(Some(Ok(frame)));
};
(reading, Poll::Ready(maybe_frame))
} else if frame.is_trailers() {
(Reading::Closed, Poll::Ready(Some(Ok(frame))))
} else {
return Poll::Ready(Some(Ok(slice)));
};
(reading, Poll::Ready(chunk))
trace!("discarding unknown frame");
(Reading::Closed, Poll::Ready(None))
}
}
Err(e) => {
debug!("incoming body decode error: {}", e);
Expand Down
Loading

0 comments on commit 2c24305

Please sign in to comment.