sorts/merge: track uninitialized cursors in a HashSet

Replaces the `Vec` of not-yet-initialized cursor indices with a `HashSet`, so
a cursor is dropped from the pending set by value (its partition index) rather
than by position, and removes the `rotate_left` re-queueing on `Poll::Pending`.
Removal uses `HashSet::remove`, a single hashed lookup, instead of a `retain`
scan over every pending index.

NOTE(review): with the `Poll::Pending` arm gone, the remaining `_ => {}` arm
also matches `Poll::Pending`; confirm the code after this loop still returns
`Poll::Pending` while cursors remain uninitialized.
NOTE(review): generic parameters and context-line indentation were restored
from a whitespace-collapsed copy of this patch; verify them against the target
file. The `index` blob ids below are those of the original patch.

diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs
index 73b4ca10daec7..41fe1cfe495ed 100644
--- a/datafusion/physical-plan/src/sorts/merge.rs
+++ b/datafusion/physical-plan/src/sorts/merge.rs
@@ -34,6 +34,7 @@
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
 use futures::Stream;
+use hashbrown::HashSet;
 
 /// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
 type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;
@@ -102,7 +103,7 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
     produced: usize,
 
     /// Cursors which are not initialized yet
-    uninitiated_cursors: Vec<usize>,
+    uninitiated_cursors: HashSet<usize>,
 }
 
 impl<C: CursorValues> SortPreservingMergeStream<C> {
@@ -146,12 +147,12 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
 
         match futures::ready!(self.streams.poll_next(cx, idx)) {
             None => {
-                self.uninitiated_cursors.remove(idx);
+                self.uninitiated_cursors.remove(&idx);
                 Poll::Ready(Ok(()))
             }
             Some(Err(e)) => Poll::Ready(Err(e)),
             Some(Ok((cursor, batch))) => {
-                self.uninitiated_cursors.remove(idx);
+                self.uninitiated_cursors.remove(&idx);
                 self.cursors[idx] = Some(Cursor::new(cursor));
                 Poll::Ready(self.in_progress.push_batch(idx, batch))
             }
@@ -178,10 +179,6 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
                         self.aborted = true;
                         return Poll::Ready(Some(Err(e)));
                     }
-                    Poll::Pending => {
-                        self.uninitiated_cursors.rotate_left(1);
-                        return Poll::Pending;
-                    }
                     _ => {}
                 }
             }