From 1b4d602c8273910981f1416f473c7a090352c8f2 Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 11 Aug 2024 19:06:13 +0200 Subject: [PATCH 1/2] read remaining data before deciding to send STOP_SENDING frame --- quinn/src/recv_stream.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/quinn/src/recv_stream.rs b/quinn/src/recv_stream.rs index be2b64324..00681ba07 100644 --- a/quinn/src/recv_stream.rs +++ b/quinn/src/recv_stream.rs @@ -499,9 +499,38 @@ impl Drop for RecvStream { if conn.error.is_some() || (self.is_0rtt && conn.check_0rtt().is_err()) { return; } - if !self.all_data_read { + + let mut stream = conn.inner.recv_stream(self.stream); + + /// Read remaining data before deciding to send STOP_SENDING stream + fn really_all_data_read(stream: &mut proto::RecvStream) -> bool { + // read to all remaining data + let chunks = stream.read(true); + match chunks { + Ok(mut chunks) => { + let status: ReadStatus<_> = loop { + match chunks.next(usize::MAX) { + // Ignore data, user dropped stream + Ok(Some(_)) => (), + res => break (Option::::None, res.err()).into(), + } + }; + + let _ = chunks.finalize(); + match status { + ReadStatus::Readable(_) => false, + ReadStatus::Finished(_) => true, + ReadStatus::Failed(_, proto::ReadError::Blocked) => false, + ReadStatus::Failed(_, proto::ReadError::Reset(_)) => true, + } + } + Err(_) => false, + } + } + + if !self.all_data_read && !really_all_data_read(&mut stream) { // Ignore ClosedStream errors - let _ = conn.inner.recv_stream(self.stream).stop(0u32.into()); + let _ = stream.stop(0u32.into()); conn.wake(); } } From 7a5cc4241d4aa8dfa2bd943cf5ac553b75e5c13b Mon Sep 17 00:00:00 2001 From: ruben Date: Sun, 11 Aug 2024 19:34:13 +0200 Subject: [PATCH 2/2] add futures-io feature to the list --- quinn/Cargo.toml | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 15838f84e..879089ce5 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -30,6 +30,8 @@ runtime-smol = ["async-io", "smol"] # Configure `tracing` to log events via `log` if no `tracing` subscriber exists. log = ["tracing/log", "proto/log", "udp/log"] +futures-io = ["dep:futures-io"] + [dependencies] async-io = { workspace = true, optional = true } async-std = { workspace = true, optional = true } @@ -43,9 +45,11 @@ rustls = { workspace = true, optional = true } smol = { workspace = true, optional = true } socket2 = { workspace = true } thiserror = { workspace = true } -tracing = { workspace = true } +tracing = { workspace = true } tokio = { workspace = true } -udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.5", default-features = false, features = ["tracing"] } +udp = { package = "quinn-udp", path = "../quinn-udp", version = "0.5", default-features = false, features = [ + "tracing", +] } [dev-dependencies] anyhow = { workspace = true } @@ -56,7 +60,12 @@ rand = { workspace = true } rcgen = { workspace = true } rustls-pemfile = { workspace = true } clap = { workspace = true } -tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "macros"] } +tokio = { workspace = true, features = [ + "rt", + "rt-multi-thread", + "time", + "macros", +] } tracing-subscriber = { workspace = true } tracing-futures = { workspace = true } url = { workspace = true }