Skip to content

Commit

Permalink
save changes to last join (window)
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Oct 24, 2023
1 parent 8b32e6d commit c64ca4b
Show file tree
Hide file tree
Showing 24 changed files with 724 additions and 186 deletions.
38 changes: 38 additions & 0 deletions cases/query/last_join_window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,41 @@ cases:
- [2,"aa", 2, 20, 4, 20 ]
- [3,"bb", 2, null, 2, NULL]
- [4,"cc", 2, 21, 2, 21]

# ===================================================================
# LAST JOIN (WINDOW)
# ===================================================================
- id: 8
inputs:
- name: t1
columns: ["c1 string","c2 int","c4 timestamp"]
indexs: ["index1:c1:c4"]
rows:
- ["aa",2,1590738989000]
- ["bb",3,1590738990000]
- ["cc",4,1590738991000]
- name: t2
columns: ["c1 string", "c2 int", "c4 timestamp"]
indexs: ["index1:c1:c4", "index2:c2:c4"]
rows:
- ["aa",1, 1590738989000]
- ["bb",3, 1590738990000]
- ["dd",4, 1590738991000]
sql: |
select t1.c1, tx.c1 as c1r, tx.c2 as c2r, agg
from t1 last join (
select c1, c2, count(c4) over w as agg
from t2
window w as (
partition by c1 order by c4
rows between 1 preceding and current row
)
) tx
on t1.c2 = tx.c2
expect:
columns: ["c1 string", "c1r string", "c2r int", "agg int64"]
order: c1
data: |
aa, NULL, NULL, NULL
bb, bb, 3, 1
cc, dd, 4, 1
33 changes: 1 addition & 32 deletions hybridse/examples/toydb/src/tablet/tablet_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,6 @@ RowIterator* TabletTableHandler::GetRawIterator() {
return new storage::FullTableIterator(table_->GetSegments(),
table_->GetSegCnt(), table_);
}
const uint64_t TabletTableHandler::GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
iter->Next();
cnt++;
}
return cnt;
}
Row TabletTableHandler::At(uint64_t pos) {
auto iter = GetIterator();
while (pos-- > 0 && iter->Valid()) {
iter->Next();
}
return iter->Valid() ? iter->GetValue() : Row();
}

TabletCatalog::TabletCatalog() : tables_(), db_() {}

Expand Down Expand Up @@ -249,22 +233,6 @@ std::unique_ptr<WindowIterator> TabletSegmentHandler::GetWindowIterator(
const std::string& idx_name) {
return std::unique_ptr<WindowIterator>();
}
const uint64_t TabletSegmentHandler::GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
cnt++;
iter->Next();
}
return cnt;
}
Row TabletSegmentHandler::At(uint64_t pos) {
auto iter = GetIterator();
while (pos-- > 0 && iter->Valid()) {
iter->Next();
}
return iter->Valid() ? iter->GetValue() : Row();
}

const uint64_t TabletPartitionHandler::GetCount() {
auto iter = GetWindowIterator();
Expand All @@ -275,5 +243,6 @@ const uint64_t TabletPartitionHandler::GetCount() {
}
return cnt;
}

} // namespace tablet
} // namespace hybridse
5 changes: 1 addition & 4 deletions hybridse/examples/toydb/src/tablet/tablet_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class TabletSegmentHandler : public TableHandler {
std::unique_ptr<vm::RowIterator> GetIterator() override;
RowIterator* GetRawIterator() override;
std::unique_ptr<codec::WindowIterator> GetWindowIterator(const std::string& idx_name) override;
const uint64_t GetCount() override;
Row At(uint64_t pos) override;
const std::string GetHandlerTypeName() override {
return "TabletSegmentHandler";
}
Expand Down Expand Up @@ -104,6 +102,7 @@ class TabletPartitionHandler
std::unique_ptr<codec::WindowIterator> GetWindowIterator() override {
return table_handler_->GetWindowIterator(index_name_);
}

const uint64_t GetCount() override;

std::shared_ptr<TableHandler> GetSegment(const std::string& key) override {
Expand Down Expand Up @@ -152,8 +151,6 @@ class TabletTableHandler
RowIterator* GetRawIterator() override;
std::unique_ptr<codec::WindowIterator> GetWindowIterator(
const std::string& idx_name);
virtual const uint64_t GetCount();
Row At(uint64_t pos) override;

virtual std::shared_ptr<PartitionHandler> GetPartition(
const std::string& index_name) {
Expand Down
21 changes: 8 additions & 13 deletions hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/

#include "testing/toydb_engine_test_base.h"

#include "absl/strings/str_join.h"
#include "gtest/gtest.h"
#include "gtest/internal/gtest-param-util.h"

using namespace llvm; // NOLINT (build/namespaces)
using namespace llvm::orc; // NOLINT (build/namespaces)
Expand Down Expand Up @@ -141,18 +142,12 @@ std::shared_ptr<tablet::TabletCatalog> BuildOnePkTableStorage(
}
return catalog;
}
void BatchRequestEngineCheckWithCommonColumnIndices(
const SqlCase& sql_case, const EngineOptions options,
const std::set<size_t>& common_column_indices) {
std::ostringstream oss;
for (size_t index : common_column_indices) {
oss << index << ",";
}
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: "
"common_column_indices = ["
<< oss.str() << "]";
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options,
common_column_indices);
// Run check with common column index info
void BatchRequestEngineCheckWithCommonColumnIndices(const SqlCase& sql_case, const EngineOptions options,
const std::set<size_t>& common_column_indices) {
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: common_column_indices = ["
<< absl::StrJoin(common_column_indices, ",") << "]";
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, common_column_indices);
engine_test.RunCheck();
}

Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/codec/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class Row {

inline int32_t size() const { return slice_.size(); }
inline int32_t size(int32_t pos) const {
return 0 == pos ? slice_.size() : slices_[pos - 1].size();
return 0 == pos ? slice_.size() : slices_.at(pos - 1).size();
}

// Return true if the length of the referenced data is zero
Expand Down
2 changes: 1 addition & 1 deletion hybridse/include/codec/row_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class ListV {
virtual const uint64_t GetCount() {
auto iter = GetIterator();
uint64_t cnt = 0;
while (iter->Valid()) {
while (iter && iter->Valid()) {
iter->Next();
cnt++;
}
Expand Down
3 changes: 2 additions & 1 deletion hybridse/include/vm/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class TableHandler;
class RowHandler;
class Tablet;

enum HandlerType { kRowHandler, kTableHandler, kPartitionHandler };
enum HandlerType { kRowHandler, kTableHandler, kPartitionHandler, kDataSetHandler };
enum OrderType { kDescOrder, kAscOrder, kNoneOrder };

/// \brief The basic dataset operation abstraction.
Expand Down Expand Up @@ -217,6 +217,7 @@ class TableHandler : public DataHandler {
virtual ~TableHandler() {}

/// Return table column Types information.
/// TODO: rm it, never used
virtual const Types& GetTypes() = 0;

/// Return the index information
Expand Down
156 changes: 153 additions & 3 deletions hybridse/include/vm/mem_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,156 @@ class MemPartitionHandler
IndexHint index_hint_;
OrderType order_type_;
};

class ConcatIterator final : public RowIterator {
public:
ConcatIterator(std::unique_ptr<RowIterator>&& left, size_t left_slices, std::unique_ptr<RowIterator>&& right,
size_t right_slices)
: left_(std::move(left)), left_slices_(left_slices), right_(std::move(right)), right_slices_(right_slices) {
SeekToFirst();
}
~ConcatIterator() override {}

bool Valid() const override {
return left_ && left_->Valid();
}
void Next() override {
left_->Next();
if (right_ && right_->Valid()) {
right_->Next();
}
}
const uint64_t& GetKey() const override {
return left_->GetKey();
}
const Row& GetValue() override {
if (!right_ || !right_->Valid()) {
buf_ = Row(left_slices_, left_->GetValue(), right_slices_, Row());
} else {
buf_ = Row(left_slices_, left_->GetValue(), right_slices_, right_->GetValue());
}
return buf_;
}

bool IsSeekable() const override { return true; };

void Seek(const uint64_t& key) override {
left_->Seek(key);
if (right_ && right_->Valid()) {
right_->Seek(key);
}
}

void SeekToFirst() override {
left_->SeekToFirst();
if (right_) {
right_->SeekToFirst();
}
}

private:
std::unique_ptr<RowIterator> left_;
size_t left_slices_;
std::unique_ptr<RowIterator> right_;
size_t right_slices_;

Row buf_;
};

class SimpleConcatTableHandler final : public TableHandler {
public:
SimpleConcatTableHandler(std::shared_ptr<TableHandler> left, size_t left_slices,
std::shared_ptr<TableHandler> right, size_t right_slices)
: left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {}
~SimpleConcatTableHandler() override {}

std::unique_ptr<RowIterator> GetIterator() override {
auto p = GetRawIterator();
if (p == nullptr) {
return {};
}
return std::unique_ptr<RowIterator>(p);
}

RowIterator* GetRawIterator() override {
auto li = left_->GetIterator();
if (!li) {
return nullptr;
}
auto ri = right_->GetIterator();
return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_);
}

std::unique_ptr<WindowIterator> GetWindowIterator(const std::string& idx_name) override { return nullptr; }

const Types& GetTypes() override { return left_->GetTypes(); }

const IndexHint& GetIndex() override { return left_->GetIndex(); }

// unimplemented
const Schema* GetSchema() override { return left_->GetSchema(); }
const std::string& GetName() override { return left_->GetName(); }
const std::string& GetDatabase() override { return left_->GetDatabase(); }

private:
std::shared_ptr<TableHandler> left_;
size_t left_slices_;
std::shared_ptr<TableHandler> right_;
size_t right_slices_;
};

class ConcatPartitionHandler final : public PartitionHandler {
public:
ConcatPartitionHandler(std::shared_ptr<PartitionHandler> left, size_t left_slices,
std::shared_ptr<PartitionHandler> right, size_t right_slices)
: left_(left), left_slices_(left_slices), right_(right), right_slices_(right_slices) {}
~ConcatPartitionHandler() override {}

std::unique_ptr<RowIterator> GetIterator() override {
auto p = GetRawIterator();
if (p == nullptr) {
return {};
}
return std::unique_ptr<RowIterator>(p);
}

RowIterator* GetRawIterator() override {
auto li = left_->GetIterator();
if (!li) {
return nullptr;
}
auto ri = right_->GetIterator();
return new ConcatIterator(std::move(li), left_slices_, std::move(ri), right_slices_);
}

std::unique_ptr<WindowIterator> GetWindowIterator(const std::string &idx_name) override {
return nullptr;
}
std::unique_ptr<WindowIterator> GetWindowIterator() override { return nullptr;}

std::shared_ptr<TableHandler> GetSegment(const std::string &key) override {
auto left_seg = left_->GetSegment(key);
auto right_seg = right_->GetSegment(key);
return std::shared_ptr<TableHandler>(
new SimpleConcatTableHandler(left_seg, left_slices_, right_seg, right_slices_));
}

const Types& GetTypes() override { return left_->GetTypes(); }

const IndexHint& GetIndex() override { return left_->GetIndex(); }

// unimplemented
const Schema* GetSchema() override { return nullptr; }
const std::string& GetName() override { return left_->GetName(); }
const std::string& GetDatabase() override { return left_->GetDatabase(); }

private:
std::shared_ptr<PartitionHandler> left_;
size_t left_slices_;
std::shared_ptr<PartitionHandler> right_;
size_t right_slices_;
};

class ConcatTableHandler : public MemTimeTableHandler {
public:
ConcatTableHandler(std::shared_ptr<TableHandler> left, size_t left_slices,
Expand All @@ -692,19 +842,19 @@ class ConcatTableHandler : public MemTimeTableHandler {
status_ = SyncValue();
return MemTimeTableHandler::At(pos);
}
std::unique_ptr<RowIterator> GetIterator() {
std::unique_ptr<RowIterator> GetIterator() override {
if (status_.isRunning()) {
status_ = SyncValue();
}
return MemTimeTableHandler::GetIterator();
}
RowIterator* GetRawIterator() {
RowIterator* GetRawIterator() override {
if (status_.isRunning()) {
status_ = SyncValue();
}
return MemTimeTableHandler::GetRawIterator();
}
virtual const uint64_t GetCount() {
const uint64_t GetCount() override {
if (status_.isRunning()) {
status_ = SyncValue();
}
Expand Down
Loading

0 comments on commit c64ca4b

Please sign in to comment.