Skip to content

Commit

Permalink
feat: offline map type (#3746)
Browse files Browse the repository at this point in the history
* feat(offline): query support for map data type

Happy path only.

TODO:
- support empty map

* feat(jit_wrapper): simplify jit initialization

* fix(offline): encoding map value with codegen

ensure each row encoded with distinct jit instance, work-around to #3748
  • Loading branch information
aceforeverd authored Mar 22, 2024
1 parent 065021e commit 524dba1
Show file tree
Hide file tree
Showing 31 changed files with 668 additions and 194 deletions.
5 changes: 3 additions & 2 deletions hybridse/include/base/fe_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

#ifndef HYBRIDSE_INCLUDE_BASE_FE_SLICE_H_
#define HYBRIDSE_INCLUDE_BASE_FE_SLICE_H_

#include <assert.h>
#include <memory.h>
#include <stddef.h>
#include <string.h>
#include <memory>

#include <string>

#include "base/raw_buffer.h"
#include "boost/smart_ptr/local_shared_ptr.hpp"

namespace hybridse {
namespace base {
Expand Down
3 changes: 1 addition & 2 deletions hybridse/include/base/raw_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

#ifndef HYBRIDSE_INCLUDE_BASE_RAW_BUFFER_H_
#define HYBRIDSE_INCLUDE_BASE_RAW_BUFFER_H_
#include <assert.h>

#include <stddef.h>
#include <string.h>
#include <string>

#include "glog/logging.h"

Expand Down
63 changes: 63 additions & 0 deletions hybridse/include/codec/fe_row_codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,30 @@
#define HYBRIDSE_INCLUDE_CODEC_FE_ROW_CODEC_H_

#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "absl/status/statusor.h"
#include "base/fe_status.h"
#include "base/fe_slice.h"
#include "base/raw_buffer.h"
#include "codec/row.h"
#include "proto/fe_type.pb.h"

namespace hybridse {

namespace node {
class ExprNode;
}
namespace codegen {
class InsertRowBuilder;
}
namespace vm {
class HybridSeJitWrapper;
}

namespace codec {

const uint32_t BitMapSize(uint32_t size);
Expand Down Expand Up @@ -74,6 +89,54 @@ inline uint32_t GetStartOffset(int32_t column_count) {
void FillNullStringOffset(int8_t* buf, uint32_t start, uint32_t addr_length,
uint32_t str_idx, uint32_t str_offset);


// single slice builder from pure codegen
class SliceBuilder {
public:
SliceBuilder(vm::HybridSeJitWrapper*, const hybridse::codec::Schema* schema);
virtual ~SliceBuilder() {}

base::Status Build(const std::vector<node::ExprNode*>&, base::RefCountedSlice*);

base::Status Build(absl::Span<node::ExprNode* const>, base::RefCountedSlice*);

private:
void EnsureInitialized() {
assert(row_builder_ != nullptr && "must initialize the row builder before encoding");
}

const Schema* schema_;
std::shared_ptr<codegen::InsertRowBuilder> row_builder_ = nullptr;
};

// new row builder from pure codegen
class RowBuilder2 {
public:
RowBuilder2(vm::HybridSeJitWrapper*, int sliceSize);
RowBuilder2(vm::HybridSeJitWrapper*, const std::vector<codec::Schema>& schemas);
RowBuilder2(vm::HybridSeJitWrapper*, const std::vector<std::vector<hybridse::type::ColumnDef>>& schemas);
~RowBuilder2() {}

base::Status Init();

base::Status InitSchema(int idx, const codec::Schema& sc);

base::Status Build(const std::vector<node::ExprNode*>&, codec::Row*);

private:
void EnsureInitialized() {
assert(initialized_ && "RowBuild not initialized");
}

vm::HybridSeJitWrapper* jit_ = nullptr;
std::vector<codec::Schema> schemas_;
std::vector<std::shared_ptr<SliceBuilder>> builders_;

bool initialized_ = false;
};

// Old row builder in C
// limited data type support, no map, no array. U should upgrade to RowBuilder2
class RowBuilder {
public:
explicit RowBuilder(const hybridse::codec::Schema& schema);
Expand Down
5 changes: 0 additions & 5 deletions hybridse/include/codec/row.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@
#define HYBRIDSE_INCLUDE_CODEC_ROW_H_

#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "base/fe_slice.h"
#include "base/raw_buffer.h"
#include "proto/fe_type.pb.h"

namespace hybridse {
namespace codec {
Expand Down
98 changes: 98 additions & 0 deletions hybridse/src/codec/fe_row_codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

#include "codec/type_codec.h"
#include "gflags/gflags.h"
#include "codegen/insert_row_builder.h"
#include "glog/logging.h"
#include "proto/fe_common.pb.h"
#include "vm/engine.h"

DECLARE_bool(enable_spark_unsaferow_format);

Expand Down Expand Up @@ -1033,5 +1036,100 @@ absl::StatusOr<StringColInfo> SliceFormat::GetStringColumnInfo(size_t idx) const
str_field_start_offset_);
}

SliceBuilder::SliceBuilder(vm::HybridSeJitWrapper* jit, const hybridse::codec::Schema* schema)
: schema_(schema) {
row_builder_ = std::make_shared<codegen::InsertRowBuilder>(jit, schema_);
}

base::Status SliceBuilder::Build(const std::vector<node::ExprNode* >& values, base::RefCountedSlice* slice) {
return Build(absl::MakeSpan(values), slice);
}

base::Status SliceBuilder::Build(absl::Span<node::ExprNode* const> values, base::RefCountedSlice* slice) {
EnsureInitialized();

auto rs = row_builder_->ComputeRowUnsafe(values);
if (!rs.ok()) {
return {common::kCodegenEncodeError, rs.status().ToString()};
}

auto buf = rs.value();
if (buf == nullptr) {
return {common::kCodegenEncodeError, "internal error: encoded buf is null"};
}

*slice = base::RefCountedSlice::CreateManaged(buf, RowView::GetSize(buf));

return {};
}

RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit, int sliceSize) : jit_(jit) {
schemas_.resize(sliceSize);
builders_.resize(sliceSize);
}
RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit, const std::vector<codec::Schema>& schemas)
: jit_(jit), schemas_(schemas) {
builders_.resize(schemas_.size());
}
RowBuilder2::RowBuilder2(vm::HybridSeJitWrapper* jit,
const std::vector<std::vector<hybridse::type::ColumnDef>>& schemas)
: jit_(jit) {
for (auto& sc : schemas) {
schemas_.push_back(Schema());
auto& ref = schemas_.back();
for (auto& col : sc) {
ref.Add()->CopyFrom(col);
}
}
builders_.resize(schemas_.size());
}

base::Status RowBuilder2::Init() {
CHECK_TRUE(jit_ != nullptr, common::kCodegenEncodeError, "jit is null");
for (size_t i = 0; i < schemas_.size(); ++i) {
CHECK_TRUE(!schemas_[i].empty(), common::kCodegenEncodeError, absl::StrCat(i, "th schema un-initialized"));
if (builders_[i] == nullptr) {
builders_[i] = std::make_shared<SliceBuilder>(jit_, &schemas_[i]);
}
}

initialized_ = true;
return {};
}

base::Status RowBuilder2::Build(const std::vector<node::ExprNode*>& values, codec::Row* out) {
EnsureInitialized();

auto expect_cols =
std::accumulate(schemas_.begin(), schemas_.end(), 0, [](int val, const auto& e) { return val + e.size(); });
CHECK_TRUE(values.size() == expect_cols, common::kCodegenEncodeError, "pass in expr number do not match, expect ",
expect_cols, " but got ", values.size());

int col_idx = 0;
Row row;
auto values_ref = absl::MakeSpan(values);
for (size_t i = 0; i < schemas_.size(); ++i) {
RefCountedSlice slice;
CHECK_STATUS(builders_[i]->Build(values_ref.subspan(col_idx, schemas_[i].size()), &slice));
if (i == 0) {
row.Reset(slice);
} else {
row.Append(slice);
}

col_idx += schemas_[i].size();
}

*out = row;

return {};
}
base::Status RowBuilder2::InitSchema(int idx, const codec::Schema& sc) {
if (idx >= schemas_.size()) {
return {common::kCodegenEncodeError, "idx out of bound"};
}
schemas_[idx] = sc;
return {};
}
} // namespace codec
} // namespace hybridse
3 changes: 1 addition & 2 deletions hybridse/src/codegen/aggregate_ir_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,7 @@ base::Status AggregateIRBuilder::BuildMulti(const std::string& base_funcname,
std::vector<std::pair<size_t, NativeValue>> outputs;
agg_generator.GenOutputs(&builder, &outputs);
for (auto pair : outputs) {
output_encoder.BuildEncodePrimaryField(output_arg, pair.first,
pair.second);
CHECK_STATUS(output_encoder.BuildEncodePrimaryField(output_arg, pair.first, pair.second));
}
}
builder.CreateRetVoid();
Expand Down
3 changes: 3 additions & 0 deletions hybridse/src/codegen/buf_ir_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ class BufNativeEncoderIRBuilder : public RowEncodeIRBuilder {

~BufNativeEncoderIRBuilder() override;

ABSL_MUST_USE_RESULT
base::Status Init() noexcept;

// the output_ptr like int8_t**
ABSL_MUST_USE_RESULT
base::Status BuildEncode(::llvm::Value* output_ptr) override;

ABSL_MUST_USE_RESULT
base::Status BuildEncodePrimaryField(::llvm::Value* buf, size_t idx, const NativeValue& val);

private:
Expand Down
3 changes: 0 additions & 3 deletions hybridse/src/codegen/buf_ir_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ void RunEncode(::hybridse::type::TableDef& table, // NOLINT
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto load_fn_jit = jit->FindFunction("fn");
void (*decode)(int8_t**) =
Expand Down Expand Up @@ -307,7 +306,6 @@ void LoadValue(T* result, bool* is_null,
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto load_fn_jit = jit->FindFunction("fn");

Expand Down Expand Up @@ -438,7 +436,6 @@ void RunColCase(T expected, type::TableDef& table, // NOLINT
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
jit->AddExternalFunction("print_list_i16",
reinterpret_cast<void*>(&PrintListInt16));
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/codegen/fn_ir_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void CheckResult(node::FnNodeFnDef *fn_def, R exp, V1 a, V2 b) {
m->print(::llvm::errs(), NULL, true, true);
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());
ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto test_fn = (R(*)(V1, V2))jit->FindFunction(fn_def->header_->GeIRFunctionName());
R result = test_fn(a, b);
Expand Down
1 change: 0 additions & 1 deletion hybridse/src/codegen/fn_let_ir_builder_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ void CheckFnLetBuilderWithParameterRow(::hybridse::node::NodeManager* manager, v
auto jit = std::unique_ptr<vm::HybridSeJitWrapper>(
vm::HybridSeJitWrapper::Create());
jit->Init();
vm::HybridSeJitWrapper::InitJitSymbols(jit.get());

ASSERT_TRUE(jit->AddModule(std::move(m), std::move(ctx)));
auto address = jit->FindFunction("test_at_fn");
Expand Down
Loading

0 comments on commit 524dba1

Please sign in to comment.