diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1247cac32..0952337e4 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -61,20 +61,6 @@ jobs: run: ./ci/h2spec.sh if: matrix.rust == 'stable' - unexpected-cfgs: - runs-on: ubuntu-latest - needs: [style] - steps: - - uses: actions/checkout@v4 - - uses: dtolnay/rust-toolchain@nightly - - uses: Swatinem/rust-cache@v2 - - run: cargo check --all-features - env: - RUSTFLAGS: >- - -D unexpected_cfgs - --cfg h2_internal_check_unexpected_cfgs - --check-cfg=cfg(h2_internal_check_unexpected_cfgs,fuzzing) - #clippy_check: # runs-on: ubuntu-latest # steps: @@ -102,6 +88,11 @@ jobs: with: toolchain: ${{ steps.msrv.outputs.version }} + - name: Make sure tokio 1.38.1 is used for MSRV + run: | + cargo update + cargo update --package tokio --precise 1.38.1 + - run: cargo check -p h2 minimal-versions: diff --git a/Cargo.toml b/Cargo.toml index cf0b1d702..9d35b5a68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,9 @@ env_logger = { version = "0.10", default-features = false } tokio-rustls = "0.26" webpki-roots = "0.26" +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ["cfg(fuzzing)"] } + [package.metadata.docs.rs] features = ["stream"] diff --git a/src/client.rs b/src/client.rs index 6d2b9690d..ffeda6077 100644 --- a/src/client.rs +++ b/src/client.rs @@ -148,7 +148,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use std::usize; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tracing::Instrument; @@ -548,6 +547,16 @@ where pub fn is_extended_connect_protocol_enabled(&self) -> bool { self.inner.is_extended_connect_protocol_enabled() } + + /// Returns the current max send streams + pub fn current_max_send_streams(&self) -> usize { + self.inner.current_max_send_streams() + } + + /// Returns the current max recv streams + pub fn current_max_recv_streams(&self) -> usize { + self.inner.current_max_recv_streams() + } } impl fmt::Debug for SendRequest @@ -1060,7 +1069,7 @@ impl Builder { /// /// This function panics if `max` is larger than `u32::MAX`. pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { - assert!(max <= std::u32::MAX as usize); + assert!(max <= u32::MAX as usize); self.max_send_buffer_size = max; self } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index bb36cd4f0..0c756325f 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -564,12 +564,10 @@ impl Pseudo { let path = if !path.is_empty() { path + } else if method == Method::OPTIONS { + BytesStr::from_static("*") } else { - if method == Method::OPTIONS { - BytesStr::from_static("*") - } else { - BytesStr::from_static("/") - } + BytesStr::from_static("/") }; (parts.scheme, Some(path)) diff --git a/src/frame/stream_id.rs b/src/frame/stream_id.rs index 10a14d3c8..0f427dfb4 100644 --- a/src/frame/stream_id.rs +++ b/src/frame/stream_id.rs @@ -1,5 +1,3 @@ -use std::u32; - /// A stream identifier, as described in [Section 5.1.1] of RFC 7540. /// /// Streams are identified with an unsigned 31-bit integer. Streams diff --git a/src/hpack/encoder.rs b/src/hpack/encoder.rs index bd49056f6..06ccda400 100644 --- a/src/hpack/encoder.rs +++ b/src/hpack/encoder.rs @@ -695,7 +695,7 @@ mod test { fn encode(e: &mut Encoder, hdrs: Vec>>) -> BytesMut { let mut dst = BytesMut::with_capacity(1024); - e.encode(&mut hdrs.into_iter(), &mut dst); + e.encode(hdrs, &mut dst); dst } diff --git a/src/hpack/table.rs b/src/hpack/table.rs index 3e45f413b..d28ec0510 100644 --- a/src/hpack/table.rs +++ b/src/hpack/table.rs @@ -6,7 +6,7 @@ use http::method::Method; use std::collections::VecDeque; use std::hash::{Hash, Hasher}; -use std::{cmp, mem, usize}; +use std::{cmp, mem}; /// HPACK encoder table #[derive(Debug)] diff --git a/src/hpack/test/fixture.rs b/src/hpack/test/fixture.rs index d3f76e3bf..ed2f11c29 100644 --- a/src/hpack/test/fixture.rs +++ b/src/hpack/test/fixture.rs @@ -107,7 +107,7 @@ fn test_story(story: Value) { }) .collect(); - encoder.encode(&mut input.clone().into_iter(), &mut buf); + encoder.encode(input.clone(), &mut buf); decoder .decode(&mut Cursor::new(&mut buf), |e| { diff --git a/src/lib.rs b/src/lib.rs index 1120e1069..3d59ef21e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,7 +41,7 @@ //! library will start the handshake process, which consists of: //! //! * The client sends the connection preface (a predefined sequence of 24 -//! octets). +//! octets). //! * Both the client and the server sending a SETTINGS frame. //! //! See the [Starting HTTP/2] in the specification for more details. @@ -85,7 +85,6 @@ clippy::undocumented_unsafe_blocks )] #![allow(clippy::type_complexity, clippy::manual_range_contains)] -#![cfg_attr(not(h2_internal_check_unexpected_cfgs), allow(unexpected_cfgs))] #![cfg_attr(test, deny(warnings))] macro_rules! proto_err { diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index fdb07f1cd..a214892b6 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -1,7 +1,5 @@ use super::*; -use std::usize; - #[derive(Debug)] pub(super) struct Counts { /// Acting as a client or server. This allows us to track which values to diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 14b37e223..81825f404 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -839,10 +839,7 @@ impl Prioritize { }), None => { if let Some(reason) = stream.state.get_scheduled_reset() { - let stream_id = stream.id; - stream - .state - .set_reset(stream_id, reason, Initiator::Library); + stream.set_reset(reason, Initiator::Library); let frame = frame::Reset::new(stream.id, reason); Frame::Reset(frame) diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs index 46cb87cd0..a70527e2a 100644 --- a/src/proto/streams/recv.rs +++ b/src/proto/streams/recv.rs @@ -296,7 +296,7 @@ impl Recv { let is_open = stream.state.ensure_recv_open()?; if is_open { - stream.recv_task = Some(cx.waker().clone()); + stream.push_task = Some(cx.waker().clone()); Poll::Pending } else { Poll::Ready(None) @@ -719,16 +719,16 @@ impl Recv { // > that it cannot process. // // So, if peer is a server, we'll send a 431. In either case, - // an error is recorded, which will send a REFUSED_STREAM, + // an error is recorded, which will send a PROTOCOL_ERROR, // since we don't want any of the data frames either. tracing::debug!( - "stream error REFUSED_STREAM -- recv_push_promise: \ + "stream error PROTOCOL_ERROR -- recv_push_promise: \ headers frame is over size; promised_id={:?};", frame.promised_id(), ); return Err(Error::library_reset( frame.promised_id(), - Reason::REFUSED_STREAM, + Reason::PROTOCOL_ERROR, )); } @@ -760,6 +760,7 @@ impl Recv { .pending_recv .push_back(&mut self.buffer, Event::Headers(Server(req))); stream.notify_recv(); + stream.notify_push(); Ok(()) } @@ -814,6 +815,7 @@ impl Recv { stream.notify_send(); stream.notify_recv(); + stream.notify_push(); Ok(()) } @@ -826,6 +828,7 @@ impl Recv { // If a receiver is waiting, notify it stream.notify_send(); stream.notify_recv(); + stream.notify_push(); } pub fn go_away(&mut self, last_processed_id: StreamId) { @@ -837,6 +840,7 @@ impl Recv { stream.state.recv_eof(); stream.notify_send(); stream.notify_recv(); + stream.notify_push(); } pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) { diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs index db30c6dac..2a529f207 100644 --- a/src/proto/streams/send.rs +++ b/src/proto/streams/send.rs @@ -239,7 +239,7 @@ impl Send { } // Transition the state to reset no matter what. - stream.state.set_reset(stream_id, reason, initiator); + stream.set_reset(reason, initiator); // If closed AND the send queue is flushed, then the stream cannot be // reset explicitly, either. Implicit resets can still be queued. diff --git a/src/proto/streams/stream.rs b/src/proto/streams/stream.rs index 43e313647..c93c89d2f 100644 --- a/src/proto/streams/stream.rs +++ b/src/proto/streams/stream.rs @@ -1,8 +1,9 @@ +use crate::Reason; + use super::*; use std::task::{Context, Waker}; use std::time::Instant; -use std::usize; /// Tracks Stream related state /// @@ -105,6 +106,9 @@ pub(super) struct Stream { /// Task tracking receiving frames pub recv_task: Option, + /// Task tracking pushed promises. + pub push_task: Option, + /// The stream's pending push promises pub pending_push_promises: store::Queue, @@ -187,6 +191,7 @@ impl Stream { pending_recv: buffer::Deque::new(), is_recv: true, recv_task: None, + push_task: None, pending_push_promises: store::Queue::new(), content_length: ContentLength::Omitted, } @@ -370,6 +375,20 @@ impl Stream { task.wake(); } } + + pub(super) fn notify_push(&mut self) { + if let Some(task) = self.push_task.take() { + task.wake(); + } + } + + /// Set the stream's state to `Closed` with the given reason and initiator. + /// Notify the send and receive tasks, if they exist. + pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) { + self.state.set_reset(self.id, reason, initiator); + self.notify_push(); + self.notify_recv(); + } } impl store::Next for NextAccept { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 233b790c5..aa97e7c70 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -320,6 +320,16 @@ where .send .is_extended_connect_protocol_enabled() } + + pub fn current_max_send_streams(&self) -> usize { + let me = self.inner.lock().unwrap(); + me.counts.max_send_streams() + } + + pub fn current_max_recv_streams(&self) -> usize { + let me = self.inner.lock().unwrap(); + me.counts.max_recv_streams() + } } impl DynStreams<'_, B> { @@ -494,7 +504,7 @@ impl Inner { actions.send.schedule_implicit_reset( stream, - Reason::REFUSED_STREAM, + Reason::PROTOCOL_ERROR, counts, &mut actions.task); @@ -502,7 +512,7 @@ impl Inner { Ok(()) } else { - Err(Error::library_reset(stream.id, Reason::REFUSED_STREAM)) + Err(Error::library_reset(stream.id, Reason::PROTOCOL_ERROR)) } }, Err(RecvHeaderBlockError::State(err)) => Err(err), @@ -815,7 +825,7 @@ impl Inner { let parent = &mut self.store.resolve(parent_key); parent.pending_push_promises = ppp; - parent.notify_recv(); + parent.notify_push(); }; Ok(()) diff --git a/src/server.rs b/src/server.rs index c4685998f..4956a7e4c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -963,7 +963,7 @@ impl Builder { /// /// This function panics if `max` is larger than `u32::MAX`. pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self { - assert!(max <= std::u32::MAX as usize); + assert!(max <= u32::MAX as usize); self.max_send_buffer_size = max; self } diff --git a/src/share.rs b/src/share.rs index fd305708c..c07402af7 100644 --- a/src/share.rs +++ b/src/share.rs @@ -187,7 +187,7 @@ pub struct RecvStream { /// * The window size is now 0 bytes. The peer may not send any more data. /// * [`release_capacity`] is called with 1024. /// * The receive window size is now 1024 bytes. The peer may now send more -/// data. +/// data. /// /// [flow control]: ../index.html#flow-control /// [`release_capacity`]: struct.FlowControl.html#method.release_capacity diff --git a/tests/h2-support/src/future_ext.rs b/tests/h2-support/src/future_ext.rs index 9f659b344..cca18c66e 100644 --- a/tests/h2-support/src/future_ext.rs +++ b/tests/h2-support/src/future_ext.rs @@ -1,7 +1,9 @@ -use futures::FutureExt; +use futures::{FutureExt, TryFuture}; use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::task::{Context, Poll, Wake, Waker}; /// Future extension helpers that are useful for tests pub trait TestFuture: Future { @@ -15,9 +17,140 @@ pub trait TestFuture: Future { { Drive { driver: self, - future: Box::pin(other), + future: other.wakened(), } } + + fn wakened(self) -> Wakened + where + Self: Sized, + { + Wakened { + future: Box::pin(self), + woken: Arc::new(AtomicBool::new(true)), + } + } +} + +/// Wraps futures::future::join to ensure that the futures are only polled if they are woken. +pub fn join( + future1: Fut1, + future2: Fut2, +) -> futures::future::Join, Wakened> +where + Fut1: Future, + Fut2: Future, +{ + futures::future::join(future1.wakened(), future2.wakened()) +} + +/// Wraps futures::future::join3 to ensure that the futures are only polled if they are woken. +pub fn join3( + future1: Fut1, + future2: Fut2, + future3: Fut3, +) -> futures::future::Join3, Wakened, Wakened> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, +{ + futures::future::join3(future1.wakened(), future2.wakened(), future3.wakened()) +} + +/// Wraps futures::future::join4 to ensure that the futures are only polled if they are woken. +pub fn join4( + future1: Fut1, + future2: Fut2, + future3: Fut3, + future4: Fut4, +) -> futures::future::Join4, Wakened, Wakened, Wakened> +where + Fut1: Future, + Fut2: Future, + Fut3: Future, + Fut4: Future, +{ + futures::future::join4( + future1.wakened(), + future2.wakened(), + future3.wakened(), + future4.wakened(), + ) +} + +/// Wraps futures::future::try_join to ensure that the futures are only polled if they are woken. +pub fn try_join( + future1: Fut1, + future2: Fut2, +) -> futures::future::TryJoin, Wakened> +where + Fut1: futures::future::TryFuture + Future, + Fut2: Future, + Wakened: futures::future::TryFuture, + Wakened: futures::future::TryFuture as TryFuture>::Error>, +{ + futures::future::try_join(future1.wakened(), future2.wakened()) +} + +/// Wraps futures::future::select to ensure that the futures are only polled if they are woken. +pub fn select(future1: A, future2: B) -> futures::future::Select, Wakened> +where + A: Future + Unpin, + B: Future + Unpin, +{ + futures::future::select(future1.wakened(), future2.wakened()) +} + +/// Wraps futures::future::join_all to ensure that the futures are only polled if they are woken. +pub fn join_all(iter: I) -> futures::future::JoinAll> +where + I: IntoIterator, + I::Item: Future, +{ + futures::future::join_all(iter.into_iter().map(|f| f.wakened())) +} + +/// A future that only polls the inner future if it has been woken (after the initial poll). +pub struct Wakened { + future: Pin>, + woken: Arc, +} + +/// A future that only polls the inner future if it has been woken (after the initial poll). +impl Future for Wakened +where + T: Future, +{ + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + if !this.woken.load(std::sync::atomic::Ordering::SeqCst) { + return Poll::Pending; + } + this.woken.store(false, std::sync::atomic::Ordering::SeqCst); + let my_waker = IfWokenWaker { + inner: cx.waker().clone(), + wakened: this.woken.clone(), + }; + let my_waker = Arc::new(my_waker).into(); + let mut cx = Context::from_waker(&my_waker); + this.future.as_mut().poll(&mut cx) + } +} + +impl Wake for IfWokenWaker { + fn wake(self: Arc) { + self.wakened + .store(true, std::sync::atomic::Ordering::SeqCst); + self.inner.wake_by_ref(); + } +} + +struct IfWokenWaker { + inner: Waker, + wakened: Arc, } impl TestFuture for T {} @@ -29,7 +162,7 @@ impl TestFuture for T {} /// This is useful for H2 futures that also require the connection to be polled. pub struct Drive<'a, T, U> { driver: &'a mut T, - future: Pin>, + future: Wakened, } impl<'a, T, U> Future for Drive<'a, T, U> diff --git a/tests/h2-support/src/prelude.rs b/tests/h2-support/src/prelude.rs index c40a518da..4338143fd 100644 --- a/tests/h2-support/src/prelude.rs +++ b/tests/h2-support/src/prelude.rs @@ -35,7 +35,7 @@ pub use {bytes, futures, http, tokio::io as tokio_io, tracing, tracing_subscribe pub use futures::{Future, Sink, Stream}; // And our Future extensions -pub use super::future_ext::TestFuture; +pub use super::future_ext::{join, join3, join4, join_all, select, try_join, TestFuture}; // Our client_ext helpers pub use super::client_ext::SendRequestExt; diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 9cc2f91e6..e914d4843 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1,10 +1,10 @@ -use futures::future::{join, ready, select, Either}; +use futures::future::{ready, Either}; use futures::stream::FuturesUnordered; use futures::StreamExt; use h2_support::prelude::*; -use std::io; use std::pin::Pin; use std::task::Context; +use std::{io, panic}; #[tokio::test] async fn handshake() { @@ -844,7 +844,7 @@ async fn recv_too_big_headers() { srv.send_frame(frames::headers(3).response(200)).await; // no reset for 1, since it's closed anyway // but reset for 3, since server hasn't closed stream - srv.recv_frame(frames::reset(3).refused()).await; + srv.recv_frame(frames::reset(3).protocol_error()).await; idle_ms(10).await; }; @@ -861,9 +861,11 @@ async fn recv_too_big_headers() { .unwrap(); let req1 = client.send_request(request, true); + // Spawn tasks to ensure that the error wakes up tasks that are blocked + // waiting for a response. let req1 = async move { let err = req1.expect("send_request").0.await.expect_err("response1"); - assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM)); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); }; let request = Request::builder() @@ -874,12 +876,12 @@ async fn recv_too_big_headers() { let req2 = client.send_request(request, true); let req2 = async move { let err = req2.expect("send_request").0.await.expect_err("response2"); - assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM)); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); }; conn.drive(join(req1, req2)).await; - conn.await.expect("client"); }; + join(srv, client).await; } diff --git a/tests/h2-tests/tests/codec_read.rs b/tests/h2-tests/tests/codec_read.rs index d955e186b..489d16daf 100644 --- a/tests/h2-tests/tests/codec_read.rs +++ b/tests/h2-tests/tests/codec_read.rs @@ -1,4 +1,3 @@ -use futures::future::join; use h2_support::prelude::*; #[tokio::test] diff --git a/tests/h2-tests/tests/codec_write.rs b/tests/h2-tests/tests/codec_write.rs index 0b85a2238..04627cdc9 100644 --- a/tests/h2-tests/tests/codec_write.rs +++ b/tests/h2-tests/tests/codec_write.rs @@ -1,4 +1,3 @@ -use futures::future::join; use h2_support::prelude::*; #[tokio::test] diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index dbb933286..e3caaff5f 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -1,4 +1,3 @@ -use futures::future::{join, join4}; use futures::{StreamExt, TryStreamExt}; use h2_support::prelude::*; use h2_support::util::yield_once; diff --git a/tests/h2-tests/tests/ping_pong.rs b/tests/h2-tests/tests/ping_pong.rs index 0f93578cc..2132c7acf 100644 --- a/tests/h2-tests/tests/ping_pong.rs +++ b/tests/h2-tests/tests/ping_pong.rs @@ -1,5 +1,4 @@ use futures::channel::oneshot; -use futures::future::join; use futures::StreamExt; use h2_support::assert_ping; use h2_support::prelude::*; diff --git a/tests/h2-tests/tests/prioritization.rs b/tests/h2-tests/tests/prioritization.rs index 11d2c2ccf..dd4ed9fea 100644 --- a/tests/h2-tests/tests/prioritization.rs +++ b/tests/h2-tests/tests/prioritization.rs @@ -1,4 +1,3 @@ -use futures::future::{join, select}; use futures::{pin_mut, FutureExt, StreamExt}; use h2_support::prelude::*; diff --git a/tests/h2-tests/tests/push_promise.rs b/tests/h2-tests/tests/push_promise.rs index 94c1154ef..b2d8fde0a 100644 --- a/tests/h2-tests/tests/push_promise.rs +++ b/tests/h2-tests/tests/push_promise.rs @@ -1,4 +1,3 @@ -use futures::future::join; use futures::{StreamExt, TryStreamExt}; use h2_support::prelude::*; @@ -249,7 +248,7 @@ async fn recv_push_promise_over_max_header_list_size() { frames::push_promise(1, 2).request("GET", "https://http2.akamai.com/style.css"), ) .await; - srv.recv_frame(frames::reset(2).refused()).await; + srv.recv_frame(frames::reset(2).protocol_error()).await; srv.send_frame(frames::headers(1).response(200).eos()).await; idle_ms(10).await; }; @@ -272,7 +271,7 @@ async fn recv_push_promise_over_max_header_list_size() { .0 .await .expect_err("response"); - assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM)); + assert_eq!(err.reason(), Some(Reason::PROTOCOL_ERROR)); }; conn.drive(req).await; diff --git a/tests/h2-tests/tests/server.rs b/tests/h2-tests/tests/server.rs index 8bb30b3c1..06b6fac98 100644 --- a/tests/h2-tests/tests/server.rs +++ b/tests/h2-tests/tests/server.rs @@ -1,6 +1,5 @@ #![deny(warnings)] -use futures::future::join; use futures::StreamExt; use h2_support::prelude::*; use tokio::io::AsyncWriteExt; @@ -869,7 +868,7 @@ async fn too_big_headers_sends_reset_after_431_if_not_eos() { client .recv_frame(frames::headers(1).response(431).eos()) .await; - client.recv_frame(frames::reset(1).refused()).await; + client.recv_frame(frames::reset(1).protocol_error()).await; }; let srv = async move { diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index ceeb7e63a..a85f03ecb 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -1,6 +1,6 @@ #![deny(warnings)] -use futures::future::{join, join3, lazy, try_join}; +use futures::future::lazy; use futures::{FutureExt, StreamExt, TryStreamExt}; use h2_support::prelude::*; use h2_support::util::yield_once;