Skip to content

Commit

Permalink
make MinMaxStatistics only care about sorting column statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
suremarc committed Nov 7, 2024
1 parent b469b71 commit 31d3716
Showing 1 changed file with 36 additions and 30 deletions.
66 changes: 36 additions & 30 deletions datafusion/physical-plan/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,37 @@ impl MinMaxStatistics {
) -> Result<Self> {
use datafusion_common::ScalarValue;

let statistics = statistics.into_iter().collect::<Vec<_>>();
let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
DataFusionError::Plan("sort expression must be on column".to_string()),
)?;

let projection = sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>();

// Project the schema & sort order down to just the relevant columns

let projected_schema = Arc::new(schema.project(&projection)?);

let projected_sort_order = LexOrdering {
inner: sort_columns
.iter()
.zip(sort_order.iter())
.enumerate()
.map(|(i, (col, sort))| PhysicalSortExpr {
expr: Arc::new(Column::new(col.name(), i)),
options: sort.options,
})
.collect::<Vec<_>>(),
};

// Helper function to get min/max statistics for a given column of projected_schema
let projected_statistics = statistics
.into_iter()
.cloned()
.map(|s| s.project(Some(&projection)))
.collect::<Vec<_>>();

// Helper function to get min/max statistics
let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
Ok(statistics
Ok(projected_statistics
.iter()
.map(|s| {
s.column_statistics[i]
Expand All @@ -94,31 +120,11 @@ impl MinMaxStatistics {
.unzip())
};

let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
DataFusionError::Plan("sort expression must be on column".to_string()),
)?;

// Project the schema & sort order down to just the relevant columns
let min_max_schema = Arc::new(
schema
.project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
);
let min_max_sort_order = LexOrdering {
inner: sort_columns
.iter()
.zip(sort_order.iter())
.enumerate()
.map(|(i, (col, sort))| PhysicalSortExpr {
expr: Arc::new(Column::new(col.name(), i)),
options: sort.options,
})
.collect::<Vec<_>>(),
};

let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
.iter()
.map(|c| {
let (min, max) = get_min_max(c.index()).map_err(|e| {
.enumerate()
.map(|(i, c)| {
let (min, max) = get_min_max(i).map_err(|e| {
e.context(format!("get min/max for column: '{}'", c.name()))
})?;
Ok((
Expand All @@ -132,14 +138,14 @@ impl MinMaxStatistics {
.unzip();

Self::new(
&min_max_sort_order,
&min_max_schema,
RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
&projected_sort_order,
&projected_schema,
RecordBatch::try_new(Arc::clone(&projected_schema), min_values).map_err(
|e| {
DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
},
)?,
RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
RecordBatch::try_new(Arc::clone(&projected_schema), max_values).map_err(
|e| {
DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
},
Expand Down

0 comments on commit 31d3716

Please sign in to comment.