Skip to content

Commit

Permalink
Update merge.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
berkaysynnada committed Sep 2, 2024
1 parent 5690c12 commit b81b99c
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions datafusion/physical-plan/src/sorts/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;

use futures::Stream;
use hashbrown::HashSet;

/// A fallible [`PartitionedStream`] of [`Cursor`] and [`RecordBatch`]
type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>;
Expand Down Expand Up @@ -102,7 +103,7 @@ pub(crate) struct SortPreservingMergeStream<C: CursorValues> {
produced: usize,

/// Cursors which are not initialized yet
uninitiated_cursors: Vec<usize>,
uninitiated_cursors: HashSet<usize>,
}

impl<C: CursorValues> SortPreservingMergeStream<C> {
Expand Down Expand Up @@ -146,12 +147,12 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {

match futures::ready!(self.streams.poll_next(cx, idx)) {
None => {
self.uninitiated_cursors.remove(idx);
self.uninitiated_cursors.retain(|&x| x != idx);
Poll::Ready(Ok(()))
}
Some(Err(e)) => Poll::Ready(Err(e)),
Some(Ok((cursor, batch))) => {
self.uninitiated_cursors.remove(idx);
self.uninitiated_cursors.retain(|&x| x != idx);
self.cursors[idx] = Some(Cursor::new(cursor));
Poll::Ready(self.in_progress.push_batch(idx, batch))
}
Expand All @@ -178,10 +179,6 @@ impl<C: CursorValues> SortPreservingMergeStream<C> {
self.aborted = true;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => {
self.uninitiated_cursors.rotate_left(1);
return Poll::Pending;
}
_ => {}
}
}
Expand Down

0 comments on commit b81b99c

Please sign in to comment.