diff --git a/cases/query/window_query.yaml b/cases/query/window_query.yaml index 24ac38afe4f..84365be97f7 100644 --- a/cases/query/window_query.yaml +++ b/cases/query/window_query.yaml @@ -833,3 +833,71 @@ cases: 200, 1, 1, 1 300, 0, 0, 0 400, 1, 0, 0 + + - id: 23 + sql: | + select + gp_id, + count(gp_id) over w as cnt, + -- t2 matches and t3 not matches + count_where(gp_id, not is_null(lcond) and is_null(cond)) over w as feat1, + from (select id as gp_id, 0 as lcond, 0 as cond, cast(90000 as timestamp) as ts from request) + window w as ( + union (select t1.gp_id, t2.cond as lcond, t3.cond as cond, t1.ts from + t1 last join t2 on t1.gp_id = t2.account + last join t3 on t1.cond = t3.cond) + partition by gp_id order by ts + rows between unbounded preceding and current row + exclude current_row instance_not_in_window + ) + inputs: + - name: request + columns: ["id int"] + indexs: ['idx:id'] + data: | + 100 + 200 + 300 + 400 + - name: t1 + columns: + - gp_id int + - cond int + - ts timestamp + indexs: + - idx2:gp_id:ts + data: | + 100, 201, 10000 + 100, 201, 10000 + 200, 203, 10000 + 400, 204, 10000 + 400, 205, 10000 + - name: t2 + columns: + - account int + - cond int + - ts timestamp + indexs: ["idx1:account:ts"] + data: | + 100, 201, 1000 + 200, 203, 4000 + 400, 209, 4000 + - name: t3 + columns: + - cond int + - ts timestamp + indexs: ["idx3:cond:ts"] + data: | + 201, 1000 + 208, 1000 + expect: + columns: + - gp_id int + - cnt int64 + - feat1 int64 + order: gp_id + data: | + 100, 2, 0 + 200, 1, 1 + 300, 0, 0 + 400, 2, 2 diff --git a/hybridse/include/vm/schemas_context.h b/hybridse/include/vm/schemas_context.h index 43731f076cc..1541c64201d 100644 --- a/hybridse/include/vm/schemas_context.h +++ b/hybridse/include/vm/schemas_context.h @@ -58,7 +58,8 @@ class SchemaSource { size_t size() const; void Clear(); - std::string ToString() const; + std::string DebugString() const; + friend std::ostream& operator<<(std::ostream& os, const SchemaSource& sc) { return os << sc.DebugString(); } private: bool CheckSourceSetIndex(size_t idx) const; @@ -246,6 +247,10 @@ class SchemasContext { void BuildTrivial(const std::vector& schemas); void BuildTrivial(const std::string& default_db, const std::vector& tables); + std::string DebugString() const; + + friend std::ostream& operator<<(std::ostream& os, const SchemasContext& sc) { return os << sc.DebugString(); } + private: bool IsColumnAmbiguous(const std::string& column_name) const; diff --git a/hybridse/src/passes/physical/group_and_sort_optimized.cc b/hybridse/src/passes/physical/group_and_sort_optimized.cc index 2c10529fcc7..287919d9406 100644 --- a/hybridse/src/passes/physical/group_and_sort_optimized.cc +++ b/hybridse/src/passes/physical/group_and_sort_optimized.cc @@ -393,8 +393,8 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx return false; } PhysicalSimpleProjectNode* new_simple_op = nullptr; - Status status = plan_ctx_->CreateOp( - &new_simple_op, new_depend, simple_project->project()); + Status status = + plan_ctx_->CreateOp(&new_simple_op, new_depend, simple_project->project()); if (!status.isOK()) { LOG(WARNING) << "Fail to create simple project op: " << status; return false; @@ -442,11 +442,16 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx index_key, right_key, sort, &new_depend)) { return false; } - if (!ResetProducer(plan_ctx_, request_join, 0, new_depend)) { + PhysicalRequestJoinNode* new_join = nullptr; + auto s = plan_ctx_->CreateOp(&new_join, new_depend, request_join->GetProducer(1), + request_join->join(), + request_join->output_right_only()); + if (!s.isOK()) { + LOG(WARNING) << "Fail to create new request join op: " << s; return false; } - *new_in = request_join; + *new_in = new_join; return true; } return false; diff --git a/hybridse/src/vm/runner.cc b/hybridse/src/vm/runner.cc index a15a2626bf3..2072715c613 100644 --- a/hybridse/src/vm/runner.cc +++ b/hybridse/src/vm/runner.cc @@ -44,6 +44,7 @@ static bool IsPartitionProvider(vm::PhysicalOpNode* n) { switch (n->GetOpType()) { case kPhysicalOpSimpleProject: case kPhysicalOpRename: + case kPhysicalOpRequestJoin: return IsPartitionProvider(n->GetProducer(0)); case kPhysicalOpDataProvider: return dynamic_cast(n)->provider_type_ == kProviderTypePartition; diff --git a/hybridse/src/vm/schemas_context.cc b/hybridse/src/vm/schemas_context.cc index 9bff83940bb..214da20caa5 100644 --- a/hybridse/src/vm/schemas_context.cc +++ b/hybridse/src/vm/schemas_context.cc @@ -15,7 +15,10 @@ */ #include "vm/schemas_context.h" + #include + +#include "absl/strings/str_join.h" #include "passes/physical/physical_pass.h" #include "vm/physical_op.h" @@ -121,14 +124,18 @@ size_t SchemaSource::size() const { return schema_ == nullptr ? 0 : schema_->size(); } -std::string SchemaSource::ToString() const { +// output: {db}.{table}[ {name}:{type}:{id}, ... ] +std::string SchemaSource::DebugString() const { std::stringstream ss; + ss << source_db_ << "." << source_name_ << "["; for (size_t i = 0; i < column_ids_.size(); ++i) { + ss << schema_->Get(i).name() << ":" << node::TypeName(schema_->Get(i).type()) << ":"; ss << "#" << std::to_string(column_ids_[i]); if (i < column_ids_.size() - 1) { ss << ", "; } } + ss << "]"; return ss.str(); } @@ -173,7 +180,7 @@ void SchemasContext::Merge(size_t child_idx, const SchemasContext* child) { db_name = source->GetSourceDB(); } std::string rel_name = child->GetName(); - if (rel_name.empty()&& !source->GetSourceName().empty()) { + if (rel_name.empty() && !source->GetSourceName().empty()) { rel_name = source->GetSourceName(); } new_source->SetSourceDBAndTableName(db_name, rel_name); @@ -751,7 +758,34 @@ void SchemasContext::BuildTrivial( this->Build(); } -RowParser::RowParser(const SchemasContext* schema_ctx) : schema_ctx_(schema_ctx) { +std::string SchemasContext::DebugString() const { + std::stringstream ss; + ss << absl::StrCat("{", root_db_name_, ",", root_relation_name_, ",", default_db_name_, ", ", + absl::StrJoin(schema_sources_, ",", [](std::string* out, const SchemaSource* source) { + absl::StrAppend(out, source->DebugString()); + })); + ss << ", id_map={" + << absl::StrJoin(column_id_map_, ",", [](std::string* out, decltype(column_id_map_)::const_reference e) { + absl::StrAppend(out, e.first, "=(", e.second.first, ",", e.second.second, ")"); + }) << "}, "; + ss << "name_map={" + << absl::StrJoin(column_name_map_, ",", + [](std::string* out, decltype(column_name_map_)::const_reference e) { + absl::StrAppend( + out, e.first, "=[", + absl::StrJoin(e.second, ",", + [](std::string* out, decltype(e.second)::const_reference ref) { + absl::StrAppend(out, "(", ref.first, ",", ref.second, ")"); + }), + "]"); + }) + << "}"; + ss << "}"; + return ss.str(); +} + +RowParser::RowParser(const SchemasContext* schema_ctx) + : schema_ctx_(schema_ctx) { for (size_t i = 0; i < schema_ctx_->GetSchemaSourceSize(); ++i) { auto source = schema_ctx_->GetSchemaSource(i); row_view_list_.push_back(codec::RowView(*source->GetSchema()));