From b39cb178ee8f3af911f1583fec391f6c4f33a6b5 Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Sun, 15 Oct 2023 23:32:34 +0000 Subject: [PATCH] feat(online): support last join (window) --- cases/query/last_join_subquery_window.yml | 406 ++++++++++++++++++ .../toydb/src/tablet/tablet_catalog.cc | 33 +- .../toydb/src/tablet/tablet_catalog.h | 5 +- .../src/testing/toydb_engine_test_base.cc | 21 +- hybridse/include/codec/row.h | 2 +- hybridse/include/codec/row_iterator.h | 9 +- hybridse/include/codec/row_list.h | 2 +- hybridse/include/vm/catalog.h | 1 + hybridse/include/vm/mem_catalog.h | 9 +- hybridse/include/vm/physical_op.h | 47 +- .../passes/physical/batch_request_optimize.cc | 21 +- .../physical/group_and_sort_optimized.cc | 84 +++- .../physical/group_and_sort_optimized.h | 13 + .../physical/transform_up_physical_pass.h | 1 - hybridse/src/plan/planner.cc | 2 +- hybridse/src/testing/engine_test_base.cc | 2 + hybridse/src/testing/engine_test_base.h | 3 +- hybridse/src/vm/catalog_wrapper.cc | 219 ++++++++++ hybridse/src/vm/catalog_wrapper.h | 292 +++++++++++++ hybridse/src/vm/engine.cc | 2 +- hybridse/src/vm/generator.cc | 1 + hybridse/src/vm/generator.h | 35 +- hybridse/src/vm/internal/node_helper.cc | 62 +++ hybridse/src/vm/internal/node_helper.h | 7 + hybridse/src/vm/mem_catalog.cc | 2 - hybridse/src/vm/runner.cc | 138 ++++-- hybridse/src/vm/runner.h | 68 +-- hybridse/src/vm/transform.cc | 123 ++---- hybridse/src/vm/transform.h | 9 - src/sdk/sql_sdk_test.h | 2 + 30 files changed, 1351 insertions(+), 270 deletions(-) create mode 100644 cases/query/last_join_subquery_window.yml diff --git a/cases/query/last_join_subquery_window.yml b/cases/query/last_join_subquery_window.yml new file mode 100644 index 00000000000..81787f87e67 --- /dev/null +++ b/cases/query/last_join_subquery_window.yml @@ -0,0 +1,406 @@ +cases: + # =================================================================== + # LAST JOIN (WINDOW) + # =================================================================== + - id: 0 + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,1590738989000] + - ["bb",3,1590738990000] + - ["cc",4,1590738991000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["aa",1, 1590738989000] + - ["bb",3, 1590738990000] + - ["dd",4, 1590738991000] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg + from t1 last join ( + select c1, c2, count(c4) over w as agg + from t2 + window w as ( + partition by c1 order by c4 + rows between 1 preceding and current row + ) + ) tx + on t1.c2 = tx.c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg)) + REQUEST_JOIN(type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, partition_keys=(), orders=(ASC), rows=(c4, 1 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, condition=, left_keys=(), right_keys=(), index_keys=(#5)) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, agg)) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(c1, c2)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, partition_keys=(), orders=(ASC), rows=(c4, 1 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg int64"] + order: c1 + data: | + aa, NULL, NULL, NULL + bb, bb, 3, 1 + cc, dd, 4, 1 + - id: 1 + desc: last join window(attributes) + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,2000] + - ["bb",3,2000] + - ["cc",4,2000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["aa",1, 1000, 1] + - ["aa",4, 2000, 2] + - ["bb",3, 3000, 3] + - ["dd",4, 8000, 4] + - ["dd",4, 7000, 5] + - ["dd",4, 9000, 6] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg1, agg2 + from t1 last join ( + select c1, c2, c4, + count(c4) over w as agg1, + max(val) over w as agg2 + from t2 + window w as ( + partition by c1 order by c4 + rows between 2 preceding and current row + exclude current_row + ) + ) tx + order by tx.c4 + on t1.c2 = tx.c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_ROW, partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(#5)) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2)) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(c1, c2, c4)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_ROW, partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg1 int64", 'agg2 int'] + order: c1 + data: | + aa, NULL, NULL, NULL, NULL + bb, bb, 3, 0, NULL + cc, dd, 4, 2, 5 + - id: 2 + # issue on join to (multiple windows), fix later + mode: batch-unsupport + desc: last join multiple windows + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,2000] + - ["bb",3,2000] + - ["cc",4,2000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int", "gp int"] + indexs: ["index1:c1:c4", "index2:c2:c4", "index3:gp:c4"] + rows: + - ["aa",1, 1000, 1, 0] + - ["aa",4, 2000, 2, 0] + - ["bb",3, 3000, 3, 1] + - ["dd",4, 8000, 4, 1] + - ["dd",4, 7000, 5, 1] + - ["dd",4, 9000, 6, 1] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg1, agg2, agg3 + from t1 last join ( + select c1, c2, c4, + count(c4) over w1 as agg1, + max(val) over w1 as agg2, + min(val) over w2 as agg3 + from t2 + window w1 as ( + partition by c1 order by c4 + rows between 2 preceding and current row + exclude current_row + ), + w2 as ( + partition by gp order by c4 + rows_range between 3s preceding and current row + exclude current_time + ) + ) tx + order by tx.c4 + on t1.c2 = tx.c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2, agg3)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_ROW, partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_TIME, partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index3) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(#5)) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(c1, c2, c4)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_ROW, partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index1) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_REQUEST_ROW, EXCLUDE_CURRENT_TIME, partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(gp)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(type=Partition, table=t2, index=index3) + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg1 int64", 'agg2 int', 'agg3 int'] + order: c1 + data: | + aa, NULL, NULL, NULL, NULL, NULL + bb, bb, 3, 0, NULL, NULL + cc, dd, 4, 2, 5, 4 + - id: 3 + desc: last join window union + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,2000] + - ["bb",3,2000] + - ["cc",4,2000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int"] + indexs: ["index1:c1:c4", "index2:c2:c4" ] + rows: + - ["aa",1, 1000, 1] + - ["aa",4, 2000, 2] + - ["bb",3, 3000, 3] + - ["dd",4, 8000, 4] + - ["dd",4, 9000, 6] + - name: t3 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["aa", 2, 1000, 5] + - ["bb", 3, 2000, 8] + - ["dd", 4, 4000, 12] + - ["dd", 4, 7000, 10] + - ["dd", 4, 6000, 11] + - ["dd", 4, 10000, 100] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg1, agg2 + from t1 last join ( + select c1, c2, c4, + count(c4) over w1 as agg1, + max(val) over w1 as agg2, + from t2 + window w1 as ( + union t3 + partition by c1 order by c4 + rows_range between 3s preceding and current row + instance_not_in_window exclude current_row + ) + ) tx + order by tx.c4 + on t1.c2 = tx.c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_CURRENT_ROW, INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(#5)) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2)) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(c1, c2, c4)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_CURRENT_ROW, INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg1 int64", 'agg2 int'] + order: c1 + data: | + aa, NULL, NULL, NULL, NULL + bb, bb, 3, 1, 8 + cc, dd, 4, 2, 11 + - id: 4 + desc: last join mulitple window union + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,2000] + - ["bb",3,2000] + - ["cc",4,2000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int"] + indexs: ["index1:c1:c4", "index2:c2:c4" ] + rows: + - ["aa",1, 1000, 1] + - ["aa",4, 2000, 2] + - ["bb",3, 3000, 3] + - ["dd",4, 8000, 4] + - ["dd",4, 9000, 6] + - name: t3 + columns: ["c1 string", "c2 int", "c4 timestamp", "val int"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["aa", 2, 1000, 5] + - ["bb", 3, 2000, 8] + - ["dd", 4, 4000, 12] + - ["dd", 4, 7000, 10] + - ["dd", 4, 6000, 11] + - ["dd", 4, 10000, 100] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg1, agg2, agg3 + from t1 last join ( + select c1, c2, c4, + count(c4) over w1 as agg1, + max(val) over w1 as agg2, + min(val) over w2 as agg3 + from t2 + window w1 as ( + union t3 + partition by c1 order by c4 + rows_range between 3s preceding and current row + instance_not_in_window exclude current_row + ), + w2 as ( + union t3 + partition by c1 order by c4 + rows between 2 preceding and current row + instance_not_in_window + ) + ) tx + order by tx.c4 + on t1.c2 = tx.c2 + request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2, agg3)) + REQUEST_JOIN(type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_CURRENT_ROW, INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + PROJECT(type=Aggregation) + REQUEST_UNION(INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + cluster_request_plan: | + SIMPLE_PROJECT(sources=(t1.c1, tx.c1 -> c1r, tx.c2 -> c2r, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + DATA_PROVIDER(request=t1) + REQUEST_JOIN(OUTPUT_RIGHT_ONLY, type=LastJoin, right_sort=(ASC), condition=, left_keys=(), right_keys=(), index_keys=(#5)) + SIMPLE_PROJECT(sources=(#5 -> t1.c2)) + DATA_PROVIDER(request=t1) + RENAME(name=tx) + SIMPLE_PROJECT(sources=(c1, c2, c4, agg1, agg2, agg3)) + REQUEST_JOIN(type=kJoinTypeConcat) + REQUEST_JOIN(type=kJoinTypeConcat) + SIMPLE_PROJECT(sources=(c1, c2, c4)) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + PROJECT(type=Aggregation) + REQUEST_UNION(EXCLUDE_CURRENT_ROW, INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), range=(c4, 3000 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + PROJECT(type=Aggregation) + REQUEST_UNION(INSTANCE_NOT_IN_WINDOW, partition_keys=(c1), orders=(c4 ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=) + +-UNION(partition_keys=(), orders=(ASC), rows=(c4, 2 PRECEDING, 0 CURRENT), index_keys=(c1)) + RENAME(name=t2) + DATA_PROVIDER(type=Partition, table=t3, index=index1) + DATA_PROVIDER(type=Partition, table=t2, index=index2) + DATA_PROVIDER(table=t2) + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg1 int64", 'agg2 int', "agg3 int"] + order: c1 + data: | + aa, NULL, NULL, NULL, NULL, NULL + bb, bb, 3, 1, 8, 3 + cc, dd, 4, 2, 11, 6 diff --git a/hybridse/examples/toydb/src/tablet/tablet_catalog.cc b/hybridse/examples/toydb/src/tablet/tablet_catalog.cc index feeb750ab6f..71c2f34f407 100644 --- a/hybridse/examples/toydb/src/tablet/tablet_catalog.cc +++ b/hybridse/examples/toydb/src/tablet/tablet_catalog.cc @@ -136,22 +136,6 @@ RowIterator* TabletTableHandler::GetRawIterator() { return new storage::FullTableIterator(table_->GetSegments(), table_->GetSegCnt(), table_); } -const uint64_t TabletTableHandler::GetCount() { - auto iter = GetIterator(); - uint64_t cnt = 0; - while (iter->Valid()) { - iter->Next(); - cnt++; - } - return cnt; -} -Row TabletTableHandler::At(uint64_t pos) { - auto iter = GetIterator(); - while (pos-- > 0 && iter->Valid()) { - iter->Next(); - } - return iter->Valid() ? iter->GetValue() : Row(); -} TabletCatalog::TabletCatalog() : tables_(), db_() {} @@ -249,22 +233,6 @@ std::unique_ptr TabletSegmentHandler::GetWindowIterator( const std::string& idx_name) { return std::unique_ptr(); } -const uint64_t TabletSegmentHandler::GetCount() { - auto iter = GetIterator(); - uint64_t cnt = 0; - while (iter->Valid()) { - cnt++; - iter->Next(); - } - return cnt; -} -Row TabletSegmentHandler::At(uint64_t pos) { - auto iter = GetIterator(); - while (pos-- > 0 && iter->Valid()) { - iter->Next(); - } - return iter->Valid() ? iter->GetValue() : Row(); -} const uint64_t TabletPartitionHandler::GetCount() { auto iter = GetWindowIterator(); @@ -275,5 +243,6 @@ const uint64_t TabletPartitionHandler::GetCount() { } return cnt; } + } // namespace tablet } // namespace hybridse diff --git a/hybridse/examples/toydb/src/tablet/tablet_catalog.h b/hybridse/examples/toydb/src/tablet/tablet_catalog.h index fa41140a495..dd5bea22c51 100644 --- a/hybridse/examples/toydb/src/tablet/tablet_catalog.h +++ b/hybridse/examples/toydb/src/tablet/tablet_catalog.h @@ -68,8 +68,6 @@ class TabletSegmentHandler : public TableHandler { std::unique_ptr GetIterator() override; RowIterator* GetRawIterator() override; std::unique_ptr GetWindowIterator(const std::string& idx_name) override; - const uint64_t GetCount() override; - Row At(uint64_t pos) override; const std::string GetHandlerTypeName() override { return "TabletSegmentHandler"; } @@ -104,6 +102,7 @@ class TabletPartitionHandler std::unique_ptr GetWindowIterator() override { return table_handler_->GetWindowIterator(index_name_); } + const uint64_t GetCount() override; std::shared_ptr GetSegment(const std::string& key) override { @@ -152,8 +151,6 @@ class TabletTableHandler RowIterator* GetRawIterator() override; std::unique_ptr GetWindowIterator( const std::string& idx_name); - virtual const uint64_t GetCount(); - Row At(uint64_t pos) override; virtual std::shared_ptr GetPartition( const std::string& index_name) { diff --git a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc index fcaa71d8373..35a595b431e 100644 --- a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc +++ b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc @@ -15,8 +15,9 @@ */ #include "testing/toydb_engine_test_base.h" + +#include "absl/strings/str_join.h" #include "gtest/gtest.h" -#include "gtest/internal/gtest-param-util.h" using namespace llvm; // NOLINT (build/namespaces) using namespace llvm::orc; // NOLINT (build/namespaces) @@ -141,18 +142,12 @@ std::shared_ptr BuildOnePkTableStorage( } return catalog; } -void BatchRequestEngineCheckWithCommonColumnIndices( - const SqlCase& sql_case, const EngineOptions options, - const std::set& common_column_indices) { - std::ostringstream oss; - for (size_t index : common_column_indices) { - oss << index << ","; - } - LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: " - "common_column_indices = [" - << oss.str() << "]"; - ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, - common_column_indices); +// Run check with common column index info +void BatchRequestEngineCheckWithCommonColumnIndices(const SqlCase& sql_case, const EngineOptions options, + const std::set& common_column_indices) { + LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: common_column_indices = [" + << absl::StrJoin(common_column_indices, ",") << "]"; + ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, common_column_indices); engine_test.RunCheck(); } diff --git a/hybridse/include/codec/row.h b/hybridse/include/codec/row.h index cd6abb0a3a1..69158d41e85 100644 --- a/hybridse/include/codec/row.h +++ b/hybridse/include/codec/row.h @@ -54,7 +54,7 @@ class Row { inline int32_t size() const { return slice_.size(); } inline int32_t size(int32_t pos) const { - return 0 == pos ? slice_.size() : slices_[pos - 1].size(); + return 0 == pos ? slice_.size() : slices_.at(pos - 1).size(); } // Return true if the length of the referenced data is zero diff --git a/hybridse/include/codec/row_iterator.h b/hybridse/include/codec/row_iterator.h index 2075918666c..fa60d21a37e 100644 --- a/hybridse/include/codec/row_iterator.h +++ b/hybridse/include/codec/row_iterator.h @@ -71,7 +71,14 @@ class WindowIterator { virtual bool Valid() = 0; /// Return the RowIterator of current segment /// of dataset if Valid() return `true`. - virtual std::unique_ptr GetValue() = 0; + virtual std::unique_ptr GetValue() { + auto p = GetRawValue(); + if (!p) { + return nullptr; + } + + return std::unique_ptr(p); + } /// Return the RowIterator of current segment /// of dataset if Valid() return `true`. virtual RowIterator *GetRawValue() = 0; diff --git a/hybridse/include/codec/row_list.h b/hybridse/include/codec/row_list.h index b32ad24c3eb..cfc83fae6a1 100644 --- a/hybridse/include/codec/row_list.h +++ b/hybridse/include/codec/row_list.h @@ -76,7 +76,7 @@ class ListV { virtual const uint64_t GetCount() { auto iter = GetIterator(); uint64_t cnt = 0; - while (iter->Valid()) { + while (iter && iter->Valid()) { iter->Next(); cnt++; } diff --git a/hybridse/include/vm/catalog.h b/hybridse/include/vm/catalog.h index 30e68316606..70a422f8924 100644 --- a/hybridse/include/vm/catalog.h +++ b/hybridse/include/vm/catalog.h @@ -217,6 +217,7 @@ class TableHandler : public DataHandler { virtual ~TableHandler() {} /// Return table column Types information. + /// TODO: rm it, never used virtual const Types& GetTypes() = 0; /// Return the index information diff --git a/hybridse/include/vm/mem_catalog.h b/hybridse/include/vm/mem_catalog.h index 2fc5df4960c..dffb17a8af1 100644 --- a/hybridse/include/vm/mem_catalog.h +++ b/hybridse/include/vm/mem_catalog.h @@ -25,8 +25,6 @@ #include #include #include -#include "base/fe_slice.h" -#include "codec/list_iterator_codec.h" #include "glog/logging.h" #include "vm/catalog.h" @@ -674,6 +672,7 @@ class MemPartitionHandler IndexHint index_hint_; OrderType order_type_; }; + class ConcatTableHandler : public MemTimeTableHandler { public: ConcatTableHandler(std::shared_ptr left, size_t left_slices, @@ -692,19 +691,19 @@ class ConcatTableHandler : public MemTimeTableHandler { status_ = SyncValue(); return MemTimeTableHandler::At(pos); } - std::unique_ptr GetIterator() { + std::unique_ptr GetIterator() override { if (status_.isRunning()) { status_ = SyncValue(); } return MemTimeTableHandler::GetIterator(); } - RowIterator* GetRawIterator() { + RowIterator* GetRawIterator() override { if (status_.isRunning()) { status_ = SyncValue(); } return MemTimeTableHandler::GetRawIterator(); } - virtual const uint64_t GetCount() { + const uint64_t GetCount() override { if (status_.isRunning()) { status_ = SyncValue(); } diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index d2fdafb5349..0701bdda3a6 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -785,7 +785,11 @@ class PhysicalAggregationNode : public PhysicalProjectNode { public: PhysicalAggregationNode(PhysicalOpNode *node, const ColumnProjects &project, const node::ExprNode *condition) : PhysicalProjectNode(node, kAggregation, project, true), having_condition_(condition) { - output_type_ = kSchemaTypeRow; + if (node->GetOutputType() == kSchemaTypeGroup) { + output_type_ = kSchemaTypeGroup; + } else { + output_type_ = kSchemaTypeRow; + } fn_infos_.push_back(&having_condition_.fn_info()); } virtual ~PhysicalAggregationNode() {} @@ -1065,7 +1069,7 @@ class RequestWindowUnionList { RequestWindowUnionList() : window_unions_() {} virtual ~RequestWindowUnionList() {} void AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp &window) { - window_unions_.push_back(std::make_pair(node, window)); + window_unions_.emplace_back(node, window); } const PhysicalOpNode *GetKey(uint32_t index) { auto iter = window_unions_.begin(); @@ -1415,7 +1419,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(false), exclude_current_time_(false), output_request_row_(true) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.index_key_.fn_info()); @@ -1427,7 +1431,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(w_ptr->instance_not_in_window()), exclude_current_time_(w_ptr->exclude_current_time()), output_request_row_(true) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.sort_.fn_info()); @@ -1443,7 +1447,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(instance_not_in_window), exclude_current_time_(exclude_current_time), output_request_row_(output_request_row) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.sort_.fn_info()); @@ -1455,7 +1459,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { virtual void Print(std::ostream &output, const std::string &tab) const; const bool Valid() { return true; } static PhysicalRequestUnionNode *CastFrom(PhysicalOpNode *node); - bool AddWindowUnion(PhysicalOpNode *node) { + bool AddWindowUnion(PhysicalOpNode *node) { return AddWindowUnion(node, window_); } + bool AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp& window) { if (nullptr == node) { LOG(WARNING) << "Fail to add window union : table is null"; return false; @@ -1472,9 +1477,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { << "Union Table and window input schema aren't consistent"; return false; } - window_unions_.AddWindowUnion(node, window_); - RequestWindowOp &window_union = - window_unions_.window_unions_.back().second; + window_unions_.AddWindowUnion(node, window); + RequestWindowOp &window_union = window_unions_.window_unions_.back().second; fn_infos_.push_back(&window_union.partition_.fn_info()); fn_infos_.push_back(&window_union.sort_.fn_info()); fn_infos_.push_back(&window_union.range_.fn_info()); @@ -1484,11 +1488,10 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { std::vector GetDependents() const override; - const bool instance_not_in_window() const { - return instance_not_in_window_; - } - const bool exclude_current_time() const { return exclude_current_time_; } - const bool output_request_row() const { return output_request_row_; } + bool instance_not_in_window() const { return instance_not_in_window_; } + bool exclude_current_time() const { return exclude_current_time_; } + bool output_request_row() const { return output_request_row_; } + void set_output_request_row(bool flag) { output_request_row_ = flag; } const RequestWindowOp &window() const { return window_; } const RequestWindowUnionList &window_unions() const { return window_unions_; @@ -1506,10 +1509,20 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { } RequestWindowOp window_; - const bool instance_not_in_window_; - const bool exclude_current_time_; - const bool output_request_row_; + bool instance_not_in_window_; + bool exclude_current_time_; + bool output_request_row_; RequestWindowUnionList window_unions_; + + private: + void InitOuptput() { + auto left = GetProducer(0); + if (left->GetOutputType() == kSchemaTypeRow) { + output_type_ = kSchemaTypeTable; + } else { + output_type_ = kSchemaTypeGroup; + } + } }; class PhysicalRequestAggUnionNode : public PhysicalOpNode { diff --git a/hybridse/src/passes/physical/batch_request_optimize.cc b/hybridse/src/passes/physical/batch_request_optimize.cc index 52488e6a981..86fdfee92c5 100644 --- a/hybridse/src/passes/physical/batch_request_optimize.cc +++ b/hybridse/src/passes/physical/batch_request_optimize.cc @@ -269,6 +269,7 @@ static Status UpdateProjectExpr( return replacer.Replace(expr->DeepCopy(ctx->node_manager()), output); } +// simplify simple project, remove orphan descendant producer nodes static Status CreateSimplifiedProject(PhysicalPlanContext* ctx, PhysicalOpNode* input, const ColumnProjects& projects, @@ -279,8 +280,7 @@ static Status CreateSimplifiedProject(PhysicalPlanContext* ctx, can_project = false; for (size_t i = 0; i < cur_input->producers().size(); ++i) { auto cand_input = cur_input->GetProducer(i); - if (cand_input->GetOutputType() != - PhysicalSchemaType::kSchemaTypeRow) { + if (cand_input->GetOutputType() != PhysicalSchemaType::kSchemaTypeRow) { continue; } bool is_valid = true; @@ -949,21 +949,16 @@ Status CommonColumnOptimize::ProcessJoin(PhysicalPlanContext* ctx, } } else if (is_non_common_join) { // join only depend on non-common left part - if (left_state->non_common_op == join_op->GetProducer(0) && - right == join_op->GetProducer(1)) { + if (left_state->non_common_op == join_op->GetProducer(0) && right == join_op->GetProducer(1)) { state->common_op = nullptr; state->non_common_op = join_op; } else { PhysicalRequestJoinNode* new_join = nullptr; - CHECK_STATUS(ctx->CreateOp( - &new_join, left_state->non_common_op, right, join_op->join(), - join_op->output_right_only())); - CHECK_STATUS(ReplaceComponentExpr( - join_op->join(), join_op->joined_schemas_ctx(), - new_join->joined_schemas_ctx(), ctx->node_manager(), - &new_join->join_)); - state->common_op = - join_op->output_right_only() ? nullptr : left_state->common_op; + CHECK_STATUS(ctx->CreateOp(&new_join, left_state->non_common_op, right, + join_op->join(), join_op->output_right_only())); + CHECK_STATUS(ReplaceComponentExpr(join_op->join(), join_op->joined_schemas_ctx(), + new_join->joined_schemas_ctx(), ctx->node_manager(), &new_join->join_)); + state->common_op = join_op->output_right_only() ? nullptr : left_state->common_op; state->non_common_op = new_join; if (!join_op->output_right_only()) { for (size_t left_idx : left_state->common_column_indices) { diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index ae333b6af47..2d51b336167 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -25,6 +25,7 @@ #include "absl/cleanup/cleanup.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" +#include "node/node_enum.h" #include "vm/physical_op.h" namespace hybridse { @@ -294,6 +295,7 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx absl::Cleanup clean = [&]() { expr_cache_.clear(); + optimize_info_ = nullptr; }; auto s = BuildExprCache(left_key->keys(), root_schemas_ctx); @@ -347,6 +349,18 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas if (DataProviderType::kProviderTypeTable == scan_op->provider_type_ || DataProviderType::kProviderTypePartition == scan_op->provider_type_) { + auto* table_node = dynamic_cast(scan_op); + if (optimize_info_) { + if (optimize_info_->left_key == left_key && optimize_info_->index_key == index_key && + optimize_info_->right_key == right_key && optimize_info_->sort_key == sort) { + if (optimize_info_->optimized != nullptr && + table_node->GetDb() == optimize_info_->optimized->GetDb() && + table_node->GetName() == optimize_info_->optimized->GetName()) { + *new_in = optimize_info_->optimized; + return true; + } + } + } const node::ExprListNode* right_partition = right_key == nullptr ? left_key->keys() : right_key->keys(); @@ -453,13 +467,15 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas dynamic_cast(node_manager_->MakeOrderByNode(node_manager_->MakeExprList( node_manager_->MakeOrderExpression(nullptr, first_order_expression->is_asc()))))); } + + optimize_info_.reset(new OptimizeInfo(left_key, index_key, right_key, sort, partition_op)); *new_in = partition_op; return true; } } else if (PhysicalOpType::kPhysicalOpSimpleProject == in->GetOpType()) { PhysicalOpNode* new_depend; - if (!KeysOptimizedImpl(in->GetProducer(0)->schemas_ctx(), in->GetProducer(0), left_key, index_key, right_key, sort, - &new_depend)) { + if (!KeysOptimizedImpl(in->GetProducer(0)->schemas_ctx(), in->GetProducer(0), left_key, index_key, right_key, + sort, &new_depend)) { return false; } @@ -493,7 +509,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas PhysicalFilterNode* filter_op = dynamic_cast(in); PhysicalOpNode* new_depend; - if (!KeysOptimizedImpl(root_schemas_ctx, in->producers()[0], left_key, index_key, right_key, sort, &new_depend)) { + if (!KeysOptimizedImpl(root_schemas_ctx, in->producers()[0], left_key, index_key, right_key, sort, + &new_depend)) { return false; } PhysicalFilterNode* new_filter = nullptr; @@ -515,8 +532,16 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas &new_depend)) { return false; } + PhysicalOpNode* new_right = in->GetProducer(1); + if (request_join->join_.join_type_ == node::kJoinTypeConcat) { + // for concat join, only acceptable if the two inputs (of course same table) optimized by the same index + auto* rebase_sc = in->GetProducer(1)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(1), left_key, index_key, right_key, sort, &new_right)) { + return false; + } + } PhysicalRequestJoinNode* new_join = nullptr; - auto s = plan_ctx_->CreateOp(&new_join, new_depend, request_join->GetProducer(1), + auto s = plan_ctx_->CreateOp(&new_join, new_depend, new_right, request_join->join(), request_join->output_right_only()); if (!s.isOK()) { LOG(WARNING) << "Fail to create new request join op: " << s; @@ -545,6 +570,57 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas *new_in = new_join; return true; + } else if (PhysicalOpType::kPhysicalOpProject == in->GetOpType()) { + auto * project = dynamic_cast(in); + if (project == nullptr || project->project_type_ != vm::kAggregation) { + return false; + } + + auto * agg_project = dynamic_cast(in); + + PhysicalOpNode* new_depend = nullptr; + auto* rebase_sc = in->GetProducer(0)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(0), left_key, index_key, right_key, sort, + &new_depend)) { + return false; + } + + vm::PhysicalAggregationNode* new_agg = nullptr; + if (!plan_ctx_ + ->CreateOp(&new_agg, new_depend, agg_project->project(), + agg_project->having_condition_.condition()) + .isOK()) { + return false; + } + *new_in = new_agg; + return true; + } else if (PhysicalOpType::kPhysicalOpRequestUnion == in->GetOpType()) { + // JOIN (..., AGG(REQUEST_UNION(left, ...))): JOIN condition optimizing left + PhysicalOpNode* new_left_depend = nullptr; + auto* rebase_sc = in->GetProducer(0)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(0), left_key, index_key, right_key, sort, + &new_left_depend)) { + return false; + } + + auto * request_union = dynamic_cast(in); + + vm::PhysicalRequestUnionNode* new_union = nullptr; + if (!plan_ctx_ + ->CreateOp( + &new_union, new_left_depend, in->GetProducer(1), request_union->window(), + request_union->instance_not_in_window(), request_union->exclude_current_time(), + request_union->output_request_row()) + .isOK()) { + return false; + } + for (auto& pair : request_union->window_unions().window_unions_) { + if (!new_union->AddWindowUnion(pair.first, pair.second)) { + return false; + } + } + *new_in = new_union; + return true; } return false; } diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.h b/hybridse/src/passes/physical/group_and_sort_optimized.h index 1d410f2b8e8..2e50571b29d 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.h +++ b/hybridse/src/passes/physical/group_and_sort_optimized.h @@ -93,6 +93,17 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { std::string db_name; }; + struct OptimizeInfo { + OptimizeInfo(const Key* left_key, const Key* index_key, const Key* right_key, const Sort* s, + vm::PhysicalPartitionProviderNode* optimized) + : left_key(left_key), index_key(index_key), right_key(right_key), sort_key(s), optimized(optimized) {} + const Key* left_key; + const Key* index_key; + const Key* right_key; + const Sort* sort_key; + vm::PhysicalPartitionProviderNode* optimized; + }; + private: bool Transform(PhysicalOpNode* in, PhysicalOpNode** output); @@ -149,6 +160,8 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { // A source column name is the column name in string that refers to a physical table, // only one table got optimized each time std::unordered_map expr_cache_; + + std::unique_ptr optimize_info_; }; } // namespace passes } // namespace hybridse diff --git a/hybridse/src/passes/physical/transform_up_physical_pass.h b/hybridse/src/passes/physical/transform_up_physical_pass.h index fed721d4c66..a9a80bd90b4 100644 --- a/hybridse/src/passes/physical/transform_up_physical_pass.h +++ b/hybridse/src/passes/physical/transform_up_physical_pass.h @@ -17,7 +17,6 @@ #define HYBRIDSE_SRC_PASSES_PHYSICAL_TRANSFORM_UP_PHYSICAL_PASS_H_ #include -#include #include #include diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index 1584d76acbb..fc350d1ffb6 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -272,7 +272,7 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, n auto first_window_project = dynamic_cast(project_list_vec[1]); node::ProjectListNode *merged_project = node_manager_->MakeProjectListPlanNode(first_window_project->GetW(), true); - if (!is_cluster_optimized_ && !enable_batch_window_parallelization_ && + if (!is_cluster_optimized_ && !enable_batch_window_parallelization_ && node::ProjectListNode::MergeProjectList(simple_project, first_window_project, merged_project)) { project_list_vec[0] = nullptr; project_list_vec[1] = merged_project; diff --git a/hybridse/src/testing/engine_test_base.cc b/hybridse/src/testing/engine_test_base.cc index 2c3134d1257..9a0ad6fdd39 100644 --- a/hybridse/src/testing/engine_test_base.cc +++ b/hybridse/src/testing/engine_test_base.cc @@ -536,6 +536,8 @@ INSTANTIATE_TEST_SUITE_P(EngineLastJoinQuery, EngineTest, INSTANTIATE_TEST_SUITE_P(EngineLastJoinWindowQuery, EngineTest, testing::ValuesIn(sqlcase::InitCases("cases/query/last_join_window_query.yaml"))); +INSTANTIATE_TEST_SUITE_P(EngineLastJoinSubqueryWindow, EngineTest, + testing::ValuesIn(sqlcase::InitCases("cases/query/last_join_subquery_window.yml"))); INSTANTIATE_TEST_SUITE_P(EngineLastJoinWhere, EngineTest, testing::ValuesIn(sqlcase::InitCases("cases/query/last_join_where.yaml"))); INSTANTIATE_TEST_SUITE_P(EngineWindowQuery, EngineTest, diff --git a/hybridse/src/testing/engine_test_base.h b/hybridse/src/testing/engine_test_base.h index e759169f0fd..0805ff1b3c5 100644 --- a/hybridse/src/testing/engine_test_base.h +++ b/hybridse/src/testing/engine_test_base.h @@ -318,8 +318,7 @@ class BatchRequestEngineTestRunner : public EngineTestRunner { bool has_batch_request = !sql_case_.batch_request().columns_.empty(); if (!has_batch_request) { - LOG(WARNING) << "No batch request field in case, " - << "try use last row from primary input"; + LOG(WARNING) << "No batch request field in case, try use last row from primary input"; } std::vector original_request_data; diff --git a/hybridse/src/vm/catalog_wrapper.cc b/hybridse/src/vm/catalog_wrapper.cc index d134a92e51b..b10c6f1c55b 100644 --- a/hybridse/src/vm/catalog_wrapper.cc +++ b/hybridse/src/vm/catalog_wrapper.cc @@ -164,5 +164,224 @@ RowIterator* LazyLastJoinWindowIterator::GetRawValue() { return new LazyLastJoinIterator(std::move(iter), right_, parameter_, join_); } + +std::shared_ptr ConcatPartitionHandler::GetSegment(const std::string& key) { + auto left_seg = left_->GetSegment(key); + auto right_seg = right_->GetSegment(key); + return std::shared_ptr( + new SimpleConcatTableHandler(left_seg, left_slices_, right_seg, right_slices_)); +} + +RowIterator* ConcatPartitionHandler::GetRawIterator() { + auto li = left_->GetIterator(); + if (!li) { + return nullptr; + } + auto ri = right_->GetIterator(); + return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_); +} + +std::unique_ptr ConcatPartitionHandler::GetIterator() { + auto p = GetRawIterator(); + if (p == nullptr) { + return {}; + } + return std::unique_ptr(p); +} + +std::unique_ptr LazyRequestUnionPartitionHandler::GetWindowIterator() { + auto w = left_->GetWindowIterator(); + if (!w) { + return {}; + } + + return std::unique_ptr(new LazyRequestUnionWindowIterator(std::move(w), func_)); +} + +std::shared_ptr LazyRequestUnionPartitionHandler::GetSegment(const std::string& key) { + return nullptr; +} + +std::unique_ptr LazyRequestUnionPartitionHandler::GetIterator() { + return std::unique_ptr(GetRawIterator()); +} +const IndexHint& LazyRequestUnionPartitionHandler::GetIndex() { return left_->GetIndex(); } + +const Types& LazyRequestUnionPartitionHandler::GetTypes() { return left_->GetTypes(); } + +base::ConstIterator* LazyRequestUnionPartitionHandler::GetRawIterator() { return nullptr; } +bool LazyAggIterator::Valid() const { return it_->Valid(); } +void LazyAggIterator::Next() { it_->Next(); } +const uint64_t& LazyAggIterator::GetKey() const { return it_->GetKey(); } +const Row& LazyAggIterator::GetValue() { + if (Valid()) { + auto request = it_->GetValue(); + auto window = func_(request); + if (window) { + buf_ = agg_gen_->Gen(parameter_, window); + return buf_; + } + } + + buf_ = Row(); + return buf_; +} + +void LazyAggIterator::Seek(const uint64_t& key) { it_->Seek(key); } +void LazyAggIterator::SeekToFirst() { it_->SeekToFirst(); } +std::unique_ptr LazyAggTableHandler::GetIterator() { + auto* it = GetRawIterator(); + if (it == nullptr) { + return {}; + } + return std::unique_ptr(it); +} +std::unique_ptr LazyAggTableHandler::GetWindowIterator(const std::string& idx_name) { return nullptr; } +base::ConstIterator* LazyAggTableHandler::GetRawIterator() { + auto it = left_->GetIterator(); + if (!it) { + return nullptr; + } + return new LazyAggIterator(std::move(it), func_, agg_gen_, parameter_); +} +std::shared_ptr LazyAggTableHandler::GetPartition(const std::string& index_name) { return nullptr; } +const Types& LazyAggTableHandler::GetTypes() { return left_->GetTypes(); } +const IndexHint& LazyAggTableHandler::GetIndex() { return left_->GetIndex(); } +const Schema* LazyAggTableHandler::GetSchema() { return nullptr; } +const std::string& LazyAggTableHandler::GetName() { return left_->GetName(); } +const std::string& LazyAggTableHandler::GetDatabase() { return left_->GetDatabase(); } +std::shared_ptr LazyAggPartitionHandler::GetSegment(const std::string& key) { + auto seg = input_->Left()->GetSegment(key); + return std::shared_ptr(new LazyAggTableHandler(seg, input_->Func(), agg_gen_, parameter_)); +} +const std::string LazyAggPartitionHandler::GetHandlerTypeName() { return "LazyLastJoinPartitionHandler"; } +std::unique_ptr LazyAggPartitionHandler::GetIterator() { + auto it = input_->Left()->GetIterator(); + return std::unique_ptr(new LazyAggIterator(std::move(it), input_->Func(), agg_gen_, parameter_)); +} +base::ConstIterator* LazyAggPartitionHandler::GetRawIterator() { return nullptr; } +bool ConcatIterator::Valid() const { return left_ && left_->Valid(); } +void ConcatIterator::Next() { + left_->Next(); + if (right_ && right_->Valid()) { + right_->Next(); + } +} +const uint64_t& ConcatIterator::GetKey() const { return left_->GetKey(); } +const Row& ConcatIterator::GetValue() { + if (!right_ || !right_->Valid()) { + buf_ = Row(left_slices_, left_->GetValue(), right_slices_, Row()); + } else { + buf_ = Row(left_slices_, left_->GetValue(), right_slices_, right_->GetValue()); + } + return buf_; +} +void ConcatIterator::Seek(const uint64_t& key) { + left_->Seek(key); + if (right_ && right_->Valid()) { + right_->Seek(key); + } +} +void ConcatIterator::SeekToFirst() { + left_->SeekToFirst(); + if (right_) { + right_->SeekToFirst(); + } +} +std::unique_ptr SimpleConcatTableHandler::GetIterator() { + auto p = GetRawIterator(); + if (p == nullptr) { + return {}; + } + return std::unique_ptr(p); +} +RowIterator* SimpleConcatTableHandler::GetRawIterator() { + auto li = left_->GetIterator(); + if (!li) { + return nullptr; + } + auto ri = right_->GetIterator(); + return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_); +} +std::unique_ptr SimpleConcatTableHandler::GetWindowIterator(const std::string& idx_name) { + return nullptr; +} +std::unique_ptr ConcatPartitionHandler::GetWindowIterator() { return nullptr; } +std::unique_ptr ConcatPartitionHandler::GetWindowIterator(const std::string& idx_name) { + return nullptr; +} + +std::unique_ptr LazyAggPartitionHandler::GetWindowIterator() { + auto w = input_->Left()->GetWindowIterator(); + return std::unique_ptr( + new LazyAggWindowIterator(std::move(w), input_->Func(), agg_gen_, parameter_)); +} + +RowIterator* LazyAggWindowIterator::GetRawValue() { + auto w = left_->GetValue(); + if (!w) { + return nullptr; + } + + return new LazyAggIterator(std::move(w), func_, agg_gen_, parameter_); +} +void LazyRequestUnionIterator::Next() { + if (Valid()) { + cur_iter_->Next(); + } + if (!Valid()) { + left_->Next(); + OnNewRow(); + } +} +bool LazyRequestUnionIterator::Valid() const { return cur_iter_ && cur_iter_->Valid(); } +void LazyRequestUnionIterator::Seek(const uint64_t& key) { + left_->Seek(key); + OnNewRow(false); +} +void LazyRequestUnionIterator::SeekToFirst() { + left_->SeekToFirst(); + OnNewRow(); +} +void LazyRequestUnionIterator::OnNewRow(bool continue_on_empty) { + while (left_->Valid()) { + auto row = left_->GetValue(); + auto tb = func_(row); + if (tb) { + auto it = tb->GetIterator(); + if (it) { + it->SeekToFirst(); + if (it->Valid()) { + cur_window_ = tb; + cur_iter_ = std::move(it); + break; + } + } + } + + if (continue_on_empty) { + left_->Next(); + } else { + cur_window_ = {}; + cur_iter_ = {}; + break; + } + } +} +const uint64_t& LazyRequestUnionIterator::GetKey() const { return cur_iter_->GetKey(); } +const Row& LazyRequestUnionIterator::GetValue() { return cur_iter_->GetValue(); } +RowIterator* LazyRequestUnionWindowIterator::GetRawValue() { + auto rows = left_->GetValue(); + if (!rows) { + return {}; + } + + return new LazyRequestUnionIterator(std::move(rows), func_); +} +bool LazyRequestUnionWindowIterator::Valid() { return left_ && left_->Valid(); } +const Row LazyRequestUnionWindowIterator::GetKey() { return left_->GetKey(); } +void LazyRequestUnionWindowIterator::SeekToFirst() { left_->SeekToFirst(); } +void LazyRequestUnionWindowIterator::Seek(const std::string& key) { left_->Seek(key); } +void LazyRequestUnionWindowIterator::Next() { left_->Next(); } } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/catalog_wrapper.h b/hybridse/src/vm/catalog_wrapper.h index 11441b4bf54..855eb1f703a 100644 --- a/hybridse/src/vm/catalog_wrapper.h +++ b/hybridse/src/vm/catalog_wrapper.h @@ -17,10 +17,12 @@ #ifndef HYBRIDSE_SRC_VM_CATALOG_WRAPPER_H_ #define HYBRIDSE_SRC_VM_CATALOG_WRAPPER_H_ +#include #include #include #include +#include "codec/row_iterator.h" #include "vm/catalog.h" #include "vm/generator.h" @@ -705,6 +707,296 @@ class LazyLastJoinWindowIterator final : public codec::WindowIterator { std::shared_ptr join_; }; +class LazyRequestUnionIterator final : public RowIterator { + public: + LazyRequestUnionIterator(std::unique_ptr&& left, + std::function(const Row&)> func) + : left_(std::move(left)), func_(func) { + SeekToFirst(); + } + ~LazyRequestUnionIterator() override {} + + bool Valid() const override; + void Next() override; + const uint64_t& GetKey() const override; + const Row& GetValue() override; + bool IsSeekable() const override { return true; } + + void Seek(const uint64_t& key) override; + void SeekToFirst() override; + + private: + void OnNewRow(bool continue_on_empty = true); + + private: + // all same keys from left form a window, although it is better that every row be a partition + std::unique_ptr left_; + std::function(const Row&)> func_; + + std::shared_ptr cur_window_; + std::unique_ptr cur_iter_; +}; + +class LazyRequestUnionWindowIterator final : public codec::WindowIterator { + public: + LazyRequestUnionWindowIterator(std::unique_ptr&& left, + std::function(const Row&)> func) + : left_(std::move(left)), func_(func) { + SeekToFirst(); + } + ~LazyRequestUnionWindowIterator() override {} + + RowIterator* GetRawValue() override; + + void Seek(const std::string& key) override; + void SeekToFirst() override; + void Next() override; + bool Valid() override; + const Row GetKey() override; + + private: + std::unique_ptr left_; + std::function(const Row&)> func_; +}; + +class LazyRequestUnionPartitionHandler final : public PartitionHandler { + public: + LazyRequestUnionPartitionHandler(std::shared_ptr left, + std::function(const Row&)> func) + : left_(left), func_(func) {} + ~LazyRequestUnionPartitionHandler() override {} + + std::unique_ptr GetWindowIterator() override; + + std::shared_ptr GetSegment(const std::string& key) override; + + const std::string GetHandlerTypeName() override { return "LazyRequestUnionPartitiontHandler"; } + + std::unique_ptr GetIterator() override; + + const IndexHint& GetIndex() override; + + // unimplemented + const Types& GetTypes() override; + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + base::ConstIterator* GetRawIterator() override; + + auto Left() const { return left_; } + auto Func() const { return func_; } + + private: + std::shared_ptr left_; + std::function(const Row&)> func_; +}; + +class LazyAggIterator final : public RowIterator { + public: + LazyAggIterator(std::unique_ptr&& it, std::function(const Row&)> func, + std::shared_ptr agg_gen, const Row& param) + : it_(std::move(it)), func_(func), agg_gen_(agg_gen), parameter_(param) { + SeekToFirst(); + } + + ~LazyAggIterator() override {} + + bool Valid() const override; + void Next() override; + const uint64_t& GetKey() const override; + const Row& GetValue() override; + bool IsSeekable() const override { return true; } + + void Seek(const uint64_t& key) override; + void SeekToFirst() override; + + private: + std::unique_ptr it_; + std::function(const Row&)> func_; + std::shared_ptr agg_gen_; + const Row& parameter_; + + Row buf_; +}; + +class LazyAggTableHandler final : public TableHandler { + public: + LazyAggTableHandler(std::shared_ptr left, + std::function(const Row&)> func, + std::shared_ptr agg_gen, const Row& param) + : left_(left), func_(func), agg_gen_(agg_gen), parameter_(param) { + DLOG(INFO) << "iterator count = " << left_->GetCount(); + } + ~LazyAggTableHandler() override {} + + std::unique_ptr GetIterator() override; + + // unimplemented + const Types& GetTypes() override; + const IndexHint& GetIndex() override; + std::unique_ptr GetWindowIterator(const std::string& idx_name) override; + const Schema* GetSchema() override; + const std::string& GetName() override; + const std::string& GetDatabase() override; + + base::ConstIterator* GetRawIterator() override; + + std::shared_ptr GetPartition(const std::string& index_name) override; + + private: + std::shared_ptr left_; + std::function(const Row&)> func_; + std::shared_ptr agg_gen_; + const Row& parameter_; +}; + +class LazyAggWindowIterator final : public codec::WindowIterator { + public: + LazyAggWindowIterator(std::unique_ptr left, + std::function(const Row&)> func, + std::shared_ptr gen, const Row& p) + : left_(std::move(left)), func_(func), agg_gen_(gen), parameter_(p) {} + ~LazyAggWindowIterator() override {} + + RowIterator* GetRawValue() override; + + void Seek(const std::string& key) override { left_->Seek(key); } + void SeekToFirst() override { left_->SeekToFirst(); } + void Next() override { left_->Next(); } + bool Valid() override { return left_ && left_->Valid(); } + const Row GetKey() override { return left_->GetKey(); } + + private: + std::unique_ptr left_; + std::function(const Row&)> func_; + std::shared_ptr agg_gen_; + const Row& parameter_; +}; + +class LazyAggPartitionHandler final : public PartitionHandler { + public: + LazyAggPartitionHandler(std::shared_ptr input, + std::shared_ptr agg_gen, const Row& param) + : input_(input), agg_gen_(agg_gen), parameter_(param) {} + ~LazyAggPartitionHandler() override {} + + std::shared_ptr GetSegment(const std::string& key) override; + + const std::string GetHandlerTypeName() override; + + std::unique_ptr GetIterator() override; + + std::unique_ptr GetWindowIterator() override; + + const IndexHint& GetIndex() override { return input_->GetIndex(); } + + // unimplemented + const Types& GetTypes() override { return input_->GetTypes(); } + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return input_->GetName(); } + const std::string& GetDatabase() override { return input_->GetDatabase(); } + base::ConstIterator* GetRawIterator() override; + + private: + std::shared_ptr input_; + std::shared_ptr agg_gen_; + const Row& parameter_; +}; + +class ConcatIterator final : public RowIterator { + public: + ConcatIterator(std::unique_ptr&& left, size_t left_slices, std::unique_ptr&& right, + size_t right_slices) + : left_(std::move(left)), left_slices_(left_slices), right_(std::move(right)), right_slices_(right_slices) { + SeekToFirst(); + } + ~ConcatIterator() override {} + + bool Valid() const override; + void Next() override; + const uint64_t& GetKey() const override; + const Row& GetValue() override; + + bool IsSeekable() const override { return true; }; + + void Seek(const uint64_t& key) override; + + void SeekToFirst() override; + + private: + std::unique_ptr left_; + size_t left_slices_; + std::unique_ptr right_; + size_t right_slices_; + + Row buf_; +}; + +class SimpleConcatTableHandler final : public TableHandler { + public: + SimpleConcatTableHandler(std::shared_ptr left, size_t left_slices, + std::shared_ptr right, size_t right_slices) + : left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {} + ~SimpleConcatTableHandler() override {} + + std::unique_ptr GetIterator() override; + + RowIterator* GetRawIterator() override; + + std::unique_ptr GetWindowIterator(const std::string& idx_name) override; + + const Types& GetTypes() override { return left_->GetTypes(); } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + const Schema* GetSchema() override { return left_->GetSchema(); } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + private: + std::shared_ptr left_; + size_t left_slices_; + std::shared_ptr right_; + size_t right_slices_; +}; + +class ConcatPartitionHandler final : public PartitionHandler { + public: + ConcatPartitionHandler(std::shared_ptr left, size_t left_slices, + std::shared_ptr right, size_t right_slices) + : left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {} + ~ConcatPartitionHandler() override {} + + std::unique_ptr GetIterator() override; + + RowIterator* GetRawIterator() override; + + std::unique_ptr GetWindowIterator(const std::string& idx_name) override; + + std::unique_ptr GetWindowIterator() override; + + std::shared_ptr GetSegment(const std::string& key) override; + + const Types& GetTypes() override { return left_->GetTypes(); } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + private: + std::shared_ptr left_; + size_t left_slices_; + std::shared_ptr right_; + size_t right_slices_; +}; + } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index 4fdc368887e..fc88a6ccda1 100644 --- a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -153,7 +153,7 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess DLOG(INFO) << "Compile Engine ..."; status = base::Status::OK(); std::shared_ptr info = std::make_shared(); - auto& sql_context = std::dynamic_pointer_cast(info)->get_sql_context(); + auto& sql_context = info->get_sql_context(); sql_context.sql = sql; sql_context.db = db; sql_context.engine_mode = session.engine_mode(); diff --git a/hybridse/src/vm/generator.cc b/hybridse/src/vm/generator.cc index 28542a7befb..aaa16ff2783 100644 --- a/hybridse/src/vm/generator.cc +++ b/hybridse/src/vm/generator.cc @@ -16,6 +16,7 @@ #include "vm/generator.h" +#include "vm/catalog_wrapper.h" #include "vm/runner.h" namespace hybridse { diff --git a/hybridse/src/vm/generator.h b/hybridse/src/vm/generator.h index 4dded0d6ebf..7bb49337794 100644 --- a/hybridse/src/vm/generator.h +++ b/hybridse/src/vm/generator.h @@ -79,11 +79,17 @@ class ConstProjectGenerator : public FnGenerator { const Row Gen(const Row& parameter); RowProjectFun fun_; }; -class AggGenerator : public FnGenerator { +class AggGenerator : public FnGenerator, public std::enable_shared_from_this { public: - explicit AggGenerator(const FnInfo& info) : FnGenerator(info) {} + [[nodiscard]] static std::shared_ptr Create(const FnInfo& info) { + return std::shared_ptr(new AggGenerator(info)); + } + virtual ~AggGenerator() {} const Row Gen(const codec::Row& parameter_row, std::shared_ptr table); + + private: + explicit AggGenerator(const FnInfo& info) : FnGenerator(info) {} }; class WindowProjectGenerator : public FnGenerator { public: @@ -112,8 +118,18 @@ class ConditionGenerator : public FnGenerator { const bool Gen(const Row& row, const Row& parameter) const; const bool Gen(std::shared_ptr table, const codec::Row& parameter_row); }; -class RangeGenerator { +class RangeGenerator : public std::enable_shared_from_this { public: + [[nodiscard]] static std::shared_ptr Create(const Range& range) { + return std::shared_ptr(new RangeGenerator(range)); + } + virtual ~RangeGenerator() {} + + const bool Valid() const { return ts_gen_.Valid(); } + OrderGenerator ts_gen_; + WindowRange window_range_; + + private: explicit RangeGenerator(const Range& range) : ts_gen_(range.fn_info()), window_range_() { if (range.frame_ != nullptr) { switch (range.frame()->frame_type()) { @@ -142,11 +158,8 @@ class RangeGenerator { } } } - virtual ~RangeGenerator() {} - const bool Valid() const { return ts_gen_.Valid(); } - OrderGenerator ts_gen_; - WindowRange window_range_; }; + class FilterKeyGenerator { public: explicit FilterKeyGenerator(const Key& filter_key) : filter_key_(filter_key.fn_info()) {} @@ -253,13 +266,15 @@ class FilterGenerator : public PredicateFun { class WindowGenerator { public: explicit WindowGenerator(const WindowOp& window) - : window_op_(window), partition_gen_(window.partition_), sort_gen_(window.sort_), range_gen_(window.range_) {} + : window_op_(window), partition_gen_(window.partition_), sort_gen_(window.sort_) { + range_gen_ = RangeGenerator::Create(window.range_); + } virtual ~WindowGenerator() {} - const int64_t OrderKey(const Row& row) { return range_gen_.ts_gen_.Gen(row); } + const int64_t OrderKey(const Row& row) { return range_gen_->ts_gen_.Gen(row); } const WindowOp window_op_; PartitionGenerator partition_gen_; SortGenerator sort_gen_; - RangeGenerator range_gen_; + std::shared_ptr range_gen_; }; class RequestWindowGenertor { diff --git a/hybridse/src/vm/internal/node_helper.cc b/hybridse/src/vm/internal/node_helper.cc index 9d97c14374a..46b3e0dfa8f 100644 --- a/hybridse/src/vm/internal/node_helper.cc +++ b/hybridse/src/vm/internal/node_helper.cc @@ -36,7 +36,69 @@ Status GetDependentTables(const PhysicalOpNode* root, std::setGetDependents(); }); return Status::OK(); } +absl::StatusOr ExtractRequestNode(PhysicalOpNode* in) { + if (in == nullptr) { + return absl::InvalidArgumentError("null input node"); + } + switch (in->GetOpType()) { + case vm::kPhysicalOpDataProvider: { + auto tp = dynamic_cast(in)->provider_type_; + if (tp == kProviderTypeRequest) { + return in; + } + + // else data provider is fine inside node tree, + // generally it is of type Partition, but can be Table as well e.g window (t1 instance_not_in_window) + return nullptr; + } + case vm::kPhysicalOpJoin: + case vm::kPhysicalOpUnion: + case vm::kPhysicalOpPostRequestUnion: + case vm::kPhysicalOpRequestUnion: + case vm::kPhysicalOpRequestAggUnion: + case vm::kPhysicalOpRequestJoin: { + // Binary Node + // - left or right status not ok -> error + // - left and right both has non-null value + // - the two not equals -> error + // - otherwise -> left as request node + auto left = ExtractRequestNode(in->GetProducer(0)); + if (!left.ok()) { + return left; + } + auto right = ExtractRequestNode(in->GetProducer(1)); + if (!right.ok()) { + return right; + } + + if (left.value() != nullptr && right.value() != nullptr) { + if (!left.value()->Equals(right.value())) { + return absl::NotFoundError( + absl::StrCat("different request table from left and right path:\n", in->GetTreeString())); + } + } + + return left.value(); + } + default: { + break; + } + } + + if (in->GetProducerCnt() == 0) { + // leaf node excepting DataProdiverNode + // consider ok as right source from one of the supported binary op + return nullptr; + } + + if (in->GetProducerCnt() > 1) { + return absl::UnimplementedError( + absl::StrCat("Non-support op with more than one producer:\n", in->GetTreeString())); + } + + return ExtractRequestNode(in->GetProducer(0)); +} } // namespace internal } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/internal/node_helper.h b/hybridse/src/vm/internal/node_helper.h index 7b9d5044748..15514dda764 100644 --- a/hybridse/src/vm/internal/node_helper.h +++ b/hybridse/src/vm/internal/node_helper.h @@ -26,6 +26,7 @@ #include "vm/physical_op.h" #include "vm/physical_plan_context.h" +/// PhysicalOpNode related utility functions namespace hybridse { namespace vm { namespace internal { @@ -68,6 +69,12 @@ State ReduceNode(const PhysicalOpNode* root, State state, BinOp&& op, GetKids&& // Get all dependent (db, table) info from physical plan Status GetDependentTables(const PhysicalOpNode*, std::set>*); +// Extract request node of the node tree. +// Returns +// - Request node on success +// - NULL if tree do not has request table but sufficient as as input tree of the big one +// - Error status otherwise +absl::StatusOr ExtractRequestNode(PhysicalOpNode* in); } // namespace internal } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/mem_catalog.cc b/hybridse/src/vm/mem_catalog.cc index dca41c9355b..29a2e2791e4 100644 --- a/hybridse/src/vm/mem_catalog.cc +++ b/hybridse/src/vm/mem_catalog.cc @@ -18,8 +18,6 @@ #include -#include "absl/strings/substitute.h" - namespace hybridse { namespace vm { MemTimeTableIterator::MemTimeTableIterator(const MemTimeTable* table, diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index 586f75c6187..7d26cdf899d 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -25,6 +25,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/substitute.h" #include "base/texttable.h" +#include "vm/catalog.h" #include "vm/catalog_wrapper.h" #include "vm/core_api.h" #include "vm/internal/eval.h" @@ -52,6 +53,15 @@ static bool IsPartitionProvider(vm::PhysicalOpNode* n) { } } +static vm::PhysicalDataProviderNode* request_node(vm::PhysicalOpNode* n) { + switch (n->GetOpType()) { + case kPhysicalOpDataProvider: + return dynamic_cast(n); + default: + return request_node(n->GetProducer(0)); + } +} + // Build Runner for each physical node // return cluster task of given runner // @@ -328,6 +338,16 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { } } } + if (support_cluster_optimized_) { + if (IsPartitionProvider(node->GetProducer(0))) { + // route by index of the left source, and it should uncompleted + auto& route_info = left_task.GetRouteInfo(); + runner->AddProducer(left_task.GetRoot()); + runner->AddProducer(right_task.GetRoot()); + return RegisterTask(node, + UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_)); + } + } return RegisterTask( node, BinaryInherit(left_task, right_task, runner, index_key, kRightBias)); @@ -372,6 +392,7 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ && !op->join_.index_key_.ValidKey()) { + // join (.., filter) auto& route_info = right_task.GetRouteInfo(); runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); @@ -387,10 +408,20 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { if (support_cluster_optimized_) { if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ && !op->join_.index_key_.ValidKey()) { + // concat join (.., filter) runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); return RegisterTask(node, ClusterTask(runner, {}, RouteInfo{})); } + + // concat join (any(tx), any(tx)), tx is not request table + auto left = request_node(node->GetProducer(0)); + // auto right = request_node(node->GetProducer(1)); + if (left->provider_type_ == kProviderTypePartition) { + runner->AddProducer(left_task.GetRoot()); + runner->AddProducer(right_task.GetRoot()); + return RegisterTask(node, ClusterTask(runner, {}, left_task.GetRouteInfo())); + } } return RegisterTask(node, BinaryInherit(left_task, right_task, runner, Key(), kNoBias)); } @@ -1526,7 +1557,7 @@ void WindowAggRunner::RunWindowAggOnKey( int32_t min_union_pos = IteratorStatus::FindLastIteratorWithMininumKey(union_segment_status); int32_t cnt = output_table->GetCount(); - HistoryWindow window(instance_window_gen_.range_gen_.window_range_); + HistoryWindow window(instance_window_gen_.range_gen_->window_range_); window.set_instance_not_in_window(instance_not_in_window_); window.set_exclude_current_time(exclude_current_time_); @@ -1602,6 +1633,8 @@ std::shared_ptr RequestLastJoinRunner::Run( return join_gen_->LazyLastJoin(left_part, std::dynamic_pointer_cast(right), ctx.GetParameterRow()); } + + LOG(WARNING) << "skip due to performance: left source of request join is table handler (unoptimized)"; return std::shared_ptr(); } @@ -2101,20 +2134,23 @@ std::shared_ptr ConcatRunner::Run( auto right = inputs[1]; auto left = inputs[0]; size_t left_slices = producers_[0]->output_schemas()->GetSchemaSourceSize(); - size_t right_slices = - producers_[1]->output_schemas()->GetSchemaSourceSize(); + size_t right_slices = producers_[1]->output_schemas()->GetSchemaSourceSize(); if (!left) { return std::shared_ptr(); } switch (left->GetHandlerType()) { case kRowHandler: - return std::shared_ptr(new RowCombineWrapper( - std::dynamic_pointer_cast(left), left_slices, - std::dynamic_pointer_cast(right), right_slices)); + return std::shared_ptr( + new RowCombineWrapper(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); case kTableHandler: - return std::shared_ptr(new ConcatTableHandler( - std::dynamic_pointer_cast(left), left_slices, - std::dynamic_pointer_cast(right), right_slices)); + return std::shared_ptr( + new ConcatTableHandler(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); + case kPartitionHandler: + return std::shared_ptr( + new ConcatPartitionHandler(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); default: { LOG(WARNING) << "fail to run conncat runner: handler type unsupported"; @@ -2149,6 +2185,8 @@ std::shared_ptr LimitRunner::Run( LOG(WARNING) << "fail limit when input type isn't row or table"; return fail_ptr; } + default: + break; } return fail_ptr; } @@ -2205,7 +2243,7 @@ std::shared_ptr GroupAggRunner::Run( return std::shared_ptr(); } if (!having_condition_.Valid() || having_condition_.Gen(table, parameter)) { - output_table->AddRow(agg_gen_.Gen(parameter, table)); + output_table->AddRow(agg_gen_->Gen(parameter, table)); } return output_table; } else if (kPartitionHandler == input->GetHandlerType()) { @@ -2228,7 +2266,7 @@ std::shared_ptr GroupAggRunner::Run( if (limit_cnt_.has_value() && cnt++ >= limit_cnt_) { break; } - output_table->AddRow(agg_gen_.Gen(parameter, segment)); + output_table->AddRow(agg_gen_->Gen(parameter, segment)); } iter->Next(); } @@ -2305,10 +2343,10 @@ std::shared_ptr RequestAggUnionRunner::Run( } auto request = std::dynamic_pointer_cast(request_handler)->GetValue(); - int64_t ts_gen = range_gen_.Valid() ? range_gen_.ts_gen_.Gen(request) : -1; + int64_t ts_gen = range_gen_->Valid() ? range_gen_->ts_gen_.Gen(request) : -1; // Prepare Union Window - auto union_inputs = windows_union_gen_.RunInputs(ctx); + auto union_inputs = windows_union_gen_->RunInputs(ctx); if (ctx.is_debug()) { for (size_t i = 0; i < union_inputs.size(); i++) { std::ostringstream sss; @@ -2317,13 +2355,13 @@ std::shared_ptr RequestAggUnionRunner::Run( } } - auto& key_gen = windows_union_gen_.windows_gen_[0].index_seek_gen_.index_key_gen_; + auto& key_gen = windows_union_gen_->windows_gen_[0].index_seek_gen_.index_key_gen_; std::string key = key_gen.Gen(request, ctx.GetParameterRow()); // do not use codegen to gen the union outputs for aggr segment union_inputs.pop_back(); auto union_segments = - windows_union_gen_.GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); + windows_union_gen_->GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); // code_gen result of agg_segment is not correct. we correct the result here auto agg_segment = std::dynamic_pointer_cast(union_inputs[1])->GetSegment(key); if (agg_segment) { @@ -2342,12 +2380,12 @@ std::shared_ptr RequestAggUnionRunner::Run( std::shared_ptr window; if (agg_segment) { - window = RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, output_request_row_, + window = RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, output_request_row_, exclude_current_time_); } else { LOG(WARNING) << "Aggr segment is empty. Fall back to normal RequestUnionRunner"; - window = RequestUnionRunner::RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, true, - exclude_current_time_); + window = RequestUnionRunner::RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, + true, exclude_current_time_); } return window; @@ -2766,9 +2804,8 @@ std::shared_ptr ReduceRunner::Run( return row_handler; } -std::shared_ptr RequestUnionRunner::Run( - RunnerContext& ctx, - const std::vector>& inputs) { +std::shared_ptr RequestUnionRunner::Run(RunnerContext& ctx, + const std::vector>& inputs) { auto fail_ptr = std::shared_ptr(); if (inputs.size() < 2u) { LOG(WARNING) << "inputs size < 2"; @@ -2779,23 +2816,30 @@ std::shared_ptr RequestUnionRunner::Run( if (!left || !right) { return std::shared_ptr(); } - if (kRowHandler != left->GetHandlerType()) { - return std::shared_ptr(); + if (kRowHandler == left->GetHandlerType()) { + auto request = std::dynamic_pointer_cast(left)->GetValue(); + return RunOneRequest(&ctx, request); + } else if (kPartitionHandler == left->GetHandlerType()) { + auto left_part = std::dynamic_pointer_cast(left); + auto func = std::bind(&RequestUnionRunner::RunOneRequest, this, &ctx, std::placeholders::_1); + return std::shared_ptr(new LazyRequestUnionPartitionHandler(left_part, func)); } - auto request = std::dynamic_pointer_cast(left)->GetValue(); - + LOG(WARNING) << "skip due to performance: left source of request union is table handler(unoptimized)"; + return std::shared_ptr(); +} +std::shared_ptr RequestUnionRunner::RunOneRequest(RunnerContext* ctx, const Row& request) { // ts_gen < 0 if there is no ORDER BY clause for WINDOW - int64_t ts_gen = range_gen_.Valid() ? range_gen_.ts_gen_.Gen(request) : -1; + int64_t ts_gen = range_gen_->Valid() ? range_gen_->ts_gen_.Gen(request) : -1; // Prepare Union Window - auto union_inputs = windows_union_gen_.RunInputs(ctx); - auto union_segments = - windows_union_gen_.GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); + auto union_inputs = windows_union_gen_->RunInputs(*ctx); + auto union_segments = windows_union_gen_->GetRequestWindows(request, ctx->GetParameterRow(), union_inputs); // build window with start and end offset - return RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, output_request_row_, + return RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, output_request_row_, exclude_current_time_); } + std::shared_ptr RequestUnionRunner::RequestUnionWindow( const Row& request, std::vector> union_segments, int64_t ts_gen, const WindowRange& window_range, bool output_request_row, bool exclude_current_time) { @@ -2862,9 +2906,9 @@ std::shared_ptr RequestUnionRunner::RequestUnionWindow( request_key < range_start); if (output_request_row) { window_table->AddRow(request_key, request); - } - if (WindowRange::kInWindow == range_status) { - cnt++; + if (WindowRange::kInWindow == range_status) { + cnt++; + } } while (-1 != max_union_pos) { @@ -2941,16 +2985,26 @@ std::shared_ptr AggRunner::Run( LOG(WARNING) << "input is empty"; return std::shared_ptr(); } - if (kTableHandler != input->GetHandlerType()) { - return std::shared_ptr(); - } - auto table = std::dynamic_pointer_cast(input); - auto parameter = ctx.GetParameterRow(); - if (having_condition_.Valid() && !having_condition_.Gen(table, parameter)) { - return std::shared_ptr(); + + if (kTableHandler == input->GetHandlerType()) { + auto table = std::dynamic_pointer_cast(input); + auto parameter = ctx.GetParameterRow(); + if (having_condition_.Valid() && !having_condition_.Gen(table, parameter)) { + return std::shared_ptr(); + } + auto row_handler = std::shared_ptr(new MemRowHandler(agg_gen_->Gen(parameter, table))); + return row_handler; + } else if (kPartitionHandler == input->GetHandlerType()) { + // lazify + auto data_set = std::dynamic_pointer_cast(input); + if (data_set == nullptr) { + return std::shared_ptr(); + } + + return std::shared_ptr(new LazyAggPartitionHandler(data_set, agg_gen_, ctx.GetParameterRow())); } - auto row_handler = std::shared_ptr(new MemRowHandler(agg_gen_.Gen(parameter, table))); - return row_handler; + + return std::shared_ptr(); } std::shared_ptr ProxyRequestRunner::BatchRequestRun( RunnerContext& ctx) { diff --git a/hybridse/src/vm/runner.h b/hybridse/src/vm/runner.h index 64e712bbde7..a9d135b5e33 100644 --- a/hybridse/src/vm/runner.h +++ b/hybridse/src/vm/runner.h @@ -32,7 +32,6 @@ #include "node/node_manager.h" #include "vm/aggregator.h" #include "vm/catalog.h" -#include "vm/catalog_wrapper.h" #include "vm/core_api.h" #include "vm/generator.h" #include "vm/mem_catalog.h" @@ -354,13 +353,16 @@ class WindowUnionGenerator : public InputsGenerator { std::vector windows_gen_; }; -class RequestWindowUnionGenerator : public InputsGenerator { +class RequestWindowUnionGenerator : public InputsGenerator, + public std::enable_shared_from_this { public: - RequestWindowUnionGenerator() : InputsGenerator() {} + [[nodiscard]] static std::shared_ptr Create() { + return std::shared_ptr(new RequestWindowUnionGenerator()); + } virtual ~RequestWindowUnionGenerator() {} void AddWindowUnion(const RequestWindowOp& window_op, Runner* runner) { - windows_gen_.push_back(RequestWindowGenertor(window_op)); + windows_gen_.emplace_back(window_op); AddInput(runner); } @@ -373,6 +375,9 @@ class RequestWindowUnionGenerator : public InputsGenerator { return union_segments; } std::vector windows_gen_; + + private: + RequestWindowUnionGenerator() : InputsGenerator() {} }; class WindowJoinGenerator : public InputsGenerator { @@ -549,7 +554,7 @@ class GroupAggRunner : public Runner { : Runner(id, kRunnerGroupAgg, schema, limit_cnt), group_(group.fn_info()), having_condition_(having_condition.fn_info()), - agg_gen_(project) {} + agg_gen_(AggGenerator::Create(project)) {} ~GroupAggRunner() {} std::shared_ptr Run( RunnerContext& ctx, // NOLINT @@ -557,24 +562,22 @@ class GroupAggRunner : public Runner { override; // NOLINT KeyGenerator group_; ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class AggRunner : public Runner { public: - AggRunner(const int32_t id, const SchemasContext* schema, - const std::optional limit_cnt, - const ConditionFilter& having_condition, - const FnInfo& fn_info) + AggRunner(const int32_t id, const SchemasContext* schema, const std::optional limit_cnt, + const ConditionFilter& having_condition, const FnInfo& fn_info) : Runner(id, kRunnerAgg, schema, limit_cnt), having_condition_(having_condition.fn_info()), - agg_gen_(fn_info) {} + agg_gen_(AggGenerator::Create(fn_info)) {} ~AggRunner() {} std::shared_ptr Run( RunnerContext& ctx, // NOLINT const std::vector>& inputs) override; // NOLINT ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class ReduceRunner : public Runner { @@ -583,12 +586,12 @@ class ReduceRunner : public Runner { const ConditionFilter& having_condition, const FnInfo& fn_info) : Runner(id, kRunnerReduce, schema, limit_cnt), having_condition_(having_condition.fn_info()), - agg_gen_(fn_info) {} + agg_gen_(AggGenerator::Create(fn_info)) {} ~ReduceRunner() {} std::shared_ptr Run(RunnerContext& ctx, const std::vector>& inputs) override; ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class WindowAggRunner : public Runner { @@ -638,37 +641,39 @@ class WindowAggRunner : public Runner { class RequestUnionRunner : public Runner { public: - RequestUnionRunner(const int32_t id, const SchemasContext* schema, - const std::optional limit_cnt, const Range& range, - bool exclude_current_time, bool output_request_row) + RequestUnionRunner(const int32_t id, const SchemasContext* schema, const std::optional limit_cnt, + const Range& range, bool exclude_current_time, bool output_request_row) : Runner(id, kRunnerRequestUnion, schema, limit_cnt), - range_gen_(range), + range_gen_(RangeGenerator::Create(range)), exclude_current_time_(exclude_current_time), - output_request_row_(output_request_row) {} + output_request_row_(output_request_row) { + windows_union_gen_ = RequestWindowUnionGenerator::Create(); + } + + std::shared_ptr Run(RunnerContext& ctx, // NOLINT + const std::vector>& inputs) override; + + std::shared_ptr RunOneRequest(RunnerContext* ctx, const Row& request); - std::shared_ptr Run( - RunnerContext& ctx, // NOLINT - const std::vector>& inputs) - override; // NOLINT static std::shared_ptr RequestUnionWindow(const Row& request, std::vector> union_segments, int64_t request_ts, const WindowRange& window_range, bool output_request_row, bool exclude_current_time); void AddWindowUnion(const RequestWindowOp& window, Runner* runner) { - windows_union_gen_.AddWindowUnion(window, runner); + windows_union_gen_->AddWindowUnion(window, runner); } void Print(std::ostream& output, const std::string& tab, std::set* visited_ids) const override { Runner::Print(output, tab, visited_ids); output << "\n" << tab << "window unions:\n"; - for (auto& r : windows_union_gen_.input_runners_) { + for (auto& r : windows_union_gen_->input_runners_) { r->Print(output, tab + " ", visited_ids); } } - RequestWindowUnionGenerator windows_union_gen_; - RangeGenerator range_gen_; + std::shared_ptr windows_union_gen_; + std::shared_ptr range_gen_; bool exclude_current_time_; bool output_request_row_; }; @@ -679,11 +684,12 @@ class RequestAggUnionRunner : public Runner { const Range& range, bool exclude_current_time, bool output_request_row, const node::CallExprNode* project) : Runner(id, kRunnerRequestAggUnion, schema, limit_cnt), - range_gen_(range), + range_gen_(RangeGenerator::Create(range)), exclude_current_time_(exclude_current_time), output_request_row_(output_request_row), func_(project->GetFnDef()), agg_col_(project->GetChild(0)) { + windows_union_gen_ = RequestWindowUnionGenerator::Create(); if (agg_col_->GetExprType() == node::kExprColumnRef) { agg_col_name_ = dynamic_cast(agg_col_)->GetColumnName(); } /* for kAllExpr like count(*), agg_col_name_ is empty */ @@ -704,7 +710,7 @@ class RequestAggUnionRunner : public Runner { const bool output_request_row, const bool exclude_current_time) const; void AddWindowUnion(const RequestWindowOp& window, Runner* runner) { - windows_union_gen_.AddWindowUnion(window, runner); + windows_union_gen_->AddWindowUnion(window, runner); } static std::string PrintEvalValue(const absl::StatusOr>& val); @@ -723,8 +729,8 @@ class RequestAggUnionRunner : public Runner { kMaxWhere, }; - RequestWindowUnionGenerator windows_union_gen_; - RangeGenerator range_gen_; + std::shared_ptr windows_union_gen_; + std::shared_ptr range_gen_; bool exclude_current_time_; // include request row from union. diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index d52667dbc6f..a0340d41fbe 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -639,16 +639,13 @@ Status RequestModeTransformer::TransformWindowOp(PhysicalOpNode* depend, } case kPhysicalOpDataProvider: { auto data_op = dynamic_cast(depend); - CHECK_TRUE(data_op->provider_type_ == kProviderTypeRequest, - kPlanError, - "Do not support window on non-request input"); + CHECK_TRUE(data_op->provider_type_ != kProviderTypePartition, kPlanError, "data node already a partition"); auto name = data_op->table_handler_->GetName(); auto db_name = data_op->table_handler_->GetDatabase(); auto table = catalog_->GetTable(db_name, name); - CHECK_TRUE(table != nullptr, kPlanError, - "Fail to transform data provider op: table " + name + - "not exists"); + CHECK_TRUE(table != nullptr, kPlanError, "Fail to transform data provider op: table ", name, "not exists"); + PhysicalTableProviderNode* right = nullptr; CHECK_STATUS(CreateOp(&right, table)); @@ -657,6 +654,12 @@ Status RequestModeTransformer::TransformWindowOp(PhysicalOpNode* depend, data_op, right, table->GetDatabase(), table->GetName(), table->GetSchema(), nullptr, w_ptr, &request_union_op)); + if (data_op->provider_type_ == kProviderTypeTable && !request_union_op->instance_not_in_window()) { + // REQUEST_UNION(t1, t1) do not has request table, dont output reqeust row, + // but should output if REQUEST_UNION(t1, t1, unions=xxx, instance_not_in_window) + request_union_op->set_output_request_row(false); + } + if (!w_ptr->union_tables().empty()) { for (auto iter = w_ptr->union_tables().cbegin(); iter != w_ptr->union_tables().cend(); iter++) { @@ -1403,19 +1406,24 @@ Status BatchModeTransformer::CreatePhysicalProjectNode( } case kAggregation: { PhysicalAggregationNode* agg_op = nullptr; - CHECK_STATUS(CreateOp(&agg_op, depend, - column_projects, having_condition)); + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition)); *output = agg_op; break; } case kGroupAggregation: { - CHECK_TRUE(!node::ExprListNullOrEmpty(group_keys), kPlanError, - "Can not create group agg with non group keys"); + if (node::ExprListNullOrEmpty(group_keys)) { + PhysicalAggregationNode* agg_op = nullptr; + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition)); + *output = agg_op; + } else { + // CHECK_TRUE(!node::ExprListNullOrEmpty(group_keys), kPlanError, + // "Can not create group agg with non group keys"); - PhysicalGroupAggrerationNode* agg_op = nullptr; - CHECK_STATUS(CreateOp( - &agg_op, depend, column_projects, having_condition, group_keys)); - *output = agg_op; + PhysicalGroupAggrerationNode* agg_op = nullptr; + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition, + group_keys)); + *output = agg_op; + } break; } case kWindowAggregation: { @@ -1455,6 +1463,10 @@ base::Status BatchModeTransformer::ExtractGroupKeys(vm::PhysicalOpNode* depend, CHECK_STATUS(ExtractGroupKeys(depend->GetProducer(0), keys)) return base::Status::OK(); } + + if (depend->GetOpType() == kPhysicalOpRequestUnion) { + return base::Status::OK(); + } CHECK_TRUE(depend->GetOpType() == kPhysicalOpGroupBy, kPlanError, "Fail to extract group keys from op ", vm::PhysicalOpTypeName(depend->GetOpType())) *keys = dynamic_cast(depend)->group().keys_; @@ -1637,12 +1649,26 @@ Status BatchModeTransformer::ValidatePartitionDataProvider(PhysicalOpNode* in) { if (kPhysicalOpSimpleProject == in->GetOpType() || kPhysicalOpRename == in->GetOpType() || kPhysicalOpFilter == in->GetOpType()) { CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))) + } else if (kPhysicalOpProject == in->GetOpType()) { + auto* prj = dynamic_cast(in); + CHECK_TRUE(prj->project_type_ == kAggregation, kPlanError, + "can't optimize project node: ", in->GetTreeString()); + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); } else if (kPhysicalOpRequestJoin == in->GetOpType()) { CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(1))); + } else if (kPhysicalOpRequestUnion == in->GetOpType()) { + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); + auto n = dynamic_cast(in); + if (!n->instance_not_in_window()) { + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(1))); + } + for (auto& window_union : n->window_unions().window_unions_) { + CHECK_STATUS(ValidateWindowIndexOptimization(window_union.second, window_union.first)); + } } else { CHECK_TRUE(kPhysicalOpDataProvider == in->GetOpType() && - kProviderTypePartition == dynamic_cast(in)->provider_type_, + kProviderTypeTable != dynamic_cast(in)->provider_type_, kPlanError, "Isn't partition provider:", in->GetTreeString()); } return Status::OK(); @@ -1667,7 +1693,7 @@ Status BatchModeTransformer::ValidateJoinIndexOptimization( return Status::OK(); } else { CHECK_STATUS(ValidatePartitionDataProvider(right), - "Join node hasn't been optimized"); + "Join node hasn't been optimized: right=", right->GetTreeString()); } return Status::OK(); } @@ -2423,7 +2449,7 @@ Status RequestModeTransformer::TransformScanOp(const node::TablePlanNode* node, } } Status RequestModeTransformer::ValidateRequestTable(PhysicalOpNode* in) { - auto req = ExtractRequestNode(in); + auto req = internal::ExtractRequestNode(in); CHECK_TRUE(req.ok(), kPlanError, req.status()); std::set> db_tables; @@ -2433,69 +2459,6 @@ Status RequestModeTransformer::ValidateRequestTable(PhysicalOpNode* in) { return Status::OK(); } -absl::StatusOr RequestModeTransformer::ExtractRequestNode(PhysicalOpNode* in) { - if (in == nullptr) { - return absl::InvalidArgumentError("null input node"); - } - - switch (in->GetOpType()) { - case vm::kPhysicalOpDataProvider: { - auto tp = dynamic_cast(in)->provider_type_; - if (tp == kProviderTypeRequest) { - return in; - } - - // else data provider is fine inside node tree, - // generally it is of type Partition, but can be Table as well e.g window (t1 instance_not_in_window) - return nullptr; - } - case vm::kPhysicalOpJoin: - case vm::kPhysicalOpUnion: - case vm::kPhysicalOpPostRequestUnion: - case vm::kPhysicalOpRequestUnion: - case vm::kPhysicalOpRequestAggUnion: - case vm::kPhysicalOpRequestJoin: { - // Binary Node - // - left or right status not ok -> error - // - left and right both has non-null value - // - the two not equals -> error - // - otherwise -> left as request node - auto left = ExtractRequestNode(in->GetProducer(0)); - if (!left.ok()) { - return left; - } - auto right = ExtractRequestNode(in->GetProducer(1)); - if (!right.ok()) { - return right; - } - - if (left.value() != nullptr && right.value() != nullptr) { - if (!left.value()->Equals(right.value())) { - return absl::NotFoundError( - absl::StrCat("different request table from left and right path:\n", in->GetTreeString())); - } - } - - return left.value(); - } - default: { - break; - } - } - - if (in->GetProducerCnt() == 0) { - // leaf node excepting DataProdiverNode - // consider ok as right source from one of the supported binary op - return nullptr; - } - - if (in->GetProducerCnt() > 1) { - return absl::UnimplementedError( - absl::StrCat("Non-support op with more than one producer:\n", in->GetTreeString())); - } - - return ExtractRequestNode(in->GetProducer(0)); -} // transform a single `ProjectListNode` of `ProjectPlanNode` Status RequestModeTransformer::TransformProjectOp( diff --git a/hybridse/src/vm/transform.h b/hybridse/src/vm/transform.h index caaf63b655d..45c4d9660e7 100644 --- a/hybridse/src/vm/transform.h +++ b/hybridse/src/vm/transform.h @@ -21,7 +21,6 @@ #include #include #include -#include #include #include "absl/base/attributes.h" @@ -29,7 +28,6 @@ #include "base/fe_status.h" #include "base/graph.h" #include "llvm/Bitcode/BitcodeWriter.h" -#include "llvm/Support/raw_ostream.h" #include "node/node_manager.h" #include "node/plan_node.h" #include "node/sql_node.h" @@ -323,13 +321,6 @@ class RequestModeTransformer : public BatchModeTransformer { // - do not has any physical table refered Status ValidateRequestTable(PhysicalOpNode* in); - // Extract request node of the node tree - // returns - // - Request node on success - // - NULL if tree do not has request table but sufficient as as input tree of the big one - // - Error status otherwise - static absl::StatusOr ExtractRequestNode(PhysicalOpNode* in); - private: // Optimize simple project node which is the producer of window project Status OptimizeSimpleProjectAsWindowProducer(PhysicalSimpleProjectNode* depend, diff --git a/src/sdk/sql_sdk_test.h b/src/sdk/sql_sdk_test.h index 58d72cf458a..5eaadde6623 100644 --- a/src/sdk/sql_sdk_test.h +++ b/src/sdk/sql_sdk_test.h @@ -50,6 +50,8 @@ INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinQuery, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/last_join_query.yaml"))); INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinWindowQuery, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/last_join_window_query.yaml"))); +INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinSubqueryWindow, SQLSDKQueryTest, + testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/last_join_subquery_window.yml"))); INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinWhere, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/last_join_where.yaml"))); INSTANTIATE_TEST_SUITE_P(SQLSDKParameterizedQuery, SQLSDKQueryTest,