Skip to content

Commit

Permalink
feat(online): support last join (window)
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Oct 26, 2023
1 parent 8b32e6d commit b39cb17
Show file tree
Hide file tree
Showing 30 changed files with 1,351 additions and 270 deletions.
406 changes: 406 additions & 0 deletions cases/query/last_join_subquery_window.yml

Large diffs are not rendered by default.

33 changes: 1 addition & 32 deletions hybridse/examples/toydb/src/tablet/tablet_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,6 @@ RowIterator* TabletTableHandler::GetRawIterator() {
return new storage::FullTableIterator(table_->GetSegments(),
table_->GetSegCnt(), table_);
}
const uint64_t TabletTableHandler::GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
iter->Next();
cnt++;
}
return cnt;
}
Row TabletTableHandler::At(uint64_t pos) {
auto iter = GetIterator();
while (pos-- > 0 && iter->Valid()) {
iter->Next();
}
return iter->Valid() ? iter->GetValue() : Row();
}

TabletCatalog::TabletCatalog() : tables_(), db_() {}

Expand Down Expand Up @@ -249,22 +233,6 @@ std::unique_ptr<WindowIterator> TabletSegmentHandler::GetWindowIterator(
const std::string& idx_name) {
return std::unique_ptr<WindowIterator>();
}
const uint64_t TabletSegmentHandler::GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
cnt++;
iter->Next();
}
return cnt;
}
Row TabletSegmentHandler::At(uint64_t pos) {
auto iter = GetIterator();
while (pos-- > 0 && iter->Valid()) {
iter->Next();
}
return iter->Valid() ? iter->GetValue() : Row();
}

const uint64_t TabletPartitionHandler::GetCount() {
auto iter = GetWindowIterator();
Expand All @@ -275,5 +243,6 @@ const uint64_t TabletPartitionHandler::GetCount() {
}
return cnt;
}

} // namespace tablet
} // namespace hybridse
5 changes: 1 addition & 4 deletions hybridse/examples/toydb/src/tablet/tablet_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class TabletSegmentHandler : public TableHandler {
std::unique_ptr<vm::RowIterator> GetIterator() override;
RowIterator* GetRawIterator() override;
std::unique_ptr<codec::WindowIterator> GetWindowIterator(const std::string& idx_name) override;
const uint64_t GetCount() override;
Row At(uint64_t pos) override;
const std::string GetHandlerTypeName() override {
return "TabletSegmentHandler";
}
Expand Down Expand Up @@ -104,6 +102,7 @@ class TabletPartitionHandler
std::unique_ptr<codec::WindowIterator> GetWindowIterator() override {
return table_handler_->GetWindowIterator(index_name_);
}

const uint64_t GetCount() override;

std::shared_ptr<TableHandler> GetSegment(const std::string& key) override {
Expand Down Expand Up @@ -152,8 +151,6 @@ class TabletTableHandler
RowIterator* GetRawIterator() override;
std::unique_ptr<codec::WindowIterator> GetWindowIterator(
const std::string& idx_name);
virtual const uint64_t GetCount();
Row At(uint64_t pos) override;

virtual std::shared_ptr<PartitionHandler> GetPartition(
const std::string& index_name) {
Expand Down
21 changes: 8 additions & 13 deletions hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/

#include "testing/toydb_engine_test_base.h"

#include "absl/strings/str_join.h"
#include "gtest/gtest.h"
#include "gtest/internal/gtest-param-util.h"

using namespace llvm; // NOLINT (build/namespaces)
using namespace llvm::orc; // NOLINT (build/namespaces)
Expand Down Expand Up @@ -141,18 +142,12 @@ std::shared_ptr<tablet::TabletCatalog> BuildOnePkTableStorage(
}
return catalog;
}
void BatchRequestEngineCheckWithCommonColumnIndices(
const SqlCase& sql_case, const EngineOptions options,
const std::set<size_t>& common_column_indices) {
std::ostringstream oss;
for (size_t index : common_column_indices) {
oss << index << ",";
}
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: "
"common_column_indices = ["
<< oss.str() << "]";
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options,
common_column_indices);
// Run check with common column index info
void BatchRequestEngineCheckWithCommonColumnIndices(const SqlCase& sql_case, const EngineOptions options,
const std::set<size_t>& common_column_indices) {
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: common_column_indices = ["
<< absl::StrJoin(common_column_indices, ",") << "]";
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, common_column_indices);
engine_test.RunCheck();
}

Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/codec/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Row {

inline int32_t size() const { return slice_.size(); }
inline int32_t size(int32_t pos) const {
return 0 == pos ? slice_.size() : slices_[pos - 1].size();
return 0 == pos ? slice_.size() : slices_.at(pos - 1).size();
}

// Return true if the length of the referenced data is zero
Expand Down
9 changes: 8 additions & 1 deletion hybridse/include/codec/row_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,14 @@ class WindowIterator {
virtual bool Valid() = 0;
/// Return the RowIterator of current segment
/// of dataset if Valid() return `true`.
virtual std::unique_ptr<RowIterator> GetValue() = 0;
virtual std::unique_ptr<RowIterator> GetValue() {
auto p = GetRawValue();
if (!p) {
return nullptr;

Check warning on line 77 in hybridse/include/codec/row_iterator.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/codec/row_iterator.h#L74-L77

Added lines #L74 - L77 were not covered by tests
}

return std::unique_ptr<RowIterator>(p);

Check warning on line 80 in hybridse/include/codec/row_iterator.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/codec/row_iterator.h#L80

Added line #L80 was not covered by tests
}
/// Return the RowIterator of current segment
/// of dataset if Valid() return `true`.
virtual RowIterator *GetRawValue() = 0;
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/codec/row_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class ListV {
virtual const uint64_t GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
while (iter && iter->Valid()) {
iter->Next();
cnt++;
}
Expand Down
1 change: 1 addition & 0 deletions hybridse/include/vm/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ class TableHandler : public DataHandler {
virtual ~TableHandler() {}

/// Return table column Types information.
/// TODO: rm it, never used
virtual const Types& GetTypes() = 0;

/// Return the index information
Expand Down
9 changes: 4 additions & 5 deletions hybridse/include/vm/mem_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include <string>
#include <utility>
#include <vector>
#include "base/fe_slice.h"
#include "codec/list_iterator_codec.h"
#include "glog/logging.h"
#include "vm/catalog.h"

Expand Down Expand Up @@ -674,6 +672,7 @@ class MemPartitionHandler
IndexHint index_hint_;
OrderType order_type_;
};

class ConcatTableHandler : public MemTimeTableHandler {
public:
ConcatTableHandler(std::shared_ptr<TableHandler> left, size_t left_slices,
Expand All @@ -692,19 +691,19 @@ class ConcatTableHandler : public MemTimeTableHandler {
status_ = SyncValue();
return MemTimeTableHandler::At(pos);
}
std::unique_ptr<RowIterator> GetIterator() {
std::unique_ptr<RowIterator> GetIterator() override {
if (status_.isRunning()) {
status_ = SyncValue();
}
return MemTimeTableHandler::GetIterator();
}
RowIterator* GetRawIterator() {
RowIterator* GetRawIterator() override {
if (status_.isRunning()) {
status_ = SyncValue();
}
return MemTimeTableHandler::GetRawIterator();
}
virtual const uint64_t GetCount() {
const uint64_t GetCount() override {

Check warning on line 706 in hybridse/include/vm/mem_catalog.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/mem_catalog.h#L706

Added line #L706 was not covered by tests
if (status_.isRunning()) {
status_ = SyncValue();
}
Expand Down
47 changes: 30 additions & 17 deletions hybridse/include/vm/physical_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,11 @@ class PhysicalAggregationNode : public PhysicalProjectNode {
public:
PhysicalAggregationNode(PhysicalOpNode *node, const ColumnProjects &project, const node::ExprNode *condition)
: PhysicalProjectNode(node, kAggregation, project, true), having_condition_(condition) {
output_type_ = kSchemaTypeRow;
if (node->GetOutputType() == kSchemaTypeGroup) {
output_type_ = kSchemaTypeGroup;
} else {
output_type_ = kSchemaTypeRow;
}
fn_infos_.push_back(&having_condition_.fn_info());
}
virtual ~PhysicalAggregationNode() {}
Expand Down Expand Up @@ -1065,7 +1069,7 @@ class RequestWindowUnionList {
RequestWindowUnionList() : window_unions_() {}
virtual ~RequestWindowUnionList() {}
void AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp &window) {
window_unions_.push_back(std::make_pair(node, window));
window_unions_.emplace_back(node, window);
}
const PhysicalOpNode *GetKey(uint32_t index) {
auto iter = window_unions_.begin();
Expand Down Expand Up @@ -1415,7 +1419,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
instance_not_in_window_(false),
exclude_current_time_(false),
output_request_row_(true) {
output_type_ = kSchemaTypeTable;
InitOuptput();

Check warning on line 1422 in hybridse/include/vm/physical_op.h

View check run for this annotation

Codecov / codecov/patch

hybridse/include/vm/physical_op.h#L1422

Added line #L1422 was not covered by tests

fn_infos_.push_back(&window_.partition_.fn_info());
fn_infos_.push_back(&window_.index_key_.fn_info());
Expand All @@ -1427,7 +1431,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
instance_not_in_window_(w_ptr->instance_not_in_window()),
exclude_current_time_(w_ptr->exclude_current_time()),
output_request_row_(true) {
output_type_ = kSchemaTypeTable;
InitOuptput();

fn_infos_.push_back(&window_.partition_.fn_info());
fn_infos_.push_back(&window_.sort_.fn_info());
Expand All @@ -1443,7 +1447,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
instance_not_in_window_(instance_not_in_window),
exclude_current_time_(exclude_current_time),
output_request_row_(output_request_row) {
output_type_ = kSchemaTypeTable;
InitOuptput();

fn_infos_.push_back(&window_.partition_.fn_info());
fn_infos_.push_back(&window_.sort_.fn_info());
Expand All @@ -1455,7 +1459,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
virtual void Print(std::ostream &output, const std::string &tab) const;
const bool Valid() { return true; }
static PhysicalRequestUnionNode *CastFrom(PhysicalOpNode *node);
bool AddWindowUnion(PhysicalOpNode *node) {
bool AddWindowUnion(PhysicalOpNode *node) { return AddWindowUnion(node, window_); }
bool AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp& window) {
if (nullptr == node) {
LOG(WARNING) << "Fail to add window union : table is null";
return false;
Expand All @@ -1472,9 +1477,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
<< "Union Table and window input schema aren't consistent";
return false;
}
window_unions_.AddWindowUnion(node, window_);
RequestWindowOp &window_union =
window_unions_.window_unions_.back().second;
window_unions_.AddWindowUnion(node, window);
RequestWindowOp &window_union = window_unions_.window_unions_.back().second;
fn_infos_.push_back(&window_union.partition_.fn_info());
fn_infos_.push_back(&window_union.sort_.fn_info());
fn_infos_.push_back(&window_union.range_.fn_info());
Expand All @@ -1484,11 +1488,10 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {

std::vector<PhysicalOpNode *> GetDependents() const override;

const bool instance_not_in_window() const {
return instance_not_in_window_;
}
const bool exclude_current_time() const { return exclude_current_time_; }
const bool output_request_row() const { return output_request_row_; }
bool instance_not_in_window() const { return instance_not_in_window_; }
bool exclude_current_time() const { return exclude_current_time_; }
bool output_request_row() const { return output_request_row_; }
void set_output_request_row(bool flag) { output_request_row_ = flag; }
const RequestWindowOp &window() const { return window_; }
const RequestWindowUnionList &window_unions() const {
return window_unions_;
Expand All @@ -1506,10 +1509,20 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
}

RequestWindowOp window_;
const bool instance_not_in_window_;
const bool exclude_current_time_;
const bool output_request_row_;
bool instance_not_in_window_;
bool exclude_current_time_;
bool output_request_row_;
RequestWindowUnionList window_unions_;

private:
void InitOuptput() {
auto left = GetProducer(0);
if (left->GetOutputType() == kSchemaTypeRow) {
output_type_ = kSchemaTypeTable;
} else {
output_type_ = kSchemaTypeGroup;
}
}
};

class PhysicalRequestAggUnionNode : public PhysicalOpNode {
Expand Down
21 changes: 8 additions & 13 deletions hybridse/src/passes/physical/batch_request_optimize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ static Status UpdateProjectExpr(
return replacer.Replace(expr->DeepCopy(ctx->node_manager()), output);
}

// simplify simple project, remove orphan descendant producer nodes
static Status CreateSimplifiedProject(PhysicalPlanContext* ctx,
PhysicalOpNode* input,
const ColumnProjects& projects,
Expand All @@ -279,8 +280,7 @@ static Status CreateSimplifiedProject(PhysicalPlanContext* ctx,
can_project = false;
for (size_t i = 0; i < cur_input->producers().size(); ++i) {
auto cand_input = cur_input->GetProducer(i);
if (cand_input->GetOutputType() !=
PhysicalSchemaType::kSchemaTypeRow) {
if (cand_input->GetOutputType() != PhysicalSchemaType::kSchemaTypeRow) {
continue;
}
bool is_valid = true;
Expand Down Expand Up @@ -949,21 +949,16 @@ Status CommonColumnOptimize::ProcessJoin(PhysicalPlanContext* ctx,
}
} else if (is_non_common_join) {
// join only depend on non-common left part
if (left_state->non_common_op == join_op->GetProducer(0) &&
right == join_op->GetProducer(1)) {
if (left_state->non_common_op == join_op->GetProducer(0) && right == join_op->GetProducer(1)) {
state->common_op = nullptr;
state->non_common_op = join_op;
} else {
PhysicalRequestJoinNode* new_join = nullptr;
CHECK_STATUS(ctx->CreateOp<PhysicalRequestJoinNode>(
&new_join, left_state->non_common_op, right, join_op->join(),
join_op->output_right_only()));
CHECK_STATUS(ReplaceComponentExpr(
join_op->join(), join_op->joined_schemas_ctx(),
new_join->joined_schemas_ctx(), ctx->node_manager(),
&new_join->join_));
state->common_op =
join_op->output_right_only() ? nullptr : left_state->common_op;
CHECK_STATUS(ctx->CreateOp<PhysicalRequestJoinNode>(&new_join, left_state->non_common_op, right,
join_op->join(), join_op->output_right_only()));
CHECK_STATUS(ReplaceComponentExpr(join_op->join(), join_op->joined_schemas_ctx(),
new_join->joined_schemas_ctx(), ctx->node_manager(), &new_join->join_));
state->common_op = join_op->output_right_only() ? nullptr : left_state->common_op;
state->non_common_op = new_join;
if (!join_op->output_right_only()) {
for (size_t left_idx : left_state->common_column_indices) {
Expand Down
Loading

0 comments on commit b39cb17

Please sign in to comment.