diff --git a/src/chunks.rs b/src/chunks.rs index ff17f7f..9736ce7 100644 --- a/src/chunks.rs +++ b/src/chunks.rs @@ -1,6 +1,7 @@ use futures::stream::{Fuse, Skip}; use futures::{Stream, StreamExt}; use pin_project::pin_project; +use std::cmp; use std::fmt::Debug; use std::pin::Pin; use std::task::{Context, Poll}; @@ -94,13 +95,13 @@ where } } - fn next_batch(self: Pin<&mut Self>) -> Option { + fn next_chunk(self: Pin<&mut Self>) -> Option { let this = self.project(); *this.current_chunk_size = 0; this.current_chunk.replace((this.new_chunk)()) } - fn final_batch(self: Pin<&mut Self>) -> Option { + fn final_chunk(self: Pin<&mut Self>) -> Option { let this = self.project(); *this.current_chunk_size = 0; this.current_chunk.take() @@ -119,34 +120,52 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); - // Add all ready items in the source stream to the current chunk: - while this.current_chunk_size < this.max_chunk_size { + // The number of items we attempt to get from the underlying stream in this poll. + // This must be limited, because it is possible that the underlying + // stream is always ready, and we must eventually yield back to the executor. + let mut remaining_count = cmp::min(128, *this.max_chunk_size - *this.current_chunk_size); + + // Add ready items in the source stream to the current chunk: + while remaining_count > 0 { match this.src.as_mut().poll_next(cx) { + // Add a ready item in the source stream to the current chunk: Poll::Ready(Some(item)) => { *this.current_chunk_size += 1; + remaining_count -= 1; let chunk = this.current_chunk.as_mut().expect("chunk must be set"); (this.accumulate)(chunk, item); } + // End of stream, emit the last batch. + // Subsequent calls will emit Poll::Ready(None) Poll::Ready(None) => { - return Poll::Ready(self.final_batch()); + return Poll::Ready(self.final_chunk()); } + // No more items in source Poll::Pending => { - // No more items, but we can't leave yet, we need to check the clock - // at the end of the loop + // Don't return yet, we need to check the clock, + // because maybe we need to emit the batch break; } } } + + // Check the clock, if installed let deadline_reached = match this.clock.as_mut().project() { ClockProj::Some(clock) => clock.poll_next(cx).is_ready(), ClockProj::None => false, }; + // Either the time limit reached or the item count limit reached - switch to the next batch if deadline_reached || this.current_chunk_size >= this.max_chunk_size { - Poll::Ready(self.next_batch()) - } else { - Poll::Pending + return Poll::Ready(self.next_chunk()); + } + + // If we fetched all the items we requested that means the underlying stream has likely + // more ready items waiting for us, so mark this task as ready, so we get polled again ASAP: + if remaining_count == 0 { + cx.waker().wake_by_ref(); } + Poll::Pending } }