diff --git a/cases/plan/create.yaml b/cases/plan/create.yaml index 315ec30a305..72875fa0f36 100644 --- a/cases/plan/create.yaml +++ b/cases/plan/create.yaml @@ -437,44 +437,45 @@ cases: | +-is_constant: 0 +-inner_node_list[list]: +-0: - +-node[kQuery]: kQueryUnion - +-union_type: ALL UNION - +-left: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[primary] - | | | +-value: 1 - | | | +-type: int32 - | | +-name: - | +-tableref_list: [] - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[primary] - | | +-value: 2 - | | +-type: int32 - | +-name: - +-tableref_list: [] - +-window_list: [] + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[primary] + | | | +-value: 1 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[primary] + | | +-value: 2 + | | +-type: int32 + | +-name: + +-tableref_list: [] + +-window_list: [] - id: 21_create_procedure desc: create procedure, parameters and union distinct query @@ -515,68 +516,63 @@ cases: | +-is_constant: 0 
+-inner_node_list[list]: +-0: - +-node[kQuery]: kQueryUnion - +-union_type: DISTINCT UNION - +-left: - | +-node[kQuery]: kQueryUnion - | +-union_type: DISTINCT UNION - | +-left: - | | +-node[kQuery]: kQuerySelect - | | +-distinct_opt: false - | | +-where_expr: null - | | +-group_expr_list: null - | | +-having_expr: null - | | +-order_expr_list: null - | | +-limit: null - | | +-select_list[list]: - | | | +-0: - | | | +-node[kResTarget] - | | | +-val: - | | | | +-expr[primary] - | | | | +-value: 1 - | | | | +-type: int32 - | | | +-name: - | | +-tableref_list: [] - | | +-window_list: [] - | +-right: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[primary] - | | | +-value: 2 - | | | +-type: int32 - | | +-name: - | +-tableref_list: [] - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[primary] - | | +-value: 3 - | | +-type: int32 - | +-name: - +-tableref_list: [] - +-window_list: [] - - - + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[primary] + | | | +-value: 1 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-1: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: 
null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[primary] + | | | +-value: 2 + | | | +-type: int32 + | | +-name: + | +-tableref_list: [] + | +-window_list: [] + +-2: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[primary] + | | +-value: 3 + | | +-type: int32 + | +-name: + +-tableref_list: [] + +-window_list: [] - id: 22 desc: create index 1 diff --git a/cases/plan/simple_query.yaml b/cases/plan/simple_query.yaml index 20c7d8fc042..66cc542fbc0 100644 --- a/cases/plan/simple_query.yaml +++ b/cases/plan/simple_query.yaml @@ -466,53 +466,54 @@ cases: SELECT * FROM t1 UNION ALL SELECT * FROM t3 CONFIG ( zone = 'middle' ); expect: node_tree_str: | - +-node[kQuery]: kQueryUnion + +-node[kQuery]: kQuerySetOperation +-config_options: | +-zone: | +-expr[primary] | +-value: middle | +-type: string - +-union_type: ALL UNION - +-left: - | +-node[kQuery]: kQuerySelect - | +-distinct_opt: false - | +-where_expr: null - | +-group_expr_list: null - | +-having_expr: null - | +-order_expr_list: null - | +-limit: null - | +-select_list[list]: - | | +-0: - | | +-node[kResTarget] - | | +-val: - | | | +-expr[all] - | | +-name: - | +-tableref_list[list]: - | | +-0: - | | +-node[kTableRef]: kTable - | | +-table: t1 - | | +-alias: - | +-window_list: [] - +-right: - +-node[kQuery]: kQuerySelect - +-distinct_opt: false - +-where_expr: null - +-group_expr_list: null - +-having_expr: null - +-order_expr_list: null - +-limit: null - +-select_list[list]: - | +-0: - | +-node[kResTarget] - | +-val: - | | +-expr[all] - | +-name: - +-tableref_list[list]: - | +-0: - | +-node[kTableRef]: kTable - | +-table: t3 - | +-alias: - +-window_list: [] + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | 
+-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] - id: array desc: | array query diff --git a/cases/plan/union_query.yaml b/cases/plan/union_query.yaml index 9f6ac186775..f917722482e 100644 --- a/cases/plan/union_query.yaml +++ b/cases/plan/union_query.yaml @@ -13,29 +13,298 @@ # limitations under the License. cases: + - id: 0 + desc: DISTINCT UNION t1 t2 + sql: SELECT * FROM t1 UNION DISTINCT SELECT * FROM t2 UNION DISTINCT SELECT * FROM t3; + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | 
| +-node[kTableRef]: kTable + | | +-table: t2 + | | +-alias: + | +-window_list: [] + +-2: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] - id: 1 - desc: 简单UNION两张表 - mode: request-unsupport + desc: UNION ALL t1 t2 sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2; + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t2 + | +-alias: + +-window_list: [] + plan_tree_str: | + +-[kQueryPlan] + +-[kUnionPlan] + +-operator: UNION ALL + +-children[list]: + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t1 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t1 + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t2 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t2 - id: 2 - desc: 
DISTINCT UNION t1 t2 - mode: request-unsupport - sql: SELECT * FROM t1 UNION DISTINCT SELECT * FROM t2; + desc: UNION must be followed by ALL or DISTINCT + sql: SELECT * FROM t1 UNION select * FROM t2; + expect: + success: false + msg: | + Syntax error: Expected keyword ALL or keyword DISTINCT but got keyword SELECT [at 1:24] + SELECT * FROM t1 UNION select * FROM t2; + ^ - id: 3 - desc: UNION ALL t1 t2 - mode: request-unsupport - sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2; + desc: UNION with exact DISTINCT TYPE + sql: SELECT * FROM t1 UNION ALL select * FROM t2 UNION DISTINCT SELECT * FROM t3; + expect: + success: false + msg: | + Syntax error: Different set operations cannot be used in the same query without using parentheses for grouping [at 1:45] + SELECT * FROM t1 UNION ALL select * FROM t2 UNION DISTINCT SELECT * FROM t3; + ^ - id: 4 - desc: UNION ALL t1 t2 t3 - mode: request-unsupport - sql: SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION SELECT * FROM t3; + desc: for query with order by or limit, must quote with parentheses + sql: SELECT * FROM t1 LIMIT 10 UNION ALL select * FROM t2 + expect: + success: false + msg: | + Syntax error: Expected end of input but got keyword UNION [at 1:27] + SELECT * FROM t1 LIMIT 10 UNION ALL select * FROM t2 + ^ - id: 5 - desc: 两个拼表子查询UNION - mode: request-unsupport - sql: SELECT * FROM t1 left join t2 on t1.col1 = t2.col2 UNION ALL SELECT * FROM t3 UNION SELECT * FROM t4; + sql: (SELECT * FROM t1 LIMIT 10) UNION ALL (select * FROM t2 UNION DISTINCT select * from t3) + expect: + node_tree_str: | + +-node[kQuery]: kQuerySetOperation + +-operator: UNION ALL + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: + | | +-node[kLimit] + | | +-limit_cnt: 10 + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | 
+-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t1 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySetOperation + +-operator: UNION DISTINCT + +-inputs[list]: + +-0: + | +-node[kQuery]: kQuerySelect + | +-distinct_opt: false + | +-where_expr: null + | +-group_expr_list: null + | +-having_expr: null + | +-order_expr_list: null + | +-limit: null + | +-select_list[list]: + | | +-0: + | | +-node[kResTarget] + | | +-val: + | | | +-expr[all] + | | +-name: + | +-tableref_list[list]: + | | +-0: + | | +-node[kTableRef]: kTable + | | +-table: t2 + | | +-alias: + | +-window_list: [] + +-1: + +-node[kQuery]: kQuerySelect + +-distinct_opt: false + +-where_expr: null + +-group_expr_list: null + +-having_expr: null + +-order_expr_list: null + +-limit: null + +-select_list[list]: + | +-0: + | +-node[kResTarget] + | +-val: + | | +-expr[all] + | +-name: + +-tableref_list[list]: + | +-0: + | +-node[kTableRef]: kTable + | +-table: t3 + | +-alias: + +-window_list: [] + plan_tree_str: | + +-[kQueryPlan] + +-[kUnionPlan] + +-operator: UNION ALL + +-children[list]: + +-[kQueryPlan] + +-[kLimitPlan] + +-limit_cnt: 10 + +-[kProjectPlan] + +-table: t1 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t1 + +-[kQueryPlan] + +-[kUnionPlan] + +-operator: UNION DISTINCT + +-children[list]: + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t2 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t2 + +-[kQueryPlan] + +-[kProjectPlan] + +-table: t3 + +-project_list_vec[list]: + +-[kProjectList] + +-projects on table [list]: + +-[kProjectNode] + +-[0]*: * + +-[kTablePlan] + +-table: t3 - id: 6 - desc: 两个复杂子查询UNION + desc: config option only append at end of query statement, not expression sql: | - SELECT sum(COL1) as col1sum, * FROM t1 where col2 > 10 group by COL1, COL2 having col1sum 
> 0 order by - COL1+COL2 limit 10 UNION ALL SELECT sum(COL1) as col1sum, * FROM t1 group by COL1, COL2 having sum(COL1) > 0; + (SELECT * FROM t1 LIMIT 10 config ( c = 'c')) + UNION SELECT * FROM t2 + config (c = 'c') + expect: + success: false + msg: | + Syntax error: Expected ")" but got keyword CONFIG [at 1:28] + (SELECT * FROM t1 LIMIT 10 config ( c = 'c')) + ^ diff --git a/cases/query/union_query.yml b/cases/query/union_query.yml new file mode 100644 index 00000000000..6429f997478 --- /dev/null +++ b/cases/query/union_query.yml @@ -0,0 +1,42 @@ +cases: + - id: 0 + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + - ["cc",40,1000] + - ["dd",50,1000] + - name: t2 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",1, 2000] + - ["bb",2, 3000] + - ["cc",3, 3000] + - name: t3 + columns: ["c1 string", "val int", "c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",4, 4000] + - ["bb",5, 2000] + - ["cc",6, 3000] + - ["dd",7, 3000] + # When key and TS are exactly equal, rows from the input further to the right are placed later in the output temporary table, and are therefore picked first by LAST JOIN + # The segment iterator runs from the largest TS to the smallest + sql: | + select t1.c1, tx.c1 as tx1, tx.val as val + from t1 last join ( + select * from t2 union all select * from t3 + ) tx + on t1.c1 = tx.c1 + expect: + order: c1 + columns: ["c1 string", "tx1 string", "val int"] + data: | + aa, aa, 4 + bb, bb, 2 + cc, cc, 6 + dd, dd, 7 diff --git a/hybridse/include/codec/row.h b/hybridse/include/codec/row.h index 69158d41e85..09ba4741090 100644 --- a/hybridse/include/codec/row.h +++ b/hybridse/include/codec/row.h @@ -66,6 +66,10 @@ class Row { // > 0 iff "*this" > "b" int compare(const Row &b) const; + friend bool operator<(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) < 0; } + friend bool operator>(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) > 0; } + friend bool operator==(const Row &lhs, const Row &rhs) { return lhs.compare(rhs) == 0; } + 
int8_t **GetRowPtrs() const; int32_t GetRowPtrCnt() const; diff --git a/hybridse/include/node/node_base.h b/hybridse/include/node/node_base.h index 97ba5cf8e49..8aa678c90a8 100644 --- a/hybridse/include/node/node_base.h +++ b/hybridse/include/node/node_base.h @@ -89,7 +89,7 @@ class NodeBase : public base::FeBaseObject { template bool EqualsOverride(const T* other, Pred&& pred) const { auto lhs = dynamic_cast(this); - if (lhs != nullptr) { + if (lhs == nullptr) { return false; } diff --git a/hybridse/include/node/node_enum.h b/hybridse/include/node/node_enum.h index a4159e2b20e..269cc3175b0 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -111,9 +111,9 @@ enum TableRefType { }; enum QueryType { - kQuerySelect, + kQuerySelect = 0, kQuerySub, - kQueryUnion, + kQuerySetOperation, }; enum ExprType { kExprUnknow = -1, @@ -254,8 +254,11 @@ enum JoinType { kJoinTypeCross, // AKA commma join }; -enum UnionType { kUnionTypeDistinct, kUnionTypeAll }; - +enum class SetOperationType { + UNION, + EXCEPT, + INTERSECT, +}; enum CmdType { kCmdCreateDatabase = 0, kCmdUseDatabase, @@ -302,7 +305,7 @@ enum PlanType { kPlanTypeFilter, kPlanTypeTable, kPlanTypeJoin, - kPlanTypeUnion, + kPlanTypeSetOperation, kPlanTypeSort, kPlanTypeGroup, kPlanTypeDistinct, diff --git a/hybridse/include/node/node_manager.h b/hybridse/include/node/node_manager.h index ab87e588a53..0cb581ffb33 100644 --- a/hybridse/include/node/node_manager.h +++ b/hybridse/include/node/node_manager.h @@ -104,8 +104,6 @@ class NodeManager { ExprListNode *group_expr_list, ExprNode *having_expr, ExprNode *order_expr_list, SqlNodeList *window_list_ptr, SqlNode *limit_ptr); - QueryNode *MakeUnionQueryNode(QueryNode *left, QueryNode *right, - bool is_all); TableRefNode *MakeTableNode(const std::string &name, const std::string &alias); TableRefNode *MakeTableNode(const std::string& db, diff --git a/hybridse/include/node/plan_node.h b/hybridse/include/node/plan_node.h index 
096ae86aae1..3085b27c699 100644 --- a/hybridse/include/node/plan_node.h +++ b/hybridse/include/node/plan_node.h @@ -151,16 +151,6 @@ class JoinPlanNode : public BinaryPlanNode { const ExprNode *condition_; }; -class UnionPlanNode : public BinaryPlanNode { - public: - UnionPlanNode(PlanNode *left, PlanNode *right, bool is_all) - : BinaryPlanNode(kPlanTypeUnion, left, right), is_all(is_all) {} - void Print(std::ostream &output, const std::string &org_tab) const override; - virtual bool Equals(const PlanNode *that) const; - const bool is_all; - std::shared_ptr config_options_; -}; - class CrossProductPlanNode : public BinaryPlanNode { public: CrossProductPlanNode(PlanNode *left, PlanNode *right) : BinaryPlanNode(kPlanTypeJoin, left, right) {} @@ -204,6 +194,7 @@ class ProjectPlanNode : public UnaryPlanNode { class QueryPlanNode : public UnaryPlanNode { public: + QueryPlanNode() : UnaryPlanNode(kPlanTypeQuery) {} explicit QueryPlanNode(PlanNode *node) : UnaryPlanNode(node, kPlanTypeQuery) {} ~QueryPlanNode() {} void Print(std::ostream &output, const std::string &org_tab) const override; @@ -213,6 +204,29 @@ class QueryPlanNode : public UnaryPlanNode { std::shared_ptr config_options_; }; +class SetOperationPlanNode : public MultiChildPlanNode { + public: + SetOperationPlanNode(SetOperationType type, absl::Span input, bool distinct) + : MultiChildPlanNode(kPlanTypeSetOperation), op_type_(type), inputs_(input), distinct_(distinct) { + for (auto n : input) { + AddChild(n); + } + } + ~SetOperationPlanNode() override {} + + const absl::Span &inputs() const { return inputs_; } + SetOperationType op_type() const { return op_type_; } + bool distinct() const { return distinct_; } + + void Print(std::ostream &output, const std::string &org_tab) const override; + bool Equals(const PlanNode *that) const override; + + private: + SetOperationType op_type_; + absl::Span inputs_; + bool distinct_ = false; +}; + class WithClauseEntryPlanNode : public UnaryPlanNode { public: 
WithClauseEntryPlanNode(std::string alias, QueryPlanNode *query) diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index dcf162a96ab..2f7bc9f39c3 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -54,6 +54,8 @@ std::string NameOfSqlNodeType(const SqlNodeType &type); absl::string_view CmdTypeName(const CmdType type); +std::string SetOperatorName(SetOperationType type, bool dis); + inline const std::string ExplainTypeName(const ExplainType &explain_type) { switch (explain_type) { case kExplainLogical: @@ -176,8 +178,8 @@ inline const std::string QueryTypeName(const QueryType &type) { switch (type) { case kQuerySelect: return "kQuerySelect"; - case kQueryUnion: - return "kQueryUnion"; + case kQuerySetOperation: + return "kQuerySetOperation"; case kQuerySub: return "kQuerySub"; default: { @@ -772,15 +774,23 @@ class SelectQueryNode : public QueryNode { bool last_item) const; }; -class UnionQueryNode : public QueryNode { +class SetOperationNode : public QueryNode { public: - UnionQueryNode(const QueryNode *left, const QueryNode *right, bool is_all) - : QueryNode(kQueryUnion), left_(left), right_(right), is_all_(is_all) {} - void Print(std::ostream &output, const std::string &org_tab) const; - virtual bool Equals(const SqlNode *node) const; - const QueryNode *left_; - const QueryNode *right_; - const bool is_all_; + SetOperationNode(SetOperationType type, absl::Span input, bool distinct) + : QueryNode(kQuerySetOperation), op_type_(type), inputs_(input), distinct_(distinct) {} + ~SetOperationNode() override {} + + const absl::Span &inputs() const { return inputs_; } + SetOperationType op_type() const { return op_type_; } + bool distinct() const { return distinct_; } + + void Print(std::ostream &output, const std::string &org_tab) const override; + bool Equals(const SqlNode *node) const override; + + private: + SetOperationType op_type_; + absl::Span inputs_; + bool distinct_ = false; }; class 
ParameterExpr : public ExprNode { diff --git a/hybridse/include/vm/catalog.h b/hybridse/include/vm/catalog.h index 4bd007645bd..59902ea179e 100644 --- a/hybridse/include/vm/catalog.h +++ b/hybridse/include/vm/catalog.h @@ -257,6 +257,8 @@ class TableHandler : public DataHandler { virtual std::shared_ptr GetTablet(const std::string& index_name, const std::vector& pks) { return std::shared_ptr(); } + + static std::shared_ptr Cast(std::shared_ptr in); }; /// \brief A table dataset's error handler, representing a error table @@ -366,6 +368,8 @@ class PartitionHandler : public TableHandler { const std::string GetHandlerTypeName() override { return "PartitionHandler"; } + + static std::shared_ptr Cast(std::shared_ptr in); }; /// \brief A wrapper of table handler which is used as a asynchronous row diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index dd51c73bfd1..c848dd409ba 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -47,7 +47,7 @@ enum PhysicalOpType { kPhysicalOpRename, kPhysicalOpDistinct, kPhysicalOpJoin, - kPhysicalOpUnion, + kPhysicalOpSetOperation, kPhysicalOpWindow, kPhysicalOpIndexSeek, kPhysicalOpRequestUnion, @@ -153,6 +153,7 @@ class FnComponent { // Sort Component can only handle single order expressions class Sort : public FnComponent { public: + Sort() = default; explicit Sort(const node::OrderByNode *orders) : orders_(orders) {} virtual ~Sort() {} @@ -184,6 +185,8 @@ class Sort : public FnComponent { base::Status ReplaceExpr(const passes::ExprReplacer &replacer, node::NodeManager *nm, Sort *out) const; + Sort ShadowCopy() { return Sort(orders_); } + const node::OrderByNode *orders_; }; @@ -296,6 +299,8 @@ class Key : public FnComponent { base::Status ReplaceExpr(const passes::ExprReplacer &replacer, node::NodeManager *nm, Key *out) const; + Key ShadowCopy() { return Key(keys_); } + const node::ExprListNode *keys_; }; @@ -445,7 +450,7 @@ class PhysicalOpNode : public 
node::NodeBase { // limit always >= 0 so it is safe to do that int32_t GetLimitCntValue() const { return limit_cnt_.value_or(-1); } - bool IsSameSchema(const vm::Schema &schema, const vm::Schema &exp_schema) const; + static bool IsSameSchema(const codec::Schema* schema, const codec::Schema* exp_schema); // `lhs` schema contains `rhs` and is start with `rhs` schema // @@ -834,7 +839,6 @@ class PhysicalGroupAggrerationNode : public PhysicalProjectNode { Key group_; }; -class PhysicalUnionNode; class PhysicalJoinNode; class WindowOp { @@ -1410,23 +1414,37 @@ class PhysicalRequestJoinNode : public PhysicalBinaryNode { } }; -class PhysicalUnionNode : public PhysicalBinaryNode { +class PhysicalSetOperationNode : public PhysicalOpNode { public: - PhysicalUnionNode(PhysicalOpNode *left, PhysicalOpNode *right, bool is_all) - : PhysicalBinaryNode(left, right, kPhysicalOpUnion, true), - is_all_(is_all) { - output_type_ = kSchemaTypeTable; + PhysicalSetOperationNode(node::SetOperationType type, absl::Span inputs, bool distinct) + : PhysicalOpNode(kPhysicalOpSetOperation, false), op_type_(type), distinct_(distinct) { + for (auto n : inputs) { + AddProducer(n); + } + bool group_optimized = true; + for (auto n : producers_) { + if (n-> GetOutputType() != kSchemaTypeGroup) { + group_optimized = false; + break; + } + } + + if (group_optimized && op_type_ == node::SetOperationType::UNION) { + output_type_ = kSchemaTypeGroup; + } else { + output_type_ = kSchemaTypeTable; + } } - virtual ~PhysicalUnionNode() {} - base::Status InitSchema(PhysicalPlanContext *) override; - virtual void Print(std::ostream &output, const std::string &tab) const; + ~PhysicalSetOperationNode() override {} - base::Status WithNewChildren(node::NodeManager *nm, - const std::vector &children, + base::Status InitSchema(PhysicalPlanContext *) override; + void Print(std::ostream &output, const std::string &tab) const override; + base::Status WithNewChildren(node::NodeManager *nm, const std::vector &children, 
PhysicalOpNode **out) override; - const bool is_all_; - static PhysicalUnionNode *CastFrom(PhysicalOpNode *node); + node::SetOperationType op_type_; + const bool distinct_ = false; + static PhysicalSetOperationNode *CastFrom(PhysicalOpNode *node); }; class PhysicalPostRequestUnionNode : public PhysicalBinaryNode { @@ -1515,9 +1533,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode { << "Fail to add window union : producer is empty or null"; return false; } - if (output_request_row() && - !IsSameSchema(*node->GetOutputSchema(), - *producers_[0]->GetOutputSchema())) { + if (output_request_row() && !IsSameSchema(node->GetOutputSchema(), producers_[0]->GetOutputSchema())) { LOG(WARNING) << "Union Table and window input schema aren't consistent"; return false; diff --git a/hybridse/include/vm/schemas_context.h b/hybridse/include/vm/schemas_context.h index b2e68d9477a..6840e3d6936 100644 --- a/hybridse/include/vm/schemas_context.h +++ b/hybridse/include/vm/schemas_context.h @@ -244,6 +244,9 @@ class SchemasContext { void BuildTrivial(const std::vector& schemas); void BuildTrivial(const std::string& default_db, const std::vector& tables); + // {db}.{table}({col_name}:{col_type}, ...) 
+ std::string ReadableString() const { return ""; } + std::string DebugString() const; friend std::ostream& operator<<(std::ostream& os, const SchemasContext& sc) { return os << sc.DebugString(); } diff --git a/hybridse/src/node/node_manager.cc b/hybridse/src/node/node_manager.cc index 8f6f80d7517..2c52fe3476e 100644 --- a/hybridse/src/node/node_manager.cc +++ b/hybridse/src/node/node_manager.cc @@ -19,6 +19,7 @@ #include #include #include +#include "node/sql_node.h" namespace hybridse { namespace node { @@ -43,11 +44,6 @@ QueryNode *NodeManager::MakeSelectQueryNode(bool is_distinct, SqlNodeList *selec return node_ptr; } -QueryNode *NodeManager::MakeUnionQueryNode(QueryNode *left, QueryNode *right, bool is_all) { - UnionQueryNode *node_ptr = new UnionQueryNode(left, right, is_all); - RegisterNode(node_ptr); - return node_ptr; -} TableRefNode *NodeManager::MakeTableNode(const std::string &name, const std::string &alias) { return MakeTableNode("", name, alias); } diff --git a/hybridse/src/node/plan_node.cc b/hybridse/src/node/plan_node.cc index 9a613c8d0a4..b7f74e48e59 100644 --- a/hybridse/src/node/plan_node.cc +++ b/hybridse/src/node/plan_node.cc @@ -42,7 +42,7 @@ bool PlanNode::Equals(const PlanNode *that) const { if (nullptr == that || type_ != that->type_) { return false; } - return PlanListEquals(this->children_, that->children_); + return type_ == that->GetType() && PlanListEquals(this->children_, that->children_); } void PlanNode::PrintChildren(std::ostream &output, const std::string &tab) const { @@ -184,7 +184,7 @@ std::string NameOfPlanNodeType(const PlanType &type) { return std::string("kTablePlan"); case kPlanTypeJoin: return "kJoinPlan"; - case kPlanTypeUnion: + case kPlanTypeSetOperation: return "kUnionPlan"; case kPlanTypeSort: return "kSortPlan"; @@ -638,33 +638,17 @@ bool JoinPlanNode::Equals(const PlanNode *node) const { BinaryPlanNode::Equals(that); } -void UnionPlanNode::Print(std::ostream &output, - const std::string &org_tab) const { +void 
SetOperationPlanNode::Print(std::ostream &output, const std::string &org_tab) const { PlanNode::Print(output, org_tab); output << "\n"; - std::string tab = org_tab + INDENT; - PrintValue(output, tab, is_all ? "ALL" : "DISTINCT", "union_type", false); - if (config_options_ != nullptr) { - output << "\n"; - PrintValue(output, tab, config_options_.get(), "config_options", false); - } + PrintValue(output, org_tab + INDENT, SetOperatorName(op_type_, distinct_), "operator", false); output << "\n"; PrintChildren(output, org_tab); } -bool UnionPlanNode::Equals(const PlanNode *node) const { - if (nullptr == node) { - return false; - } - - if (this == node) { - return true; - } - - if (type_ != node->type_) { - return false; - } - const UnionPlanNode *that = dynamic_cast(node); - return this->is_all == that->is_all && BinaryPlanNode::Equals(that); +bool SetOperationPlanNode::Equals(const PlanNode *node) const { + auto casted = dynamic_cast(node); + return MultiChildPlanNode::Equals(node) && casted && op_type_ == casted->op_type() && + distinct_ == casted->distinct_; } void QueryPlanNode::Print(std::ostream &output, const std::string &org_tab) const { diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index 6fa2a82d42a..11e6a3d759b 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -1214,6 +1214,20 @@ absl::string_view CmdTypeName(const CmdType type) { return "undefined cmd type"; } +std::string SetOperatorName(SetOperationType type, bool dis) { + std::string distinct = dis ? 
"DISTINCT" : "ALL"; + switch (type) { + case SetOperationType::UNION: + return "UNION " + distinct; + case SetOperationType::EXCEPT: + return "EXCEPT " + distinct; + case SetOperationType::INTERSECT: + return "INTERSECT " + distinct; + } + // Not reachable for a valid SetOperationType; keeps the result defined instead of flowing off the end of the function. + return "UNKNOWN " + distinct; +} + std::string DataTypeName(DataType type) { auto &map = GetDataTypeNamesMap(); auto it = map.find(type); @@ -2025,23 +2039,6 @@ bool JoinNode::Equals(const SqlNode *node) const { ExprEquals(this->orders_, that->orders_) && SqlEquals(this->left_, that->right_); } -void UnionQueryNode::Print(std::ostream &output, const std::string &org_tab) const { - QueryNode::Print(output, org_tab); - const std::string tab = org_tab + INDENT + SPACE_ED; - output << "\n"; - PrintValue(output, tab, is_all_ ? "ALL UNION" : "DISTINCT UNION", "union_type", false); - output << "\n"; - PrintSqlNode(output, tab, left_, "left", false); - output << "\n"; - PrintSqlNode(output, tab, right_, "right", true); -} -bool UnionQueryNode::Equals(const SqlNode *node) const { - if (!QueryNode::Equals(node)) { - return false; - } - const UnionQueryNode *that = dynamic_cast(node); - return this->is_all_ && that->is_all_ && SqlEquals(this->left_, that->right_); -} void QueryExpr::Print(std::ostream &output, const std::string &org_tab) const { ExprNode::Print(output, org_tab); const std::string tab = org_tab + INDENT + SPACE_ED; @@ -2721,5 +2718,22 @@ std::string DropPathAction::DebugString() const { return absl::Substitute("DropPathAction ($0)", target_); } +bool SetOperationNode::Equals(const SqlNode *node) const { + auto *rhs = dynamic_cast(node); + return this->QueryNode::Equals(node) && this->op_type() == rhs->op_type() && this->distinct() == rhs->distinct() && + absl::c_equal(this->inputs(), rhs->inputs(), + [](const QueryNode *const l, const QueryNode *const r) { return SqlEquals(l, r); }); +} +void SetOperationNode::Print(std::ostream &output, const std::string &org_tab) const { + QueryNode::Print(output, org_tab); + output << "\n"; + PrintValue(output, org_tab 
+ INDENT, SetOperatorName(op_type_, distinct_), "operator", false); + output << "\n" << org_tab + INDENT << SPACE_ST << "inputs[list]:"; + for (size_t i = 0; i < inputs().size(); i++) { + auto node = inputs()[i]; + output << "\n"; + PrintSqlNode(output, org_tab + INDENT + INDENT, node, std::to_string(i), i + 1 == inputs().size()); + } +} } // namespace node } // namespace hybridse diff --git a/hybridse/src/node/sql_node_test.cc b/hybridse/src/node/sql_node_test.cc index 545d9b647fd..4f466c82fb6 100644 --- a/hybridse/src/node/sql_node_test.cc +++ b/hybridse/src/node/sql_node_test.cc @@ -918,7 +918,7 @@ TEST_F(SqlNodeTest, ColumnIdTest) { TEST_F(SqlNodeTest, QueryTypeNameTest) { ASSERT_EQ("kQuerySelect", node::QueryTypeName(node::kQuerySelect)); - ASSERT_EQ("kQueryUnion", node::QueryTypeName(node::kQueryUnion)); + ASSERT_EQ("kQuerySetOperation", node::QueryTypeName(node::kQuerySetOperation)); ASSERT_EQ("kQuerySub", node::QueryTypeName(node::kQuerySub)); } diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index 2d51b336167..7ba75f7ed30 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -51,6 +51,15 @@ using hybridse::vm::ProjectType; static bool ResolveColumnToSourceColumnName(const node::ColumnRefNode* col, const SchemasContext* schemas_ctx, std::string* db, std::string* table, std::string* source_col); +template +T ShadowCopy(T* in) { + if (in != nullptr) { + return in->ShadowCopy(); + } + + return T{}; +} + // ExprNode may be resolving under different SchemasContext later (say one of its descendants context), // with column name etc it may not able to resvole since a column rename may happen in SimpleProject node. // With the column id hint written to corresponding ColumnRefNode earlier, resolving issue can be mitigated. 
@@ -349,13 +358,12 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas if (DataProviderType::kProviderTypeTable == scan_op->provider_type_ || DataProviderType::kProviderTypePartition == scan_op->provider_type_) { - auto* table_node = dynamic_cast(scan_op); if (optimize_info_) { if (optimize_info_->left_key == left_key && optimize_info_->index_key == index_key && optimize_info_->right_key == right_key && optimize_info_->sort_key == sort) { if (optimize_info_->optimized != nullptr && - table_node->GetDb() == optimize_info_->optimized->GetDb() && - table_node->GetName() == optimize_info_->optimized->GetName()) { + scan_op->GetDb() == optimize_info_->optimized->GetDb() && + scan_op->GetName() == optimize_info_->optimized->GetName()) { *new_in = optimize_info_->optimized; return true; } @@ -621,6 +629,52 @@ bool GroupAndSortOptimized::KeysOptimizedImpl(const SchemasContext* root_schemas } *new_in = new_union; return true; + } else if (PhysicalOpType::kPhysicalOpSetOperation == in->GetOpType()) { + auto set_op = dynamic_cast(in); + // keys optimize for each inputs for set operation + std::vector opt_inputs; + opt_inputs.reserve(in->GetProducerCnt()); + bool opt_all = true; + for (size_t i = 0; i < in->GetProducerCnt(); i++) { + auto n = in->GetProducer(i); + // expr_cache_.clear(); + // optimize_info_ = nullptr; + PhysicalOpNode* optimized = nullptr; + // copy keys + auto left_key_cp = ShadowCopy(left_key); + auto index_key_cp = ShadowCopy(index_key); + auto right_key_cp = ShadowCopy(right_key); + auto sort_cp = ShadowCopy(sort); + + expr_cache_.clear(); + optimize_info_ = nullptr; + if (!KeysOptimized(n->schemas_ctx(), n, &left_key_cp, &index_key_cp, &right_key_cp, &sort_cp, &optimized)) { + LOG(WARNING) << "unable to optimize operation set input: " << n->GetTreeString(); + opt_all = false; + } + opt_inputs.push_back(optimized == nullptr ? 
n : optimized); + + if (i + 1 == in->GetProducerCnt()) { + // write keys + left_key->set_keys(left_key_cp.keys()); + index_key->set_keys(index_key_cp.keys()); + if (right_key) { + right_key->set_keys(right_key_cp.keys()); + } + if (sort) { + sort->set_orders(sort_cp.orders()); + } + } + } + vm::PhysicalSetOperationNode* opt_set = nullptr; + if (!plan_ctx_ + ->CreateOp(&opt_set, set_op->op_type_, opt_inputs, set_op->distinct_) + .isOK()) { + return false; + } + *new_in = opt_set; + + return opt_all; } return false; } @@ -915,6 +969,12 @@ static bool ResolveColumnToSourceColumnName(const node::ColumnRefNode* col, cons const PhysicalOpNode* source; Status status = schemas_ctx->ResolveColumnID(db, rel, col_name, &column_id, &path_idx, &child_column_id, &source_column_id, &source); + // try loose the relation, fallback to column name resolving. + // useful when underlaying node is e.g a SetOperationNode. + if (!status.isOK() && !col->GetRelationName().empty()) { + status = schemas_ctx->ResolveColumnID("", "", col->GetColumnName(), &column_id, &path_idx, &child_column_id, + &source_column_id, &source); + } if (!status.isOK()) { LOG(WARNING) << "Illegal index column: " << col->GetExprString(); diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index fc350d1ffb6..94a48e73c9e 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -60,29 +60,56 @@ Planner::Planner(node::NodeManager *manager, const bool is_batch_mode, const boo } } -base::Status Planner::CreateQueryPlan(const node::QueryNode *root, PlanNode **plan_tree) { +base::Status Planner::CreateQueryPlan(const node::QueryNode *root, node::QueryPlanNode **plan_tree) { CHECK_TRUE(nullptr != root, common::kPlanError, "can not create query plan node with null query node"); + + auto out = node_manager_->MakeNode(); + + if (!root->with_clauses_.empty()) { + auto with_list = node_manager_->MakeList(); + for (auto q : root->with_clauses_) { + node::QueryPlanNode *with = nullptr; + 
// CHECK_TRUE(q->query_->query_type_ == node::kQuerySelect, common::kPlanError, + // "only support select query as with clause entry"); + CHECK_STATUS(CreateQueryPlan(q->query_, &with)); + + auto with_entry = node_manager_->MakeNode(q->alias_, with); + + with_list->data_.push_back(with_entry); + } + out->with_clauses_ = absl::MakeSpan(with_list->data_); + } + + if (root->config_options_ != nullptr) { + out->config_options_ = root->config_options_; + } + switch (root->query_type_) { case node::kQuerySelect: { - node::QueryPlanNode* plan = nullptr; - CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(root), &plan)); - *plan_tree = plan; + node::PlanNode* query_input = nullptr; + CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(root), &query_input)); + out->AddChild(query_input); break; } - case node::kQueryUnion: - CHECK_STATUS(CreateUnionQueryPlan(dynamic_cast(root), plan_tree)); + case node::kQuerySetOperation: { + node::SetOperationPlanNode* un = nullptr; + CHECK_STATUS(CreateSetOperationPlan(dynamic_cast(root), &un)); + out->AddChild(un); break; + } default: { FAIL_STATUS(common::kPlanError, "can not create query plan node with invalid query type " + node::QueryTypeName(root->query_type_)); } } + + *plan_tree = out; return base::Status::OK(); } // TODO(chenjing): refactor SELECT query logical plan // Deal with group by clause, order clause, having clause in physical plan instead of logical plan, since we need // schema context for column resolve. -base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, node::QueryPlanNode **plan_tree) { +base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, node::PlanNode **plan_tree) { const node::NodePointVector &table_ref_list = nullptr == root->GetTableRefList() ? 
std::vector() : root->GetTableRefList()->GetList(); std::vector relation_nodes; @@ -314,46 +341,24 @@ base::Status Planner::CreateSelectQueryPlan(const node::SelectQueryNode *root, n current_node = node_manager_->MakeLimitPlanNode(current_node, limit_ptr->GetLimitCount()); } - auto out = node_manager_->MakeNode(current_node); - - if (!root->with_clauses_.empty()) { - auto with_list = node_manager_->MakeList(); - for (auto q : root->with_clauses_) { - node::QueryPlanNode *with = nullptr; - CHECK_TRUE(q->query_->query_type_ == node::kQuerySelect, common::kPlanError, - "only support select query as with clause entry"); - CHECK_STATUS(CreateSelectQueryPlan(dynamic_cast(q->query_), &with)); - - auto with_entry = node_manager_->MakeNode(q->alias_, with); - - with_list->data_.push_back(with_entry); - } - out->with_clauses_ = absl::MakeSpan(with_list->data_); - } - - if (root->config_options_ != nullptr) { - out->config_options_ = root->config_options_; - } - *plan_tree = out; + *plan_tree = current_node; return base::Status::OK(); } -base::Status Planner::CreateUnionQueryPlan(const node::UnionQueryNode *root, PlanNode **plan_tree) { +base::Status Planner::CreateSetOperationPlan(const node::SetOperationNode *root, node::SetOperationPlanNode **plan_tree) { CHECK_TRUE(nullptr != root, common::kPlanError, "can not create query plan node with null query node") - node::PlanNode *left_plan = nullptr; - node::PlanNode *right_plan = nullptr; - CHECK_STATUS(CreateQueryPlan(root->left_, &left_plan), common::kPlanError, - "can not create union query plan left query") - CHECK_STATUS(CreateQueryPlan(root->right_, &right_plan), common::kPlanError, - "can not create union query plan right query") - auto* res = node_manager_->MakeNode(left_plan, right_plan, root->is_all_); - if (root->config_options_ != nullptr) { - res->config_options_ = root->config_options_; + auto list = node_manager_->MakeList(); + for (auto n : root->inputs()) { + node::QueryPlanNode* query = nullptr; + 
CHECK_STATUS(CreateQueryPlan(n, &query)); + list->data_.push_back(query); } - *plan_tree = res; + auto span = absl::MakeSpan(list->data_); + *plan_tree = node_manager_->MakeNode(root->op_type(), span, root->distinct()); return base::Status::OK(); } + base::Status Planner::CheckWindowFrame(const node::WindowDefNode *w_ptr) { CHECK_TRUE(nullptr != w_ptr->GetFrame(), common::kPlanError, "fail to create project list node: frame can't be unbound ") @@ -430,7 +435,7 @@ base::Status Planner::CreateCreateFunctionPlanNode(const node::CreateFunctionNod base::Status Planner::CreateSelectIntoPlanNode(const node::SelectIntoNode *root, node::PlanNode **output) { CHECK_TRUE(nullptr != root, common::kPlanError, "fail to create select into plan with null node"); - PlanNode *query = nullptr; + node::QueryPlanNode *query = nullptr; CHECK_STATUS(CreateQueryPlan(root->Query(), &query)) *output = node_manager_->MakeSelectIntoPlanNode(query, root->QueryStr(), root->OutFile(), root->Options(), root->ConfigOptions()); @@ -524,6 +529,7 @@ base::Status Planner::ValidateOnlineServingOp(node::PlanNode *node) { case node::kPlanTypeWindow: case node::kPlanTypeQuery: case node::kPlanTypeFilter: + case node::kPlanTypeSetOperation: case node::kPlanTypeJoin: { break; } @@ -618,7 +624,7 @@ base::Status Planner::PrepareRequestTable(node::PlanNode *node, std::vectortype_) { case node::kPlanTypeJoin: - case node::kPlanTypeUnion: { + case node::kPlanTypeSetOperation: { auto binary_op = dynamic_cast(node); CHECK_TRUE(nullptr != binary_op->GetLeft(), common::kPlanError, "Left child of ", node->GetTypeName(), " is null") @@ -663,7 +669,7 @@ base::Status SimplePlanner::CreatePlanTree(const NodePointVector &parser_trees, for (auto parser_tree : parser_trees) { switch (parser_tree->GetType()) { case node::kQuery: { - PlanNode *query_plan = nullptr; + node::QueryPlanNode *query_plan = nullptr; CHECK_STATUS(CreateQueryPlan(dynamic_cast(parser_tree), &query_plan)); if (!is_batch_mode_) { @@ -917,7 +923,9 @@ 
base::Status Planner::CreateTableReferencePlanNode(const node::TableRefNode *roo } case node::kRefQuery: { const node::QueryRefNode *sub_query_node = dynamic_cast(root); - CHECK_STATUS(CreateQueryPlan(sub_query_node->query_, &plan_node)) + node::QueryPlanNode* query = nullptr; + CHECK_STATUS(CreateQueryPlan(sub_query_node->query_, &query)) + plan_node = query; if (!sub_query_node->alias_table_name_.empty()) { *output = node_manager_->MakeRenamePlanNode(plan_node, sub_query_node->alias_table_name_); } else { diff --git a/hybridse/src/plan/planner.h b/hybridse/src/plan/planner.h index b484b3c594c..731663ab246 100644 --- a/hybridse/src/plan/planner.h +++ b/hybridse/src/plan/planner.h @@ -80,9 +80,9 @@ class Planner { // currently only apply to rows window bool ExpandCurrentHistoryWindow(std::vector *windows); base::Status CheckWindowFrame(const node::WindowDefNode *w_ptr); - base::Status CreateQueryPlan(const node::QueryNode *root, PlanNode **plan_tree); - base::Status CreateSelectQueryPlan(const node::SelectQueryNode *root, node::QueryPlanNode **plan_tree); - base::Status CreateUnionQueryPlan(const node::UnionQueryNode *root, PlanNode **plan_tree); + base::Status CreateQueryPlan(const node::QueryNode *root, node::QueryPlanNode **plan_tree); + base::Status CreateSelectQueryPlan(const node::SelectQueryNode *root, node::PlanNode **plan_tree); + base::Status CreateSetOperationPlan(const node::SetOperationNode *root, node::SetOperationPlanNode **plan_tree); base::Status CreateCreateTablePlan(const node::SqlNode *root, node::PlanNode **output); base::Status CreateTableReferencePlanNode(const node::TableRefNode *root, node::PlanNode **output); base::Status CreateCmdPlan(const SqlNode *root, node::PlanNode **output); diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index 44298d67d82..907e6983ed4 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -21,8 +21,11 @@ #include 
"absl/base/attributes.h" #include "absl/container/flat_hash_map.h" +#include "absl/strings/ascii.h" #include "absl/strings/match.h" +#include "absl/types/span.h" #include "base/fe_status.h" +#include "udf/udf.h" #include "zetasql/parser/ast_node_kind.h" namespace hybridse { @@ -52,6 +55,8 @@ static base::Status convertAlterAction(const zetasql::ASTAlterAction* action, no node::AlterActionBase** out); static base::Status ConvertAlterTableStmt(const zetasql::ASTAlterTableStatement* stmt, node::NodeManager* nm, node::SqlNode** out); +static base::Status ConvertSetOperation(const zetasql::ASTSetOperation* stmt, node::NodeManager* nm, + node::SetOperationNode** out); /// Used to convert zetasql ASTExpression Node into our ExprNode base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node::NodeManager* node_manager, @@ -339,7 +344,7 @@ base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node: "Un-support Modifiers for function call") std::string function_name = ""; CHECK_STATUS(AstPathExpressionToString(function_call->function(), &function_name)) - boost::to_lower(function_name); + absl::AsciiStrToLower(&function_name); // Convert function call TYPE(value) to cast expression CAST(value as TYPE) node::DataType data_type; base::Status status = node::StringToDataType(function_name, &data_type); @@ -1319,35 +1324,19 @@ base::Status ConvertQueryExpr(const zetasql::ASTQueryExpression* query_expressio return base::Status::OK(); } case zetasql::AST_SET_OPERATION: { - const auto set_op = query_expression->GetAsOrNull(); - CHECK_TRUE(set_op != nullptr, common::kSqlAstError, "not an ASTSetOperation"); - switch (set_op->op_type()) { - case zetasql::ASTSetOperation::OperationType::UNION: { - CHECK_TRUE(set_op->inputs().size() >= 2, common::kSqlAstError, - "Union Set Operation have inputs size less than 2"); - bool is_distinct = set_op->distinct(); - node::QueryNode* left = nullptr; - CHECK_STATUS(ConvertQueryExpr(set_op->inputs().at(0), 
node_manager, &left)); - - for (size_t i = 1; i < set_op->inputs().size(); ++i) { - auto input = set_op->inputs().at(i); - node::QueryNode* expr_node = nullptr; - // TODO(aceforeverd): support set operation - CHECK_STATUS(ConvertQueryExpr(input, node_manager, &expr_node)); - left = node_manager->MakeUnionQueryNode(left, expr_node, !is_distinct); - } - - *output = left; - return base::Status::OK(); - } - default: { - return base::Status(common::kSqlAstError, - absl::StrCat("Un-support set operation: ", set_op->GetSQLForOperation())); - } - } + node::SetOperationNode* set = nullptr; + CHECK_STATUS( + ConvertGuard(query_expression, node_manager, &set, ConvertSetOperation)); + *output = set; + return base::Status::OK(); + } + case zetasql::AST_QUERY: { + node::QueryNode* query = nullptr; + CHECK_STATUS(ConvertGuard(query_expression, node_manager, &query, ConvertQueryNode)); + *output = query; + return base::Status::OK(); } default: { - // NOTE: code basically won't reach here unless inner error return base::Status(common::kSqlAstError, absl::StrCat("can not create query plan node with invalid query type ", query_expression->GetNodeKindString())); @@ -1764,7 +1753,7 @@ base::Status ConvertTableOption(const zetasql::ASTOptionsEntry* entry, node::Nod } else if (absl::EqualsIgnoreCase("storage_mode", identifier_v)) { std::string storage_mode; CHECK_STATUS(AstStringLiteralToString(entry->value(), &storage_mode)); - boost::to_lower(storage_mode); + absl::AsciiStrToLower(&storage_mode); *output = node_manager->MakeStorageModeNode(node::NameToStorageMode(storage_mode)); } else { return base::Status(common::kSqlAstError, absl::StrCat("invalid option ", identifier)); @@ -2120,7 +2109,7 @@ base::Status ConvertAstOptionsListToMap(const zetasql::ASTOptionsList* options, for (auto entry : options->options_entries()) { std::string key = entry->name()->GetAsString(); if (to_lower) { - boost::to_lower(key); + absl::AsciiStrToLower(&key); } auto entry_value = entry->value(); 
node::ExprNode* value = nullptr; @@ -2371,5 +2360,31 @@ base::Status ConvertAlterTableStmt(const zetasql::ASTAlterTableStatement* ast_no return base::Status::OK(); } +base::Status ConvertSetOperation(const zetasql::ASTSetOperation* set_op, node::NodeManager* node_manager, + node::SetOperationNode** out) { + switch (set_op->op_type()) { + case zetasql::ASTSetOperation::OperationType::UNION: { + CHECK_TRUE(set_op->inputs().size() >= 2, common::kSqlAstError, + "Union Set Operation have inputs size less than 2"); + + auto list = node_manager->MakeList(); + for (auto n : set_op->inputs()) { + node::QueryNode* expr_node = nullptr; + CHECK_STATUS(ConvertQueryExpr(n, node_manager, &expr_node)); + list->data_.push_back(expr_node); + } + + auto span = absl::MakeSpan(list->data_); + *out = node_manager->MakeNode(node::SetOperationType::UNION, span, + set_op->distinct()); + return base::Status::OK(); + } + default: { + return base::Status(common::kSqlAstError, + absl::StrCat("Un-support set operation: ", set_op->GetSQLForOperation())); + } + } +} + } // namespace plan } // namespace hybridse diff --git a/hybridse/src/planv2/ast_node_converter.h b/hybridse/src/planv2/ast_node_converter.h index 5893439cfe0..e85c6cf8487 100644 --- a/hybridse/src/planv2/ast_node_converter.h +++ b/hybridse/src/planv2/ast_node_converter.h @@ -20,7 +20,6 @@ #include #include "node/node_manager.h" -#include "udf/udf.h" #include "zetasql/parser/parser.h" namespace hybridse { diff --git a/hybridse/src/planv2/ast_node_converter_test.cc b/hybridse/src/planv2/ast_node_converter_test.cc index 20daab24019..51447011f78 100644 --- a/hybridse/src/planv2/ast_node_converter_test.cc +++ b/hybridse/src/planv2/ast_node_converter_test.cc @@ -1204,6 +1204,8 @@ INSTANTIATE_TEST_SUITE_P(ASTHavingStatementTest, ASTNodeConverterTest, testing::ValuesIn(sqlcase::InitCases("cases/plan/having_query.yaml", FILTERS))); INSTANTIATE_TEST_SUITE_P(ASTHWindowQueryTest, ASTNodeConverterTest, 
testing::ValuesIn(sqlcase::InitCases("cases/plan/window_query.yaml", FILTERS))); +INSTANTIATE_TEST_SUITE_P(ASTUnionQueryTest, ASTNodeConverterTest, + testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); } // namespace plan } // namespace hybridse diff --git a/hybridse/src/planv2/planner_v2_test.cc b/hybridse/src/planv2/planner_v2_test.cc index 4eb07bbb611..338861f9514 100644 --- a/hybridse/src/planv2/planner_v2_test.cc +++ b/hybridse/src/planv2/planner_v2_test.cc @@ -72,8 +72,8 @@ INSTANTIATE_TEST_SUITE_P(SqlOrderParse, PlannerV2Test, INSTANTIATE_TEST_SUITE_P(SqlJoinParse, PlannerV2Test, testing::ValuesIn(sqlcase::InitCases("cases/plan/join_query.yaml", FILTERS))); -// INSTANTIATE_TEST_SUITE_P(SqlUnionParse, PlannerV2Test, -// testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); +INSTANTIATE_TEST_SUITE_P(SqlUnionParse, PlannerV2Test, + testing::ValuesIn(sqlcase::InitCases("cases/plan/union_query.yaml", FILTERS))); INSTANTIATE_TEST_SUITE_P(SqlSubQueryParse, PlannerV2Test, testing::ValuesIn(sqlcase::InitCases("cases/plan/sub_query.yaml", FILTERS))); @@ -99,9 +99,15 @@ TEST_P(PlannerV2Test, PlannerSucessTest) { node::PlanNodeList plan_trees; EXPECT_EQ(param.expect().success_, PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status)) << status; - if (!param.expect().plan_tree_str_.empty()) { - // HACK: weak implementation, but usually it works - EXPECT_EQ(param.expect().plan_tree_str_, plan_trees.at(0)->GetTreeString()); + if (param.expect().success_) { + if (!param.expect().plan_tree_str_.empty()) { + // HACK: weak implementation, but usually it works + EXPECT_EQ(param.expect().plan_tree_str_, plan_trees.at(0)->GetTreeString()); + } + } else { + if (!param.expect().msg_.empty()) { + EXPECT_EQ(absl::StripAsciiWhitespace(param.expect().msg_), status.msg); + } } } TEST_P(PlannerV2Test, PlannerClusterOnlineServingOptTest) { diff --git a/hybridse/src/vm/catalog_wrapper.h 
b/hybridse/src/vm/catalog_wrapper.h index bfd1265aa82..b8d0ffecf21 100644 --- a/hybridse/src/vm/catalog_wrapper.h +++ b/hybridse/src/vm/catalog_wrapper.h @@ -21,6 +21,9 @@ #include #include #include +#include +#include +#include #include "absl/base/attributes.h" #include "codec/row_iterator.h" @@ -30,6 +33,9 @@ namespace hybridse { namespace vm { +static constexpr uint64_t INVALID_KEY = 0; +static const Row INVALID_ROW = Row(); + class IteratorProjectWrapper : public RowIterator { public: IteratorProjectWrapper(std::unique_ptr&& iter, const Row& parameter, const ProjectFun* fun) @@ -962,6 +968,275 @@ class ConcatPartitionHandler final : public PartitionHandler { size_t right_slices_; }; +class UnionIterator final : public codec::RowIterator { + public: + UnionIterator(absl::Span> inputs, bool distinct) : distinct_(distinct) { + size_t i = 0; + for (auto& n : inputs) { + if (n) { + n->SeekToFirst(); + if (n->Valid()) { + keys_.emplace(n->GetKey(), i++); + inputs_.push_back(std::move(n)); + } + } + } + } + ~UnionIterator() override {} + + bool Valid() const override { return !keys_.empty(); } + void Next() override { + while (!keys_.empty()) { + auto top = keys_.top(); + keys_.pop(); + inputs_.at(top.second)->Next(); + + if (inputs_.at(top.second)->Valid()) { + keys_.emplace(inputs_.at(top.second)->GetKey(), top.second); + break; + } + } + } + const uint64_t& GetKey() const override { + if (Valid()) { + auto& top = keys_.top(); + return inputs_.at(top.second)->GetKey(); + } + + return INVALID_KEY; + } + const Row& GetValue() override { + if (Valid()) { + auto& top = keys_.top(); + return inputs_.at(top.second)->GetValue(); + } + + return INVALID_ROW; + } + + bool IsSeekable() const override { return true; }; + + void Seek(const uint64_t& key) override { + for (auto& n : inputs_) { + n->Seek(key); + } + rebuild_keys(); + } + + void SeekToFirst() override { + for (auto& n : inputs_) { + n->SeekToFirst(); + } + rebuild_keys(); + } + + private: + using E = + 
std::pair().GetKey())>>, + decltype(std::vector().size())>; + struct PairLess { + constexpr bool operator() (const E& lhs, const E& rhs) const { + // larger key(larger index value if key equals) at top + // top key is the last/latest + if (lhs.first == rhs.first) { + return lhs.second < rhs.second; + } + return lhs.first < rhs.first; + } + }; + using MaxHeap = std::priority_queue, PairLess>; + + void rebuild_keys() { + keys_ = {}; + for (size_t i = 0; i < inputs_.size(); ++i) { + if (inputs_[i]->Valid()) { + keys_.emplace(inputs_[i]->GetKey(), i); + } + } + } + + std::vector> inputs_; + bool distinct_ = false; // NOLINT + + MaxHeap keys_; +}; + +class SetOperationHandler final : public TableHandler { + public: + SetOperationHandler(node::SetOperationType type, absl::Span const> inputs, + bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + SetOperationHandler(node::SetOperationType type, absl::Span const> inputs, + bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + ~SetOperationHandler() override {} + + RowIterator* GetRawIterator() override { + switch (op_type_) { + case node::SetOperationType::UNION: { + std::vector > iters; + for (auto tb : inputs_) { + iters.emplace_back(tb->GetIterator()); + } + return new UnionIterator(absl::MakeSpan(iters), distinct_); + } + default: + return nullptr; + } + } + + // unimplemented + const Types& GetTypes() override { return inputs_[0]->GetTypes(); } + const IndexHint& GetIndex() override { return inputs_[0]->GetIndex(); } + const Schema* GetSchema() override { return inputs_[0]->GetSchema(); } + const std::string& GetName() override { return inputs_[0]->GetName(); } + const std::string& GetDatabase() override { return inputs_[0]->GetDatabase(); } + + protected: + node::SetOperationType op_type_; + std::vector> inputs_; + bool distinct_ = false; +}; + +class SetOperationWindowIterator final : public codec::WindowIterator { + // NOTE: 
iterator ordering may out-of-order, same keys from different input iterator may output in two iteration. + // Because the input iterator may out-of-order itself. + public: + SetOperationWindowIterator(absl::Span> inputs, bool distinct) + : distinct_(distinct) { + size_t i = 0; + for (auto& n : inputs) { + if (n) { + n->SeekToFirst(); + if (n->Valid()) { + keys_[n->GetKey()].push_back(i++); + inputs_.push_back(std::move(n)); + } + } + } + } + ~SetOperationWindowIterator() override {} + + bool Valid() override { + return !keys_.empty(); + } + + RowIterator* GetRawValue() override { + std::vector> iters; + if (Valid()) { + auto& idxs = keys_.begin()->second; + for (auto i : idxs) { + iters.push_back(inputs_.at(i)->GetValue()); + } + } + + return new UnionIterator(absl::MakeSpan(iters), distinct_); + } + + void Seek(const std::string& key) override { + for (auto& i : inputs_) { + i->Seek(key); + } + rebuild_keys(); + } + + void SeekToFirst() override { + for (auto& i : inputs_) { + i->SeekToFirst(); + } + rebuild_keys(); + } + + void Next() override { + auto idxs = keys_.begin()->second; + keys_.erase(keys_.begin()); + for (auto i : idxs) { + inputs_.at(i)->Next(); + if (inputs_.at(i)->Valid()) { + keys_[inputs_.at(i)->GetKey()].push_back(i); + } + } + } + + const codec::Row GetKey() override { + if (Valid()) { + return keys_.begin()->first; + } + + return INVALID_ROW; + } + + private: + void rebuild_keys() { + keys_.clear(); + for (size_t i = 0; i Valid()) { + keys_[inputs_[i]->GetKey()].push_back(i); + } + } + } + std::vector> inputs_; + + // smaller key comes first + std::map, std::less> keys_; + bool distinct_; +}; + +class SetOperationPartitionHandler final : public PartitionHandler { + public: + SetOperationPartitionHandler(node::SetOperationType type, + absl::Span const> inputs, bool distinct) + : op_type_(type), inputs_(inputs.begin(), inputs.end()), distinct_(distinct) {} + ~SetOperationPartitionHandler() override {} + + RowIterator* GetRawIterator() override 
{ + switch (op_type_) { + case node::SetOperationType::UNION: { + std::vector> iters; + for (auto tb : inputs_) { + iters.emplace_back(tb->GetIterator()); + } + return new UnionIterator(absl::MakeSpan(iters), distinct_); + } + default: + return nullptr; + } + } + + std::shared_ptr GetSegment(const std::string& key) override { + std::vector> segs; + for (auto n : inputs_) { + segs.push_back(n->GetSegment(key)); + } + + return std::shared_ptr(new SetOperationHandler(op_type_, segs, distinct_)); + } + + std::unique_ptr GetWindowIterator() override { + // NOTE: window iterator may out-of-order, use 'GetSegment' if ordering is mandatory + if (op_type_ != node::SetOperationType::UNION) { + return {}; + } + + std::vector> iters; + for (auto n : inputs_) { + iters.push_back(n->GetWindowIterator()); + } + + return std::unique_ptr(new SetOperationWindowIterator(absl::MakeSpan(iters), distinct_)); + } + + const Types& GetTypes() override { return inputs_[0]->GetTypes(); } + const IndexHint& GetIndex() override { return inputs_[0]->GetIndex(); } + const Schema* GetSchema() override { return inputs_[0]->GetSchema(); } + const std::string& GetName() override { return inputs_[0]->GetName(); } + const std::string& GetDatabase() override { return inputs_[0]->GetDatabase(); } + + private: + node::SetOperationType op_type_; + std::vector> inputs_; + bool distinct_ = false; +}; } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/internal/node_helper.cc b/hybridse/src/vm/internal/node_helper.cc index 46b3e0dfa8f..02b3ef24ca4 100644 --- a/hybridse/src/vm/internal/node_helper.cc +++ b/hybridse/src/vm/internal/node_helper.cc @@ -52,8 +52,10 @@ absl::StatusOr ExtractRequestNode(PhysicalOpNode* in) { // generally it is of type Partition, but can be Table as well e.g window (t1 instance_not_in_window) return nullptr; } + case vm::kPhysicalOpSetOperation: { + return ExtractRequestNode(in->GetProducer(0)); + } case vm::kPhysicalOpJoin: - case vm::kPhysicalOpUnion: case 
vm::kPhysicalOpPostRequestUnion: case vm::kPhysicalOpRequestUnion: case vm::kPhysicalOpRequestAggUnion: diff --git a/hybridse/src/vm/mem_catalog.cc b/hybridse/src/vm/mem_catalog.cc index f4f5897f10f..7244dda61af 100644 --- a/hybridse/src/vm/mem_catalog.cc +++ b/hybridse/src/vm/mem_catalog.cc @@ -20,6 +20,23 @@ namespace hybridse { namespace vm { + +std::shared_ptr TableHandler::Cast(std::shared_ptr in) { + switch (in->GetHandlerType()) { + case kRowHandler: { + auto left_table = std::shared_ptr(new MemTableHandler()); + left_table->AddRow(std::dynamic_pointer_cast(in)->GetValue()); + return left_table; + } + default: + return std::dynamic_pointer_cast(in); + } + return nullptr; +} +std::shared_ptr PartitionHandler::Cast(std::shared_ptr in) { + return std::dynamic_pointer_cast(in); +} + MemTimeTableIterator::MemTimeTableIterator(const MemTimeTable* table, const vm::Schema* schema) : table_(table), diff --git a/hybridse/src/vm/physical_op.cc b/hybridse/src/vm/physical_op.cc index aea329e65d5..c15dca61908 100644 --- a/hybridse/src/vm/physical_op.cc +++ b/hybridse/src/vm/physical_op.cc @@ -44,7 +44,7 @@ static absl::flat_hash_map CreatePhysicalOpTy {kPhysicalOpDistinct, "DISTINCT"}, {kPhysicalOpWindow, "WINDOW"}, {kPhysicalOpJoin, "JOIN"}, - {kPhysicalOpUnion, "UNION"}, + {kPhysicalOpSetOperation, "SET_OPERATION"}, {kPhysicalOpPostRequestUnion, "POST_REQUEST_UNION"}, {kPhysicalOpRequestUnion, "REQUEST_UNION"}, {kPhysicalOpRequestAggUnion, "REQUEST_AGG_UNION"}, @@ -102,23 +102,23 @@ void PhysicalOpNode::Print(std::ostream& output, const std::string& tab) const { output << tab << PhysicalOpTypeName(type_); } -bool PhysicalOpNode::IsSameSchema(const vm::Schema& schema, const vm::Schema& exp_schema) const { - if (schema.size() != exp_schema.size()) { +bool PhysicalOpNode::IsSameSchema(const codec::Schema* schema, const codec::Schema* exp_schema) { + if (schema->size() != exp_schema->size()) { LOG(WARNING) << "Schemas size aren't consistent: " - << "expect size " << 
exp_schema.size() << ", real size " << schema.size(); + << "expect size " << exp_schema->size() << ", real size " << schema->size(); return false; } - for (int i = 0; i < schema.size(); i++) { - if (schema.Get(i).name() != exp_schema.Get(i).name()) { + for (int i = 0; i < schema->size(); i++) { + if (schema->Get(i).name() != exp_schema->Get(i).name()) { LOG(WARNING) << "Schemas aren't consistent:\n" - << exp_schema.Get(i).DebugString() << "vs:\n" - << schema.Get(i).DebugString(); + << exp_schema->Get(i).DebugString() << "vs:\n" + << schema->Get(i).DebugString(); return false; } - if (schema.Get(i).type() != exp_schema.Get(i).type()) { + if (schema->Get(i).type() != exp_schema->Get(i).type()) { LOG(WARNING) << "Schemas aren't consistent:\n" - << exp_schema.Get(i).DebugString() << "vs:\n" - << schema.Get(i).DebugString(); + << exp_schema->Get(i).DebugString() << "vs:\n" + << schema->Get(i).DebugString(); return false; } } @@ -138,7 +138,14 @@ base::Status PhysicalOpNode::SchemaStartWith(const vm::Schema& lhs, const vm::Sc void PhysicalOpNode::Print() const { this->Print(std::cout, " "); } -void PhysicalOpNode::PrintChildren(std::ostream& output, const std::string& tab) const {} +void PhysicalOpNode::PrintChildren(std::ostream& output, const std::string& tab) const { + for (size_t i = 0; i < producers_.size(); ++i) { + producers_[i]->Print(output, tab + INDENT); + if (i + i < producers_.size()) { + output << "\n"; + } + } +} void PhysicalOpNode::UpdateProducer(int i, PhysicalOpNode* producer) { producers_[i] = producer; } void PhysicalUnaryNode::PrintChildren(std::ostream& output, const std::string& tab) const { if (producers_.empty() || nullptr == producers_[0]) { @@ -524,7 +531,9 @@ PhysicalSimpleProjectNode* PhysicalSimpleProjectNode::CastFrom(PhysicalOpNode* n return dynamic_cast(node); } -PhysicalUnionNode* PhysicalUnionNode::CastFrom(PhysicalOpNode* node) { return dynamic_cast(node); } +PhysicalSetOperationNode* PhysicalSetOperationNode::CastFrom(PhysicalOpNode* 
node) { + return dynamic_cast(node); +} PhysicalPostRequestUnionNode* PhysicalPostRequestUnionNode::CastFrom(PhysicalOpNode* node) { return dynamic_cast(node); @@ -861,7 +870,7 @@ bool PhysicalWindowAggrerationNode::AddWindowUnion(PhysicalOpNode* node) { return false; } } else { - if (!IsSameSchema(*node->GetOutputSchema(), *producers_[0]->GetOutputSchema())) { + if (!IsSameSchema(node->GetOutputSchema(), producers_[0]->GetOutputSchema())) { LOG(WARNING) << "Union Table and window input schema aren't consistent"; return false; } @@ -1215,7 +1224,7 @@ std::string PhysicalOpNode::SchemaToString(const std::string& tab) const { std::vector PhysicalOpNode::GetDependents() const { return GetProducers(); } -Status PhysicalUnionNode::InitSchema(PhysicalPlanContext* ctx) { +Status PhysicalSetOperationNode::InitSchema(PhysicalPlanContext* ctx) { CHECK_TRUE(!producers_.empty(), common::kPlanError, "Empty union"); schemas_ctx_.Clear(); schemas_ctx_.SetDefaultDBName(ctx->db()); @@ -1223,16 +1232,16 @@ Status PhysicalUnionNode::InitSchema(PhysicalPlanContext* ctx) { return Status::OK(); } -Status PhysicalUnionNode::WithNewChildren(node::NodeManager* nm, const std::vector& children, - PhysicalOpNode** out) { - CHECK_TRUE(children.size() == 2, common::kPlanError); - *out = nm->RegisterNode(new PhysicalUnionNode(children[0], children[1], is_all_)); +Status PhysicalSetOperationNode::WithNewChildren(node::NodeManager* nm, const std::vector& children, + PhysicalOpNode** out) { + absl::Span sp = absl::MakeSpan(children); + *out = nm->RegisterNode(new PhysicalSetOperationNode(op_type_, sp, distinct_)); return Status::OK(); } -void PhysicalUnionNode::Print(std::ostream& output, const std::string& tab) const { +void PhysicalSetOperationNode::Print(std::ostream& output, const std::string& tab) const { PhysicalOpNode::Print(output, tab); - output << "\n"; + output << "(" << node::SetOperatorName(op_type_, distinct_) << ")\n"; PrintChildren(output, tab); } diff --git 
a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index eb284e6e945..e44ec7b7d4b 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -40,6 +40,57 @@ namespace vm { #define MAX_DEBUG_LINES_CNT 20 #define MAX_DEBUG_COLUMN_MAX 20 +static absl::flat_hash_map CreateRunnerTypeToNamesMap() { + absl::flat_hash_map map = { + {kRunnerData, "DATA"}, + {kRunnerRequest, "REQUEST"}, + {kRunnerGroup, "GROUP"}, + {kRunnerGroupAndSort, "GROUP_AND_SORT"}, + {kRunnerFilter, "FILTER"}, + {kRunnerConstProject, "CONST_PROJECT"}, + {kRunnerTableProject, "TABLE_PROJECT"}, + {kRunnerRowProject, "ROW_PROJECT"}, + {kRunnerSimpleProject, "SIMPLE_PROJECT"}, + {kRunnerSelectSlice, "SELECT_SLICE"}, + {kRunnerGroupAgg, "GROUP_AGG_PROJECT"}, + {kRunnerAgg, "AGG_PROJECT"}, + {kRunnerReduce, "REDUCE_PROJECT"}, + {kRunnerWindowAgg, "WINDOW_AGG_PROJECT"}, + {kRunnerRequestUnion, "REQUEST_UNION"}, + {kRunnerRequestAggUnion, "REQUEST_AGG_UNION"}, + {kRunnerPostRequestUnion, "POST_REQUEST_UNION"}, + {kRunnerIndexSeek, "INDEX_SEEK"}, + {kRunnerJoin, "JOIN"}, + {kRunnerConcat, "CONCAT"}, + {kRunnerRequestJoin, "REQUEST_JOIN"}, + {kRunnerLimit, "LIMIT"}, + {kRunnerRequestRunProxy, "REQUEST_RUN_PROXY"}, + {kRunnerBatchRequestRunProxy, "BATCH_REQUEST_RUN_PROXY"}, + {kRunnerOrder, "ORDRE"}, + {kRunnerSetOperation, "SET_OPERATION"}, + {kRunnerUnknow, "UNKOWN_RUNNER"}, + }; + for (auto kind = 0; kind < RunnerType::kRunnerUnknow; ++kind) { + DCHECK(map.find(static_cast(kind)) != map.end()) + << "name of " << kind << " not exist"; + } + return map; +} + +static const auto& GetRunnerTypeToNamesMap() { + static const auto &map = *new auto(CreateRunnerTypeToNamesMap()); + return map; +} + +std::string RunnerTypeName(RunnerType type) { + auto& map = GetRunnerTypeToNamesMap(); + auto it = map.find(type); + if (it != map.end()) { + return std::string(it->second); + } + return "kUnknow"; +} + bool Runner::GetColumnBool(const int8_t* buf, const RowView* row_view, int idx, type::Type type) { 
bool key = false; @@ -2557,5 +2608,28 @@ int32_t IteratorStatus::FindFirstIteratorWithMaximizeKey(const std::vector SetOperationRunner::Run(RunnerContext& ctx, + const std::vector>& inputs) { + bool opt = true; + for (auto& n : inputs) { + if (n->GetHandlerType() != kPartitionHandler) { + opt = false; + break; + } + } + if (opt) { + std::vector> in; + for (auto n : inputs) { + in.emplace_back(PartitionHandler::Cast(n)); + } + return std::shared_ptr(new SetOperationPartitionHandler(op_type_, in, distinct_)); + } + + std::vector> in; + for (auto n : inputs) { + in.emplace_back(TableHandler::Cast(n)); + } + return std::shared_ptr(new SetOperationHandler(op_type_, in, distinct_)); +} } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/runner.h b/hybridse/src/vm/runner.h index b40130db812..3a2ea0416d7 100644 --- a/hybridse/src/vm/runner.h +++ b/hybridse/src/vm/runner.h @@ -74,62 +74,11 @@ enum RunnerType { kRunnerRequestJoin, kRunnerBatchRequestRunProxy, kRunnerLimit, + kRunnerSetOperation, kRunnerUnknow, }; -inline const std::string RunnerTypeName(const RunnerType& type) { - switch (type) { - case kRunnerData: - return "DATA"; - case kRunnerRequest: - return "REQUEST"; - case kRunnerGroup: - return "GROUP"; - case kRunnerGroupAndSort: - return "GROUP_AND_SORT"; - case kRunnerFilter: - return "FILTER"; - case kRunnerConstProject: - return "CONST_PROJECT"; - case kRunnerTableProject: - return "TABLE_PROJECT"; - case kRunnerRowProject: - return "ROW_PROJECT"; - case kRunnerSimpleProject: - return "SIMPLE_PROJECT"; - case kRunnerSelectSlice: - return "SELECT_SLICE"; - case kRunnerGroupAgg: - return "GROUP_AGG_PROJECT"; - case kRunnerAgg: - return "AGG_PROJECT"; - case kRunnerReduce: - return "REDUCE_PROJECT"; - case kRunnerWindowAgg: - return "WINDOW_AGG_PROJECT"; - case kRunnerRequestUnion: - return "REQUEST_UNION"; - case kRunnerRequestAggUnion: - return "REQUEST_AGG_UNION"; - case kRunnerPostRequestUnion: - return "POST_REQUEST_UNION"; - case 
kRunnerIndexSeek: - return "INDEX_SEEK"; - case kRunnerJoin: - return "JOIN"; - case kRunnerConcat: - return "CONCAT"; - case kRunnerRequestJoin: - return "REQUEST_JOIN"; - case kRunnerLimit: - return "LIMIT"; - case kRunnerRequestRunProxy: - return "REQUEST_RUN_PROXY"; - case kRunnerBatchRequestRunProxy: - return "BATCH_REQUEST_RUN_PROXY"; - default: - return "UNKNOW"; - } -} + +std::string RunnerTypeName(RunnerType type); class Runner : public node::NodeBase { public: @@ -835,6 +784,22 @@ class ProxyRequestRunner : public Runner { Runner* index_input_; }; +class SetOperationRunner : public Runner { + public: + SetOperationRunner(const int32_t id, const SchemasContext* schema, node::SetOperationType type, bool distinct) + : Runner(id, kRunnerSetOperation, schema), op_type_(type), distinct_(distinct) { + is_lazy_ = true; + } + ~SetOperationRunner() {} + + std::shared_ptr Run(RunnerContext& ctx, // NOLINT + const std::vector>& inputs) override; // NOLINT + + private: + node::SetOperationType op_type_; + bool distinct_ = false; +}; + } // namespace vm } // namespace hybridse #endif // HYBRIDSE_SRC_VM_RUNNER_H_ diff --git a/hybridse/src/vm/runner_builder.cc b/hybridse/src/vm/runner_builder.cc index 5d595ba9785..5f9e7a58d04 100644 --- a/hybridse/src/vm/runner_builder.cc +++ b/hybridse/src/vm/runner_builder.cc @@ -521,6 +521,21 @@ ClusterTask RunnerBuilder::Build(PhysicalOpNode* node, Status& status) { CreateRunner(id_++, node->schemas_ctx(), union_op->request_ts()); return RegisterTask(node, BinaryInherit(left_task, right_task, runner, Key(), kRightBias)); } + case kPhysicalOpSetOperation: { + auto set = dynamic_cast(node); + auto set_runner = + CreateRunner(id_++, node->schemas_ctx(), set->op_type_, set->distinct_); + for (auto n : node->GetProducers()) { + auto task = Build(n, status); + if (!status.isOK()) { + LOG(WARNING) << status; + return fail; + } + set_runner->AddProducer(task.GetRoot()); + } + // TODO: cluster task with index info + return RegisterTask(node, 
ClusterTask(set_runner)); + } default: { status.code = common::kExecutionPlanError; status.msg = absl::StrCat("Non-support node ", PhysicalOpTypeName(node->GetOpType()), diff --git a/hybridse/src/vm/transform.cc b/hybridse/src/vm/transform.cc index dc67a30c9a8..505522f076c 100644 --- a/hybridse/src/vm/transform.cc +++ b/hybridse/src/vm/transform.cc @@ -181,6 +181,12 @@ Status BatchModeTransformer::TransformPlanOp(const node::PlanNode* node, Physica TransformWithClauseEntry(dynamic_cast(node), &op)); break; } + case ::hybridse::node::kPlanTypeSetOperation: { + PhysicalSetOperationNode* set = nullptr; + CHECK_STATUS(TransformSetOperation(dynamic_cast(node), &set)); + op = set; + break; + } default: { FAIL_STATUS(kPlanError, "Fail to transform physical plan: " @@ -330,6 +336,33 @@ Status BatchModeTransformer::InitFnInfo(PhysicalOpNode* node, return Status::OK(); } +Status BatchModeTransformer::TransformSetOperation(const node::SetOperationPlanNode* node, + PhysicalSetOperationNode** out) { + CHECK_TRUE(node != nullptr && out != nullptr, kPlanError, "Input node or output node is null"); + + CHECK_TRUE(!node->distinct(), common::kPhysicalPlanError, "un-implemented: UNION DISTINCT"); + + std::vector inputs; + const SchemasContext* expect_sc = nullptr; + for (auto n : node->inputs()) { + PhysicalOpNode* query_out = nullptr; + CHECK_STATUS(TransformQueryPlan(n, &query_out)); + + if (expect_sc == nullptr) { + expect_sc = query_out->schemas_ctx(); + } else { + CHECK_TRUE(PhysicalOpNode::IsSameSchema(expect_sc->GetOutputSchema(), query_out->GetOutputSchema()), + common::kPlanError, "union sources have different schema: ", expect_sc->ReadableString(), " vs ", + query_out->schemas_ctx()->ReadableString()); + } + inputs.push_back(query_out); + } + PhysicalSetOperationNode* set = nullptr; + CHECK_STATUS(CreateOp(&set, node->op_type(), inputs, node->distinct())); + *out = set; + return Status::OK(); +} + Status BatchModeTransformer::TransformLimitOp(const node::LimitPlanNode* 
node, PhysicalOpNode** output) { CHECK_TRUE(node != nullptr && output != nullptr, kPlanError, @@ -1667,6 +1700,10 @@ Status BatchModeTransformer::ValidatePartitionDataProvider(PhysicalOpNode* in) { for (auto& window_union : n->window_unions().window_unions_) { CHECK_STATUS(ValidateWindowIndexOptimization(window_union.second, window_union.first)); } + } else if (kPhysicalOpSetOperation == in->GetOpType()) { + for (auto n : in->GetProducers()) { + CHECK_STATUS(ValidatePartitionDataProvider(n)); + } } else { CHECK_TRUE(kPhysicalOpDataProvider == in->GetOpType() && kProviderTypeTable != dynamic_cast(in)->provider_type_, @@ -1927,10 +1964,6 @@ Status BatchModeTransformer::TransformPhysicalPlan(const ::hybridse::node::PlanN *output = nullptr; break; } - case ::hybridse::node::kPlanTypeUnion: { - FAIL_STATUS(kPlanError, "Non-support UNION OP"); - break; - } case node::kPlanTypeCreate: { CHECK_STATUS(TransformCreateTableOp(dynamic_cast(node), output), "Fail to transform create table op"); diff --git a/hybridse/src/vm/transform.h b/hybridse/src/vm/transform.h index 45c4d9660e7..acd86be587f 100644 --- a/hybridse/src/vm/transform.h +++ b/hybridse/src/vm/transform.h @@ -156,6 +156,8 @@ class BatchModeTransformer { protected: Status TransformPlanOp(const ::hybridse::node::PlanNode* node, ::hybridse::vm::PhysicalOpNode** ouput); + Status TransformSetOperation(const node::SetOperationPlanNode* node, PhysicalSetOperationNode** out); + virtual Status TransformLimitOp(const node::LimitPlanNode* node, PhysicalOpNode** output); virtual Status TransformProjectPlanOp(const node::ProjectPlanNode*, PhysicalOpNode**);