From c64ca4b64b67d2d1a5dfd95db256209597bc15cc Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Sun, 15 Oct 2023 23:32:34 +0000 Subject: [PATCH] save changes to last join (window) --- cases/query/last_join_window_query.yaml | 38 ++++ .../toydb/src/tablet/tablet_catalog.cc | 33 +-- .../toydb/src/tablet/tablet_catalog.h | 5 +- .../src/testing/toydb_engine_test_base.cc | 21 +- hybridse/include/codec/row.h | 2 +- hybridse/include/codec/row_list.h | 2 +- hybridse/include/vm/catalog.h | 3 +- hybridse/include/vm/mem_catalog.h | 156 +++++++++++++- hybridse/include/vm/physical_op.h | 37 ++-- .../passes/physical/batch_request_optimize.cc | 21 +- .../physical/group_and_sort_optimized.cc | 81 +++++++- .../physical/group_and_sort_optimized.h | 13 ++ .../physical/transform_up_physical_pass.h | 1 - hybridse/src/plan/planner.cc | 3 +- hybridse/src/testing/engine_test_base.h | 3 +- hybridse/src/vm/catalog_wrapper.cc | 1 + hybridse/src/vm/catalog_wrapper.h | 192 ++++++++++++++++++ hybridse/src/vm/engine.cc | 2 +- hybridse/src/vm/generator.cc | 1 + hybridse/src/vm/generator.h | 35 +++- hybridse/src/vm/mem_catalog.cc | 2 - hybridse/src/vm/runner.cc | 136 +++++++++---- hybridse/src/vm/runner.h | 65 +++--- hybridse/src/vm/transform.cc | 57 ++++-- 24 files changed, 724 insertions(+), 186 deletions(-) diff --git a/cases/query/last_join_window_query.yaml b/cases/query/last_join_window_query.yaml index a11fce4369f..37ed519a96f 100644 --- a/cases/query/last_join_window_query.yaml +++ b/cases/query/last_join_window_query.yaml @@ -420,3 +420,41 @@ cases: - [2,"aa", 2, 20, 4, 20 ] - [3,"bb", 2, null, 2, NULL] - [4,"cc", 2, 21, 2, 21] + + # =================================================================== + # LAST JOIN (WINDOW) + # =================================================================== + - id: 8 + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",2,1590738989000] + - ["bb",3,1590738990000] + - ["cc",4,1590738991000] + - name: t2 + columns: ["c1 string", "c2 int", "c4 timestamp"] + indexs: ["index1:c1:c4", "index2:c2:c4"] + rows: + - ["aa",1, 1590738989000] + - ["bb",3, 1590738990000] + - ["dd",4, 1590738991000] + sql: | + select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg + from t1 last join ( + select c1, c2, count(c4) over w as agg + from t2 + window w as ( + partition by c1 order by c4 + rows between 1 preceding and current row + ) + ) tx + on t1.c2 = tx.c2 + expect: + columns: ["c1 string", "c1r string", "c2r int", "agg int64"] + order: c1 + data: | + aa, NULL, NULL, NULL + bb, bb, 3, 1 + cc, dd, 4, 1 diff --git a/hybridse/examples/toydb/src/tablet/tablet_catalog.cc b/hybridse/examples/toydb/src/tablet/tablet_catalog.cc index feeb750ab6f..71c2f34f407 100644 --- a/hybridse/examples/toydb/src/tablet/tablet_catalog.cc +++ b/hybridse/examples/toydb/src/tablet/tablet_catalog.cc @@ -136,22 +136,6 @@ RowIterator* TabletTableHandler::GetRawIterator() { return new storage::FullTableIterator(table_->GetSegments(), table_->GetSegCnt(), table_); } -const uint64_t TabletTableHandler::GetCount() { - auto iter = GetIterator(); - uint64_t cnt = 0; - while (iter->Valid()) { - iter->Next(); - cnt++; - } - return cnt; -} -Row TabletTableHandler::At(uint64_t pos) { - auto iter = GetIterator(); - while (pos-- > 0 && iter->Valid()) { - iter->Next(); - } - return iter->Valid() ? iter->GetValue() : Row(); -} TabletCatalog::TabletCatalog() : tables_(), db_() {} @@ -249,22 +233,6 @@ std::unique_ptr TabletSegmentHandler::GetWindowIterator( const std::string& idx_name) { return std::unique_ptr(); } -const uint64_t TabletSegmentHandler::GetCount() { - auto iter = GetIterator(); - uint64_t cnt = 0; - while (iter->Valid()) { - cnt++; - iter->Next(); - } - return cnt; -} -Row TabletSegmentHandler::At(uint64_t pos) { - auto iter = GetIterator(); - while (pos-- > 0 && iter->Valid()) { - iter->Next(); - } - return iter->Valid() ? iter->GetValue() : Row(); -} const uint64_t TabletPartitionHandler::GetCount() { auto iter = GetWindowIterator(); @@ -275,5 +243,6 @@ const uint64_t TabletPartitionHandler::GetCount() { } return cnt; } + } // namespace tablet } // namespace hybridse diff --git a/hybridse/examples/toydb/src/tablet/tablet_catalog.h b/hybridse/examples/toydb/src/tablet/tablet_catalog.h index fa41140a495..dd5bea22c51 100644 --- a/hybridse/examples/toydb/src/tablet/tablet_catalog.h +++ b/hybridse/examples/toydb/src/tablet/tablet_catalog.h @@ -68,8 +68,6 @@ class TabletSegmentHandler : public TableHandler { std::unique_ptr GetIterator() override; RowIterator* GetRawIterator() override; std::unique_ptr GetWindowIterator(const std::string& idx_name) override; - const uint64_t GetCount() override; - Row At(uint64_t pos) override; const std::string GetHandlerTypeName() override { return "TabletSegmentHandler"; } @@ -104,6 +102,7 @@ class TabletPartitionHandler std::unique_ptr GetWindowIterator() override { return table_handler_->GetWindowIterator(index_name_); } + const uint64_t GetCount() override; std::shared_ptr GetSegment(const std::string& key) override { @@ -152,8 +151,6 @@ class TabletTableHandler RowIterator* GetRawIterator() override; std::unique_ptr GetWindowIterator( const std::string& idx_name); - virtual const uint64_t GetCount(); - Row At(uint64_t pos) override; virtual std::shared_ptr GetPartition( const std::string& index_name) { diff --git a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc index fcaa71d8373..35a595b431e 100644 --- a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc +++ b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc @@ -15,8 +15,9 @@ */ #include "testing/toydb_engine_test_base.h" + +#include "absl/strings/str_join.h" #include "gtest/gtest.h" -#include "gtest/internal/gtest-param-util.h" using namespace llvm; // NOLINT (build/namespaces) using namespace llvm::orc; // NOLINT (build/namespaces) @@ -141,18 +142,12 @@ std::shared_ptr BuildOnePkTableStorage( } return catalog; } -void BatchRequestEngineCheckWithCommonColumnIndices( - const SqlCase& sql_case, const EngineOptions options, - const std::set& common_column_indices) { - std::ostringstream oss; - for (size_t index : common_column_indices) { - oss << index << ","; - } - LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: " - "common_column_indices = [" - << oss.str() << "]"; - ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, - common_column_indices); +// Run check with common column index info +void BatchRequestEngineCheckWithCommonColumnIndices(const SqlCase& sql_case, const EngineOptions options, + const std::set& common_column_indices) { + LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: common_column_indices = [" + << absl::StrJoin(common_column_indices, ",") << "]"; + ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, common_column_indices); engine_test.RunCheck(); } diff --git a/hybridse/include/codec/row.h b/hybridse/include/codec/row.h index cd6abb0a3a1..69158d41e85 100644 --- a/hybridse/include/codec/row.h +++ b/hybridse/include/codec/row.h @@ -54,7 +54,7 @@ class Row { inline int32_t size() const { return slice_.size(); } inline int32_t size(int32_t pos) const { - return 0 == pos ? slice_.size() : slices_[pos - 1].size(); + return 0 == pos ? slice_.size() : slices_.at(pos - 1).size(); } // Return true if the length of the referenced data is zero diff --git a/hybridse/include/codec/row_list.h b/hybridse/include/codec/row_list.h index b32ad24c3eb..cfc83fae6a1 100644 --- a/hybridse/include/codec/row_list.h +++ b/hybridse/include/codec/row_list.h @@ -76,7 +76,7 @@ class ListV { virtual const uint64_t GetCount() { auto iter = GetIterator(); uint64_t cnt = 0; - while (iter->Valid()) { + while (iter && iter->Valid()) { iter->Next(); cnt++; } diff --git a/hybridse/include/vm/catalog.h b/hybridse/include/vm/catalog.h index 30e68316606..42d6b81e7e0 100644 --- a/hybridse/include/vm/catalog.h +++ b/hybridse/include/vm/catalog.h @@ -65,7 +65,7 @@ class TableHandler; class RowHandler; class Tablet; -enum HandlerType { kRowHandler, kTableHandler, kPartitionHandler }; +enum HandlerType { kRowHandler, kTableHandler, kPartitionHandler, kDataSetHandler }; enum OrderType { kDescOrder, kAscOrder, kNoneOrder }; /// \brief The basic dataset operation abstraction. @@ -217,6 +217,7 @@ class TableHandler : public DataHandler { virtual ~TableHandler() {} /// Return table column Types information. + /// TODO: rm it, never used virtual const Types& GetTypes() = 0; /// Return the index information diff --git a/hybridse/include/vm/mem_catalog.h b/hybridse/include/vm/mem_catalog.h index 2fc5df4960c..3692528b4b2 100644 --- a/hybridse/include/vm/mem_catalog.h +++ b/hybridse/include/vm/mem_catalog.h @@ -674,6 +674,156 @@ class MemPartitionHandler IndexHint index_hint_; OrderType order_type_; }; + +class ConcatIterator final : public RowIterator { + public: + ConcatIterator(std::unique_ptr&& left, size_t left_slices, std::unique_ptr&& right, + size_t right_slices) + : left_(std::move(left)), left_slices_(left_slices), right_(std::move(right)), right_slices_(right_slices) { + SeekToFirst(); + } + ~ConcatIterator() override {} + + bool Valid() const override { + return left_ && left_->Valid(); + } + void Next() override { + left_->Next(); + if (right_ && right_->Valid()) { + right_->Next(); + } + } + const uint64_t& GetKey() const override { + return left_->GetKey(); + } + const Row& GetValue() override { + if (!right_ || !right_->Valid()) { + buf_ = Row(left_slices_, left_->GetValue(), right_slices_, Row()); + } else { + buf_ = Row(left_slices_, left_->GetValue(), right_slices_, right_->GetValue()); + } + return buf_; + } + + bool IsSeekable() const override { return true; }; + + void Seek(const uint64_t& key) override { + left_->Seek(key); + if (right_ && right_->Valid()) { + right_->Seek(key); + } + } + + void SeekToFirst() override { + left_->SeekToFirst(); + if (right_) { + right_->SeekToFirst(); + } + } + + private: + std::unique_ptr left_; + size_t left_slices_; + std::unique_ptr right_; + size_t right_slices_; + + Row buf_; +}; + +class SimpleConcatTableHandler final : public TableHandler { + public: + SimpleConcatTableHandler(std::shared_ptr left, size_t left_slices, + std::shared_ptr right, size_t right_slices) + : left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {} + ~SimpleConcatTableHandler() override {} + + std::unique_ptr GetIterator() override { + auto p = GetRawIterator(); + if (p == nullptr) { + return {}; + } + return std::unique_ptr(p); + } + + RowIterator* GetRawIterator() override { + auto li = left_->GetIterator(); + if (!li) { + return nullptr; + } + auto ri = right_->GetIterator(); + return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_); + } + + std::unique_ptr GetWindowIterator(const std::string& idx_name) override { return nullptr; } + + const Types& GetTypes() override { return left_->GetTypes(); } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + const Schema* GetSchema() override { return left_->GetSchema(); } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + private: + std::shared_ptr left_; + size_t left_slices_; + std::shared_ptr right_; + size_t right_slices_; +}; + +class ConcatPartitionHandler final : public PartitionHandler { + public: + ConcatPartitionHandler(std::shared_ptr left, size_t left_slices, + std::shared_ptr right, size_t right_slices) + : left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {} + ~ConcatPartitionHandler() override {} + + std::unique_ptr GetIterator() override { + auto p = GetRawIterator(); + if (p == nullptr) { + return {}; + } + return std::unique_ptr(p); + } + + RowIterator* GetRawIterator() override { + auto li = left_->GetIterator(); + if (!li) { + return nullptr; + } + auto ri = right_->GetIterator(); + return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_); + } + + std::unique_ptr GetWindowIterator(const std::string &idx_name) override { + return nullptr; + } + std::unique_ptr GetWindowIterator() override { return nullptr;} + + std::shared_ptr GetSegment(const std::string &key) override { + auto left_seg = left_->GetSegment(key); + auto right_seg = right_->GetSegment(key); + return std::shared_ptr( + new SimpleConcatTableHandler(left_seg, left_slices_, right_seg, right_slices_)); + } + + const Types& GetTypes() override { return left_->GetTypes(); } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + private: + std::shared_ptr left_; + size_t left_slices_; + std::shared_ptr right_; + size_t right_slices_; +}; + class ConcatTableHandler : public MemTimeTableHandler { public: ConcatTableHandler(std::shared_ptr left, size_t left_slices, @@ -692,19 +842,19 @@ class ConcatTableHandler : public MemTimeTableHandler { status_ = SyncValue(); return MemTimeTableHandler::At(pos); } - std::unique_ptr GetIterator() { + std::unique_ptr GetIterator() override { if (status_.isRunning()) { status_ = SyncValue(); } return MemTimeTableHandler::GetIterator(); } - RowIterator* GetRawIterator() { + RowIterator* GetRawIterator() override { if (status_.isRunning()) { status_ = SyncValue(); } return MemTimeTableHandler::GetRawIterator(); } - virtual const uint64_t GetCount() { + const uint64_t GetCount() override { if (status_.isRunning()) { status_ = SyncValue(); } diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index d2fdafb5349..3d05123b72b 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -785,7 +785,11 @@ class PhysicalAggregationNode : public PhysicalProjectNode { public: PhysicalAggregationNode(PhysicalOpNode *node, const ColumnProjects &project, const node::ExprNode *condition) : PhysicalProjectNode(node, kAggregation, project, true), having_condition_(condition) { - output_type_ = kSchemaTypeRow; + if (node->GetOutputType() == kSchemaTypeGroup) { + output_type_ = kSchemaTypeGroup; + } else { + output_type_ = kSchemaTypeRow; + } fn_infos_.push_back(&having_condition_.fn_info()); } virtual ~PhysicalAggregationNode() {} @@ -1415,7 +1419,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(false), exclude_current_time_(false), output_request_row_(true) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.index_key_.fn_info()); @@ -1427,7 +1431,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(w_ptr->instance_not_in_window()), exclude_current_time_(w_ptr->exclude_current_time()), output_request_row_(true) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.sort_.fn_info()); @@ -1443,7 +1447,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { instance_not_in_window_(instance_not_in_window), exclude_current_time_(exclude_current_time), output_request_row_(output_request_row) { - output_type_ = kSchemaTypeTable; + InitOuptput(); fn_infos_.push_back(&window_.partition_.fn_info()); fn_infos_.push_back(&window_.sort_.fn_info()); @@ -1484,11 +1488,10 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { std::vector GetDependents() const override; - const bool instance_not_in_window() const { - return instance_not_in_window_; - } - const bool exclude_current_time() const { return exclude_current_time_; } - const bool output_request_row() const { return output_request_row_; } + bool instance_not_in_window() const { return instance_not_in_window_; } + bool exclude_current_time() const { return exclude_current_time_; } + bool output_request_row() const { return output_request_row_; } + void set_output_request_row(bool flag) { output_request_row_ = flag; } const RequestWindowOp &window() const { return window_; } const RequestWindowUnionList &window_unions() const { return window_unions_; @@ -1506,10 +1509,20 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { } RequestWindowOp window_; - const bool instance_not_in_window_; - const bool exclude_current_time_; - const bool output_request_row_; + bool instance_not_in_window_; + bool exclude_current_time_; + bool output_request_row_; RequestWindowUnionList window_unions_; + + private: + void InitOuptput() { + auto left = GetProducer(0); + if (left->GetOutputType() == kSchemaTypeRow) { + output_type_ = kSchemaTypeTable; + } else { + output_type_ = kSchemaTypeGroup; + } + } }; class PhysicalRequestAggUnionNode : public PhysicalOpNode { diff --git a/hybridse/src/passes/physical/batch_request_optimize.cc b/hybridse/src/passes/physical/batch_request_optimize.cc index 52488e6a981..86fdfee92c5 100644 --- a/hybridse/src/passes/physical/batch_request_optimize.cc +++ b/hybridse/src/passes/physical/batch_request_optimize.cc @@ -269,6 +269,7 @@ static Status UpdateProjectExpr( return replacer.Replace(expr->DeepCopy(ctx->node_manager()), output); } +// simplify simple project, remove orphan descendant producer nodes static Status CreateSimplifiedProject(PhysicalPlanContext* ctx, PhysicalOpNode* input, const ColumnProjects& projects, @@ -279,8 +280,7 @@ static Status CreateSimplifiedProject(PhysicalPlanContext* ctx, can_project = false; for (size_t i = 0; i < cur_input->producers().size(); ++i) { auto cand_input = cur_input->GetProducer(i); - if (cand_input->GetOutputType() != - PhysicalSchemaType::kSchemaTypeRow) { + if (cand_input->GetOutputType() != PhysicalSchemaType::kSchemaTypeRow) { continue; } bool is_valid = true; @@ -949,21 +949,16 @@ Status CommonColumnOptimize::ProcessJoin(PhysicalPlanContext* ctx, } } else if (is_non_common_join) { // join only depend on non-common left part - if (left_state->non_common_op == join_op->GetProducer(0) && - right == join_op->GetProducer(1)) { + if (left_state->non_common_op == join_op->GetProducer(0) && right == join_op->GetProducer(1)) { state->common_op = nullptr; state->non_common_op = join_op; } else { PhysicalRequestJoinNode* new_join = nullptr; - CHECK_STATUS(ctx->CreateOp( - &new_join, left_state->non_common_op, right, join_op->join(), - join_op->output_right_only())); - CHECK_STATUS(ReplaceComponentExpr( - join_op->join(), join_op->joined_schemas_ctx(), - new_join->joined_schemas_ctx(), ctx->node_manager(), - &new_join->join_)); - state->common_op = - join_op->output_right_only() ? nullptr : left_state->common_op; + CHECK_STATUS(ctx->CreateOp(&new_join, left_state->non_common_op, right, + join_op->join(), join_op->output_right_only())); + CHECK_STATUS(ReplaceComponentExpr(join_op->join(), join_op->joined_schemas_ctx(), + new_join->joined_schemas_ctx(), ctx->node_manager(), &new_join->join_)); + state->common_op = join_op->output_right_only() ? nullptr : left_state->common_op; state->non_common_op = new_join; if (!join_op->output_right_only()) { for (size_t left_idx : left_state->common_column_indices) { diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index ae333b6af47..642f64023c5 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -25,6 +25,7 @@ #include "absl/cleanup/cleanup.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" +#include "node/node_enum.h" #include "vm/physical_op.h" namespace hybridse { @@ -294,6 +295,7 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx absl::Cleanup clean = [&]() { expr_cache_.clear(); + optimize_info_ = nullptr; }; auto s = BuildExprCache(left_key->keys(), root_schemas_ctx); @@ -347,6 +349,15 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas if (DataProviderType::kProviderTypeTable == scan_op->provider_type_ || DataProviderType::kProviderTypePartition == scan_op->provider_type_) { + if (optimize_info_) { + if (optimize_info_->left_key == left_key && optimize_info_->index_key == index_key && + optimize_info_->right_key == right_key && optimize_info_->sort_key == sort) { + if (optimize_info_->optimized != nullptr) { + *new_in = optimize_info_->optimized; + return true; + } + } + } const node::ExprListNode* right_partition = right_key == nullptr ? left_key->keys() : right_key->keys(); @@ -453,13 +464,15 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas dynamic_cast(node_manager_->MakeOrderByNode(node_manager_->MakeExprList( node_manager_->MakeOrderExpression(nullptr, first_order_expression->is_asc()))))); } + + optimize_info_.reset(new OptimizeInfo(left_key, index_key, right_key, sort, partition_op)); *new_in = partition_op; return true; } } else if (PhysicalOpType::kPhysicalOpSimpleProject == in->GetOpType()) { PhysicalOpNode* new_depend; - if (!KeysOptimizedImpl(in->GetProducer(0)->schemas_ctx(), in->GetProducer(0), left_key, index_key, right_key, sort, - &new_depend)) { + if (!KeysOptimizedImpl(in->GetProducer(0)->schemas_ctx(), in->GetProducer(0), left_key, index_key, right_key, + sort, &new_depend)) { return false; } @@ -493,7 +506,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas PhysicalFilterNode* filter_op = dynamic_cast(in); PhysicalOpNode* new_depend; - if (!KeysOptimizedImpl(root_schemas_ctx, in->producers()[0], left_key, index_key, right_key, sort, &new_depend)) { + if (!KeysOptimizedImpl(root_schemas_ctx, in->producers()[0], left_key, index_key, right_key, sort, + &new_depend)) { return false; } PhysicalFilterNode* new_filter = nullptr; @@ -515,8 +529,16 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas &new_depend)) { return false; } + PhysicalOpNode* new_right = in->GetProducer(1); + if (request_join->join_.join_type_ == node::kJoinTypeConcat) { + // for concat join, only acceptable if the two inputs (of course same table) optimized by the same index + auto* rebase_sc = in->GetProducer(1)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(1), left_key, index_key, right_key, sort, &new_right)) { + return false; + } + } PhysicalRequestJoinNode* new_join = nullptr; - auto s = plan_ctx_->CreateOp(&new_join, new_depend, request_join->GetProducer(1), + auto s = plan_ctx_->CreateOp(&new_join, new_depend, new_right, request_join->join(), request_join->output_right_only()); if (!s.isOK()) { LOG(WARNING) << "Fail to create new request join op: " << s; @@ -545,6 +567,57 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas *new_in = new_join; return true; + } else if (PhysicalOpType::kPhysicalOpProject == in->GetOpType()) { + auto * project = dynamic_cast(in); + if (project == nullptr || project->project_type_ != vm::kAggregation) { + return false; + } + + auto * agg_project = dynamic_cast(in); + + PhysicalOpNode* new_depend = nullptr; + auto* rebase_sc = in->GetProducer(0)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(0), left_key, index_key, right_key, sort, + &new_depend)) { + return false; + } + + vm::PhysicalAggregationNode* new_agg = nullptr; + if (!plan_ctx_ + ->CreateOp(&new_agg, new_depend, agg_project->project(), + agg_project->having_condition_.condition()) + .isOK()) { + return false; + } + *new_in = new_agg; + return true; + } else if (PhysicalOpType::kPhysicalOpRequestUnion == in->GetOpType()) { + // JOIN (..., AGG(REQUEST_UNION(left, ...))): JOIN condition optimizing left + PhysicalOpNode* new_depend = nullptr; + auto* rebase_sc = in->GetProducer(0)->schemas_ctx(); + if (!KeysOptimizedImpl(rebase_sc, in->GetProducer(0), left_key, index_key, right_key, sort, + &new_depend)) { + return false; + } + + auto * request_union = dynamic_cast(in); + + vm::PhysicalRequestUnionNode* new_union = nullptr; + if (!plan_ctx_ + ->CreateOp( + &new_union, new_depend, in->GetProducer(1), request_union->window(), + request_union->instance_not_in_window(), request_union->exclude_current_time(), + request_union->output_request_row()) + .isOK()) { + return false; + } + for (auto& pair : request_union->window_unions().window_unions_) { + if (!new_union->AddWindowUnion(pair.first)) { + return false; + } + } + *new_in = new_union; + return true; } return false; } diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.h b/hybridse/src/passes/physical/group_and_sort_optimized.h index 1d410f2b8e8..95fcabb7a1a 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.h +++ b/hybridse/src/passes/physical/group_and_sort_optimized.h @@ -93,6 +93,17 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { std::string db_name; }; + struct OptimizeInfo { + OptimizeInfo(const Key* left_key, const Key* index_key, const Key* right_key, const Sort* s, + PhysicalOpNode* optimized) + : left_key(left_key), index_key(index_key), right_key(right_key), sort_key(s), optimized(optimized) {} + const Key* left_key; + const Key* index_key; + const Key* right_key; + const Sort* sort_key; + PhysicalOpNode* optimized; + }; + private: bool Transform(PhysicalOpNode* in, PhysicalOpNode** output); @@ -149,6 +160,8 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { // A source column name is the column name in string that refers to a physical table, // only one table got optimized each time std::unordered_map expr_cache_; + + std::unique_ptr optimize_info_; }; } // namespace passes } // namespace hybridse diff --git a/hybridse/src/passes/physical/transform_up_physical_pass.h b/hybridse/src/passes/physical/transform_up_physical_pass.h index fed721d4c66..a9a80bd90b4 100644 --- a/hybridse/src/passes/physical/transform_up_physical_pass.h +++ b/hybridse/src/passes/physical/transform_up_physical_pass.h @@ -17,7 +17,6 @@ #define HYBRIDSE_SRC_PASSES_PHYSICAL_TRANSFORM_UP_PHYSICAL_PASS_H_ #include -#include #include #include diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index 1584d76acbb..e44075addea 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -272,7 +272,8 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, n auto first_window_project = dynamic_cast(project_list_vec[1]); node::ProjectListNode *merged_project = node_manager_->MakeProjectListPlanNode(first_window_project->GetW(), true); - if (!is_cluster_optimized_ && !enable_batch_window_parallelization_ && + // TODO(ace): make the split in the optimizer, like clustrer optimzed + if (!is_cluster_optimized_ && !enable_batch_window_parallelization_ && node::ProjectListNode::MergeProjectList(simple_project, first_window_project, merged_project)) { project_list_vec[0] = nullptr; project_list_vec[1] = merged_project; diff --git a/hybridse/src/testing/engine_test_base.h b/hybridse/src/testing/engine_test_base.h index e759169f0fd..0805ff1b3c5 100644 --- a/hybridse/src/testing/engine_test_base.h +++ b/hybridse/src/testing/engine_test_base.h @@ -318,8 +318,7 @@ class BatchRequestEngineTestRunner : public EngineTestRunner { bool has_batch_request = !sql_case_.batch_request().columns_.empty(); if (!has_batch_request) { - LOG(WARNING) << "No batch request field in case, " - << "try use last row from primary input"; + LOG(WARNING) << "No batch request field in case, try use last row from primary input"; } std::vector original_request_data; diff --git a/hybridse/src/vm/catalog_wrapper.cc b/hybridse/src/vm/catalog_wrapper.cc index d134a92e51b..2c511176369 100644 --- a/hybridse/src/vm/catalog_wrapper.cc +++ b/hybridse/src/vm/catalog_wrapper.cc @@ -164,5 +164,6 @@ RowIterator* LazyLastJoinWindowIterator::GetRawValue() { return new LazyLastJoinIterator(std::move(iter), right_, parameter_, join_); } + } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/catalog_wrapper.h b/hybridse/src/vm/catalog_wrapper.h index 11441b4bf54..ac1cff8b041 100644 --- a/hybridse/src/vm/catalog_wrapper.h +++ b/hybridse/src/vm/catalog_wrapper.h @@ -17,10 +17,12 @@ #ifndef HYBRIDSE_SRC_VM_CATALOG_WRAPPER_H_ #define HYBRIDSE_SRC_VM_CATALOG_WRAPPER_H_ +#include #include #include #include +#include "codec/row_iterator.h" #include "vm/catalog.h" #include "vm/generator.h" @@ -705,6 +707,196 @@ class LazyLastJoinWindowIterator final : public codec::WindowIterator { std::shared_ptr join_; }; +class LazyRequestUnionDataSetPartitionHandler final : public PartitionHandler { + public: + LazyRequestUnionDataSetPartitionHandler(std::shared_ptr left, + std::function(const Row&)> func) + : left_(left), func_(func) {} + ~LazyRequestUnionDataSetPartitionHandler() override {} + + std::unique_ptr GetWindowIterator(const std::string& idx_name) override { return nullptr; } + std::unique_ptr GetWindowIterator() override { return nullptr; } + + std::shared_ptr GetSegment(const std::string& key) override { return nullptr; } + + const HandlerType GetHandlerType() override { + return kDataSetHandler; + } + + const std::string GetHandlerTypeName() override { + return "LazyRequestUnionDataSePartitiontHandler"; + } + + std::unique_ptr GetIterator() override { + return std::unique_ptr(GetRawIterator()); + } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + const Types& GetTypes() override { return left_->GetTypes(); } + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + base::ConstIterator* GetRawIterator() override { + // auto left_it = left_->GetIterator(); + // return new LazyRequestUnionIterator(std::move(left_it), parameter_, func_); + return nullptr; + } + + auto Left() const { return left_; } + auto Func() const { return func_; } + + private: + std::shared_ptr left_; + std::function(const Row&)> func_; +}; + +class LazyAggIterator final : public RowIterator { + public: + LazyAggIterator(std::unique_ptr&& it, std::function(const Row&)> func, + std::shared_ptr agg_gen, const Row& param) + : it_(std::move(it)), func_(func), agg_gen_(agg_gen), parameter_(param) { + SeekToFirst(); + } + + ~LazyAggIterator() override {} + + bool Valid() const override { + return it_->Valid(); + } + void Next() override { it_->Next(); } + const uint64_t& GetKey() const override { return it_->GetKey(); } + const Row& GetValue() override { + if (Valid()) { + auto request = it_->GetValue(); + auto window = func_(request); + if (window) { + buf_ = agg_gen_->Gen(parameter_, window); + return buf_; + } + } + + buf_ = Row(); + return buf_; + } + + bool IsSeekable() const override { return true; }; + + void Seek(const uint64_t& key) override { it_->Seek(key); } + + void SeekToFirst() override { + it_->SeekToFirst(); + } + + private: + std::unique_ptr it_; + std::function(const Row&)> func_; + std::shared_ptr agg_gen_; + const Row& parameter_; + + Row buf_; +}; + +class LazyAggTableHandler final : public TableHandler { + public: + LazyAggTableHandler(std::shared_ptr left, + std::function(const Row&)> func, + std::shared_ptr agg_gen, const Row& param) + : left_(left), func_(func), agg_gen_(agg_gen), parameter_(param) { + DLOG(INFO) << "iterator count = " << left_->GetCount(); + } + ~LazyAggTableHandler() override {} + + std::unique_ptr GetIterator() override { + auto* it = GetRawIterator(); + if (it == nullptr) { + return {}; + } + return std::unique_ptr(it); + } + + // unimplemented + const Types& GetTypes() override { return left_->GetTypes(); } + + const IndexHint& GetIndex() override { return left_->GetIndex(); } + + // unimplemented + std::unique_ptr GetWindowIterator(const std::string& idx_name) override { return nullptr; } + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return left_->GetName(); } + const std::string& GetDatabase() override { return left_->GetDatabase(); } + + base::ConstIterator* GetRawIterator() override { + auto it = left_->GetIterator(); + if (!it) { + return nullptr; + } + return new LazyAggIterator(std::move(it), func_, agg_gen_, parameter_); + } + + std::shared_ptr GetPartition(const std::string& index_name) override { + return nullptr; + } + + + private: + std::shared_ptr left_; + std::function(const Row&)> func_; + std::shared_ptr agg_gen_; + const Row& parameter_; +}; + +class LazyAggPartitionHandler final : public PartitionHandler { + public: + LazyAggPartitionHandler(std::shared_ptr input, + std::shared_ptr agg_gen, const Row& param) + : input_(input), agg_gen_(agg_gen), parameter_(param) {} + ~LazyAggPartitionHandler() override {} + + + std::shared_ptr GetSegment(const std::string &key) override { + auto seg = input_->Left()->GetSegment(key); + return std::shared_ptr(new LazyAggTableHandler(seg, input_->Func(), agg_gen_, parameter_)); + } + + const std::string GetHandlerTypeName() override { + return "LazyLastJoinPartitionHandler"; + } + + std::unique_ptr GetIterator() override { + auto it = input_->Left()->GetIterator(); + return std::unique_ptr(new LazyAggIterator(std::move(it), input_->Func(), agg_gen_, parameter_)); + } + + std::unique_ptr GetWindowIterator() override { return nullptr; } + + const IndexHint& GetIndex() override { return input_->GetIndex(); } + + // unimplemented + const Types& GetTypes() override { return input_->GetTypes(); } + + // unimplemented + const Schema* GetSchema() override { return nullptr; } + const std::string& GetName() override { return input_->GetName(); } + const std::string& GetDatabase() override { return input_->GetDatabase(); } + + // unimplemented + base::ConstIterator* GetRawIterator() override { return nullptr; } + + private: + std::shared_ptr input_; + std::shared_ptr agg_gen_; + const Row& parameter_; +}; + + + } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index 4fdc368887e..fc88a6ccda1 100644 --- a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -153,7 +153,7 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess DLOG(INFO) << "Compile Engine ..."; status = base::Status::OK(); std::shared_ptr info = std::make_shared(); - auto& sql_context = std::dynamic_pointer_cast(info)->get_sql_context(); + auto& sql_context = info->get_sql_context(); sql_context.sql = sql; sql_context.db = db; sql_context.engine_mode = session.engine_mode(); diff --git a/hybridse/src/vm/generator.cc b/hybridse/src/vm/generator.cc index 28542a7befb..aaa16ff2783 100644 --- a/hybridse/src/vm/generator.cc +++ b/hybridse/src/vm/generator.cc @@ -16,6 +16,7 @@ #include "vm/generator.h" +#include "vm/catalog_wrapper.h" #include "vm/runner.h" namespace hybridse { diff --git a/hybridse/src/vm/generator.h b/hybridse/src/vm/generator.h index 4dded0d6ebf..7bb49337794 100644 --- a/hybridse/src/vm/generator.h +++ b/hybridse/src/vm/generator.h @@ -79,11 +79,17 @@ class ConstProjectGenerator : public FnGenerator { const Row Gen(const Row& parameter); RowProjectFun fun_; }; -class AggGenerator : public FnGenerator { +class AggGenerator : public FnGenerator, public std::enable_shared_from_this { public: - explicit AggGenerator(const FnInfo& info) : FnGenerator(info) {} + [[nodiscard]] static std::shared_ptr Create(const FnInfo& info) { + return std::shared_ptr(new AggGenerator(info)); + } + virtual ~AggGenerator() {} const Row Gen(const codec::Row& parameter_row, std::shared_ptr table); + + private: + explicit AggGenerator(const FnInfo& info) : FnGenerator(info) {} }; class WindowProjectGenerator : public FnGenerator { public: @@ -112,8 +118,18 @@ class ConditionGenerator : public FnGenerator { const bool Gen(const Row& row, const Row& parameter) const; const bool Gen(std::shared_ptr table, const codec::Row& parameter_row); }; -class RangeGenerator { +class RangeGenerator : public std::enable_shared_from_this { public: + [[nodiscard]] static std::shared_ptr Create(const Range& range) { + return std::shared_ptr(new RangeGenerator(range)); + } + virtual ~RangeGenerator() {} + + const bool Valid() const { return ts_gen_.Valid(); } + OrderGenerator ts_gen_; + WindowRange window_range_; + + private: explicit RangeGenerator(const Range& range) : ts_gen_(range.fn_info()), window_range_() { if (range.frame_ != nullptr) { switch (range.frame()->frame_type()) { @@ -142,11 +158,8 @@ class RangeGenerator { } } } - virtual ~RangeGenerator() {} - const bool Valid() const { return ts_gen_.Valid(); } - OrderGenerator ts_gen_; - WindowRange window_range_; }; + class FilterKeyGenerator { public: explicit FilterKeyGenerator(const Key& filter_key) : filter_key_(filter_key.fn_info()) {} @@ -253,13 +266,15 @@ class FilterGenerator : public PredicateFun { class WindowGenerator { public: explicit WindowGenerator(const WindowOp& window) - : window_op_(window), partition_gen_(window.partition_), sort_gen_(window.sort_), range_gen_(window.range_) {} + : window_op_(window), partition_gen_(window.partition_), sort_gen_(window.sort_) { + range_gen_ = RangeGenerator::Create(window.range_); + } virtual ~WindowGenerator() {} - const int64_t OrderKey(const Row& row) { return range_gen_.ts_gen_.Gen(row); } + const int64_t OrderKey(const Row& row) { return range_gen_->ts_gen_.Gen(row); } const WindowOp window_op_; PartitionGenerator partition_gen_; SortGenerator sort_gen_; - RangeGenerator range_gen_; + std::shared_ptr range_gen_; }; class RequestWindowGenertor { diff --git a/hybridse/src/vm/mem_catalog.cc b/hybridse/src/vm/mem_catalog.cc index dca41c9355b..29a2e2791e4 100644 --- a/hybridse/src/vm/mem_catalog.cc +++ b/hybridse/src/vm/mem_catalog.cc @@ -18,8 +18,6 @@ #include -#include "absl/strings/substitute.h" - namespace hybridse { namespace vm { MemTimeTableIterator::MemTimeTableIterator(const MemTimeTable* table, diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index 586f75c6187..cea1d45ee8c 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -52,6 +52,15 @@ static bool IsPartitionProvider(vm::PhysicalOpNode* n) { } } +static vm::PhysicalDataProviderNode* request_node(vm::PhysicalOpNode* n) { + switch (n->GetOpType()) { + case kPhysicalOpDataProvider: + return dynamic_cast(n); + default: + return request_node(n->GetProducer(0)); + } +} + // Build Runner for each physical node // return cluster task of given runner // @@ -328,6 +337,16 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { } } } + if (support_cluster_optimized_) { + if (IsPartitionProvider(node->GetProducer(0))) { + // route by index of the left source, and it should uncompleted + auto& route_info = left_task.GetRouteInfo(); + runner->AddProducer(left_task.GetRoot()); + runner->AddProducer(right_task.GetRoot()); + return RegisterTask(node, + UnCompletedClusterTask(runner, route_info.table_handler_, route_info.index_)); + } + } return RegisterTask( node, BinaryInherit(left_task, right_task, runner, index_key, kRightBias)); @@ -372,6 +391,7 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ && !op->join_.index_key_.ValidKey()) { + // join (.., filter) auto& route_info = right_task.GetRouteInfo(); runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); @@ -387,10 +407,20 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { if (support_cluster_optimized_) { if (right_task.IsCompletedClusterTask() && right_task.GetRouteInfo().lazy_route_ && !op->join_.index_key_.ValidKey()) { + // concat join (.., filter) runner->AddProducer(left_task.GetRoot()); runner->AddProducer(right_task.GetRoot()); return RegisterTask(node, ClusterTask(runner, {}, RouteInfo{})); } + + // concat join (any(tx), any(tx)), tx is not request table + auto left = request_node(node->GetProducer(0)); + // auto right = request_node(node->GetProducer(1)); + if (left->provider_type_ == kProviderTypePartition) { + runner->AddProducer(left_task.GetRoot()); + runner->AddProducer(right_task.GetRoot()); + return RegisterTask(node, ClusterTask(runner, {}, left_task.GetRouteInfo())); + } } return RegisterTask(node, BinaryInherit(left_task, right_task, runner, Key(), kNoBias)); } @@ -1526,7 +1556,7 @@ void WindowAggRunner::RunWindowAggOnKey( int32_t min_union_pos = IteratorStatus::FindLastIteratorWithMininumKey(union_segment_status); int32_t cnt = output_table->GetCount(); - HistoryWindow window(instance_window_gen_.range_gen_.window_range_); + HistoryWindow window(instance_window_gen_.range_gen_->window_range_); window.set_instance_not_in_window(instance_not_in_window_); window.set_exclude_current_time(exclude_current_time_); @@ -1602,6 +1632,8 @@ std::shared_ptr RequestLastJoinRunner::Run( return join_gen_->LazyLastJoin(left_part, std::dynamic_pointer_cast(right), ctx.GetParameterRow()); } + + LOG(WARNING) << "skip due to performance: left source of request join is table handler (unoptimized)"; return std::shared_ptr(); } @@ -1745,6 +1777,10 @@ void Runner::PrintData(std::ostringstream& oss, } else { t.add("EmptyDataHandler"); } + if (data->GetHandlerType() == kDataSetHandler) { + oss << t; + return; + } for (size_t i = 0; i < schema_list->GetSchemaSourceSize(); ++i) { auto source = schema_list->GetSchemaSource(i); for (int j = 0; j < source->GetSchema()->size(); j++) { @@ -2101,20 +2137,23 @@ std::shared_ptr ConcatRunner::Run( auto right = inputs[1]; auto left = inputs[0]; size_t left_slices = producers_[0]->output_schemas()->GetSchemaSourceSize(); - size_t right_slices = - producers_[1]->output_schemas()->GetSchemaSourceSize(); + size_t right_slices = producers_[1]->output_schemas()->GetSchemaSourceSize(); if (!left) { return std::shared_ptr(); } switch (left->GetHandlerType()) { case kRowHandler: - return std::shared_ptr(new RowCombineWrapper( - std::dynamic_pointer_cast(left), left_slices, - std::dynamic_pointer_cast(right), right_slices)); + return std::shared_ptr( + new RowCombineWrapper(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); case kTableHandler: - return std::shared_ptr(new ConcatTableHandler( - std::dynamic_pointer_cast(left), left_slices, - std::dynamic_pointer_cast(right), right_slices)); + return std::shared_ptr( + new ConcatTableHandler(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); + case kPartitionHandler: + return std::shared_ptr( + new ConcatPartitionHandler(std::dynamic_pointer_cast(left), left_slices, + std::dynamic_pointer_cast(right), right_slices)); default: { LOG(WARNING) << "fail to run conncat runner: handler type unsupported"; @@ -2149,6 +2188,8 @@ std::shared_ptr LimitRunner::Run( LOG(WARNING) << "fail limit when input type isn't row or table"; return fail_ptr; } + default: + break; } return fail_ptr; } @@ -2205,7 +2246,7 @@ std::shared_ptr GroupAggRunner::Run( return std::shared_ptr(); } if (!having_condition_.Valid() || having_condition_.Gen(table, parameter)) { - output_table->AddRow(agg_gen_.Gen(parameter, table)); + output_table->AddRow(agg_gen_->Gen(parameter, table)); } return output_table; } else if (kPartitionHandler == input->GetHandlerType()) { @@ -2228,7 +2269,7 @@ std::shared_ptr GroupAggRunner::Run( if (limit_cnt_.has_value() && cnt++ >= limit_cnt_) { break; } - output_table->AddRow(agg_gen_.Gen(parameter, segment)); + output_table->AddRow(agg_gen_->Gen(parameter, segment)); } iter->Next(); } @@ -2305,10 +2346,10 @@ std::shared_ptr RequestAggUnionRunner::Run( } auto request = std::dynamic_pointer_cast(request_handler)->GetValue(); - int64_t ts_gen = range_gen_.Valid() ? range_gen_.ts_gen_.Gen(request) : -1; + int64_t ts_gen = range_gen_->Valid() ? range_gen_->ts_gen_.Gen(request) : -1; // Prepare Union Window - auto union_inputs = windows_union_gen_.RunInputs(ctx); + auto union_inputs = windows_union_gen_->RunInputs(ctx); if (ctx.is_debug()) { for (size_t i = 0; i < union_inputs.size(); i++) { std::ostringstream sss; @@ -2317,13 +2358,13 @@ std::shared_ptr RequestAggUnionRunner::Run( } } - auto& key_gen = windows_union_gen_.windows_gen_[0].index_seek_gen_.index_key_gen_; + auto& key_gen = windows_union_gen_->windows_gen_[0].index_seek_gen_.index_key_gen_; std::string key = key_gen.Gen(request, ctx.GetParameterRow()); // do not use codegen to gen the union outputs for aggr segment union_inputs.pop_back(); auto union_segments = - windows_union_gen_.GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); + windows_union_gen_->GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); // code_gen result of agg_segment is not correct. we correct the result here auto agg_segment = std::dynamic_pointer_cast(union_inputs[1])->GetSegment(key); if (agg_segment) { @@ -2342,12 +2383,12 @@ std::shared_ptr RequestAggUnionRunner::Run( std::shared_ptr window; if (agg_segment) { - window = RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, output_request_row_, + window = RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, output_request_row_, exclude_current_time_); } else { LOG(WARNING) << "Aggr segment is empty. Fall back to normal RequestUnionRunner"; - window = RequestUnionRunner::RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, true, - exclude_current_time_); + window = RequestUnionRunner::RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, + true, exclude_current_time_); } return window; @@ -2766,9 +2807,8 @@ std::shared_ptr ReduceRunner::Run( return row_handler; } -std::shared_ptr RequestUnionRunner::Run( - RunnerContext& ctx, - const std::vector>& inputs) { +std::shared_ptr RequestUnionRunner::Run(RunnerContext& ctx, + const std::vector>& inputs) { auto fail_ptr = std::shared_ptr(); if (inputs.size() < 2u) { LOG(WARNING) << "inputs size < 2"; @@ -2779,23 +2819,31 @@ std::shared_ptr RequestUnionRunner::Run( if (!left || !right) { return std::shared_ptr(); } - if (kRowHandler != left->GetHandlerType()) { - return std::shared_ptr(); + if (kRowHandler == left->GetHandlerType()) { + auto request = std::dynamic_pointer_cast(left)->GetValue(); + return RunOneRequest(&ctx, request); + } else if (kPartitionHandler == left->GetHandlerType()) { + auto left_part = std::dynamic_pointer_cast(left); + auto func = std::bind(&RequestUnionRunner::RunOneRequest, this, &ctx, std::placeholders::_1); + return std::shared_ptr( + new LazyRequestUnionDataSetPartitionHandler(left_part, func)); } - auto request = std::dynamic_pointer_cast(left)->GetValue(); - + LOG(WARNING) << "skip due to performance: left source of request union is table handler(unoptimized)"; + return std::shared_ptr(); +} +std::shared_ptr RequestUnionRunner::RunOneRequest(RunnerContext* ctx, const Row& request) { // ts_gen < 0 if there is no ORDER BY clause for WINDOW - int64_t ts_gen = range_gen_.Valid() ? range_gen_.ts_gen_.Gen(request) : -1; + int64_t ts_gen = range_gen_->Valid() ? range_gen_->ts_gen_.Gen(request) : -1; // Prepare Union Window - auto union_inputs = windows_union_gen_.RunInputs(ctx); - auto union_segments = - windows_union_gen_.GetRequestWindows(request, ctx.GetParameterRow(), union_inputs); + auto union_inputs = windows_union_gen_->RunInputs(*ctx); + auto union_segments = windows_union_gen_->GetRequestWindows(request, ctx->GetParameterRow(), union_inputs); // build window with start and end offset - return RequestUnionWindow(request, union_segments, ts_gen, range_gen_.window_range_, output_request_row_, + return RequestUnionWindow(request, union_segments, ts_gen, range_gen_->window_range_, output_request_row_, exclude_current_time_); } + std::shared_ptr RequestUnionRunner::RequestUnionWindow( const Row& request, std::vector> union_segments, int64_t ts_gen, const WindowRange& window_range, bool output_request_row, bool exclude_current_time) { @@ -2941,16 +2989,26 @@ std::shared_ptr AggRunner::Run( LOG(WARNING) << "input is empty"; return std::shared_ptr(); } - if (kTableHandler != input->GetHandlerType()) { - return std::shared_ptr(); - } - auto table = std::dynamic_pointer_cast(input); - auto parameter = ctx.GetParameterRow(); - if (having_condition_.Valid() && !having_condition_.Gen(table, parameter)) { - return std::shared_ptr(); + + if (kTableHandler == input->GetHandlerType()) { + auto table = std::dynamic_pointer_cast(input); + auto parameter = ctx.GetParameterRow(); + if (having_condition_.Valid() && !having_condition_.Gen(table, parameter)) { + return std::shared_ptr(); + } + auto row_handler = std::shared_ptr(new MemRowHandler(agg_gen_->Gen(parameter, table))); + return row_handler; + } else if (kDataSetHandler == input->GetHandlerType()) { + // lazify + auto data_set = std::dynamic_pointer_cast(input); + if (data_set == nullptr) { + return std::shared_ptr(); + } + + return std::shared_ptr(new LazyAggPartitionHandler(data_set, agg_gen_, ctx.GetParameterRow())); } - auto row_handler = std::shared_ptr(new MemRowHandler(agg_gen_.Gen(parameter, table))); - return row_handler; + + return std::shared_ptr(); } std::shared_ptr ProxyRequestRunner::BatchRequestRun( RunnerContext& ctx) { diff --git a/hybridse/src/vm/runner.h b/hybridse/src/vm/runner.h index 64e712bbde7..c4a543a15bf 100644 --- a/hybridse/src/vm/runner.h +++ b/hybridse/src/vm/runner.h @@ -32,7 +32,6 @@ #include "node/node_manager.h" #include "vm/aggregator.h" #include "vm/catalog.h" -#include "vm/catalog_wrapper.h" #include "vm/core_api.h" #include "vm/generator.h" #include "vm/mem_catalog.h" @@ -354,9 +353,12 @@ class WindowUnionGenerator : public InputsGenerator { std::vector windows_gen_; }; -class RequestWindowUnionGenerator : public InputsGenerator { +class RequestWindowUnionGenerator : public InputsGenerator, + public std::enable_shared_from_this { public: - RequestWindowUnionGenerator() : InputsGenerator() {} + [[nodiscard]] static std::shared_ptr Create() { + return std::shared_ptr(new RequestWindowUnionGenerator()); + } virtual ~RequestWindowUnionGenerator() {} void AddWindowUnion(const RequestWindowOp& window_op, Runner* runner) { @@ -373,6 +375,9 @@ class RequestWindowUnionGenerator : public InputsGenerator { return union_segments; } std::vector windows_gen_; + + private: + RequestWindowUnionGenerator() : InputsGenerator() {} }; class WindowJoinGenerator : public InputsGenerator { @@ -549,7 +554,7 @@ class GroupAggRunner : public Runner { : Runner(id, kRunnerGroupAgg, schema, limit_cnt), group_(group.fn_info()), having_condition_(having_condition.fn_info()), - agg_gen_(project) {} + agg_gen_(AggGenerator::Create(project)) {} ~GroupAggRunner() {} std::shared_ptr Run( RunnerContext& ctx, // NOLINT @@ -557,24 +562,22 @@ class GroupAggRunner : public Runner { override; // NOLINT KeyGenerator group_; ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class AggRunner : public Runner { public: - AggRunner(const int32_t id, const SchemasContext* schema, - const std::optional limit_cnt, - const ConditionFilter& having_condition, - const FnInfo& fn_info) + AggRunner(const int32_t id, const SchemasContext* schema, const std::optional limit_cnt, + const ConditionFilter& having_condition, const FnInfo& fn_info) : Runner(id, kRunnerAgg, schema, limit_cnt), having_condition_(having_condition.fn_info()), - agg_gen_(fn_info) {} + agg_gen_(AggGenerator::Create(fn_info)) {} ~AggRunner() {} std::shared_ptr Run( RunnerContext& ctx, // NOLINT const std::vector>& inputs) override; // NOLINT ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class ReduceRunner : public Runner { @@ -583,12 +586,12 @@ class ReduceRunner : public Runner { const ConditionFilter& having_condition, const FnInfo& fn_info) : Runner(id, kRunnerReduce, schema, limit_cnt), having_condition_(having_condition.fn_info()), - agg_gen_(fn_info) {} + agg_gen_(AggGenerator::Create(fn_info)) {} ~ReduceRunner() {} std::shared_ptr Run(RunnerContext& ctx, const std::vector>& inputs) override; ConditionGenerator having_condition_; - AggGenerator agg_gen_; + std::shared_ptr agg_gen_; }; class WindowAggRunner : public Runner { @@ -638,37 +641,39 @@ class WindowAggRunner : public Runner { class RequestUnionRunner : public Runner { public: - RequestUnionRunner(const int32_t id, const SchemasContext* schema, - const std::optional limit_cnt, const Range& range, - bool exclude_current_time, bool output_request_row) + RequestUnionRunner(const int32_t id, const SchemasContext* schema, const std::optional limit_cnt, + const Range& range, bool exclude_current_time, bool output_request_row) : Runner(id, kRunnerRequestUnion, schema, limit_cnt), - range_gen_(range), + range_gen_(RangeGenerator::Create(range)), exclude_current_time_(exclude_current_time), - output_request_row_(output_request_row) {} + output_request_row_(output_request_row) { + windows_union_gen_ = RequestWindowUnionGenerator::Create(); + } + + std::shared_ptr Run(RunnerContext& ctx, // NOLINT + const std::vector>& inputs) override; + + std::shared_ptr RunOneRequest(RunnerContext* ctx, const Row& request); - std::shared_ptr Run( - RunnerContext& ctx, // NOLINT - const std::vector>& inputs) - override; // NOLINT static std::shared_ptr RequestUnionWindow(const Row& request, std::vector> union_segments, int64_t request_ts, const WindowRange& window_range, bool output_request_row, bool exclude_current_time); void AddWindowUnion(const RequestWindowOp& window, Runner* runner) { - windows_union_gen_.AddWindowUnion(window, runner); + windows_union_gen_->AddWindowUnion(window, runner); } void Print(std::ostream& output, const std::string& tab, std::set* visited_ids) const override { Runner::Print(output, tab, visited_ids); output << "\n" << tab << "window unions:\n"; - for (auto& r : windows_union_gen_.input_runners_) { + for (auto& r : windows_union_gen_->input_runners_) { r->Print(output, tab + " ", visited_ids); } } - RequestWindowUnionGenerator windows_union_gen_; - RangeGenerator range_gen_; + std::shared_ptr windows_union_gen_; + std::shared_ptr range_gen_; bool exclude_current_time_; bool output_request_row_; }; @@ -679,7 +684,7 @@ class RequestAggUnionRunner : public Runner { const Range& range, bool exclude_current_time, bool output_request_row, const node::CallExprNode* project) : Runner(id, kRunnerRequestAggUnion, schema, limit_cnt), - range_gen_(range), + range_gen_(RangeGenerator::Create(range)), exclude_current_time_(exclude_current_time), output_request_row_(output_request_row), func_(project->GetFnDef()), @@ -704,7 +709,7 @@ class RequestAggUnionRunner : public Runner { const bool output_request_row, const bool exclude_current_time) const; void AddWindowUnion(const RequestWindowOp& window, Runner* runner) { - windows_union_gen_.AddWindowUnion(window, runner); + windows_union_gen_->AddWindowUnion(window, runner); } static std::string PrintEvalValue(const absl::StatusOr>& val); @@ -723,8 +728,8 @@ class RequestAggUnionRunner : public Runner { kMaxWhere, }; - RequestWindowUnionGenerator windows_union_gen_; - RangeGenerator range_gen_; + std::shared_ptr windows_union_gen_; + std::shared_ptr range_gen_; bool exclude_current_time_; // include request row from union. diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index d52667dbc6f..768f4f4a9b5 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -639,16 +639,13 @@ Status RequestModeTransformer::TransformWindowOp(PhysicalOpNode* depend, } case kPhysicalOpDataProvider: { auto data_op = dynamic_cast(depend); - CHECK_TRUE(data_op->provider_type_ == kProviderTypeRequest, - kPlanError, - "Do not support window on non-request input"); + CHECK_TRUE(data_op->provider_type_ != kProviderTypePartition, kPlanError, "data node already a partition"); auto name = data_op->table_handler_->GetName(); auto db_name = data_op->table_handler_->GetDatabase(); auto table = catalog_->GetTable(db_name, name); - CHECK_TRUE(table != nullptr, kPlanError, - "Fail to transform data provider op: table " + name + - "not exists"); + CHECK_TRUE(table != nullptr, kPlanError, "Fail to transform data provider op: table ", name, "not exists"); + PhysicalTableProviderNode* right = nullptr; CHECK_STATUS(CreateOp(&right, table)); @@ -657,6 +654,11 @@ Status RequestModeTransformer::TransformWindowOp(PhysicalOpNode* depend, data_op, right, table->GetDatabase(), table->GetName(), table->GetSchema(), nullptr, w_ptr, &request_union_op)); + if (data_op->provider_type_ == kProviderTypeTable) { + // REQUEST_UNION(t1, t1) do not has request table, dont output reqeust row + request_union_op->set_output_request_row(false); + } + if (!w_ptr->union_tables().empty()) { for (auto iter = w_ptr->union_tables().cbegin(); iter != w_ptr->union_tables().cend(); iter++) { @@ -1403,19 +1405,24 @@ Status BatchModeTransformer::CreatePhysicalProjectNode( } case kAggregation: { PhysicalAggregationNode* agg_op = nullptr; - CHECK_STATUS(CreateOp(&agg_op, depend, - column_projects, having_condition)); + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition)); *output = agg_op; break; } case kGroupAggregation: { - CHECK_TRUE(!node::ExprListNullOrEmpty(group_keys), kPlanError, - "Can not create group agg with non group keys"); + if (node::ExprListNullOrEmpty(group_keys)) { + PhysicalAggregationNode* agg_op = nullptr; + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition)); + *output = agg_op; + } else { + // CHECK_TRUE(!node::ExprListNullOrEmpty(group_keys), kPlanError, + // "Can not create group agg with non group keys"); - PhysicalGroupAggrerationNode* agg_op = nullptr; - CHECK_STATUS(CreateOp( - &agg_op, depend, column_projects, having_condition, group_keys)); - *output = agg_op; + PhysicalGroupAggrerationNode* agg_op = nullptr; + CHECK_STATUS(CreateOp(&agg_op, depend, column_projects, having_condition, + group_keys)); + *output = agg_op; + } break; } case kWindowAggregation: { @@ -1455,6 +1462,10 @@ base::Status BatchModeTransformer::ExtractGroupKeys(vm::PhysicalOpNode* depend, CHECK_STATUS(ExtractGroupKeys(depend->GetProducer(0), keys)) return base::Status::OK(); } + + if (depend->GetOpType() == kPhysicalOpRequestUnion) { + return base::Status::OK(); + } CHECK_TRUE(depend->GetOpType() == kPhysicalOpGroupBy, kPlanError, "Fail to extract group keys from op ", vm::PhysicalOpTypeName(depend->GetOpType())) *keys = dynamic_cast(depend)->group().keys_; @@ -1637,12 +1648,26 @@ Status BatchModeTransformer::ValidatePartitionDataProvider(PhysicalOpNode* in) { if (kPhysicalOpSimpleProject == in->GetOpType() || kPhysicalOpRename == in->GetOpType() || kPhysicalOpFilter == in->GetOpType()) { CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))) + } else if (kPhysicalOpProject == in->GetOpType()) { + auto* prj = dynamic_cast(in); + CHECK_TRUE(prj->project_type_ == kAggregation, kPlanError, + "can't optimize project node: ", in->GetTreeString()); + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); } else if (kPhysicalOpRequestJoin == in->GetOpType()) { CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(1))); + } else if (kPhysicalOpRequestUnion == in->GetOpType()) { + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(0))); + auto n = dynamic_cast(in); + if (!n->instance_not_in_window()) { + CHECK_STATUS(ValidatePartitionDataProvider(in->GetProducer(1))); + } + for (auto& window_union : n->window_unions().window_unions_) { + CHECK_STATUS(ValidateWindowIndexOptimization(window_union.second, window_union.first)); + } } else { CHECK_TRUE(kPhysicalOpDataProvider == in->GetOpType() && - kProviderTypePartition == dynamic_cast(in)->provider_type_, + kProviderTypeTable != dynamic_cast(in)->provider_type_, kPlanError, "Isn't partition provider:", in->GetTreeString()); } return Status::OK(); @@ -1667,7 +1692,7 @@ Status BatchModeTransformer::ValidateJoinIndexOptimization( return Status::OK(); } else { CHECK_STATUS(ValidatePartitionDataProvider(right), - "Join node hasn't been optimized"); + "Join node hasn't been optimized: right=", right->GetTreeString()); } return Status::OK(); }