Skip to content

Commit

Permalink
feat!(deployment): auto create index for deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Dec 8, 2023
1 parent e7538bd commit 0ce6e39
Show file tree
Hide file tree
Showing 16 changed files with 385 additions and 629 deletions.
6 changes: 6 additions & 0 deletions cases/query/union_query.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
cases:
- id: 0
deployable: true
inputs:
- name: t1
columns: ["id string","c2 int","c4 timestamp"]
Expand Down Expand Up @@ -62,6 +63,7 @@ cases:
cc, cc, 6
dd, dd, 7
- id: 1
deployable: true
desc: select project over union
inputs:
- name: t1
Expand Down Expand Up @@ -176,6 +178,7 @@ cases:
cc, cc, 6
dd, dd, 7
- id: 3
deployable: true
desc: lastjoin(filter<un-optimized>(union)
inputs:
- name: t1
Expand Down Expand Up @@ -239,6 +242,7 @@ cases:
cc, cc, 3
dd, NULL, NULL
- id: 4
deployable: true
desc: lastjoin(filter<optimized>(union)
inputs:
- name: t1
Expand Down Expand Up @@ -300,6 +304,7 @@ cases:
cc, NULL, NULL
dd, NULL, NULL
- id: 5
deployable: true
desc: union(filter<un-optimized>(t2), filter<un-optimized>(t3))
inputs:
- name: t0
Expand Down Expand Up @@ -361,6 +366,7 @@ cases:
20, NULL, NULL, NULL, NULL
30, bb, 30, bb, 5
- id: 6
deployable: true
desc: nested union
inputs:
- name: t1
Expand Down
12 changes: 5 additions & 7 deletions hybridse/include/vm/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,14 @@
#ifndef HYBRIDSE_INCLUDE_VM_ENGINE_H_
#define HYBRIDSE_INCLUDE_VM_ENGINE_H_

#include <map>
#include <memory>
#include <mutex> //NOLINT
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <unordered_map>
#include "base/raw_buffer.h"
#include "base/spin_lock.h"
#include "codec/fe_row_codec.h"
#include "codec/list_iterator_codec.h"
#include "gflags/gflags.h"
#include "llvm-c/Target.h"
#include "proto/fe_common.pb.h"
#include "vm/catalog.h"
#include "vm/engine_context.h"
#include "vm/router.h"
Expand Down Expand Up @@ -183,12 +176,17 @@ class RunSession {
options_ = options;
}

void SetIndexHintsHandler(std::shared_ptr<IndexHintHandler> handler) { index_hints_ = handler; }

protected:
std::shared_ptr<hybridse::vm::CompileInfo> compile_info_;
hybridse::vm::EngineMode engine_mode_;
bool is_debug_;
std::string sp_name_;
std::shared_ptr<const std::unordered_map<std::string, std::string>> options_ = nullptr;

// [ALPHA] output possible diagnostic infos from compiler
std::shared_ptr<IndexHintHandler> index_hints_;
friend Engine;
};

Expand Down
18 changes: 12 additions & 6 deletions hybridse/include/vm/engine_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ struct BatchRequestInfo {
std::set<size_t> output_common_column_indices;
};

class IndexHintHandler {
public:
// report a new index hint
virtual void Report(absl::string_view db, absl::string_view table, absl::Span<std::string const> keys,
absl::string_view ts) = 0;
// update the ttl type by Range info
virtual void UpdateLast(const hybridse::vm::WindowOp&) = 0;
};

enum ComileType {
kCompileSql,
};
Expand All @@ -56,13 +65,10 @@ class CompileInfo {
virtual const Schema& GetParameterSchema() const = 0;
virtual const std::string& GetRequestName() const = 0;
virtual const std::string& GetRequestDbName() const = 0;
virtual const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo()
const = 0;
virtual const hybridse::vm::BatchRequestInfo& GetBatchRequestInfo() const = 0;
virtual const hybridse::vm::PhysicalOpNode* GetPhysicalPlan() const = 0;
virtual void DumpPhysicalPlan(std::ostream& output,
const std::string& tab) = 0;
virtual void DumpClusterJob(std::ostream& output,
const std::string& tab) = 0;
virtual void DumpPhysicalPlan(std::ostream& output, const std::string& tab) = 0;
virtual void DumpClusterJob(std::ostream& output, const std::string& tab) = 0;
};

/// @typedef EngineLRUCache
Expand Down
196 changes: 107 additions & 89 deletions hybridse/src/passes/physical/group_and_sort_optimized.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in,
input)) {
return false;
}
} else {
if (plan_ctx_->index_hints() != nullptr) {
plan_ctx_->index_hints()->UpdateLast(window_agg_op->window());

Check warning on line 162 in hybridse/src/passes/physical/group_and_sort_optimized.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/passes/physical/group_and_sort_optimized.cc#L162

Added line #L162 was not covered by tests
}
}
}
// must prepare for window join column infer
Expand Down Expand Up @@ -187,6 +191,10 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in,
&window_union.second.partition_,
&window_union.second.sort_, &new_producer)) {
window_union.first = new_producer;
} else {
if (plan_ctx_->index_hints() != nullptr) {
plan_ctx_->index_hints()->UpdateLast(window_union.second);

Check warning on line 196 in hybridse/src/passes/physical/group_and_sort_optimized.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/passes/physical/group_and_sort_optimized.cc#L196

Added line #L196 was not covered by tests
}
}
}
}
Expand All @@ -205,6 +213,10 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in,
if (!ResetProducer(plan_ctx_, union_op, 1, new_producer)) {
return false;
}
} else {
if (plan_ctx_->index_hints() != nullptr) {
plan_ctx_->index_hints()->UpdateLast(union_op->window());
}
}
}

Expand All @@ -223,6 +235,10 @@ bool GroupAndSortOptimized::Transform(PhysicalOpNode* in,
window_union.first, &window.partition_,
&window.index_key_, &window.sort_, &new_producer)) {
window_union.first = new_producer;
} else {
if (plan_ctx_->index_hints() != nullptr) {
plan_ctx_->index_hints()->UpdateLast(window);
}
}
}
}
Expand Down Expand Up @@ -399,12 +415,42 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas
PhysicalPartitionProviderNode* partition_op = nullptr;
std::string index_name;

std::vector<std::string> keys;
std::vector<std::string> orders;
std::map<size_t, size_t> result_bitmap_mapping;

auto resolved_keys = ResolveExprToSrcColumnName(right_partition, scan_op);
for (size_t i = 0; i < resolved_keys.size(); ++i) {
if (resolved_keys[i].has_value()) {
result_bitmap_mapping[keys.size()] = i;
keys.push_back(resolved_keys[i].value());
}
}
if (keys.empty()) {
return false;
}
if (nullptr != sort && sort->orders() != nullptr) {
auto resolved_orders = ResolveExprToSrcColumnName(sort->orders()->order_expressions(), scan_op);
for (auto& val : resolved_orders) {
if (val.has_value()) {
orders.push_back(val.value());
}
}

if (orders.size() != 1) {
return false;
}
}

if (DataProviderType::kProviderTypeTable == scan_op->provider_type_) {
// Apply key columns and order column optimization with all indexes binding to scan_op->table_handler_
// Return false if fail to find an appropriate index
if (!TransformKeysAndOrderExpr( right_partition,
nullptr == sort ? nullptr : sort->orders_, scan_op,
&index_name, &bitmap)) {
if (!TransformKeysAndOrderExpr(keys, orders, scan_op, result_bitmap_mapping, &index_name, &bitmap)) {
// optimize failed ? report a index hint
if (plan_ctx_->index_hints() != nullptr) {
plan_ctx_->index_hints()->Report(scan_op->GetDb(), scan_op->GetName(), keys,
orders.empty() ? "" : orders.at(0));
}
return false;
}
Status status = plan_ctx_->CreateOp<PhysicalPartitionProviderNode>(&partition_op, scan_op, index_name);
Expand All @@ -417,9 +463,7 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas
index_name = partition_op->index_name_;
// Apply key columns and order column optimization with given index name
// Return false if given index do not match the keys and order column
if (!TransformKeysAndOrderExpr( right_partition,
nullptr == sort ? nullptr : sort->orders_, scan_op,
&index_name, &bitmap)) {
if (!TransformKeysAndOrderExpr(keys, orders, scan_op, result_bitmap_mapping, &index_name, &bitmap)) {
return false;
}
}
Expand Down Expand Up @@ -729,7 +773,8 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas
}
vm::PhysicalSetOperationNode* opt_set = nullptr;
if (!plan_ctx_
->CreateOp<vm::PhysicalSetOperationNode>(&opt_set, set_op->set_type_, opt_inputs, set_op->distinct_)
->CreateOp<vm::PhysicalSetOperationNode>(&opt_set, set_op->set_type_, opt_inputs,
set_op->distinct_)
.isOK()) {
return false;
}
Expand Down Expand Up @@ -806,95 +851,23 @@ bool GroupAndSortOptimized::KeyAndOrderOptimized(
sort, new_in);
}

bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const node::ExprListNode* groups, const node::OrderByNode* order,
vm::PhysicalDataProviderNode* data_node, std::string* index_name,
IndexBitMap* output_bitmap) {
if (nullptr == groups || nullptr == output_bitmap || nullptr == index_name) {
DLOG(WARNING) << "fail to transform keys expr : key expr or output "
"or index_name ptr is null";
return false;
}

if (nullptr == order) {
DLOG(INFO) << "keys optimized: " << node::ExprString(groups);
} else {
DLOG(INFO) << "keys and order optimized: keys=" << node::ExprString(groups)
<< ", order=" << node::ExprString(order);
}
std::vector<std::string> columns;
std::vector<std::string> order_columns;
std::map<size_t, size_t> result_bitmap_mapping;

for (size_t i = 0; i < groups->children_.size(); ++i) {
auto group = groups->children_[i];
switch (group->expr_type_) {
case node::kExprColumnRef: {
auto column = dynamic_cast<node::ColumnRefNode*>(group);
auto oop = expr_cache_.find(column);
if (oop == expr_cache_.end()) {
return false;
}

auto op = oop->second.find(data_node);
if (op == oop->second.end()) {
return false;
}

if (data_node-> table_handler_->GetName() != op->second.tb_name ||
data_node->table_handler_->GetDatabase() != op->second.db_name) {
return false;
}

result_bitmap_mapping[columns.size()] = i;
columns.emplace_back(op->second.col_name);
break;
}
default: {
break;
}
}
}

if (nullptr != order) {
for (size_t i = 0; i < order->order_expressions()->GetChildNum(); ++i) {
auto expr = order->GetOrderExpressionExpr(i);
if (nullptr != expr && expr->GetExprType() == node::kExprColumnRef) {
auto column = dynamic_cast<const node::ColumnRefNode*>(expr);
auto oop = expr_cache_.find(column);
if (oop == expr_cache_.end()) {
return false;
}

auto op = oop->second.find(data_node);
if (op == oop->second.end()) {
return false;
}

if (data_node->table_handler_->GetName() != op->second.tb_name ||
data_node->table_handler_->GetDatabase() != op->second.db_name) {
return false;
}

order_columns.emplace_back(op->second.col_name);
}
}
}
if (columns.empty()) {
return false;
}

bool GroupAndSortOptimized::TransformKeysAndOrderExpr(const std::vector<std::string>& keys,
const std::vector<std::string>& orders,
vm::PhysicalDataProviderNode* data_node,
const std::map<size_t, size_t>& result_bitmap_mapping,
std::string* index_name, IndexBitMap* output_bitmap) {
IndexBitMap match_bitmap;
// internal structure for MatchBestIndex, initially turn every bit true
IndexBitMap state_bitmap(std::vector<std::optional<ColIndexInfo>>(columns.size(), std::make_optional(0)));
if (!MatchBestIndex(columns, order_columns, data_node->table_handler_, &state_bitmap, index_name, &match_bitmap)) {
IndexBitMap state_bitmap(std::vector<std::optional<ColIndexInfo>>(keys.size(), std::make_optional(0)));
if (!MatchBestIndex(keys, orders, data_node->table_handler_, &state_bitmap, index_name, &match_bitmap)) {
return false;
}
if (match_bitmap.bitmap.size() != columns.size()) {
if (match_bitmap.bitmap.size() != keys.size()) {
return false;
}
for (size_t i = 0; i < columns.size(); ++i) {
for (size_t i = 0; i < keys.size(); ++i) {
if (match_bitmap.bitmap[i].has_value()) {
size_t origin_idx = result_bitmap_mapping[i];
size_t origin_idx = result_bitmap_mapping.at(i);
output_bitmap->bitmap.at(origin_idx) = match_bitmap.bitmap[i].value();
}
}
Expand Down Expand Up @@ -1023,5 +996,50 @@ bool GroupAndSortOptimized::MatchBestIndex(const std::vector<std::string>& colum
return succ;
}

std::vector<std::optional<std::string>> GroupAndSortOptimized::ResolveExprToSrcColumnName(
const node::ExprListNode* exprs, vm::PhysicalDataProviderNode* data_node) {
std::vector<std::optional<std::string>> columns;
for (auto node : exprs->children_) {
columns.push_back(ResolveExprToSrcColumnName(node, data_node));
}

return columns;
}

std::optional<std::string> GroupAndSortOptimized::ResolveExprToSrcColumnName(const node::ExprNode* node,
vm::PhysicalDataProviderNode* data_node) {
switch (node->expr_type_) {
case node::kExprColumnRef: {
auto ref = dynamic_cast<const node::ColumnRefNode*>(node);

auto oop = expr_cache_.find(ref);
if (oop == expr_cache_.end()) {
return std::nullopt;

Check warning on line 1017 in hybridse/src/passes/physical/group_and_sort_optimized.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/passes/physical/group_and_sort_optimized.cc#L1017

Added line #L1017 was not covered by tests
}

auto op = oop->second.find(data_node);
if (op == oop->second.end()) {
return std::nullopt;
}

if (data_node->table_handler_->GetName() != op->second.tb_name ||
data_node->table_handler_->GetDatabase() != op->second.db_name) {
return std::nullopt;

Check warning on line 1027 in hybridse/src/passes/physical/group_and_sort_optimized.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/passes/physical/group_and_sort_optimized.cc#L1027

Added line #L1027 was not covered by tests
}

return std::make_optional(op->second.col_name);
}
case node::kExprOrderExpression: {
auto order_expr = dynamic_cast<const node::OrderExpression*>(node);
return ResolveExprToSrcColumnName(order_expr->expr(), data_node);
}
default: {
break;
}
}

return std::nullopt;
}

} // namespace passes
} // namespace hybridse
Loading

0 comments on commit 0ce6e39

Please sign in to comment.