Skip to content

Commit

Permalink
Respond 400 + Reset to malformed request (hyperium#747)
Browse files Browse the repository at this point in the history
`h2` returns `RST_STREAM` frames with the `PROTOCOL_ERROR` bit set as a
response to many types of errors in client requests. Many of those cases,
when handled by an HTTP/1 server such as the one used in `hyper`, would
result in an HTTP 400 Bad Request response returned to the client rather
than a TCP reset (the HTTP/1 equivalent of a `RST_STREAM`). As a
consequence, a client will observe differences in behaviour between
HTTP/1 and HTTP/2: a malformed request will result in a 400 response when
HTTP/1 is used whereas the same request will result in a reset stream
with `h2`.

Make `h2` reply a `HEADERS`+`400`+`END_STREAM` frame followed by a
`RST_STREAM`+`PROTOCOL_ERROR` frame to all the `malformed!()` macro
invocations in `Peer::convert_poll_message()` in `src/server.rs`.

The `Reset` variant in the `h2::proto::error::Error` Enum now contains an
`Option<http::StatusCode>` value that is set by the `malformed!()` macro
with a `Some(400)`. That value is propagated all the way until
`h2::proto::streams::send::Send::send_reset()` where, if not `None`, a
`h2::frame::headers::Headers` frame with the status code and the
`END_STREAM` flag set will now be sent to the client before the
`RST_STREAM` frame.

Some of the parameters passed to `send_reset()` have been grouped into a
new `SendResetContext` Struct in order to avoid a
`clippy::too_many_arguments` lint.

Tests where malformed requests were sent have been updated to check that
an additional  `HEADERS`+`400`+`END_STREAM` frame is now received before
the `RST_STREAM + PROTOCOL_ERROR` frame.

This change has been validated with  other clients like `curl`, a custom
ad-hoc client written with `python3+httpx` and `varnishtest`.
  • Loading branch information
franfastly committed Feb 22, 2024
1 parent f87eca0 commit f98522a
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 32 deletions.
19 changes: 11 additions & 8 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::frame::StreamId;
use crate::proto::{self, Initiator};

use bytes::Bytes;
use http::StatusCode;
use std::{error, fmt, io};

pub use crate::frame::Reason;
Expand All @@ -26,7 +27,7 @@ pub struct Error {
enum Kind {
/// A RST_STREAM frame was received or sent.
#[allow(dead_code)]
Reset(StreamId, Reason, Initiator),
Reset(StreamId, Reason, Initiator, Option<StatusCode>),

/// A GO_AWAY frame was received or sent.
GoAway(Bytes, Reason, Initiator),
Expand All @@ -51,7 +52,7 @@ impl Error {
/// action taken by the peer (i.e. a protocol error).
pub fn reason(&self) -> Option<Reason> {
match self.kind {
Kind::Reset(_, reason, _) | Kind::GoAway(_, reason, _) | Kind::Reason(reason) => {
Kind::Reset(_, reason, _, _) | Kind::GoAway(_, reason, _) | Kind::Reason(reason) => {
Some(reason)
}
_ => None,
Expand Down Expand Up @@ -101,7 +102,7 @@ impl Error {
pub fn is_remote(&self) -> bool {
matches!(
self.kind,
Kind::GoAway(_, _, Initiator::Remote) | Kind::Reset(_, _, Initiator::Remote)
Kind::GoAway(_, _, Initiator::Remote) | Kind::Reset(_, _, Initiator::Remote, _)
)
}

Expand All @@ -111,7 +112,7 @@ impl Error {
pub fn is_library(&self) -> bool {
matches!(
self.kind,
Kind::GoAway(_, _, Initiator::Library) | Kind::Reset(_, _, Initiator::Library)
Kind::GoAway(_, _, Initiator::Library) | Kind::Reset(_, _, Initiator::Library, _)
)
}
}
Expand All @@ -122,7 +123,9 @@ impl From<proto::Error> for Error {

Error {
kind: match src {
Reset(stream_id, reason, initiator) => Kind::Reset(stream_id, reason, initiator),
Reset(stream_id, reason, initiator, status_code) => {
Kind::Reset(stream_id, reason, initiator, status_code)
}
GoAway(debug_data, reason, initiator) => {
Kind::GoAway(debug_data, reason, initiator)
}
Expand Down Expand Up @@ -162,13 +165,13 @@ impl From<UserError> for Error {
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
let debug_data = match self.kind {
Kind::Reset(_, reason, Initiator::User) => {
Kind::Reset(_, reason, Initiator::User, _) => {
return write!(fmt, "stream error sent by user: {}", reason)
}
Kind::Reset(_, reason, Initiator::Library) => {
Kind::Reset(_, reason, Initiator::Library, _) => {
return write!(fmt, "stream error detected: {}", reason)
}
Kind::Reset(_, reason, Initiator::Remote) => {
Kind::Reset(_, reason, Initiator::Remote, _) => {
return write!(fmt, "stream error received: {}", reason)
}
Kind::GoAway(ref debug_data, reason, Initiator::User) => {
Expand Down
2 changes: 1 addition & 1 deletion src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ where
// Attempting to read a frame resulted in a stream level error.
// This is handled by resetting the frame then trying to read
// another frame.
Err(Error::Reset(id, reason, initiator)) => {
Err(Error::Reset(id, reason, initiator, _)) => {
debug_assert_eq!(initiator, Initiator::Library);
tracing::trace!(?id, ?reason, "stream error");
self.streams.send_reset(id, reason);
Expand Down
19 changes: 14 additions & 5 deletions src/proto/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::codec::SendError;
use crate::frame::{Reason, StreamId};

use bytes::Bytes;
use http::StatusCode;
use std::fmt;
use std::io;

/// Either an H2 reason or an I/O error
#[derive(Clone, Debug)]
pub enum Error {
Reset(StreamId, Reason, Initiator),
Reset(StreamId, Reason, Initiator, Option<StatusCode>),
GoAway(Bytes, Reason, Initiator),
Io(io::ErrorKind, Option<String>),
}
Expand All @@ -23,7 +24,7 @@ pub enum Initiator {
impl Error {
pub(crate) fn is_local(&self) -> bool {
match *self {
Self::Reset(_, _, initiator) | Self::GoAway(_, _, initiator) => initiator.is_local(),
Self::Reset(_, _, initiator, _) | Self::GoAway(_, _, initiator) => initiator.is_local(),
Self::Io(..) => true,
}
}
Expand All @@ -33,7 +34,15 @@ impl Error {
}

pub(crate) fn library_reset(stream_id: StreamId, reason: Reason) -> Self {
Self::Reset(stream_id, reason, Initiator::Library)
Self::Reset(stream_id, reason, Initiator::Library, None)
}

pub(crate) fn library_reset_with_status_code(
stream_id: StreamId,
reason: Reason,
status_code: StatusCode,
) -> Self {
Self::Reset(stream_id, reason, Initiator::Library, Some(status_code))
}

pub(crate) fn library_go_away(reason: Reason) -> Self {
Expand All @@ -45,7 +54,7 @@ impl Error {
}

pub(crate) fn remote_reset(stream_id: StreamId, reason: Reason) -> Self {
Self::Reset(stream_id, reason, Initiator::Remote)
Self::Reset(stream_id, reason, Initiator::Remote, None)
}

pub(crate) fn remote_go_away(debug_data: Bytes, reason: Reason) -> Self {
Expand All @@ -65,7 +74,7 @@ impl Initiator {
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
Self::Reset(_, reason, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt),
Self::Reset(_, reason, _, _) | Self::GoAway(_, reason, _) => reason.fmt(fmt),
Self::Io(_, Some(ref inner)) => inner.fmt(fmt),
Self::Io(kind, None) => io::Error::from(kind).fmt(fmt),
}
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub use self::error::{Error, Initiator};
pub(crate) use self::peer::{Dyn as DynPeer, Peer};
pub(crate) use self::ping_pong::UserPings;
pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams};
pub(crate) use self::streams::{Open, PollReset, Prioritized};
pub(crate) use self::streams::{Open, PollReset, Prioritized, SendResetContext};

use crate::codec::Codec;

Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod streams;

pub(crate) use self::prioritize::Prioritized;
pub(crate) use self::recv::Open;
pub(crate) use self::send::PollReset;
pub(crate) use self::send::{PollReset, SendResetContext};
pub(crate) use self::streams::{DynStreams, OpaqueStreamRef, StreamRef, Streams};

use self::buffer::Buffer;
Expand Down
57 changes: 51 additions & 6 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,33 @@ pub(crate) enum PollReset {
Streaming,
}

/// Context for `send_reset`.
pub(crate) struct SendResetContext {
reason: Reason,
initiator: Initiator,
status_code: Option<http::StatusCode>,
}

impl SendResetContext {
/// Create a new `SendResetContext` with and optional `http::StatusCode`.
pub(crate) fn with_status_code(
reason: Reason,
initiator: Initiator,
status_code: Option<http::StatusCode>,
) -> Self {
Self {
reason,
initiator,
status_code,
}
}

/// Create a new `SendResetContext`
pub(crate) fn new(reason: Reason, initiator: Initiator) -> Self {
Self::with_status_code(reason, initiator, None)
}
}

impl Send {
/// Create a new `Send`
pub fn new(config: &Config) -> Self {
Expand Down Expand Up @@ -170,8 +197,7 @@ impl Send {
/// Send an explicit RST_STREAM frame
pub fn send_reset<B>(
&mut self,
reason: Reason,
initiator: Initiator,
context: SendResetContext,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
Expand All @@ -182,18 +208,25 @@ impl Send {
let is_empty = stream.pending_send.is_empty();
let stream_id = stream.id;

let SendResetContext {
reason,
initiator,
status_code,
} = context;

tracing::trace!(
"send_reset(..., reason={:?}, initiator={:?}, stream={:?}, ..., \
is_reset={:?}; is_closed={:?}; pending_send.is_empty={:?}; \
state={:?} \
state={:?}; status_code={:?} \
",
reason,
initiator,
stream_id,
is_reset,
is_closed,
is_empty,
stream.state
stream.state,
status_code
);

if is_reset {
Expand Down Expand Up @@ -225,6 +258,19 @@ impl Send {
// `reclaim_all_capacity`.
self.prioritize.clear_queue(buffer, stream);

// For malformed requests, a server may send an HTTP response prior to resetting the stream.
if let Some(status_code) = status_code {
tracing::trace!("send_reset -- sending response with status code: {status_code}");
let pseudo = frame::Pseudo::response(status_code);
let fields = http::HeaderMap::default();
let mut frame = frame::Headers::new(stream.id, pseudo, fields);
frame.set_end_stream();

tracing::trace!("send_reset -- queueing response; frame={:?}", frame);
self.prioritize
.queue_frame(frame.into(), buffer, stream, task);
}

let frame = frame::Reset::new(stream.id, reason);

tracing::trace!("send_reset -- queueing; frame={:?}", frame);
Expand Down Expand Up @@ -378,8 +424,7 @@ impl Send {
tracing::debug!("recv_stream_window_update !!; err={:?}", e);

self.send_reset(
Reason::FLOW_CONTROL_ERROR,
Initiator::Library,
SendResetContext::new(Reason::FLOW_CONTROL_ERROR, Initiator::Library),
buffer,
stream,
counts,
Expand Down
8 changes: 5 additions & 3 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ impl State {

/// Set the stream state to reset locally.
pub fn set_reset(&mut self, stream_id: StreamId, reason: Reason, initiator: Initiator) {
self.inner = Closed(Cause::Error(Error::Reset(stream_id, reason, initiator)));
self.inner = Closed(Cause::Error(Error::Reset(
stream_id, reason, initiator, None,
)));
}

/// Set the stream state to a scheduled reset.
Expand Down Expand Up @@ -364,7 +366,7 @@ impl State {
pub fn is_remote_reset(&self) -> bool {
matches!(
self.inner,
Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote)))
Closed(Cause::Error(Error::Reset(_, _, Initiator::Remote, _)))
)
}

Expand Down Expand Up @@ -446,7 +448,7 @@ impl State {
/// Returns a reason if the stream has been reset.
pub(super) fn ensure_reason(&self, mode: PollReset) -> Result<Option<Reason>, crate::Error> {
match self.inner {
Closed(Cause::Error(Error::Reset(_, reason, _)))
Closed(Cause::Error(Error::Reset(_, reason, _, _)))
| Closed(Cause::Error(Error::GoAway(_, reason, _)))
| Closed(Cause::ScheduledLibraryReset(reason)) => Ok(Some(reason)),
Closed(Cause::Error(ref e)) => Err(e.clone().into()),
Expand Down
14 changes: 9 additions & 5 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1537,8 +1537,7 @@ impl Actions {
) {
counts.transition(stream, |counts, stream| {
self.send.send_reset(
reason,
initiator,
proto::SendResetContext::new(reason, initiator),
send_buffer,
stream,
counts,
Expand All @@ -1557,15 +1556,20 @@ impl Actions {
counts: &mut Counts,
res: Result<(), Error>,
) -> Result<(), Error> {
if let Err(Error::Reset(stream_id, reason, initiator)) = res {
if let Err(Error::Reset(stream_id, reason, initiator, status_code)) = res {
debug_assert_eq!(stream_id, stream.id);

if counts.can_inc_num_local_error_resets() {
counts.inc_num_local_error_resets();

// Reset the stream.
self.send
.send_reset(reason, initiator, buffer, stream, counts, &mut self.task);
self.send.send_reset(
proto::SendResetContext::with_status_code(reason, initiator, status_code),
buffer,
stream,
counts,
&mut self.task,
);
Ok(())
} else {
tracing::warn!(
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ use crate::proto::{self, Config, Error, Prioritized};
use crate::{FlowControl, PingPong, RecvStream, SendStream};

use bytes::{Buf, Bytes};
use http::{HeaderMap, Method, Request, Response};
use http::{HeaderMap, Method, Request, Response, StatusCode};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -1517,7 +1517,7 @@ impl proto::Peer for Peer {
macro_rules! malformed {
($($arg:tt)*) => {{
tracing::debug!($($arg)*);
return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
return Err(Error::library_reset_with_status_code(stream_id, Reason::PROTOCOL_ERROR, StatusCode::BAD_REQUEST));
}}
}

Expand Down
12 changes: 12 additions & 0 deletions tests/h2-tests/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,9 @@ async fn recv_invalid_authority() {
let settings = client.assert_server_handshake().await;
assert_default_settings!(settings);
client.send_frame(bad_headers).await;
client
.recv_frame(frames::headers(1).status(StatusCode::BAD_REQUEST).eos())
.await;
client.recv_frame(frames::reset(1).protocol_error()).await;
};

Expand Down Expand Up @@ -1290,6 +1293,9 @@ async fn reject_pseudo_protocol_on_non_connect_request() {
)
.await;

client
.recv_frame(frames::headers(1).status(StatusCode::BAD_REQUEST).eos())
.await;
client.recv_frame(frames::reset(1).protocol_error()).await;
};

Expand Down Expand Up @@ -1329,6 +1335,9 @@ async fn reject_authority_target_on_extended_connect_request() {
)
.await;

client
.recv_frame(frames::headers(1).status(StatusCode::BAD_REQUEST).eos())
.await;
client.recv_frame(frames::reset(1).protocol_error()).await;
};

Expand Down Expand Up @@ -1364,6 +1373,9 @@ async fn reject_non_authority_target_on_connect_request() {
.send_frame(frames::headers(1).request("CONNECT", "https://bread/baguette"))
.await;

client
.recv_frame(frames::headers(1).status(StatusCode::BAD_REQUEST).eos())
.await;
client.recv_frame(frames::reset(1).protocol_error()).await;
};

Expand Down
3 changes: 3 additions & 0 deletions tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ async fn recv_next_stream_id_updated_by_malformed_headers() {
assert_default_settings!(settings);
// bad headers -- should error.
client.send_frame(bad_headers).await;
client
.recv_frame(frames::headers(1).status(StatusCode::BAD_REQUEST).eos())
.await;
client.recv_frame(frames::reset(1).protocol_error()).await;
// this frame is good, but the stream id should already have been incr'd
client
Expand Down

0 comments on commit f98522a

Please sign in to comment.