Skip to content

Commit

Permalink
feat(offline): WINDOW without ORDER BY
Browse files Browse the repository at this point in the history
only work if SkewWindowOpt is off
  • Loading branch information
aceforeverd committed Nov 23, 2023
1 parent 1c153e0 commit ae859d4
Show file tree
Hide file tree
Showing 13 changed files with 252 additions and 132 deletions.
29 changes: 29 additions & 0 deletions cases/query/window_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -904,6 +904,9 @@ cases:
# ======================================================================
# WINDOW without ORDER BY
#
# batch mode tests skipped since ordering in WINDOW is undefined, we only
# verify result for request mode, that's implmentation defined order, not SQL standard
# ======================================================================
- id: 24
desc: ROWS WINDOW WITHOUT ORDER BY
Expand Down Expand Up @@ -1132,3 +1135,29 @@ cases:
3, 1, 0, 3, 3
4, 2, 1, 3, 3
5, 3, 2, 3, 3
- id: 28
# simple case verify it compile & run for batch mode
desc: RANGE WINDOW WITHOUT ORDER BY
inputs:
- name: t1
columns:
- id int
- gp int
- ts timestamp
indexs:
- idx:gp:ts
data: |
1, 100, 20000
2, 100, 10000
3, 400, 20000
4, 400, 10
5, 400, 15000
sql: |
select id, count(ts) over w as agg
from t1
window w as (
partition by gp
rows_range between unbounded preceding and current row
)
expect:
success: true
125 changes: 13 additions & 112 deletions hybridse/include/vm/mem_catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,13 @@ class Window : public MemTimeTableHandler {
bool exclude_current_time() const { return exclude_current_time_; }
void set_exclude_current_time(bool flag) { exclude_current_time_ = flag; }

bool without_order_by() const { return without_order_by_; }
void set_without_order_by(bool flag) { without_order_by_ = flag; }

protected:
bool exclude_current_time_ = false;
bool instance_not_in_window_ = false;
bool without_order_by_ = false;
};
class WindowRange {
public:
Expand Down Expand Up @@ -356,44 +360,13 @@ class HistoryWindow : public Window {
PopFrontRow();
}
}
bool BufferData(uint64_t key, const Row& row) override;

// aad newer row into window
bool BufferData(uint64_t key, const Row& row) override {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
auto cur_size = table_.size();
if (cur_size < window_range_.start_row_) {
// current in the ROWS window
int64_t sub = key + window_range_.start_offset_;
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
if (0 == window_range_.end_offset_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
} else if (0 == window_range_.end_offset_) {
// current in the ROWS_RANGE window
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
// current row BeforeWindow
int64_t sub = (key + window_range_.end_offset_);
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentHistoryBuffer(key, row, end_ts);
}
}
// add newer row into window
bool BufferDataImpl(uint64_t key, const Row& row);

protected:
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
current_history_buffer_.emplace_front(key, row);
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
SlideWindow(start_ts, end_ts);
return true;
}
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts);

// sliding rows data from `current_history_buffer_` into effective window
// by giving the new start_ts and end_ts.
Expand All @@ -413,77 +386,18 @@ class HistoryWindow : public Window {
// `start_ts_inclusive` and `end_ts_inclusive` can be empty, which effectively means less than 0.
// if `start_ts_inclusive` is empty, no rows goes out of effective window
// if `end_ts_inclusive` is empty, no rows goes out of history buffer and into effective window
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
// always try to cleanup the stale rows out of effective window
if (start_ts_inclusive.has_value()) {
Slide(start_ts_inclusive);
}

if (!end_ts_inclusive.has_value()) {
return;
}

while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
auto& back = current_history_buffer_.back();

BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
current_history_buffer_.pop_back();
}
}
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive);

// push the row to the start of window
// - pop last elements in window if exceed max window size
// - also pop last elements in window if there ts less than `start_ts`
//
// if `start_ts` is empty, no rows eliminated from window
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
AddFrontRow(key, row);
return Slide(start_ts);
}
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts);

bool Slide(std::optional<uint64_t> start_ts) {
auto cur_size = table_.size();
while (window_range_.max_size_ > 0 &&
cur_size > window_range_.max_size_) {
PopBackRow();
--cur_size;
}
bool Slide(std::optional<uint64_t> start_ts);

// Slide window if window start bound >= rows/range preceding
while (cur_size > 0) {
const auto& pair = GetBackRow();
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
cur_size <= window_range_.start_row_ + 1) {
// note it is always current rows window
break;
}
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
PopBackRow();
--cur_size;
} else {
break;
}
}
return true;
}

bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
if (exclude_current_time_) {
// except `exclude current_row`, the current row is always added to the effective window
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
// so the previous current row need eliminated for this next buf action
PopEffectiveDataIfAny();
if (key == 0) {
SlideWindow(start_ts, {});
} else {
SlideWindow(start_ts, key - 1);
}
current_history_buffer_.emplace_front(key, row);
}

// in queue the current row
return BufferEffectiveWindow(key, row, start_ts);
}
bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts);

WindowRange window_range_;
MemTimeTable current_history_buffer_;
Expand Down Expand Up @@ -512,20 +426,7 @@ class CurrentHistoryWindow : public HistoryWindow {

void PopFrontData() override { PopFrontRow(); }

bool BufferData(uint64_t key, const Row& row) override {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);

if (exclude_current_time_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
}
bool BufferData(uint64_t key, const Row& row) override;
};

typedef std::map<std::string,
Expand Down
6 changes: 2 additions & 4 deletions hybridse/src/vm/core_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
#include "vm/core_api.h"
#include "base/sig_trace.h"
#include "codec/fe_row_codec.h"
#include "udf/default_udf_library.h"
#include "udf/udf.h"
#include "vm/jit_runtime.h"
#include "vm/jit_wrapper.h"
#include "vm/mem_catalog.h"
#include "vm/runner.h"
#include "vm/schemas_context.h"
Expand All @@ -30,14 +27,15 @@ namespace vm {

WindowInterface::WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool exclude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size) {
uint64_t rows_preceding, uint64_t max_size, bool without_order_by) {

Check warning on line 30 in hybridse/src/vm/core_api.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/core_api.cc#L30

Added line #L30 was not covered by tests
if (exclude_current_row && max_size > 0 && end_offset == 0) {
max_size++;
}
window_impl_ = std::make_unique<HistoryWindow>(
WindowRange(ExtractFrameType(frame_type_str), start_offset, end_offset, rows_preceding, max_size));
window_impl_->set_instance_not_in_window(instance_not_in_window);
window_impl_->set_exclude_current_time(exclude_current_time);
window_impl_->set_without_order_by(without_order_by);

Check warning on line 38 in hybridse/src/vm/core_api.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/core_api.cc#L38

Added line #L38 was not covered by tests
}

bool WindowInterface::BufferData(uint64_t key, const Row& row) {
Expand Down
9 changes: 7 additions & 2 deletions hybridse/src/vm/core_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#ifndef HYBRIDSE_SRC_VM_CORE_API_H_
#define HYBRIDSE_SRC_VM_CORE_API_H_

#include <map>
#include <memory>
#include <string>
#include "codec/fe_row_codec.h"
Expand All @@ -41,7 +40,13 @@ class WindowInterface {
public:
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size);
uint64_t rows_preceding, uint64_t max_size) {
WindowInterface(instance_not_in_window, exclude_current_time, execlude_current_row, frame_type_str,
start_offset, end_offset, rows_preceding, max_size, false);
}
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
uint64_t rows_preceding, uint64_t max_size, bool without_order_by);

bool BufferData(uint64_t key, const Row& row);

Expand Down
118 changes: 118 additions & 0 deletions hybridse/src/vm/mem_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -414,5 +414,123 @@ size_t RowGetSliceSize(int8_t* row_ptr, size_t idx) {
auto row = reinterpret_cast<Row*>(row_ptr);
return row->size(idx);
}
bool HistoryWindow::BufferData(uint64_t key, const Row& row) {
if (without_order_by()) {
return BufferDataImpl(0, row);
}

return BufferDataImpl(key, row);
}
bool HistoryWindow::BufferDataImpl(uint64_t key, const Row& row) {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key (" << key << ") less than latest key (" << GetFrontRow().first
<< ")";
return false;
}
auto cur_size = table_.size();
if (cur_size < window_range_.start_row_) {
// current in the ROWS window
int64_t sub = key + window_range_.start_offset_;
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
if (0 == window_range_.end_offset_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);

Check warning on line 438 in hybridse/src/vm/mem_catalog.cc

View check run for this annotation

Codecov / codecov/patch

hybridse/src/vm/mem_catalog.cc#L438

Added line #L438 was not covered by tests
}
} else if (0 == window_range_.end_offset_) {
// current in the ROWS_RANGE window
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
// current row BeforeWindow
int64_t sub = (key + window_range_.end_offset_);
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
return BufferCurrentHistoryBuffer(key, row, end_ts);
}
}
bool HistoryWindow::BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
current_history_buffer_.emplace_front(key, row);
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
SlideWindow(start_ts, end_ts);
return true;
}
void HistoryWindow::SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
// always try to cleanup the stale rows out of effective window
if (start_ts_inclusive.has_value()) {
Slide(start_ts_inclusive);
}

if (!end_ts_inclusive.has_value()) {
return;
}

while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
auto& back = current_history_buffer_.back();

BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
current_history_buffer_.pop_back();
}
}
bool HistoryWindow::BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
AddFrontRow(key, row);
return Slide(start_ts);
}
bool HistoryWindow::Slide(std::optional<uint64_t> start_ts) {
auto cur_size = table_.size();
while (window_range_.max_size_ > 0 && cur_size > window_range_.max_size_) {
PopBackRow();
--cur_size;
}

// Slide window if window start bound >= rows/range preceding
while (cur_size > 0) {
const auto& pair = GetBackRow();
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
cur_size <= window_range_.start_row_ + 1) {
// note it is always current rows window
break;
}
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
PopBackRow();
--cur_size;
} else {
break;
}
}
return true;
}
bool HistoryWindow::BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
if (exclude_current_time_) {
// except `exclude current_row`, the current row is always added to the effective window
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
// so the previous current row need eliminated for this next buf action
PopEffectiveDataIfAny();
if (key == 0) {
SlideWindow(start_ts, {});
} else {
SlideWindow(start_ts, key - 1);
}
current_history_buffer_.emplace_front(key, row);
}

// in queue the current row
return BufferEffectiveWindow(key, row, start_ts);
}
bool CurrentHistoryWindow::BufferData(uint64_t key, const Row& row) {
if (!table_.empty() && GetFrontRow().first > key) {
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
return false;
}
int64_t sub = (key + window_range_.start_offset_);
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);

if (exclude_current_time_) {
return BufferCurrentTimeBuffer(key, row, start_ts);
} else {
return BufferEffectiveWindow(key, row, start_ts);
}
}
} // namespace vm
} // namespace hybridse
1 change: 1 addition & 0 deletions hybridse/src/vm/runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ void WindowAggRunner::RunWindowAggOnKey(
HistoryWindow window(instance_window_gen_.range_gen_->window_range_);
window.set_instance_not_in_window(instance_not_in_window_);
window.set_exclude_current_time(exclude_current_time_);
window.set_without_order_by(without_order_by());

while (instance_segment_iter->Valid()) {
if (limit_cnt_.has_value() && cnt >= limit_cnt_) {
Expand Down
2 changes: 2 additions & 0 deletions hybridse/src/vm/runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ class WindowAggRunner : public Runner {
const bool instance_not_in_window_;
const bool exclude_current_time_;

bool without_order_by() const { return !instance_window_gen_.sort_gen_.Valid(); }

// slice size outputed of the first producer node
const size_t append_slices_;
WindowGenerator instance_window_gen_;
Expand Down
Loading

0 comments on commit ae859d4

Please sign in to comment.