diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index e11452d1c854..d5368d5189e6 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -51,6 +51,7 @@ use substrait::DFLogicalSubstraitConvertor; use crate::adapter::FlownodeContext; use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu}; +use crate::expr::{TUMBLE_END, TUMBLE_START}; use crate::plan::TypedPlan; // TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed @@ -369,7 +370,7 @@ fn expand_tumble_analyzer( datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => { encountered_tumble = true; - let tumble_start = TumbleExpand::new("tumble_start"); + let tumble_start = TumbleExpand::new(TUMBLE_START); let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf( Arc::new(tumble_start.into()), func.args.clone(), @@ -378,7 +379,7 @@ fn expand_tumble_analyzer( let start_col_name = tumble_start.name_for_alias()?; new_group_expr.push(tumble_start); - let tumble_end = TumbleExpand::new("tumble_end"); + let tumble_end = TumbleExpand::new(TUMBLE_END); let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf( Arc::new(tumble_end.into()), func.args.clone(), @@ -439,6 +440,8 @@ fn expand_tumble_analyzer( Ok(Transformed::no(plan)) } +/// This is a placeholder for the tumble_start and tumble_end functions, so that datafusion can +/// recognize them as scalar functions #[derive(Debug)] pub struct TumbleExpand { signature: Signature, @@ -524,6 +527,7 @@ impl ScalarUDFImpl for TumbleExpand { } } +/// This rule checks that every group by expr appears in the select clause of an aggregate query struct CheckGroupByRule {} impl CheckGroupByRule { @@ -574,6 +578,7 @@ fn check_group_by_analyzer( Ok(Transformed::no(plan)) } +/// Find all column names in a plan #[derive(Debug, Default)] struct FindColumn { names_for_alias: HashSet, diff --git a/src/flow/src/expr.rs b/src/flow/src/expr.rs index 
35f937cdc136..871b23c25dbc 100644 --- a/src/flow/src/expr.rs +++ b/src/flow/src/expr.rs @@ -37,6 +37,9 @@ use snafu::{ensure, ResultExt}; use crate::expr::error::DataTypeSnafu; +pub const TUMBLE_START: &str = "tumble_start"; +pub const TUMBLE_END: &str = "tumble_end"; + /// A batch of vectors with the same length but without schema, only useful in dataflow pub struct Batch { batch: Vec, diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 454cf5e70081..65da763e27d6 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -41,7 +41,7 @@ use crate::expr::error::{ TryFromValueSnafu, TypeMismatchSnafu, }; use crate::expr::signature::{GenericFn, Signature}; -use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr}; +use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START}; use crate::repr::{self, value_to_internal_ts}; /// UnmaterializableFunc is a function that can't be eval independently, @@ -316,8 +316,8 @@ impl UnaryFunc { } pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> { - match name { - "tumble_start" | "tumble_end" => { + match name.to_lowercase().as_str() { + TUMBLE_START | TUMBLE_END => { let ts = args.first().context(InvalidQuerySnafu { reason: "Tumble window function requires a timestamp argument", })?; @@ -378,7 +378,7 @@ impl UnaryFunc { None => None, }; - if name == "tumble_start" { + if name.eq_ignore_ascii_case(TUMBLE_START) { Ok(( Self::TumbleWindowFloor { window_size, @@ -386,7 +386,7 @@ impl UnaryFunc { }, ts.clone(), )) - } else if name == "tumble_end" { + } else if name.eq_ignore_ascii_case(TUMBLE_END) { Ok(( Self::TumbleWindowCeiling { window_size, diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 135ce7d37fc0..f6dff58856db 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -29,6 +29,7 @@ use substrait_proto::proto::extensions::SimpleExtensionDeclaration; use crate::adapter::FlownodeContext; use 
crate::error::{Error, NotImplementedSnafu, UnexpectedSnafu}; +use crate::expr::{TUMBLE_END, TUMBLE_START}; /// a simple macro to generate a not implemented error macro_rules! not_impl_err { ($($arg:tt)*) => { @@ -99,8 +100,8 @@ impl FunctionExtensions { /// register flow-specific functions to the query engine pub fn register_function_to_query_engine(engine: &Arc) { engine.register_function(Arc::new(TumbleFunction::new("tumble"))); - engine.register_function(Arc::new(TumbleFunction::new("tumble_start"))); - engine.register_function(Arc::new(TumbleFunction::new("tumble_end"))); + engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START))); + engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END))); } #[derive(Debug)] diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 4c6a04594f44..de05b018ac51 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -33,7 +33,7 @@ use crate::error::{ }; use crate::expr::{ BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc, - UnmaterializableFunc, VariadicFunc, + UnmaterializableFunc, VariadicFunc, TUMBLE_END, TUMBLE_START, }; use crate::repr::{ColumnType, RelationDesc, RelationType}; use crate::transform::literal::{ @@ -361,7 +361,7 @@ impl TypedExpr { Ok(TypedExpr::new(ret_expr, ret_type)) } _var => { - if fn_name == "tumble_start" || fn_name == "tumble_end" { + if fn_name == TUMBLE_START || fn_name == TUMBLE_END { let (func, arg) = UnaryFunc::from_tumble_func(fn_name, &arg_typed_exprs)?; let ret_type = ColumnType::new_nullable(func.signature().output.clone());