From dda067a55bf7b7b34476f5305789ed6899908fe1 Mon Sep 17 00:00:00 2001 From: aceforeverd Date: Wed, 6 Dec 2023 09:55:52 +0000 Subject: [PATCH] feat!(deployment): auto create index for deploy --- CMakeLists.txt | 5 + cases/query/union_query.yml | 6 + hybridse/include/vm/engine.h | 12 +- hybridse/include/vm/engine_context.h | 28 +- hybridse/include/vm/physical_op.h | 16 + .../physical/group_and_sort_optimized.cc | 197 +++---- .../physical/group_and_sort_optimized.h | 15 +- hybridse/src/sdk/hybridse_interface_core.i | 2 + hybridse/src/vm/engine.cc | 1 + hybridse/src/vm/physical_plan_context.h | 22 +- hybridse/src/vm/sql_compiler.cc | 30 +- hybridse/src/vm/sql_compiler.h | 69 +-- hybridse/src/vm/transform.cc | 13 +- hybridse/src/vm/transform.h | 16 +- src/base/ddl_parser.cc | 499 +++--------------- src/base/ddl_parser.h | 3 - src/base/ddl_parser_test.cc | 164 ++++-- src/sdk/sql_cluster_router.cc | 4 +- 18 files changed, 461 insertions(+), 641 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d9d6188a088..dd52c512daa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,6 +86,11 @@ if (CMAKE_SYSTEM_NAME STREQUAL "Darwin") add_definitions(-Wno-\#pragma-messages) endif () add_definitions(-DHAVE_STDBOOL_H=1) + +# TODO(#1528): more strict flags +# Specifically enable checking null pointers for nonnull functions +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wnonnull -Werror=nonnull") + set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) diff --git a/cases/query/union_query.yml b/cases/query/union_query.yml index 1ad6ab41e27..f6c912a6561 100644 --- a/cases/query/union_query.yml +++ b/cases/query/union_query.yml @@ -1,5 +1,6 @@ cases: - id: 0 + deployable: true inputs: - name: t1 columns: ["id string","c2 int","c4 timestamp"] @@ -62,6 +63,7 @@ cases: cc, cc, 6 dd, dd, 7 - id: 1 + deployable: true desc: select project over union inputs: - name: t1 @@ -176,6 +178,7 @@ cases: cc, cc, 6 dd, dd, 7 - id: 3 + deployable: true desc: lastjoin(filter(union) inputs: - name: t1 @@ -239,6 +242,7 @@ cases: cc, cc, 3 dd, NULL, NULL - id: 4 + deployable: true desc: lastjoin(filter(union) inputs: - name: t1 @@ -300,6 +304,7 @@ cases: cc, NULL, NULL dd, NULL, NULL - id: 5 + deployable: true desc: union(filter(t2), filter(t3)) inputs: - name: t0 @@ -361,6 +366,7 @@ cases: 20, NULL, NULL, NULL, NULL 30, bb, 30, bb, 5 - id: 6 + deployable: true desc: nested union inputs: - name: t1 diff --git a/hybridse/include/vm/engine.h b/hybridse/include/vm/engine.h index e552e5889c6..6829de40040 100644 --- a/hybridse/include/vm/engine.h +++ b/hybridse/include/vm/engine.h @@ -17,21 +17,14 @@ #ifndef HYBRIDSE_INCLUDE_VM_ENGINE_H_ #define HYBRIDSE_INCLUDE_VM_ENGINE_H_ -#include #include -#include //NOLINT #include #include #include #include #include -#include "base/raw_buffer.h" #include "base/spin_lock.h" #include "codec/fe_row_codec.h" -#include "codec/list_iterator_codec.h" -#include "gflags/gflags.h" -#include "llvm-c/Target.h" -#include "proto/fe_common.pb.h" #include "vm/catalog.h" #include "vm/engine_context.h" #include "vm/router.h" @@ -183,12 +176,17 @@ class RunSession { options_ = options; } + void SetIndexHintsHandler(std::shared_ptr handler) { index_hints_ = handler; } + protected: std::shared_ptr compile_info_; hybridse::vm::EngineMode engine_mode_; bool is_debug_; std::string sp_name_; std::shared_ptr> options_ = nullptr; + + // [ALPHA] output possible diagnostic infos from compiler + std::shared_ptr index_hints_; friend Engine; }; diff --git 
a/hybridse/include/vm/engine_context.h b/hybridse/include/vm/engine_context.h index 9812087cb9e..ce82dca832b 100644 --- a/hybridse/include/vm/engine_context.h +++ b/hybridse/include/vm/engine_context.h @@ -38,6 +38,25 @@ struct BatchRequestInfo { std::set output_common_column_indices; }; +class IndexHintHandler { + public: + // Report an index hint. + // + // An index hint is determined by a few things: + // 1. source database & source table: where the possible index would be created + // 2. keys and ts: suggested index info + // 3. expr node: the physical node that contains the possibly optimizable expression, + // e.g. a RequestJoin node whose join condition 't1.key = t2.key' may be optimized + // + // TODO(ace): multiple index suggestions? choose one. + // Say a join condition 't1.key1 = t2.key1 and t1.key2 = t2.key2'; these indexes are all sufficient for t2: + // 1. key = key1 + // 2. key = key2 + // 3. key = key1 + key2 + virtual void Report(absl::string_view db, absl::string_view table, absl::Span keys, + absl::string_view ts, const PhysicalOpNode* expr_node); +}; + enum ComileType { kCompileSql, }; @@ -56,13 +75,10 @@ class CompileInfo { virtual const Schema& GetParameterSchema() const = 0; virtual const std::string& GetRequestName() const = 0; virtual const std::string& GetRequestDbName() const = 0; - virtual const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo() - const = 0; + virtual const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo() const = 0; virtual const hybridse::vm::PhysicalOpNode* GetPhysicalPlan() const = 0; - virtual void DumpPhysicalPlan(std::ostream& output, - const std::string& tab) = 0; - virtual void DumpClusterJob(std::ostream& output, - const std::string& tab) = 0; + virtual void DumpPhysicalPlan(std::ostream& output, const std::string& tab) = 0; + virtual void DumpClusterJob(std::ostream& output, const std::string& tab) = 0; }; /// @typedef EngineLRUCache diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index 3f84f18f5d0..030d71dcb53 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -478,6 +478,22 @@ class PhysicalOpNode : public node::NodeBase { */ size_t GetNodeId() const { return node_id(); } + // Return this node cast as a NodeType. + // Use only when this node is known to be of that type; otherwise the behavior is undefined.
+ template + const NodeType *GetAsOrNull() const { + static_assert(std::is_base_of::value, + "NodeType must be a member of the PhysicalOpNode class hierarchy"); + return dynamic_cast(this); + } + + template + NodeType *GetAsOrNull() { + static_assert(std::is_base_of::value, + "NodeType must be a member of the PhysicalOpNode class hierarchy"); + return dynamic_cast(this); + } + protected: const PhysicalOpType type_; const bool is_block_; diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index 135cdbfd87b..7c0a0d90c33 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -122,6 +122,12 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in, PhysicalOpNode** output) { *output = in; TransformCxtGuard guard(&ctx_, KeysInfo(in->GetOpType(), nullptr, nullptr, nullptr, nullptr)); + + // save current optimizing node (where expressions come from) during optimization + auto* prev_node = cur_optimizing_; + cur_optimizing_ = in; + absl::Cleanup reset = [prev_node, this]() { cur_optimizing_ = prev_node; }; + switch (in->GetOpType()) { case PhysicalOpType::kPhysicalOpGroupBy: { PhysicalGroupNode* group_op = dynamic_cast(in); @@ -304,6 +310,17 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in, return false; } +/** + * + * @param root_schemas_ctx the SchemasContext where left_key, index_key, right_key and sort can be resolved + * @param in Possibly optimizable node + * @param left_key + * @param index_key + * @param right_key + * @param sort + * @param new_in optimized physical node for `in` + * @return true if optimized; new_in is set to the optimized node + */ bool GroupAndSortOptimized::KeysOptimized(const vm::SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, @@ -399,12 +416,42 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas PhysicalPartitionProviderNode* partition_op = nullptr; std::string index_name; + std::vector keys; + std::vector orders; + std::map result_bitmap_mapping; + + auto resolved_keys = ResolveExprToSrcColumnName(right_partition, scan_op); + for (size_t i = 0; i < resolved_keys.size(); ++i) { + if (resolved_keys[i].has_value()) { + result_bitmap_mapping[keys.size()] = i; + keys.push_back(resolved_keys[i].value()); + } + } + if (keys.empty()) { + return false; + } + if (nullptr != sort && sort->orders() != nullptr) { + auto resolved_orders = ResolveExprToSrcColumnName(sort->orders()->order_expressions(), scan_op); + for (auto& val : resolved_orders) { + if (val.has_value()) { + orders.push_back(val.value()); + } + } + + if (orders.size() != 1) { + return false; + } + } + if (DataProviderType::kProviderTypeTable == scan_op->provider_type_) { // Apply key columns and order column optimization with all indexes binding to scan_op->table_handler_ // Return false if fail to find an appropriate index - if (!TransformKeysAndOrderExpr( right_partition, - nullptr == sort ? nullptr : sort->orders_, scan_op, - &index_name, &bitmap)) { + if (!TransformKeysAndOrderExpr(keys, orders, scan_op, result_bitmap_mapping, &index_name, &bitmap)) { + // optimization failed: report an index hint + if (plan_ctx_->index_hints() != nullptr) { + plan_ctx_->index_hints()->Report(scan_op->GetDb(), scan_op->GetName(), keys, + orders.empty() ?
"" : orders.at(0), cur_optimizing_); + } return false; } Status status = plan_ctx_->CreateOp(&partition_op, scan_op, index_name); @@ -417,9 +464,7 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas index_name = partition_op->index_name_; // Apply key columns and order column optimization with given index name // Return false if given index do not match the keys and order column - if (!TransformKeysAndOrderExpr( right_partition, - nullptr == sort ? nullptr : sort->orders_, scan_op, - &index_name, &bitmap)) { + if (!TransformKeysAndOrderExpr(keys, orders, scan_op, result_bitmap_mapping, &index_name, &bitmap)) { return false; } } @@ -729,7 +774,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas } vm::PhysicalSetOperationNode* opt_set = nullptr; if (!plan_ctx_ - ->CreateOp(&opt_set, set_op->set_type_, opt_inputs, set_op->distinct_) + ->CreateOp(&opt_set, set_op->set_type_, opt_inputs, + set_op->distinct_) .isOK()) { return false; } @@ -806,95 +852,23 @@ bool GroupAndSortOptimized::KeyAndOrderOptimized( sort, new_in); } -bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const node::ExprListNode* groups, const node::OrderByNode* order, - vm::PhysicalDataProviderNode* data_node, std::string* index_name, - IndexBitMap* output_bitmap) { - if (nullptr == groups || nullptr == output_bitmap || nullptr == index_name) { - DLOG(WARNING) << "fail to transform keys expr : key expr or output " - "or index_name ptr is null"; - return false; - } - - if (nullptr == order) { - DLOG(INFO) << "keys optimized: " << node::ExprString(groups); - } else { - DLOG(INFO) << "keys and order optimized: keys=" << node::ExprString(groups) - << ", order=" << node::ExprString(order); - } - std::vector columns; - std::vector order_columns; - std::map result_bitmap_mapping; - - for (size_t i = 0; i < groups->children_.size(); ++i) { - auto group = groups->children_[i]; - switch (group->expr_type_) { - case node::kExprColumnRef: { - auto column = dynamic_cast(group); - auto oop = expr_cache_.find(column); - if (oop == expr_cache_.end()) { - return false; - } - - auto op = oop->second.find(data_node); - if (op == oop->second.end()) { - return false; - } - - if (data_node-> table_handler_->GetName() != op->second.tb_name || - data_node->table_handler_->GetDatabase() != op->second.db_name) { - return false; - } - - result_bitmap_mapping[columns.size()] = i; - columns.emplace_back(op->second.col_name); - break; - } - default: { - break; - } - } - } - - if (nullptr != order) { - for (size_t i = 0; i < order->order_expressions()->GetChildNum(); ++i) { - auto expr = order->GetOrderExpressionExpr(i); - if (nullptr != expr && expr->GetExprType() == node::kExprColumnRef) { - auto column = dynamic_cast(expr); - auto oop = expr_cache_.find(column); - if (oop == expr_cache_.end()) { - return false; - } - - auto op = oop->second.find(data_node); - if (op == oop->second.end()) { - return false; - } - - if (data_node->table_handler_->GetName() != op->second.tb_name || - data_node->table_handler_->GetDatabase() != op->second.db_name) { - return false; - } - - order_columns.emplace_back(op->second.col_name); - } - } - } - if (columns.empty()) { - return false; - } - +bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const std::vector& keys, + const std::vector& orders, + vm::PhysicalDataProviderNode* data_node, + const std::map& result_bitmap_mapping, + std::string* index_name, IndexBitMap* output_bitmap) { IndexBitMap match_bitmap; // internal structure for 
MatchBestIndex, initially turn every bit true - IndexBitMap state_bitmap(std::vector>(columns.size(), std::make_optional(0))); - if (!MatchBestIndex(columns, order_columns, data_node->table_handler_, &state_bitmap, index_name, &match_bitmap)) { + IndexBitMap state_bitmap(std::vector>(keys.size(), std::make_optional(0))); + if (!MatchBestIndex(keys, orders, data_node->table_handler_, &state_bitmap, index_name, &match_bitmap)) { return false; } - if (match_bitmap.bitmap.size() != columns.size()) { + if (match_bitmap.bitmap.size() != keys.size()) { return false; } - for (size_t i = 0; i < columns.size(); ++i) { + for (size_t i = 0; i < keys.size(); ++i) { if (match_bitmap.bitmap[i].has_value()) { - size_t origin_idx = result_bitmap_mapping[i]; + size_t origin_idx = result_bitmap_mapping.at(i); output_bitmap->bitmap.at(origin_idx) = match_bitmap.bitmap[i].value(); } } @@ -1023,5 +997,50 @@ bool GroupAndSortOptimized::MatchBestIndex(const std::vector& colum return succ; } +std::vector> GroupAndSortOptimized::ResolveExprToSrcColumnName( + const node::ExprListNode* exprs, vm::PhysicalDataProviderNode* data_node) { + std::vector> columns; + for (auto node : exprs->children_) { + columns.push_back(ResolveExprToSrcColumnName(node, data_node)); + } + + return columns; +} + +std::optional GroupAndSortOptimized::ResolveExprToSrcColumnName(const node::ExprNode* node, + vm::PhysicalDataProviderNode* data_node) { + switch (node->expr_type_) { + case node::kExprColumnRef: { + auto ref = dynamic_cast(node); + + auto oop = expr_cache_.find(ref); + if (oop == expr_cache_.end()) { + return std::nullopt; + } + + auto op = oop->second.find(data_node); + if (op == oop->second.end()) { + return std::nullopt; + } + + if (data_node->table_handler_->GetName() != op->second.tb_name || + data_node->table_handler_->GetDatabase() != op->second.db_name) { + return std::nullopt; + } + + return std::make_optional(op->second.col_name); + } + case node::kExprOrderExpression: { + auto order_expr = dynamic_cast(node); + return ResolveExprToSrcColumnName(order_expr->expr(), data_node); + } + default: { + break; + } + } + + return std::nullopt; +} + } // namespace passes } // namespace hybridse diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.h b/hybridse/src/passes/physical/group_and_sort_optimized.h index 69b0fcd2b36..9c6754f76b3 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.h +++ b/hybridse/src/passes/physical/group_and_sort_optimized.h @@ -17,6 +17,7 @@ #define HYBRIDSE_SRC_PASSES_PHYSICAL_GROUP_AND_SORT_OPTIMIZED_H_ #include +#include #include #include #include @@ -138,9 +139,10 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { PhysicalOpNode* in, Key* group, PhysicalOpNode** new_in); - bool TransformKeysAndOrderExpr(const node::ExprListNode* groups, const node::OrderByNode* order, - vm::PhysicalDataProviderNode* data_node, std::string* index, - IndexBitMap* best_bitmap); + bool TransformKeysAndOrderExpr(const std::vector& keys, const std::vector& orders, + vm::PhysicalDataProviderNode* data_node, + const std::map& result_bitmap_mapping, std::string* index_name, + IndexBitMap* output_bitmap); bool MatchBestIndex(const std::vector& columns, const std::vector& order_columns, std::shared_ptr table_handler, @@ -148,6 +150,10 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { std::string* index_name, IndexBitMap* best_bitmap); + std::vector> ResolveExprToSrcColumnName(const node::ExprListNode*, + vm::PhysicalDataProviderNode*); + std::optional 
ResolveExprToSrcColumnName(const node::ExprNode*, vm::PhysicalDataProviderNode*); + absl::Status BuildExprCache(const node::ExprNode* node, const SchemasContext* sc); private: @@ -160,6 +166,9 @@ class GroupAndSortOptimized : public TransformUpPysicalPass { expr_cache_; std::unique_ptr optimize_info_; + + // The PhysicalOpNode where current optimizing expression comes from + const PhysicalOpNode* cur_optimizing_ = nullptr; }; } // namespace passes } // namespace hybridse diff --git a/hybridse/src/sdk/hybridse_interface_core.i b/hybridse/src/sdk/hybridse_interface_core.i index 84ed798cf5a..660f9bac7a1 100644 --- a/hybridse/src/sdk/hybridse_interface_core.i +++ b/hybridse/src/sdk/hybridse_interface_core.i @@ -35,6 +35,7 @@ SWIG_JAVABODY_PROXY(public, public, SWIGTYPE) %shared_ptr(hybridse::vm::SimpleCatalog); %shared_ptr(hybridse::vm::CompileInfo); %shared_ptr(hybridse::vm::SqlCompileInfo); +%shared_ptr(hybridse::vm::IndexHintHandler); %typemap(jni) hybridse::vm::RawPtrHandle "jlong" %typemap(jtype) hybridse::vm::RawPtrHandle "long" @@ -138,6 +139,7 @@ using hybridse::vm::Key; using hybridse::vm::WindowOp; using hybridse::vm::EngineMode; using hybridse::vm::EngineOptions; +using hybridse::vm::IndexHintHandler; using hybridse::base::Iterator; using hybridse::base::ConstIterator; using hybridse::base::Trace; diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index 97eae8a9062..c0d9be8c333 100644 --- a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -160,6 +160,7 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess sql_context.enable_expr_optimize = options_.IsEnableExprOptimize(); sql_context.jit_options = options_.jit_options(); sql_context.options = session.GetOptions(); + sql_context.index_hints_ = session.index_hints_; if (session.engine_mode() == kBatchMode) { sql_context.parameter_types = dynamic_cast(&session)->GetParameterSchema(); } else if (session.engine_mode() == kBatchRequestMode) { diff --git a/hybridse/src/vm/physical_plan_context.h b/hybridse/src/vm/physical_plan_context.h index 122fac18c0a..38798e9a43c 100644 --- a/hybridse/src/vm/physical_plan_context.h +++ b/hybridse/src/vm/physical_plan_context.h @@ -19,13 +19,14 @@ #include #include #include +#include #include #include -#include #include "base/fe_status.h" #include "node/node_manager.h" #include "udf/udf_library.h" +#include "vm/engine_context.h" namespace hybridse { namespace vm { @@ -34,19 +35,18 @@ using hybridse::base::Status; class PhysicalPlanContext { public: - PhysicalPlanContext(node::NodeManager* nm, const udf::UdfLibrary* library, - const std::string& db, - const std::shared_ptr& catalog, - const codec::Schema* parameter_types, - bool enable_expr_opt, - const std::unordered_map* options = nullptr) + PhysicalPlanContext(node::NodeManager* nm, const udf::UdfLibrary* library, const std::string& db, + const std::shared_ptr& catalog, const codec::Schema* parameter_types, + bool enable_expr_opt, const std::unordered_map* options = nullptr, + std::shared_ptr index_hints = nullptr) : nm_(nm), library_(library), db_(db), catalog_(catalog), parameter_types_(parameter_types), enable_expr_opt_(enable_expr_opt), - options_(options) {} + options_(options), + index_hints_(index_hints) {} ~PhysicalPlanContext() {} /** @@ -133,6 +133,8 @@ class PhysicalPlanContext { // TODO(xxx): support udf type infer std::map legacy_udf_dict_; + std::shared_ptr index_hints() { return index_hints_; } + private: node::NodeManager* nm_; const udf::UdfLibrary* library_; @@ -160,6 +162,10 
@@ class PhysicalPlanContext { bool enable_expr_opt_ = false; const std::unordered_map* options_ = nullptr; + + // possible index suggestion to optimize the query performance + // not standardized, maybe Diagnostic info ? + std::shared_ptr index_hints_; }; } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/sql_compiler.cc b/hybridse/src/vm/sql_compiler.cc index 4c819238a6a..c686e1401b4 100644 --- a/hybridse/src/vm/sql_compiler.cc +++ b/hybridse/src/vm/sql_compiler.cc @@ -76,8 +76,7 @@ bool SqlCompiler::Compile(SqlContext& ctx, Status& status) { // NOLINT auto m = ::llvm::make_unique<::llvm::Module>("sql", *llvm_ctx); ctx.udf_library = udf::DefaultUdfLibrary::get(); - status = - BuildPhysicalPlan(&ctx, ctx.logical_plan, m.get(), &ctx.physical_plan); + status = BuildPhysicalPlan(&ctx, ctx.logical_plan, m.get(), &ctx.physical_plan); if (!status.isOK()) { return false; } @@ -160,7 +159,7 @@ Status SqlCompiler::BuildBatchModePhysicalPlan(SqlContext* ctx, const ::hybridse vm::BatchModeTransformer transformer(&ctx->nm, ctx->db, cl_, &ctx->parameter_types, llvm_module, library, ctx->is_cluster_optimized, ctx->enable_expr_optimize, ctx->enable_batch_window_parallelization, ctx->enable_window_column_pruning, - ctx->options.get()); + ctx->options.get(), ctx->index_hints_); transformer.AddDefaultPasses(); CHECK_STATUS(transformer.TransformPhysicalPlan(plan_list, output), "Fail to generate physical plan batch mode"); ctx->schema = *(*output)->GetOutputSchema(); @@ -173,7 +172,7 @@ Status SqlCompiler::BuildRequestModePhysicalPlan(SqlContext* ctx, const ::hybrid PhysicalOpNode** output) { vm::RequestModeTransformer transformer(&ctx->nm, ctx->db, cl_, &ctx->parameter_types, llvm_module, library, {}, ctx->is_cluster_optimized, false, ctx->enable_expr_optimize, - enable_request_performance_sensitive, ctx->options.get()); + enable_request_performance_sensitive, ctx->options.get(), ctx->index_hints_); if (ctx->options && ctx->options->count(LONG_WINDOWS)) { transformer.AddPass(passes::kPassSplitAggregationOptimized); transformer.AddPass(passes::kPassLongWindowOptimized); @@ -195,9 +194,9 @@ Status SqlCompiler::BuildBatchRequestModePhysicalPlan(SqlContext* ctx, const ::h ::llvm::Module* llvm_module, udf::UdfLibrary* library, PhysicalOpNode** output) { vm::RequestModeTransformer transformer(&ctx->nm, ctx->db, cl_, &ctx->parameter_types, llvm_module, library, - ctx->batch_request_info.common_column_indices, - ctx->is_cluster_optimized, ctx->is_batch_request_optimized, - ctx->enable_expr_optimize, true, ctx->options.get()); + ctx->batch_request_info.common_column_indices, ctx->is_cluster_optimized, + ctx->is_batch_request_optimized, ctx->enable_expr_optimize, true, + ctx->options.get(), ctx->index_hints_); if (ctx->options && ctx->options->count(LONG_WINDOWS)) { transformer.AddPass(passes::kPassSplitAggregationOptimized); transformer.AddPass(passes::kPassLongWindowOptimized); @@ -255,9 +254,8 @@ Status SqlCompiler::BuildBatchRequestModePhysicalPlan(SqlContext* ctx, const ::h return Status::OK(); } -Status SqlCompiler::BuildPhysicalPlan( - SqlContext* ctx, const ::hybridse::node::PlanNodeList& plan_list, - ::llvm::Module* llvm_module, PhysicalOpNode** output) { +Status SqlCompiler::BuildPhysicalPlan(SqlContext* ctx, const ::hybridse::node::PlanNodeList& plan_list, + ::llvm::Module* llvm_module, PhysicalOpNode** output) { Status status; CHECK_TRUE(ctx != nullptr, kPlanError, "Null sql context"); @@ -266,23 +264,19 @@ Status SqlCompiler::BuildPhysicalPlan( switch (ctx->engine_mode) { case 
kBatchMode: { - CHECK_STATUS(BuildBatchModePhysicalPlan(ctx, plan_list, llvm_module, - library, output)); + CHECK_STATUS(BuildBatchModePhysicalPlan(ctx, plan_list, llvm_module, library, output)); break; } case kMockRequestMode: { - CHECK_STATUS(BuildRequestModePhysicalPlan(ctx, plan_list, false, llvm_module, - library, output)); + CHECK_STATUS(BuildRequestModePhysicalPlan(ctx, plan_list, false, llvm_module, library, output)); break; } case kRequestMode: { - CHECK_STATUS(BuildRequestModePhysicalPlan(ctx, plan_list, true, llvm_module, - library, output)); + CHECK_STATUS(BuildRequestModePhysicalPlan(ctx, plan_list, true, llvm_module, library, output)); break; } case kBatchRequestMode: { - CHECK_STATUS(BuildBatchRequestModePhysicalPlan( - ctx, plan_list, llvm_module, library, output)); + CHECK_STATUS(BuildBatchRequestModePhysicalPlan(ctx, plan_list, llvm_module, library, output)); break; } default: diff --git a/hybridse/src/vm/sql_compiler.h b/hybridse/src/vm/sql_compiler.h index 5d4b78e8ea2..a70f5275276 100644 --- a/hybridse/src/vm/sql_compiler.h +++ b/hybridse/src/vm/sql_compiler.h @@ -28,6 +28,7 @@ #include "vm/engine_context.h" #include "vm/jit_wrapper.h" #include "vm/physical_op.h" +#include "vm/physical_plan_context.h" #include "vm/runner.h" namespace hybridse { @@ -75,6 +76,10 @@ struct SqlContext { std::shared_ptr> options; + // [ALPHA] SQL diagnostic infos + // not standardized, only index hints, no error, no warning, no other hint/info + std::shared_ptr index_hints_; + SqlContext() {} ~SqlContext() {} }; @@ -82,63 +87,39 @@ struct SqlContext { class SqlCompileInfo : public CompileInfo { public: SqlCompileInfo() : sql_ctx() {} - virtual ~SqlCompileInfo() {} + ~SqlCompileInfo() override {} + hybridse::vm::SqlContext& get_sql_context() { return this->sql_ctx; } - bool GetIRBuffer(const base::RawBuffer& buf) { + bool GetIRBuffer(const base::RawBuffer& buf) override { auto& str = this->sql_ctx.ir; return buf.CopyFrom(str.data(), str.size()); } - size_t GetIRSize() { return this->sql_ctx.ir.size(); } + size_t GetIRSize() override { return this->sql_ctx.ir.size(); } - const hybridse::vm::Schema& GetSchema() const { return sql_ctx.schema; } + const hybridse::vm::Schema& GetSchema() const override { return sql_ctx.schema; } - const hybridse::vm::ComileType GetCompileType() const { - return ComileType::kCompileSql; - } - const hybridse::vm::EngineMode GetEngineMode() const { - return sql_ctx.engine_mode; - } - const std::string& GetEncodedSchema() const { - return sql_ctx.encoded_schema; - } + const hybridse::vm::ComileType GetCompileType() const override { return ComileType::kCompileSql; } + const hybridse::vm::EngineMode GetEngineMode() const override { return sql_ctx.engine_mode; } + const std::string& GetEncodedSchema() const override { return sql_ctx.encoded_schema; } - const std::string& GetSql() const { return sql_ctx.sql; } + const std::string& GetSql() const override { return sql_ctx.sql; } - virtual const Schema& GetRequestSchema() const { - return sql_ctx.request_schema; - } - virtual const Schema& GetParameterSchema() const { - return sql_ctx.parameter_types; - } - virtual const std::string& GetRequestName() const { - return sql_ctx.request_name; - } - virtual const std::string& GetRequestDbName() const { - return sql_ctx.request_db_name; - } - virtual const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo() const { - return sql_ctx.batch_request_info; - } - virtual const hybridse::vm::PhysicalOpNode* GetPhysicalPlan() const { - return sql_ctx.physical_plan; - } - virtual 
hybridse::vm::Runner* GetMainTask() { - return sql_ctx.cluster_job.GetMainTask().GetRoot(); - } - virtual hybridse::vm::ClusterJob& GetClusterJob() { - return sql_ctx.cluster_job; - } - virtual void DumpPhysicalPlan(std::ostream& output, - const std::string& tab) { + const Schema& GetRequestSchema() const override { return sql_ctx.request_schema; } + const Schema& GetParameterSchema() const override { return sql_ctx.parameter_types; } + const std::string& GetRequestName() const override { return sql_ctx.request_name; } + const std::string& GetRequestDbName() const override { return sql_ctx.request_db_name; } + const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo() const override { return sql_ctx.batch_request_info; } + const hybridse::vm::PhysicalOpNode* GetPhysicalPlan() const override { return sql_ctx.physical_plan; } + hybridse::vm::Runner* GetMainTask() { return sql_ctx.cluster_job.GetMainTask().GetRoot(); } + hybridse::vm::ClusterJob& GetClusterJob() { return sql_ctx.cluster_job; } + void DumpPhysicalPlan(std::ostream& output, const std::string& tab) override { sql_ctx.physical_plan->Print(output, tab); } - virtual void DumpClusterJob(std::ostream& output, const std::string& tab) { + void DumpClusterJob(std::ostream& output, const std::string& tab) override { sql_ctx.cluster_job.Print(output, tab); } - static SqlCompileInfo* CastFrom(CompileInfo* node) { - return dynamic_cast(node); - } + static SqlCompileInfo* CastFrom(CompileInfo* node) { return dynamic_cast(node); } private: hybridse::vm::SqlContext sql_ctx; diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index 8312f95a243..82e96b3c094 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -81,11 +81,12 @@ BatchModeTransformer::BatchModeTransformer(node::NodeManager* node_manager, cons const udf::UdfLibrary* library, bool cluster_optimized_mode, bool enable_expr_opt, bool enable_window_parallelization, bool enable_window_column_pruning, - const std::unordered_map* options) + const std::unordered_map* options, + std::shared_ptr index_hints) : node_manager_(node_manager), db_(db), catalog_(catalog), - plan_ctx_(node_manager, library, db, catalog, parameter_types, enable_expr_opt, options), + plan_ctx_(node_manager, library, db, catalog, parameter_types, enable_expr_opt, options, index_hints), module_(module), id_(0), cluster_optimized_mode_(cluster_optimized_mode), @@ -2350,10 +2351,12 @@ RequestModeTransformer::RequestModeTransformer(node::NodeManager* node_manager, udf::UdfLibrary* library, const std::set& common_column_indices, const bool cluster_optimized, const bool enable_batch_request_opt, bool enable_expr_opt, bool performance_sensitive, - const std::unordered_map* options) + const std::unordered_map* options, + std::shared_ptr hints) : BatchModeTransformer(node_manager, db, catalog, parameter_types, module, library, cluster_optimized, - enable_expr_opt, true, false, options), - enable_batch_request_opt_(enable_batch_request_opt), performance_sensitive_(performance_sensitive) { + enable_expr_opt, true, false, options, hints), + enable_batch_request_opt_(enable_batch_request_opt), + performance_sensitive_(performance_sensitive) { batch_request_info_.common_column_indices = common_column_indices; } diff --git a/hybridse/src/vm/transform.h b/hybridse/src/vm/transform.h index acd86be587f..eb88f8d8759 100644 --- a/hybridse/src/vm/transform.h +++ b/hybridse/src/vm/transform.h @@ -106,11 +106,11 @@ class BatchModeTransformer { public: BatchModeTransformer(node::NodeManager* 
node_manager, const std::string& db, const std::shared_ptr& catalog, const codec::Schema* parameter_types, - ::llvm::Module* module, const udf::UdfLibrary* library, - bool cluster_optimized_mode = false, bool enable_expr_opt = false, - bool enable_window_parallelization = true, + ::llvm::Module* module, const udf::UdfLibrary* library, bool cluster_optimized_mode = false, + bool enable_expr_opt = false, bool enable_window_parallelization = true, bool enable_window_column_pruning = false, - const std::unordered_map* options = nullptr); + const std::unordered_map* options = nullptr, + std::shared_ptr = nullptr); virtual ~BatchModeTransformer(); bool AddDefaultPasses(); @@ -284,10 +284,10 @@ class RequestModeTransformer : public BatchModeTransformer { RequestModeTransformer(node::NodeManager* node_manager, const std::string& db, const std::shared_ptr& catalog, const codec::Schema* parameter_types, ::llvm::Module* module, udf::UdfLibrary* library, - const std::set& common_column_indices, - const bool cluster_optimized, const bool enable_batch_request_opt, bool enable_expr_opt, - bool performance_sensitive = true, - const std::unordered_map* options = nullptr); + const std::set& common_column_indices, const bool cluster_optimized, + const bool enable_batch_request_opt, bool enable_expr_opt, bool performance_sensitive = true, + const std::unordered_map* options = nullptr, + std::shared_ptr = nullptr); virtual ~RequestModeTransformer(); const Schema& request_schema() const { return request_schema_; } diff --git a/src/base/ddl_parser.cc b/src/base/ddl_parser.cc index c69b6b502ed..02ef5b315e4 100644 --- a/src/base/ddl_parser.cc +++ b/src/base/ddl_parser.cc @@ -23,16 +23,14 @@ #include #include -#include "absl/strings/match.h" #include "codec/schema_codec.h" -#include "common/timer.h" #include "google/protobuf/util/message_differencer.h" #include "node/node_manager.h" #include "plan/plan_api.h" #include "proto/common.pb.h" #include "proto/fe_type.pb.h" +#include "proto/type.pb.h" #include "sdk/base_impl.h" -#include "sdk/sql_insert_row.h" #include "vm/physical_op.h" namespace openmldb::base { @@ -47,27 +45,20 @@ using hybridse::vm::PhysicalOpType; using hybridse::vm::SchemasContext; using hybridse::vm::Sort; -// Ref hybridse/src/passes/physical/group_and_sort_optimized.cc:651 -// TODO(hw): hybridse should open this method -bool ResolveColumnToSourceColumnName(const hybridse::node::ColumnRefNode* col, const SchemasContext* schemas_ctx, - std::string* source_name); - -class IndexMapBuilder { +class IndexMapBuilder final : public ::hybridse::vm::IndexHintHandler { public: IndexMapBuilder() = default; - // Create the index with unset TTLSt, return false if the index(same table, same keys, same ts) existed - bool CreateIndex(const std::shared_ptr& table, const hybridse::node::ExprListNode* keys, - const hybridse::node::OrderByNode* ts, const SchemasContext* ctx); - bool UpdateIndex(const hybridse::vm::Range& range); - // After ToMap, inner data will be cleared + + void Report(absl::string_view db, absl::string_view table, absl::Span keys, absl::string_view ts, + const PhysicalOpNode* expr_node) override; + MultiDBIndexMap ToMap(); private: - static std::vector NormalizeColumns(const std::vector& nodes, - const SchemasContext* ctx); + void UpdateTTLByWindow(const hybridse::vm::WindowOp&, common::TTLSt*); // db, table, keys and ts -> db$table:key1,key2,...;ts - std::string Encode(const std::string& db, const std::string& table, const hybridse::node::ExprListNode* keys, - const hybridse::node::OrderByNode* 
ts, const SchemasContext* ctx); + std::string Encode(absl::string_view db, absl::string_view table, absl::Span keys, + absl::string_view ts); // return db, table, index_str(key1,key2,...;ts), column_key static std::tuple Decode(const std::string& index_str); @@ -102,80 +93,10 @@ class IndexMapBuilder { static constexpr char TS_MARK = ';'; uint64_t index_id_ = 0; - std::string latest_record_; // map std::map index_map_; }; -// no plan_ctx_, node_manager_: we assume that creating new op won't affect the upper level structure. -class GroupAndSortOptimizedParser { - public: - GroupAndSortOptimizedParser() = default; - - // LRD - void Parse(PhysicalOpNode* cur_op) { - if (!cur_op) { - LOG(DFATAL) << "parse nullptr"; - return; - } - - // just parse, won't modify, but need to cast ptr, so we use non-const producers. - auto& producers = cur_op->producers(); - for (auto& producer : producers) { - Parse(producer); - } - - DLOG(INFO) << "parse " << hybridse::vm::PhysicalOpTypeName(cur_op->GetOpType()); - TransformParse(cur_op); - } - - MultiDBIndexMap GetIndexes() { return index_map_builder_.ToMap(); } - - private: - // recursive parse, return true iff kProviderTypeTable optimized - // new_in is useless, but we keep it, GroupAndSortOptimizedParser will be more similar to GroupAndSortOptimized. - bool KeysOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* left_key, Key* index_key, - Key* right_key, Sort* sort, PhysicalOpNode** new_in); - - bool KeysAndOrderFilterOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* group, - Key* hash, Sort* sort, PhysicalOpNode** new_in) { - return KeysOptimizedParse(root_schemas_ctx, in, group, hash, nullptr, sort, new_in); - } - - bool JoinKeysOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Join* join, - PhysicalOpNode** new_in) { - if (nullptr == join) { - return false; - } - return FilterAndOrderOptimizedParse(root_schemas_ctx, in, join, &join->right_sort_, new_in); - } - bool FilterAndOrderOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Filter* filter, - Sort* sort, PhysicalOpNode** new_in) { - return KeysOptimizedParse(root_schemas_ctx, in, &filter->left_key_, &filter->index_key_, &filter->right_key_, - sort, new_in); - } - bool FilterOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Filter* filter, - PhysicalOpNode** new_in) { - return FilterAndOrderOptimizedParse(root_schemas_ctx, in, filter, nullptr, new_in); - } - bool KeyAndOrderOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* group, Sort* sort, - PhysicalOpNode** new_in) { - Key mock_key; - return KeysAndOrderFilterOptimizedParse(root_schemas_ctx, in, group, &mock_key, sort, new_in); - } - bool GroupOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, Key* group, - PhysicalOpNode** new_in) { - return KeyAndOrderOptimizedParse(root_schemas_ctx, in, group, nullptr, new_in); - } - - static std::vector InitJoinList(hybridse::vm::PhysicalWindowAggrerationNode* op); - - void TransformParse(PhysicalOpNode* in); - - private: - IndexMapBuilder index_map_builder_; -}; - // multi database MultiDBIndexMap DDLParser::ExtractIndexes(const std::string& sql, const std::string& used_db, const MultiDBTableDescMap& schemas) { @@ -186,13 +107,19 @@ MultiDBIndexMap DDLParser::ExtractIndexes(const std::string& sql, const std::str MultiDBIndexMap DDLParser::ExtractIndexes(const std::string& sql, const std::string& used_db, const std::shared_ptr& 
catalog) { hybridse::vm::MockRequestRunSession session; - if (!GetPlan(sql, used_db, catalog, &session)) { - LOG(ERROR) << "sql get plan failed"; - return {}; - } - auto compile_info = session.GetCompileInfo(); - auto plan = session.GetCompileInfo()->GetPhysicalPlan(); - return ParseIndexes(const_cast(plan)); + auto index_hints = std::make_shared(); + session.SetIndexHintsHandler(index_hints); + + ::hybridse::vm::Engine::InitializeGlobalLLVM(); + ::hybridse::vm::EngineOptions options; + options.SetKeepIr(true); + options.SetCompileOnly(true); + auto engine = std::make_shared(catalog, options); + + hybridse::base::Status status; + engine->Get(sql, used_db, session, status); + + return index_hints->ToMap(); } std::string DDLParser::PhysicalPlan(const std::string& sql, const ::hybridse::type::Database& db) { @@ -425,19 +352,6 @@ std::shared_ptr DDLParser::GetOutputSchema( return std::make_shared(*output_schema_ptr); } -// SQL transform added pass, ref AddDefaultPasses(): kPassSimpleProjectsOptimized, kPassFilterOptimized, -// kPassLeftJoinOptimized, kPassGroupAndSortOptimized, kPassLimitOptimized, kPassClusterOptimized. -// kPassGroupAndSortOptimized & kPassLeftJoinOptimized need index support, but only kPassGroupAndSortOptimized will -// find best index. So we just do GroupAndSortOptimizedParser -// TODO(hw): what about kPassClusterOptimized(cluster_optimized_mode_)? -MultiDBIndexMap DDLParser::ParseIndexes(hybridse::vm::PhysicalOpNode* node) { - // This physical plan has already been optimized, but no real optimization about index(cuz no index in fake - // catalog). So we can run GroupAndSortOptimizedParser on the plan(very like transformer's pass-ApplyPasses) - GroupAndSortOptimizedParser parser; - parser.Parse(node); - return parser.GetIndexes(); -} - bool DDLParser::GetPlan(const std::string& sql, const std::string& db, const std::shared_ptr& catalog, hybridse::vm::RunSession* session) { @@ -523,28 +437,64 @@ std::vector DDLParser::ValidateSQLInRequest(const std::string& sql, return ValidateSQLInRequest(sql, db, catalog); } -bool IndexMapBuilder::CreateIndex(const std::shared_ptr& table, - const hybridse::node::ExprListNode* keys, const hybridse::node::OrderByNode* ts, - const SchemasContext* ctx) { +void IndexMapBuilder::Report(absl::string_view db, absl::string_view table, absl::Span keys, + absl::string_view ts, const PhysicalOpNode* expr_node) { // we encode table, keys and ts to one string - auto index = Encode(table->GetDatabase(), table->GetName(), keys, ts, ctx); + auto index = Encode(db, table, keys, ts); if (index.empty()) { LOG(WARNING) << "index encode failed for table " << table; - return false; + return; } if (index_map_.find(index) != index_map_.end()) { // index id has unique idx, can't be dup. It's a weird case LOG(DFATAL) << "index " << index << " existed in cache"; - return false; + return; } - DLOG(INFO) << "create index with unset ttl: " << index; // default TTLSt is abs and ttl=0, rows will never expire. // default TTLSt debug string is {}, but if we get, they will be the default values. 
- index_map_[index] = new common::TTLSt; - latest_record_ = index; - return true; + auto* ttl = new common::TTLSt; + + if (expr_node != nullptr) { + switch (expr_node->GetOpType()) { + case hybridse::vm::kPhysicalOpRequestUnion: { + auto ru = expr_node->GetAsOrNull(); + UpdateTTLByWindow(ru->window(), ttl); + break; + } + case hybridse::vm::kPhysicalOpProject: { + auto ru = expr_node->GetAsOrNull(); + if (ru != nullptr && ru->project_type_ == hybridse::vm::ProjectType::kWindowAggregation) { + auto win_project = ru->GetAsOrNull(); + UpdateTTLByWindow(win_project->window_, ttl); + } + break; + } + case hybridse::vm::kPhysicalOpRequestJoin: { + auto join_node = expr_node->GetAsOrNull(); + if (join_node->join().join_type() == hybridse::node::JoinType::kJoinTypeLeft) { + ttl->set_ttl_type(type::TTLType::kLatestTime); + ttl->set_lat_ttl(0); + } + break; + } + case hybridse::vm::kPhysicalOpJoin: { + auto join_node = expr_node->GetAsOrNull(); + if (join_node->join().join_type() == hybridse::node::JoinType::kJoinTypeLeft) { + ttl->set_ttl_type(type::TTLType::kLatestTime); + ttl->set_lat_ttl(0); + } + break; + } + default: + break; + } + } + + index_map_[index] = ttl; + + LOG(INFO) << "suggest creating index for " << db << "." << table << ": " << index << ", " << ttl->DebugString(); } int64_t AbsTTLConvert(int64_t time_ms, bool zero_eq_unbounded) { @@ -573,51 +523,38 @@ int64_t LatTTLConvert(int64_t history_rows_start) { return history_rows_start == INT64_MIN ? 0 : LatTTLConvert(-1 * history_rows_start, false); } -bool IndexMapBuilder::UpdateIndex(const hybridse::vm::Range& range) { - if (latest_record_.empty() || index_map_.find(latest_record_) == index_map_.end()) { - LOG(DFATAL) << "want to update ttl status, but index is not created before"; - return false; - } - // TODO(hw): it's better to check the ts col name - // but range's column names may be renamed, needs schema context - - if (!range.Valid()) { - DLOG(INFO) << "range is invalid, can't update ttl, still use the default ttl"; - return true; - } +void IndexMapBuilder::UpdateTTLByWindow(const hybridse::vm::WindowOp& window, common::TTLSt* ttl_st_ptr) { + auto& range = window.range(); std::stringstream ss; range.frame()->Print(ss, ""); DLOG(INFO) << "frame info: " << ss.str(); - auto ttl_st_ptr = index_map_[latest_record_]; auto frame = range.frame(); auto type = frame->frame_type(); switch (type) { - case hybridse::node::kFrameRows: + case hybridse::node::kFrameRows: { ttl_st_ptr->set_ttl_type(type::TTLType::kLatestTime); ttl_st_ptr->set_lat_ttl(LatTTLConvert(frame->GetHistoryRowsStart())); break; + } case hybridse::node::kFrameRange: - case hybridse::node::kFrameRowsRange: + case hybridse::node::kFrameRowsRange: { ttl_st_ptr->set_ttl_type(type::TTLType::kAbsoluteTime); ttl_st_ptr->set_abs_ttl(AbsTTLConvert(frame->GetHistoryRangeStart())); break; - case hybridse::node::kFrameRowsMergeRowsRange: + } + case hybridse::node::kFrameRowsMergeRowsRange: { // use abs and ttl, only >abs_ttl and > lat_ttl will be expired ttl_st_ptr->set_ttl_type(type::TTLType::kAbsAndLat); ttl_st_ptr->set_abs_ttl(AbsTTLConvert(frame->GetHistoryRangeStart())); ttl_st_ptr->set_lat_ttl(LatTTLConvert(frame->GetHistoryRowsStart())); break; + } default: LOG(WARNING) << "invalid type"; - return false; + return; } - - DLOG(INFO) << latest_record_ << " update ttl " << index_map_[latest_record_]->DebugString(); - // to avoid double update - latest_record_.clear(); - return true; } MultiDBIndexMap IndexMapBuilder::ToMap() { @@ -663,11 +600,11 @@ MultiDBIndexMap 
IndexMapBuilder::ToMap() { return result; } -std::string IndexMapBuilder::Encode(const std::string& db, const std::string& table, - const hybridse::node::ExprListNode* keys, const hybridse::node::OrderByNode* ts, - const SchemasContext* ctx) { +std::string IndexMapBuilder::Encode(absl::string_view db, absl::string_view table, absl::Span keys, + absl::string_view ts) { // children are ColumnRefNode - auto cols = NormalizeColumns(keys->children_, ctx); + std::vector cols(keys.begin(), keys.end()); + std::sort(cols.begin(), cols.end()); if (cols.empty()) { return {}; } @@ -683,39 +620,11 @@ std::string IndexMapBuilder::Encode(const std::string& db, const std::string& ta ss << KEY_SEP << (*iter); } ss << TS_MARK; - - if (ts != nullptr && ts->order_expressions_ != nullptr) { - for (auto order : ts->order_expressions_->children_) { - auto cast = dynamic_cast(order); - if (cast && cast->expr() != nullptr) { - auto res = NormalizeColumns({const_cast(cast->expr())}, ctx); - if (res.size() != 1 || res[0].empty()) { - LOG(DFATAL) << "parse ts col from order node failed, skip it. " << cast->GetExprString(); - } else { - ss << res[0]; - } - } - } + if (!ts.empty()) { + ss << ts; } - return ss.str(); -} -std::vector IndexMapBuilder::NormalizeColumns(const std::vector& nodes, - const SchemasContext* ctx) { - std::vector result; - for (auto& node : nodes) { - if (nullptr != node && node->GetExprType() == hybridse::node::kExprColumnRef) { - auto cast = hybridse::node::ColumnRefNode::CastFrom(node); - std::string name; - if (!ResolveColumnToSourceColumnName(cast, ctx, &name)) { - return {}; - } - result.emplace_back(name); - } - } - // sort to avoid dup index - std::sort(result.begin(), result.end()); - return result; + return ss.str(); } // ColumnKey in result doesn't set ttl @@ -746,246 +655,6 @@ std::tuple IndexMapBui return std::make_tuple(db_name, table_name, index_str.substr(key_sep + 1), column_key); } -bool GroupAndSortOptimizedParser::KeysOptimizedParse(const SchemasContext* root_schemas_ctx, PhysicalOpNode* in, - Key* left_key, Key* index_key, Key* right_key, Sort* sort, - PhysicalOpNode**) { - if (nullptr == left_key || nullptr == index_key || !left_key->ValidKey()) { - return false; - } - - if (right_key != nullptr && !right_key->ValidKey()) { - return false; - } - - if (PhysicalOpType::kPhysicalOpDataProvider == in->GetOpType()) { - auto scan_op = dynamic_cast(in); - // Do not optimize with Request DataProvider (no index has been provided) - if (DataProviderType::kProviderTypeRequest == scan_op->provider_type_) { - return false; - } - - if (DataProviderType::kProviderTypeTable == scan_op->provider_type_ || - DataProviderType::kProviderTypePartition == scan_op->provider_type_) { - const hybridse::node::ExprListNode* right_partition = - right_key == nullptr ? left_key->keys() : right_key->keys(); - - size_t key_num = right_partition->GetChildNum(); - std::vector bitmap(key_num, false); - hybridse::node::ExprListNode order_values; - - if (DataProviderType::kProviderTypeTable == scan_op->provider_type_) { - // Apply key columns and order column optimization with all indexes binding to - // scan_op->table_handler_ Return false if fail to find an appropriate index - auto groups = right_partition; - auto order = (nullptr == sort ? nullptr : sort->orders_); - DLOG(INFO) << "keys and order optimized: keys=" << hybridse::node::ExprString(groups) - << ", order=" << (order == nullptr ? 
"null" : hybridse::node::ExprString(order)) - << " for table " << scan_op->table_handler_->GetName(); - - // columns in groups or order, may be renamed - index_map_builder_.CreateIndex(scan_op->table_handler_, groups, order, root_schemas_ctx); - // parser won't create partition_op - return true; - } else { - auto partition_op = dynamic_cast(scan_op); - DCHECK(partition_op != nullptr); - auto index_name = partition_op->index_name_; - // -- return false won't change index_name - LOG(WARNING) << "What if the index is not best index? Do we need to adjust index?"; - return false; - } - } - } else if (PhysicalOpType::kPhysicalOpSimpleProject == in->GetOpType()) { - auto simple_project = dynamic_cast(in); - PhysicalOpNode* new_depend; - return KeysOptimizedParse(root_schemas_ctx, simple_project->producers()[0], left_key, index_key, right_key, - sort, &new_depend); - - } else if (PhysicalOpType::kPhysicalOpRename == in->GetOpType()) { - PhysicalOpNode* new_depend; - return KeysOptimizedParse(root_schemas_ctx, in->producers()[0], left_key, index_key, right_key, sort, - &new_depend); - } - return false; -} - -std::vector GroupAndSortOptimizedParser::InitJoinList( - hybridse::vm::PhysicalWindowAggrerationNode* op) { - std::vector joined_op_list; - auto& window_joins = op->window_joins_.window_joins(); - PhysicalOpNode* cur = op->GetProducer(0); - for (auto& pair : window_joins) { - auto joined = new hybridse::vm::PhysicalJoinNode(cur, pair.first, pair.second); - joined_op_list.push_back(joined); - cur = joined; - } - return joined_op_list; -} - -void GroupAndSortOptimizedParser::TransformParse(PhysicalOpNode* in) { - switch (in->GetOpType()) { - case PhysicalOpType::kPhysicalOpGroupBy: { - auto group_op = dynamic_cast(in); - DCHECK(group_op); - PhysicalOpNode* new_producer; - if (GroupOptimizedParse(group_op->schemas_ctx(), group_op->GetProducer(0), &group_op->group_, - &new_producer)) { - // no orders->no sort->no ttl info - DLOG(INFO) << "ttl won't update by node:\n" << group_op->GetTreeString(); - } - break; - } - case PhysicalOpType::kPhysicalOpProject: { - auto project_op = dynamic_cast(in); - DCHECK(project_op); - if (hybridse::vm::ProjectType::kWindowAggregation == project_op->project_type_) { - auto window_agg_op = dynamic_cast(project_op); - CHECK_NOTNULL(window_agg_op); - PhysicalOpNode* input = window_agg_op->GetProducer(0); - - PhysicalOpNode* new_producer; - if (!window_agg_op->instance_not_in_window()) { - if (KeyAndOrderOptimizedParse(input->schemas_ctx(), input, &window_agg_op->window_.partition_, - &window_agg_op->window_.sort_, &new_producer)) { - index_map_builder_.UpdateIndex(window_agg_op->window_.range()); - } - } - // must prepare for window join column infer - auto& window_joins = window_agg_op->window_joins(); - auto& window_unions = window_agg_op->window_unions(); - auto joined_op_list = InitJoinList(window_agg_op); - if (!window_joins.Empty()) { - size_t join_idx = 0; - for (auto& window_join : window_joins.window_joins()) { - PhysicalOpNode* cur_joined = joined_op_list[join_idx]; - - PhysicalOpNode* new_join_right; - if (JoinKeysOptimizedParse(cur_joined->schemas_ctx(), window_join.first, &window_join.second, - &new_join_right)) { - // no range info - DLOG(INFO) << "ttl won't update by node:\n" << window_agg_op->GetTreeString(); - } - join_idx += 1; - } - } - // joined_op_list need to be deleted - for (auto& op : joined_op_list) { - delete op; - } - - if (!window_unions.Empty()) { - for (auto& window_union : window_unions.window_unions_) { - PhysicalOpNode* new_producer1; 
- if (KeyAndOrderOptimizedParse(window_union.first->schemas_ctx(), window_union.first, - &window_union.second.partition_, &window_union.second.sort_, - &new_producer1)) { - index_map_builder_.UpdateIndex(window_union.second.range()); - } - } - } - } - break; - } - case PhysicalOpType::kPhysicalOpRequestUnion: { - auto union_op = dynamic_cast(in); - DCHECK(union_op); - PhysicalOpNode* new_producer; - if (!union_op->instance_not_in_window()) { - if (KeysAndOrderFilterOptimizedParse(union_op->schemas_ctx(), union_op->GetProducer(1), - &union_op->window_.partition_, &union_op->window_.index_key_, - &union_op->window_.sort_, &new_producer)) { - index_map_builder_.UpdateIndex(union_op->window().range()); - } - } - - if (!union_op->window_unions().Empty()) { - for (auto& window_union : union_op->window_unions_.window_unions_) { - PhysicalOpNode* new_producer1; - auto& window = window_union.second; - if (KeysAndOrderFilterOptimizedParse(window_union.first->schemas_ctx(), window_union.first, - &window.partition_, &window.index_key_, &window.sort_, - &new_producer1)) { - index_map_builder_.UpdateIndex(window.range()); - } - } - } - break; - } - case PhysicalOpType::kPhysicalOpRequestJoin: { - auto* join_op = dynamic_cast(in); - DCHECK(join_op); - PhysicalOpNode* new_producer; - // Optimized Right Table Partition - if (JoinKeysOptimizedParse(join_op->schemas_ctx(), join_op->GetProducer(1), &join_op->join_, - &new_producer)) { - // no range info - DLOG(INFO) << "ttl won't update by node:\n" << join_op->GetTreeString(); - } - - break; - } - case PhysicalOpType::kPhysicalOpJoin: { - auto* join_op = dynamic_cast(in); - DCHECK(join_op); - PhysicalOpNode* new_producer; - // Optimized Right Table Partition - if (JoinKeysOptimizedParse(join_op->schemas_ctx(), join_op->GetProducer(1), &join_op->join_, - &new_producer)) { - // no range info - DLOG(INFO) << "ttl won't update by node:\n" << join_op->GetTreeString(); - } - - break; - } - case PhysicalOpType::kPhysicalOpFilter: { - auto* filter_op = dynamic_cast(in); - DCHECK(filter_op); - PhysicalOpNode* new_producer; - if (FilterOptimizedParse(filter_op->schemas_ctx(), filter_op->GetProducer(0), &filter_op->filter_, - &new_producer)) { - // no range info - DLOG(INFO) << "ttl won't update by node:\n" << filter_op->GetTreeString(); - } - } - default: { break; } - } -} - -bool ResolveColumnToSourceColumnName(const hybridse::node::ColumnRefNode* col, const SchemasContext* schemas_ctx, - std::string* source_name) { - // use detailed column resolve utility - size_t column_id; - int path_idx; - size_t child_column_id; - size_t source_column_id; - const PhysicalOpNode* source; - hybridse::base::Status status = - schemas_ctx->ResolveColumnID(col->GetDBName(), col->GetRelationName(), col->GetColumnName(), &column_id, - &path_idx, &child_column_id, &source_column_id, &source); - - // try loose the relation - if (!status.isOK() && !col->GetRelationName().empty()) { - status = schemas_ctx->ResolveColumnID("", "", col->GetColumnName(), &column_id, &path_idx, &child_column_id, - &source_column_id, &source); - } - - if (!status.isOK()) { - LOG(WARNING) << "Illegal index column: " << col->GetExprString(); - return false; - } - if (source == nullptr || source->GetOpType() != PhysicalOpType::kPhysicalOpDataProvider) { - LOG(WARNING) << "Index column is not from any source table: " << col->GetExprString(); - return false; - } - status = source->schemas_ctx()->ResolveColumnNameByID(source_column_id, source_name); - if (!status.isOK()) { - LOG(WARNING) << "Illegal source column id #" 
<< source_column_id << " for index column " - << col->GetExprString(); - return false; - } - return true; -} - // return merged result: return new if new is bigger, else return old google::protobuf::uint64 TTLValueMerge(google::protobuf::uint64 old_value, google::protobuf::uint64 new_value) { google::protobuf::uint64 result = old_value; diff --git a/src/base/ddl_parser.h b/src/base/ddl_parser.h index ea4b9953131..a3095347bd9 100644 --- a/src/base/ddl_parser.h +++ b/src/base/ddl_parser.h @@ -113,9 +113,6 @@ class DDLParser { static std::string PhysicalPlan(const std::string& sql, const ::hybridse::type::Database& db); private: - // DLR - static MultiDBIndexMap ParseIndexes(hybridse::vm::PhysicalOpNode* node); - // real get plan func, multi db should use catalog, don't forget init catalog with enable_index=true // if you want the status, use this func static bool GetPlan(const std::string& sql, const std::string& db, diff --git a/src/base/ddl_parser_test.cc b/src/base/ddl_parser_test.cc index 6b6aaed90a0..659dbfe47d1 100644 --- a/src/base/ddl_parser_test.cc +++ b/src/base/ddl_parser_test.cc @@ -15,7 +15,9 @@ */ #include "base/ddl_parser.h" +#include +#include "absl/cleanup/cleanup.h" #include "codec/schema_codec.h" #include "glog/logging.h" #include "google/protobuf/util/message_differencer.h" @@ -171,6 +173,9 @@ class DDLParserTest : public ::testing::Test { ASSERT_TRUE(AddTableToDB(&db, "t2", {"col0", "string", "col1", "int32", "col2", "int16", "col3", "float", "col4", "double", "col5", "int64", "col6", "string"})); + ASSERT_TRUE(AddTableToDB(&db, "t3", + {"col0", "string", "col1", "int32", "col2", "int16", "col3", "float", "col4", "double", + "col5", "int64", "col6", "string"})); } static bool AddColumnToTable(const std::string& col_name, const std::string& col_type, @@ -285,6 +290,12 @@ class DDLParserTest : public ::testing::Test { return index_map.begin()->second; } + bool EnsurePass(const std::string& sql) { + auto catalog = std::make_shared(true); + catalog->AddDatabase(db); + return DDLParser::ValidateSQLInRequest(sql, db.name(), catalog).empty(); + } + protected: std::string DB_NAME = "DDLParserTest"; ::hybridse::type::Database db; @@ -385,19 +396,18 @@ TEST_F(DDLParserTest, joinExtract) { LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db); } - // TODO: fix later - // { - // ClearAllIndex(); - // // left join - // auto sql = "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on t1.col1 = t2.col2;"; - // - // auto index_map = ExtractIndexesWithSingleDB(sql, db); - // // {t2[col_name: "col2" ttl { ttl_type: kLatestTime lat_ttl: 1 }, ]} - // CheckEqual(index_map, {{"t2", {"col2;;lat,0,1"}}}); - // // the added index only has key, no ts - // AddIndexToDB(index_map, &db); - // LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db); - // } + { + ClearAllIndex(); + // left join + auto sql = "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on t1.col1 = t2.col2;"; + + auto index_map = ExtractIndexesWithSingleDB(sql, db); + // {t2[col_name: "col2" ttl { ttl_type: kLatestTime lat_ttl: 1 }, ]} + CheckEqual(index_map, {{"t2", {"col2;;lat,0,0"}}}); + // the added index only has key, no ts + AddIndexToDB(index_map, &db); + LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db); + } } TEST_F(DDLParserTest, complexJoin) { @@ -419,26 +429,26 @@ TEST_F(DDLParserTest, complexJoin) { LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db); } - // { - // ClearAllIndex(); - // // no simple equal condition, won't 
-    //     auto sql =
-    //         "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on timestamp(int64(t1.col6)) = "
-    //         "timestamp(int64(t2.col6));";
-    //     auto index_map = ExtractIndexesWithSingleDB(sql, db);
-    //     ASSERT_TRUE(index_map.empty());
-    //     // must have a simple equal condition
-    //     sql =
-    //         "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on timestamp(int64(t1.col6)) = "
-    //         "timestamp(int64(t2.col6)) and t1.col1 = t2.col2;";
-    //     index_map = ExtractIndexesWithSingleDB(sql, db);
-    //     // index is on t2.col2 {t2[col_name: "col2" ttl { ttl_type: kLatestTime lat_ttl: 1 }, ]}
-    //     CheckEqual(index_map, {{"t2", {"col2;;lat,0,1"}}});
-    //
-    //     // the added index only has key, no ts
-    //     AddIndexToDB(index_map, &db);
-    //     LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db);
-    // }
+    {
+        ClearAllIndex();
+        // no simple equal condition, won't extract index
+        auto sql =
+            "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on timestamp(int64(t1.col6)) = "
+            "timestamp(int64(t2.col6));";
+        auto index_map = ExtractIndexesWithSingleDB(sql, db);
+        ASSERT_TRUE(index_map.empty());
+        // must have a simple equal condition
+        sql =
+            "SELECT t1.col1, t1.col2, t2.col1, t2.col2 FROM t1 left join t2 on timestamp(int64(t1.col6)) = "
+            "timestamp(int64(t2.col6)) and t1.col1 = t2.col2;";
+        index_map = ExtractIndexesWithSingleDB(sql, db);
+        // index is on t2.col2 {t2[col_name: "col2" ttl { ttl_type: kLatestTime lat_ttl: 1 }, ]}
+        CheckEqual(index_map, {{"t2", {"col2;;lat,0,0"}}});
+
+        // the added index only has key, no ts
+        AddIndexToDB(index_map, &db);
+        LOG(INFO) << "after add index:\n" << DDLParser::PhysicalPlan(sql, db);
+    }
 }
 
 TEST_F(DDLParserTest, multiJoin) {
@@ -630,6 +640,94 @@ TEST_F(DDLParserTest, renameColumns) {
     CheckEqual(index_map, {{"tt1", {"col1;col2;lat,0,1000"}}, {"tt2", {"c1;c2;lat,0,1000"}}});
 }
 
+TEST_F(DDLParserTest, lastJoinOverLastJoin) {
+    absl::Cleanup clean = [&]() { ClearAllIndex(); };
+    auto sql =
+        R"(select * from t1 last join (select * from t2 last join feedbackTable on t2.col1 = feedbackTable.actionValue) tx on t1.col0 = tx.col0)";
+    auto index_map = ExtractIndexesWithSingleDB(sql, db);
+    CheckEqual(index_map, {{"t2", {"col0;;lat,0,1"}}, {"feedbackTable", {"actionValue;;lat,0,1"}}});
+
+    AddIndexToDB(index_map, &db);
+    EXPECT_TRUE(EnsurePass(sql));
+}
+
+TEST_F(DDLParserTest, lastJoinWindow) {
+    absl::Cleanup clean = [&]() { ClearAllIndex(); };
+    auto sql =
+        R"(select * from t1 last join (
        select *, count(col0) over w as agg from t2
        window w as (partition by col1 order by col5 rows between 3 preceding and current row))
        tx on t1.col0 = tx.col0)";
+    auto index_map = ExtractIndexesWithSingleDB(sql, db);
+    std::map<std::string, std::vector<std::string>> expect = {{"t2", {"col0;;lat,0,1", "col1;col5;lat,0,3"}}};
+    CheckEqual(index_map, std::forward<std::map<std::string, std::vector<std::string>>>(expect));
+
+    AddIndexToDB(index_map, &db);
+    EXPECT_TRUE(EnsurePass(sql));
+}
+
+TEST_F(DDLParserTest, lastJoinUnion) {
+    absl::Cleanup clean = [&]() { ClearAllIndex(); };
+    auto sql =
+        R"(select * from t1 last join (
        select * from t2 union all select * from t3
        ) tx on t1.col0 = tx.col0)";
+    auto index_map = ExtractIndexesWithSingleDB(sql, db);
+    std::map<std::string, std::vector<std::string>> expect = {{"t2", {"col0;;lat,0,1"}}, {"t3", {"col0;;lat,0,1"}}};
+    CheckEqual(index_map, std::forward<std::map<std::string, std::vector<std::string>>>(expect));
+
+    AddIndexToDB(index_map, &db);
+    EXPECT_TRUE(EnsurePass(sql));
+}
+
+TEST_F(DDLParserTest, windowWithoutOrderBy) {
+    {
+        absl::Cleanup clean = [&]() { ClearAllIndex(); };
+        auto sql =
+            R"(select *, count(col0) over w as agg from t1
            window w as (partition by col1 rows between unbounded preceding and current row))";
+        auto index_map = ExtractIndexesWithSingleDB(sql, db);
+        std::map<std::string, std::vector<std::string>> expect = {{"t1", {"col1;;lat,0,0"}}};
+        CheckEqual(index_map, std::forward<std::map<std::string, std::vector<std::string>>>(expect));
+
+        AddIndexToDB(index_map, &db);
+        ASSERT_TRUE(EnsurePass(sql));
+    }
+    {
+        absl::Cleanup clean = [&]() { ClearAllIndex(); };
+        auto sql =
+            R"(select *, count(col0) over w as agg from t1
            window w as (partition by col1 rows between 4 preceding and current row))";
+        auto index_map = ExtractIndexesWithSingleDB(sql, db);
+        std::map<std::string, std::vector<std::string>> expect = {{"t1", {"col1;;lat,0,4"}}};
+        CheckEqual(index_map, std::forward<std::map<std::string, std::vector<std::string>>>(expect));
+
+        AddIndexToDB(index_map, &db);
+        ASSERT_TRUE(EnsurePass(sql));
+    }
+    {
+        absl::Cleanup clean = [&]() { ClearAllIndex(); };
+        auto sql =
+            R"(select *, count(col0) over w as agg from t1
            window w as (partition by col1 rows_range between unbounded preceding and current row))";
+        auto index_map = ExtractIndexesWithSingleDB(sql, db);
+        std::map<std::string, std::vector<std::string>> expect = {{"t1", {"col1;;abs,0,0"}}};
+        CheckEqual(index_map, std::forward<std::map<std::string, std::vector<std::string>>>(expect));
+
+        AddIndexToDB(index_map, &db);
+        ASSERT_TRUE(EnsurePass(sql));
+    }
+    {
+        absl::Cleanup clean = [&]() { ClearAllIndex(); };
+        // invalid SQL
+        auto sql =
+            R"(select *, count(col0) over w as agg from t1
            window w as (partition by col1 rows_range between 4 preceding and current row))";
+        auto index_map = ExtractIndexesWithSingleDB(sql, db);
+        ASSERT_TRUE(index_map.empty());
+    }
+}
+
 TEST_F(DDLParserTest, mergeNode) {
     AddTableToDB(&db, "t1", "id:int, pk1:string, col1:int32, std_ts:timestamp", ",", ":");
     auto sql =
diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc
index 8ef85e62a76..6dd0051082d 100644
--- a/src/sdk/sql_cluster_router.cc
+++ b/src/sdk/sql_cluster_router.cc
@@ -3701,7 +3701,7 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const std::string& db,
     auto iter = deploy_node->Options()->find(SKIP_INDEX_CHECK_OPTION);
     if (iter != deploy_node->Options()->end()) {
         std::string skip_index_value = iter->second->GetExprString();
-        if (absl::EqualsIgnoreCase(absl::string_view(skip_index_value), absl::string_view("true"))) {
+        if (absl::EqualsIgnoreCase(skip_index_value, "true")) {
             skip_index_check = true;
         }
     }
@@ -3746,7 +3746,7 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const std::string& db,
     auto iter = deploy_node->Options()->find(SYNC_OPTION);
     if (iter != deploy_node->Options()->end()) {
         std::string skip_index_value = iter->second->GetExprString();
-        if (absl::EqualsIgnoreCase(absl::string_view(skip_index_value), absl::string_view("false"))) {
+        if (absl::EqualsIgnoreCase(skip_index_value, "false")) {
             sync = false;
         }
     }
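
As a usage sketch of the two deploy options consumed above: the spelling of the OPTIONS keys is assumed to match the SKIP_INDEX_CHECK_OPTION and SYNC_OPTION constants read in HandleDeploy, and the deployment, table, and column names below are placeholders, not part of this patch.

    -- hypothetical deployment exercising both options
    DEPLOY demo OPTIONS (SKIP_INDEX_CHECK = 'true', SYNC = 'false')
        SELECT t1.col1, t2.col2 FROM t1 LAST JOIN t2 ON t1.col1 = t2.col2;

In the hunks above, skip_index_check and sync are set from these option strings with a case-insensitive compare, so 'TRUE'/'FALSE' work as well.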