Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support delete in pre-aggregator #2689

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
935fef1
feat: Delete record interface of specified timestamp
XLC-2 Aug 11, 2022
39f9221
fix: protect deletion process with lock and add judgment on the use o…
XLC-2 Aug 12, 2022
2991374
fix: use idx instead of idx_name when rollback
XLC-2 Aug 17, 2022
4c52a8f
fix: delete redundant code
XLC-2 Aug 17, 2022
b4a3b5c
fix: fix bugs found in code review
XLC-2 Aug 28, 2022
4aa4228
fix: judge nullptr after deleting a record from skiplist
XLC-2 Aug 29, 2022
ddfd423
fix: skip deleting a record when it is in use
XLC-2 Aug 29, 2022
aa5c0d4
fix: snapshot remove deleted key when ts_col in index and add test cases
Sep 6, 2022
97b6fa5
fix: fix compile bugs about snapshot_test and improve the process of …
Sep 10, 2022
5e6dcfb
fix: 1. CollectDeletedKey and RemoveDeletedKey have the same logic to…
Sep 18, 2022
245e986
fix: fix cpplint error
Sep 20, 2022
1ae48b3
fix: 1. parse the ts column only instead of the entire record when ne…
Sep 23, 2022
62ee01f
fix: 1. use version decoder decode ts column; 2. update statistical d…
Sep 29, 2022
82628f2
fix: when there is no record in the corresponding keyEntry after the …
Oct 2, 2022
fa3a8a6
Merge branch 'fix/data-consistency-pre-aggr2' of github.com:XLC-2/Ope…
Oct 19, 2022
f9780e4
feat: update aggr table when deleting a record
Oct 19, 2022
5b7228d
fix: The maximum pre aggregation table or minimum pre aggregation tab…
Oct 23, 2022
e51b609
fix: modify the process of updating the extreme value aggregation table
Oct 26, 2022
efd164d
fix: updating aggr table when tablet deletes a record
Oct 28, 2022
a4cf474
fix: use the same function to rebuild aggr buffer for MinAggr and Max…
Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
559 changes: 409 additions & 150 deletions src/storage/aggregator.cc

Large diffs are not rendered by default.

30 changes: 21 additions & 9 deletions src/storage/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,17 @@ class Aggregator {

~Aggregator();

bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false);
bool Update(const std::string& key, const std::string& row, const uint64_t& offset, bool recover = false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call this function in Delete method of tablet_impl.cc

bool reverse = false);

bool DeleteAndUpdate(const std::string& key, const std::string& row,
const uint64_t& offset, bool recover = false);

bool Delete(const std::string& key);

bool FlushAll();

bool Init(std::shared_ptr<LogReplicator> base_replicator);
bool Init(std::shared_ptr<LogReplicator> base_replicator, std::shared_ptr<Table> base_table);

uint32_t GetIndexPos() const { return index_pos_; }

Expand Down Expand Up @@ -170,14 +174,17 @@ class Aggregator {
std::shared_ptr<LogReplicator> aggr_replicator_;
std::atomic<AggrStat> status_;

std::shared_ptr<Table> base_table_;

bool GetAggrBufferFromRowView(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* buffer);
bool FlushAggrBuffer(const std::string& key, const std::string& filter_key, const AggrBuffer& aggr_buffer);
bool UpdateFlushedBuffer(const std::string& key, const std::string& filter_key, const int8_t* base_row_ptr,
int64_t cur_ts, uint64_t offset);
int64_t cur_ts, uint64_t offset, bool reverse);
bool CheckBufferFilled(int64_t cur_ts, int64_t buffer_end, int32_t buffer_cnt);

private:
virtual bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) = 0;
virtual bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) = 0;
virtual bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) = 0;
virtual bool DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) = 0;
int64_t AlignedStart(int64_t ts) {
Expand Down Expand Up @@ -220,7 +227,8 @@ class SumAggregator : public Aggregator {
~SumAggregator() = default;

private:
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override;
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) override;

bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override;

Expand Down Expand Up @@ -251,7 +259,8 @@ class MinAggregator : public MinMaxBaseAggregator {
~MinAggregator() = default;

private:
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override;
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) override;
};

class MaxAggregator : public MinMaxBaseAggregator {
Expand All @@ -264,7 +273,8 @@ class MaxAggregator : public MinMaxBaseAggregator {
~MaxAggregator() = default;

private:
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override;
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) override;
};

class CountAggregator : public Aggregator {
Expand All @@ -277,7 +287,8 @@ class CountAggregator : public Aggregator {
~CountAggregator() = default;

private:
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override;
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) override;

bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override;

Expand All @@ -296,7 +307,8 @@ class AvgAggregator : public Aggregator {
~AvgAggregator() = default;

private:
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) override;
bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer,
bool reverse = false) override;

bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) override;

Expand Down
Loading