diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 9e60ffe748b0..5a9a4f64876d 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -17,10 +17,9 @@ use crate::expressions::CastExpr; use arrow_schema::SchemaRef; -use datafusion_common::{JoinSide, JoinType}; -use indexmap::IndexSet; +use datafusion_common::{JoinSide, JoinType, Result}; +use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; -use std::collections::{HashMap, HashSet}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -428,85 +427,69 @@ impl EquivalenceProperties { } /// we substitute the ordering according to input expression type, this is a simplified version - /// In this case, we just substitute when the expression satisfy the following confition + /// In this case, we just substitute when the expression satisfies the following condition: /// I. just have one column and is a CAST expression - /// II. just have one parameter and is a ScalarFUnctionexpression and it is monotonic - /// TODO: we could precompute all the senario that is computable, for example: atan(x + 1000) should also be substituted if - /// x is DESC or ASC + /// TODO: Add one-to-ones analysis for monotonic ScalarFunctions. + /// TODO: we could precompute all the scenarios that are computable, for example: atan(x + 1000) should also be substituted if + /// x is DESC or ASC + /// After substitution, we may generate more than 1 `LexOrdering`. As an example, + /// `[a ASC, b ASC]` will turn into `[a ASC, b ASC], [CAST(a) ASC, b ASC]` when projection expressions `a, b, CAST(a)` are applied. 
pub fn substitute_ordering_component( - matching_exprs: Arc>>, + &self, + mapping: &ProjectionMapping, sort_expr: &[PhysicalSortExpr], - schema: SchemaRef, - ) -> Vec { - sort_expr + ) -> Result>> { + let new_orderings = sort_expr .iter() - .filter(|sort_expr| { - matching_exprs.iter().any(|matched| !matched.eq(*sort_expr)) - }) .map(|sort_expr| { - let referring_exprs: Vec<_> = matching_exprs + let referring_exprs: Vec<_> = mapping .iter() - .filter(|matched| expr_refers(matched, &sort_expr.expr)) + .map(|(source, _target)| source) + .filter(|source| expr_refers(source, &sort_expr.expr)) .cloned() .collect(); - // does not referring to any matching component, we just skip it - - if referring_exprs.len() == 1 { + let mut res = vec![sort_expr.clone()]; + // TODO: Add one-to-ones analysis for ScalarFunctions. + for r_expr in referring_exprs { // we check whether this expression is substitutable or not - let r_expr = referring_exprs[0].clone(); if let Some(cast_expr) = r_expr.as_any().downcast_ref::() { // we need to know whether the Cast Expr matches or not - let expr_type = - sort_expr.expr.data_type(schema.as_ref()).unwrap(); + let expr_type = sort_expr.expr.data_type(&self.schema)?; if cast_expr.expr.eq(&sort_expr.expr) && cast_expr.is_bigger_cast(expr_type) { - PhysicalSortExpr { + res.push(PhysicalSortExpr { expr: r_expr.clone(), options: sort_expr.options, - } - } else { - sort_expr.clone() + }); } - } else { - sort_expr.clone() } - } else { - sort_expr.clone() } + Ok(res) }) - .collect() + .collect::>>()?; + // Generate all valid orderings, given substituted expressions. 
+ let res = new_orderings + .into_iter() + .multi_cartesian_product() + .collect::>(); + Ok(res) } + /// In projection, supposed we have a input function 'A DESC B DESC' and the output shares the same expression /// with A and B, we could surely use the ordering of the original ordering, However, if the A has been changed, /// for example, A-> Cast(A, Int64) or any other form, it is invalid if we continue using the original ordering /// Since it would cause bug in dependency constructions, we should substitute the input order in order to get correct /// dependency map, happen in issue 8838: - pub fn substitute_oeq_class( - &mut self, - exprs: &[(Arc, String)], - mapping: &ProjectionMapping, - schema: SchemaRef, - ) { - let matching_exprs: Arc> = Arc::new( - exprs - .iter() - .filter(|(expr, _)| mapping.iter().any(|(source, _)| source.eq(expr))) - .map(|(source, _)| source) - .collect(), - ); - let orderings = std::mem::take(&mut self.oeq_class.orderings); + pub fn substitute_oeq_class(&mut self, mapping: &ProjectionMapping) -> Result<()> { + let orderings = &self.oeq_class.orderings; let new_order = orderings - .into_iter() - .map(move |order| { - Self::substitute_ordering_component( - matching_exprs.clone(), - &order, - schema.clone(), - ) - }) - .collect(); + .iter() + .map(|order| self.substitute_ordering_component(mapping, order)) + .collect::>>()?; + let new_order = new_order.into_iter().flatten().collect(); self.oeq_class = OrderingEquivalenceClass::new(new_order); + Ok(()) } /// Projects argument `expr` according to `projection_mapping`, taking /// equivalences into account. 
@@ -559,7 +542,7 @@ impl EquivalenceProperties { /// c ASC: Node {None, HashSet{a ASC}} /// ``` fn construct_dependency_map(&self, mapping: &ProjectionMapping) -> DependencyMap { - let mut dependency_map = HashMap::new(); + let mut dependency_map = IndexMap::new(); for ordering in self.normalized_oeq_class().iter() { for (idx, sort_expr) in ordering.iter().enumerate() { let target_sort_expr = @@ -585,7 +568,7 @@ impl EquivalenceProperties { .entry(sort_expr.clone()) .or_insert_with(|| DependencyNode { target_sort_expr: target_sort_expr.clone(), - dependencies: HashSet::new(), + dependencies: IndexSet::new(), }) .insert_dependency(dependency); } @@ -977,7 +960,7 @@ fn referred_dependencies( source: &Arc, ) -> Vec { // Associate `PhysicalExpr`s with `PhysicalSortExpr`s that contain them: - let mut expr_to_sort_exprs = HashMap::::new(); + let mut expr_to_sort_exprs = IndexMap::::new(); for sort_expr in dependency_map .keys() .filter(|sort_expr| expr_refers(source, &sort_expr.expr)) @@ -1135,8 +1118,13 @@ impl DependencyNode { } } -type DependencyMap = HashMap; -type Dependencies = HashSet; +// Using `IndexMap` and `IndexSet` makes sure to generate consistent results across different executions for the same query. +// We could have used `HashSet`, `HashMap` in place of them without any loss of functionality. +// As an example, if existing orderings are `[a ASC, b ASC]`, `[c ASC]` for output ordering +// both `[a ASC, b ASC, c ASC]` and `[c ASC, a ASC, b ASC]` are valid (e.g. concatenated version of the alternative orderings). +// When using `HashSet`, `HashMap` it is not guaranteed to generate consistent result, among the possible 2 results in the example above. 
+type DependencyMap = IndexMap; +type Dependencies = IndexSet; /// This function recursively analyzes the dependencies of the given sort /// expression within the given dependency map to construct lexicographical diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 51423d37e77c..28b9ff7b011d 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -96,7 +96,7 @@ impl ProjectionExec { let mut input_eqs = input.equivalence_properties(); - input_eqs.substitute_oeq_class(&expr, &projection_mapping, input_schema.clone()); + input_eqs.substitute_oeq_class(&projection_mapping)?; let project_eqs = input_eqs.project(&projection_mapping, schema.clone()); let output_ordering = project_eqs.oeq_class().output_ordering(); @@ -204,11 +204,9 @@ impl ExecutionPlan for ProjectionExec { fn equivalence_properties(&self) -> EquivalenceProperties { let mut equi_properties = self.input.equivalence_properties(); - equi_properties.substitute_oeq_class( - &self.expr, - &self.projection_mapping, - self.input.schema().clone(), - ); + equi_properties + .substitute_oeq_class(&self.projection_mapping) + .unwrap(); equi_properties.project(&self.projection_mapping, self.schema()) } diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 57b810673358..48e79621d850 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -15,72 +15,160 @@ # specific language governing permissions and limitations # under the License. -# prepare the table +# Create a source where there is multiple orderings. 
statement ok -CREATE EXTERNAL TABLE delta_encoding_required_column ( - c_customer_sk INT NOT NULL, - c_current_cdemo_sk INT NOT NULL +CREATE EXTERNAL TABLE multiple_ordered_table ( + a0 INTEGER, + a INTEGER, + b INTEGER, + c INTEGER, + d INTEGER ) STORED AS CSV -WITH ORDER ( - c_customer_sk DESC, - c_current_cdemo_sk DESC -) -LOCATION '../../testing/data/csv/aggregate_test_100.csv'; +WITH HEADER ROW +WITH ORDER (a ASC, b ASC) +WITH ORDER (c ASC) +LOCATION '../core/tests/data/window_2.csv'; -# test for substitute CAST senario +# test for substitute CAST scenario query TT EXPLAIN SELECT - CAST(c_customer_sk AS BIGINT) AS c_customer_sk_big, - c_current_cdemo_sk -FROM delta_encoding_required_column -ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; + CAST(a AS BIGINT) AS a_big, + b +FROM multiple_ordered_table +ORDER BY a_big ASC, b ASC; ---- logical_plan -Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST ---Projection: CAST(delta_encoding_required_column.c_customer_sk AS Int64) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk -----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] physical_plan -SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] ---ProjectionExec: expr=[CAST(c_customer_sk@0 AS Int64) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +SortPreservingMergeExec: [a_big@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--ProjectionExec: expr=[CAST(a@0 AS Int64) as a_big, b@1 as b] ----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, 
c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true -# test for commom rename query TT EXPLAIN -SELECT - c_customer_sk AS c_customer_sk_big, - c_current_cdemo_sk -FROM delta_encoding_required_column -ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; +SELECT a, CAST(a AS BIGINT) AS a_big, b +FROM multiple_ordered_table +ORDER BY a ASC, b ASC; +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: multiple_ordered_table.a, CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] +physical_plan +SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@2 ASC NULLS LAST] +--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +# Cast to larger types as well as preserving ordering +# doesn't invalidate lexicographical ordering. +# Hence '[CAST(a AS BIGINT) AS a_big ASC, b ASC]' +# is valid for the given ordering: '[a ASC, b ASC]'. 
+# See discussion for rationale: https://github.com/apache/arrow-datafusion/issues/8838#issue-2077714891 +query TT +EXPLAIN +SELECT a, CAST(a AS BIGINT) AS a_big, b +FROM multiple_ordered_table +ORDER BY a_big ASC, b ASC; ---- logical_plan -Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST ---Projection: delta_encoding_required_column.c_customer_sk AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk -----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: multiple_ordered_table.a, CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] physical_plan -ProjectionExec: expr=[c_customer_sk@0 as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] ---CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false +SortPreservingMergeExec: [a_big@1 ASC NULLS LAST,b@2 ASC NULLS LAST] +--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] +----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +# test for common rename +query TT +EXPLAIN +SELECT a, a AS a_big, b +FROM multiple_ordered_table +ORDER BY a_big ASC, b ASC; +---- +logical_plan +Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] +physical_plan +ProjectionExec: expr=[a@0 as a, 
a@0 as a_big, b@1 as b] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN +SELECT a, a AS a_big, b +FROM multiple_ordered_table +ORDER BY a ASC, b ASC; +---- +logical_plan +Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] +physical_plan +ProjectionExec: expr=[a@0 as a, a@0 as a_big, b@1 as b] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true # test for cast Utf8 +# (must actually sort as the sort order for a number cast to utf8 is different than for int) +# See discussion: https://github.com/apache/arrow-datafusion/pull/9127#discussion_r1492336709 query TT EXPLAIN SELECT - CAST(c_customer_sk AS STRING) AS c_customer_sk_big, - c_current_cdemo_sk -FROM delta_encoding_required_column -ORDER BY c_customer_sk_big DESC, c_current_cdemo_sk DESC; + CAST(a AS STRING) AS a_str, + b +FROM multiple_ordered_table +ORDER BY a_str ASC, b ASC; +---- +logical_plan +Sort: a_str ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST +--Projection: CAST(multiple_ordered_table.a AS Utf8) AS a_str, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] +physical_plan +SortPreservingMergeExec: [a_str@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +--SortExec: expr=[a_str@0 ASC NULLS LAST,b@1 ASC NULLS LAST] +----ProjectionExec: expr=[CAST(a@0 AS Utf8) as a_str, b@1 as b] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 
ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +# We cannot determine a+b is ordered from the +# invariant [a ASC, b ASC] is satisfied. Hence +# we should see a SortExec with a+b ASC in the plan. +query TT +EXPLAIN +SELECT a, b +FROM multiple_ordered_table +ORDER BY a + b ASC; +---- +logical_plan +Sort: multiple_ordered_table.a + multiple_ordered_table.b ASC NULLS LAST +--TableScan: multiple_ordered_table projection=[a, b] +physical_plan +SortExec: expr=[a@0 + b@1 ASC NULLS LAST] +--CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true + +# With similar reasoning above. It is not guaranteed sum_expr is ordered +# Hence we should see a SortExec with sum_expr ASC in the plan. +query TT +EXPLAIN +SELECT CAST(a+b AS BIGINT) sum_expr, a, b +FROM multiple_ordered_table +ORDER BY sum_expr ASC; ---- logical_plan -Sort: c_customer_sk_big DESC NULLS FIRST, delta_encoding_required_column.c_current_cdemo_sk DESC NULLS FIRST ---Projection: CAST(delta_encoding_required_column.c_customer_sk AS Utf8) AS c_customer_sk_big, delta_encoding_required_column.c_current_cdemo_sk -----TableScan: delta_encoding_required_column projection=[c_customer_sk, c_current_cdemo_sk] +Sort: sum_expr ASC NULLS LAST +--Projection: CAST(multiple_ordered_table.a + multiple_ordered_table.b AS Int64) AS sum_expr, multiple_ordered_table.a, multiple_ordered_table.b +----TableScan: multiple_ordered_table projection=[a, b] physical_plan -SortPreservingMergeExec: [c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] ---SortExec: expr=[c_customer_sk_big@0 DESC,c_current_cdemo_sk@1 DESC] -----ProjectionExec: expr=[CAST(c_customer_sk@0 AS Utf8) as c_customer_sk_big, c_current_cdemo_sk@1 as c_current_cdemo_sk] +SortPreservingMergeExec: [sum_expr@0 ASC NULLS LAST] +--SortExec: expr=[sum_expr@0 ASC NULLS LAST] +----ProjectionExec: expr=[CAST(a@0 + b@1 AS Int64) as sum_expr, a@0 as 
a, b@1 as b] ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 ---------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c_customer_sk, c_current_cdemo_sk], output_ordering=[c_customer_sk@0 DESC, c_current_cdemo_sk@1 DESC], has_header=false +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], has_header=true diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 5a610c16bc7f..9276f6e1e325 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -3257,13 +3257,13 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da --------ProjectionExec: expr=[CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 as CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@1 as a, d@4 as d, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@6 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] 
ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@7 as SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW] ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] ------------CoalesceBatchesExec: target_batch_size=4096 ---------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST +--------------RepartitionExec: partitioning=Hash([b@2, a@1], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST ----------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] ------------------CoalesceBatchesExec: target_batch_size=4096 ---------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST +--------------------RepartitionExec: partitioning=Hash([a@1, d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST ----------------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] ------------------------CoalesceBatchesExec: target_batch_size=4096 ---------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), input_partitions=2, preserve_order=true, sort_exprs=CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST,a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST +--------------------------RepartitionExec: partitioning=Hash([a@1, b@2], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 
ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST,CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a@0 ASC NULLS LAST ----------------------------ProjectionExec: expr=[CAST(a@0 AS Int64) as CAST(annotated_data_infinite2.a AS Int64)annotated_data_infinite2.a, a@0 as a, b@1 as b, c@2 as c, d@3 as d] ------------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 --------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]