Skip to content

Commit

Permalink
fix(#3941): support window union multiple join in request mode (#3493)
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd authored Sep 25, 2023
1 parent c0234c6 commit 9190ecf
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 8 deletions.
68 changes: 68 additions & 0 deletions cases/query/window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -833,3 +833,71 @@ cases:
200, 1, 1, 1
300, 0, 0, 0
400, 1, 0, 0
- id: 23
sql: |
select
gp_id,
count(gp_id) over w as cnt,
-- t2 matches and t3 not matches
count_where(gp_id, not is_null(lcond) and is_null(cond)) over w as feat1,
from (select id as gp_id, 0 as lcond, 0 as cond, cast(90000 as timestamp) as ts from request)
window w as (
union (select t1.gp_id, t2.cond as lcond, t3.cond as cond, t1.ts from
t1 last join t2 on t1.gp_id = t2.account
last join t3 on t1.cond = t3.cond)
partition by gp_id order by ts
rows between unbounded preceding and current row
exclude current_row instance_not_in_window
)
inputs:
- name: request
columns: ["id int"]
indexs: ['idx:id']
data: |
100
200
300
400
- name: t1
columns:
- gp_id int
- cond int
- ts timestamp
indexs:
- idx2:gp_id:ts
data: |
100, 201, 10000
100, 201, 10000
200, 203, 10000
400, 204, 10000
400, 205, 10000
- name: t2
columns:
- account int
- cond int
- ts timestamp
indexs: ["idx1:account:ts"]
data: |
100, 201, 1000
200, 203, 4000
400, 209, 4000
- name: t3
columns:
- cond int
- ts timestamp
indexs: ["idx3:cond:ts"]
data: |
201, 1000
208, 1000
expect:
columns:
- gp_id int
- cnt int64
- feat1 int64
order: gp_id
data: |
100, 2, 0
200, 1, 1
300, 0, 0
400, 2, 2
7 changes: 6 additions & 1 deletion hybridse/include/vm/schemas_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class SchemaSource {
size_t size() const;
void Clear();

std::string ToString() const;
std::string DebugString() const;
friend std::ostream& operator<<(std::ostream& os, const SchemaSource& sc) { return os << sc.DebugString(); }

private:
bool CheckSourceSetIndex(size_t idx) const;
Expand Down Expand Up @@ -246,6 +247,10 @@ class SchemasContext {
void BuildTrivial(const std::vector<const codec::Schema*>& schemas);
void BuildTrivial(const std::string& default_db, const std::vector<const type::TableDef*>& tables);

std::string DebugString() const;

friend std::ostream& operator<<(std::ostream& os, const SchemasContext& sc) { return os << sc.DebugString(); }

private:
bool IsColumnAmbiguous(const std::string& column_name) const;

Expand Down
13 changes: 9 additions & 4 deletions hybridse/src/passes/physical/group_and_sort_optimized.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,8 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx
return false;
}
PhysicalSimpleProjectNode* new_simple_op = nullptr;
Status status = plan_ctx_->CreateOp<PhysicalSimpleProjectNode>(
&new_simple_op, new_depend, simple_project->project());
Status status =
plan_ctx_->CreateOp<PhysicalSimpleProjectNode>(&new_simple_op, new_depend, simple_project->project());
if (!status.isOK()) {
LOG(WARNING) << "Fail to create simple project op: " << status;
return false;
Expand Down Expand Up @@ -442,11 +442,16 @@ bool GroupAndSortOptimized::KeysOptimized(const SchemasContext* root_schemas_ctx
index_key, right_key, sort, &new_depend)) {
return false;
}
if (!ResetProducer(plan_ctx_, request_join, 0, new_depend)) {
PhysicalRequestJoinNode* new_join = nullptr;
auto s = plan_ctx_->CreateOp<PhysicalRequestJoinNode>(&new_join, new_depend, request_join->GetProducer(1),
request_join->join(),
request_join->output_right_only());
if (!s.isOK()) {
LOG(WARNING) << "Fail to create new request join op: " << s;
return false;
}

*new_in = request_join;
*new_in = new_join;
return true;
}
return false;
Expand Down
1 change: 1 addition & 0 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static bool IsPartitionProvider(vm::PhysicalOpNode* n) {
switch (n->GetOpType()) {
case kPhysicalOpSimpleProject:
case kPhysicalOpRename:
case kPhysicalOpRequestJoin:
return IsPartitionProvider(n->GetProducer(0));
case kPhysicalOpDataProvider:
return dynamic_cast<vm::PhysicalDataProviderNode*>(n)->provider_type_ == kProviderTypePartition;
Expand Down
40 changes: 37 additions & 3 deletions hybridse/src/vm/schemas_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/

#include "vm/schemas_context.h"

#include <set>

#include "absl/strings/str_join.h"
#include "passes/physical/physical_pass.h"
#include "vm/physical_op.h"

Expand Down Expand Up @@ -121,14 +124,18 @@ size_t SchemaSource::size() const {
return schema_ == nullptr ? 0 : schema_->size();
}

std::string SchemaSource::ToString() const {
// output: {db}.{table}[ {name}:{type}:{id}, ... ]
std::string SchemaSource::DebugString() const {
std::stringstream ss;
ss << source_db_ << "." << source_name_ << "[";
for (size_t i = 0; i < column_ids_.size(); ++i) {
ss << schema_->Get(i).name() << ":" << node::TypeName(schema_->Get(i).type()) << ":";
ss << "#" << std::to_string(column_ids_[i]);
if (i < column_ids_.size() - 1) {
ss << ", ";
}
}
ss << "]";
return ss.str();
}

Expand Down Expand Up @@ -173,7 +180,7 @@ void SchemasContext::Merge(size_t child_idx, const SchemasContext* child) {
db_name = source->GetSourceDB();
}
std::string rel_name = child->GetName();
if (rel_name.empty()&& !source->GetSourceName().empty()) {
if (rel_name.empty() && !source->GetSourceName().empty()) {
rel_name = source->GetSourceName();
}
new_source->SetSourceDBAndTableName(db_name, rel_name);
Expand Down Expand Up @@ -751,7 +758,34 @@ void SchemasContext::BuildTrivial(
this->Build();
}

RowParser::RowParser(const SchemasContext* schema_ctx) : schema_ctx_(schema_ctx) {
std::string SchemasContext::DebugString() const {
std::stringstream ss;
ss << absl::StrCat("{", root_db_name_, ",", root_relation_name_, ",", default_db_name_, ", ",
absl::StrJoin(schema_sources_, ",", [](std::string* out, const SchemaSource* source) {
absl::StrAppend(out, source->DebugString());
}));
ss << ", id_map={"
<< absl::StrJoin(column_id_map_, ",", [](std::string* out, decltype(column_id_map_)::const_reference e) {
absl::StrAppend(out, e.first, "=(", e.second.first, ",", e.second.second, ")");
}) << "}, ";
ss << "name_map={"
<< absl::StrJoin(column_name_map_, ",",
[](std::string* out, decltype(column_name_map_)::const_reference e) {
absl::StrAppend(
out, e.first, "=[",
absl::StrJoin(e.second, ",",
[](std::string* out, decltype(e.second)::const_reference ref) {
absl::StrAppend(out, "(", ref.first, ",", ref.second, ")");
}),
"]");
})
<< "}";
ss << "}";
return ss.str();
}

RowParser::RowParser(const SchemasContext* schema_ctx)
: schema_ctx_(schema_ctx) {
for (size_t i = 0; i < schema_ctx_->GetSchemaSourceSize(); ++i) {
auto source = schema_ctx_->GetSchemaSource(i);
row_view_list_.push_back(codec::RowView(*source->GetSchema()));
Expand Down

0 comments on commit 9190ecf

Please sign in to comment.