diff --git a/src/storage/aggregator.cc b/src/storage/aggregator.cc index a62bb74d376..42472d1586a 100644 --- a/src/storage/aggregator.cc +++ b/src/storage/aggregator.cc @@ -97,7 +97,8 @@ Aggregator::Aggregator(const ::openmldb::api::TableMeta& base_meta, const ::open Aggregator::~Aggregator() {} -bool Aggregator::Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover) { +bool Aggregator::Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover, + bool reverse) { if (!recover && GetStat() != AggrStat::kInited) { PDLOG(WARNING, "Aggregator status is not kInited"); return false; @@ -129,7 +130,6 @@ bool Aggregator::Update(const std::string& key, const std::string& row, const ui LOG(ERROR) << "unsupport rows bucket window for *_where agg op"; return false; } - AggrBufferLocked* aggr_buffer_lock; { std::lock_guard lock(mu_); @@ -170,7 +170,6 @@ bool Aggregator::Update(const std::string& key, const std::string& row, const ui return false; } } - if (cur_ts < aggr_buffer.ts_begin_) { // handle the case that the current timestamp is smaller than the begin timestamp in aggregate buffer lock.unlock(); @@ -178,14 +177,13 @@ bool Aggregator::Update(const std::string& key, const std::string& row, const ui // avoid out-of-order duplicate writes during the recovery phase return true; } - bool ok = UpdateFlushedBuffer(key, filter_key, row_ptr, cur_ts, offset); + bool ok = UpdateFlushedBuffer(key, filter_key, row_ptr, cur_ts, offset, reverse); if (!ok) { PDLOG(ERROR, "Update flushed buffer failed"); return false; } return true; } - if (CheckBufferFilled(cur_ts, aggr_buffer.ts_end_, aggr_buffer.aggr_cnt_)) { AggrBuffer flush_buffer = aggr_buffer; uint64_t latest_binlog = aggr_buffer.binlog_offset_ + 1; @@ -205,7 +203,7 @@ bool Aggregator::Update(const std::string& key, const std::string& row, const ui if (window_type_ == WindowType::kRowsNum) { aggr_buffer.ts_end_ = cur_ts; } - bool ok = UpdateAggrVal(base_row_view_, row_ptr, &aggr_buffer); + bool ok = UpdateAggrVal(base_row_view_, row_ptr, &aggr_buffer, key, reverse); if (!ok) { PDLOG(ERROR, "Update aggr value failed"); return false; @@ -270,8 +268,10 @@ bool Aggregator::FlushAll() { return true; } -bool Aggregator::Init(std::shared_ptr base_replicator) { +bool Aggregator::Init(std::shared_ptr base_replicator, + std::shared_ptr base_table) { std::unique_lock lock(mu_); + this->base_table_ = base_table; if (GetStat() != AggrStat::kUnInit) { PDLOG(INFO, "aggregator status is %s", AggrStatToString(GetStat())); return true; @@ -392,6 +392,7 @@ bool Aggregator::Init(std::shared_ptr base_replicator) { status_.store(AggrStat::kInited, std::memory_order_relaxed); return true; } + bool Aggregator::GetAggrBuffer(const std::string& key, AggrBuffer** buffer) { return GetAggrBuffer(key, "", buffer); } bool Aggregator::GetAggrBuffer(const std::string& key, const std::string& filter_key, AggrBuffer** buffer) { @@ -458,7 +459,6 @@ bool Aggregator::FlushAggrBuffer(const std::string& key, const std::string& filt } else { row_builder_.SetNULL(row_ptr, row_size, 6); } - int64_t time = ::baidu::common::timer::get_micros() / 1000; Dimensions dimensions; auto dimension = dimensions.Add(); @@ -483,7 +483,7 @@ bool Aggregator::FlushAggrBuffer(const std::string& key, const std::string& filt } bool Aggregator::UpdateFlushedBuffer(const std::string& key, const std::string& filter_key, const int8_t* base_row_ptr, - int64_t cur_ts, uint64_t offset) { + int64_t cur_ts, uint64_t offset, bool reverse) { auto it = aggr_table_->NewTraverseIterator(0); // If there is no repetition of ts, `seek` will locate to the position that less than ts. it->Seek(key, cur_ts + 1); @@ -528,7 +528,11 @@ bool Aggregator::UpdateFlushedBuffer(const std::string& key, const std::string& PDLOG(ERROR, "GetAggrBufferFromRowView failed"); return false; } - tmp_buffer.aggr_cnt_ += 1; + if (reverse) { + tmp_buffer.aggr_cnt_ -= 1; + } else { + tmp_buffer.aggr_cnt_ += 1; + } tmp_buffer.binlog_offset_ = std::max(tmp_buffer.binlog_offset_, offset); break; } @@ -543,12 +547,18 @@ bool Aggregator::UpdateFlushedBuffer(const std::string& key, const std::string& tmp_buffer.aggr_cnt_ = 1; tmp_buffer.binlog_offset_ = offset; } - bool ok = UpdateAggrVal(base_row_view_, base_row_ptr, &tmp_buffer); + bool ok = false; + ok = aggr_table_->Delete(key, aggr_index_pos_, tmp_buffer.ts_begin_); if (!ok) { - PDLOG(ERROR, "UpdateAggrVal failed"); + PDLOG(ERROR, "aggr table delete one record failed"); return false; } + ok = UpdateAggrVal(base_row_view_, base_row_ptr, &tmp_buffer, key, reverse); + if (!ok) { + PDLOG(ERROR, "UpdateAggrVal failed"); + return false; + } ok = FlushAggrBuffer(key, filter_key, tmp_buffer); if (!ok) { PDLOG(ERROR, "FlushAggrBuffer failed"); @@ -573,7 +583,8 @@ SumAggregator::SumAggregator(const ::openmldb::api::TableMeta& base_meta, const : Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye, window_size) {} -bool SumAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) { +bool SumAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { return true; } @@ -581,32 +592,52 @@ bool SumAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* case DataType::kSmallInt: { int16_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vlong += val; + if (reverse) { + aggr_buffer->aggr_val_.vlong -= val; + } else { + aggr_buffer->aggr_val_.vlong += val; + } break; } case DataType::kInt: { int32_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vlong += val; + if (reverse) { + aggr_buffer->aggr_val_.vlong -= val; + } else { + aggr_buffer->aggr_val_.vlong += val; + } break; } case DataType::kTimestamp: case DataType::kBigInt: { int64_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vlong += val; + if (reverse) { + aggr_buffer->aggr_val_.vlong -= val; + } else { + aggr_buffer->aggr_val_.vlong += val; + } break; } case DataType::kFloat: { float val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vfloat += val; + if (reverse) { + aggr_buffer->aggr_val_.vfloat -= val; + } else { + aggr_buffer->aggr_val_.vfloat += val; + } break; } case DataType::kDouble: { double val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } default: { @@ -614,7 +645,11 @@ bool SumAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* return false; } } - aggr_buffer->non_null_cnt_++; + if (reverse) { + aggr_buffer->non_null_cnt_--; + } else { + aggr_buffer->non_null_cnt_++; + } return true; } @@ -786,6 +821,47 @@ bool MinMaxBaseAggregator::DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buff return true; } +bool MinMaxBaseAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { + return true; +} + +bool MinMaxBaseAggregator::RebuildExtremumAggrBuffer(const codec::RowView& row_view, + AggrBuffer* aggr_buffer, + const std::string& key) { + if (base_table_ == nullptr) { + PDLOG(ERROR, "base table is nullptr, cannot update MinAggr table"); + return false; + } + auto it = base_table_->NewTraverseIterator(this->GetIndexPos()); + int64_t ts_begin = aggr_buffer->ts_begin_; + int64_t ts_end = aggr_buffer->ts_end_; + it->Seek(key, ts_end); + aggr_buffer->clear(); + if (this->GetWindowType() == WindowType::kRowsRange) { + aggr_buffer->ts_begin_ = ts_begin; + aggr_buffer->ts_end_ = ts_end; + } + while (it->Valid() && it->GetKey() >= (uint64_t)ts_begin) { + if (it->GetKey() <= (uint64_t)ts_end) { + const int8_t* base_row_ptr = reinterpret_cast(it->GetValue().data()); + if (!UpdateAggrVal(row_view, base_row_ptr, aggr_buffer, key, false)) { + PDLOG(ERROR, "Failed to update aggr Val during rebuilding Extermum aggr buffer"); + return false; + } + if (this->GetWindowType() == WindowType::kRowsRange) { + if (aggr_buffer->ts_begin_ == -1) { + // the first record. + aggr_buffer->ts_end_ = it->GetKey(); + } + aggr_buffer->ts_begin_ = it->GetKey(); + } + } + it->Next(); + } + return true; +} + MinAggregator::MinAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr
aggr_table, std::shared_ptr aggr_replicator, const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type, @@ -793,75 +869,80 @@ MinAggregator::MinAggregator(const ::openmldb::api::TableMeta& base_meta, const : MinMaxBaseAggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye, window_size) {} -bool MinAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) { - if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { - return true; - } - switch (aggr_col_type_) { - case DataType::kSmallInt: { - int16_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vsmallint) { - aggr_buffer->aggr_val_.vsmallint = val; - } - break; +bool MinAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { + if (reverse) { + return RebuildExtremumAggrBuffer(row_view, aggr_buffer, key); + } else { + if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { + return true; } - case DataType::kDate: - case DataType::kInt: { - int32_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vint) { - aggr_buffer->aggr_val_.vint = val; + switch (aggr_col_type_) { + case DataType::kSmallInt: { + int16_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vsmallint) { + aggr_buffer->aggr_val_.vsmallint = val; + } + break; } - break; - } - case DataType::kTimestamp: - case DataType::kBigInt: { - int64_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vlong) { - aggr_buffer->aggr_val_.vlong = val; + case DataType::kDate: + case DataType::kInt: { + int32_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vint) { + aggr_buffer->aggr_val_.vint = val; + } + break; } - break; - } - case DataType::kFloat: { - float val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vfloat) { - aggr_buffer->aggr_val_.vfloat = val; + case DataType::kTimestamp: + case DataType::kBigInt: { + int64_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vlong) { + aggr_buffer->aggr_val_.vlong = val; + } + break; } - break; - } - case DataType::kDouble: { - double val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vdouble) { - aggr_buffer->aggr_val_.vdouble = val; + case DataType::kFloat: { + float val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vfloat) { + aggr_buffer->aggr_val_.vfloat = val; + } + break; } - break; - } - case DataType::kString: - case DataType::kVarchar: { - char* ch = NULL; - uint32_t ch_length = 0; - row_view.GetValue(row_ptr, aggr_col_idx_, &ch, &ch_length); - auto& aggr_val = aggr_buffer->aggr_val_.vstring; - if (aggr_buffer->AggrValEmpty() || StringCompare(ch, ch_length, aggr_val.data, aggr_val.len) < 0) { - if (aggr_val.data != NULL && ch_length > aggr_val.len) { - delete[] aggr_val.data; - aggr_val.data = NULL; + case DataType::kDouble: { + double val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val < aggr_buffer->aggr_val_.vdouble) { + aggr_buffer->aggr_val_.vdouble = val; } - if (aggr_val.data == NULL) { - aggr_val.data = new char[ch_length]; + break; + } + case DataType::kString: + case DataType::kVarchar: { + char* ch = NULL; + uint32_t ch_length = 0; + row_view.GetValue(row_ptr, aggr_col_idx_, &ch, &ch_length); + auto& aggr_val = aggr_buffer->aggr_val_.vstring; + if (aggr_buffer->AggrValEmpty() || StringCompare(ch, ch_length, aggr_val.data, aggr_val.len) < 0) { + if (aggr_val.data != NULL && ch_length > aggr_val.len) { + delete[] aggr_val.data; + aggr_val.data = NULL; + } + if (aggr_val.data == NULL) { + aggr_val.data = new char[ch_length]; + } + aggr_val.len = ch_length; + memcpy(aggr_val.data, ch, ch_length); } - aggr_val.len = ch_length; - memcpy(aggr_val.data, ch, ch_length); + break; + } + default: { + PDLOG(ERROR, "Unsupported data type"); + return false; } - break; - } - default: { - PDLOG(ERROR, "Unsupported data type"); - return false; } } aggr_buffer->non_null_cnt_++; @@ -875,75 +956,80 @@ MaxAggregator::MaxAggregator(const ::openmldb::api::TableMeta& base_meta, const : MinMaxBaseAggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye, window_size) {} -bool MaxAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) { - if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { - return true; - } - switch (aggr_col_type_) { - case DataType::kSmallInt: { - int16_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vsmallint) { - aggr_buffer->aggr_val_.vsmallint = val; - } - break; +bool MaxAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { + if (reverse) { + return RebuildExtremumAggrBuffer(row_view, aggr_buffer, key); + } else { + if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { + return true; } - case DataType::kDate: - case DataType::kInt: { - int32_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vint) { - aggr_buffer->aggr_val_.vint = val; + switch (aggr_col_type_) { + case DataType::kSmallInt: { + int16_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vsmallint) { + aggr_buffer->aggr_val_.vsmallint = val; + } + break; } - break; - } - case DataType::kTimestamp: - case DataType::kBigInt: { - int64_t val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vlong) { - aggr_buffer->aggr_val_.vlong = val; + case DataType::kDate: + case DataType::kInt: { + int32_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vint) { + aggr_buffer->aggr_val_.vint = val; + } + break; } - break; - } - case DataType::kFloat: { - float val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vfloat) { - aggr_buffer->aggr_val_.vfloat = val; + case DataType::kTimestamp: + case DataType::kBigInt: { + int64_t val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vlong) { + aggr_buffer->aggr_val_.vlong = val; + } + break; } - break; - } - case DataType::kDouble: { - double val; - row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vdouble) { - aggr_buffer->aggr_val_.vdouble = val; + case DataType::kFloat: { + float val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vfloat) { + aggr_buffer->aggr_val_.vfloat = val; + } + break; } - break; - } - case DataType::kString: - case DataType::kVarchar: { - char* ch = NULL; - uint32_t ch_length = 0; - row_view.GetValue(row_ptr, aggr_col_idx_, &ch, &ch_length); - auto& aggr_val = aggr_buffer->aggr_val_.vstring; - if (aggr_buffer->AggrValEmpty() || StringCompare(ch, ch_length, aggr_val.data, aggr_val.len) > 0) { - if (aggr_val.data != NULL && ch_length > aggr_val.len) { - delete[] aggr_val.data; - aggr_val.data = NULL; + case DataType::kDouble: { + double val; + row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); + if (aggr_buffer->AggrValEmpty() || val > aggr_buffer->aggr_val_.vdouble) { + aggr_buffer->aggr_val_.vdouble = val; } - if (aggr_val.data == NULL) { - aggr_val.data = new char[ch_length]; + break; + } + case DataType::kString: + case DataType::kVarchar: { + char* ch = NULL; + uint32_t ch_length = 0; + row_view.GetValue(row_ptr, aggr_col_idx_, &ch, &ch_length); + auto& aggr_val = aggr_buffer->aggr_val_.vstring; + if (aggr_buffer->AggrValEmpty() || StringCompare(ch, ch_length, aggr_val.data, aggr_val.len) > 0) { + if (aggr_val.data != NULL && ch_length > aggr_val.len) { + delete[] aggr_val.data; + aggr_val.data = NULL; + } + if (aggr_val.data == NULL) { + aggr_val.data = new char[ch_length]; + } + aggr_val.len = ch_length; + memcpy(aggr_val.data, ch, ch_length); } - aggr_val.len = ch_length; - memcpy(aggr_val.data, ch, ch_length); + break; + } + default: { + PDLOG(ERROR, "Unsupported data type"); + return false; } - break; - } - default: { - PDLOG(ERROR, "Unsupported data type"); - return false; } } aggr_buffer->non_null_cnt_++; @@ -978,9 +1064,14 @@ bool CountAggregator::DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) { return true; } -bool CountAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) { +bool CountAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { if (count_all || !row_view.IsNULL(row_ptr, aggr_col_idx_)) { - aggr_buffer->non_null_cnt_++; + if (reverse) { + aggr_buffer->non_null_cnt_--; + } else { + aggr_buffer->non_null_cnt_++; + } } return true; } @@ -992,7 +1083,8 @@ AvgAggregator::AvgAggregator(const ::openmldb::api::TableMeta& base_meta, const : Aggregator(base_meta, aggr_meta, aggr_table, aggr_replicator, index_pos, aggr_col, aggr_type, ts_col, window_tpye, window_size) {} -bool AvgAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) { +bool AvgAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse) { if (row_view.IsNULL(row_ptr, aggr_col_idx_)) { return true; } @@ -1000,31 +1092,51 @@ bool AvgAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* case DataType::kSmallInt: { int16_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } case DataType::kInt: { int32_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } case DataType::kBigInt: { int64_t val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } case DataType::kFloat: { float val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } case DataType::kDouble: { double val; row_view.GetValue(row_ptr, aggr_col_idx_, aggr_col_type_, &val); - aggr_buffer->aggr_val_.vdouble += val; + if (reverse) { + aggr_buffer->aggr_val_.vdouble -= val; + } else { + aggr_buffer->aggr_val_.vdouble += val; + } break; } default: { @@ -1032,7 +1144,11 @@ bool AvgAggregator::UpdateAggrVal(const codec::RowView& row_view, const int8_t* return false; } } - aggr_buffer->non_null_cnt_++; + if (reverse) { + aggr_buffer->non_null_cnt_--; + } else { + aggr_buffer->non_null_cnt_++; + } return true; } diff --git a/src/storage/aggregator.h b/src/storage/aggregator.h index a9b293a1082..b5307eb9a3e 100644 --- a/src/storage/aggregator.h +++ b/src/storage/aggregator.h @@ -127,13 +127,14 @@ class Aggregator { ~Aggregator(); - bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false); + bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false, + bool reverse = false); bool Delete(const std::string& key); bool FlushAll(); - bool Init(std::shared_ptr base_replicator); + bool Init(std::shared_ptr base_replicator, std::shared_ptr
base_table); uint32_t GetIndexPos() const { return index_pos_; } @@ -170,14 +171,17 @@ class Aggregator { std::shared_ptr aggr_replicator_; std::atomic status_; + std::shared_ptr
base_table_; + bool GetAggrBufferFromRowView(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* buffer); bool FlushAggrBuffer(const std::string& key, const std::string& filter_key, const AggrBuffer& aggr_buffer); bool UpdateFlushedBuffer(const std::string& key, const std::string& filter_key, const int8_t* base_row_ptr, - int64_t cur_ts, uint64_t offset); + int64_t cur_ts, uint64_t offset, bool reverse); bool CheckBufferFilled(int64_t cur_ts, int64_t buffer_end, int32_t buffer_cnt); private: - virtual bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) = 0; + virtual bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) = 0; virtual bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) = 0; virtual bool DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) = 0; int64_t AlignedStart(int64_t ts) { @@ -220,7 +224,8 @@ class SumAggregator : public Aggregator { ~SumAggregator() = default; private: - bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override; + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override; @@ -236,10 +241,16 @@ class MinMaxBaseAggregator : public Aggregator { ~MinMaxBaseAggregator() = default; + bool RebuildExtremumAggrBuffer(const codec::RowView& row_view, AggrBuffer* aggr_buffer, + const std::string& key); + private: bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override; bool DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) override; + + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; }; class MinAggregator : public MinMaxBaseAggregator { public: @@ -251,7 +262,8 @@ class MinAggregator : public MinMaxBaseAggregator { ~MinAggregator() = default; private: - bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override; + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; }; class MaxAggregator : public MinMaxBaseAggregator { @@ -264,7 +276,8 @@ class MaxAggregator : public MinMaxBaseAggregator { ~MaxAggregator() = default; private: - bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override; + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; }; class CountAggregator : public Aggregator { @@ -277,7 +290,8 @@ class CountAggregator : public Aggregator { ~CountAggregator() = default; private: - bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override; + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override; @@ -296,7 +310,8 @@ class AvgAggregator : public Aggregator { ~AvgAggregator() = default; private: - bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override; + bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer, + const std::string& key, bool reverse = false) override; bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override; diff --git a/src/storage/aggregator_test.cc b/src/storage/aggregator_test.cc index 54e6a6d97b7..3cb1e9b7b3f 100644 --- a/src/storage/aggregator_test.cc +++ b/src/storage/aggregator_test.cc @@ -104,6 +104,155 @@ bool UpdateAggr(std::shared_ptr aggr, codec::RowBuilder* row_builder return true; } +bool UpdateMinAggr(std::shared_ptr aggr, codec::RowBuilder* row_builder, + std::shared_ptr
base_table, int *delete_index) { + std::string encoded_row; + std::string delete_row; + auto window_size = aggr->GetWindowSize(); + *delete_index = 80; + std::string str1("abc"); + std::string str2("hello"); + for (int i = 0; i <= 100; i++) { + std::string str = i % 2 == 0 ? str1 : str2; + uint32_t row_size = row_builder->CalTotalLength(6 + str.size()); + encoded_row.resize(row_size); + row_builder->SetBuffer(reinterpret_cast(&(encoded_row[0])), row_size); + row_builder->AppendString("id1", 3); + row_builder->AppendString("id2", 3); + row_builder->AppendTimestamp(static_cast(i) * window_size / 2); + row_builder->AppendInt32(i); + row_builder->AppendInt16(i); + row_builder->AppendInt64(i); + row_builder->AppendFloat(static_cast(i)); + row_builder->AppendDouble(static_cast(i)); + row_builder->AppendDate(i); + row_builder->AppendString(str.c_str(), str.size()); + row_builder->AppendNULL(); + row_builder->AppendInt32(i % 2); + Dimensions dimensions; + auto dimension = dimensions.Add(); + dimension->set_idx(0); + dimension->set_key("id1|id2"); + if (i == *delete_index) { + delete_row = encoded_row; + } + bool ok = base_table->Put(static_cast(i) * window_size / 2, encoded_row, dimensions); + if (!ok) { + return false; + } + ok = aggr->Update("id1|id2", encoded_row, i); + if (!ok) { + return false; + } + } + bool ok = base_table->Delete("id1|id2", 0, (*delete_index) * window_size / 2); + if (!ok) { + PDLOG(ERROR, "base table delete failed"); + return false; + } + ok = aggr->Update("id1|id2", delete_row, 101, false, true); + if (!ok) { + PDLOG(ERROR, "aggr deletes one record failed"); + return false; + } + return true; +} + +bool UpdateMaxAggr(std::shared_ptr aggr, codec::RowBuilder* row_builder, + std::shared_ptr
base_table, int *delete_index) { + std::string encoded_row; + std::string delete_row; + *delete_index = 81; + auto window_size = aggr->GetWindowSize(); + std::string str1("abc"); + std::string str2("hello"); + for (int i = 0; i <= 100; i++) { + std::string str = i % 2 == 0 ? str1 : str2; + uint32_t row_size = row_builder->CalTotalLength(6 + str.size()); + encoded_row.resize(row_size); + row_builder->SetBuffer(reinterpret_cast(&(encoded_row[0])), row_size); + row_builder->AppendString("id1", 3); + row_builder->AppendString("id2", 3); + row_builder->AppendTimestamp(static_cast(i) * window_size / 2); + row_builder->AppendInt32(i); + row_builder->AppendInt16(i); + row_builder->AppendInt64(i); + row_builder->AppendFloat(static_cast(i)); + row_builder->AppendDouble(static_cast(i)); + row_builder->AppendDate(i); + row_builder->AppendString(str.c_str(), str.size()); + row_builder->AppendNULL(); + row_builder->AppendInt32(i % 2); + Dimensions dimensions; + auto dimension = dimensions.Add(); + dimension->set_idx(0); + dimension->set_key("id1|id2"); + if (i == *delete_index) { + delete_row = encoded_row; + } + bool ok = base_table->Put(static_cast(i) * window_size / 2, encoded_row, dimensions); + if (!ok) { + return false; + } + ok = aggr->Update("id1|id2", encoded_row, i); + if (!ok) { + return false; + } + } + bool ok = base_table->Delete("id1|id2", 0, *delete_index * window_size / 2); + if (!ok) { + PDLOG(ERROR, "base table deletes failed"); + return false; + } + ok = aggr->Update("id1|id2", delete_row, 101, false, true); + if (!ok) { + PDLOG(ERROR, "aggr deletes one record failed"); + return false; + } + return true; +} + + +bool DeleteAndUpdateAggr(std::shared_ptr aggr, codec::RowBuilder* row_builder, + int *delete_index) { + std::string encoded_row; + std::string delete_row; + auto window_size = aggr->GetWindowSize(); + std::string str1("abc"); + std::string str2("hello"); + *delete_index = 80; + for (int i = 0; i <= 100; i++) { + std::string str = i % 2 == 0 ? str1 : str2; + uint32_t row_size = row_builder->CalTotalLength(6 + str.size()); + encoded_row.resize(row_size); + row_builder->SetBuffer(reinterpret_cast(&(encoded_row[0])), row_size); + row_builder->AppendString("id1", 3); + row_builder->AppendString("id2", 3); + row_builder->AppendTimestamp(static_cast(i) * window_size / 2); + row_builder->AppendInt32(i); + row_builder->AppendInt16(i); + row_builder->AppendInt64(i); + row_builder->AppendFloat(static_cast(i)); + row_builder->AppendDouble(static_cast(i)); + row_builder->AppendDate(i); + row_builder->AppendString(str.c_str(), str.size()); + row_builder->AppendNULL(); + row_builder->AppendInt32(i % 2); + bool ok = aggr->Update("id1|id2", encoded_row, i); + if (!ok) { + return false; + } + if (i == *delete_index) { + delete_row = encoded_row; + } + } + bool ok = aggr->Update("id1|id2", delete_row, 101, false, true); + if (!ok) { + return false; + } + return true; +} + bool GetUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std::string& aggr_type, const std::string& bucket_size, std::shared_ptr& aggregator, // NOLINT std::shared_ptr
& table, AggrBuffer** buffer) { // NOLINT @@ -127,7 +276,7 @@ bool GetUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); codec::RowBuilder row_builder(base_table_meta.column_desc()); UpdateAggr(aggr, &row_builder); std::string key = "id1|id2"; @@ -138,6 +287,113 @@ bool GetUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std return true; } +bool GetMinUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std::string& aggr_type, + const std::string& bucket_size, std::shared_ptr& aggregator, // NOLINT + std::shared_ptr
& table, AggrBuffer** buffer, int *delete_index) { // NOLINT + ::openmldb::api::TableMeta base_table_meta; + base_table_meta.set_tid(id); + AddDefaultAggregatorBaseSchema(&base_table_meta); + ::openmldb::api::TableMeta aggr_table_meta; + aggr_table_meta.set_tid(id + 1); + AddDefaultAggregatorSchema(&aggr_table_meta); + std::shared_ptr
base_table = std::make_shared(base_table_meta); + base_table->Init(); + std::shared_ptr
aggr_table = std::make_shared(aggr_table_meta); + aggr_table->Init(); + + // replicator + std::map map; + std::string folder = "/tmp/" + GenRand() + "/"; + std::shared_ptr replicator = std::make_shared( + aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode); + replicator->Init(); + auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, aggr_col, aggr_type, + "ts_col", bucket_size, "low_card"); + std::shared_ptr base_replicator = std::make_shared( + base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); + base_replicator->Init(); + aggr->Init(base_replicator, base_table); + codec::RowBuilder row_builder(base_table_meta.column_desc()); + UpdateMinAggr(aggr, &row_builder, base_table, delete_index); + std::string key = "id1|id2"; + aggregator = aggr; + aggr->GetAggrBuffer("id1|id2", buffer); + table = aggr_table; + ::openmldb::base::RemoveDirRecursive(folder); + return true; +} + + +bool GetMaxUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std::string& aggr_type, + const std::string& bucket_size, std::shared_ptr& aggregator, // NOLINT + std::shared_ptr
& table, AggrBuffer** buffer, int *delete_index) { // NOLINT + ::openmldb::api::TableMeta base_table_meta; + base_table_meta.set_tid(id); + AddDefaultAggregatorBaseSchema(&base_table_meta); + ::openmldb::api::TableMeta aggr_table_meta; + aggr_table_meta.set_tid(id + 1); + AddDefaultAggregatorSchema(&aggr_table_meta); + std::shared_ptr
base_table = std::make_shared(base_table_meta); + base_table->Init(); + std::shared_ptr
aggr_table = std::make_shared(aggr_table_meta); + aggr_table->Init(); + + // replicator + std::map map; + std::string folder = "/tmp/" + GenRand() + "/"; + std::shared_ptr replicator = std::make_shared( + aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode); + replicator->Init(); + auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, aggr_col, aggr_type, + "ts_col", bucket_size, "low_card"); + std::shared_ptr base_replicator = std::make_shared( + base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); + base_replicator->Init(); + aggr->Init(base_replicator, base_table); + codec::RowBuilder row_builder(base_table_meta.column_desc()); + UpdateMaxAggr(aggr, &row_builder, base_table, delete_index); + std::string key = "id1|id2"; + aggregator = aggr; + aggr->GetAggrBuffer("id1|id2", buffer); + table = aggr_table; + ::openmldb::base::RemoveDirRecursive(folder); + return true; +} + +bool GetDeleteAndUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std::string& aggr_type, + const std::string& bucket_size, std::shared_ptr& aggregator, // NOLINT + std::shared_ptr
& table, AggrBuffer** buffer, int *delete_index) { // NOLINT + ::openmldb::api::TableMeta base_table_meta; + base_table_meta.set_tid(id); + AddDefaultAggregatorBaseSchema(&base_table_meta); + ::openmldb::api::TableMeta aggr_table_meta; + aggr_table_meta.set_tid(id + 1); + AddDefaultAggregatorSchema(&aggr_table_meta); + std::shared_ptr
aggr_table = std::make_shared(aggr_table_meta); + aggr_table->Init(); + + // replicator + std::map map; + std::string folder = "/tmp/" + GenRand() + "/"; + std::shared_ptr replicator = std::make_shared( + aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode); + replicator->Init(); + auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, aggr_col, aggr_type, + "ts_col", bucket_size, "low_card"); + std::shared_ptr base_replicator = std::make_shared( + base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); + base_replicator->Init(); + aggr->Init(base_replicator, nullptr); + codec::RowBuilder row_builder(base_table_meta.column_desc()); + DeleteAndUpdateAggr(aggr, &row_builder, delete_index); + std::string key = "id1|id2"; + aggregator = aggr; + aggr->GetAggrBuffer("id1|id2", buffer); + table = aggr_table; + ::openmldb::base::RemoveDirRecursive(folder); + return true; +} + template void CheckSumAggrResult(std::shared_ptr
aggr_table, DataType data_type, std::shared_ptr aggr, int32_t expect_null = 0) { @@ -173,6 +429,45 @@ void CheckSumAggrResult(std::shared_ptr
aggr_table, DataType data_type, s return; } +template +void CheckSumAggrResultAfterDelete(std::shared_ptr
aggr_table, DataType data_type, + std::shared_ptr aggr, int delete_index, + int32_t expect_null = 0) { + auto it = aggr_table->NewTraverseIterator(0); + it->SeekToFirst(); + int64_t window_size = aggr->GetWindowSize(); + for (int i = 50 - 1; i >= 0; --i) { + ASSERT_TRUE(it->Valid()); + auto tmp_val = it->GetValue(); + std::string origin_data = tmp_val.ToString(); + codec::RowView origin_row_view(aggr_table->GetTableMeta()->column_desc(), + reinterpret_cast(const_cast(origin_data.c_str())), + origin_data.size()); + int64_t ts_start, ts_end; + origin_row_view.GetTimestamp(1, &ts_start); + origin_row_view.GetTimestamp(2, &ts_end); + ASSERT_EQ(ts_start, i * window_size); + ASSERT_EQ(ts_end, i * window_size + window_size - 1); + int num_rows; + origin_row_view.GetInt32(3, &num_rows); + char* ch = NULL; + uint32_t ch_length = 0; + origin_row_view.GetString(4, &ch, &ch_length); + T origin_val = *(reinterpret_cast(ch)); + if (expect_null) { + ASSERT_EQ(origin_val, 0); + } else { + if (i == delete_index / 2) { + ASSERT_EQ(origin_val, static_cast(i * 2 + 1)); + } else { + ASSERT_EQ(origin_val, static_cast(i * 4 + 1)); + } + } + it->Next(); + } + return; +} + template void CheckMinAggrResult(std::shared_ptr
aggr_table, DataType data_type, int32_t expect_null = 0) { ASSERT_EQ(aggr_table->GetRecordCnt(), 50); @@ -198,6 +493,69 @@ void CheckMinAggrResult(std::shared_ptr
aggr_table, DataType data_type, i return; } +template +void CheckMinAggrResultAfterDelete(std::shared_ptr
aggr_table, DataType data_type, + int delete_index, int32_t expect_null = 0) { + auto it = aggr_table->NewTraverseIterator(0); + it->SeekToFirst(); + int i = 50 - 1; + while (it->Valid()) { + auto tmp_val = it->GetValue(); + std::string origin_data = tmp_val.ToString(); + codec::RowView origin_row_view(aggr_table->GetTableMeta()->column_desc(), + reinterpret_cast(const_cast(origin_data.c_str())), + origin_data.size()); + char* ch = NULL; + uint32_t ch_length = 0; + auto is_null = origin_row_view.GetString(4, &ch, &ch_length); + ASSERT_EQ(is_null, expect_null); + if (is_null == 0) { + T origin_val = *reinterpret_cast(ch); + if (i == delete_index / 2) { + ASSERT_EQ(origin_val, static_cast(i * 2 + 1)); + } else { + ASSERT_EQ(origin_val, static_cast(i * 2)); + } + } + it->Next(); + i--; + } + ASSERT_EQ(i, -1); + return; +} + + +template +void CheckMaxAggrResultAfterDelete(std::shared_ptr
aggr_table, DataType data_type, + int delete_index, int32_t expect_null = 0) { + auto it = aggr_table->NewTraverseIterator(0); + it->SeekToFirst(); + int i = 50 - 1; + while (it->Valid()) { + auto tmp_val = it->GetValue(); + std::string origin_data = tmp_val.ToString(); + codec::RowView origin_row_view(aggr_table->GetTableMeta()->column_desc(), + reinterpret_cast(const_cast(origin_data.c_str())), + origin_data.size()); + char* ch = NULL; + uint32_t ch_length = 0; + auto is_null = origin_row_view.GetString(4, &ch, &ch_length); + ASSERT_EQ(is_null, expect_null); + if (is_null == 0) { + T origin_val = *reinterpret_cast(ch); + if (i == delete_index / 2) { + ASSERT_EQ(origin_val, static_cast(i * 2)); + } else { + ASSERT_EQ(origin_val, static_cast(i * 2 + 1)); + } + } + it->Next(); + i--; + } + ASSERT_EQ(i, -1); + return; +} + template void CheckMaxAggrResult(std::shared_ptr
aggr_table, DataType data_type, int32_t expect_null = 0) { ASSERT_EQ(aggr_table->GetRecordCnt(), 50); @@ -338,7 +696,7 @@ TEST_F(AggregatorTest, CreateAggregator) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); ASSERT_TRUE(aggr != nullptr); ASSERT_EQ(aggr->GetAggrType(), AggrType::kSum); ASSERT_EQ(aggr->GetWindowType(), WindowType::kRowsNum); @@ -364,7 +722,7 @@ TEST_F(AggregatorTest, CreateAggregator) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); ASSERT_TRUE(aggr != nullptr); ASSERT_EQ(aggr->GetAggrType(), AggrType::kSum); ASSERT_EQ(aggr->GetWindowType(), WindowType::kRowsRange); @@ -389,7 +747,7 @@ TEST_F(AggregatorTest, CreateAggregator) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); ASSERT_TRUE(aggr != nullptr); ASSERT_EQ(aggr->GetAggrType(), AggrType::kSum); ASSERT_EQ(aggr->GetWindowType(), WindowType::kRowsRange); @@ -414,7 +772,7 @@ TEST_F(AggregatorTest, CreateAggregator) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); ASSERT_TRUE(aggr != nullptr); ASSERT_EQ(aggr->GetAggrType(), AggrType::kSum); ASSERT_EQ(aggr->GetWindowType(), WindowType::kRowsRange); @@ -439,7 +797,7 @@ TEST_F(AggregatorTest, CreateAggregator) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); ASSERT_TRUE(aggr != nullptr); ASSERT_EQ(aggr->GetAggrType(), AggrType::kSum); ASSERT_EQ(aggr->GetWindowType(), WindowType::kRowsRange); @@ -474,7 +832,7 @@ TEST_F(AggregatorTest, SumAggregatorUpdate) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); codec::RowBuilder row_builder(base_table_meta.column_desc()); ASSERT_TRUE(UpdateAggr(aggr, &row_builder)); std::string key = "id1|id2"; @@ -540,6 +898,78 @@ TEST_F(AggregatorTest, SumAggregatorUpdate) { } } +TEST_F(AggregatorTest, SumAggregatorDelete) { + // rows_num window type + { + std::map map; + std::string folder = "/tmp/" + GenRand() + "/"; + uint32_t id = counter++; + int delete_index = 0; + ::openmldb::api::TableMeta base_table_meta; + base_table_meta.set_tid(id); + AddDefaultAggregatorBaseSchema(&base_table_meta); + id = counter++; + ::openmldb::api::TableMeta aggr_table_meta; + aggr_table_meta.set_tid(id); + AddDefaultAggregatorSchema(&aggr_table_meta); + std::map mapping; + mapping.insert(std::make_pair("idx", 0)); + std::shared_ptr
aggr_table = std::make_shared(aggr_table_meta); + aggr_table->Init(); + std::shared_ptr replicator = std::make_shared( + aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode); + replicator->Init(); + auto aggr = + CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum", "ts_col", "2"); + std::shared_ptr base_replicator = std::make_shared( + base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); + base_replicator->Init(); + aggr->Init(base_replicator, nullptr); + codec::RowBuilder row_builder(base_table_meta.column_desc()); + ASSERT_TRUE(DeleteAndUpdateAggr(aggr, &row_builder, &delete_index)); + std::string key = "id1|id2"; + auto it = aggr_table->NewTraverseIterator(0); + it->SeekToFirst(); + for (int i = 50 - 1; i >= 0; --i) { + ASSERT_TRUE(it->Valid()); + auto tmp_val = it->GetValue(); + std::string origin_data = tmp_val.ToString(); + codec::RowView origin_row_view(aggr_table_meta.column_desc(), + reinterpret_cast(const_cast(origin_data.c_str())), + origin_data.size()); + char* ch = NULL; + uint32_t ch_length = 0; + origin_row_view.GetString(4, &ch, &ch_length); + int32_t val = *reinterpret_cast(ch); + if (i == delete_index / 2) { + ASSERT_EQ(val, i * 2 + 1); + } else { + ASSERT_EQ(val, i * 4 + 1); + } + it->Next(); + } + AggrBuffer* last_buffer; + auto ok = aggr->GetAggrBuffer(key, &last_buffer); + ASSERT_TRUE(ok); + ASSERT_EQ(last_buffer->aggr_cnt_, 1); + ASSERT_EQ(last_buffer->aggr_val_.vlong, 100); + ASSERT_EQ(last_buffer->binlog_offset_, 100); + ::openmldb::base::RemoveDirRecursive(folder); + } + // rows_range window type + { + int delete_index = 0; + std::shared_ptr aggregator; + AggrBuffer* last_buffer; + std::shared_ptr
aggr_table; + ASSERT_TRUE(GetDeleteAndUpdatedResult(counter, "col3", "sum", "1s", aggregator, + aggr_table, &last_buffer, &delete_index)); + CheckSumAggrResultAfterDelete(aggr_table, DataType::kInt, aggregator, delete_index); + ASSERT_EQ(last_buffer->aggr_val_.vlong, 100); + ASSERT_EQ(last_buffer->non_null_cnt_, static_cast(1)); + } +} + TEST_F(AggregatorTest, MinAggregatorUpdate) { std::shared_ptr aggregator; AggrBuffer* last_buffer; @@ -597,6 +1027,21 @@ TEST_F(AggregatorTest, MinAggregatorUpdate) { ASSERT_EQ(last_buffer->non_null_cnt_, static_cast(1)); } +TEST_F(AggregatorTest, PeakAggregatorDelete) { + std::shared_ptr aggregator; + AggrBuffer* last_buffer; + std::shared_ptr
aggr_table; + int delete_index; + ASSERT_TRUE(GetMinUpdatedResult(counter, "col3", "MIN", "1s", aggregator, + aggr_table, &last_buffer, &delete_index)); + CheckMinAggrResultAfterDelete(aggr_table, DataType::kInt, delete_index); + counter += 2; + ASSERT_TRUE(GetMaxUpdatedResult(counter, "col4", "MAX", "1m", aggregator, + aggr_table, &last_buffer, &delete_index)); + CheckMaxAggrResultAfterDelete(aggr_table, DataType::kSmallInt, delete_index); + +} + TEST_F(AggregatorTest, MaxAggregatorUpdate) { std::shared_ptr aggregator; AggrBuffer* last_buffer; @@ -741,7 +1186,7 @@ TEST_F(AggregatorTest, OutOfOrder) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); codec::RowBuilder row_builder(base_table_meta.column_desc()); std::string encoded_row; uint32_t row_size = row_builder.CalTotalLength(6); @@ -784,7 +1229,7 @@ TEST_F(AggregatorTest, OutOfOrder) { int32_t update_val = *reinterpret_cast(ch); ASSERT_EQ(update_val, 201); - // the old agg val + // the old agg val has been deleted. it->Next(); { auto val = it->GetValue(); @@ -799,7 +1244,7 @@ TEST_F(AggregatorTest, OutOfOrder) { origin_row_view.GetString(4, &ch, &ch_length); ASSERT_EQ(origin_cnt, 2); int32_t update_val = *reinterpret_cast(ch); - ASSERT_EQ(update_val, 101); + ASSERT_EQ(update_val, 97); } ::openmldb::base::RemoveDirRecursive(folder); } @@ -825,7 +1270,7 @@ TEST_F(AggregatorTest, OutOfOrderCountWhere) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); codec::RowBuilder row_builder(base_table_meta.column_desc()); ASSERT_TRUE(UpdateAggr(aggr, &row_builder)); ASSERT_EQ(aggr_table->GetRecordCnt(), 99); @@ -931,7 +1376,7 @@ TEST_F(AggregatorTest, AlignedCountWhere) { std::shared_ptr base_replicator = std::make_shared( base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode); base_replicator->Init(); - aggr->Init(base_replicator); + aggr->Init(base_replicator, nullptr); codec::RowBuilder row_builder(base_table_meta.column_desc()); ASSERT_TRUE(UpdateAggr(aggr, &row_builder)); ASSERT_EQ(aggr_table->GetRecordCnt(), 99); @@ -1139,4 +1584,4 @@ TEST_F(AggregatorTest, FlushAll) { int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} +} \ No newline at end of file diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 5e8ddfdf07e..203dbb6fd28 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -330,6 +330,7 @@ void MemTable::SchedGc() { Segment* segment = segments_[i][j]; segment->IncrGcVersion(); segment->GcFreeList(gc_idx_cnt, gc_record_cnt, gc_record_byte_size); + segment->GcTsFreeList(gc_idx_cnt, gc_record_cnt, gc_record_byte_size); if (ttl_st_map.size() == 1) { segment->ExecuteGc(ttl_st_map.begin()->second, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); } else { diff --git a/src/storage/segment.cc b/src/storage/segment.cc index 2c91eba38f3..8446f7c5a30 100644 --- a/src/storage/segment.cc +++ b/src/storage/segment.cc @@ -493,6 +493,15 @@ void Segment::GcTsEntryFreeList(uint64_t version, uint64_t& gc_idx_cnt, uint64_t } } +void Segment::GcTsFreeList(uint64_t& gc_idx_cnt, uint64_t& gc_record_cnt, uint64_t& gc_record_byte_size) { + uint64_t cur_version = gc_version_.load(std::memory_order_relaxed); + if (cur_version < FLAGS_gc_deleted_pk_version_delta) { + return; + } + uint64_t free_list_version = cur_version - FLAGS_gc_deleted_pk_version_delta; + GcTsEntryFreeList(free_list_version, gc_idx_cnt, gc_record_cnt, gc_record_byte_size); +} + void Segment::GcFreeList(uint64_t& gc_idx_cnt, uint64_t& gc_record_cnt, uint64_t& gc_record_byte_size) { uint64_t cur_version = gc_version_.load(std::memory_order_relaxed); if (cur_version < FLAGS_gc_deleted_pk_version_delta) { diff --git a/src/storage/segment.h b/src/storage/segment.h index 9feb5431f1a..2b741049198 100644 --- a/src/storage/segment.h +++ b/src/storage/segment.h @@ -231,6 +231,10 @@ class Segment { inline uint64_t GetPkCnt() { return pk_cnt_.load(std::memory_order_relaxed); } + void GcTsFreeList(uint64_t& gc_idx_cnt, // NOLINT + uint64_t& gc_record_cnt, // NOLINT + uint64_t& gc_record_byte_size); // NOLINT + void GcFreeList(uint64_t& entry_gc_idx_cnt, // NOLINT uint64_t& gc_record_cnt, // NOLINT uint64_t& gc_record_byte_size); // NOLINT diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index b7ca85d2e90..7180d87e6f9 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -773,7 +773,7 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques // in case there will be other Put jump into the middle auto update_aggr = [this, &request, &ok, &entry]() { ok = UpdateAggrs(request->tid(), request->pid(), request->value(), - request->dimensions(), entry.log_index()); + request->dimensions(), entry.log_index()); }; UpdateAggrClosure closure(update_aggr); replicator->AppendEntry(entry, &closure); @@ -1541,6 +1541,7 @@ void TabletImpl::Traverse(RpcController* controller, const ::openmldb::api::Trav void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::DeleteRequest* request, openmldb::api::GeneralResponse* response, Closure* done) { brpc::ClosureGuard done_guard(done); + std::string row; if (follower_.load(std::memory_order_relaxed)) { response->set_code(::openmldb::base::ReturnCode::kIsFollowerCluster); response->set_msg("is follower cluster"); @@ -1566,6 +1567,7 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete return; } uint32_t idx = 0; + bool ok = false; if (request->has_idx_name() && request->idx_name().size() > 0) { std::shared_ptr index_def = table->GetIndex(request->idx_name()); if (!index_def || !index_def->IsReady()) { @@ -1579,6 +1581,11 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete } else if (request->has_idx()) { idx = request->idx(); } + if (request->has_ts()) { + auto it = table->NewTraverseIterator(idx); + it->Seek(request->key(), request->ts()); + row = it->GetValue().ToString(); + } if ((request->has_ts() ? table->Delete(request->key(), idx, request->ts()) : table->Delete(request->key(), idx))) { response->set_code(::openmldb::base::ReturnCode::kOk); response->set_msg("ok"); @@ -1596,14 +1603,16 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete if (aggr->GetIndexPos() != idx) { continue; } - auto ok = aggr->Delete(request->key()); - if (!ok) { - PDLOG(WARNING, - "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. aggr table: tid[%u]", - request->tid(), request->pid(), idx, request->key().c_str(), aggr->GetAggrTid()); - response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); - response->set_msg("delete from associated pre-aggr table failed"); - return; + if (!request->has_ts()) { + ok = aggr->Delete(request->key()); + if (!ok) { + PDLOG(WARNING, + "delete from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. aggr table: tid[%u]", + request->tid(), request->pid(), idx, request->key().c_str(), aggr->GetAggrTid()); + response->set_code(::openmldb::base::ReturnCode::kDeleteFailed); + response->set_msg("delete from associated pre-aggr table failed"); + return; + } } } } @@ -1624,7 +1633,29 @@ void TabletImpl::Delete(RpcController* controller, const ::openmldb::api::Delete ::openmldb::api::Dimension* dimension = entry.add_dimensions(); dimension->set_key(request->key()); dimension->set_idx(idx); - replicator->AppendEntry(entry); + auto update_aggr = [this, &request, &ok, &entry, &aggrs, &idx, &row]() { + ok = true; + if (!request->has_ts()) return; + for (const auto& aggr : *aggrs) { + if (aggr->GetIndexPos() != idx) { + continue; + } + ok = aggr->Update(request->key(), row, entry.log_index(), false, true); + if (!ok) { + PDLOG(WARNING, + "update from aggr failed. base table: tid[%u] pid[%u] index[%u] key[%s]. aggr table: tid[%u]", + request->tid(), request->pid(), idx, request->key().c_str(), aggr->GetAggrTid()); + return; + } + } + }; + UpdateAggrClosure closure(update_aggr); + replicator->AppendEntry(entry, &closure); + if (!ok) { + response->set_code(::openmldb::base::ReturnCode::kError); + response->set_msg("update aggr failed"); + return; + } } while (false); if (replicator && FLAGS_binlog_notify_on_put) { replicator->Notify(); @@ -3122,7 +3153,7 @@ int TabletImpl::LoadTableInternal(uint32_t tid, uint32_t pid, std::shared_ptr<:: auto aggrs = GetAggregators(tid, pid); if (aggrs != nullptr) { for (auto& aggr : *aggrs) { - if (!aggr->Init(replicator)) { + if (!aggr->Init(replicator, table)) { PDLOG(WARNING, "aggregator init failed"); } } @@ -5850,7 +5881,8 @@ bool TabletImpl::CreateAggregatorInternal(const ::openmldb::api::CreateAggregato } auto base_replicator = GetReplicator(base_meta->tid(), base_meta->pid()); - if (!aggregator->Init(base_replicator)) { + auto table = GetTable(base_meta->tid(), base_meta->pid()); + if (!aggregator->Init(base_replicator, table)) { PDLOG(WARNING, "aggregator init failed"); } uint64_t uid = (uint64_t) base_meta->tid() << 32 | base_meta->pid();