Skip to content

Commit

Permalink
fix: fix select
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Oct 31, 2023
1 parent 6baa940 commit 539c6fc
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 64 deletions.
31 changes: 31 additions & 0 deletions src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,37 @@ TEST_P(DBSDKTest, Select) {
ASSERT_TRUE(status.IsOK());
}

TEST_P(DBSDKTest, SelectSnappy) {
auto cli = GetParam();
cs = cli->cs;
sr = cli->sr;
hybridse::sdk::Status status;
if (cs->IsClusterMode()) {
sr->ExecuteSQL("SET @@execute_mode='online';", &status);
ASSERT_TRUE(status.IsOK()) << "error msg: " + status.msg;
}
std::string db = "db" + GenRand();
sr->ExecuteSQL("create database " + db + ";", &status);
ASSERT_TRUE(status.IsOK());
sr->ExecuteSQL("use " + db + ";", &status);
ASSERT_TRUE(status.IsOK());
std::string create_sql =
"create table trans (c1 string, c2 bigint, c3 date,"
"index(key=c1, ts=c2, abs_ttl=0, ttl_type=absolute)) options (compress_type='snappy');";
sr->ExecuteSQL(create_sql, &status);
ASSERT_TRUE(status.IsOK());
std::string insert_sql = "insert into trans values ('aaa', 1635247427000, \"2021-05-20\");";
sr->ExecuteSQL(insert_sql, &status);
ASSERT_TRUE(status.IsOK());
auto rs = sr->ExecuteSQL("select * from trans", &status);
ASSERT_TRUE(status.IsOK());
ASSERT_EQ(1, rs->Size());
sr->ExecuteSQL("drop table trans;", &status);
ASSERT_TRUE(status.IsOK());
sr->ExecuteSQL("drop database " + db + ";", &status);
ASSERT_TRUE(status.IsOK());
}

TEST_F(SqlCmdTest, SelectMultiPartition) {
auto sr = cluster_cli.sr;
std::string db_name = "test" + GenRand();
Expand Down
13 changes: 7 additions & 6 deletions src/storage/disk_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,10 @@ TableIterator* DiskTable::NewIterator(uint32_t idx, const std::string& pk, Ticke
if (inner_index && inner_index->GetIndex().size() > 1) {
auto ts_col = index_def->GetTsColumn();
if (ts_col) {
return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId());
return new DiskTableIterator(db_, it, snapshot, pk, ts_col->GetId(), GetCompressType());
}
}
return new DiskTableIterator(db_, it, snapshot, pk);
return new DiskTableIterator(db_, it, snapshot, pk, GetCompressType());
}

TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) {
Expand All @@ -569,10 +569,10 @@ TraverseIterator* DiskTable::NewTraverseIterator(uint32_t index) {
auto ts_col = index_def->GetTsColumn();
if (ts_col) {
return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt,
ts_col->GetId());
ts_col->GetId(), GetCompressType());
}
}
return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt);
return new DiskTableTraverseIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, GetCompressType());
}

::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) {
Expand All @@ -595,10 +595,11 @@ ::hybridse::vm::WindowIterator* DiskTable::NewWindowIterator(uint32_t idx) {
auto ts_col = index_def->GetTsColumn();
if (ts_col) {
return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt,
ts_col->GetId(), cf_hs_[inner_pos + 1]);
ts_col->GetId(), cf_hs_[inner_pos + 1], GetCompressType());
}
}
return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt, cf_hs_[inner_pos + 1]);
return new DiskTableKeyIterator(db_, it, snapshot, ttl->ttl_type, expire_time, expire_cnt,
cf_hs_[inner_pos + 1], GetCompressType());
}

bool DiskTable::DeleteIndex(const std::string& idx_name) {
Expand Down
72 changes: 50 additions & 22 deletions src/storage/disk_table_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

#include "storage/disk_table_iterator.h"

#include <snappy.h>
#include <string>
#include "gflags/gflags.h"
#include "storage/key_transform.h"
Expand All @@ -26,12 +26,12 @@ namespace openmldb {
namespace storage {

DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
const std::string& pk)
: db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0) {}
const std::string& pk, type::CompressType compress_type)
: db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), compress_type_(compress_type) {}

DiskTableIterator::DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
const std::string& pk, uint32_t ts_idx)
: db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx) {
const std::string& pk, uint32_t ts_idx, type::CompressType compress_type)
: db_(db), it_(it), snapshot_(snapshot), pk_(pk), ts_(0), ts_idx_(ts_idx), compress_type_(compress_type) {
has_ts_idx_ = true;
}

Expand All @@ -55,7 +55,13 @@ void DiskTableIterator::Next() { return it_->Next(); }

openmldb::base::Slice DiskTableIterator::GetValue() const {
rocksdb::Slice value = it_->value();
return openmldb::base::Slice(value.data(), value.size());
if (compress_type_ == type::CompressType::kSnappy) {
tmp_buf_.clear();
snappy::Uncompress(value.data(), value.size(), &tmp_buf_);
return openmldb::base::Slice(tmp_buf_);
} else {
return openmldb::base::Slice(value.data(), value.size());
}
}

std::string DiskTableIterator::GetPK() const { return pk_; }
Expand Down Expand Up @@ -85,28 +91,32 @@ void DiskTableIterator::Seek(const uint64_t ts) {
DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it,
const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time,
const uint64_t& expire_cnt)
const uint64_t& expire_cnt,
type::CompressType compress_type)
: db_(db),
it_(it),
snapshot_(snapshot),
record_idx_(0),
expire_value_(expire_time, expire_cnt, ttl_type),
has_ts_idx_(false),
ts_idx_(0),
traverse_cnt_(0) {}
traverse_cnt_(0),
compress_type_(compress_type) {}

DiskTableTraverseIterator::DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it,
const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time,
const uint64_t& expire_cnt, int32_t ts_idx)
const uint64_t& expire_cnt, int32_t ts_idx,
type::CompressType compress_type)
: db_(db),
it_(it),
snapshot_(snapshot),
record_idx_(0),
expire_value_(expire_time, expire_cnt, ttl_type),
has_ts_idx_(true),
ts_idx_(ts_idx),
traverse_cnt_(0) {}
traverse_cnt_(0),
compress_type_(compress_type) {}

DiskTableTraverseIterator::~DiskTableTraverseIterator() {
delete it_;
Expand Down Expand Up @@ -154,6 +164,11 @@ void DiskTableTraverseIterator::Next() {

openmldb::base::Slice DiskTableTraverseIterator::GetValue() const {
rocksdb::Slice value = it_->value();
if (compress_type_ == type::CompressType::kSnappy) {
tmp_buf_.clear();
snappy::Uncompress(value.data(), value.size(), &tmp_buf_);
return openmldb::base::Slice(tmp_buf_);
}
return openmldb::base::Slice(value.data(), value.size());
}

Expand Down Expand Up @@ -297,7 +312,8 @@ void DiskTableTraverseIterator::NextPK() {
DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it,
const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type,
const uint64_t& expire_time, const uint64_t& expire_cnt,
rocksdb::ColumnFamilyHandle* column_handle)
rocksdb::ColumnFamilyHandle* column_handle,
type::CompressType compress_type)
: db_(db),
it_(it),
snapshot_(snapshot),
Expand All @@ -306,12 +322,14 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i
expire_cnt_(expire_cnt),
has_ts_idx_(false),
ts_idx_(0),
column_handle_(column_handle) {}
column_handle_(column_handle),
compress_type_(compress_type) {}

DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it,
const rocksdb::Snapshot* snapshot, ::openmldb::storage::TTLType ttl_type,
const uint64_t& expire_time, const uint64_t& expire_cnt, int32_t ts_idx,
rocksdb::ColumnFamilyHandle* column_handle)
rocksdb::ColumnFamilyHandle* column_handle,
type::CompressType compress_type)
: db_(db),
it_(it),
snapshot_(snapshot),
Expand All @@ -320,7 +338,8 @@ DiskTableKeyIterator::DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* i
expire_cnt_(expire_cnt),
has_ts_idx_(true),
ts_idx_(ts_idx),
column_handle_(column_handle) {}
column_handle_(column_handle),
compress_type_(compress_type) {}

DiskTableKeyIterator::~DiskTableKeyIterator() {
delete it_;
Expand Down Expand Up @@ -398,7 +417,7 @@ std::unique_ptr<::hybridse::vm::RowIterator> DiskTableKeyIterator::GetValue() {
ro.pin_data = true;
rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_);
return std::make_unique<DiskTableRowIterator>(db_, it, snapshot, ttl_type_, expire_time_,
expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_);
expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_);
}

::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() {
Expand All @@ -408,14 +427,14 @@ ::hybridse::vm::RowIterator* DiskTableKeyIterator::GetRawValue() {
// ro.prefix_same_as_start = true;
ro.pin_data = true;
rocksdb::Iterator* it = db_->NewIterator(ro, column_handle_);
return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_, expire_cnt_, pk_, ts_, has_ts_idx_,
ts_idx_);
return new DiskTableRowIterator(db_, it, snapshot, ttl_type_, expire_time_,
expire_cnt_, pk_, ts_, has_ts_idx_, ts_idx_, compress_type_);
}

DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, uint64_t expire_time,
uint64_t expire_cnt, std::string pk, uint64_t ts, bool has_ts_idx,
uint32_t ts_idx)
uint32_t ts_idx, type::CompressType compress_type)
: db_(db),
it_(it),
snapshot_(snapshot),
Expand All @@ -426,7 +445,8 @@ DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* i
ts_(ts),
has_ts_idx_(has_ts_idx),
ts_idx_(ts_idx),
row_() {}
row_(),
compress_type_(compress_type) {}

DiskTableRowIterator::~DiskTableRowIterator() {
delete it_;
Expand Down Expand Up @@ -470,9 +490,17 @@ const ::hybridse::codec::Row& DiskTableRowIterator::GetValue() {
}
valid_value_ = true;
size_t size = it_->value().size();
int8_t* copyed_row_data = reinterpret_cast<int8_t*>(malloc(size));
memcpy(copyed_row_data, it_->value().data(), size);
row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size));
if (compress_type_ == type::CompressType::kSnappy) {
tmp_buf_.clear();
snappy::Uncompress(it_->value().data(), size, &tmp_buf_);
int8_t* copyed_row_data = reinterpret_cast<int8_t*>(malloc(tmp_buf_.size()));
memcpy(copyed_row_data, tmp_buf_.data(), tmp_buf_.size());
row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, tmp_buf_.size()));
} else {
int8_t* copyed_row_data = reinterpret_cast<int8_t*>(malloc(size));
memcpy(copyed_row_data, it_->value().data(), size);
row_.Reset(::hybridse::base::RefCountedSlice::CreateManaged(copyed_row_data, size));
}
return row_;
}

Expand Down
27 changes: 19 additions & 8 deletions src/storage/disk_table_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ namespace storage {

class DiskTableIterator : public TableIterator {
public:
DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk);
DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot, const std::string& pk,
uint32_t ts_idx);
DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
const std::string& pk, type::CompressType compress_type);
DiskTableIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
const std::string& pk, uint32_t ts_idx, type::CompressType compress_type);
virtual ~DiskTableIterator();
bool Valid() override;
void Next() override;
Expand All @@ -49,16 +50,18 @@ class DiskTableIterator : public TableIterator {
uint64_t ts_;
uint32_t ts_idx_;
bool has_ts_idx_ = false;
type::CompressType compress_type_;
mutable std::string tmp_buf_;
};

class DiskTableTraverseIterator : public TraverseIterator {
public:
DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time,
const uint64_t& expire_cnt);
const uint64_t& expire_cnt, type::CompressType compress_type);
DiskTableTraverseIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time,
const uint64_t& expire_cnt, int32_t ts_idx);
const uint64_t& expire_cnt, int32_t ts_idx, type::CompressType compress_type);
virtual ~DiskTableTraverseIterator();
bool Valid() override;
void Next() override;
Expand All @@ -84,13 +87,16 @@ class DiskTableTraverseIterator : public TraverseIterator {
bool has_ts_idx_;
uint32_t ts_idx_;
uint64_t traverse_cnt_;
type::CompressType compress_type_;
mutable std::string tmp_buf_;
};

class DiskTableRowIterator : public ::hybridse::vm::RowIterator {
public:
DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, uint64_t expire_time, uint64_t expire_cnt,
std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx);
std::string pk, uint64_t ts, bool has_ts_idx, uint32_t ts_idx,
type::CompressType compress_type);

~DiskTableRowIterator();

Expand Down Expand Up @@ -129,17 +135,21 @@ class DiskTableRowIterator : public ::hybridse::vm::RowIterator {
::hybridse::codec::Row row_;
bool pk_valid_;
bool valid_value_ = false;
type::CompressType compress_type_;
std::string tmp_buf_;
};

class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator {
public:
DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt,
int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle);
int32_t ts_idx, rocksdb::ColumnFamilyHandle* column_handle,
type::CompressType compress_type);

DiskTableKeyIterator(rocksdb::DB* db, rocksdb::Iterator* it, const rocksdb::Snapshot* snapshot,
::openmldb::storage::TTLType ttl_type, const uint64_t& expire_time, const uint64_t& expire_cnt,
rocksdb::ColumnFamilyHandle* column_handle);
rocksdb::ColumnFamilyHandle* column_handle,
type::CompressType compress_type);

~DiskTableKeyIterator() override;

Expand Down Expand Up @@ -171,6 +181,7 @@ class DiskTableKeyIterator : public ::hybridse::vm::WindowIterator {
uint64_t ts_;
uint32_t ts_idx_;
rocksdb::ColumnFamilyHandle* column_handle_;
type::CompressType compress_type_;
};

} // namespace storage
Expand Down
10 changes: 6 additions & 4 deletions src/storage/mem_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,8 @@ ::hybridse::vm::WindowIterator* MemTable::NewWindowIterator(uint32_t index) {
if (ts_col) {
ts_idx = ts_col->GetId();
}
return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, ts_idx);
return new MemTableKeyIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type,
expire_time, expire_cnt, ts_idx, GetCompressType());
}

TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) {
Expand All @@ -768,10 +769,11 @@ TraverseIterator* MemTable::NewTraverseIterator(uint32_t index) {
uint32_t real_idx = index_def->GetInnerPos();
auto ts_col = index_def->GetTsColumn();
if (ts_col) {
return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt,
ts_col->GetId());
return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type,
expire_time, expire_cnt, ts_col->GetId(), GetCompressType());
}
return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type, expire_time, expire_cnt, 0);
return new MemTableTraverseIterator(segments_[real_idx], seg_cnt_, ttl->ttl_type,
expire_time, expire_cnt, 0, GetCompressType());
}

bool MemTable::GetBulkLoadInfo(::openmldb::api::BulkLoadInfoResponse* response) {
Expand Down
Loading

0 comments on commit 539c6fc

Please sign in to comment.