From 539c6fc02104272e5f095b53a197057bc585de62 Mon Sep 17 00:00:00 2001 From: dl239 Date: Tue, 31 Oct 2023 14:56:51 +0800 Subject: [PATCH] fix: fix select --- src/cmd/sql_cmd_test.cc | 31 +++++++++++++ src/storage/disk_table.cc | 13 +++--- src/storage/disk_table_iterator.cc | 72 +++++++++++++++++++++--------- src/storage/disk_table_iterator.h | 27 +++++++---- src/storage/mem_table.cc | 10 +++-- src/storage/mem_table_iterator.cc | 34 ++++++++++---- src/storage/mem_table_iterator.h | 16 +++++-- src/tablet/tablet_impl.cc | 28 +++++++----- 8 files changed, 167 insertions(+), 64 deletions(-) diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index c5a78551a3d..f2fa7ae1f45 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -331,6 +331,37 @@ TEST_P(DBSDKTest, Select) { ASSERT_TRUE(status.IsOK()); } +TEST_P(DBSDKTest, SelectSnappy) { + auto cli = GetParam(); + cs = cli->cs; + sr = cli->sr; + hybridse::sdk::Status status; + if (cs->IsClusterMode()) { + sr->ExecuteSQL("SET @@execute_mode='online';", &status); + ASSERT_TRUE(status.IsOK()) << "error msg: " + status.msg; + } + std::string db = "db" + GenRand(); + sr->ExecuteSQL("create database " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); + sr->ExecuteSQL("use " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); + std::string create_sql = + "create table trans (c1 string, c2 bigint, c3 date," + "index(key=c1, ts=c2, abs_ttl=0, ttl_type=absolute)) options (compress_type='snappy');"; + sr->ExecuteSQL(create_sql, &status); + ASSERT_TRUE(status.IsOK()); + std::string insert_sql = "insert into trans values ('aaa', 1635247427000, \"2021-05-20\");"; + sr->ExecuteSQL(insert_sql, &status); + ASSERT_TRUE(status.IsOK()); + auto rs = sr->ExecuteSQL("select * from trans", &status); + ASSERT_TRUE(status.IsOK()); + ASSERT_EQ(1, rs->Size()); + sr->ExecuteSQL("drop table trans;", &status); + ASSERT_TRUE(status.IsOK()); + sr->ExecuteSQL("drop database " + db + ";", &status); + ASSERT_TRUE(status.IsOK()); +} + TEST_F(SqlCmdTest, SelectMultiPartition) { auto sr = cluster_cli.sr; std::string db_name = "test" + GenRand(); diff --git a/src/storage/disk_table.cc b/src/storage/disk_table.cc index 8f508bac6c5..beb1c2fc9e3 100644 --- a/src/storage/disk_table.cc +++ b/src/storage/disk_table.cc @@ -543,10 +543,10 @@ TableIterator* DiskTable::NewIterator(uint32_t idx, const std::string& pk, Ticke if (inner_index && inner_index->GetIndex().size() > 1) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { - return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId()); + return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId(), GetCompressType()); } } - return new DiskTableIterator(db_, it, snapshot, pk); + return new DiskTableIterator(db_, it, snapshot, pk, GetCompressType()); } TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) { @@ -569,10 +569,10 @@ TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId()); + ts_col->GetId(), GetCompressType()); } } - return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt); + return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, GetCompressType()); } ::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) { @@ -595,10 +595,11 @@ ::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) { auto ts_col = index_def->GetTsColumn(); if (ts_col) { return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId(), cf_hs_[inner_pos + 1]); + ts_col->GetId(), cf_hs_[inner_pos + 1], GetCompressType()); } } - return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, cf_hs_[inner_pos + 1]); + return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, + cf_hs_[inner_pos + 1], GetCompressType()); } bool DiskTable::DeleteIndex(const std::string& idx_name) { diff --git a/src/storage/disk_table_iterator.cc b/src/storage/disk_table_iterator.cc index 7b78bec4f3e..d934715e880 100644 --- a/src/storage/disk_table_iterator.cc +++ b/src/storage/disk_table_iterator.cc @@ -15,7 +15,7 @@ */ #include "storage/disk_table_iterator.h" - +#include #include #include "gflags/gflags.h" #include "storage/key_transform.h" @@ -26,12 +26,12 @@ namespace openmldb { namespace storage { DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, - const std::string& pk) - : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0) {} + const std::string& pk, type::CompressType compress_type) + : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), compress_type_(compress_type) {} DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, - const std::string& pk, uint32_t ts_idx) - : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx) { + const std::string& pk, uint32_t ts_idx, type::CompressType compress_type) + : db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx), compress_type_(compress_type) { has_ts_idx_ = true; } @@ -55,7 +55,13 @@ void DiskTableIterator::Next() { return it_->Next(); } openmldb::base::Slice DiskTableIterator::GetValue() const { rocksdb::Slice value = it_->value(); - return openmldb::base::Slice(value.data(), value.size()); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(value.data(), value.size(), &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } else { + return openmldb::base::Slice(value.data(), value.size()); + } } std::string DiskTableIterator::GetPK() const { return pk_; } @@ -85,7 +91,8 @@ void DiskTableIterator::Seek(const uint64_t ts) { DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt) + const uint64_t& expire_cnt, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -93,12 +100,14 @@ DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::I expire_value_(expire_time, expire_cnt, ttl_type), has_ts_idx_(false), ts_idx_(0), - traverse_cnt_(0) {} + traverse_cnt_(0), + compress_type_(compress_type) {} DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt, int32_t ts_idx) + const uint64_t& expire_cnt, int32_t ts_idx, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -106,7 +115,8 @@ DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::I expire_value_(expire_time, expire_cnt, ttl_type), has_ts_idx_(true), ts_idx_(ts_idx), - traverse_cnt_(0) {} + traverse_cnt_(0), + compress_type_(compress_type) {} DiskTableTraverseIterator::~DiskTableTraverseIterator() { delete it_; @@ -154,6 +164,11 @@ void DiskTableTraverseIterator::Next() { openmldb::base::Slice DiskTableTraverseIterator::GetValue() const { rocksdb::Slice value = it_->value(); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(value.data(), value.size(), &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } return openmldb::base::Slice(value.data(), value.size()); } @@ -297,7 +312,8 @@ void DiskTableTraverseIterator::NextPK() { DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - rocksdb::ColumnFamilyHandle* column_handle) + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -306,12 +322,14 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i expire_cnt_(expire_cnt), has_ts_idx_(false), ts_idx_(0), - column_handle_(column_handle) {} + column_handle_(column_handle), + compress_type_(compress_type) {} DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, int32_t ts_idx, - rocksdb::ColumnFamilyHandle* column_handle) + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -320,7 +338,8 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i expire_cnt_(expire_cnt), has_ts_idx_(true), ts_idx_(ts_idx), - column_handle_(column_handle) {} + column_handle_(column_handle), + compress_type_(compress_type) {} DiskTableKeyIterator::~DiskTableKeyIterator() { delete it_; @@ -398,7 +417,7 @@ std::unique_ptr<::hybridse::vm::RowIterator> DiskTableKeyIterator::GetValue() { ro.pin_data = true; rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_); return std::make_unique(db_, it, snapshot, ttl_type_, expire_time_, - expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_); + expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_); } ::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() { @@ -408,14 +427,14 @@ ::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() { // ro.prefix_same_as_start = true; ro.pin_data = true; rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_); - return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_, expire_cnt_, pk_, ts_, has_ts_idx_, - ts_idx_); + return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_, + expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_); } DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, uint64_t expire_cnt, std::string pk, uint64_t ts, bool has_ts_idx, - uint32_t ts_idx) + uint32_t ts_idx, type::CompressType compress_type) : db_(db), it_(it), snapshot_(snapshot), @@ -426,7 +445,8 @@ DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* i ts_(ts), has_ts_idx_(has_ts_idx), ts_idx_(ts_idx), - row_() {} + row_(), + compress_type_(compress_type) {} DiskTableRowIterator::~DiskTableRowIterator() { delete it_; @@ -470,9 +490,17 @@ const ::hybridse::codec::Row& DiskTableRowIterator::GetValue() { } valid_value_ = true; size_t size = it_->value().size(); - int8_t* copyed_row_data = reinterpret_cast(malloc(size)); - memcpy(copyed_row_data, it_->value().data(), size); - row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size)); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->value().data(), size, &tmp_buf_); + int8_t* copyed_row_data = reinterpret_cast(malloc(tmp_buf_.size())); + memcpy(copyed_row_data, tmp_buf_.data(), tmp_buf_.size()); + row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, tmp_buf_.size())); + } else { + int8_t* copyed_row_data = reinterpret_cast(malloc(size)); + memcpy(copyed_row_data, it_->value().data(), size); + row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size)); + } return row_; } diff --git a/src/storage/disk_table_iterator.h b/src/storage/disk_table_iterator.h index 88f7225c5a9..df9b98fca9c 100644 --- a/src/storage/disk_table_iterator.h +++ b/src/storage/disk_table_iterator.h @@ -29,9 +29,10 @@ namespace storage { class DiskTableIterator : public TableIterator { public: - DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk); - DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk, - uint32_t ts_idx); + DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, + const std::string& pk, type::CompressType compress_type); + DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, + const std::string& pk, uint32_t ts_idx, type::CompressType compress_type); virtual ~DiskTableIterator(); bool Valid() override; void Next() override; @@ -49,16 +50,18 @@ class DiskTableIterator : public TableIterator { uint64_t ts_; uint32_t ts_idx_; bool has_ts_idx_ = false; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; class DiskTableTraverseIterator : public TraverseIterator { public: DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt); + const uint64_t& expire_cnt, type::CompressType compress_type); DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, - const uint64_t& expire_cnt, int32_t ts_idx); + const uint64_t& expire_cnt, int32_t ts_idx, type::CompressType compress_type); virtual ~DiskTableTraverseIterator(); bool Valid() override; void Next() override; @@ -84,13 +87,16 @@ class DiskTableTraverseIterator : public TraverseIterator { bool has_ts_idx_; uint32_t ts_idx_; uint64_t traverse_cnt_; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; class DiskTableRowIterator : public ::hybridse::vm::RowIterator { public: DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, uint64_t expire_cnt, - std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx); + std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx, + type::CompressType compress_type); ~DiskTableRowIterator(); @@ -129,17 +135,21 @@ class DiskTableRowIterator : public ::hybridse::vm::RowIterator { ::hybridse::codec::Row row_; bool pk_valid_; bool valid_value_ = false; + type::CompressType compress_type_; + std::string tmp_buf_; }; class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator { public: DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle); + int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type); DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt, - rocksdb::ColumnFamilyHandle* column_handle); + rocksdb::ColumnFamilyHandle* column_handle, + type::CompressType compress_type); ~DiskTableKeyIterator() override; @@ -171,6 +181,7 @@ class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator { uint64_t ts_; uint32_t ts_idx_; rocksdb::ColumnFamilyHandle* column_handle_; + type::CompressType compress_type_; }; } // namespace storage diff --git a/src/storage/mem_table.cc b/src/storage/mem_table.cc index 8cbb145e323..02290f709f5 100644 --- a/src/storage/mem_table.cc +++ b/src/storage/mem_table.cc @@ -749,7 +749,8 @@ ::hybridse::vm::WindowIterator* MemTable::NewWindowIterator(uint32_t index) { if (ts_col) { ts_idx = ts_col->GetId(); } - return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, ts_idx); + return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, ts_idx, GetCompressType()); } TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) { @@ -768,10 +769,11 @@ TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) { uint32_t real_idx = index_def->GetInnerPos(); auto ts_col = index_def->GetTsColumn(); if (ts_col) { - return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, - ts_col->GetId()); + return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, ts_col->GetId(), GetCompressType()); } - return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, 0); + return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, + expire_time, expire_cnt, 0, GetCompressType()); } bool MemTable::GetBulkLoadInfo(::openmldb::api::BulkLoadInfoResponse* response) { diff --git a/src/storage/mem_table_iterator.cc b/src/storage/mem_table_iterator.cc index 8b0f074427a..22cd7964640 100644 --- a/src/storage/mem_table_iterator.cc +++ b/src/storage/mem_table_iterator.cc @@ -15,7 +15,7 @@ */ #include "storage/mem_table_iterator.h" - +#include #include #include "base/hash.h" #include "gflags/gflags.h" @@ -48,7 +48,13 @@ const uint64_t& MemTableWindowIterator::GetKey() const { } const ::hybridse::codec::Row& MemTableWindowIterator::GetValue() { - row_.Reset(reinterpret_cast(it_->GetValue()->data), it_->GetValue()->size); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->GetValue()->data, it_->GetValue()->size, &tmp_buf_); + row_.Reset(reinterpret_cast(tmp_buf_.data()), tmp_buf_.size()); + } else { + row_.Reset(reinterpret_cast(it_->GetValue()->data), it_->GetValue()->size); + } return row_; } @@ -69,7 +75,8 @@ void MemTableWindowIterator::SeekToFirst() { } MemTableKeyIterator::MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index) + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type) : segments_(segments), seg_cnt_(seg_cnt), seg_idx_(0), @@ -79,7 +86,8 @@ MemTableKeyIterator::MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, : expire_time_(expire_time), expire_cnt_(expire_cnt), ticket_(), - ts_idx_(0) { + ts_idx_(0), + compress_type_(compress_type) { uint32_t idx = 0; if (segments_[0]->GetTsIdx(ts_index, idx) == 0) { ts_idx_ = idx; @@ -142,7 +150,7 @@ ::hybridse::vm::RowIterator* MemTableKeyIterator::GetRawValue() { ticket_.Push((KeyEntry*)pk_it_->GetValue()); // NOLINT } it->SeekToFirst(); - return new MemTableWindowIterator(it, ttl_type_, expire_time_, expire_cnt_); + return new MemTableWindowIterator(it, ttl_type_, expire_time_, expire_cnt_, compress_type_); } std::unique_ptr<::hybridse::vm::RowIterator> MemTableKeyIterator::GetValue() { @@ -177,8 +185,9 @@ void MemTableKeyIterator::NextPK() { } MemTableTraverseIterator::MemTableTraverseIterator(Segment** segments, uint32_t seg_cnt, - ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, - uint64_t expire_cnt, uint32_t ts_index) + ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, + uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type) : segments_(segments), seg_cnt_(seg_cnt), seg_idx_(0), @@ -188,7 +197,8 @@ MemTableTraverseIterator::MemTableTraverseIterator(Segment** segments, uint32_t ts_idx_(0), expire_value_(expire_time, expire_cnt, ttl_type), ticket_(), - traverse_cnt_(0) { + traverse_cnt_(0), + compress_type_(compress_type) { uint32_t idx = 0; if (segments_[0]->GetTsIdx(ts_index, idx) == 0) { ts_idx_ = idx; @@ -320,7 +330,13 @@ void MemTableTraverseIterator::Seek(const std::string& key, uint64_t ts) { } openmldb::base::Slice MemTableTraverseIterator::GetValue() const { - return openmldb::base::Slice(it_->GetValue()->data, it_->GetValue()->size); + if (compress_type_ == type::CompressType::kSnappy) { + tmp_buf_.clear(); + snappy::Uncompress(it_->GetValue()->data, it_->GetValue()->size, &tmp_buf_); + return openmldb::base::Slice(tmp_buf_); + } else { + return openmldb::base::Slice(it_->GetValue()->data, it_->GetValue()->size); + } } uint64_t MemTableTraverseIterator::GetKey() const { diff --git a/src/storage/mem_table_iterator.h b/src/storage/mem_table_iterator.h index 967345fc2a9..5e5ba461181 100644 --- a/src/storage/mem_table_iterator.h +++ b/src/storage/mem_table_iterator.h @@ -27,8 +27,9 @@ namespace storage { class MemTableWindowIterator : public ::hybridse::vm::RowIterator { public: MemTableWindowIterator(TimeEntries::Iterator* it, ::openmldb::storage::TTLType ttl_type, uint64_t expire_time, - uint64_t expire_cnt) - : it_(it), record_idx_(1), expire_value_(expire_time, expire_cnt, ttl_type), row_() {} + uint64_t expire_cnt, type::CompressType compress_type) + : it_(it), record_idx_(1), expire_value_(expire_time, expire_cnt, ttl_type), + row_(), compress_type_(compress_type) {} ~MemTableWindowIterator(); @@ -51,12 +52,15 @@ class MemTableWindowIterator : public ::hybridse::vm::RowIterator { uint32_t record_idx_; TTLSt expire_value_; ::hybridse::codec::Row row_; + type::CompressType compress_type_; + std::string tmp_buf_; }; class MemTableKeyIterator : public ::hybridse::vm::WindowIterator { public: MemTableKeyIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index); + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type); ~MemTableKeyIterator() override; @@ -87,12 +91,14 @@ class MemTableKeyIterator : public ::hybridse::vm::WindowIterator { uint64_t expire_cnt_; Ticket ticket_; uint32_t ts_idx_; + type::CompressType compress_type_; }; class MemTableTraverseIterator : public TraverseIterator { public: MemTableTraverseIterator(Segment** segments, uint32_t seg_cnt, ::openmldb::storage::TTLType ttl_type, - uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index); + uint64_t expire_time, uint64_t expire_cnt, uint32_t ts_index, + type::CompressType compress_type); ~MemTableTraverseIterator() override; inline bool Valid() override; void Next() override; @@ -115,6 +121,8 @@ class MemTableTraverseIterator : public TraverseIterator { TTLSt expire_value_; Ticket ticket_; uint64_t traverse_cnt_; + type::CompressType compress_type_; + mutable std::string tmp_buf_; }; } // namespace storage diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index f30f1f8b74b..e6924b64ce9 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -724,6 +724,22 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques response->set_msg("exceed max memory"); return; } + ::openmldb::api::LogEntry entry; + entry.set_pk(request->pk()); + entry.set_ts(request->time()); + if (table->GetCompressType() == openmldb::type::CompressType::kSnappy) { + const auto& raw_val = request->value(); + std::string* val = entry.mutable_value(); + ::snappy::Compress(raw_val.c_str(), raw_val.length(), val); + } else { + entry.set_value(request->value()); + } + if (request->dimensions_size() > 0) { + entry.mutable_dimensions()->CopyFrom(request->dimensions()); + } + if (request->ts_dimensions_size() > 0) { + entry.mutable_ts_dimensions()->CopyFrom(request->ts_dimensions()); + } bool ok = false; if (request->dimensions_size() > 0) { int32_t ret_code = CheckDimessionPut(request, table->GetIdxCnt()); @@ -733,7 +749,7 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques return; } DLOG(INFO) << "put data to tid " << tid << " pid " << pid << " with key " << request->dimensions(0).key(); - ok = table->Put(request->time(), request->value(), request->dimensions()); + ok = table->Put(entry.ts(), entry.value(), entry.dimensions()); } if (!ok) { response->set_code(::openmldb::base::ReturnCode::kPutFailed); @@ -743,23 +759,13 @@ void TabletImpl::Put(RpcController* controller, const ::openmldb::api::PutReques response->set_code(::openmldb::base::ReturnCode::kOk); std::shared_ptr replicator; - ::openmldb::api::LogEntry entry; do { replicator = GetReplicator(request->tid(), request->pid()); if (!replicator) { PDLOG(WARNING, "fail to find table tid %u pid %u leader's log replicator", tid, pid); break; } - entry.set_pk(request->pk()); - entry.set_ts(request->time()); - entry.set_value(request->value()); entry.set_term(replicator->GetLeaderTerm()); - if (request->dimensions_size() > 0) { - entry.mutable_dimensions()->CopyFrom(request->dimensions()); - } - if (request->ts_dimensions_size() > 0) { - entry.mutable_ts_dimensions()->CopyFrom(request->ts_dimensions()); - } // Aggregator update assumes that binlog_offset is strictly increasing // so the update should be protected within the replicator lock