From 3b90e3e5479d4354992cbe465d50e56c033d8fc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Sat, 24 Aug 2024 08:45:40 +0800 Subject: [PATCH] Use `LexRequirement` alias as much as possible (#12130) --- datafusion-examples/examples/custom_file_format.rs | 5 +++-- datafusion/core/src/datasource/file_format/arrow.rs | 5 +++-- datafusion/core/src/datasource/file_format/csv.rs | 5 +++-- datafusion/core/src/datasource/file_format/json.rs | 5 +++-- datafusion/core/src/datasource/file_format/mod.rs | 6 ++++-- .../core/src/datasource/file_format/parquet.rs | 5 +++-- .../src/physical_optimizer/enforce_distribution.rs | 3 ++- .../core/src/physical_optimizer/sort_pushdown.rs | 11 ++++++----- .../core/src/physical_optimizer/test_utils.rs | 6 ++++-- datafusion/physical-expr-common/src/sort_expr.rs | 2 +- datafusion/physical-expr/src/equivalence/mod.rs | 2 +- .../physical-optimizer/src/output_requirements.rs | 2 +- datafusion/physical-plan/src/execution_plan.rs | 7 +++---- datafusion/physical-plan/src/insert.rs | 13 ++++++------- .../physical-plan/src/joins/sort_merge_join.rs | 3 ++- .../physical-plan/src/joins/symmetric_hash_join.rs | 3 ++- .../src/sorts/sort_preserving_merge.rs | 3 ++- .../src/windows/bounded_window_agg_exec.rs | 5 +++-- datafusion/physical-plan/src/windows/mod.rs | 7 ++++--- .../physical-plan/src/windows/window_agg_exec.rs | 4 ++-- 20 files changed, 58 insertions(+), 44 deletions(-) diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_file_format.rs index 8612a1cc4430..1d9b587f15b9 100644 --- a/datafusion-examples/examples/custom_file_format.rs +++ b/datafusion-examples/examples/custom_file_format.rs @@ -23,6 +23,7 @@ use arrow::{ }; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::physical_expr::LexRequirement; use datafusion::{ datasource::{ file_format::{ @@ -38,7 +39,7 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_common::{GetExt, Statistics}; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use object_store::{ObjectMeta, ObjectStore}; use tempfile::tempdir; @@ -123,7 +124,7 @@ impl FileFormat for TSVFileFormat { input: Arc, state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>, + order_requirements: Option, ) -> Result> { self.csv_file_format .create_writer_physical_plan(input, state, conf, order_requirements) diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs index 95f76195e63d..6ee4280956e8 100644 --- a/datafusion/core/src/datasource/file_format/arrow.rs +++ b/datafusion/core/src/datasource/file_format/arrow.rs @@ -47,12 +47,13 @@ use datafusion_common::{ not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION, }; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::insert::{DataSink, DataSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::Bytes; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::BoxStream; use futures::StreamExt; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; @@ -178,7 +179,7 @@ impl FileFormat for ArrowFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>, + order_requirements: Option, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Arrow format"); diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 24d55ea54068..d1ce2afcccf3 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -46,11 +46,12 @@ use datafusion_common::{ exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION, }; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; use bytes::{Buf, Bytes}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; @@ -371,7 +372,7 @@ impl FileFormat for CsvFormat { input: Arc, state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>, + order_requirements: Option, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for CSV"); diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 7c579e890c8c..4471d7d6cb31 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -46,12 +46,13 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::ExecutionPlan; use async_trait::async_trait; use bytes::{Buf, Bytes}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -249,7 +250,7 @@ impl FileFormat for JsonFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>, + order_requirements: Option, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Json"); diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index a324a4578424..d21464b74b53 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -45,12 +45,14 @@ use crate::physical_plan::{ExecutionPlan, Statistics}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{internal_err, not_impl_err, GetExt}; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use file_compression_type::FileCompressionType; use object_store::{ObjectMeta, ObjectStore}; use std::fmt::Debug; + /// Factory for creating [`FileFormat`] instances based on session and command level options /// /// Users can provide their own `FileFormatFactory` to support arbitrary file formats @@ -132,7 +134,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug { _input: Arc, _state: &SessionState, _conf: FileSinkConfig, - _order_requirements: Option>, + _order_requirements: Option, ) -> Result> { not_impl_err!("Writer not implemented for this format") } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 83f77ca9371a..23e765f0f2cd 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -51,7 +51,7 @@ use datafusion_common_runtime::SpawnedTask; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::MetricsSet; use async_trait::async_trait; @@ -76,6 +76,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task::JoinSet; use crate::datasource::physical_plan::parquet::ParquetExecBuilder; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; @@ -376,7 +377,7 @@ impl FileFormat for ParquetFormat { input: Arc, _state: &SessionState, conf: FileSinkConfig, - order_requirements: Option>, + order_requirements: Option, ) -> Result> { if conf.overwrite { return not_impl_err!("Overwrites are not implemented yet for Parquet"); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 2ee5624c83dd..77ddd098c7be 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1419,6 +1419,7 @@ pub(crate) mod tests { expressions, expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, }; + use datafusion_physical_expr_common::sort_expr::LexRequirement; use datafusion_physical_plan::PlanProperties; /// Models operators like BoundedWindowExec that require an input @@ -1489,7 +1490,7 @@ pub(crate) mod tests { } // model that it requires the output ordering of its input - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { if self.expr.is_empty() { vec![None] } else { diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 17d63a06a6f8..9ab6802d18f1 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -36,6 +36,7 @@ use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; +use datafusion_physical_expr_common::sort_expr::LexRequirement; /// This is a "data class" we use within the [`EnforceSorting`] rule to push /// down [`SortExec`] in the plan. In some cases, we can reduce the total @@ -46,7 +47,7 @@ use datafusion_physical_expr::{ /// [`EnforceSorting`]: crate::physical_optimizer::enforce_sorting::EnforceSorting #[derive(Default, Clone)] pub struct ParentRequirements { - ordering_requirement: Option>, + ordering_requirement: Option, fetch: Option, } @@ -159,7 +160,7 @@ fn pushdown_sorts_helper( fn pushdown_requirement_to_children( plan: &Arc, parent_required: LexRequirementRef, -) -> Result>>>> { +) -> Result>>> { let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); @@ -345,7 +346,7 @@ fn try_pushdown_requirements_to_join( parent_required: LexRequirementRef, sort_expr: Vec, push_side: JoinSide, -) -> Result>>>> { +) -> Result>>> { let left_eq_properties = smj.left().equivalence_properties(); let right_eq_properties = smj.right().equivalence_properties(); let mut smj_required_orderings = smj.required_input_ordering(); @@ -460,7 +461,7 @@ fn expr_source_side( fn shift_right_required( parent_required: LexRequirementRef, left_columns_len: usize, -) -> Result> { +) -> Result { let new_right_required = parent_required .iter() .filter_map(|r| { @@ -486,7 +487,7 @@ enum RequirementsCompatibility { /// Requirements satisfy Satisfy, /// Requirements compatible - Compatible(Option>), + Compatible(Option), /// Requirements not compatible NonCompatible, } diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs index 90853c347672..98f1a7c21a39 100644 --- a/datafusion/core/src/physical_optimizer/test_utils.rs +++ b/datafusion/core/src/physical_optimizer/test_utils.rs @@ -56,7 +56,9 @@ use datafusion_physical_plan::{ use async_trait::async_trait; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr_common::sort_expr::PhysicalSortRequirement; +use datafusion_physical_expr_common::sort_expr::{ + LexRequirement, PhysicalSortRequirement, +}; async fn register_current_csv( ctx: &SessionContext, @@ -416,7 +418,7 @@ impl ExecutionPlan for RequirementsTestExec { self.input.properties() } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { let requirement = PhysicalSortRequirement::from_sort_exprs(&self.required_input_ordering); vec![Some(requirement)] diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs index 9dc54d2eb2d0..745ec543c31a 100644 --- a/datafusion/physical-expr-common/src/sort_expr.rs +++ b/datafusion/physical-expr-common/src/sort_expr.rs @@ -221,7 +221,7 @@ impl PhysicalSortRequirement { /// [`ExecutionPlan::required_input_ordering`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering pub fn from_sort_exprs<'a>( ordering: impl IntoIterator, - ) -> Vec { + ) -> LexRequirement { ordering .into_iter() .cloned() diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index b9228282b081..d862eda5018e 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -239,7 +239,7 @@ mod tests { // Convert each tuple to PhysicalSortRequirement pub fn convert_to_sort_reqs( in_data: &[(&Arc, Option)], - ) -> Vec { + ) -> LexRequirement { in_data .iter() .map(|(expr, options)| { diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index fdfdd349e36e..4f6f91a2348f 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -165,7 +165,7 @@ impl ExecutionPlan for OutputRequirementExec { vec![&self.input] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![self.order_requirement.clone()] } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index a6a15e46860c..e1182719293d 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -36,9 +36,8 @@ pub use datafusion_physical_expr::window::WindowExpr; pub use datafusion_physical_expr::{ expressions, functions, udf, AggregateExpr, Distribution, Partitioning, PhysicalExpr, }; -use datafusion_physical_expr::{ - EquivalenceProperties, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, -}; +use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::display::DisplayableExecutionPlan; @@ -125,7 +124,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// NOTE that checking `!is_empty()` does **not** check for a /// required input ordering. Instead, the correct check is that at /// least one entry must be `Some` - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![None; self.children().len()] } diff --git a/datafusion/physical-plan/src/insert.rs b/datafusion/physical-plan/src/insert.rs index 5cd864125e29..5dc27bc239d2 100644 --- a/datafusion/physical-plan/src/insert.rs +++ b/datafusion/physical-plan/src/insert.rs @@ -35,11 +35,10 @@ use arrow_array::{ArrayRef, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{ - Distribution, EquivalenceProperties, PhysicalSortRequirement, -}; +use datafusion_physical_expr::{Distribution, EquivalenceProperties}; use async_trait::async_trait; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::StreamExt; /// `DataSink` implements writing streams of [`RecordBatch`]es to @@ -90,7 +89,7 @@ pub struct DataSinkExec { /// Schema describing the structure of the output data. count_schema: SchemaRef, /// Optional required sort order for output data. - sort_order: Option>, + sort_order: Option, cache: PlanProperties, } @@ -106,7 +105,7 @@ impl DataSinkExec { input: Arc, sink: Arc, sink_schema: SchemaRef, - sort_order: Option>, + sort_order: Option, ) -> Self { let count_schema = make_count_schema(); let cache = Self::create_schema(&input, count_schema); @@ -131,7 +130,7 @@ impl DataSinkExec { } /// Optional sort order for output data - pub fn sort_order(&self) -> &Option> { + pub fn sort_order(&self) -> &Option { &self.sort_order } @@ -189,7 +188,7 @@ impl ExecutionPlan for DataSinkExec { vec![Distribution::SinglePartition; self.children().len()] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { // The required input ordering is set externally (e.g. by a `ListingTable`). // Otherwise, there is no specific requirement (i.e. `sort_expr` is `None`). vec![self.sort_order.as_ref().cloned()] diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index b5994d16d2ee..56cd699bf6e7 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -51,6 +51,7 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; use datafusion_physical_expr::equivalence::join_equivalence_properties; use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use crate::expressions::PhysicalSortExpr; use crate::joins::utils::{ @@ -288,7 +289,7 @@ impl ExecutionPlan for SortMergeJoinExec { ] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![ Some(PhysicalSortRequirement::from_sort_exprs( &self.left_sort_exprs, diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 7dab664502e9..ac718a95e9f4 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -72,6 +72,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph; use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement}; use ahash::RandomState; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{ready, Stream, StreamExt}; use hashbrown::HashSet; use parking_lot::Mutex; @@ -410,7 +411,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { } } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![ self.left_sort_exprs .as_ref() diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f34701d0d713..131fa71217cc 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -35,6 +35,7 @@ use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -187,7 +188,7 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![false] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))] } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index efb5dea1ec6e..084436ee376d 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -59,7 +59,8 @@ use datafusion_expr::ColumnarValue; use datafusion_physical_expr::window::{ PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState, }; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::stream::Stream; use futures::{ready, StreamExt}; use hashbrown::raw::RawTable; @@ -253,7 +254,7 @@ impl ExecutionPlan for BoundedWindowAggExec { vec![&self.input] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); if self.input_order_mode != InputOrderMode::Sorted diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 63f4ffcfaacc..d607bb79b44e 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -56,6 +56,7 @@ use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; +use datafusion_physical_expr_common::sort_expr::LexRequirement; pub use window_agg_exec::WindowAggExec; /// Build field from window function and add it into schema @@ -401,7 +402,7 @@ pub(crate) fn calc_requirements< >( partition_by_exprs: impl IntoIterator, orderby_sort_exprs: impl IntoIterator, -) -> Option> { +) -> Option { let mut sort_reqs = partition_by_exprs .into_iter() .map(|partition_by| { @@ -571,7 +572,7 @@ pub fn get_window_mode( input: &Arc, ) -> Option<(bool, InputOrderMode)> { let input_eqs = input.equivalence_properties().clone(); - let mut partition_by_reqs: Vec = vec![]; + let mut partition_by_reqs: LexRequirement = vec![]; let (_, indices) = input_eqs.find_longest_permutation(partitionby_exprs); partition_by_reqs.extend(indices.iter().map(|&idx| PhysicalSortRequirement { expr: Arc::clone(&partitionby_exprs[idx]), @@ -728,7 +729,7 @@ mod tests { orderbys.push(PhysicalSortExpr { expr, options }); } - let mut expected: Option> = None; + let mut expected: Option = None; for (col_name, reqs) in expected_params { let options = reqs.map(|(descending, nulls_first)| SortOptions { descending, diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index d2f7090fca17..afe9700ed08c 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -43,7 +43,7 @@ use datafusion_common::stats::Precision; use datafusion_common::utils::{evaluate_partition_ranges, transpose}; use datafusion_common::{internal_err, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{ready, Stream, StreamExt}; /// Window execution plan @@ -191,7 +191,7 @@ impl ExecutionPlan for WindowAggExec { vec![true] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); if self.ordered_partition_by_indices.len() < partition_bys.len() {