Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Aug 9, 2024
1 parent 80786db commit 7926bf9
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 120 deletions.
114 changes: 58 additions & 56 deletions src/flow/src/expr/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use substrait::df_logical_plan::consumer::name_to_op;

use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu};
use crate::error::{Error, ExternalSnafu, InvalidQuerySnafu, PlanSnafu, UnexpectedSnafu};
use crate::expr::error::{
CastValueSnafu, DivisionByZeroSnafu, EvalError, InternalSnafu, OverflowSnafu,
TryFromValueSnafu, TypeMismatchSnafu,
Expand Down Expand Up @@ -84,42 +84,10 @@ impl UnmaterializableFunc {
}

/// Create a UnmaterializableFunc from a string of the function name
pub fn from_str_args(name: &str, args: Vec<TypedExpr>) -> Result<Self, Error> {
pub fn from_str_args(name: &str, _args: Vec<TypedExpr>) -> Result<Self, Error> {
match name.to_lowercase().as_str() {
"now" => Ok(Self::Now),
"current_schema" => Ok(Self::CurrentSchema),
"tumble" => {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
let window_size = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument"
})?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead
.map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context(
ExternalSnafu
)?.and_then(|v|v.as_interval())
.with_context(||InvalidQuerySnafu {
reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1))
})?;
let start_time = match args.get(2) {
Some(start_time) => start_time.expr.as_literal(),
None => None,
}
.map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context(
||InvalidQuerySnafu {
reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2))
}
)).transpose()?;

Ok(Self::TumbleWindow {
ts: Box::new(ts.clone()),
window_size,
start_time,
})
}
_ => InvalidQuerySnafu {
reason: format!("Unknown unmaterializable function: {}", name),
}
Expand Down Expand Up @@ -227,27 +195,58 @@ impl UnaryFunc {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
let window_size = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument"
})?.as_string() // TODO(discord9): since df to substrait convertor does not support interval type yet, we need to take a string and cast it to interval instead
.map(|s|cast(Value::from(s), &ConcreteDataType::interval_month_day_nano_datatype())).transpose().map_err(BoxedError::new).context(
ExternalSnafu
)?.and_then(|v|v.as_interval())
.with_context(||InvalidQuerySnafu {
reason: format!("Tumble window function requires window size argument to be a string describe a interval, found {:?}", args.get(1))
})?;
let window_size = {
let window_size_untyped = args
.get(1)
.and_then(|expr| expr.expr.as_literal())
.context(InvalidQuerySnafu {
reason: "Tumble window function requires a window size argument",
})?;
if let Some(window_size) = window_size_untyped.as_string() {
// cast as interval
cast(
Value::from(window_size),
&ConcreteDataType::interval_month_day_nano_datatype(),
)
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_interval()
.context(UnexpectedSnafu {
reason: "Expect window size arg to be interval after successful cast"
.to_string(),
})?
} else if let Some(interval) = window_size_untyped.as_interval() {
interval
} else {
InvalidQuerySnafu {
reason: format!(
"Tumble window function requires window size argument to be either a interval or a string describe a interval, found {:?}",
window_size_untyped
)
}.fail()?
}
};
let start_time = match args.get(2) {
Some(start_time) => start_time.expr.as_literal(),
None => None,
}
.map(|s| cast(s.clone(), &ConcreteDataType::datetime_datatype())).transpose().map_err(BoxedError::new).context(ExternalSnafu)?.map(|v|v.as_datetime().with_context(
||InvalidQuerySnafu {
reason: format!("Tumble window function requires start time argument to be a datetime describe in string, found {:?}", args.get(2))
}
)).transpose()?;
Some(start_time) => {
if let Some(value) = start_time.expr.as_literal() {
// cast as DateTime
let ret = cast(value, &ConcreteDataType::datetime_datatype())
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.as_datetime()
.context(UnexpectedSnafu {
reason:
"Expect start time arg to be datetime after successful cast"
.to_string(),
})?;
Some(ret)
} else {
None
}
}
None => None,
};

if name == "tumble_start" {
Ok((
Self::TumbleWindowFloor {
Expand All @@ -268,7 +267,10 @@ impl UnaryFunc {
unreachable!()
}
}
_ => todo!(),
_ => crate::error::InternalSnafu {
reason: format!("Unknown tumble kind function: {}", name),
}
.fail()?,
}
}

Expand Down Expand Up @@ -623,7 +625,7 @@ impl BinaryFunc {
InvalidQuerySnafu {
reason: format!(
"Binary function {:?} requires both arguments to have the same type, left={:?}, right={:?}",
generic,t1,t2
generic, t1, t2
),
}
);
Expand Down
2 changes: 1 addition & 1 deletion src/flow/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl TypedPlan {
/// TODO(discord9): support `TableFunc`(by define FlatMap that map 1 to n)
/// Plan describe how to transform data in dataflow
///
/// This can be considered as a physical plan in dataflow, which describe how to transform data in
/// This can be considered as a physical plan in dataflow, which describe how to transform data in a streaming manner.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum Plan {
/// A constant collection of rows.
Expand Down
9 changes: 7 additions & 2 deletions src/flow/src/transform/aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1578,8 +1578,13 @@ mod test {
)
.await
.unwrap(),
exprs: vec![ScalarExpr::Literal(Value::Interval(Interval::from_month_day_nano(0, 0, 30000000000)), CDT::interval_month_day_nano_datatype()),
ScalarExpr::Column(1).cast(CDT::timestamp_millisecond_datatype())],
exprs: vec![
ScalarExpr::Literal(
Value::Interval(Interval::from_month_day_nano(0, 0, 30000000000)),
CDT::interval_month_day_nano_datatype()
),
ScalarExpr::Column(1).cast(CDT::timestamp_millisecond_datatype())
],
}])
.unwrap()
.project(vec![2])
Expand Down
61 changes: 0 additions & 61 deletions src/flow/src/transform/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,6 @@ impl TypedExpr {

#[cfg(test)]
mod test {
use common_time::{DateTime, Interval};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -794,65 +793,5 @@ mod test {
},
}
);

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![proto_col(0), lit("1 second"), lit("2021-07-01 00:00:00")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
])
.into_unnamed();
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
.await
.unwrap();

assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: Some(DateTime::new(1625097600000))
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
);

let f = substrait_proto::proto::expression::ScalarFunction {
function_reference: 0,
arguments: vec![proto_col(0), lit("1 second")],
options: vec![],
output_type: None,
..Default::default()
};
let input_schema = RelationType::new(vec![
ColumnType::new(CDT::timestamp_nanosecond_datatype(), false),
ColumnType::new(CDT::string_datatype(), false),
])
.into_unnamed();
let extensions = FunctionExtensions::from_iter(vec![(0, "tumble".to_string())]);
let res = TypedExpr::from_substrait_scalar_func(&f, &input_schema, &extensions)
.await
.unwrap();

assert_eq!(
res,
ScalarExpr::CallUnmaterializable(UnmaterializableFunc::TumbleWindow {
ts: Box::new(
ScalarExpr::Column(0)
.with_type(ColumnType::new(CDT::timestamp_nanosecond_datatype(), false))
),
window_size: Interval::from_month_day_nano(0, 0, 1_000_000_000),
start_time: None
})
.with_type(ColumnType::new(CDT::timestamp_millisecond_datatype(), true)),
)
}
}

0 comments on commit 7926bf9

Please sign in to comment.