Skip to content

Commit

Permalink
Prototype TopK operator
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Aug 11, 2023
1 parent f9a9f12 commit 524af05
Show file tree
Hide file tree
Showing 5 changed files with 825 additions and 73 deletions.
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.
mod topk;
mod visitor;
pub use self::metrics::Metric;
use self::metrics::MetricsSet;
Expand All @@ -27,6 +28,7 @@ use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
pub use datafusion_common::{ColumnStatistics, Statistics};
pub use topk::TopK;
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

use arrow::datatypes::SchemaRef;
Expand Down
77 changes: 54 additions & 23 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::physical_plan::metrics::{
};
use crate::physical_plan::sorts::merge::streaming_merge;
use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::physical_plan::topk::TopK;
use crate::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
Expand Down Expand Up @@ -759,7 +760,12 @@ impl DisplayAs for SortExec {
let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
match self.fetch {
Some(fetch) => {
write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(","))
write!(
f,
// TODO should this say topk?
"SortExec: fetch={fetch}, expr=[{}]",
expr.join(",")
)
}
None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
}
Expand Down Expand Up @@ -847,29 +853,54 @@ impl ExecutionPlan for SortExec {

trace!("End SortExec's input.execute for partition: {}", partition);

let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);
if let Some(fetch) = self.fetch.as_ref() {
let mut topk = TopK::try_new(
partition,
input.schema(),
self.expr.clone(),
*fetch,
context.session_config().batch_size(),
context.runtime_env(),
&self.metrics_set,
partition,
)?;

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
topk.insert_batch(batch)?;
}
topk.emit()
})
.try_flatten(),
)))
} else {
let mut sorter = ExternalSorter::new(
partition,
input.schema(),
self.expr.clone(),
context.session_config().batch_size(),
self.fetch,
execution_options.sort_spill_reservation_bytes,
execution_options.sort_in_place_threshold_bytes,
&self.metrics_set,
context.runtime_env(),
);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
futures::stream::once(async move {
while let Some(batch) = input.next().await {
let batch = batch?;
sorter.insert_batch(batch).await?;
}
sorter.sort()
})
.try_flatten(),
)))
}
}

fn metrics(&self) -> Option<MetricsSet> {
Expand Down
Loading

0 comments on commit 524af05

Please sign in to comment.