From d931389a4c385a250c83646ac3d7421b0c9cb7b4 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 29 Nov 2024 11:06:27 +0800 Subject: [PATCH] fix(flow): minor fix about count(*)&sink keyword (#5061) * fix: SiNk * fix: sink&count(*) * tests: SiNk * refactor: per review --- src/flow/src/df_optimizer.rs | 2 + src/sql/src/parsers/create_parser.rs | 14 +++- .../standalone/common/flow/flow_basic.result | 67 +++++++++++++++++++ .../standalone/common/flow/flow_basic.sql | 37 ++++++++++ 4 files changed, 117 insertions(+), 3 deletions(-) diff --git a/src/flow/src/df_optimizer.rs b/src/flow/src/df_optimizer.rs index bb296cba7079..a6f609274978 100644 --- a/src/flow/src/df_optimizer.rs +++ b/src/flow/src/df_optimizer.rs @@ -23,6 +23,7 @@ use common_error::ext::BoxedError; use common_telemetry::debug; use datafusion::config::ConfigOptions; use datafusion::error::DataFusionError; +use datafusion::optimizer::analyzer::count_wildcard_rule::CountWildcardRule; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use datafusion::optimizer::optimize_projections::OptimizeProjections; @@ -59,6 +60,7 @@ pub async fn apply_df_optimizer( ) -> Result { let cfg = ConfigOptions::new(); let analyzer = Analyzer::with_rules(vec![ + Arc::new(CountWildcardRule::new()), Arc::new(AvgExpandRule::new()), Arc::new(TumbleExpandRule::new()), Arc::new(CheckGroupByRule::new()), diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 296110f4039f..bb9aadadb703 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -259,9 +259,17 @@ impl<'a> ParserContext<'a> { let flow_name = self.intern_parse_table_name()?; - self.parser - .expect_token(&Token::make_keyword(SINK)) - .context(SyntaxSnafu)?; + // make `SINK` case in-sensitive + if let Token::Word(word) = self.parser.peek_token().token + && word.value.eq_ignore_ascii_case(SINK) + { + self.parser.next_token(); + } else { + Err(ParserError::ParserError( + "Expect `SINK` keyword".to_string(), + )) + .context(SyntaxSnafu)? + } self.parser .expect_keyword(Keyword::TO) .context(SyntaxSnafu)?; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 4c6095d2529c..cc9b4e038b0f 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -112,6 +112,73 @@ DROP TABLE out_num_cnt_basic; Affected Rows: 0 +-- test count(*) rewrite +CREATE TABLE input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +Affected Rows: 0 + +CREATE FLOW test_wildcard_basic SiNk TO out_basic AS +SELECT + COUNT(*) as wildcard +FROM + input_basic; + +Affected Rows: 0 + +DROP FLOW test_wildcard_basic; + +Affected Rows: 0 + +CREATE FLOW test_wildcard_basic sink TO out_basic AS +SELECT + COUNT(*) as wildcard +FROM + input_basic; + +Affected Rows: 0 + +INSERT INTO + input_basic +VALUES + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_wildcard_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_wildcard_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT wildcard FROM out_basic; + ++----------+ +| wildcard | ++----------+ +| 2 | ++----------+ + +DROP FLOW test_wildcard_basic; + +Affected Rows: 0 + +DROP TABLE out_basic; + +Affected Rows: 0 + +DROP TABLE input_basic; + +Affected Rows: 0 + -- test distinct CREATE TABLE distinct_basic ( number INT, diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 3a1a53d0edfb..70d7b14157c2 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -61,6 +61,43 @@ DROP TABLE numbers_input_basic; DROP TABLE out_num_cnt_basic; +-- test count(*) rewrite +CREATE TABLE input_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +); + +CREATE FLOW test_wildcard_basic SiNk TO out_basic AS +SELECT + COUNT(*) as wildcard +FROM + input_basic; + +DROP FLOW test_wildcard_basic; + +CREATE FLOW test_wildcard_basic sink TO out_basic AS +SELECT + COUNT(*) as wildcard +FROM + input_basic; + +INSERT INTO + input_basic +VALUES + (23, "2021-07-01 00:00:01.000"), + (24, "2021-07-01 00:00:01.500"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_wildcard_basic'); + +SELECT wildcard FROM out_basic; + +DROP FLOW test_wildcard_basic; +DROP TABLE out_basic; +DROP TABLE input_basic; + -- test distinct CREATE TABLE distinct_basic ( number INT,