From 7553b3b5edeb45dfda891b3646e3b7ea3e07e790 Mon Sep 17 00:00:00 2001 From: irenjj Date: Tue, 26 Nov 2024 00:38:00 +0800 Subject: [PATCH] Rename `BuiltInWindow*` to `StandardWindow*` (#13536) * Rename `BuiltInWindow` to `UDFWindow` * fmt * add * fix issues --- datafusion/physical-expr/src/window/mod.rs | 14 +++++++--- .../src/window/{built_in.rs => standard.rs} | 26 +++++++++---------- ...pr.rs => standard_window_function_expr.rs} | 8 +++--- .../src/windows/bounded_window_agg_exec.rs | 8 +++--- datafusion/physical-plan/src/windows/mod.rs | 21 +++++++-------- .../proto/src/physical_plan/to_proto.rs | 8 +++--- .../tests/cases/roundtrip_physical_plan.rs | 8 +++--- 7 files changed, 49 insertions(+), 44 deletions(-) rename datafusion/physical-expr/src/window/{built_in.rs => standard.rs} (93%) rename datafusion/physical-expr/src/window/{built_in_window_function_expr.rs => standard_window_function_expr.rs} (92%) diff --git a/datafusion/physical-expr/src/window/mod.rs b/datafusion/physical-expr/src/window/mod.rs index e7a318b860fd..bc7c716783bd 100644 --- a/datafusion/physical-expr/src/window/mod.rs +++ b/datafusion/physical-expr/src/window/mod.rs @@ -16,15 +16,21 @@ // under the License. mod aggregate; -mod built_in; -mod built_in_window_function_expr; mod sliding_aggregate; +mod standard; +mod standard_window_function_expr; mod window_expr; +#[deprecated(since = "44.0.0", note = "use StandardWindowExpr")] +pub type BuiltInWindowExpr = StandardWindowExpr; + +#[deprecated(since = "44.0.0", note = "use StandardWindowFunctionExpr")] +pub type BuiltInWindowFunctionExpr = dyn StandardWindowFunctionExpr; + pub use aggregate::PlainAggregateWindowExpr; -pub use built_in::BuiltInWindowExpr; -pub use built_in_window_function_expr::BuiltInWindowFunctionExpr; pub use sliding_aggregate::SlidingAggregateWindowExpr; +pub use standard::StandardWindowExpr; +pub use standard_window_function_expr::StandardWindowFunctionExpr; pub use window_expr::PartitionBatches; pub use window_expr::PartitionKey; pub use window_expr::PartitionWindowAggStates; diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/standard.rs similarity index 93% rename from datafusion/physical-expr/src/window/built_in.rs rename to datafusion/physical-expr/src/window/standard.rs index 0f6c3f921892..82e48a5f9338 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/standard.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! Physical exec for built-in window function expressions. +//! Physical exec for standard window function expressions. use std::any::Any; use std::ops::Range; use std::sync::Arc; -use super::{BuiltInWindowFunctionExpr, WindowExpr}; +use super::{StandardWindowFunctionExpr, WindowExpr}; use crate::window::window_expr::{get_orderby_values, WindowFn}; use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState}; use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr}; @@ -35,19 +35,19 @@ use datafusion_expr::window_state::{WindowAggState, WindowFrameContext}; use datafusion_expr::WindowFrame; use datafusion_physical_expr_common::sort_expr::LexOrdering; -/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`]. +/// A window expr that takes the form of a [`StandardWindowFunctionExpr`]. #[derive(Debug)] -pub struct BuiltInWindowExpr { - expr: Arc, +pub struct StandardWindowExpr { + expr: Arc, partition_by: Vec>, order_by: LexOrdering, window_frame: Arc, } -impl BuiltInWindowExpr { - /// create a new built-in window function expression +impl StandardWindowExpr { + /// create a new standard window function expression pub fn new( - expr: Arc, + expr: Arc, partition_by: &[Arc], order_by: &LexOrdering, window_frame: Arc, @@ -60,8 +60,8 @@ impl BuiltInWindowExpr { } } - /// Get BuiltInWindowFunction expr of BuiltInWindowExpr - pub fn get_built_in_func_expr(&self) -> &Arc { + /// Get StandardWindowFunction expr of StandardWindowExpr + pub fn get_standard_func_expr(&self) -> &Arc { &self.expr } @@ -79,7 +79,7 @@ impl BuiltInWindowExpr { eq_properties .add_new_orderings([LexOrdering::new(vec![fn_res_ordering])]); } else { - // If we have a PARTITION BY, built-in functions can not introduce + // If we have a PARTITION BY, standard functions can not introduce // a global ordering unless the existing ordering is compatible // with PARTITION BY expressions. To elaborate, when PARTITION BY // expressions and existing ordering expressions are equal (w.r.t. @@ -96,7 +96,7 @@ impl BuiltInWindowExpr { } } -impl WindowExpr for BuiltInWindowExpr { +impl WindowExpr for StandardWindowExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self @@ -264,7 +264,7 @@ impl WindowExpr for BuiltInWindowExpr { fn get_reverse_expr(&self) -> Option> { self.expr.reverse_expr().map(|reverse_expr| { - Arc::new(BuiltInWindowExpr::new( + Arc::new(StandardWindowExpr::new( reverse_expr, &self.partition_by.clone(), reverse_order_bys(self.order_by.as_ref()).as_ref(), diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/standard_window_function_expr.rs similarity index 92% rename from datafusion/physical-expr/src/window/built_in_window_function_expr.rs rename to datafusion/physical-expr/src/window/standard_window_function_expr.rs index 7aa4f6536a6e..d308812a0e35 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/standard_window_function_expr.rs @@ -36,7 +36,7 @@ use std::sync::Arc; /// but others such as `first_value`, `last_value`, and /// `nth_value` need the value. #[allow(rustdoc::private_intra_doc_links)] -pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { +pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// Returns the aggregate expression as [`Any`] so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; @@ -50,7 +50,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default /// implementation returns placeholder text. fn name(&self) -> &str { - "BuiltInWindowFunctionExpr: default name" + "StandardWindowFunctionExpr: default name" } /// Evaluate window function's arguments against the input window @@ -71,7 +71,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// a particular partition. fn create_evaluator(&self) -> Result>; - /// Construct a new [`BuiltInWindowFunctionExpr`] that produces + /// Construct a new [`StandardWindowFunctionExpr`] that produces /// the same result as this function on a window with reverse /// order. The return value of this function is used by the /// DataFusion optimizer to avoid re-sorting the data when @@ -80,7 +80,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// Returns `None` (the default) if no reverse is known (or possible). /// /// For example, the reverse of `lead(10)` is `lag(10)`. - fn reverse_expr(&self) -> Option> { + fn reverse_expr(&self) -> Option> { None } diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 50666f6cc368..398b5eb292d7 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1189,7 +1189,7 @@ mod tests { use crate::common::collect; use crate::memory::MemoryExec; - use datafusion_physical_expr::window::BuiltInWindowExpr; + use datafusion_physical_expr::window::StandardWindowExpr; use futures::future::Shared; use futures::{pin_mut, ready, FutureExt, Stream, StreamExt}; use itertools::Itertools; @@ -1562,7 +1562,7 @@ mod tests { let window_exprs = vec![ // LAST_VALUE(a) - Arc::new(BuiltInWindowExpr::new( + Arc::new(StandardWindowExpr::new( last_value_func, &[], &LexOrdering::default(), @@ -1573,7 +1573,7 @@ mod tests { )), )) as _, // NTH_VALUE(a, -1) - Arc::new(BuiltInWindowExpr::new( + Arc::new(StandardWindowExpr::new( nth_value_func1, &[], &LexOrdering::default(), @@ -1584,7 +1584,7 @@ mod tests { )), )) as _, // NTH_VALUE(a, -2) - Arc::new(BuiltInWindowExpr::new( + Arc::new(StandardWindowExpr::new( nth_value_func2, &[], &LexOrdering::default(), diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 32173c3ef17d..222a8bb71a02 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -35,7 +35,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio use datafusion_physical_expr::equivalence::collapse_lex_req; use datafusion_physical_expr::{ reverse_order_bys, - window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr}, + window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr}, ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement, }; use itertools::Itertools; @@ -50,7 +50,7 @@ use datafusion_functions_window_common::field::WindowUDFFieldArgs; use datafusion_functions_window_common::partition::PartitionEvaluatorArgs; use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::{ - BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, + PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr, }; use datafusion_physical_expr_common::sort_expr::LexRequirement; pub use window_agg_exec::WindowAggExec; @@ -117,7 +117,7 @@ pub fn create_window_expr( aggregate, ) } - WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new( + WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new( create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?, partition_by, order_by, @@ -153,14 +153,14 @@ fn window_expr_from_aggregate_expr( } } -/// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function +/// Creates a `StandardWindowFunctionExpr` suitable for a user defined window function pub fn create_udwf_window_expr( fun: &Arc, args: &[Arc], input_schema: &Schema, name: String, ignore_nulls: bool, -) -> Result> { +) -> Result> { // need to get the types into an owned vec for some reason let input_types: Vec<_> = args .iter() @@ -192,7 +192,7 @@ pub fn create_udwf_window_expr( Ok(udwf_expr) } -/// Implements [`BuiltInWindowFunctionExpr`] for [`WindowUDF`] +/// Implements [`StandardWindowFunctionExpr`] for [`WindowUDF`] #[derive(Clone, Debug)] pub struct WindowUDFExpr { fun: Arc, @@ -215,7 +215,7 @@ impl WindowUDFExpr { } } -impl BuiltInWindowFunctionExpr for WindowUDFExpr { +impl StandardWindowFunctionExpr for WindowUDFExpr { fn as_any(&self) -> &dyn std::any::Any { self } @@ -244,7 +244,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr { &self.name } - fn reverse_expr(&self) -> Option> { + fn reverse_expr(&self) -> Option> { match self.fun.reverse_expr() { ReversedUDWF::Identical => Some(Arc::new(self.clone())), ReversedUDWF::NotSupported => None, @@ -345,10 +345,9 @@ pub(crate) fn window_equivalence_properties( .extend(input.equivalence_properties().clone()); for expr in window_expr { - if let Some(builtin_window_expr) = - expr.as_any().downcast_ref::() + if let Some(udf_window_expr) = expr.as_any().downcast_ref::() { - builtin_window_expr.add_equal_orderings(&mut window_eq_properties); + udf_window_expr.add_equal_orderings(&mut window_eq_properties); } } window_eq_properties diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 7d9a524af828..c1bbb3ad26ce 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -19,7 +19,7 @@ use std::sync::Arc; #[cfg(feature = "parquet")] use datafusion::datasource::file_format::parquet::ParquetSink; -use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr}; +use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, @@ -120,9 +120,9 @@ pub fn serialize_physical_window_expr( window_frame, codec, )? - } else if let Some(built_in_window_expr) = expr.downcast_ref::() { - if let Some(expr) = built_in_window_expr - .get_built_in_func_expr() + } else if let Some(udf_window_expr) = expr.downcast_ref::() { + if let Some(expr) = udf_window_expr + .get_standard_func_expr() .as_any() .downcast_ref::() { diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index efa462aa7a85..3311b57f5d6b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -54,7 +54,7 @@ use datafusion::functions_window::nth_value::nth_value_udwf; use datafusion::functions_window::row_number::row_number_udwf; use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility}; use datafusion::physical_expr::expressions::Literal; -use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr}; +use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; use datafusion::physical_expr::{ LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr, }; @@ -279,7 +279,7 @@ fn roundtrip_udwf() -> Result<()> { let field_b = Field::new("b", DataType::Int64, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(StandardWindowExpr::new( create_udwf_window_expr( &row_number_udwf(), &[], @@ -326,7 +326,7 @@ fn roundtrip_window() -> Result<()> { "NTH_VALUE(a, 2) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(), false, )?; - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(StandardWindowExpr::new( nth_value_window, &[col("b", &schema)?], &LexOrdering { @@ -1125,7 +1125,7 @@ fn roundtrip_udwf_extension_codec() -> Result<()> { WindowFrameBound::CurrentRow, ); - let udwf_expr = Arc::new(BuiltInWindowExpr::new( + let udwf_expr = Arc::new(StandardWindowExpr::new( udwf, &[col("b", &schema)?], &LexOrdering {