feat: Optimize SortPreservingMergeExec to avoid merging non-overlapping partitions #13296

Draft · wants to merge 8 commits into main
Changes from 1 commit
33 changes: 3 additions & 30 deletions datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -23,7 +23,7 @@ use std::{
sync::Arc, vec,
};

-use super::{get_projected_output_ordering, statistics::MinMaxStatistics};
+use super::{get_projected_output_ordering, min_max_statistics_from_files};
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use crate::{error::Result, scalar::ScalarValue};

@@ -310,22 +310,12 @@ impl FileScanConfig {
sort_order: &LexOrdering,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
-        // First Fit:
-        // * Choose the first file group that a file can be placed into.
-        // * If it fits into no existing file groups, create a new one.
-        //
-        // By sorting files by min values and then applying first-fit bin packing,
-        // we can produce the smallest number of file groups such that
-        // files within a group are in order and non-overlapping.
-        //
-        // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
-        // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
Comment on lines -313 to -322 (Contributor Author):
I moved this and the relevant code into a new method, MinMaxStatistics::first_fit
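A sketch of what the relocated method plausibly looks like, reconstructed from the first-fit loop removed below (the min_values_sorted and max helpers come from the old call sites; the exact signature in this PR may differ):

```rust
impl MinMaxStatistics {
    /// First-fit bin packing over rows sorted by min value: place each row
    /// into the first chain whose last element ends before this row begins,
    /// creating a new chain when none fits. Sorting by min first is what
    /// yields the smallest number of ordered, non-overlapping chains.
    pub fn first_fit(&self) -> Vec<Vec<usize>> {
        let mut chains: Vec<Vec<usize>> = vec![];
        for (idx, min) in self.min_values_sorted() {
            let chain_to_insert = chains.iter_mut().find(|chain| {
                // Non-overlapping and strictly after the chain's last element
                min > self.max(*chain.last().expect("chains are nonempty"))
            });
            match chain_to_insert {
                Some(chain) => chain.push(idx),
                None => chains.push(vec![idx]),
            }
        }
        chains
    }
}
```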


if flattened_files.is_empty() {
return Ok(vec![]);
}

-        let statistics = MinMaxStatistics::new_from_files(
+        let statistics = min_max_statistics_from_files(
sort_order,
table_schema,
None,
@@ -335,24 +325,7 @@
e.context("construct min/max statistics for split_groups_by_statistics")
})?;

-        let indices_sorted_by_min = statistics.min_values_sorted();
-        let mut file_groups_indices: Vec<Vec<usize>> = vec![];
-
-        for (idx, min) in indices_sorted_by_min {
-            let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
-                // If our file is non-overlapping and comes _after_ the last file,
-                // it fits in this file group.
-                min > statistics.max(
-                    *group
-                        .last()
-                        .expect("groups should be nonempty at construction"),
-                )
-            });
-            match file_group_to_insert {
-                Some(group) => group.push(idx),
-                None => file_groups_indices.push(vec![idx]),
-            }
-        }
+        let file_groups_indices = statistics.first_fit();

// Assemble indices back into groups of PartitionedFiles
Ok(file_groups_indices
41 changes: 38 additions & 3 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,7 +26,6 @@ mod file_stream;
mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
-mod statistics;

pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
@@ -36,7 +35,9 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
+use datafusion_common::{stats::Precision, ColumnStatistics, DataFusionError};
use datafusion_expr::dml::InsertOp;
+use datafusion_physical_plan::statistics::MinMaxStatistics;
pub use file_groups::FileGroupPartitioner;
pub use file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -366,10 +367,10 @@ fn get_projected_output_ordering(
return false;
}

-        let statistics = match statistics::MinMaxStatistics::new_from_files(
+        let statistics = match min_max_statistics_from_files(
&new_ordering,
projected_schema,
-            base_config.projection.as_deref(),
+            base_config.projection.as_ref(),
group,
) {
Ok(statistics) => statistics,
Expand All @@ -395,6 +396,40 @@ fn get_projected_output_ordering(
all_orderings
}

+/// Construct MinMaxStatistics from a list of files
+fn min_max_statistics_from_files<'a>(
+    projected_sort_order: &LexOrdering, // Sort order with respect to projected schema
+    projected_schema: &SchemaRef, // Projected schema
+    projection: Option<&Vec<usize>>, // Indices of projection in full table schema (None = all columns)
+    files: impl IntoIterator<Item = &'a PartitionedFile>,
+) -> Result<MinMaxStatistics> {
+    let projected_statistics = files
+        .into_iter()
+        .map(|file| {
+            let mut statistics = file.statistics.clone()?;
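+            // Hive-style partition columns hold a single value per file, so
+            // their min and max are exactly that value, with no nulls.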
+            for partition in &file.partition_values {
+                statistics.column_statistics.push(ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(partition.clone()),
+                    min_value: Precision::Exact(partition.clone()),
+                    distinct_count: Precision::Exact(1),
+                });
+            }
+
+            Some(statistics.project(projection))
+        })
+        .collect::<Option<Vec<_>>>()
+        .ok_or_else(|| {
+            DataFusionError::Plan("Parquet file missing statistics".to_string())
+        })?;
+
+    MinMaxStatistics::new_from_statistics(
+        projected_sort_order,
+        projected_schema,
+        &projected_statistics,
+    )
+}

/// Represents the possible outcomes of a range calculation.
///
/// This enum is used to encapsulate the result of calculating the range of
7 changes: 7 additions & 0 deletions datafusion/physical-plan/src/execution_plan.rs
@@ -397,6 +397,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
Ok(Statistics::new_unknown(&self.schema()))
}

+    fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
+        Ok(vec![
+            self.statistics()?;
+            self.properties().partitioning.partition_count()
+        ])
+    }

Comment on lines +400 to +406 (Contributor Author):
As stated in the PR description, this is what a proposed API would look like for statistics by partition, though it is certainly not final.
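For a sense of how the hook could be used, an operator that already tracks one Statistics per output partition might override the default along these lines (a hypothetical sketch; partition_stats is an assumed field, not something this PR adds):

```rust
fn statistics_by_partition(&self) -> Result<Vec<Statistics>> {
    // One entry per output partition, in partition order, rather than
    // repeating the plan-level statistics partition_count times.
    Ok(self.partition_stats.clone())
}
```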

/// Returns `true` if a limit can be safely pushed down through this
/// `ExecutionPlan` node.
///
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -72,6 +72,7 @@ pub mod recursive_query;
pub mod repartition;
pub mod sorts;
pub mod spill;
+pub mod statistics;
pub mod stream;
pub mod streaming;
pub mod tree_node;
44 changes: 35 additions & 9 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -24,6 +24,8 @@ use crate::common::spawn_buffered;
use crate::limit::LimitStream;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::statistics::MinMaxStatistics;
+use crate::stream::RecordBatchStreamAdapter;
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
@@ -34,6 +36,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;

use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
+use futures::StreamExt;
use log::{debug, trace};

/// Sort preserving merge execution plan
@@ -249,16 +252,42 @@ impl ExecutionPlan for SortPreservingMergeExec {
MemoryConsumer::new(format!("SortPreservingMergeExec[{partition}]"))
.register(&context.runtime_env().memory_pool);

-        match input_partitions {
+        let statistics = MinMaxStatistics::new_from_statistics(
+            &self.expr,
+            &self.schema(),
+            &self.input.statistics_by_partition()?,
+        )?;
+
+        // Organize the input partitions into chains,
+        // where elements of each chain are input partitions that are
+        // non-overlapping, and each chain is ordered by their min/max statistics.
+        // Then concatenate each chain into a single stream.
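+        // e.g. with partition (min, max) ranges [1, 5], [6, 10], and [3, 8],
+        // first-fit yields chains {0, 1} and {2}: partition 1 begins after
+        // partition 0 ends, so reading 0 then 1 back to back stays sorted.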
+        let mut streams = statistics
+            .first_fit()
+            .into_iter()
+            .map(|chain| {
+                let streams = chain
+                    .into_iter()
+                    .map(|i| self.input.execute(i, Arc::clone(&context)))
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Concatenate the chain into a single stream
+                Ok(Box::pin(RecordBatchStreamAdapter::new(
+                    self.input.schema(),
+                    futures::stream::iter(streams).flatten(),
+                )) as SendableRecordBatchStream)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        match streams.len() {
0 => internal_err!(
"SortPreservingMergeExec requires at least one input partition"
),
1 => match self.fetch {
Some(fetch) => {
-                    let stream = self.input.execute(0, context)?;
debug!("Done getting stream for SortPreservingMergeExec::execute with 1 input with {fetch}");
Ok(Box::pin(LimitStream::new(
-                        stream,
+                        streams.remove(0),
0,
Some(fetch),
BaselineMetrics::new(&self.metrics, partition),
@@ -271,12 +300,9 @@
}
},
_ => {
-                let receivers = (0..input_partitions)
-                    .map(|partition| {
-                        let stream =
-                            self.input.execute(partition, Arc::clone(&context))?;
-                        Ok(spawn_buffered(stream, 1))
-                    })
+                let receivers = streams
+                    .into_iter()
+                    .map(|stream| Ok(spawn_buffered(stream, 1)))
.collect::<Result<_>>()?;

debug!("Done setting up sender-receiver for SortPreservingMergeExec::execute");