Skip to content

Commit

Permalink
refactor: per review
Browse files Browse the repository at this point in the history
  • Loading branch information
discord9 committed Aug 28, 2024
1 parent 3cf2157 commit d9027cc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 11 deletions.
9 changes: 7 additions & 2 deletions src/flow/src/df_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use substrait::DFLogicalSubstraitConvertor;

use crate::adapter::FlownodeContext;
use crate::error::{DatafusionSnafu, Error, ExternalSnafu, UnexpectedSnafu};
use crate::expr::{TUMBLE_END, TUMBLE_START};
use crate::plan::TypedPlan;

// TODO(discord9): use `Analyzer` to manage rules if more `AnalyzerRule` is needed
Expand Down Expand Up @@ -369,7 +370,7 @@ fn expand_tumble_analyzer(
datafusion_expr::Expr::ScalarFunction(func) if func.name() == "tumble" => {
encountered_tumble = true;

let tumble_start = TumbleExpand::new("tumble_start");
let tumble_start = TumbleExpand::new(TUMBLE_START);
let tumble_start = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_start.into()),
func.args.clone(),
Expand All @@ -378,7 +379,7 @@ fn expand_tumble_analyzer(
let start_col_name = tumble_start.name_for_alias()?;
new_group_expr.push(tumble_start);

let tumble_end = TumbleExpand::new("tumble_end");
let tumble_end = TumbleExpand::new(TUMBLE_END);
let tumble_end = datafusion_expr::expr::ScalarFunction::new_udf(
Arc::new(tumble_end.into()),
func.args.clone(),
Expand Down Expand Up @@ -439,6 +440,8 @@ fn expand_tumble_analyzer(
Ok(Transformed::no(plan))
}

/// This is a placeholder for tumble_start and tumble_end function, so that datafusion can
/// recognize them as scalar function
#[derive(Debug)]
pub struct TumbleExpand {
signature: Signature,
Expand Down Expand Up @@ -524,6 +527,7 @@ impl ScalarUDFImpl for TumbleExpand {
}
}

/// This rule check all group by exprs, and make sure they are also in select clause in a aggr query
struct CheckGroupByRule {}

impl CheckGroupByRule {
Expand Down Expand Up @@ -574,6 +578,7 @@ fn check_group_by_analyzer(
Ok(Transformed::no(plan))
}

/// Find all column names in a plan
#[derive(Debug, Default)]
struct FindColumn {
names_for_alias: HashSet<String>,
Expand Down
3 changes: 3 additions & 0 deletions src/flow/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ use snafu::{ensure, ResultExt};

use crate::expr::error::DataTypeSnafu;

pub const TUMBLE_START: &str = "tumble_start";
pub const TUMBLE_END: &str = "tumble_end";

/// A batch of vectors with the same length but without schema, only useful in dataflow
pub struct Batch {
batch: Vec<VectorRef>,
Expand Down
10 changes: 5 additions & 5 deletions src/flow/src/expr/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::expr::error::{
TryFromValueSnafu, TypeMismatchSnafu,
};
use crate::expr::signature::{GenericFn, Signature};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr};
use crate::expr::{Batch, InvalidArgumentSnafu, ScalarExpr, TypedExpr, TUMBLE_END, TUMBLE_START};
use crate::repr::{self, value_to_internal_ts};

/// UnmaterializableFunc is a function that can't be eval independently,
Expand Down Expand Up @@ -316,8 +316,8 @@ impl UnaryFunc {
}

pub fn from_tumble_func(name: &str, args: &[TypedExpr]) -> Result<(Self, TypedExpr), Error> {
match name {
"tumble_start" | "tumble_end" => {
match name.to_lowercase().as_str() {
TUMBLE_START | TUMBLE_END => {
let ts = args.first().context(InvalidQuerySnafu {
reason: "Tumble window function requires a timestamp argument",
})?;
Expand Down Expand Up @@ -378,15 +378,15 @@ impl UnaryFunc {
None => None,
};

if name == "tumble_start" {
if name == TUMBLE_START {
Ok((
Self::TumbleWindowFloor {
window_size,
start_time,
},
ts.clone(),
))
} else if name == "tumble_end" {
} else if name == TUMBLE_END {
Ok((
Self::TumbleWindowCeiling {
window_size,
Expand Down
5 changes: 3 additions & 2 deletions src/flow/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use substrait_proto::proto::extensions::SimpleExtensionDeclaration;

use crate::adapter::FlownodeContext;
use crate::error::{Error, NotImplementedSnafu, UnexpectedSnafu};
use crate::expr::{TUMBLE_END, TUMBLE_START};
/// a simple macro to generate a not implemented error
macro_rules! not_impl_err {
($($arg:tt)*) => {
Expand Down Expand Up @@ -99,8 +100,8 @@ impl FunctionExtensions {
/// register flow-specific functions to the query engine
pub fn register_function_to_query_engine(engine: &Arc<dyn QueryEngine>) {
engine.register_function(Arc::new(TumbleFunction::new("tumble")));
engine.register_function(Arc::new(TumbleFunction::new("tumble_start")));
engine.register_function(Arc::new(TumbleFunction::new("tumble_end")));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_START)));
engine.register_function(Arc::new(TumbleFunction::new(TUMBLE_END)));
}

#[derive(Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/flow/src/transform/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::error::{
};
use crate::expr::{
BinaryFunc, DfScalarFunction, RawDfScalarFn, ScalarExpr, TypedExpr, UnaryFunc,
UnmaterializableFunc, VariadicFunc,
UnmaterializableFunc, VariadicFunc, TUMBLE_END, TUMBLE_START,
};
use crate::repr::{ColumnType, RelationDesc, RelationType};
use crate::transform::literal::{
Expand Down Expand Up @@ -361,7 +361,7 @@ impl TypedExpr {
Ok(TypedExpr::new(ret_expr, ret_type))
}
_var => {
if fn_name == "tumble_start" || fn_name == "tumble_end" {
if fn_name == TUMBLE_START || fn_name == TUMBLE_END {
let (func, arg) = UnaryFunc::from_tumble_func(fn_name, &arg_typed_exprs)?;

let ret_type = ColumnType::new_nullable(func.signature().output.clone());
Expand Down

0 comments on commit d9027cc

Please sign in to comment.