From f45bca780f4e87f7f0c614782198be0ec91836bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Mon, 5 Aug 2024 09:43:59 +0200 Subject: [PATCH] Chunk stream properly if underlying stream always ready The previous chunking stream adapter had a problem that if the underlying stream never went into Poll::Pending state, it looped forever. It never yielded back to the executor, the time was not properly advanced, and the chunks were not produced. This commit fixes it by limiting the number of items requested from the underlying stream in a single poll_next, and letting the runtime poll again immediately if there exist items ready to pick. This way we give the async runtime a chance to execute a different future or to do other important stuff. (cherry picked from commit 70edd924b1c6c3b8af48048e4dadec025ccca39f) --- src/chunks.rs | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/chunks.rs b/src/chunks.rs index ff17f7f..9736ce7 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -1,6 +1,7 @@ use futures::stream::{Fuse, Skip}; use futures::{Stream, StreamExt}; use pin_project::pin_project; +use std::cmp; use std::fmt::Debug; use std::pin::Pin; use std::task::{Context, Poll}; @@ -94,13 +95,13 @@ where } } - fn next_batch(self: Pin<&mut Self>) -> Option { + fn next_chunk(self: Pin<&mut Self>) -> Option { let this = self.project(); *this.current_chunk_size = 0; this.current_chunk.replace((this.new_chunk)()) } - fn final_batch(self: Pin<&mut Self>) -> Option { + fn final_chunk(self: Pin<&mut Self>) -> Option { let this = self.project(); *this.current_chunk_size = 0; this.current_chunk.take() @@ -119,34 +120,52 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); - // Add all ready items in the source stream to the current chunk: - while this.current_chunk_size < this.max_chunk_size { + // The number of items we attempt to get from the underlying stream in this poll. + // This must be limited, because it is possible that the underlying + // stream is always ready, and we must eventually yield back to the executor. + let mut remaining_count = cmp::min(128, *this.max_chunk_size - *this.current_chunk_size); + + // Add ready items in the source stream to the current chunk: + while remaining_count > 0 { match this.src.as_mut().poll_next(cx) { + // Add a ready item in the source stream to the current chunk: Poll::Ready(Some(item)) => { *this.current_chunk_size += 1; + remaining_count -= 1; let chunk = this.current_chunk.as_mut().expect("chunk must be set"); (this.accumulate)(chunk, item); } + // End of stream, emit the last batch. + // Subsequent calls will emit Poll::Ready(None) Poll::Ready(None) => { - return Poll::Ready(self.final_batch()); + return Poll::Ready(self.final_chunk()); } + // No more items in source Poll::Pending => { - // No more items, but we can't leave yet, we need to check the clock - // at the end of the loop + // Don't return yet, we need to check the clock, + // because maybe we need to emit the batch break; } } } + + // Check the clock, if installed let deadline_reached = match this.clock.as_mut().project() { ClockProj::Some(clock) => clock.poll_next(cx).is_ready(), ClockProj::None => false, }; + // Either the time limit reached or the item count limit reached - switch to the next batch if deadline_reached || this.current_chunk_size >= this.max_chunk_size { - Poll::Ready(self.next_batch()) - } else { - Poll::Pending + return Poll::Ready(self.next_chunk()); + } + + // If we fetched all the items we requested that means the underlying stream has likely + // more ready items waiting for us, so mark this task as ready, so we get polled again ASAP: + if remaining_count == 0 { + cx.waker().wake_by_ref(); } + Poll::Pending } }