Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support delete in pre-aggregator #2689

Closed
wants to merge 20 commits into from

Conversation

XLC-2
Copy link

@XLC-2 XLC-2 commented Oct 19, 2022

  • What kind of change does this PR introduce? (Bug fix, feature, docs update, ...)
    Bug fix

  • What is the current behavior? (You can also link to an open issue here)
    After deleting a record from a table, the pre aggregation table corresponding to this table will not be updated.

  • What is the new behavior (if this is a feature change)?
    After deleting a record from a table, the pre aggregation table corresponding to this table will be updated.

@github-actions github-actions bot added execute-engine hybridse sql engine storage-engine openmldb storage engine. nameserver & tablet labels Oct 19, 2022
@dl239 dl239 changed the title Fix/data consistency pre aggr2 feat: support delete in pre-aggregator Oct 19, 2022
op = node::FnOperator::kFnOpRLike;
break;
}
// case zetasql::ASTBinaryExpression::Op::RLIKE: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this file


bool Delete(const std::string& key);

bool FlushAll();

bool Init(std::shared_ptr<LogReplicator> base_replicator);

void SetReletedTable(std::shared_ptr<Table> base_table);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass base table in Init function

@@ -447,6 +495,8 @@ bool Aggregator::FlushAggrBuffer(const std::string& key, const std::string& filt
row_builder_.SetTimestamp(row_ptr, 1, buffer.ts_begin_);
row_builder_.SetTimestamp(row_ptr, 2, buffer.ts_end_);
row_builder_.SetInt32(row_ptr, 3, buffer.aggr_cnt_);
std::cout << "[DEBUG]: aggr_cnt: " << buffer.aggr_cnt_
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DLOG(INFO) << "aggr_cnt: " << buffer.aggr_cnt_ << " aggr_val: " << aggr_val;

@codecov
Copy link

codecov bot commented Oct 19, 2022

Codecov Report

❗ No coverage uploaded for pull request base (dev@770f04b). Click here to learn what that means.
Patch has no changes to coverable lines.

Additional details and impacted files
@@          Coverage Diff           @@
##             dev    #2689   +/-   ##
======================================
  Coverage       ?   36.70%           
  Complexity     ?      392           
======================================
  Files          ?      156           
  Lines          ?     9257           
  Branches       ?     1060           
======================================
  Hits           ?     3398           
  Misses         ?     5622           
  Partials       ?      237           

Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.

☔ View full report at Codecov.
📢 Do you have feedback about the report comment? Let us know in this issue.

if (reverse) {
ok = UpdateAggrValAfterDeleteRange(base_row_view_, row_ptr, &aggr_buffer);
} else {
ok = UpdateAggrVal(base_row_view_, row_ptr, &aggr_buffer, reverse);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why pass the reverse argument that is always false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because UpdateAggrVal will be called in other places, and the paremeter 'reverse' in other places may be true

@github-actions
Copy link
Contributor

github-actions bot commented Oct 19, 2022

Linux Test Report

577 tests   567 ✔️  26m 25s ⏱️
  57 suites      7 💤
  51 files        3

For more details on these failures, see this check.

Results for commit a4cf474.

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Contributor

github-actions bot commented Oct 19, 2022

SDK Test Report

  83 files    83 suites   1m 40s ⏱️
174 tests 168 ✔️ 6 💤 0
220 runs  214 ✔️ 6 💤 0

Results for commit a4cf474.

♻️ This comment has been updated with latest results.

::openmldb::api::Dimension* dimension = entry.add_dimensions();
dimension->set_key(key);
dimension->set_idx(aggr_index_pos_);
ok = aggr_replicator_->AppendEntry(entry);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to call AppendEntry of agg_replicator

@github-actions
Copy link
Contributor

HybridSE Linux Test Report

19 719 tests   19 697 ✔️  8m 6s ⏱️
     240 suites           2 💤
       67 files           20

For more details on these failures, see this check.

Results for commit f9780e4.

@github-actions
Copy link
Contributor

HybridSE Mac Test Report

19 719 tests   19 696 ✔️  8m 51s ⏱️
     240 suites           2 💤
       67 files           21

For more details on these failures, see this check.

Results for commit f9780e4.

AddDefaultAggregatorSchema(&aggr_table_meta);
std::map<std::string, uint32_t> mapping;
mapping.insert(std::make_pair("idx", 0));
std::shared_ptr<Table> aggr_table =
Copy link
Collaborator

@dl239 dl239 Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::shared_ptr<Table> aggr_table = std::make_shared<MemTable>(aggr_table_meta);

@github-actions github-actions bot removed the execute-engine hybridse sql engine label Oct 24, 2022
if (!ok) {
PDLOG(ERROR, "Update aggr value failed");
return false;
}
return true;
}

bool Aggregator::DeleteAndUpdate(const std::string& key, const std::string& row,
const uint64_t& offset, bool recover) {
Copy link
Collaborator

@dl239 dl239 Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where this function is used except test case

@@ -3122,7 +3122,7 @@ int TabletImpl::LoadTableInternal(uint32_t tid, uint32_t pid, std::shared_ptr<::
auto aggrs = GetAggregators(tid, pid);
if (aggrs != nullptr) {
for (auto& aggr : *aggrs) {
if (!aggr->Init(replicator)) {
if (!aggr->Init(replicator, nullptr)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why pass nullptr

break;
}
case DataType::kInt: {
int32_t val;
row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val);
aggr_buffer->aggr_val_.vlong += val;
if (reverse) {
aggr_buffer->aggr_val_.vlong -= val;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cpplint error

auto tmp_val = it->GetValue();
std::string origin_data = tmp_val.ToString();
const int8_t* row_ptr = reinterpret_cast<const int8_t*>(it->GetValue().data());
codec::RowView base_row_view(base_table_->GetTableMeta()->column_desc(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use row_view passed from the function arguments

const int8_t* row_ptr = reinterpret_cast<const int8_t*>(it->GetValue().data());
codec::RowView base_row_view(base_table_->GetTableMeta()->column_desc(),
reinterpret_cast<int8_t*>(const_cast<char*>(origin_data.c_str())),
origin_data.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call UpdateAggrVal(..., false) recursively?

@@ -5850,7 +5850,7 @@ bool TabletImpl::CreateAggregatorInternal(const ::openmldb::api::CreateAggregato
}

auto base_replicator = GetReplicator(base_meta->tid(), base_meta->pid());
if (!aggregator->Init(base_replicator)) {
if (!aggregator->Init(base_replicator, nullptr)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass base table?

@@ -173,6 +429,45 @@ void CheckSumAggrResult(std::shared_ptr<Table> aggr_table, DataType data_type, s
return;
}

template <typename T>
void CheckSumAggrResultAfterDelete(std::shared_ptr<Table> aggr_table, DataType data_type,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve cpplint error

@dl239 dl239 requested a review from zhanghaohit October 27, 2022 09:32
@dl239 dl239 marked this pull request as ready for review October 27, 2022 09:32
@@ -127,13 +127,14 @@ class Aggregator {

~Aggregator();

bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false);
bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this function in Delete method of tablet_impl.cc

bool ok = UpdateAggrVal(base_row_view_, base_row_ptr, &tmp_buffer);
bool ok = false;
if (reverse) {
ok = aggr_table_->Delete(key, aggr_index_pos_, tmp_buffer.ts_begin_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should remove the cur_ts?

if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vint) {
aggr_buffer->aggr_val_.vint = val;
aggr_buffer->non_null_cnt_ = 0;
auto it = base_table_->NewTraverseIterator(0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use:

  1. NewWindowIterator(index_pos_)
  2. seek(key) and then seek(ts_begin_)
  3. reset the aggr_buffer
  4. iterate and update all the rows within the [ts_end_, ts_begin_]

The first two steps can also use NewTraverseIterator(index_pos_); and Seek(key, ts).

aggr_buffer->non_null_cnt_ = 0;
auto it = base_table_->NewTraverseIterator(0);
it->SeekToFirst();
while (it->Valid() && it->GetKey() >= aggr_buffer->ts_begin_) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use the same common function for min and max

int64_t ts_end = aggr_buffer->ts_end_;
it->Seek(key, ts_end);
aggr_buffer->clear();
if (this->GetWindowType() == WindowType::kRowsRange) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for both kRowsRange and kkRowsNum, the ts_begin_ and ts_end_ should keep the same.

Comment on lines +852 to +858
if (this->GetWindowType() == WindowType::kRowsRange) {
if (aggr_buffer->ts_begin_ == -1) {
// the first record.
aggr_buffer->ts_end_ = it->GetKey();
}
aggr_buffer->ts_begin_ = it->GetKey();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why check the kRowsRange again here? you intend to check kRowsNum?

as we keep the ts_end_ and ts_start_ as previous buffer, can remove the code

PDLOG(ERROR, "base table is nullptr, cannot update MinAggr table");
return false;
}
auto it = base_table_->NewTraverseIterator(this->GetIndexPos());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you prefer to use TraverseIterator, you have to check the key and GetPK are the same. So here better to use WindowIterator to simplify the code and performance concern.

}
default: {
PDLOG(ERROR, "Unsupported data type");
return false;
}
}
aggr_buffer->non_null_cnt_++;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be put for the else branch only

}
default: {
PDLOG(ERROR, "Unsupported data type");
return false;
}
}
aggr_buffer->non_null_cnt_++;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non_null_cnt_++ for the else branch only

@dl239
Copy link
Collaborator

dl239 commented Jun 25, 2023

has done in #3327

@dl239 dl239 closed this Jun 25, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
storage-engine openmldb storage engine. nameserver & tablet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants