feat: support delete in pre-aggregator #2689
…f new and old interfaces
… generate combined_key, 2. RemoveDeletedKey add combined_key to distinguish deletion granularity
…eded; 2. remove the extra process of parsing records; 3. fix cpplint error
…ata when do FreeTsEntry
…last record is deleted, the keyEntry will also be deleted
…nMLDB into fix/data-consistency-pre-aggr2
    op = node::FnOperator::kFnOpRLike;
    break;
}
// case zetasql::ASTBinaryExpression::Op::RLIKE: {
Revert this file.
src/storage/aggregator.h
Outdated
bool Delete(const std::string& key);

bool FlushAll();

bool Init(std::shared_ptr<LogReplicator> base_replicator);

void SetReletedTable(std::shared_ptr<Table> base_table);
Pass the base table in the Init function.
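A minimal sketch of what this suggestion could look like, using toy stand-in types rather than the real OpenMLDB classes. ToyTable, ToyReplicator, ToyAggregator and Ready are hypothetical names, and rejecting a null argument is an assumption made for the illustration, not something the PR states.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Toy stand-ins for illustration only; not the OpenMLDB Table / LogReplicator.
struct ToyTable {};
struct ToyReplicator {};

class ToyAggregator {
 public:
    // Both dependencies arrive in one call, so there is no state in which the
    // aggregator is initialized but still lacks its base table.
    bool Init(std::shared_ptr<ToyReplicator> base_replicator,
              std::shared_ptr<ToyTable> base_table) {
        if (!base_replicator || !base_table) {
            return false;  // assumption: a missing dependency is an init failure
        }
        base_replicator_ = std::move(base_replicator);
        base_table_ = std::move(base_table);
        return true;
    }

    bool Ready() const { return base_replicator_ != nullptr && base_table_ != nullptr; }

 private:
    std::shared_ptr<ToyReplicator> base_replicator_;
    std::shared_ptr<ToyTable> base_table_;
};
```

Compared with a separate setter, the single entry point makes it harder to forget the base table at a call site.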
src/storage/aggregator.cc
Outdated
@@ -447,6 +495,8 @@ bool Aggregator::FlushAggrBuffer(const std::string& key, const std::string& filt
row_builder_.SetTimestamp(row_ptr, 1, buffer.ts_begin_);
row_builder_.SetTimestamp(row_ptr, 2, buffer.ts_end_);
row_builder_.SetInt32(row_ptr, 3, buffer.aggr_cnt_);
std::cout << "[DEBUG]: aggr_cnt: " << buffer.aggr_cnt_
DLOG(INFO) << "aggr_cnt: " << buffer.aggr_cnt_ << " aggr_val: " << aggr_val;
Codecov Report
Additional details and impacted files
@@           Coverage Diff            @@
##             dev    #2689   +/-   ##
======================================
  Coverage       ?   36.70%
  Complexity     ?      392
======================================
  Files          ?      156
  Lines          ?     9257
  Branches       ?     1060
======================================
  Hits           ?     3398
  Misses         ?     5622
  Partials       ?      237
src/storage/aggregator.cc
Outdated
if (reverse) {
    ok = UpdateAggrValAfterDeleteRange(base_row_view_, row_ptr, &aggr_buffer);
} else {
    ok = UpdateAggrVal(base_row_view_, row_ptr, &aggr_buffer, reverse);
Why pass the reverse argument when it is always false here?
Because UpdateAggrVal is also called from other places, where the 'reverse' parameter may be true.
src/storage/aggregator.cc
Outdated
::openmldb::api::Dimension* dimension = entry.add_dimensions();
dimension->set_key(key);
dimension->set_idx(aggr_index_pos_);
ok = aggr_replicator_->AppendEntry(entry);
No need to call AppendEntry of aggr_replicator_.
src/storage/aggregator_test.cc
Outdated
AddDefaultAggregatorSchema(&aggr_table_meta);
std::map<std::string, uint32_t> mapping;
mapping.insert(std::make_pair("idx", 0));
std::shared_ptr<Table> aggr_table =
std::shared_ptr<Table> aggr_table = std::make_shared<MemTable>(aggr_table_meta);
…le can be updated after deleting a record
src/storage/aggregator.cc
Outdated
    if (!ok) {
        PDLOG(ERROR, "Update aggr value failed");
        return false;
    }
    return true;
}

bool Aggregator::DeleteAndUpdate(const std::string& key, const std::string& row,
                                 const uint64_t& offset, bool recover) {
Where is this function used, apart from the test case?
src/tablet/tablet_impl.cc
Outdated
@@ -3122,7 +3122,7 @@ int TabletImpl::LoadTableInternal(uint32_t tid, uint32_t pid, std::shared_ptr<::
auto aggrs = GetAggregators(tid, pid);
if (aggrs != nullptr) {
    for (auto& aggr : *aggrs) {
-        if (!aggr->Init(replicator)) {
+        if (!aggr->Init(replicator, nullptr)) {
Why pass nullptr?
src/storage/aggregator.cc
Outdated
    break;
}
case DataType::kInt: {
    int32_t val;
    row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val);
    aggr_buffer->aggr_val_.vlong += val;
    if (reverse) {
        aggr_buffer->aggr_val_.vlong -= val;
cpplint error
src/storage/aggregator.cc
Outdated
auto tmp_val = it->GetValue();
std::string origin_data = tmp_val.ToString();
const int8_t* row_ptr = reinterpret_cast<const int8_t*>(it->GetValue().data());
codec::RowView base_row_view(base_table_->GetTableMeta()->column_desc(),
Use the row_view passed in through the function arguments.
src/storage/aggregator.cc
Outdated
const int8_t* row_ptr = reinterpret_cast<const int8_t*>(it->GetValue().data());
codec::RowView base_row_view(base_table_->GetTableMeta()->column_desc(),
                             reinterpret_cast<int8_t*>(const_cast<char*>(origin_data.c_str())),
                             origin_data.size());
Call UpdateAggrVal(..., false) recursively?
src/tablet/tablet_impl.cc
Outdated
@@ -5850,7 +5850,7 @@ bool TabletImpl::CreateAggregatorInternal(const ::openmldb::api::CreateAggregato
}

auto base_replicator = GetReplicator(base_meta->tid(), base_meta->pid());
- if (!aggregator->Init(base_replicator)) {
+ if (!aggregator->Init(base_replicator, nullptr)) {
pass base table?
src/storage/aggregator_test.cc
Outdated
@@ -173,6 +429,45 @@ void CheckSumAggrResult(std::shared_ptr<Table> aggr_table, DataType data_type, s
    return;
}

template <typename T>
void CheckSumAggrResultAfterDelete(std::shared_ptr<Table> aggr_table, DataType data_type,
resolve cpplint error
@@ -127,13 +127,14 @@ class Aggregator {

~Aggregator();

- bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false);
+ bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false,
Call this function in the Delete method of tablet_impl.cc.
src/storage/aggregator.cc
Outdated
- bool ok = UpdateAggrVal(base_row_view_, base_row_ptr, &tmp_buffer);
+ bool ok = false;
if (reverse) {
    ok = aggr_table_->Delete(key, aggr_index_pos_, tmp_buffer.ts_begin_);
Should we remove the cur_ts?
src/storage/aggregator.cc
Outdated
if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vint) {
    aggr_buffer->aggr_val_.vint = val;
    aggr_buffer->non_null_cnt_ = 0;
    auto it = base_table_->NewTraverseIterator(0);
We should use:
- NewWindowIterator(index_pos_)
- seek(key) and then seek(ts_begin_)
- reset the aggr_buffer
- iterate and update all the rows within the [ts_end_, ts_begin_]
The first two steps can also use NewTraverseIterator(index_pos_) and Seek(key, ts).
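The recompute-on-delete idea in this comment can be sketched with standard containers. Everything here is a toy stand-in: ToyAggrBuffer, ToySeries, ToyBaseTable and RecomputeMin are hypothetical names, not OpenMLDB APIs. The sketch assumes rows of one key are ordered newest first, so it seeks to ts_end and walks toward ts_begin.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Toy stand-in for one pre-aggregation bucket (not the OpenMLDB AggrBuffer).
struct ToyAggrBuffer {
    int64_t ts_begin = -1;
    int64_t ts_end = -1;
    int32_t aggr_cnt = 0;
    bool has_val = false;  // false when the window holds no rows
    int64_t min_val = 0;
};

// Rows of one key, ordered newest timestamp first (an assumption of this sketch).
using ToySeries = std::map<int64_t, int64_t, std::greater<int64_t>>;
using ToyBaseTable = std::map<std::string, ToySeries>;

// Rebuild the min of one window from the base rows that survived a delete.
// The window bounds ts_begin / ts_end are left untouched.
bool RecomputeMin(const ToyBaseTable& base, const std::string& key, ToyAggrBuffer* buf) {
    assert(buf != nullptr && buf->ts_begin <= buf->ts_end);
    // Reset only the aggregated state, not the bounds.
    buf->aggr_cnt = 0;
    buf->has_val = false;
    buf->min_val = 0;
    auto series = base.find(key);  // per-key lookup, like a window iterator
    if (series == base.end()) {
        return true;  // nothing left for this key
    }
    // With std::greater, lower_bound returns the first row with ts <= ts_end.
    for (auto it = series->second.lower_bound(buf->ts_end);
         it != series->second.end() && it->first >= buf->ts_begin; ++it) {
        if (!buf->has_val || it->second < buf->min_val) {
            buf->min_val = it->second;
            buf->has_val = true;
        }
        ++buf->aggr_cnt;
    }
    return true;
}
```

A full rescan is needed for min and max because, unlike sum, the old extreme cannot be undone arithmetically once its row is gone.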
src/storage/aggregator.cc
Outdated
aggr_buffer->non_null_cnt_ = 0;
auto it = base_table_->NewTraverseIterator(0);
it->SeekToFirst();
while (it->Valid() && it->GetKey() >= aggr_buffer->ts_begin_) {
Min and max can share one common function.
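One way to share the logic, sketched with hypothetical names (ToyExtremeBuffer, RebuildExtreme, RebuildMin, RebuildMax) and plain integers instead of the real row and buffer types: the rescan is written once and parameterized by a comparator.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Toy buffer for an extreme-value aggregate (illustration only).
struct ToyExtremeBuffer {
    bool has_val = false;
    int64_t val = 0;
    int32_t non_null_cnt = 0;
};

// Shared rescan: better(a, b) returns true when a should replace b.
template <typename Better>
void RebuildExtreme(const std::vector<int64_t>& surviving_rows, ToyExtremeBuffer* buf,
                    Better better) {
    assert(buf != nullptr);
    *buf = ToyExtremeBuffer();  // reset before rescanning
    for (int64_t v : surviving_rows) {
        if (!buf->has_val || better(v, buf->val)) {
            buf->val = v;
            buf->has_val = true;
        }
        ++buf->non_null_cnt;
    }
}

// Min and max differ only in the comparator they pass in.
inline void RebuildMin(const std::vector<int64_t>& rows, ToyExtremeBuffer* buf) {
    RebuildExtreme(rows, buf, std::less<int64_t>());
}

inline void RebuildMax(const std::vector<int64_t>& rows, ToyExtremeBuffer* buf) {
    RebuildExtreme(rows, buf, std::greater<int64_t>());
}
```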
int64_t ts_end = aggr_buffer->ts_end_;
it->Seek(key, ts_end);
aggr_buffer->clear();
if (this->GetWindowType() == WindowType::kRowsRange) {
For both kRowsRange and kRowsNum, ts_begin_ and ts_end_ should stay the same.
if (this->GetWindowType() == WindowType::kRowsRange) {
    if (aggr_buffer->ts_begin_ == -1) {
        // the first record.
        aggr_buffer->ts_end_ = it->GetKey();
    }
    aggr_buffer->ts_begin_ = it->GetKey();
}
Why check kRowsRange again here? Did you intend to check kRowsNum?
Since we keep ts_end_ and ts_begin_ from the previous buffer, this code can be removed.
    PDLOG(ERROR, "base table is nullptr, cannot update MinAggr table");
    return false;
}
auto it = base_table_->NewTraverseIterator(this->GetIndexPos());
If you prefer to use TraverseIterator, you have to check that key and GetPK are the same. It is better to use WindowIterator here, both to simplify the code and for performance.
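The difference between the two iteration styles can be illustrated with plain containers. This is a sketch under assumptions: FlatRow, KeyedRows, SumByTraverse and SumByWindow are hypothetical names, a vector scan stands in for a traverse-style iterator that walks rows of every key, and a nested map stands in for a per-key window-style iterator.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// A row as seen by a scan that walks every key (illustration only).
struct FlatRow {
    std::string pk;
    int64_t ts;
    int64_t val;
};

// Traverse style: rows of all keys come back, so each row must be filtered by pk.
int64_t SumByTraverse(const std::vector<FlatRow>& all_rows, const std::string& key,
                      int64_t ts_begin, int64_t ts_end) {
    assert(ts_begin <= ts_end);
    int64_t sum = 0;
    for (const FlatRow& row : all_rows) {
        if (row.pk != key) {
            continue;  // the extra key check the comment refers to
        }
        if (row.ts >= ts_begin && row.ts <= ts_end) {
            sum += row.val;
        }
    }
    return sum;
}

using KeyedRows = std::map<std::string, std::map<int64_t, int64_t>>;

// Window style: look the key up once, then only that key's rows are visited.
int64_t SumByWindow(const KeyedRows& by_key, const std::string& key,
                    int64_t ts_begin, int64_t ts_end) {
    assert(ts_begin <= ts_end);
    auto series = by_key.find(key);
    if (series == by_key.end()) {
        return 0;
    }
    int64_t sum = 0;
    for (auto it = series->second.lower_bound(ts_begin);
         it != series->second.end() && it->first <= ts_end; ++it) {
        sum += it->second;
    }
    return sum;
}
```

Both functions return the same result; the per-key version touches fewer rows and has no key comparison to get wrong.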
        }
        default: {
            PDLOG(ERROR, "Unsupported data type");
            return false;
        }
    }
    aggr_buffer->non_null_cnt_++;
This should go in the else branch only.
        }
        default: {
            PDLOG(ERROR, "Unsupported data type");
            return false;
        }
    }
    aggr_buffer->non_null_cnt_++;
non_null_cnt_++ belongs in the else branch only.
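A small sketch of the point being made, for a sum aggregate. ToySumBuffer and UpdateSum are hypothetical names. Decrementing the counter on the reverse (delete) path is an assumption of this sketch; the review only states that the increment must not run on that path.

```cpp
#include <cassert>
#include <cstdint>

// Toy sum buffer (illustration only, not the OpenMLDB AggrBuffer).
struct ToySumBuffer {
    int64_t sum = 0;
    int32_t non_null_cnt = 0;
};

// reverse == true undoes a previously aggregated value (the delete path).
void UpdateSum(ToySumBuffer* buf, int64_t val, bool reverse) {
    assert(buf != nullptr);
    if (reverse) {
        assert(buf->non_null_cnt > 0);
        buf->sum -= val;
        --buf->non_null_cnt;  // assumption: the delete path shrinks the count
    } else {
        buf->sum += val;
        ++buf->non_null_cnt;  // the increment lives in the else branch only
    }
}
```

If the increment sat after the branch instead, every delete would raise the count, and the buffer would never look empty again.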
This has been done in #3327.
What kind of change does this PR introduce? (Bug fix, feature, docs update, ...)
Bug fix
What is the current behavior? (You can also link to an open issue here)
After a record is deleted from a table, the pre-aggregation table corresponding to that table is not updated.
What is the new behavior (if this is a feature change)?
After a record is deleted from a table, the pre-aggregation table corresponding to that table is updated.
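The intended behavior can be pictured with a toy model. This is a sketch only: ToyPreAggr, its methods, and the fixed-width time buckets are assumptions made for illustration and do not mirror the OpenMLDB implementation. Timestamps are assumed to be non-negative.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Toy base table plus a sum pre-aggregation kept in fixed-width time buckets.
class ToyPreAggr {
 public:
    explicit ToyPreAggr(int64_t bucket_width) : bucket_width_(bucket_width) {
        assert(bucket_width > 0);
    }

    // Insert a row and fold it into its bucket. Fails on a duplicate (key, ts).
    bool Put(const std::string& key, int64_t ts, int64_t val) {
        assert(ts >= 0);
        if (!base_[key].insert(std::make_pair(ts, val)).second) {
            return false;
        }
        Bucket& bucket = aggr_[key][ts / bucket_width_];
        bucket.sum += val;
        ++bucket.cnt;
        return true;
    }

    // Delete a row and update the matching bucket in the same step.
    bool Delete(const std::string& key, int64_t ts) {
        auto rows = base_.find(key);
        if (rows == base_.end()) return false;
        auto row = rows->second.find(ts);
        if (row == rows->second.end()) return false;
        auto& buckets = aggr_[key];
        auto bucket = buckets.find(ts / bucket_width_);
        assert(bucket != buckets.end());
        bucket->second.sum -= row->second;
        if (--bucket->second.cnt == 0) {
            buckets.erase(bucket);  // drop a bucket once its last row is gone
        }
        rows->second.erase(row);
        return true;
    }

    // Read one bucket's sum; returns false when the bucket does not exist.
    bool Sum(const std::string& key, int64_t bucket_id, int64_t* out) const {
        auto buckets = aggr_.find(key);
        if (buckets == aggr_.end()) return false;
        auto bucket = buckets->second.find(bucket_id);
        if (bucket == buckets->second.end()) return false;
        *out = bucket->second.sum;
        return true;
    }

 private:
    struct Bucket {
        int64_t sum = 0;
        int32_t cnt = 0;
    };
    int64_t bucket_width_;
    std::map<std::string, std::map<int64_t, int64_t>> base_;
    std::map<std::string, std::map<int64_t, Bucket>> aggr_;
};
```

With a bucket width of 100, deleting the row at ts 10 changes what Sum reports for bucket 0, which is the consistency this PR is after.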