Skip to content

Commit

Permalink
Avoid using map based buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
syucream committed Jul 26, 2020
1 parent f524f61 commit acb6e0d
Show file tree
Hide file tree
Showing 15 changed files with 852 additions and 49 deletions.
3 changes: 2 additions & 1 deletion arrow/json/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"io"
)

var (
Expand Down
18 changes: 9 additions & 9 deletions arrow/json/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,15 @@ func TestToGo(t *testing.T) {
// uint8 TODO support this case
// []uint8 will be converted base64-ed string
/*
{
data: func() *array.Data {
b := array.NewUint8Builder(pool)
b.AppendValues([]uint8{0, 1, 2}, nil)
return b.NewUint8Array().Data()
}(),
expected: []uint8{0, 1, 2},
err: nil,
},
{
data: func() *array.Data {
b := array.NewUint8Builder(pool)
b.AppendValues([]uint8{0, 1, 2}, nil)
return b.NewUint8Array().Data()
}(),
expected: []uint8{0, 1, 2},
err: nil,
},
*/

// uint16
Expand Down
3 changes: 2 additions & 1 deletion columnifier/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package columnifier

import (
"bytes"
"io/ioutil"

"github.com/reproio/columnify/arrow/json"
"github.com/reproio/columnify/record"
"github.com/xitongsys/parquet-go/marshal"
"io/ioutil"

"github.com/reproio/columnify/parquet"
"github.com/reproio/columnify/schema"
Expand Down
28 changes: 11 additions & 17 deletions record/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package record

import (
"fmt"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"strconv"
"time"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
)

type WrappedRecord struct {
Expand All @@ -19,24 +19,18 @@ func NewWrappedRecord(b *array.RecordBuilder) *WrappedRecord {
}
}

func formatMapToArrowRecord(s *arrow.Schema, maps []map[string]interface{}) (*WrappedRecord, error) {
pool := memory.NewGoAllocator()
b := array.NewRecordBuilder(pool, s)
defer b.Release()

for _, m := range maps {
for i, f := range s.Fields() {
if v, ok := m[f.Name]; ok {
if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil {
return nil, err
}
} else {
b.Field(i).AppendNull()
func formatMapToArrowRecord(b *array.RecordBuilder, m map[string]interface{}) (*array.RecordBuilder, error) {
for i, f := range b.Schema().Fields() {
if v, ok := m[f.Name]; ok {
if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil {
return nil, err
}
} else {
b.Field(i).AppendNull()
}
}

return NewWrappedRecord(b), nil
return b, nil
}

func formatMapToArrowStruct(b *array.StructBuilder, s *arrow.StructType, m map[string]interface{}) (*array.StructBuilder, error) {
Expand Down
16 changes: 11 additions & 5 deletions record/arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,17 +430,23 @@ func TestNewArrowSchemaFromAvroSchema(t *testing.T) {
},
}

pool := memory.NewGoAllocator()
for _, c := range cases {
expectedRecord := c.expected(c.schema)

actual, err := formatMapToArrowRecord(c.schema.ArrowSchema, c.input)
b := array.NewRecordBuilder(pool, c.schema.ArrowSchema)
defer b.Release()

if err != c.err {
t.Errorf("expected: %v, but actual: %v\n", c.err, err)
for _, v := range c.input {
_, err := formatMapToArrowRecord(b, v)
if err != c.err {
t.Errorf("expected: %v, but actual: %v\n", c.err, err)
}
}

if !reflect.DeepEqual(actual, expectedRecord) {
t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, actual)
r := NewWrappedRecord(b)
if !reflect.DeepEqual(r, expectedRecord) {
t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, r)
}
}
}
26 changes: 24 additions & 2 deletions record/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"fmt"

"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/reproio/columnify/schema"

"github.com/linkedin/goavro/v2"
Expand Down Expand Up @@ -57,10 +59,30 @@ func FormatAvroToMap(data []byte) ([]map[string]interface{}, error) {
}

func FormatAvroToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) {
maps, err := FormatAvroToMap(data)
pool := memory.NewGoAllocator()
b := array.NewRecordBuilder(pool, s.ArrowSchema)
defer b.Release()

r, err := goavro.NewOCFReader(bytes.NewReader(data))
if err != nil {
return nil, err
}

return formatMapToArrowRecord(s.ArrowSchema, maps)
for r.Scan() {
v, err := r.Read()
if err != nil {
return nil, err
}

m, mapOk := v.(map[string]interface{})
if !mapOk {
return nil, fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord)
}

if _, err = formatMapToArrowRecord(b, flattenAvroUnion(m)); err != nil {
return nil, err
}
}

return NewWrappedRecord(b), nil
}
149 changes: 149 additions & 0 deletions record/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import (
"reflect"
"testing"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/reproio/columnify/schema"

"github.com/linkedin/goavro/v2"
)

Expand Down Expand Up @@ -136,3 +141,147 @@ func TestFormatAvroToMap(t *testing.T) {
}
}
}

func TestFormatAvroToArrow(t *testing.T) {
cases := []struct {
input []byte
schema *schema.IntermediateSchema
expected func(s *schema.IntermediateSchema) *WrappedRecord
isErr bool
}{
{
input: func() []byte {
w := &bytes.Buffer{}

r, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: w,
Schema: `
{
"type": "record",
"name": "Primitives",
"fields" : [
{"name": "boolean", "type": "boolean"},
{"name": "int", "type": "int"},
{"name": "long", "type": "long"},
{"name": "float", "type": "float"},
{"name": "double", "type": "double"},
{"name": "bytes", "type": "bytes"},
{"name": "string", "type": "string"}
]
}
`,
})
if err != nil {
t.Fatal(err)
}

err = r.Append([]map[string]interface{}{
{
"boolean": false,
"bytes": string([]byte("foo")),
"double": 1.1,
"float": 1.1,
"int": 1,
"long": 1,
"string": "foo",
},
{
"boolean": true,
"bytes": string([]byte("bar")),
"double": 2.2,
"float": 2.2,
"int": 2,
"long": 2,
"string": "bar",
},
})
if err != nil {
t.Fatal(err)
}

return w.Bytes()
}(),
schema: schema.NewIntermediateSchema(
arrow.NewSchema(
[]arrow.Field{
{
Name: "boolean",
Type: arrow.FixedWidthTypes.Boolean,
Nullable: false,
},
{
Name: "int",
Type: arrow.PrimitiveTypes.Uint32,
Nullable: false,
},
{
Name: "long",
Type: arrow.PrimitiveTypes.Uint64,
Nullable: false,
},
{
Name: "float",
Type: arrow.PrimitiveTypes.Float32,
Nullable: false,
},
{
Name: "double",
Type: arrow.PrimitiveTypes.Float64,
Nullable: false,
},
{
Name: "bytes",
Type: arrow.BinaryTypes.Binary,
Nullable: false,
},
{
Name: "string",
Type: arrow.BinaryTypes.String,
Nullable: false,
},
}, nil),
"primitives"),
expected: func(s *schema.IntermediateSchema) *WrappedRecord {
pool := memory.NewGoAllocator()
b := array.NewRecordBuilder(pool, s.ArrowSchema)
defer b.Release()

b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true})
b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true})
b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true})
b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true})
b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true})
b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true})
b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true})

return NewWrappedRecord(b)
},
isErr: false,
},

// Not avro
{
input: []byte("not-valid-avro"),
schema: schema.NewIntermediateSchema(
arrow.NewSchema([]arrow.Field{}, nil),
""),
expected: func(s *schema.IntermediateSchema) *WrappedRecord {
return nil
},
isErr: true,
},
}

for _, c := range cases {
actual, err := FormatAvroToArrow(c.schema, c.input)

if err != nil != c.isErr {
t.Errorf("expected: %v, but actual: %v\n", c.isErr, err)
}

expectedRecord := c.expected(c.schema)
if !reflect.DeepEqual(actual, expectedRecord) {
t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual)
}
}
}
Loading

0 comments on commit acb6e0d

Please sign in to comment.