Skip to content

Commit

Permalink
feat(codec): encode map type into row
Browse files Browse the repository at this point in the history
  • Loading branch information
aceforeverd committed Jan 22, 2024
1 parent 9fe51b9 commit 2ef30f5
Show file tree
Hide file tree
Showing 35 changed files with 1,739 additions and 785 deletions.
30 changes: 30 additions & 0 deletions cases/query/udf_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,33 @@ cases:
columns: ["e1 bool", "e2 bool", "e3 bool"]
data: |
true, false, true
- id: 15
mode: request-unsupport
sql: |
select map(1, 2, 3, 4) as c1
- id: 16
mode: request-unsupport
# this covers basic codec for map data type
sql: |
select
c1[3] as o1, c2[1] as o2, c3['6'] as o3, c4[timestamp(8000)] as o4,
c5[int64(12)] as o5
from (select
map(1, 2, 3, 4) as c1,
map(1, '2', 3, '4') as c2,
map('5', timestamp(8000), '6', timestamp(9000)) as c3,
map(timestamp(8000), date("2012-12-12"), timestamp(9000), date("2014-11-11")) as c4,
map(int64(10), int16(11), int64(12), int16(13)) as c5
)
expect:
columns: ["o1 int", "o2 string", "o3 timestamp", "o4 date", "o5 int16"]
data: |
4, 2, 9000, 2012-12-12, 13
- id: 17
mode: request-unsupport
sql: |
select c1 + 8 from (select 9 as c1)
31 changes: 31 additions & 0 deletions hybridse/include/base/fe_status.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,37 @@ static inline std::initializer_list<int> __output_literal_args(STREAM& stream,

#define MAX_STATUS_TRACE_SIZE 4096

// Evaluate and check the expression returns a absl::Status.
// End the current function by return status, if status is not OK
#define CHECK_ABSL_STATUS(expr) \
while (true) { \
auto _s = (expr); \
if (!_s.ok()) { \
return _s; \
} \
break; \
}

// Check the absl::StatusOr<T> object, end the current function
// by return 'object.status()' if it is not OK
#define CHECK_ABSL_STATUSOR(statusor) \
while (true) { \
if (!statusor.ok()) { \
return statusor.status(); \
} \
break; \
}

// Evaluate the expression returns Status, converted and return failed absl status if status not ok
#define CHECK_STATUS_TO_ABSL(expr) \
while (true) { \
auto _status = (expr); \
if (!_status.isOK()) { \
return absl::InternalError(_status.GetMsg()); \
} \
break; \
}

#define CHECK_STATUS(call, ...) \
while (true) { \
auto _status = (call); \
Expand Down
11 changes: 10 additions & 1 deletion hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,18 @@ static constexpr uint32_t UINT24_MAX = (1 << 24) - 1;
const std::string NONETOKEN = "!N@U#L$L%"; // NOLINT
const std::string EMPTY_STRING = "!@#$%"; // NOLINT

// TODO(chendihao): Change to inline function if do not depend on gflags
const std::unordered_map<::hybridse::type::Type, uint8_t>& GetTypeSizeMap();

// return true if the column considered base type in row codec.
// date & timestamp consider base type since they have single field in corresponding llvm struct,
// while string, map and array consider complex type.
//
// for base types, the column is written into row ptr by just writing the value of primitive type,
// for comple type, written is made by a string (or string-like) manner: str size + str data.
// map, array, or any other complex types, takes a extra encoding from their struct value into str data.
bool IsCodecBaseType(const type::ColumnSchema& sc);
bool IsCodecStrLikeType(const type::ColumnSchema& sc);

inline uint8_t GetAddrLength(uint32_t size) {
if (size <= UINT8_MAX) {
return 1;
Expand Down
3 changes: 3 additions & 0 deletions hybridse/include/codec/type_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ int32_t AppendString(int8_t* buf_ptr, uint32_t buf_size, uint32_t col_idx,
uint32_t str_start_offset, uint32_t str_field_offset,
uint32_t str_addr_space, uint32_t str_body_offset);

// write `str_offset` in address `str_offset_ptr`, actual written bytes determined by `str_addr_space`
void EncodeStrOffset(int8_t* str_offset_ptr, int32_t str_offset, int32_t str_addr_space);

inline int8_t GetAddrSpace(uint32_t size) {
if (size <= UINT8_MAX) {
return 1;
Expand Down
40 changes: 27 additions & 13 deletions hybridse/src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const uint32_t BitMapSize(uint32_t size) {
}
}

const std::unordered_map<::hybridse::type::Type, uint8_t>&
static const std::unordered_map<::hybridse::type::Type, uint8_t>&
DEFAULT_TYPE_SIZE_MAP = {{::hybridse::type::kBool, sizeof(bool)},
{::hybridse::type::kInt16, sizeof(int16_t)},
{::hybridse::type::kInt32, sizeof(int32_t)},
Expand All @@ -44,7 +44,7 @@ const std::unordered_map<::hybridse::type::Type, uint8_t>&
{::hybridse::type::kDate, sizeof(int32_t)},
{::hybridse::type::kDouble, sizeof(double)}};

const std::unordered_map<::hybridse::type::Type, uint8_t>&
static const std::unordered_map<::hybridse::type::Type, uint8_t>&
SPARK_UNSAFEROW_TYPE_SIZE_MAP = {
{::hybridse::type::kBool, 8}, {::hybridse::type::kInt16, 8},
{::hybridse::type::kInt32, 8}, {::hybridse::type::kFloat, 8},
Expand All @@ -59,6 +59,15 @@ const std::unordered_map<::hybridse::type::Type, uint8_t>& GetTypeSizeMap() {
}
}

bool IsCodecBaseType(const type::ColumnSchema& sc) {
auto& map = GetTypeSizeMap();
return sc.has_base_type() && map.find(sc.base_type()) != map.end();
}

bool IsCodecStrLikeType(const type::ColumnSchema& sc) {
return sc.has_map_type() || sc.has_array_type() || (sc.has_base_type() && sc.base_type() == type::kVarchar);
}

RowBuilder::RowBuilder(const Schema& schema)
: schema_(schema),
buf_(NULL),
Expand All @@ -71,7 +80,7 @@ RowBuilder::RowBuilder(const Schema& schema)
str_field_start_offset_ = HEADER_LENGTH + BitMapSize(schema.size());
for (int idx = 0; idx < schema.size(); idx++) {
const ::hybridse::type::ColumnDef& column = schema.Get(idx);
if (column.type() == ::hybridse::type::kVarchar) {
if (IsCodecStrLikeType(column.schema())) {
if (FLAGS_enable_spark_unsaferow_format) {
offset_vec_.push_back(str_field_start_offset_);
str_field_start_offset_ += 8;
Expand All @@ -80,7 +89,7 @@ RowBuilder::RowBuilder(const Schema& schema)
}
str_field_cnt_++;
} else {
auto TYPE_SIZE_MAP = GetTypeSizeMap();
auto& TYPE_SIZE_MAP = GetTypeSizeMap();
auto iter = TYPE_SIZE_MAP.find(column.type());
if (iter == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << ::hybridse::type::Type_Name(column.type())
Expand Down Expand Up @@ -155,7 +164,7 @@ bool RowBuilder::Check(::hybridse::type::Type type) {
return false;
}
if (column.type() != ::hybridse::type::kVarchar) {
auto TYPE_SIZE_MAP = GetTypeSizeMap();
auto& TYPE_SIZE_MAP = GetTypeSizeMap();
auto iter = TYPE_SIZE_MAP.find(column.type());
if (iter == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << ::hybridse::type::Type_Name(column.type())
Expand Down Expand Up @@ -363,7 +372,7 @@ bool RowView::Init() {
uint32_t offset = HEADER_LENGTH + BitMapSize(schema_.size());
for (int idx = 0; idx < schema_.size(); idx++) {
const ::hybridse::type::ColumnDef& column = schema_.Get(idx);
if (column.type() == ::hybridse::type::kVarchar) {
if (IsCodecStrLikeType(column.schema())) {
if (FLAGS_enable_spark_unsaferow_format) {
offset_vec_.push_back(offset);
offset += 8;
Expand All @@ -372,7 +381,7 @@ bool RowView::Init() {
}
string_field_cnt_++;
} else {
auto TYPE_SIZE_MAP = GetTypeSizeMap();
auto& TYPE_SIZE_MAP = GetTypeSizeMap();
auto iter = TYPE_SIZE_MAP.find(column.type());
if (iter == TYPE_SIZE_MAP.end()) {
LOG(WARNING) << ::hybridse::type::Type_Name(column.type())
Expand Down Expand Up @@ -761,7 +770,12 @@ std::string RowView::GetAsString(uint32_t idx) {
return "NULL";
}
const ::hybridse::type::ColumnDef& column = schema_.Get(idx);
switch (column.type()) {
if (column.schema().has_map_type()) {
return "map{...}";
} else if (column.schema().has_array_type()) {
return "array[ ... ]";
} else if (column.schema().has_base_type()) {
switch (column.schema().base_type()) {
case hybridse::type::kInt32: {
int32_t value;
if (0 == GetInt32(idx, &value)) {
Expand Down Expand Up @@ -849,12 +863,12 @@ std::string RowView::GetAsString(uint32_t idx) {
break;
}
default: {
LOG(WARNING) << "fail to get string for "
"current row";
break;
}
}
}

LOG(WARNING) << "fail to get string for current row";
return "NA";
}

Expand Down Expand Up @@ -927,7 +941,8 @@ SliceFormat::SliceFormat(const hybridse::codec::Schema* schema)
uint32_t string_field_cnt = 0;
for (int32_t i = 0; i < schema_->size(); i++) {
const ::hybridse::type::ColumnDef& column = schema_->Get(i);
if (column.type() == ::hybridse::type::kVarchar) {
auto& schema = column.schema();
if (IsCodecStrLikeType(schema)) {
// backwards compatibility check
type::ColumnSchema col_schema;
if (column.has_schema()) {
Expand Down Expand Up @@ -968,8 +983,7 @@ SliceFormat::SliceFormat(const hybridse::codec::Schema* schema)
}
}
uint32_t next_pos = 0;
for (auto iter = next_str_pos_.rbegin(); iter != next_str_pos_.rend();
iter++) {
for (auto iter = next_str_pos_.rbegin(); iter != next_str_pos_.rend(); iter++) {
uint32_t tmp = iter->second;
iter->second = next_pos;
next_pos = tmp;
Expand Down
30 changes: 27 additions & 3 deletions hybridse/src/codec/type_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,33 @@ int32_t AppendString(int8_t* buf_ptr, uint32_t buf_size, uint32_t col_idx,
return str_body_offset + size;
}

int32_t GetStrCol(int8_t* input, int32_t row_idx, uint32_t col_idx,
int32_t str_field_offset, int32_t next_str_field_offset,
int32_t str_start_offset, int32_t type_id, int8_t* data) {
void EncodeStrOffset(int8_t* str_offset_ptr, int32_t str_body_offset, int32_t str_addr_space) {
switch (str_addr_space) {
case 1: {
*str_offset_ptr = static_cast<uint8_t>(str_body_offset);
break;
}

case 2: {
*(reinterpret_cast<uint16_t*>(str_offset_ptr)) = static_cast<uint16_t>(str_body_offset);
break;
}

case 3: {
*(reinterpret_cast<uint8_t*>(str_offset_ptr)) = str_body_offset >> 16;
*(reinterpret_cast<uint8_t*>(str_offset_ptr + 1)) = (str_body_offset & 0xFF00) >> 8;
*(reinterpret_cast<uint8_t*>(str_offset_ptr + 2)) = str_body_offset & 0x00FF;
break;
}

default: {
*(reinterpret_cast<uint32_t*>(str_offset_ptr)) = str_body_offset;
}
}
}

int32_t GetStrCol(int8_t* input, int32_t row_idx, uint32_t col_idx, int32_t str_field_offset,
int32_t next_str_field_offset, int32_t str_start_offset, int32_t type_id, int8_t* data) {
if (nullptr == input || nullptr == data) {
return -2;
}
Expand Down
21 changes: 10 additions & 11 deletions hybridse/src/codegen/aggregate_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,8 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
const std::string& output_ptr_name,
const vm::Schema& output_schema) {
::llvm::LLVMContext& llvm_ctx = module_->getContext();
::llvm::IRBuilder<> builder(llvm_ctx);
CodeGenContextBase ctx(module_);
auto& builder = *ctx.GetBuilder();
auto void_ty = llvm::Type::getVoidTy(llvm_ctx);
auto int64_ty = llvm::Type::getInt64Ty(llvm_ctx);
expr_ir_builder->set_frame(nullptr, frame_node_);
Expand All @@ -596,7 +597,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
llvm::Type::getVoidTy(llvm_ctx), {ptr_ty, ptr_ty}, false);
::llvm::Function* fn = ::llvm::Function::Create(
fnt, llvm::Function::ExternalLinkage, fn_name, module_);
builder.SetInsertPoint(cur_block);
BlockGuard bg0(cur_block, &ctx);
builder.CreateCall(
module_->getOrInsertFunction(fn_name, fnt),
{window_ptr.GetValue(&builder), builder.CreateLoad(output_buf)});
Expand All @@ -615,7 +616,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
"Schedule agg ops failed")

// gen head
builder.SetInsertPoint(head_block);
BlockGuard bg1(head_block, &ctx);
for (auto& agg_generator : generators) {
agg_generator.GenInitState(&builder);
}
Expand All @@ -634,7 +635,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
builder.CreateBr(enter_block);

// gen iter begin
builder.SetInsertPoint(enter_block);
BlockGuard bg2(enter_block, &ctx);
auto bool_ty = llvm::Type::getInt1Ty(llvm_ctx);
auto has_next_func = module_->getOrInsertFunction(
"hybridse_storage_row_iter_has_next",
Expand All @@ -643,7 +644,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
builder.CreateCondBr(has_next, body_block, exit_block);

// gen iter body
builder.SetInsertPoint(body_block);
BlockGuard bg3(body_block, &ctx);
auto get_slice_func = module_->getOrInsertFunction(
"hybridse_storage_row_iter_get_cur_slice",
::llvm::FunctionType::get(ptr_ty, {ptr_ty, int64_ty}, false));
Expand Down Expand Up @@ -693,9 +694,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
auto& slice_info = used_slices[slice_idx];

ScopeVar dummy_scope_var;
BufNativeIRBuilder buf_builder(
schema_idx, schema_context_->GetRowFormat(),
body_block, &dummy_scope_var);
BufNativeIRBuilder buf_builder(&ctx, schema_idx, schema_context_->GetRowFormat(), &dummy_scope_var);
NativeValue field_value;
CHECK_TRUE(buf_builder.BuildGetField(info.col_idx, slice_info.first, slice_info.second, &field_value),
common::kCodegenGetFieldError, "fail to gen fetch column")
Expand Down Expand Up @@ -723,16 +722,16 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
builder.CreateBr(enter_block);

// gen iter end
builder.SetInsertPoint(exit_block);
BlockGuard bg4(exit_block, &ctx);
auto delete_iter_func = module_->getOrInsertFunction(
"hybridse_storage_row_iter_delete",
::llvm::FunctionType::get(void_ty, {ptr_ty}, false));
builder.CreateCall(delete_iter_func, {iter_ptr});

// store results to output row
std::map<uint32_t, NativeValue> dummy_map;
BufNativeEncoderIRBuilder output_encoder(&dummy_map, &output_schema,
exit_block);
BufNativeEncoderIRBuilder output_encoder(&ctx, &dummy_map, &output_schema);
CHECK_STATUS(output_encoder.Init());
for (auto& agg_generator : generators) {
std::vector<std::pair<size_t, NativeValue>> outputs;
agg_generator.GenOutputs(&builder, &outputs);
Expand Down
3 changes: 0 additions & 3 deletions hybridse/src/codegen/aggregate_ir_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ class AggregateIRBuilder {
AggregateIRBuilder(const vm::SchemasContext*, ::llvm::Module* module,
const node::FrameNode* frame_node, uint32_t id);

// TODO(someone): remove temporary implementations for row-wise agg
static bool EnableColumnAggOpt();

bool CollectAggColumn(const node::ExprNode* expr, size_t output_idx,
::hybridse::type::Type* col_type);

Expand Down
2 changes: 1 addition & 1 deletion hybridse/src/codegen/array_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void ArrayIRBuilder::InitStructType() {
struct_type_ = stype;
}

absl::StatusOr<NativeValue> ArrayIRBuilder::Construct(CodeGenContext* ctx,
absl::StatusOr<NativeValue> ArrayIRBuilder::Construct(CodeGenContextBase* ctx,
absl::Span<NativeValue const> elements) const {
auto bb = ctx->GetCurrentBlock();
// alloc array struct
Expand Down
2 changes: 1 addition & 1 deletion hybridse/src/codegen/array_ir_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ArrayIRBuilder : public StructTypeIRBuilder {
~ArrayIRBuilder() override {}

// create a new array from `elements` as value
absl::StatusOr<NativeValue> Construct(CodeGenContext* ctx, absl::Span<const NativeValue> args) const override;
absl::StatusOr<NativeValue> Construct(CodeGenContextBase* ctx, absl::Span<const NativeValue> args) const override;

bool CopyFrom(::llvm::BasicBlock* block, ::llvm::Value* src, ::llvm::Value* dist) override { return true; }

Expand Down
Loading

0 comments on commit 2ef30f5

Please sign in to comment.