Skip to content

Commit

Permalink
Update merge.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Sep 2, 2024
1 parent 37f5fd0 commit 470eb38
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,16 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
// Ensure all non-exhausted streams have a cursor from which
// rows can be pulled
loop {
for i in 0..self.streams.partitions() {
let uninitiated_cursors = self.uninitiated_cursors.clone();
for i in uninitiated_cursors {
if self.uninitiated_cursors.contains(&i) {
if let Poll::Ready(Err(e)) = self.maybe_poll_stream(cx, i) {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
match self.maybe_poll_stream(cx, i) {
Poll::Ready(Err(e)) => {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => return Poll::Pending,
_ => {}
}
}
}
Expand Down

0 comments on commit 470eb38

Please sign in to comment.