
Request for Comment: Native TopK Operator #7250

Closed
wants to merge 15 commits
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/mod.rs
@@ -17,6 +17,7 @@

//! Traits for physical query plan, supporting parallel execution for partitioned relations.

mod topk;
mod visitor;
pub use self::metrics::Metric;
use self::metrics::MetricsSet;
@@ -27,6 +28,7 @@ use crate::datasource::physical_plan::FileScanConfig;
use crate::physical_plan::expressions::PhysicalSortExpr;
use datafusion_common::Result;
pub use datafusion_common::{internal_err, ColumnStatistics, Statistics};
pub use topk::TopK;
pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

use arrow::datatypes::SchemaRef;
77 changes: 54 additions & 23 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -26,6 +26,7 @@ use crate::physical_plan::metrics::{
};
use crate::physical_plan::sorts::merge::streaming_merge;
use crate::physical_plan::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use crate::physical_plan::topk::TopK;
use crate::physical_plan::{
DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan,
Partitioning, SendableRecordBatchStream, Statistics,
@@ -759,7 +760,12 @@ impl DisplayAs for SortExec {
        let expr: Vec<String> = self.expr.iter().map(|e| e.to_string()).collect();
        match self.fetch {
            Some(fetch) => {
                write!(
                    f,
                    // TODO should this say topk?
                    "SortExec: fetch={fetch}, expr=[{}]",
                    expr.join(",")
                )
            }
            None => write!(f, "SortExec: expr=[{}]", expr.join(",")),
        }
@@ -847,29 +853,54 @@ impl ExecutionPlan for SortExec {

trace!("End SortExec's input.execute for partition: {}", partition);

        let mut sorter = ExternalSorter::new(
            partition,
            input.schema(),
            self.expr.clone(),
            context.session_config().batch_size(),
            self.fetch,
            execution_options.sort_spill_reservation_bytes,
            execution_options.sort_in_place_threshold_bytes,
            &self.metrics_set,
            context.runtime_env(),
        );
        if let Some(fetch) = self.fetch.as_ref() {
Contributor Author:

In this prototype I simply used the TopK implementation for SortExec when there is a fetch, as it produces the same output.

However, longer term I think we should make a real TopKExec ExecutionPlan so that the optimizers know about it and can avoid repartitioning or trying to do anything else fancy.

Contributor:

I wonder if there is some threshold at which the fetch is large enough that sorting is the better approach; certainly if the fetch is large enough that we need to spill.

Contributor:

One possible approach for starters (i.e. prior to TopKExec) could be to run the new TopK when fetch is Some and no memory limit is set (i.e. RuntimeEnv uses UnboundedMemoryPool), given that the ExternalSorter does a relatively good job of obeying memory limits (though it will use them all up in this scenario).

Contributor Author:

Thank you @tustvold and @gruuya -- these are good ideas.

Maybe we can figure out how to switch between sort and TopK dynamically -- for example, if TopK exceeds some memory threshold, fall back to the normal ExternalSorter. Let me think about this 🤔
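The memory advantage being discussed can be sketched independently of DataFusion. The real operator works on Arrow RecordBatches and sort expressions, but the core idea is a bounded max-heap of at most k elements: it retains the k smallest values seen so far, so memory use is O(k) regardless of input size. This is a minimal illustration (the `top_k` function name and `i64` values are illustrative, not DataFusion's API):

```rust
use std::collections::BinaryHeap;

/// Return the k smallest values in ascending order, using O(k) memory.
/// A max-heap holds the current best k values: if a new value beats the
/// worst retained value (the heap's top), it replaces it in place.
fn top_k(values: &[i64], k: usize) -> Vec<i64> {
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k);
    for &v in values {
        if heap.len() < k {
            heap.push(v);
        } else if let Some(mut top) = heap.peek_mut() {
            if v < *top {
                // PeekMut restores the heap invariant when dropped
                *top = v;
            }
        }
    }
    // BinaryHeap::into_sorted_vec yields ascending order
    heap.into_sorted_vec()
}
```

The contrast with the ExternalSorter path is that a full sort must buffer (and potentially spill) the entire input before truncating to the fetch, while this structure never holds more than k values; the trade-off raised above is that for very large k the heap bookkeeping can lose to a plain sort.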

            let mut topk = TopK::try_new(
                partition,
                input.schema(),
                self.expr.clone(),
                *fetch,
                context.session_config().batch_size(),
                context.runtime_env(),
                &self.metrics_set,
                partition,
            )?;

            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(async move {
                    while let Some(batch) = input.next().await {
                        let batch = batch?;
                        topk.insert_batch(batch)?;
                    }
                    topk.emit()
                })
                .try_flatten(),
            )))
        } else {
            let mut sorter = ExternalSorter::new(
                partition,
                input.schema(),
                self.expr.clone(),
                context.session_config().batch_size(),
                self.fetch,
                execution_options.sort_spill_reservation_bytes,
                execution_options.sort_in_place_threshold_bytes,
                &self.metrics_set,
                context.runtime_env(),
            );

            Ok(Box::pin(RecordBatchStreamAdapter::new(
                self.schema(),
                futures::stream::once(async move {
                    while let Some(batch) = input.next().await {
                        let batch = batch?;
                        sorter.insert_batch(batch).await?;
                    }
                    sorter.sort()
                })
                .try_flatten(),
            )))
}
}

fn metrics(&self) -> Option<MetricsSet> {