Skip to content

Commit

Permalink
Chunk stream properly if underlying stream always ready
Browse files Browse the repository at this point in the history
The previous chunking stream adapter had a problem: if
the underlying stream never returned Poll::Pending,
it looped forever. It never yielded back to the executor,
the time was not properly advanced, and the chunks were
not produced.

This commit fixes it by limiting the number
of items requested from the underlying stream
in a single poll_next call, and asking the runtime to poll again
immediately if more items are likely to be ready. This way
we give the async runtime a chance to execute a
different future or to do other work.

(cherry picked from commit 70edd92)
  • Loading branch information
pkolaczk authored and vponomaryov committed Oct 29, 2024
1 parent cae8e19 commit f45bca7
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/chunks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures::stream::{Fuse, Skip};
use futures::{Stream, StreamExt};
use pin_project::pin_project;
use std::cmp;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -94,13 +95,13 @@ where
}
}

fn next_batch(self: Pin<&mut Self>) -> Option<Chunk> {
fn next_chunk(self: Pin<&mut Self>) -> Option<Chunk> {
let this = self.project();
*this.current_chunk_size = 0;
this.current_chunk.replace((this.new_chunk)())
}

fn final_batch(self: Pin<&mut Self>) -> Option<Chunk> {
fn final_chunk(self: Pin<&mut Self>) -> Option<Chunk> {
let this = self.project();
*this.current_chunk_size = 0;
this.current_chunk.take()
Expand All @@ -119,34 +120,52 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();

// Add all ready items in the source stream to the current chunk:
while this.current_chunk_size < this.max_chunk_size {
// The number of items we attempt to get from the underlying stream in this poll.
// This must be limited, because it is possible that the underlying
// stream is always ready, and we must eventually yield back to the executor.
let mut remaining_count = cmp::min(128, *this.max_chunk_size - *this.current_chunk_size);

// Add ready items in the source stream to the current chunk:
while remaining_count > 0 {
match this.src.as_mut().poll_next(cx) {
// Add a ready item in the source stream to the current chunk:
Poll::Ready(Some(item)) => {
*this.current_chunk_size += 1;
remaining_count -= 1;
let chunk = this.current_chunk.as_mut().expect("chunk must be set");
(this.accumulate)(chunk, item);
}
// End of stream, emit the last batch.
// Subsequent calls will emit Poll::Ready(None)
Poll::Ready(None) => {
return Poll::Ready(self.final_batch());
return Poll::Ready(self.final_chunk());
}
// No more items in source
Poll::Pending => {
// No more items, but we can't leave yet, we need to check the clock
// at the end of the loop
// Don't return yet, we need to check the clock,
// because maybe we need to emit the batch
break;
}
}
}

// Check the clock, if installed
let deadline_reached = match this.clock.as_mut().project() {
ClockProj::Some(clock) => clock.poll_next(cx).is_ready(),
ClockProj::None => false,
};

// Either the time limit reached or the item count limit reached - switch to the next batch
if deadline_reached || this.current_chunk_size >= this.max_chunk_size {
Poll::Ready(self.next_batch())
} else {
Poll::Pending
return Poll::Ready(self.next_chunk());
}

// If we fetched all the items we requested that means the underlying stream has likely
// more ready items waiting for us, so mark this task as ready, so we get polled again ASAP:
if remaining_count == 0 {
cx.waker().wake_by_ref();
}
Poll::Pending
}
}

Expand Down

0 comments on commit f45bca7

Please sign in to comment.