Skip to content

Commit

Permalink
Rename BuiltInWindow* to StandardWindow* (#13536)
Browse files Browse the repository at this point in the history
* Rename `BuiltInWindow` to `UDFWindow`

* fmt

* add

* fix issues
  • Loading branch information
irenjj authored Nov 25, 2024
1 parent 2e05648 commit 7553b3b
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 44 deletions.
14 changes: 10 additions & 4 deletions datafusion/physical-expr/src/window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@
// under the License.

mod aggregate;
mod built_in;
mod built_in_window_function_expr;
mod sliding_aggregate;
mod standard;
mod standard_window_function_expr;
mod window_expr;

#[deprecated(since = "44.0.0", note = "use StandardWindowExpr")]
pub type BuiltInWindowExpr = StandardWindowExpr;

#[deprecated(since = "44.0.0", note = "use StandardWindowFunctionExpr")]
pub type BuiltInWindowFunctionExpr = dyn StandardWindowFunctionExpr;

pub use aggregate::PlainAggregateWindowExpr;
pub use built_in::BuiltInWindowExpr;
pub use built_in_window_function_expr::BuiltInWindowFunctionExpr;
pub use sliding_aggregate::SlidingAggregateWindowExpr;
pub use standard::StandardWindowExpr;
pub use standard_window_function_expr::StandardWindowFunctionExpr;
pub use window_expr::PartitionBatches;
pub use window_expr::PartitionKey;
pub use window_expr::PartitionWindowAggStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

//! Physical exec for built-in window function expressions.
//! Physical exec for standard window function expressions.
use std::any::Any;
use std::ops::Range;
use std::sync::Arc;

use super::{BuiltInWindowFunctionExpr, WindowExpr};
use super::{StandardWindowFunctionExpr, WindowExpr};
use crate::window::window_expr::{get_orderby_values, WindowFn};
use crate::window::{PartitionBatches, PartitionWindowAggStates, WindowState};
use crate::{reverse_order_bys, EquivalenceProperties, PhysicalExpr};
Expand All @@ -35,19 +35,19 @@ use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
use datafusion_expr::WindowFrame;
use datafusion_physical_expr_common::sort_expr::LexOrdering;

/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
/// A window expr that takes the form of a [`StandardWindowFunctionExpr`].
#[derive(Debug)]
pub struct BuiltInWindowExpr {
expr: Arc<dyn BuiltInWindowFunctionExpr>,
pub struct StandardWindowExpr {
expr: Arc<dyn StandardWindowFunctionExpr>,
partition_by: Vec<Arc<dyn PhysicalExpr>>,
order_by: LexOrdering,
window_frame: Arc<WindowFrame>,
}

impl BuiltInWindowExpr {
/// create a new built-in window function expression
impl StandardWindowExpr {
/// create a new standard window function expression
pub fn new(
expr: Arc<dyn BuiltInWindowFunctionExpr>,
expr: Arc<dyn StandardWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
Expand All @@ -60,8 +60,8 @@ impl BuiltInWindowExpr {
}
}

/// Get BuiltInWindowFunction expr of BuiltInWindowExpr
pub fn get_built_in_func_expr(&self) -> &Arc<dyn BuiltInWindowFunctionExpr> {
/// Get StandardWindowFunction expr of StandardWindowExpr
pub fn get_standard_func_expr(&self) -> &Arc<dyn StandardWindowFunctionExpr> {
&self.expr
}

Expand All @@ -79,7 +79,7 @@ impl BuiltInWindowExpr {
eq_properties
.add_new_orderings([LexOrdering::new(vec![fn_res_ordering])]);
} else {
// If we have a PARTITION BY, built-in functions can not introduce
// If we have a PARTITION BY, standard functions can not introduce
// a global ordering unless the existing ordering is compatible
// with PARTITION BY expressions. To elaborate, when PARTITION BY
// expressions and existing ordering expressions are equal (w.r.t.
Expand All @@ -96,7 +96,7 @@ impl BuiltInWindowExpr {
}
}

impl WindowExpr for BuiltInWindowExpr {
impl WindowExpr for StandardWindowExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
Expand Down Expand Up @@ -264,7 +264,7 @@ impl WindowExpr for BuiltInWindowExpr {

fn get_reverse_expr(&self) -> Option<Arc<dyn WindowExpr>> {
self.expr.reverse_expr().map(|reverse_expr| {
Arc::new(BuiltInWindowExpr::new(
Arc::new(StandardWindowExpr::new(
reverse_expr,
&self.partition_by.clone(),
reverse_order_bys(self.order_by.as_ref()).as_ref(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::sync::Arc;
/// but others such as `first_value`, `last_value`, and
/// `nth_value` need the value.
#[allow(rustdoc::private_intra_doc_links)]
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns the aggregate expression as [`Any`] so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
Expand All @@ -50,7 +50,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
/// implementation returns placeholder text.
fn name(&self) -> &str {
"BuiltInWindowFunctionExpr: default name"
"StandardWindowFunctionExpr: default name"
}

/// Evaluate window function's arguments against the input window
Expand All @@ -71,7 +71,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// a particular partition.
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;

/// Construct a new [`BuiltInWindowFunctionExpr`] that produces
/// Construct a new [`StandardWindowFunctionExpr`] that produces
/// the same result as this function on a window with reverse
/// order. The return value of this function is used by the
/// DataFusion optimizer to avoid re-sorting the data when
Expand All @@ -80,7 +80,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
/// Returns `None` (the default) if no reverse is known (or possible).
///
/// For example, the reverse of `lead(10)` is `lag(10)`.
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ mod tests {

use crate::common::collect;
use crate::memory::MemoryExec;
use datafusion_physical_expr::window::BuiltInWindowExpr;
use datafusion_physical_expr::window::StandardWindowExpr;
use futures::future::Shared;
use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
use itertools::Itertools;
Expand Down Expand Up @@ -1562,7 +1562,7 @@ mod tests {

let window_exprs = vec![
// LAST_VALUE(a)
Arc::new(BuiltInWindowExpr::new(
Arc::new(StandardWindowExpr::new(
last_value_func,
&[],
&LexOrdering::default(),
Expand All @@ -1573,7 +1573,7 @@ mod tests {
)),
)) as _,
// NTH_VALUE(a, -1)
Arc::new(BuiltInWindowExpr::new(
Arc::new(StandardWindowExpr::new(
nth_value_func1,
&[],
&LexOrdering::default(),
Expand All @@ -1584,7 +1584,7 @@ mod tests {
)),
)) as _,
// NTH_VALUE(a, -2)
Arc::new(BuiltInWindowExpr::new(
Arc::new(StandardWindowExpr::new(
nth_value_func2,
&[],
&LexOrdering::default(),
Expand Down
21 changes: 10 additions & 11 deletions datafusion/physical-plan/src/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctio
use datafusion_physical_expr::equivalence::collapse_lex_req;
use datafusion_physical_expr::{
reverse_order_bys,
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
window::{SlidingAggregateWindowExpr, StandardWindowFunctionExpr},
ConstExpr, EquivalenceProperties, LexOrdering, PhysicalSortRequirement,
};
use itertools::Itertools;
Expand All @@ -50,7 +50,7 @@ use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
PlainAggregateWindowExpr, StandardWindowExpr, WindowExpr,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
pub use window_agg_exec::WindowAggExec;
Expand Down Expand Up @@ -117,7 +117,7 @@ pub fn create_window_expr(
aggregate,
)
}
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(BuiltInWindowExpr::new(
WindowFunctionDefinition::WindowUDF(fun) => Arc::new(StandardWindowExpr::new(
create_udwf_window_expr(fun, args, input_schema, name, ignore_nulls)?,
partition_by,
order_by,
Expand Down Expand Up @@ -153,14 +153,14 @@ fn window_expr_from_aggregate_expr(
}
}

/// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function
/// Creates a `StandardWindowFunctionExpr` suitable for a user defined window function
pub fn create_udwf_window_expr(
fun: &Arc<WindowUDF>,
args: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
name: String,
ignore_nulls: bool,
) -> Result<Arc<dyn BuiltInWindowFunctionExpr>> {
) -> Result<Arc<dyn StandardWindowFunctionExpr>> {
// need to get the types into an owned vec for some reason
let input_types: Vec<_> = args
.iter()
Expand Down Expand Up @@ -192,7 +192,7 @@ pub fn create_udwf_window_expr(
Ok(udwf_expr)
}

/// Implements [`BuiltInWindowFunctionExpr`] for [`WindowUDF`]
/// Implements [`StandardWindowFunctionExpr`] for [`WindowUDF`]
#[derive(Clone, Debug)]
pub struct WindowUDFExpr {
fun: Arc<WindowUDF>,
Expand All @@ -215,7 +215,7 @@ impl WindowUDFExpr {
}
}

impl BuiltInWindowFunctionExpr for WindowUDFExpr {
impl StandardWindowFunctionExpr for WindowUDFExpr {
fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand Down Expand Up @@ -244,7 +244,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
&self.name
}

fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
fn reverse_expr(&self) -> Option<Arc<dyn StandardWindowFunctionExpr>> {
match self.fun.reverse_expr() {
ReversedUDWF::Identical => Some(Arc::new(self.clone())),
ReversedUDWF::NotSupported => None,
Expand Down Expand Up @@ -345,10 +345,9 @@ pub(crate) fn window_equivalence_properties(
.extend(input.equivalence_properties().clone());

for expr in window_expr {
if let Some(builtin_window_expr) =
expr.as_any().downcast_ref::<BuiltInWindowExpr>()
if let Some(udf_window_expr) = expr.as_any().downcast_ref::<StandardWindowExpr>()
{
builtin_window_expr.add_equal_orderings(&mut window_eq_properties);
udf_window_expr.add_equal_orderings(&mut window_eq_properties);
}
}
window_eq_properties
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

#[cfg(feature = "parquet")]
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Expand Down Expand Up @@ -120,9 +120,9 @@ pub fn serialize_physical_window_expr(
window_frame,
codec,
)?
} else if let Some(built_in_window_expr) = expr.downcast_ref::<BuiltInWindowExpr>() {
if let Some(expr) = built_in_window_expr
.get_built_in_func_expr()
} else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
if let Some(expr) = udf_window_expr
.get_standard_func_expr()
.as_any()
.downcast_ref::<WindowUDFExpr>()
{
Expand Down
8 changes: 4 additions & 4 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datafusion::functions_window::nth_value::nth_value_udwf;
use datafusion::functions_window::row_number::row_number_udwf;
use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::window::{BuiltInWindowExpr, SlidingAggregateWindowExpr};
use datafusion::physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
use datafusion::physical_expr::{
LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr,
};
Expand Down Expand Up @@ -279,7 +279,7 @@ fn roundtrip_udwf() -> Result<()> {
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));

let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(StandardWindowExpr::new(
create_udwf_window_expr(
&row_number_udwf(),
&[],
Expand Down Expand Up @@ -326,7 +326,7 @@ fn roundtrip_window() -> Result<()> {
"NTH_VALUE(a, 2) PARTITION BY [b] ORDER BY [a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
false,
)?;
let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(StandardWindowExpr::new(
nth_value_window,
&[col("b", &schema)?],
&LexOrdering {
Expand Down Expand Up @@ -1125,7 +1125,7 @@ fn roundtrip_udwf_extension_codec() -> Result<()> {
WindowFrameBound::CurrentRow,
);

let udwf_expr = Arc::new(BuiltInWindowExpr::new(
let udwf_expr = Arc::new(StandardWindowExpr::new(
udwf,
&[col("b", &schema)?],
&LexOrdering {
Expand Down

0 comments on commit 7553b3b

Please sign in to comment.