diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 7b5546628..635b5a3f3 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -182,8 +182,7 @@ impl<'a> SendStream<'a> { return Err(WriteError::Blocked); } - let limit = (self.state.max_data - self.state.data_sent) - .min(self.state.send_window - self.state.unacked_data); + let limit = self.state.write_limit(); let stream = self .state .send diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index a8435bb4a..f25ce6e9f 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -50,7 +50,7 @@ pub struct StreamsState { /// Streams are only added to this list when a write fails. pub(super) connection_blocked: Vec, /// Connection-level flow control budget dictated by the peer - pub(super) max_data: u64, + max_data: u64, /// The initial receive window receive_window: u64, /// Limit on incoming data, which is transmitted through `MAX_DATA` frames @@ -603,9 +603,18 @@ impl StreamsState { )); } + let write_limit = self.write_limit(); if let Some(ss) = self.send.get_mut(&id) { if ss.increase_max_data(offset) { - self.events.push_back(StreamEvent::Writable { id }); + if write_limit > 0 { + self.events.push_back(StreamEvent::Writable { id }); + } else if !ss.connection_blocked { + // The stream is still blocked on the connection flow control + // window. In order to get unblocked when the window relaxes + // it needs to be in the connection blocked list. + ss.connection_blocked = true; + self.connection_blocked.push(id); + } } } else if id.initiator() == self.side && self.is_local_unopened(id) { debug!("got MAX_STREAM_DATA on unopened {}", id); @@ -618,6 +627,11 @@ impl StreamsState { Ok(()) } + /// Returns the maximum amount of data this is allowed to be written on the connection + pub fn write_limit(&self) -> u64 { + (self.max_data - self.data_sent).min(self.send_window - self.unacked_data) + } + /// Yield stream events pub fn poll(&mut self) -> Option { if let Some(dir) = Dir::iter().find(|&i| mem::replace(&mut self.opened[i as usize], false)) @@ -625,7 +639,7 @@ impl StreamsState { return Some(StreamEvent::Opened { dir }); } - if self.data_sent < self.max_data && self.unacked_data < self.send_window { + if self.write_limit() > 0 { while let Some(id) = self.connection_blocked.pop() { let stream = match self.send.get_mut(&id) { None => continue,