Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(offline): support & test new SQLs from online mode #3619

Merged
merged 2 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cases/query/window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,9 @@ cases:

# ======================================================================
# WINDOW without ORDER BY
#
# batch mode tests skipped since ordering in WINDOW is undefined, we only
# verify result for request mode, that's implmentation defined order, not SQL standard
# ======================================================================
- id: 24
desc: ROWS WINDOW WITHOUT ORDER BY
Expand Down Expand Up @@ -1132,3 +1135,29 @@ cases:
3, 1, 0, 3, 3
4, 2, 1, 3, 3
5, 3, 2, 3, 3
- id: 28
# simple case verify it compile & run for batch mode
desc: RANGE WINDOW WITHOUT ORDER BY
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10
5, 400, 15000
sql: |
select id, count(ts) over w as agg
from t1
window w as (
partition by gp
rows_range between unbounded preceding and current row
)
expect:
success: true
125 changes: 13 additions & 112 deletions hybridse/include/vm/mem_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,13 @@ class Window : public MemTimeTableHandler {
bool exclude_current_time() const { return exclude_current_time_; }
void set_exclude_current_time(bool flag) { exclude_current_time_ = flag; }

bool without_order_by() const { return without_order_by_; }
void set_without_order_by(bool flag) { without_order_by_ = flag; }

protected:
bool exclude_current_time_ = false;
bool instance_not_in_window_ = false;
bool without_order_by_ = false;
};
class WindowRange {
public:
Expand Down Expand Up @@ -356,44 +360,13 @@ class HistoryWindow : public Window {
PopFrontRow();
}
}
bool BufferData(uint64_t key, const Row& row) override;

// aad newer row into window
bool BufferData(uint64_t key, const Row& row) override {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
auto cur_size = table_.size();
if (cur_size < window_range_.start_row_) {
// current in the ROWS window
int64_t sub = key + window_range_.start_offset_;
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
if (0 == window_range_.end_offset_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
} else if (0 == window_range_.end_offset_) {
// current in the ROWS_RANGE window
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
// current row BeforeWindow
int64_t sub = (key + window_range_.end_offset_);
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentHistoryBuffer(key, row, end_ts);
}
}
// add newer row into window
bool BufferDataImpl(uint64_t key, const Row& row);

protected:
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
current_history_buffer_.emplace_front(key, row);
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
SlideWindow(start_ts, end_ts);
return true;
}
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts);

// sliding rows data from `current_history_buffer_` into effective window
// by giving the new start_ts and end_ts.
Expand All @@ -413,77 +386,18 @@ class HistoryWindow : public Window {
// `start_ts_inclusive` and `end_ts_inclusive` can be empty, which effectively means less than 0.
// if `start_ts_inclusive` is empty, no rows goes out of effective window
// if `end_ts_inclusive` is empty, no rows goes out of history buffer and into effective window
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
// always try to cleanup the stale rows out of effective window
if (start_ts_inclusive.has_value()) {
Slide(start_ts_inclusive);
}

if (!end_ts_inclusive.has_value()) {
return;
}

while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
auto& back = current_history_buffer_.back();

BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
current_history_buffer_.pop_back();
}
}
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive);

// push the row to the start of window
// - pop last elements in window if exceed max window size
// - also pop last elements in window if there ts less than `start_ts`
//
// if `start_ts` is empty, no rows eliminated from window
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
AddFrontRow(key, row);
return Slide(start_ts);
}
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts);

bool Slide(std::optional<uint64_t> start_ts) {
auto cur_size = table_.size();
while (window_range_.max_size_ > 0 &&
cur_size > window_range_.max_size_) {
PopBackRow();
--cur_size;
}
bool Slide(std::optional<uint64_t> start_ts);

// Slide window if window start bound >= rows/range preceding
while (cur_size > 0) {
const auto& pair = GetBackRow();
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
cur_size <= window_range_.start_row_ + 1) {
// note it is always current rows window
break;
}
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
PopBackRow();
--cur_size;
} else {
break;
}
}
return true;
}

bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
if (exclude_current_time_) {
// except `exclude current_row`, the current row is always added to the effective window
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
// so the previous current row need eliminated for this next buf action
PopEffectiveDataIfAny();
if (key == 0) {
SlideWindow(start_ts, {});
} else {
SlideWindow(start_ts, key - 1);
}
current_history_buffer_.emplace_front(key, row);
}

// in queue the current row
return BufferEffectiveWindow(key, row, start_ts);
}
bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts);

WindowRange window_range_;
MemTimeTable current_history_buffer_;
Expand Down Expand Up @@ -512,20 +426,7 @@ class CurrentHistoryWindow : public HistoryWindow {

void PopFrontData() override { PopFrontRow(); }

bool BufferData(uint64_t key, const Row& row) override {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);

if (exclude_current_time_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
}
bool BufferData(uint64_t key, const Row& row) override;
};

typedef std::map<std::string,
Expand Down
6 changes: 2 additions & 4 deletions hybridse/src/vm/core_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
#include "vm/core_api.h"
#include "base/sig_trace.h"
#include "codec/fe_row_codec.h"
#include "udf/default_udf_library.h"
#include "udf/udf.h"
#include "vm/jit_runtime.h"
#include "vm/jit_wrapper.h"
#include "vm/mem_catalog.h"
#include "vm/runner.h"
#include "vm/schemas_context.h"
Expand All @@ -30,14 +27,15 @@

WindowInterface::WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool exclude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size) {
uint64_t rows_preceding, uint64_t max_size, bool without_order_by) {

Check warning on line 30 in hybridse/src/vm/core_api.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/core_api.cc#L30

Added line #L30 was not covered by tests
if (exclude_current_row && max_size > 0 && end_offset == 0) {
max_size++;
}
window_impl_ = std::make_unique<HistoryWindow>(
WindowRange(ExtractFrameType(frame_type_str), start_offset, end_offset, rows_preceding, max_size));
window_impl_->set_instance_not_in_window(instance_not_in_window);
window_impl_->set_exclude_current_time(exclude_current_time);
window_impl_->set_without_order_by(without_order_by);

Check warning on line 38 in hybridse/src/vm/core_api.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/core_api.cc#L38

Added line #L38 was not covered by tests
}

bool WindowInterface::BufferData(uint64_t key, const Row& row) {
Expand Down
9 changes: 7 additions & 2 deletions hybridse/src/vm/core_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#ifndef HYBRIDSE_SRC_VM_CORE_API_H_
#define HYBRIDSE_SRC_VM_CORE_API_H_

#include <map>
#include <memory>
#include <string>
#include "codec/fe_row_codec.h"
Expand All @@ -41,7 +40,13 @@ class WindowInterface {
public:
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size);
uint64_t rows_preceding, uint64_t max_size) {
WindowInterface(instance_not_in_window, exclude_current_time, execlude_current_row, frame_type_str,
start_offset, end_offset, rows_preceding, max_size, false);
}
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size, bool without_order_by);

bool BufferData(uint64_t key, const Row& row);

Expand Down
118 changes: 118 additions & 0 deletions hybridse/src/vm/mem_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,5 +414,123 @@
auto row = reinterpret_cast<Row*>(row_ptr);
return row->size(idx);
}
bool HistoryWindow::BufferData(uint64_t key, const Row& row) {
if (without_order_by()) {
return BufferDataImpl(0, row);
}

return BufferDataImpl(key, row);
}
bool HistoryWindow::BufferDataImpl(uint64_t key, const Row& row) {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key (" << key << ") less than latest key (" << GetFrontRow().first
<< ")";
return false;
}
auto cur_size = table_.size();
if (cur_size < window_range_.start_row_) {
// current in the ROWS window
int64_t sub = key + window_range_.start_offset_;
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
if (0 == window_range_.end_offset_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);

Check warning on line 438 in hybridse/src/vm/mem_catalog.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/mem_catalog.cc#L438

Added line #L438 was not covered by tests
}
} else if (0 == window_range_.end_offset_) {
// current in the ROWS_RANGE window
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
// current row BeforeWindow
int64_t sub = (key + window_range_.end_offset_);
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentHistoryBuffer(key, row, end_ts);
}
}
bool HistoryWindow::BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
current_history_buffer_.emplace_front(key, row);
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
SlideWindow(start_ts, end_ts);
return true;
}
void HistoryWindow::SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
// always try to cleanup the stale rows out of effective window
if (start_ts_inclusive.has_value()) {
Slide(start_ts_inclusive);
}

if (!end_ts_inclusive.has_value()) {
return;
}

while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
auto& back = current_history_buffer_.back();

BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
current_history_buffer_.pop_back();
}
}
bool HistoryWindow::BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
AddFrontRow(key, row);
return Slide(start_ts);
}
bool HistoryWindow::Slide(std::optional<uint64_t> start_ts) {
auto cur_size = table_.size();
while (window_range_.max_size_ > 0 && cur_size > window_range_.max_size_) {
PopBackRow();
--cur_size;
}

// Slide window if window start bound >= rows/range preceding
while (cur_size > 0) {
const auto& pair = GetBackRow();
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
cur_size <= window_range_.start_row_ + 1) {
// note it is always current rows window
break;
}
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
PopBackRow();
--cur_size;
} else {
break;
}
}
return true;
}
bool HistoryWindow::BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
if (exclude_current_time_) {
// except `exclude current_row`, the current row is always added to the effective window
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
// so the previous current row need eliminated for this next buf action
PopEffectiveDataIfAny();
if (key == 0) {
SlideWindow(start_ts, {});
} else {
SlideWindow(start_ts, key - 1);
}
current_history_buffer_.emplace_front(key, row);
}

// in queue the current row
return BufferEffectiveWindow(key, row, start_ts);
}
bool CurrentHistoryWindow::BufferData(uint64_t key, const Row& row) {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);

if (exclude_current_time_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
}
} // namespace vm
} // namespace hybridse
1 change: 1 addition & 0 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ void WindowAggRunner::RunWindowAggOnKey(
HistoryWindow window(instance_window_gen_.range_gen_->window_range_);
window.set_instance_not_in_window(instance_not_in_window_);
window.set_exclude_current_time(exclude_current_time_);
window.set_without_order_by(without_order_by());

while (instance_segment_iter->Valid()) {
if (limit_cnt_.has_value() && cnt >= limit_cnt_) {
Expand Down
2 changes: 2 additions & 0 deletions hybridse/src/vm/runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ class WindowAggRunner : public Runner {
const bool instance_not_in_window_;
const bool exclude_current_time_;

bool without_order_by() const { return !instance_window_gen_.sort_gen_.Valid(); }

// slice size outputed of the first producer node
const size_t append_slices_;
WindowGenerator instance_window_gen_;
Expand Down
Loading
Loading