From 4e9f2d5260ecaca261076d2e0a626d3f595ff5a5 Mon Sep 17 00:00:00 2001 From: Jay Zhan Date: Mon, 15 Apr 2024 21:13:07 +0800 Subject: [PATCH] Move conversion of FIRST/LAST Aggregate function to independent physical optimizer rule (#10061) * move out the ordering ruel Signed-off-by: jayzhan211 * introduce rule Signed-off-by: jayzhan211 * revert test result Signed-off-by: jayzhan211 * pass mulit order test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * with new childes Signed-off-by: jayzhan211 * revert slt Signed-off-by: jayzhan211 * revert back Signed-off-by: jayzhan211 * rm rewrite in new child Signed-off-by: jayzhan211 * backup Signed-off-by: jayzhan211 * only move conversion to optimizer Signed-off-by: jayzhan211 * find test that do reverse Signed-off-by: jayzhan211 * add test for first and last Signed-off-by: jayzhan211 * pass all test Signed-off-by: jayzhan211 * upd test Signed-off-by: jayzhan211 * upd test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * add aggregate test Signed-off-by: jayzhan211 * cleanup Signed-off-by: jayzhan211 * final draft Signed-off-by: jayzhan211 * cleanup again Signed-off-by: jayzhan211 * pull out finer ordering code and reuse Signed-off-by: jayzhan211 * clippy Signed-off-by: jayzhan211 * remove finer in optimize rule Signed-off-by: jayzhan211 * add comments and clenaup Signed-off-by: jayzhan211 * rename fun Signed-off-by: jayzhan211 * rename fun Signed-off-by: jayzhan211 * fmt Signed-off-by: jayzhan211 * avoid unnecessary recursion and rename Signed-off-by: jayzhan211 * remove unnecessary rule Signed-off-by: jayzhan211 * fix merge Signed-off-by: jayzhan211 --------- Signed-off-by: jayzhan211 --- .../physical_optimizer/convert_first_last.rs | 260 ++++++++++++++++++ datafusion/core/src/physical_optimizer/mod.rs | 1 + .../core/src/physical_optimizer/optimizer.rs | 3 + .../core/tests/data/convert_first_last.csv | 11 + .../physical-plan/src/aggregates/mod.rs | 117 ++++---- datafusion/physical-plan/src/tree_node.rs | 1 + datafusion/physical-plan/src/windows/mod.rs | 2 +- .../sqllogictest/test_files/aggregate.slt | 42 +++ .../sqllogictest/test_files/explain.slt | 3 + .../sqllogictest/test_files/group_by.slt | 4 +- 10 files changed, 372 insertions(+), 72 deletions(-) create mode 100644 datafusion/core/src/physical_optimizer/convert_first_last.rs create mode 100644 datafusion/core/tests/data/convert_first_last.csv diff --git a/datafusion/core/src/physical_optimizer/convert_first_last.rs b/datafusion/core/src/physical_optimizer/convert_first_last.rs new file mode 100644 index 000000000000..4102313d3126 --- /dev/null +++ b/datafusion/core/src/physical_optimizer/convert_first_last.rs @@ -0,0 +1,260 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_common::Result; +use datafusion_common::{ + config::ConfigOptions, + tree_node::{Transformed, TransformedResult, TreeNode}, +}; +use datafusion_physical_expr::expressions::{FirstValue, LastValue}; +use datafusion_physical_expr::{ + equivalence::ProjectionMapping, reverse_order_bys, AggregateExpr, + EquivalenceProperties, PhysicalSortRequirement, +}; +use datafusion_physical_plan::aggregates::concat_slices; +use datafusion_physical_plan::{ + aggregates::{AggregateExec, AggregateMode}, + ExecutionPlan, ExecutionPlanProperties, InputOrderMode, +}; +use std::sync::Arc; + +use datafusion_physical_plan::windows::get_ordered_partition_by_indices; + +use super::PhysicalOptimizerRule; + +/// The optimizer rule check the ordering requirements of the aggregate expressions. +/// And convert between FIRST_VALUE and LAST_VALUE if possible. +/// For example, If we have an ascending values and we want LastValue from the descending requirement, +/// it is equivalent to FirstValue with the current ascending ordering. +/// +/// The concrete example is that, says we have values c1 with [1, 2, 3], which is an ascending order. +/// If we want LastValue(c1 order by desc), which is the first value of reversed c1 [3, 2, 1], +/// so we can convert the aggregate expression to FirstValue(c1 order by asc), +/// since the current ordering is already satisfied, it saves our time! +#[derive(Default)] +pub struct OptimizeAggregateOrder {} + +impl OptimizeAggregateOrder { + pub fn new() -> Self { + Self::default() + } +} + +impl PhysicalOptimizerRule for OptimizeAggregateOrder { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + plan.transform_up(&get_common_requirement_of_aggregate_input) + .data() + } + + fn name(&self) -> &str { + "OptimizeAggregateOrder" + } + + fn schema_check(&self) -> bool { + true + } +} + +fn get_common_requirement_of_aggregate_input( + plan: Arc, +) -> Result>> { + if let Some(aggr_exec) = plan.as_any().downcast_ref::() { + let input = aggr_exec.input(); + let mut aggr_expr = try_get_updated_aggr_expr_from_child(aggr_exec); + let group_by = aggr_exec.group_by(); + let mode = aggr_exec.mode(); + + let input_eq_properties = input.equivalence_properties(); + let groupby_exprs = group_by.input_exprs(); + // If existing ordering satisfies a prefix of the GROUP BY expressions, + // prefix requirements with this section. In this case, aggregation will + // work more efficiently. + let indices = get_ordered_partition_by_indices(&groupby_exprs, input); + let requirement = indices + .iter() + .map(|&idx| PhysicalSortRequirement { + expr: groupby_exprs[idx].clone(), + options: None, + }) + .collect::>(); + + try_convert_first_last_if_better( + &requirement, + &mut aggr_expr, + input_eq_properties, + )?; + + let required_input_ordering = (!requirement.is_empty()).then_some(requirement); + + let input_order_mode = + if indices.len() == groupby_exprs.len() && !indices.is_empty() { + InputOrderMode::Sorted + } else if !indices.is_empty() { + InputOrderMode::PartiallySorted(indices) + } else { + InputOrderMode::Linear + }; + let projection_mapping = + ProjectionMapping::try_new(group_by.expr(), &input.schema())?; + + let cache = AggregateExec::compute_properties( + input, + plan.schema().clone(), + &projection_mapping, + mode, + &input_order_mode, + ); + + let aggr_exec = aggr_exec.new_with_aggr_expr_and_ordering_info( + required_input_ordering, + aggr_expr, + cache, + input_order_mode, + ); + + Ok(Transformed::yes( + Arc::new(aggr_exec) as Arc + )) + } else { + Ok(Transformed::no(plan)) + } +} + +/// In `create_initial_plan` for LogicalPlan::Aggregate, we have a nested AggregateExec where the first layer +/// is in Partial mode and the second layer is in Final or Finalpartitioned mode. +/// If the first layer of aggregate plan is transformed, we need to update the child of the layer with final mode. +/// Therefore, we check it and get the updated aggregate expressions. +/// +/// If AggregateExec is created from elsewhere, we skip the check and return the original aggregate expressions. +fn try_get_updated_aggr_expr_from_child( + aggr_exec: &AggregateExec, +) -> Vec> { + let input = aggr_exec.input(); + if aggr_exec.mode() == &AggregateMode::Final + || aggr_exec.mode() == &AggregateMode::FinalPartitioned + { + // Some aggregators may be modified during initialization for + // optimization purposes. For example, a FIRST_VALUE may turn + // into a LAST_VALUE with the reverse ordering requirement. + // To reflect such changes to subsequent stages, use the updated + // `AggregateExpr`/`PhysicalSortExpr` objects. + // + // The bottom up transformation is the mirror of LogicalPlan::Aggregate creation in [create_initial_plan] + if let Some(c_aggr_exec) = input.as_any().downcast_ref::() { + if c_aggr_exec.mode() == &AggregateMode::Partial { + // If the input is an AggregateExec in Partial mode, then the + // input is a CoalescePartitionsExec. In this case, the + // AggregateExec is the second stage of aggregation. The + // requirements of the second stage are the requirements of + // the first stage. + return c_aggr_exec.aggr_expr().to_vec(); + } + } + } + + aggr_exec.aggr_expr().to_vec() +} + +/// Get the common requirement that satisfies all the aggregate expressions. +/// +/// # Parameters +/// +/// - `aggr_exprs`: A slice of `Arc` containing all the +/// aggregate expressions. +/// - `group_by`: A reference to a `PhysicalGroupBy` instance representing the +/// physical GROUP BY expression. +/// - `eq_properties`: A reference to an `EquivalenceProperties` instance +/// representing equivalence properties for ordering. +/// - `agg_mode`: A reference to an `AggregateMode` instance representing the +/// mode of aggregation. +/// +/// # Returns +/// +/// A `LexRequirement` instance, which is the requirement that satisfies all the +/// aggregate requirements. Returns an error in case of conflicting requirements. +/// +/// Similar to the one in datafusion/physical-plan/src/aggregates/mod.rs, but this +/// function care only the possible conversion between FIRST_VALUE and LAST_VALUE +fn try_convert_first_last_if_better( + prefix_requirement: &[PhysicalSortRequirement], + aggr_exprs: &mut [Arc], + eq_properties: &EquivalenceProperties, +) -> Result<()> { + for aggr_expr in aggr_exprs.iter_mut() { + let aggr_req = aggr_expr.order_bys().unwrap_or(&[]); + let reverse_aggr_req = reverse_order_bys(aggr_req); + let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req); + let reverse_aggr_req = + PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req); + + if let Some(first_value) = aggr_expr.as_any().downcast_ref::() { + let mut first_value = first_value.clone(); + + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to LAST_VALUE enables more efficient execution + // given the existing ordering: + let mut last_value = first_value.convert_to_last(); + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else { + // Requirement is not satisfied with existing ordering. + first_value = first_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(first_value) as _; + } + continue; + } + if let Some(last_value) = aggr_expr.as_any().downcast_ref::() { + let mut last_value = last_value.clone(); + if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &aggr_req, + )) { + last_value = last_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(last_value) as _; + } else if eq_properties.ordering_satisfy_requirement(&concat_slices( + prefix_requirement, + &reverse_aggr_req, + )) { + // Converting to FIRST_VALUE enables more efficient execution + // given the existing ordering: + let mut first_value = last_value.convert_to_first(); + first_value = first_value.with_requirement_satisfied(true); + *aggr_expr = Arc::new(first_value) as _; + } else { + // Requirement is not satisfied with existing ordering. + last_value = last_value.with_requirement_satisfied(false); + *aggr_expr = Arc::new(last_value) as _; + } + continue; + } + } + + Ok(()) +} diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index e990fead610d..c80668c6da74 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -24,6 +24,7 @@ pub mod aggregate_statistics; pub mod coalesce_batches; pub mod combine_partial_final_agg; +mod convert_first_last; pub mod enforce_distribution; pub mod enforce_sorting; pub mod join_selection; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 48da68cb2e37..08cbf68fa617 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use super::convert_first_last::OptimizeAggregateOrder; use super::projection_pushdown::ProjectionPushdown; use crate::config::ConfigOptions; use crate::physical_optimizer::aggregate_statistics::AggregateStatistics; @@ -101,6 +102,8 @@ impl PhysicalOptimizer { // Note that one should always run this rule after running the EnforceDistribution rule // as the latter may break local sorting requirements. Arc::new(EnforceSorting::new()), + // Run once after the local sorting requirement is changed + Arc::new(OptimizeAggregateOrder::new()), // TODO: `try_embed_to_hash_join` in the ProjectionPushdown rule would be block by the CoalesceBatches, so add it before CoalesceBatches. Maybe optimize it in the future. Arc::new(ProjectionPushdown::new()), // The CoalesceBatches rule will not influence the distribution and ordering of the diff --git a/datafusion/core/tests/data/convert_first_last.csv b/datafusion/core/tests/data/convert_first_last.csv new file mode 100644 index 000000000000..059b631e5711 --- /dev/null +++ b/datafusion/core/tests/data/convert_first_last.csv @@ -0,0 +1,11 @@ +c1,c2,c3 +1,9,0 +2,8,1 +3,7,2 +4,6,3 +5,5,4 +6,4,5 +7,3,6 +8,2,7 +9,1,8 +10,0,9 \ No newline at end of file diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 98c44e23c6c7..ba9a6b1be0ef 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -39,12 +39,15 @@ use datafusion_common::stats::Precision; use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_execution::TaskContext; use datafusion_expr::Accumulator; +use datafusion_physical_expr::aggregate::is_order_sensitive; +use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ - aggregate::is_order_sensitive, - equivalence::{collapse_lex_req, ProjectionMapping}, - expressions::{Column, FirstValue, LastValue, Max, Min, UnKnownColumn}, - physical_exprs_contains, reverse_order_bys, AggregateExpr, EquivalenceProperties, - LexOrdering, LexRequirement, PhysicalExpr, PhysicalSortRequirement, + equivalence::ProjectionMapping, + expressions::{Column, Max, Min, UnKnownColumn}, + AggregateExpr, LexRequirement, PhysicalExpr, +}; +use datafusion_physical_expr::{ + physical_exprs_contains, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; use itertools::Itertools; @@ -269,6 +272,36 @@ pub struct AggregateExec { } impl AggregateExec { + /// Function used in `ConvertFirstLast` optimizer rule, + /// where we need parts of the new value, others cloned from the old one + pub fn new_with_aggr_expr_and_ordering_info( + &self, + required_input_ordering: Option, + aggr_expr: Vec>, + cache: PlanProperties, + input_order_mode: InputOrderMode, + ) -> Self { + Self { + aggr_expr, + required_input_ordering, + metrics: ExecutionPlanMetricsSet::new(), + input_order_mode, + cache, + // clone the rest of the fields + mode: self.mode, + group_by: self.group_by.clone(), + filter_expr: self.filter_expr.clone(), + limit: self.limit, + input: self.input.clone(), + schema: self.schema.clone(), + input_schema: self.input_schema.clone(), + } + } + + pub fn cache(&self) -> &PlanProperties { + &self.cache + } + /// Create a new hash aggregate execution plan pub fn try_new( mode: AggregateMode, @@ -336,8 +369,7 @@ impl AggregateExec { }) .collect::>(); - let req = get_aggregate_exprs_requirement( - &new_requirement, + let req = get_finer_aggregate_exprs_requirement( &mut aggr_expr, &group_by, input_eq_properties, @@ -369,6 +401,7 @@ impl AggregateExec { &mode, &input_order_mode, ); + Ok(AggregateExec { mode, group_by, @@ -507,7 +540,7 @@ impl AggregateExec { } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. - fn compute_properties( + pub fn compute_properties( input: &Arc, schema: SchemaRef, projection_mapping: &ProjectionMapping, @@ -683,9 +716,9 @@ impl ExecutionPlan for AggregateExec { children[0].clone(), self.input_schema.clone(), self.schema.clone(), - //self.original_schema.clone(), )?; me.limit = self.limit; + Ok(Arc::new(me)) } @@ -870,7 +903,7 @@ fn finer_ordering( } /// Concatenates the given slices. -fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { +pub fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { [lhs, rhs].concat() } @@ -891,8 +924,7 @@ fn concat_slices(lhs: &[T], rhs: &[T]) -> Vec { /// /// A `LexRequirement` instance, which is the requirement that satisfies all the /// aggregate requirements. Returns an error in case of conflicting requirements. -fn get_aggregate_exprs_requirement( - prefix_requirement: &[PhysicalSortRequirement], +fn get_finer_aggregate_exprs_requirement( aggr_exprs: &mut [Arc], group_by: &PhysicalGroupBy, eq_properties: &EquivalenceProperties, @@ -900,60 +932,6 @@ fn get_aggregate_exprs_requirement( ) -> Result { let mut requirement = vec![]; for aggr_expr in aggr_exprs.iter_mut() { - let aggr_req = aggr_expr.order_bys().unwrap_or(&[]); - let reverse_aggr_req = reverse_order_bys(aggr_req); - let aggr_req = PhysicalSortRequirement::from_sort_exprs(aggr_req); - let reverse_aggr_req = - PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_req); - - if let Some(first_value) = aggr_expr.as_any().downcast_ref::() { - let mut first_value = first_value.clone(); - if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &aggr_req, - )) { - first_value = first_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(first_value) as _; - } else if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &reverse_aggr_req, - )) { - // Converting to LAST_VALUE enables more efficient execution - // given the existing ordering: - let mut last_value = first_value.convert_to_last(); - last_value = last_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(last_value) as _; - } else { - // Requirement is not satisfied with existing ordering. - first_value = first_value.with_requirement_satisfied(false); - *aggr_expr = Arc::new(first_value) as _; - } - continue; - } - if let Some(last_value) = aggr_expr.as_any().downcast_ref::() { - let mut last_value = last_value.clone(); - if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &aggr_req, - )) { - last_value = last_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(last_value) as _; - } else if eq_properties.ordering_satisfy_requirement(&concat_slices( - prefix_requirement, - &reverse_aggr_req, - )) { - // Converting to FIRST_VALUE enables more efficient execution - // given the existing ordering: - let mut first_value = last_value.convert_to_first(); - first_value = first_value.with_requirement_satisfied(true); - *aggr_expr = Arc::new(first_value) as _; - } else { - // Requirement is not satisfied with existing ordering. - last_value = last_value.with_requirement_satisfied(false); - *aggr_expr = Arc::new(last_value) as _; - } - continue; - } if let Some(finer_ordering) = finer_ordering(&requirement, aggr_expr, group_by, eq_properties, agg_mode) { @@ -1003,6 +981,7 @@ fn get_aggregate_exprs_requirement( continue; } } + // Neither the existing requirement and current aggregate requirement satisfy the other, this means // requirements are conflicting. Currently, we do not support // conflicting requirements. @@ -1010,6 +989,7 @@ fn get_aggregate_exprs_requirement( "Conflicting ordering requirements in aggregate functions is not supported" ); } + Ok(PhysicalSortRequirement::from_sort_exprs(&requirement)) } @@ -1235,7 +1215,7 @@ mod tests { use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_physical_expr::expressions::{ - lit, ApproxDistinct, Count, LastValue, Median, OrderSensitiveArrayAgg, + lit, ApproxDistinct, Count, FirstValue, LastValue, Median, OrderSensitiveArrayAgg, }; use datafusion_physical_expr::{ reverse_order_bys, AggregateExpr, EquivalenceProperties, PhysicalExpr, @@ -2171,8 +2151,7 @@ mod tests { }) .collect::>(); let group_by = PhysicalGroupBy::new_single(vec![]); - let res = get_aggregate_exprs_requirement( - &[], + let res = get_finer_aggregate_exprs_requirement( &mut aggr_exprs, &group_by, &eq_properties, diff --git a/datafusion/physical-plan/src/tree_node.rs b/datafusion/physical-plan/src/tree_node.rs index 52a52f81bdaf..46460cbb6684 100644 --- a/datafusion/physical-plan/src/tree_node.rs +++ b/datafusion/physical-plan/src/tree_node.rs @@ -64,6 +64,7 @@ impl PlanContext { pub fn update_plan_from_children(mut self) -> Result { let children_plans = self.children.iter().map(|c| c.plan.clone()).collect(); self.plan = with_new_children_if_necessary(self.plan, children_plans)?; + Ok(self) } } diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index c5c845614c7b..e01ee06a12b8 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -373,7 +373,7 @@ pub(crate) fn calc_requirements< /// For instance, if input is ordered by a, b, c and PARTITION BY b, a is used, /// this vector will be [1, 0]. It means that when we iterate b, a columns with the order [1, 0] /// resulting vector (a, b) is a preset of the existing ordering (a, b, c). -pub(crate) fn get_ordered_partition_by_indices( +pub fn get_ordered_partition_by_indices( partition_by_exprs: &[Arc], input: &Arc, ) -> Vec { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 30d5c7243f26..3d24fe3888d7 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -3455,3 +3455,45 @@ SELECT LAST_VALUE(column1 ORDER BY column2 DESC) IGNORE NULLS FROM t; statement ok DROP TABLE t; + +# Test Convert FirstLast optimizer rule +statement ok +CREATE EXTERNAL TABLE convert_first_last_table ( +c1 INT NOT NULL, +c2 INT NOT NULL, +c3 INT NOT NULL +) +STORED AS CSV +WITH HEADER ROW +WITH ORDER (c1 ASC) +WITH ORDER (c2 DESC) +WITH ORDER (c3 ASC) +LOCATION '../core/tests/data/convert_first_last.csv'; + +# test first to last, the result does not show difference, we need to check the conversion by `explain` +query TT +explain select first_value(c1 order by c3 desc) from convert_first_last_table; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]] +02)--TableScan: convert_first_last_table projection=[c1, c3] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true + +# test last to first +query TT +explain select last_value(c1 order by c2 asc) from convert_first_last_table; +---- +logical_plan +01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]] +02)--TableScan: convert_first_last_table projection=[c1, c2] +physical_plan +01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)] +02)--CoalescePartitionsExec +03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)] +04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index c357391e70b5..d8c8dcd41b6a 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -248,6 +248,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true @@ -304,6 +305,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements @@ -340,6 +342,7 @@ physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE physical_plan after EnforceDistribution SAME TEXT AS ABOVE physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE physical_plan after EnforceSorting SAME TEXT AS ABOVE +physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after OutputRequirements diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 1acdcde9c8ee..5c5bf58dd049 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2805,7 +2805,7 @@ logical_plan 04)------TableScan: sales_global projection=[country, ts, amount] physical_plan 01)ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@2 as lv1, SUM(sales_global.amount) ORDER BY [sales_global.ts DESC NULLS FIRST]@3 as sum1] -02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[LAST_VALUE(sales_global.amount), FIRST_VALUE(sales_global.amount), SUM(sales_global.amount)] +02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount), SUM(sales_global.amount)] 03)----MemoryExec: partitions=1, partition_sizes=[1] query TRRR rowsort @@ -3800,7 +3800,7 @@ logical_plan 03)----TableScan: multiple_ordered_table projection=[a, c, d] physical_plan 01)ProjectionExec: expr=[FIRST_VALUE(multiple_ordered_table.a) ORDER BY [multiple_ordered_table.a ASC NULLS LAST]@1 as first_a, LAST_VALUE(multiple_ordered_table.c) ORDER BY [multiple_ordered_table.c DESC NULLS FIRST]@2 as last_c] -02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)] +02)--AggregateExec: mode=FinalPartitioned, gby=[d@0 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), LAST_VALUE(multiple_ordered_table.c)] 03)----CoalesceBatchesExec: target_batch_size=2 04)------RepartitionExec: partitioning=Hash([d@0], 8), input_partitions=8 05)--------AggregateExec: mode=Partial, gby=[d@2 as d], aggr=[FIRST_VALUE(multiple_ordered_table.a), FIRST_VALUE(multiple_ordered_table.c)]