diff --git a/cases/function/window/error_window.yaml b/cases/function/window/error_window.yaml index 9e9419bc74f..8b41d1ff0bf 100644 --- a/cases/function/window/error_window.yaml +++ b/cases/function/window/error_window.yaml @@ -17,15 +17,17 @@ debugs: [] version: 0.5.0 cases: - id: 0 - desc: no order by + desc: RANGE-type WINDOW with offset PRECEDING/FOLLOWING requires ORDER BY inputs: - columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ] indexs: [ "index1:c8:c4" ] rows: - [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"] sql: | - SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0} WINDOW w1 AS (PARTITION BY {0}.c8 ROWS BETWEEN 2 PRECEDING AND CURRENT ROW); + SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0} + WINDOW w1 AS (PARTITION BY {0}.c8 ROWS_RANGE BETWEEN 2 PRECEDING AND CURRENT ROW); expect: + msg: RANGE/ROWS_RANGE-type FRAME with offset PRECEDING/FOLLOWING requires exactly one ORDER BY column success: false - id: 1 desc: no partition by @@ -301,3 +303,29 @@ cases: SELECT id, c1, c3, sum(c4) OVER w1 as w1_c4_sum FROM {0} WINDOW w1 AS (PARTITION BY {0}.c33 ORDER BY {0}.c7 ROWS_RANGE BETWEEN 2s PRECEDING AND CURRENT ROW); expect: success: false + - id: 17 + desc: ROWS WINDOW + EXCLUDE CURRENT_TIME requires order by + inputs: + - columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ] + indexs: [ "index1:c8:c4" ] + rows: + - [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"] + sql: | + SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0} + WINDOW w1 AS (PARTITION BY {0}.c8 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_TIME); + expect: + msg: WINDOW with EXCLUDE CURRENT_TIME requires exactly one ORDER BY column + success: false + - id: 18 + desc: RANGE WINDOW + EXCLUDE CURRENT_TIME requires order by + inputs: + - columns: [ "id int","c1 string","c3 int","c4 bigint","c5 float","c6 double","c7 timestamp","c8 date" ] + indexs: [ "index1:c8:c4" ] + rows: + - [1,"aa",20,30,1.1,2.1,1590738990000,"2020-05-01"] + sql: | + SELECT id, c1, c4, count(c4) OVER w1 as w1_c4_count FROM {0} + WINDOW w1 AS (PARTITION BY {0}.c8 ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_TIME); + expect: + msg: WINDOW with EXCLUDE CURRENT_TIME requires exactly one ORDER BY column + success: false diff --git a/cases/query/window_query.yaml b/cases/query/window_query.yaml index 84365be97f7..3c64259d8c5 100644 --- a/cases/query/window_query.yaml +++ b/cases/query/window_query.yaml @@ -901,3 +901,234 @@ cases: 200, 1, 1 300, 0, 0 400, 2, 2 + + # ====================================================================== + # WINDOW without ORDER BY + # ====================================================================== + - id: 24 + desc: ROWS WINDOW WITHOUT ORDER BY + mode: batch-unsupport + inputs: + - name: t1 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10000 + 5, 400, 15000 + 6, 400, 40000 + sql: | + select id, count(ts) over w as agg + from t1 + window w as ( + partition by gp + rows between 2 open preceding and current row + ) + request_plan: | + PROJECT(type=Aggregation) + REQUEST_UNION(partition_keys=(), orders=, rows=(, 2 OPEN PRECEDING, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(request=t1) + DATA_PROVIDER(type=Partition, table=t1, index=idx) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(id, agg)) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(id)) + DATA_PROVIDER(request=t1) + PROJECT(type=Aggregation) + REQUEST_UNION(partition_keys=(), orders=, rows=(, 2 OPEN PRECEDING, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(request=t1) + DATA_PROVIDER(type=Partition, table=t1, index=idx) + expect: + columns: ["id int", "agg int64"] + order: id + data: | + 1, 1 + 2, 2 + 3, 1 + 4, 2 + 5, 2 + 6, 2 + - id: 25 + desc: RANGE WINDOW WITHOUT ORDER BY + mode: batch-unsupport + inputs: + - name: t1 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10 + 5, 400, 15000 + sql: | + select id, count(ts) over w as agg + from t1 + window w as ( + partition by gp + rows_range between unbounded preceding and current row + ) + request_plan: | + PROJECT(type=Aggregation) + REQUEST_UNION(partition_keys=(), orders=, range=(, 0 PRECEDING UNBOUND, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(request=t1) + DATA_PROVIDER(type=Partition, table=t1, index=idx) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(id, agg)) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(id)) + DATA_PROVIDER(request=t1) + PROJECT(type=Aggregation) + REQUEST_UNION(partition_keys=(), orders=, range=(, 0 PRECEDING UNBOUND, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(request=t1) + DATA_PROVIDER(type=Partition, table=t1, index=idx) + expect: + columns: ["id int", "agg int64"] + order: id + data: | + 1, 1 + 2, 2 + 3, 1 + 4, 2 + 5, 3 + - id: 26 + desc: RANGE-type WINDOW WITHOUT ORDER BY + WINDOW attributes + mode: batch-unsupport + inputs: + - name: t1 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10000 + 5, 400, 15000 + - name: t2 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10000 + 5, 400, 15000 + sql: | + select id, + count(ts) over w1 as agg1, + count(ts) over w2 as agg2, + count(ts) over w3 as agg3, + count(ts) over w4 as agg4, + count(ts) over w5 as agg5, + count(ts) over w6 as agg6, + count(ts) over w7 as agg7, + from t1 + window w1 as ( + PARTITION by gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), + w2 as (partition by gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW), + w3 as (PARTITION BY gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW MAXSIZE 1), + w4 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW), + w5 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW), + w6 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW MAXSIZE 2 INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW), + w7 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW) + expect: + columns: ["id int", "agg1 int64", "agg2 int64", "agg3 int64", "agg4 int64", "agg5 int64", "agg6 int64", "agg7 int64"] + order: id + data: | + 1, 1, 0, 1, 3, 2, 2, 2 + 2, 2, 1, 1, 3, 2, 2, 3 + 3, 1, 0, 1, 4, 3, 2, 3 + 4, 2, 1, 1, 4, 3, 2, 4 + 5, 3, 2, 1, 4, 3, 2, 5 + - id: 27 + desc: ROWS-type WINDOW WITHOUT ORDER BY + WINDOW attributes + mode: batch-unsupport + inputs: + - name: t1 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10000 + 5, 400, 15000 + - name: t2 + columns: + - id int + - gp int + - ts timestamp + indexs: + - idx:gp:ts + data: | + 1, 100, 20000 + 2, 100, 10000 + 3, 400, 20000 + 4, 400, 10000 + 5, 400, 15000 + sql: | + select id, + count(ts) over w1 as agg1, + count(ts) over w2 as agg2, + count(ts) over w3 as agg3, + count(ts) over w4 as agg4, + from t1 + window w1 as ( + PARTITION by gp + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), + w2 as (partition by gp + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW EXCLUDE CURRENT_ROW), + w3 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW), + w4 as ( + UNION (select * from t2) + PARTITION BY gp + ROWS BETWEEN 3 PRECEDING AND CURRENT ROW INSTANCE_NOT_IN_WINDOW EXCLUDE CURRENT_ROW) + expect: + columns: ["id int", "agg1 int64", "agg2 int64", "agg3 int64", "agg4 int64"] + order: id + data: | + 1, 1, 0, 3, 2 + 2, 2, 1, 3, 2 + 3, 1, 0, 3, 3 + 4, 2, 1, 3, 3 + 5, 3, 2, 3, 3 diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index bbdfc83313f..dcf162a96ab 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -1166,6 +1166,9 @@ class FrameBound : public SqlNode { int64_t GetOffset() const { return offset_; } void SetOffset(int64_t v) { offset_ = v; } + // is offset [OPEN] PRECEDING/FOLLOWING + bool is_offset_bound() const; + /// \brief get the inclusive frame bound offset value that has signed symbol /// diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index ee3634615c8..d2fdafb5349 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -200,9 +200,9 @@ class Range : public FnComponent { const bool Valid() const { return nullptr != range_key_; } const std::string ToString() const { std::ostringstream oss; - if (nullptr != range_key_ && nullptr != frame_) { + if (nullptr != frame_) { if (nullptr != frame_->frame_range()) { - oss << "range=(" << range_key_->GetExprString() << ", " + oss << "range=(" << node::ExprString(range_key_) << ", " << frame_->frame_range()->start()->GetExprString() << ", " << frame_->frame_range()->end()->GetExprString(); @@ -216,7 +216,7 @@ class Range : public FnComponent { if (nullptr != frame_->frame_range()) { oss << ", "; } - oss << "rows=(" << range_key_->GetExprString() << ", " + oss << "rows=(" << node::ExprString(range_key_) << ", " << frame_->frame_rows()->start()->GetExprString() << ", " << frame_->frame_rows()->end()->GetExprString() << ")"; } @@ -578,7 +578,7 @@ class PhysicalRequestProviderNode : public PhysicalDataProviderNode { PhysicalOpNode **out) override; virtual ~PhysicalRequestProviderNode() {} - virtual void Print(std::ostream &output, const std::string &tab) const; + void Print(std::ostream &output, const std::string &tab) const override; }; class PhysicalRequestProviderNodeWithCommonColumn @@ -846,9 +846,7 @@ class WindowOp { std::ostringstream oss; oss << "partition_" << partition_.ToString(); oss << ", " << sort_.ToString(); - if (range_.Valid()) { - oss << ", " << range_.ToString(); - } + oss << ", " << range_.ToString(); return oss.str(); } const std::string FnDetail() const { diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index 16b88cd51ba..6fa2a82d42a 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -2100,6 +2100,11 @@ void FrameBound::Print(std::ostream &output, const std::string &org_tab) const { } } +bool FrameBound::is_offset_bound() const { + return bound_type_ == kPreceding || bound_type_ == kOpenPreceding || bound_type_ == kFollowing || + bound_type_ == kOpenFollowing; +} + int FrameBound::Compare(const FrameBound *bound1, const FrameBound *bound2) { if (SqlEquals(bound1, bound2)) { return 0; diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index c0a68e3104e..1584d76acbb 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -18,7 +18,6 @@ #include #include -#include #include #include #include diff --git a/hybridse/src/testing/engine_test_base.cc b/hybridse/src/testing/engine_test_base.cc index ca1af237936..2c3134d1257 100644 --- a/hybridse/src/testing/engine_test_base.cc +++ b/hybridse/src/testing/engine_test_base.cc @@ -240,8 +240,7 @@ void DoEngineCheckExpect(const SqlCase& sql_case, std::shared_ptr se if (!output_common_column_indices.empty() && output_common_column_indices.size() != static_cast(schema.size()) && sql_ctx.is_batch_request_optimized) { - LOG(INFO) << "Reorder batch request outputs for non-trival common " - "columns"; + DLOG(INFO) << "Reorder batch request outputs for non-trival columns"; auto& expect_common_column_indices = sql_case.expect().common_column_indices_; if (!expect_common_column_indices.empty()) { @@ -375,7 +374,7 @@ Status EngineTestRunner::Compile() { std::string placeholder = "{" + std::to_string(j) + "}"; boost::replace_all(sql_str, placeholder, sql_case_.inputs_[j].name_); } - LOG(INFO) << "Compile SQL:\n" << sql_str; + DLOG(INFO) << "Compile SQL:\n" << sql_str; CHECK_TRUE(session_ != nullptr, common::kTestEngineError, "Session is not set"); if (hybridse::sqlcase::SqlCase::IsDebug() || sql_case_.debug()) { session_->EnableDebug(); @@ -395,22 +394,23 @@ Status EngineTestRunner::Compile() { bool ok = engine_->Get(sql_str, sql_case_.db(), *session_, status); gettimeofday(&et, nullptr); double mill = (et.tv_sec - st.tv_sec) * 1000 + (et.tv_usec - st.tv_usec) / 1000.0; - LOG(INFO) << "SQL Compile take " << mill << " milliseconds"; + DLOG(INFO) << "SQL Compile take " << mill << " milliseconds"; if (!ok || !status.isOK()) { - LOG(INFO) << status; + DLOG(INFO) << status; + if (!sql_case_.expect().msg_.empty()) { + EXPECT_EQ(sql_case_.expect().msg_, status.msg); + } return_code_ = ENGINE_TEST_RET_COMPILE_ERROR; } else { - LOG(INFO) << "SQL output schema:"; + DLOG(INFO) << "SQL output schema:"; std::ostringstream oss; std::dynamic_pointer_cast(session_->GetCompileInfo())->GetPhysicalPlan()->Print(oss, ""); - LOG(INFO) << "Physical plan:"; - std::cerr << oss.str() << std::endl; + DLOG(INFO) << "Physical plan:\n" << oss.str(); std::ostringstream runner_oss; std::dynamic_pointer_cast(session_->GetCompileInfo())->GetClusterJob().Print(runner_oss, ""); - LOG(INFO) << "Runner plan:"; - std::cerr << runner_oss.str() << std::endl; + DLOG(INFO) << "Runner plan:\n" << runner_oss.str(); } return status; } diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index be954653b91..586f75c6187 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -2785,6 +2785,7 @@ std::shared_ptr RequestUnionRunner::Run( auto request = std::dynamic_pointer_cast(left)->GetValue(); + // ts_gen < 0 if there is no ORDER BY clause for WINDOW int64_t ts_gen = range_gen_.Valid() ? range_gen_.ts_gen_.Gen(request) : -1; // Prepare Union Window @@ -2798,31 +2799,35 @@ std::shared_ptr RequestUnionRunner::Run( std::shared_ptr RequestUnionRunner::RequestUnionWindow( const Row& request, std::vector> union_segments, int64_t ts_gen, const WindowRange& window_range, bool output_request_row, bool exclude_current_time) { - uint64_t start = 0; - // end is empty means end value < 0, that there is no effective window range + // range_start, range_end default to [0, MAX], so for the case without ORDER BY, + // RANGE-type WINDOW includes all rows in partition + uint64_t range_start = 0; + // range_end is empty means end value < 0, that there is no effective window range // this happend when `ts_gen` is 0 and exclude current_time needed - std::optional end = UINT64_MAX; - uint64_t rows_start_preceding = 0; - uint64_t max_size = 0; + std::optional range_end = UINT64_MAX; + uint64_t rows_start_preceding = window_range.start_row_; + uint64_t max_size = window_range.max_size_; if (ts_gen >= 0) { - start = (ts_gen + window_range.start_offset_) < 0 + range_start = (ts_gen + window_range.start_offset_) < 0 ? 0 : (ts_gen + window_range.start_offset_); if (exclude_current_time && 0 == window_range.end_offset_) { if (ts_gen == 0) { - end = {}; + range_end = {}; } else { - end = ts_gen - 1; + range_end = ts_gen - 1; } } else { - end = (ts_gen + window_range.end_offset_) < 0 + range_end = (ts_gen + window_range.end_offset_) < 0 ? 0 : (ts_gen + window_range.end_offset_); } - rows_start_preceding = window_range.start_row_; - max_size = window_range.max_size_; } - uint64_t request_key = ts_gen > 0 ? static_cast(ts_gen) : 0; + // INT64_MAX is the magic number as row key of input row, + // when WINDOW without ORDER BY + // + // DONT BELIEVE THE UNSIGNED TYPE, codegen still use int64_t as data type + uint64_t request_key = ts_gen >= 0 ? static_cast(ts_gen) : INT64_MAX; auto window_table = std::make_shared(); @@ -2841,7 +2846,7 @@ std::shared_ptr RequestUnionRunner::RequestUnionWindow( union_segment_status[i] = IteratorStatus(); continue; } - union_segment_iters[i]->Seek(end.value_or(0)); + union_segment_iters[i]->Seek(range_end.value_or(0)); if (!union_segment_iters[i]->Valid()) { union_segment_status[i] = IteratorStatus(); continue; @@ -2854,7 +2859,7 @@ std::shared_ptr RequestUnionRunner::RequestUnionWindow( uint64_t cnt = 0; auto range_status = window_range.GetWindowPositionStatus( cnt > rows_start_preceding, window_range.end_offset_ < 0, - request_key < start); + request_key < range_start); if (output_request_row) { window_table->AddRow(request_key, request); } @@ -2868,8 +2873,8 @@ std::shared_ptr RequestUnionRunner::RequestUnionWindow( } auto range_status = window_range.GetWindowPositionStatus( cnt > rows_start_preceding, - union_segment_status[max_union_pos].key_ > end, - union_segment_status[max_union_pos].key_ < start); + union_segment_status[max_union_pos].key_ > range_end, + union_segment_status[max_union_pos].key_ < range_start); if (WindowRange::kExceedWindow == range_status) { break; } diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index 8020c99741f..d52667dbc6f 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -25,8 +25,6 @@ #include "codegen/context.h" #include "codegen/fn_ir_builder.h" #include "codegen/fn_let_ir_builder.h" -#include "passes/expression/expr_pass.h" -#include "passes/lambdafy_projects.h" #include "passes/physical/batch_request_optimize.h" #include "passes/physical/cluster_optimized.h" #include "passes/physical/condition_optimized.h" @@ -2230,13 +2228,29 @@ Status BatchModeTransformer::CheckWindow( const node::WindowPlanNode* w_ptr, const vm::SchemasContext* schemas_ctx) { CHECK_TRUE(w_ptr != nullptr, common::kPlanError, "NULL Window"); CHECK_TRUE(!node::ExprListNullOrEmpty(w_ptr->GetKeys()), common::kPlanError, - "Invalid Window: Do not support window on non-partition"); - CHECK_TRUE(nullptr != w_ptr->GetOrders() && - !node::ExprListNullOrEmpty(w_ptr->GetOrders()->order_expressions_), - common::kPlanError, - "Invalid Window: Do not support window on non-order"); + "un-implemented: WINDOW without PARTITION BY clause"); CHECK_STATUS(CheckHistoryWindowFrame(w_ptr)); + // without ORDER BY clause: + if (w_ptr->GetOrders() == nullptr || node::ExprListNullOrEmpty(w_ptr->GetOrders()->order_expressions())) { + // 1. forbidden: RANGE/ROWS_RANGE WINDOW WITH offset PRECEDING/FOLLOWING + if (w_ptr->frame_node()->frame_type() != node::FrameType::kFrameRows) { + auto* range = w_ptr->frame_node()->frame_range(); + if ((range->start() && range->start()->is_offset_bound()) || + (range->end() && range->end()->is_offset_bound())) { + CHECK_TRUE( + false, common::kPlanError, + "RANGE/ROWS_RANGE-type FRAME with offset PRECEDING/FOLLOWING requires exactly one ORDER BY column") + } + } + + // 2. forbidden: WINDOW without ORDER BY + EXCLUDE CURRENT_TIME + if (w_ptr->exclude_current_time()) { + CHECK_TRUE(false, common::kPlanError, + "WINDOW with EXCLUDE CURRENT_TIME requires exactly one ORDER BY column"); + } + } + CHECK_STATUS(CheckTimeOrIntegerOrderColumn(w_ptr->GetOrders(), schemas_ctx)); return Status::OK();