diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 2d0a6f66dab8..76695fd6053d 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -216,6 +216,7 @@ impl KeyValPlan { /// find out the column that should be time index in group exprs(which is all columns that should be keys) /// TODO(discord9): better ways to assign time index +/// for now, it will found the first column that is timestamp or has a tumble window floor function fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { group_exprs.iter().position(|expr| { matches!( @@ -224,7 +225,7 @@ fn find_time_index_in_group_exprs(group_exprs: &[TypedExpr]) -> Option { func: UnaryFunc::TumbleWindowFloor { .. }, expr: _ } - ) + ) || expr.typ.scalar_type.is_timestamp() }) } diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index efbc04047ded..1f94c3c3e033 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -17,6 +17,24 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -28,6 +46,24 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); | FLOW_FLUSHED | +----------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "SUM(numbers_input_basic.number)" BIGINT NULL, | +| | "window_start" TIMESTAMP(3) NOT NULL, | +| | "window_end" TIMESTAMP(3) NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("window_start"), | +| | PRIMARY KEY ("window_end") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -130,6 +166,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + DROP FLOW test_wildcard_basic; Affected Rows: 0 @@ -142,6 +194,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -159,6 +228,22 @@ ADMIN FLUSH_FLOW('test_wildcard_basic'); | FLOW_FLUSHED | +-----------------------------------------+ +SHOW CREATE TABLE out_basic; + ++-----------+---------------------------------------------+ +| Table | Create Table | ++-----------+---------------------------------------------+ +| out_basic | CREATE TABLE IF NOT EXISTS "out_basic" ( | +| | "wildcard" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-----------+---------------------------------------------+ + SELECT wildcard FROM out_basic; +----------+ @@ -197,6 +282,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -350,6 +452,23 @@ SHOW CREATE FLOW filter_numbers_basic; | | AS SELECT INTERVAL '1 day 1 second', INTERVAL '1 month 1 day 1 second', INTERVAL '1 year 1 month' FROM numbers_input_basic WHERE number > 10 | +----------------------+----------------------------------------------------------------------------------------------------------------------------------------------+ +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+-----------------------------------------------------------+ +| Table | Create Table | ++-------------------+-----------------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "a" INTERVAL NULL, | +| | "b" INTERVAL NULL, | +| | "c" INTERVAL NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+-----------------------------------------------------------+ + drop flow filter_numbers_basic; Affected Rows: 0 @@ -392,6 +511,22 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE approx_rate; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( | +| | "rate" DOUBLE NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO bytes_log VALUES @@ -481,6 +616,23 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+---------------------------------------------+ +| Table | Create Table | ++-------------+---------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+---------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -614,6 +766,23 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_country; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -632,21 +801,20 @@ ADMIN FLUSH_FLOW('calc_ngx_country'); SHOW CREATE TABLE ngx_country; -+-------------+---------------------------------------------------------+ -| Table | Create Table | -+-------------+---------------------------------------------------------+ -| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | -| | "ngx_access_log.country" STRING NULL, | -| | "time_window" TIMESTAMP(3) NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("ngx_access_log.country", "time_window") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+-------------+---------------------------------------------------------+ ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| ngx_country | CREATE TABLE IF NOT EXISTS "ngx_country" ( | +| | "ngx_access_log.country" STRING NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("ngx_access_log.country") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ SELECT "ngx_access_log.country", @@ -763,6 +931,23 @@ HAVING Affected Rows: 0 +SHOW CREATE TABLE temp_alerts; + ++-------------+--------------------------------------------+ +| Table | Create Table | ++-------------+--------------------------------------------+ +| temp_alerts | CREATE TABLE IF NOT EXISTS "temp_alerts" ( | +| | "sensor_id" INT NULL, | +| | "loc" STRING NULL, | +| | "max_temp" DOUBLE NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------+--------------------------------------------+ + INSERT INTO temp_sensor_data VALUES @@ -902,6 +1087,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE ngx_distribution; + ++------------------+-------------------------------------------------+ +| Table | Create Table | ++------------------+-------------------------------------------------+ +| ngx_distribution | CREATE TABLE IF NOT EXISTS "ngx_distribution" ( | +| | "stat" INT NULL, | +| | "bucket_size" INT NULL, | +| | "total_logs" BIGINT NULL, | +| | "time_window" TIMESTAMP(3) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window"), | +| | PRIMARY KEY ("stat", "bucket_size") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++------------------+-------------------------------------------------+ + INSERT INTO ngx_access_log VALUES @@ -1008,6 +1212,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE requests_without_ip; + ++---------------------+----------------------------------------------------+ +| Table | Create Table | ++---------------------+----------------------------------------------------+ +| requests_without_ip | CREATE TABLE IF NOT EXISTS "requests_without_ip" ( | +| | "service_name" STRING NULL, | +| | "val" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("ts") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++---------------------+----------------------------------------------------+ + INSERT INTO requests VALUES @@ -1093,6 +1313,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1185,6 +1424,25 @@ GROUP BY Affected Rows: 0 +SHOW CREATE TABLE android_log_abnormal; + ++----------------------+-----------------------------------------------------+ +| Table | Create Table | ++----------------------+-----------------------------------------------------+ +| android_log_abnormal | CREATE TABLE IF NOT EXISTS "android_log_abnormal" ( | +| | "crash" BIGINT NULL, | +| | "fatal" BIGINT NULL, | +| | "backtrace" BIGINT NULL, | +| | "anr" BIGINT NULL, | +| | "time_window" TIMESTAMP(9) NOT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | TIME INDEX ("time_window") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++----------------------+-----------------------------------------------------+ + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -1260,6 +1518,22 @@ FROM Affected Rows: 0 +SHOW CREATE TABLE out_num_cnt_basic; + ++-------------------+--------------------------------------------------+ +| Table | Create Table | ++-------------------+--------------------------------------------------+ +| out_num_cnt_basic | CREATE TABLE IF NOT EXISTS "out_num_cnt_basic" ( | +| | "avg_after_filter_num" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++-------------------+--------------------------------------------------+ + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index af47f3188905..63fc8e85d18e 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -13,11 +13,15 @@ FROM GROUP BY tumble(ts, '1 second', '2021-07-01 00:00:00'); +SHOW CREATE TABLE out_num_cnt_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_numbers_basic'); +SHOW CREATE TABLE out_num_cnt_basic; + -- SQLNESS ARG restart=true INSERT INTO numbers_input_basic @@ -75,6 +79,8 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + DROP FLOW test_wildcard_basic; CREATE FLOW test_wildcard_basic sink TO out_basic AS @@ -83,6 +89,9 @@ SELECT FROM input_basic; +SHOW CREATE TABLE out_basic; + +-- SQLNESS ARG restart=true INSERT INTO input_basic VALUES @@ -92,6 +101,8 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_wildcard_basic'); +SHOW CREATE TABLE out_basic; + SELECT wildcard FROM out_basic; DROP FLOW test_wildcard_basic; @@ -112,6 +123,8 @@ SELECT FROM distinct_basic; +SHOW CREATE TABLE out_distinct_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | @@ -186,6 +199,8 @@ where SHOW CREATE FLOW filter_numbers_basic; +SHOW CREATE TABLE out_num_cnt_basic; + drop flow filter_numbers_basic; drop table out_num_cnt_basic; @@ -216,6 +231,8 @@ from GROUP BY time_window; +SHOW CREATE TABLE approx_rate; + INSERT INTO bytes_log VALUES @@ -266,6 +283,8 @@ SELECT FROM ngx_access_log; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -331,6 +350,8 @@ GROUP BY country, time_window; +SHOW CREATE TABLE ngx_country; + INSERT INTO ngx_access_log VALUES @@ -409,6 +430,8 @@ GROUP BY HAVING max_temp > 100; +SHOW CREATE TABLE temp_alerts; + INSERT INTO temp_sensor_data VALUES @@ -488,6 +511,8 @@ GROUP BY time_window, bucket_size; +SHOW CREATE TABLE ngx_distribution; + INSERT INTO ngx_access_log VALUES @@ -551,6 +576,8 @@ SELECT FROM requests; +SHOW CREATE TABLE requests_without_ip; + INSERT INTO requests VALUES @@ -605,6 +632,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -657,6 +686,8 @@ FROM android_log GROUP BY time_window; +SHOW CREATE TABLE android_log_abnormal; + INSERT INTO android_log values ("am_crash", "2021-07-01 00:01:01.000"), ("asas.backtrace.ssss", "2021-07-01 00:01:01.000"); @@ -694,6 +725,8 @@ SELECT FROM numbers_input_basic; +SHOW CREATE TABLE out_num_cnt_basic; + -- TODO(discord9): confirm if it's necessary to flush flow here? -- because flush_flow result is at most 1 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |