diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 54971d4a7580..1cc70215db32 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use strum::{EnumIter, IntoEnumIterator}; use substrait::df_logical_plan::consumer::name_to_op; -use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu}; +use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu}; use crate::expr::error::{ CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu, TryFromValueSnafu, TypeMismatchSnafu, @@ -84,42 +84,10 @@ impl UnmaterializableFunc { } /// Create a UnmaterializableFunc from a string of the function name - pub fn from_str_args(name: &str, args: Vec) -> Result { + pub fn from_str_args(name: &str, _args: Vec) -> Result { match name.to_lowercase().as_str() { "now" => Ok(Self::Now), "current_schema" => Ok(Self::CurrentSchema), - "tumble" => { - let ts = args.first().context(InvalidQuerySnafu { - reason: "Tumble window function requires a timestamp argument", - })?; - let window_size = args - .get(1) - .and_then(|expr| expr.expr.as_literal()) - .context(InvalidQuerySnafu { - reason: "Tumble window function requires a window size argument" - })?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead - .map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context( - ExternalSnafu - )?.and_then(|v|v.as_interval()) - .with_context(||InvalidQuerySnafu { - reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1)) - })?; - let start_time = match args.get(2) { - Some(start_time) => start_time.expr.as_literal(), - None => None, - } - .map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context( - ||InvalidQuerySnafu { - reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2)) - } - )).transpose()?; - - Ok(Self::TumbleWindow { - ts: Box::new(ts.clone()), - window_size, - start_time, - }) - } _ => InvalidQuerySnafu { reason: format!("Unknown unmaterializable function: {}", name), } @@ -227,27 +195,58 @@ impl UnaryFunc { let ts = args.first().context(InvalidQuerySnafu { reason: "Tumble window function requires a timestamp argument", })?; - let window_size = args - .get(1) - .and_then(|expr| expr.expr.as_literal()) - .context(InvalidQuerySnafu { - reason: "Tumble window function requires a window size argument" - })?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead - .map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context( - ExternalSnafu - )?.and_then(|v|v.as_interval()) - .with_context(||InvalidQuerySnafu { - reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1)) - })?; + let window_size = { + let window_size_untyped = args + .get(1) + .and_then(|expr| expr.expr.as_literal()) + .context(InvalidQuerySnafu { + reason: "Tumble window function requires a window size argument", + })?; + if let Some(window_size) = window_size_untyped.as_string() { + // cast as interval + cast( + Value::from(window_size), + &ConcreteDataType::interval_month_day_nano_datatype(), + ) + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .as_interval() + .context(UnexpectedSnafu { + reason: "Expect window size arg to be interval after successful cast" + .to_string(), + })? + } else if let Some(interval) = window_size_untyped.as_interval() { + interval + } else { + InvalidQuerySnafu { + reason: format!( + "Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}", + window_size_untyped + ) + }.fail()? + } + }; let start_time = match args.get(2) { - Some(start_time) => start_time.expr.as_literal(), - None => None, - } - .map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context( - ||InvalidQuerySnafu { - reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2)) - } - )).transpose()?; + Some(start_time) => { + if let Some(value) = start_time.expr.as_literal() { + // cast as DateTime + let ret = cast(value, &ConcreteDataType::datetime_datatype()) + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .as_datetime() + .context(UnexpectedSnafu { + reason: + "Expect start time arg to be datetime after successful cast" + .to_string(), + })?; + Some(ret) + } else { + None + } + } + None => None, + }; + if name == "tumble_start" { Ok(( Self::TumbleWindowFloor { @@ -268,7 +267,10 @@ impl UnaryFunc { unreachable!() } } - _ => todo!(), + _ => crate::error::InternalSnafu { + reason: format!("Unknown tumble kind function: {}", name), + } + .fail()?, } } @@ -623,7 +625,7 @@ impl BinaryFunc { InvalidQuerySnafu { reason: format!( "Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}", - generic,t1,t2 + generic, t1, t2 ), } ); diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 6fc3b093c1c8..c1ca38ed8610 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -122,7 +122,7 @@ impl TypedPlan { /// TODO(discord9): support `TableFunc`(by define FlatMap that map 1 to n) /// Plan describe how to transform data in dataflow /// -/// This can be considered as a physical plan in dataflow, which describe how to transform data in +/// This can be considered as a physical plan in dataflow, which describe how to transform data in a streaming manner. #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)] pub enum Plan { /// A constant collection of rows. diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 0adf0322e306..1e556c528c4d 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -1578,8 +1578,13 @@ mod test { ) .await .unwrap(), - exprs: vec![ScalarExpr::Literal(Value::Interval(Interval::from_month_day_nano(0, 0, 30000000000)), CDT::interval_month_day_nano_datatype()), - ScalarExpr::Column(1).cast(CDT::timestamp_millisecond_datatype())], + exprs: vec![ + ScalarExpr::Literal( + Value::Interval(Interval::from_month_day_nano(0, 0, 30000000000)), + CDT::interval_month_day_nano_datatype() + ), + ScalarExpr::Column(1).cast(CDT::timestamp_millisecond_datatype()) + ], }]) .unwrap() .project(vec![2]) diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index b2e24f5fc35d..4c6a04594f44 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -537,7 +537,6 @@ impl TypedExpr { #[cfg(test)] mod test { - use common_time::{DateTime, Interval}; use datatypes::prelude::ConcreteDataType; use datatypes::value::Value; use pretty_assertions::assert_eq; @@ -794,65 +793,5 @@ mod test { }, } ); - - let f = substrait_proto::proto::expression::ScalarFunction { - function_reference: 0, - arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")], - options: vec![], - output_type: None, - ..Default::default() - }; - let input_schema = RelationType::new(vec![ - ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), - ColumnType::new(CDT::string_datatype(), false), - ]) - .into_unnamed(); - let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]); - let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions) - .await - .unwrap(); - - assert_eq!( - res, - ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { - ts: Box::new( - ScalarExpr::Column(0) - .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) - ), - window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), - start_time: Some(DateTime::new(1625097600000)) - }) - .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), - ); - - let f = substrait_proto::proto::expression::ScalarFunction { - function_reference: 0, - arguments: vec![proto_col(0), lit("1 second")], - options: vec![], - output_type: None, - ..Default::default() - }; - let input_schema = RelationType::new(vec![ - ColumnType::new(CDT::timestamp_nanosecond_datatype(), false), - ColumnType::new(CDT::string_datatype(), false), - ]) - .into_unnamed(); - let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]); - let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions) - .await - .unwrap(); - - assert_eq!( - res, - ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow { - ts: Box::new( - ScalarExpr::Column(0) - .with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false)) - ), - window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000), - start_time: None - }) - .with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)), - ) } }