diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 01b19f618f8e4..4a7928efe504d 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -171,11 +171,16 @@ impl SortPreservingMergeStream { // Ensure all non-exhausted streams have a cursor from which // rows can be pulled loop { - for i in 0..self.streams.partitions() { + let uninitiated_cursors = self.uninitiated_cursors.clone(); + for i in uninitiated_cursors { if self.uninitiated_cursors.contains(&i) { - if let Poll::Ready(Err(e)) = self.maybe_poll_stream(cx, i) { - self.aborted = true; - return Poll::Ready(Some(Err(e))); + match self.maybe_poll_stream(cx, i) { + Poll::Ready(Err(e)) => { + self.aborted = true; + return Poll::Ready(Some(Err(e))); + } + Poll::Pending => return Poll::Pending, + _ => {} } } }