Skip to content

Commit

Permalink
feat(engine): array & map data type interface for row level
Browse files Browse the repository at this point in the history
This updated proto defines for `hybridse::type::ColumnDef`, old data
type field `ColumnDef::type`, as well as `ColumnDef::is_not_null` consider deprecated, and `ColumnDef::schema` consider the new data type
for new codes. Old fields still workable but may going to removed in further release.
  • Loading branch information
aceforeverd committed Jan 22, 2024
1 parent 3587dc6 commit 9fe51b9
Show file tree
Hide file tree
Showing 20 changed files with 355 additions and 167 deletions.
43 changes: 27 additions & 16 deletions hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "absl/status/statusor.h"
#include "base/raw_buffer.h"
#include "butil/iobuf.h"
#include "gflags/gflags.h"
#include "proto/fe_type.pb.h"

namespace hybridse {
Expand Down Expand Up @@ -180,26 +179,38 @@ class RowView {
};

struct ColInfo {
::hybridse::type::Type type;
// type is still used in same lagecy udf context,
// cautious use for non-base types
::hybridse::type::Type type() const {
if (!schema.has_base_type()) {
return type::kNull;
}
return schema.base_type();
}

uint32_t idx;
uint32_t offset;
std::string name;
type::ColumnSchema schema;

ColInfo() {}
ColInfo(const std::string& name, ::hybridse::type::Type type, uint32_t idx,
uint32_t offset)
: type(type), idx(idx), offset(offset), name(name) {}
ColInfo(const std::string& name, ::hybridse::type::Type type, uint32_t idx, uint32_t offset)
: idx(idx), offset(offset), name(name) {
schema.set_base_type(type);
}

ColInfo(const std::string& name, const type::ColumnSchema& sc, uint32_t idx, uint32_t offset)
: idx(idx), offset(offset), name(name), schema(sc) {}
};

struct StringColInfo : public ColInfo {
uint32_t str_next_offset;
uint32_t str_start_offset;

StringColInfo() {}
StringColInfo(const std::string& name, ::hybridse::type::Type type,
StringColInfo(const std::string& name, ::hybridse::type::ColumnSchema sc,
uint32_t idx, uint32_t offset, uint32_t str_next_offset,
uint32_t str_start_offset)
: ColInfo(name, type, idx, offset),
: ColInfo(name, sc, idx, offset),
str_next_offset(str_next_offset),
str_start_offset(str_start_offset) {}
};
Expand All @@ -209,7 +220,7 @@ class SliceFormat {
explicit SliceFormat(const hybridse::codec::Schema* schema);
virtual ~SliceFormat() {}

bool GetStringColumnInfo(size_t idx, StringColInfo* res) const;
absl::StatusOr<StringColInfo> GetStringColumnInfo(size_t idx) const;

const ColInfo* GetColumnInfo(size_t idx) const;

Expand All @@ -224,7 +235,7 @@ class SliceFormat {
class RowFormat {
public:
virtual ~RowFormat() {}
virtual bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const = 0;
virtual absl::StatusOr<StringColInfo> GetStringColumnInfo(size_t schema_idx, size_t idx) const = 0;
virtual const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const = 0;
virtual size_t GetSliceId(size_t schema_idx) const = 0;
};
Expand All @@ -245,8 +256,8 @@ class MultiSlicesRowFormat : public RowFormat {
slice_formats_.clear();
}

bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const override {
return slice_formats_[schema_idx].GetStringColumnInfo(idx, res);
absl::StatusOr<StringColInfo> GetStringColumnInfo(size_t schema_idx, size_t idx) const override {
return slice_formats_[schema_idx].GetStringColumnInfo(idx);
}

const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const override {
Expand Down Expand Up @@ -287,8 +298,8 @@ class SingleSliceRowFormat : public RowFormat {
}
}

bool GetStringColumnInfo(size_t schema_idx, size_t idx, StringColInfo* res) const override {
return slice_format_->GetStringColumnInfo(offsets_[schema_idx] + idx, res);
absl::StatusOr<StringColInfo> GetStringColumnInfo(size_t schema_idx, size_t idx) const override {
return slice_format_->GetStringColumnInfo(offsets_[schema_idx] + idx);
}

const ColInfo* GetColumnInfo(size_t schema_idx, size_t idx) const override {
Expand Down
8 changes: 5 additions & 3 deletions hybridse/src/benchmark/udf_bm_case.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,10 @@ void SumArrayListCol(benchmark::State* state, MODE mode, int64_t data_size,
schemas_context.GetRowFormat(schema_idx)->GetColumnInfo(col_idx);

codegen::MemoryWindowDecodeIRBuilder builder(&schemas_context, nullptr);
node::TypeNode type;
codegen::SchemaType2DataType(info->type, &type);
node::NodeManager nm;
auto rs = codegen::ColumnSchema2Type(info->schema, &nm);
ASSERT_TRUE(rs.ok());
auto* type = rs.value();

uint32_t col_size;
ASSERT_TRUE(codegen::GetLlvmColumnSize(&type, &col_size));
Expand All @@ -193,7 +195,7 @@ void SumArrayListCol(benchmark::State* state, MODE mode, int64_t data_size,

ASSERT_EQ(0, ::hybridse::codec::v1::GetCol(
reinterpret_cast<int8_t*>(&list_table_ref), 0, info->idx,
info->offset, info->type, buf));
info->offset, info->type(), buf));

{
switch (mode) {
Expand Down
1 change: 1 addition & 0 deletions hybridse/src/case/sql_case.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ bool SqlCase::ExtractSchema(const std::vector<std::string>& columns,
}
column->set_type(type);
column->set_is_not_null(false);
column->mutable_schema()->set_base_type(column->type());
}
} catch (const std::exception& ex) {
LOG(WARNING) << "Fail to ExtractSchema: " << ex.what();
Expand Down
37 changes: 21 additions & 16 deletions hybridse/src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -928,10 +928,18 @@ SliceFormat::SliceFormat(const hybridse::codec::Schema* schema)
for (int32_t i = 0; i < schema_->size(); i++) {
const ::hybridse::type::ColumnDef& column = schema_->Get(i);
if (column.type() == ::hybridse::type::kVarchar) {
// backwards compatibility check
type::ColumnSchema col_schema;
if (column.has_schema()) {
col_schema = column.schema();
} else {
col_schema.set_base_type(column.type());
}

if (FLAGS_enable_spark_unsaferow_format) {
infos_.emplace_back(column.name(), column.type(), i, offset);
infos_.emplace_back(column.name(), col_schema, i, offset);
} else {
infos_.emplace_back(column.name(), column.type(), i, string_field_cnt);
infos_.emplace_back(column.name(), col_schema, i, string_field_cnt);
}

infos_dict_[column.name()] = i;
Expand All @@ -943,13 +951,17 @@ SliceFormat::SliceFormat(const hybridse::codec::Schema* schema)
offset += 8;
}
} else {
auto TYPE_SIZE_MAP = codec::GetTypeSizeMap();
auto& TYPE_SIZE_MAP = codec::GetTypeSizeMap();
auto it = TYPE_SIZE_MAP.find(column.type());
if (it == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << "fail to find column type "
<< ::hybridse::type::Type_Name(column.type());
} else {
infos_.emplace_back(column.name(), column.type(), i, offset);
if (column.has_schema()) {
infos_.emplace_back(column.name(), column.schema(), i, offset);
} else {
infos_.emplace_back(column.name(), column.type(), i, offset);
}
infos_dict_[column.name()] = i;
offset += it->second;
}
Expand All @@ -969,17 +981,12 @@ const ColInfo* SliceFormat::GetColumnInfo(size_t idx) const {
return idx < infos_.size() ? &infos_[idx] : nullptr;
}

bool SliceFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
if (nullptr == res) {
LOG(WARNING) << "input args have null";
return false;
}
absl::StatusOr<StringColInfo> SliceFormat::GetStringColumnInfo(size_t idx) const {
if (idx >= infos_.size()) {
return false;
return absl::NotFoundError("schemas empty");
}
// TODO(wangtaize) support null check
auto& base_col_info = infos_[idx];
auto ty = base_col_info.type;
uint32_t col_idx = base_col_info.idx;
uint32_t offset = base_col_info.offset;
uint32_t next_offset = -1;
Expand All @@ -990,17 +997,15 @@ bool SliceFormat::GetStringColumnInfo(size_t idx, StringColInfo* res) const {
if (FLAGS_enable_spark_unsaferow_format) {
// No need to get next offset for UnsafeRowOpt and ignore the warning
} else {
LOG(WARNING) << "fail to get string field next offset";
return false;
return absl::NotFoundError("fail to get string field next offset");
}
}
DLOG(INFO) << "get string with offset " << offset << " next offset "
<< next_offset << " str_field_start_offset "
<< str_field_start_offset_ << " for col " << base_col_info.name;

*res = StringColInfo(base_col_info.name, ty, col_idx, offset, next_offset,
str_field_start_offset_);
return true;
return StringColInfo(base_col_info.name, base_col_info.schema, col_idx, offset, next_offset,
str_field_start_offset_);
}

} // namespace codec
Expand Down
67 changes: 35 additions & 32 deletions hybridse/src/codec/fe_row_codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,18 +426,17 @@ TEST_F(CodecTest, SliceFormatTest) {
if (i % 3 == 0) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
ASSERT_TRUE(decoder.GetStringColumnInfo(i, &str_info));
ASSERT_TRUE(decoder.GetStringColumnInfo(i).ok());
} else if (i % 3 == 1) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kInt64, info->type);
ASSERT_EQ(::hybridse::type::kInt64, info->type());
} else if (i % 3 == 2) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kDouble, info->type);
ASSERT_EQ(::hybridse::type::kDouble, info->type());
}
}
}
Expand Down Expand Up @@ -487,40 +486,41 @@ TEST_F(CodecTest, SliceFormatOffsetTest) {
SliceFormat decoder(&table.columns());
{
const codec::ColInfo* info = decoder.GetColumnInfo(0);
ASSERT_EQ(::hybridse::type::kInt32, info->type);
ASSERT_EQ(::hybridse::type::kInt32, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(7u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(1);
ASSERT_EQ(::hybridse::type::kInt16, info->type);
ASSERT_EQ(::hybridse::type::kInt16, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(7u + 4u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(2);
ASSERT_EQ(::hybridse::type::kFloat, info->type);
ASSERT_EQ(::hybridse::type::kFloat, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(7u + 4u + 2u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(3);
ASSERT_EQ(::hybridse::type::kDouble, info->type);
ASSERT_EQ(::hybridse::type::kDouble, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(7u + 4u + 2u + 4u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(4);
ASSERT_EQ(::hybridse::type::kInt64, info->type);
ASSERT_EQ(::hybridse::type::kInt64, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(7u + 4u + 2u + 4u + 8u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(5);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
decoder.GetStringColumnInfo(5, &str_info);
auto rs = decoder.GetStringColumnInfo(5);
ASSERT_TRUE(rs.ok());
auto& str_info = rs.value();
LOG(INFO) << "offset: " << str_info.offset
<< " next_offset: " << str_info.str_next_offset
<< " str_start_offset " << str_info.str_start_offset;
Expand All @@ -530,10 +530,11 @@ TEST_F(CodecTest, SliceFormatOffsetTest) {
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(6);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
decoder.GetStringColumnInfo(6, &str_info);
auto rs = decoder.GetStringColumnInfo(6);
ASSERT_TRUE(rs.ok());
auto& str_info = rs.value();
LOG(INFO) << "offset: " << str_info.offset
<< " next_offset: " << str_info.str_next_offset
<< " str_start_offset " << str_info.str_start_offset;
Expand Down Expand Up @@ -596,40 +597,41 @@ TEST_F(CodecTest, SliceFormatOffsetLongHeaderTest) {
SliceFormat decoder(&table.columns());
{
const codec::ColInfo* info = decoder.GetColumnInfo(0);
ASSERT_EQ(::hybridse::type::kInt32, info->type);
ASSERT_EQ(::hybridse::type::kInt32, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(8u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(1);
ASSERT_EQ(::hybridse::type::kInt16, info->type);
ASSERT_EQ(::hybridse::type::kInt16, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(8u + 4u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(2);
ASSERT_EQ(::hybridse::type::kFloat, info->type);
ASSERT_EQ(::hybridse::type::kFloat, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(8u + 4u + 2u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(3);
ASSERT_EQ(::hybridse::type::kDouble, info->type);
ASSERT_EQ(::hybridse::type::kDouble, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(8u + 4u + 2u + 4u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(4);
ASSERT_EQ(::hybridse::type::kInt64, info->type);
ASSERT_EQ(::hybridse::type::kInt64, info->type());
LOG(INFO) << "offset: " << info->offset;
ASSERT_EQ(8u + 4u + 2u + 4u + 8u, info->offset);
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(5);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
decoder.GetStringColumnInfo(5, &str_info);
auto str_info_wp = decoder.GetStringColumnInfo(5);
ASSERT_TRUE(str_info_wp.ok());
auto& str_info = str_info_wp.value();
LOG(INFO) << "offset: " << str_info.offset
<< " next_offset: " << str_info.str_next_offset
<< " str_start_offset " << str_info.str_start_offset;
Expand All @@ -639,10 +641,11 @@ TEST_F(CodecTest, SliceFormatOffsetLongHeaderTest) {
}
{
const codec::ColInfo* info = decoder.GetColumnInfo(6);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
decoder.GetStringColumnInfo(6, &str_info);
auto str_info_wp = decoder.GetStringColumnInfo(6);
ASSERT_TRUE(str_info_wp.ok());
auto& str_info = str_info_wp.value();
LOG(INFO) << "offset: " << str_info.offset
<< " next_offset: " << str_info.str_next_offset
<< " str_start_offset " << str_info.str_start_offset;
Expand Down Expand Up @@ -691,18 +694,18 @@ TEST_F(CodecTest, SparkUnsaferowRowFormatTest) {
if (i % 3 == 0) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kVarchar, info->type);
ASSERT_EQ(::hybridse::type::kVarchar, info->type());

codec::StringColInfo str_info;
ASSERT_TRUE(decoder.GetStringColumnInfo(i, &str_info));
auto rs = decoder.GetStringColumnInfo(i);
ASSERT_TRUE(rs.ok());
} else if (i % 3 == 1) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kInt64, info->type);
ASSERT_EQ(::hybridse::type::kInt64, info->type());
} else if (i % 3 == 2) {
const codec::ColInfo* info = decoder.GetColumnInfo(i);
ASSERT_TRUE(info != nullptr);
ASSERT_EQ(::hybridse::type::kDouble, info->type);
ASSERT_EQ(::hybridse::type::kDouble, info->type());
}
}
}
Expand Down
Loading

0 comments on commit 9fe51b9

Please sign in to comment.