Skip to content

Commit

Permalink
Merge branch 'master' into malformed-400-reset
Browse files Browse the repository at this point in the history
  • Loading branch information
franfastly authored Aug 7, 2024
2 parents 2e62b78 + 36cf4f2 commit 3074855
Show file tree
Hide file tree
Showing 28 changed files with 221 additions and 67 deletions.
19 changes: 5 additions & 14 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ jobs:
run: ./ci/h2spec.sh
if: matrix.rust == 'stable'

unexpected-cfgs:
runs-on: ubuntu-latest
needs: [style]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- uses: Swatinem/rust-cache@v2
- run: cargo check --all-features
env:
RUSTFLAGS: >-
-D unexpected_cfgs
--cfg h2_internal_check_unexpected_cfgs
--check-cfg=cfg(h2_internal_check_unexpected_cfgs,fuzzing)
#clippy_check:
# runs-on: ubuntu-latest
# steps:
Expand Down Expand Up @@ -102,6 +88,11 @@ jobs:
with:
toolchain: ${{ steps.msrv.outputs.version }}

- name: Make sure tokio 1.38.1 is used for MSRV
run: |
cargo update
cargo update --package tokio --precise 1.38.1
- run: cargo check -p h2

minimal-versions:
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ env_logger = { version = "0.10", default-features = false }
tokio-rustls = "0.26"
webpki-roots = "0.26"

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ["cfg(fuzzing)"] }

[package.metadata.docs.rs]
features = ["stream"]

Expand Down
13 changes: 11 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::usize;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::Instrument;

Expand Down Expand Up @@ -548,6 +547,16 @@ where
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
self.inner.is_extended_connect_protocol_enabled()
}

/// Returns the current max send streams
pub fn current_max_send_streams(&self) -> usize {
self.inner.current_max_send_streams()
}

/// Returns the current max recv streams
pub fn current_max_recv_streams(&self) -> usize {
self.inner.current_max_recv_streams()
}
}

impl<B> fmt::Debug for SendRequest<B>
Expand Down Expand Up @@ -1060,7 +1069,7 @@ impl Builder {
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
assert!(max <= u32::MAX as usize);
self.max_send_buffer_size = max;
self
}
Expand Down
8 changes: 3 additions & 5 deletions src/frame/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,12 +564,10 @@ impl Pseudo {

let path = if !path.is_empty() {
path
} else if method == Method::OPTIONS {
BytesStr::from_static("*")
} else {
if method == Method::OPTIONS {
BytesStr::from_static("*")
} else {
BytesStr::from_static("/")
}
BytesStr::from_static("/")
};

(parts.scheme, Some(path))
Expand Down
2 changes: 0 additions & 2 deletions src/frame/stream_id.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::u32;

/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
///
/// Streams are identified with an unsigned 31-bit integer. Streams
Expand Down
2 changes: 1 addition & 1 deletion src/hpack/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ mod test {

fn encode(e: &mut Encoder, hdrs: Vec<Header<Option<HeaderName>>>) -> BytesMut {
let mut dst = BytesMut::with_capacity(1024);
e.encode(&mut hdrs.into_iter(), &mut dst);
e.encode(hdrs, &mut dst);
dst
}

Expand Down
2 changes: 1 addition & 1 deletion src/hpack/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use http::method::Method;

use std::collections::VecDeque;
use std::hash::{Hash, Hasher};
use std::{cmp, mem, usize};
use std::{cmp, mem};

/// HPACK encoder table
#[derive(Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/hpack/test/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn test_story(story: Value) {
})
.collect();

encoder.encode(&mut input.clone().into_iter(), &mut buf);
encoder.encode(input.clone(), &mut buf);

decoder
.decode(&mut Cursor::new(&mut buf), |e| {
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
//! library will start the handshake process, which consists of:
//!
//! * The client sends the connection preface (a predefined sequence of 24
//! octets).
//! octets).
//! * Both the client and the server sending a SETTINGS frame.
//!
//! See the [Starting HTTP/2] in the specification for more details.
Expand Down Expand Up @@ -85,7 +85,6 @@
clippy::undocumented_unsafe_blocks
)]
#![allow(clippy::type_complexity, clippy::manual_range_contains)]
#![cfg_attr(not(h2_internal_check_unexpected_cfgs), allow(unexpected_cfgs))]
#![cfg_attr(test, deny(warnings))]

macro_rules! proto_err {
Expand Down
2 changes: 0 additions & 2 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use super::*;

use std::usize;

#[derive(Debug)]
pub(super) struct Counts {
/// Acting as a client or server. This allows us to track which values to
Expand Down
5 changes: 1 addition & 4 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,7 @@ impl Prioritize {
}),
None => {
if let Some(reason) = stream.state.get_scheduled_reset() {
let stream_id = stream.id;
stream
.state
.set_reset(stream_id, reason, Initiator::Library);
stream.set_reset(reason, Initiator::Library);

let frame = frame::Reset::new(stream.id, reason);
Frame::Reset(frame)
Expand Down
12 changes: 8 additions & 4 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl Recv {
let is_open = stream.state.ensure_recv_open()?;

if is_open {
stream.recv_task = Some(cx.waker().clone());
stream.push_task = Some(cx.waker().clone());
Poll::Pending
} else {
Poll::Ready(None)
Expand Down Expand Up @@ -719,16 +719,16 @@ impl Recv {
// > that it cannot process.
//
// So, if peer is a server, we'll send a 431. In either case,
// an error is recorded, which will send a REFUSED_STREAM,
// an error is recorded, which will send a PROTOCOL_ERROR,
// since we don't want any of the data frames either.
tracing::debug!(
"stream error REFUSED_STREAM -- recv_push_promise: \
"stream error PROTOCOL_ERROR -- recv_push_promise: \
headers frame is over size; promised_id={:?};",
frame.promised_id(),
);
return Err(Error::library_reset(
frame.promised_id(),
Reason::REFUSED_STREAM,
Reason::PROTOCOL_ERROR,
));
}

Expand Down Expand Up @@ -760,6 +760,7 @@ impl Recv {
.pending_recv
.push_back(&mut self.buffer, Event::Headers(Server(req)));
stream.notify_recv();
stream.notify_push();
Ok(())
}

Expand Down Expand Up @@ -814,6 +815,7 @@ impl Recv {

stream.notify_send();
stream.notify_recv();
stream.notify_push();

Ok(())
}
Expand All @@ -826,6 +828,7 @@ impl Recv {
// If a receiver is waiting, notify it
stream.notify_send();
stream.notify_recv();
stream.notify_push();
}

pub fn go_away(&mut self, last_processed_id: StreamId) {
Expand All @@ -837,6 +840,7 @@ impl Recv {
stream.state.recv_eof();
stream.notify_send();
stream.notify_recv();
stream.notify_push();
}

pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {
Expand Down
2 changes: 1 addition & 1 deletion src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl Send {
}

// Transition the state to reset no matter what.
stream.state.set_reset(stream_id, reason, initiator);
stream.set_reset(reason, initiator);

// If closed AND the send queue is flushed, then the stream cannot be
// reset explicitly, either. Implicit resets can still be queued.
Expand Down
21 changes: 20 additions & 1 deletion src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::Reason;

use super::*;

use std::task::{Context, Waker};
use std::time::Instant;
use std::usize;

/// Tracks Stream related state
///
Expand Down Expand Up @@ -105,6 +106,9 @@ pub(super) struct Stream {
/// Task tracking receiving frames
pub recv_task: Option<Waker>,

/// Task tracking pushed promises.
pub push_task: Option<Waker>,

/// The stream's pending push promises
pub pending_push_promises: store::Queue<NextAccept>,

Expand Down Expand Up @@ -187,6 +191,7 @@ impl Stream {
pending_recv: buffer::Deque::new(),
is_recv: true,
recv_task: None,
push_task: None,
pending_push_promises: store::Queue::new(),
content_length: ContentLength::Omitted,
}
Expand Down Expand Up @@ -370,6 +375,20 @@ impl Stream {
task.wake();
}
}

pub(super) fn notify_push(&mut self) {
if let Some(task) = self.push_task.take() {
task.wake();
}
}

/// Set the stream's state to `Closed` with the given reason and initiator.
/// Notify the send and receive tasks, if they exist.
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
self.state.set_reset(self.id, reason, initiator);
self.notify_push();
self.notify_recv();
}
}

impl store::Next for NextAccept {
Expand Down
16 changes: 13 additions & 3 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,16 @@ where
.send
.is_extended_connect_protocol_enabled()
}

pub fn current_max_send_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.counts.max_send_streams()
}

pub fn current_max_recv_streams(&self) -> usize {
let me = self.inner.lock().unwrap();
me.counts.max_recv_streams()
}
}

impl<B> DynStreams<'_, B> {
Expand Down Expand Up @@ -494,15 +504,15 @@ impl Inner {

actions.send.schedule_implicit_reset(
stream,
Reason::REFUSED_STREAM,
Reason::PROTOCOL_ERROR,
counts,
&mut actions.task);

actions.recv.enqueue_reset_expiration(stream, counts);

Ok(())
} else {
Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM))
Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR))
}
},
Err(RecvHeaderBlockError::State(err)) => Err(err),
Expand Down Expand Up @@ -815,7 +825,7 @@ impl Inner {

let parent = &mut self.store.resolve(parent_key);
parent.pending_push_promises = ppp;
parent.notify_recv();
parent.notify_push();
};

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,7 @@ impl Builder {
///
/// This function panics if `max` is larger than `u32::MAX`.
pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
assert!(max <= std::u32::MAX as usize);
assert!(max <= u32::MAX as usize);
self.max_send_buffer_size = max;
self
}
Expand Down
2 changes: 1 addition & 1 deletion src/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub struct RecvStream {
/// * The window size is now 0 bytes. The peer may not send any more data.
/// * [`release_capacity`] is called with 1024.
/// * The receive window size is now 1024 bytes. The peer may now send more
/// data.
/// data.
///
/// [flow control]: ../index.html#flow-control
/// [`release_capacity`]: struct.FlowControl.html#method.release_capacity
Expand Down
Loading

0 comments on commit 3074855

Please sign in to comment.