diff --git a/arrow/json/writer.go b/arrow/json/writer.go index 6d410b2..df0971f 100644 --- a/arrow/json/writer.go +++ b/arrow/json/writer.go @@ -4,9 +4,10 @@ import ( "encoding/json" "errors" "fmt" + "io" + "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" - "io" ) var ( diff --git a/arrow/json/writer_test.go b/arrow/json/writer_test.go index afd5ccc..f3ea0ff 100644 --- a/arrow/json/writer_test.go +++ b/arrow/json/writer_test.go @@ -211,15 +211,15 @@ func TestToGo(t *testing.T) { // uint8 TODO support this case // []uint8 will be converted base64-ed string /* - { - data: func() *array.Data { - b := array.NewUint8Builder(pool) - b.AppendValues([]uint8{0, 1, 2}, nil) - return b.NewUint8Array().Data() - }(), - expected: []uint8{0, 1, 2}, - err: nil, - }, + { + data: func() *array.Data { + b := array.NewUint8Builder(pool) + b.AppendValues([]uint8{0, 1, 2}, nil) + return b.NewUint8Array().Data() + }(), + expected: []uint8{0, 1, 2}, + err: nil, + }, */ // uint16 diff --git a/columnifier/parquet.go b/columnifier/parquet.go index fdce56a..a9aea47 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -2,10 +2,11 @@ package columnifier import ( "bytes" + "io/ioutil" + "github.com/reproio/columnify/arrow/json" "github.com/reproio/columnify/record" "github.com/xitongsys/parquet-go/marshal" - "io/ioutil" "github.com/reproio/columnify/parquet" "github.com/reproio/columnify/schema" diff --git a/record/arrow.go b/record/arrow.go index 7001400..23ac24d 100644 --- a/record/arrow.go +++ b/record/arrow.go @@ -2,11 +2,11 @@ package record import ( "fmt" - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" "strconv" "time" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" ) type WrappedRecord struct { @@ -19,24 +19,18 @@ func NewWrappedRecord(b *array.RecordBuilder) *WrappedRecord { } } -func formatMapToArrowRecord(s *arrow.Schema, maps []map[string]interface{}) (*WrappedRecord, error) { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s) - defer b.Release() - - for _, m := range maps { - for i, f := range s.Fields() { - if v, ok := m[f.Name]; ok { - if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil { - return nil, err - } - } else { - b.Field(i).AppendNull() +func formatMapToArrowRecord(b *array.RecordBuilder, m map[string]interface{}) (*array.RecordBuilder, error) { + for i, f := range b.Schema().Fields() { + if v, ok := m[f.Name]; ok { + if _, err := formatMapToArrowField(b.Field(i), f.Type, f.Nullable, v); err != nil { + return nil, err } + } else { + b.Field(i).AppendNull() } } - return NewWrappedRecord(b), nil + return b, nil } func formatMapToArrowStruct(b *array.StructBuilder, s *arrow.StructType, m map[string]interface{}) (*array.StructBuilder, error) { diff --git a/record/arrow_test.go b/record/arrow_test.go index 02eda1d..7381c9f 100644 --- a/record/arrow_test.go +++ b/record/arrow_test.go @@ -430,17 +430,23 @@ func TestNewArrowSchemaFromAvroSchema(t *testing.T) { }, } + pool := memory.NewGoAllocator() for _, c := range cases { expectedRecord := c.expected(c.schema) - actual, err := formatMapToArrowRecord(c.schema.ArrowSchema, c.input) + b := array.NewRecordBuilder(pool, c.schema.ArrowSchema) + defer b.Release() - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) + for _, v := range c.input { + _, err := formatMapToArrowRecord(b, v) + if err != c.err { + t.Errorf("expected: %v, but actual: %v\n", c.err, err) + } } - if !reflect.DeepEqual(actual, expectedRecord) { - t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, actual) + r := NewWrappedRecord(b) + if !reflect.DeepEqual(r, expectedRecord) { + t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, r) } } } diff --git a/record/avro.go b/record/avro.go index 156bee7..168907c 100644 --- a/record/avro.go +++ b/record/avro.go @@ -4,6 +4,8 @@ import ( "bytes" "fmt" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" "github.com/reproio/columnify/schema" "github.com/linkedin/goavro/v2" @@ -57,10 +59,30 @@ func FormatAvroToMap(data []byte) ([]map[string]interface{}, error) { } func FormatAvroToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatAvroToMap(data) + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + r, err := goavro.NewOCFReader(bytes.NewReader(data)) if err != nil { return nil, err } - return formatMapToArrowRecord(s.ArrowSchema, maps) + for r.Scan() { + v, err := r.Read() + if err != nil { + return nil, err + } + + m, mapOk := v.(map[string]interface{}) + if !mapOk { + return nil, fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord) + } + + if _, err = formatMapToArrowRecord(b, flattenAvroUnion(m)); err != nil { + return nil, err + } + } + + return NewWrappedRecord(b), nil } diff --git a/record/avro_test.go b/record/avro_test.go index b68f9e2..35e2991 100644 --- a/record/avro_test.go +++ b/record/avro_test.go @@ -5,6 +5,11 @@ import ( "reflect" "testing" + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" + "github.com/linkedin/goavro/v2" ) @@ -136,3 +141,147 @@ func TestFormatAvroToMap(t *testing.T) { } } } + +func TestFormatAvroToArrow(t *testing.T) { + cases := []struct { + input []byte + schema *schema.IntermediateSchema + expected func(s *schema.IntermediateSchema) *WrappedRecord + isErr bool + }{ + { + input: func() []byte { + w := &bytes.Buffer{} + + r, err := goavro.NewOCFWriter(goavro.OCFConfig{ + W: w, + Schema: ` +{ + "type": "record", + "name": "Primitives", + "fields" : [ + {"name": "boolean", "type": "boolean"}, + {"name": "int", "type": "int"}, + {"name": "long", "type": "long"}, + {"name": "float", "type": "float"}, + {"name": "double", "type": "double"}, + {"name": "bytes", "type": "bytes"}, + {"name": "string", "type": "string"} + ] +} +`, + }) + if err != nil { + t.Fatal(err) + } + + err = r.Append([]map[string]interface{}{ + { + "boolean": false, + "bytes": string([]byte("foo")), + "double": 1.1, + "float": 1.1, + "int": 1, + "long": 1, + "string": "foo", + }, + { + "boolean": true, + "bytes": string([]byte("bar")), + "double": 2.2, + "float": 2.2, + "int": 2, + "long": 2, + "string": "bar", + }, + }) + if err != nil { + t.Fatal(err) + } + + return w.Bytes() + }(), + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + isErr: false, + }, + + // Not avro + { + input: []byte("not-valid-avro"), + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + ""), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + isErr: true, + }, + } + + for _, c := range cases { + actual, err := FormatAvroToArrow(c.schema, c.input) + + if err != nil != c.isErr { + t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + } + + expectedRecord := c.expected(c.schema) + if !reflect.DeepEqual(actual, expectedRecord) { + t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual) + } + } +} diff --git a/record/csv.go b/record/csv.go index a12ed71..c5f3f7e 100644 --- a/record/csv.go +++ b/record/csv.go @@ -7,6 +7,9 @@ import ( "strconv" "strings" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" ) @@ -89,10 +92,62 @@ func FormatCsvToMap(s *schema.IntermediateSchema, data []byte, delimiter delimit } func FormatCsvToArrow(s *schema.IntermediateSchema, data []byte, delimiter delimiter) (*WrappedRecord, error) { - maps, err := FormatCsvToMap(s, data, delimiter) + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + names, err := getFieldNamesFromSchema(s) if err != nil { return nil, err } - return formatMapToArrowRecord(s.ArrowSchema, maps) + reader := csv.NewReader(strings.NewReader(string(data))) + reader.Comma = rune(delimiter) + + numFields := len(names) + for { + values, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + if numFields != len(values) { + return nil, fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord) + } + + e := make(map[string]interface{}) + for i, v := range values { + // bool + if v != "0" && v != "1" { + if vv, err := strconv.ParseBool(v); err == nil { + e[names[i]] = vv + continue + } + } + + // int + if vv, err := strconv.ParseInt(v, 10, 64); err == nil { + e[names[i]] = vv + continue + } + + // float + if vv, err := strconv.ParseFloat(v, 64); err == nil { + e[names[i]] = vv + continue + } + + // others; to string + e[names[i]] = v + } + + if _, err := formatMapToArrowRecord(b, e); err != nil { + return nil, err + } + } + + return NewWrappedRecord(b), nil } diff --git a/record/csv_test.go b/record/csv_test.go index a75406f..13c0e25 100644 --- a/record/csv_test.go +++ b/record/csv_test.go @@ -4,6 +4,9 @@ import ( "reflect" "testing" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/arrow" "github.com/reproio/columnify/schema" ) @@ -188,3 +191,179 @@ true 2 2 2.2 2.2 bar bar`), } } } + +func TestFormatCsvToArrow(t *testing.T) { + cases := []struct { + schema *schema.IntermediateSchema + input []byte + delimiter delimiter + expected func(s *schema.IntermediateSchema) *WrappedRecord + isErr bool + }{ + // csv; Primitives + { + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + input: []byte(`false,1,1,1.1,1.1,"foo","foo" +true,2,2,2.2,2.2,"bar","bar"`), + delimiter: CsvDelimiter, + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + isErr: false, + }, + + // tsv; Primitives + { + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + input: []byte(`false 1 1 1.1 1.1 foo foo +true 2 2 2.2 2.2 bar bar`), + delimiter: TsvDelimiter, + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + isErr: false, + }, + + // Not csv + { + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + "primitives", + ), + input: []byte("not-valid-csv"), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + isErr: true, + }, + + // Not tsv + { + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + "primitives", + ), + input: []byte("not-valid-tsv"), + delimiter: TsvDelimiter, + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + isErr: true, + }, + } + + for _, c := range cases { + actual, err := FormatCsvToArrow(c.schema, c.input, c.delimiter) + + if err != nil != c.isErr { + t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + } + + expectedRecord := c.expected(c.schema) + if !reflect.DeepEqual(actual, expectedRecord) { + t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual) + } + } +} diff --git a/record/jsonl.go b/record/jsonl.go index 48144cd..74a1fe2 100644 --- a/record/jsonl.go +++ b/record/jsonl.go @@ -4,6 +4,9 @@ import ( "encoding/json" "strings" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" ) @@ -29,10 +32,25 @@ func FormatJsonlToMap(data []byte) ([]map[string]interface{}, error) { } func FormatJsonlToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatJsonlToMap(data) - if err != nil { - return nil, err + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + for _, l := range strings.Split(string(data), "\n") { + if l == "" { + // skip blank line + continue + } + + var e map[string]interface{} + if err := json.Unmarshal([]byte(l), &e); err != nil { + return nil, err + } + + if _, err := formatMapToArrowRecord(b, e); err != nil { + return nil, err + } } - return formatMapToArrowRecord(s.ArrowSchema, maps) + return NewWrappedRecord(b), nil } diff --git a/record/jsonl_test.go b/record/jsonl_test.go index db07a9e..a5d744e 100644 --- a/record/jsonl_test.go +++ b/record/jsonl_test.go @@ -3,6 +3,11 @@ package record import ( "reflect" "testing" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" ) func TestFormatJsonlToMap(t *testing.T) { @@ -60,3 +65,101 @@ func TestFormatJsonlToMap(t *testing.T) { } } } + +func TestFormatJsonlToArrow(t *testing.T) { + cases := []struct { + input []byte + schema *schema.IntermediateSchema + expected func(s *schema.IntermediateSchema) *WrappedRecord + isErr bool + }{ + // Primitives + { + input: []byte( + `{"boolean": false, "int": 1, "long": 1, "float": 1.1, "double": 1.1, "bytes": "foo", "string": "foo"} +{"boolean": true, "int": 2, "long": 2, "float": 2.2, "double": 2.2, "bytes": "bar", "string": "bar"}`, + ), + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + isErr: false, + }, + + // Not JSONL + { + input: []byte("not-valid-json"), + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + ""), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + isErr: true, + }, + } + + for _, c := range cases { + actual, err := FormatJsonlToArrow(c.schema, c.input) + + if err != nil != c.isErr { + t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + } + + expectedRecord := c.expected(c.schema) + if !reflect.DeepEqual(actual, expectedRecord) { + t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual) + } + } +} diff --git a/record/ltsv.go b/record/ltsv.go index c76c1d9..eabc590 100644 --- a/record/ltsv.go +++ b/record/ltsv.go @@ -4,6 +4,9 @@ import ( "strconv" "strings" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" "github.com/Songmu/go-ltsv" @@ -54,10 +57,48 @@ func FormatLtsvToMap(data []byte) ([]map[string]interface{}, error) { } func FormatLtsvToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatLtsvToMap(data) - if err != nil { - return nil, err + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + for _, l := range strings.Split(string(data), "\n") { + v := map[string]string{} + + err := ltsv.Unmarshal([]byte(l), &v) + if err != nil { + return nil, err + } + + m := make(map[string]interface{}) + for k, v := range v { + // bool + if v != "0" && v != "1" { + if vv, err := strconv.ParseBool(v); err == nil { + m[k] = vv + continue + } + } + + // int + if vv, err := strconv.ParseInt(v, 10, 64); err == nil { + m[k] = vv + continue + } + + // float + if vv, err := strconv.ParseFloat(v, 64); err == nil { + m[k] = vv + continue + } + + // others; to string + m[k] = v + } + + if _, err := formatMapToArrowRecord(b, m); err != nil { + return nil, err + } } - return formatMapToArrowRecord(s.ArrowSchema, maps) + return NewWrappedRecord(b), nil } diff --git a/record/ltsv_test.go b/record/ltsv_test.go index 3b0ff18..b784d44 100644 --- a/record/ltsv_test.go +++ b/record/ltsv_test.go @@ -3,6 +3,11 @@ package record import ( "reflect" "testing" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" ) func TestFormatLtsvToMap(t *testing.T) { @@ -58,3 +63,99 @@ boolean:true int:2 long:2 float:2.2 double:2.2 bytes:bar string:bar`), } } } + +func TestFormatLtsvToArrow(t *testing.T) { + cases := []struct { + input []byte + schema *schema.IntermediateSchema + expected func(s *schema.IntermediateSchema) *WrappedRecord + isErr bool + }{ + // Primitives + { + input: []byte(`boolean:false int:1 long:1 float:1.1 double:1.1 bytes:foo string:foo +boolean:true int:2 long:2 float:2.2 double:2.2 bytes:bar string:bar`), + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + isErr: false, + }, + + // Not LTSV + { + input: []byte("not-valid-ltsv"), + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + ""), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + isErr: true, + }, + } + + for _, c := range cases { + actual, err := FormatLtsvToArrow(c.schema, c.input) + + if err != nil != c.isErr { + t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + } + + expectedRecord := c.expected(c.schema) + if !reflect.DeepEqual(actual, expectedRecord) { + t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual) + } + } +} diff --git a/record/msgpack.go b/record/msgpack.go index 7200ed2..fe677c8 100644 --- a/record/msgpack.go +++ b/record/msgpack.go @@ -5,6 +5,9 @@ import ( "fmt" "io" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" "github.com/vmihailenco/msgpack/v4" @@ -34,10 +37,28 @@ func FormatMsgpackToMap(data []byte) ([]map[string]interface{}, error) { } func FormatMsgpackToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatMsgpackToMap(data) - if err != nil { - return nil, err + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + d := msgpack.NewDecoder(bytes.NewReader(data)) + for { + arr, err := d.DecodeInterface() + if err == io.EOF { + break + } else if err != nil { + return nil, err + } + + m, mapOk := arr.(map[string]interface{}) + if !mapOk { + return nil, fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord) + } + + if _, err = formatMapToArrowRecord(b, m); err != nil { + return nil, err + } } - return formatMapToArrowRecord(s.ArrowSchema, maps) + return NewWrappedRecord(b), nil } diff --git a/record/msgpack_test.go b/record/msgpack_test.go index 7206c35..84779db 100644 --- a/record/msgpack_test.go +++ b/record/msgpack_test.go @@ -5,6 +5,11 @@ import ( "errors" "reflect" "testing" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/reproio/columnify/schema" ) func TestFormatMsgpackToMap(t *testing.T) { @@ -71,3 +76,110 @@ func TestFormatMsgpackToMap(t *testing.T) { } } } + +func TestFormatMsgpackToArrow(t *testing.T) { + cases := []struct { + input []byte + schema *schema.IntermediateSchema + expected func(s *schema.IntermediateSchema) *WrappedRecord + err error + }{ + // Primitives + { + // examples/record/primitives.msgpack + input: bytes.Join([][]byte{ + []byte("\x87\xa7\x62\x6f\x6f\x6c\x65\x61\x6e\xc2\xa3\x69\x6e\x74\x01\xa4"), + []byte("\x6c\x6f\x6e\x67\x01\xa5\x66\x6c\x6f\x61\x74\xcb\x3f\xf1\x99\x99"), + []byte("\x99\x99\x99\x9a\xa6\x64\x6f\x75\x62\x6c\x65\xcb\x3f\xf1\x99\x99"), + []byte("\x99\x99\x99\x9a\xa5\x62\x79\x74\x65\x73\xa3\x66\x6f\x6f\xa6\x73"), + []byte("\x74\x72\x69\x6e\x67\xa3\x66\x6f\x6f\x87\xa7\x62\x6f\x6f\x6c\x65"), + []byte("\x61\x6e\xc3\xa3\x69\x6e\x74\x02\xa4\x6c\x6f\x6e\x67\x02\xa5\x66"), + []byte("\x6c\x6f\x61\x74\xcb\x40\x01\x99\x99\x99\x99\x99\x9a\xa6\x64\x6f"), + []byte("\x75\x62\x6c\x65\xcb\x40\x01\x99\x99\x99\x99\x99\x9a\xa5\x62\x79"), + []byte("\x74\x65\x73\xa3\x62\x61\x72\xa6\x73\x74\x72\x69\x6e\x67\xa3\x62"), + []byte("\x61\x72"), + }, []byte("")), + schema: schema.NewIntermediateSchema( + arrow.NewSchema( + []arrow.Field{ + { + Name: "boolean", + Type: arrow.FixedWidthTypes.Boolean, + Nullable: false, + }, + { + Name: "int", + Type: arrow.PrimitiveTypes.Uint32, + Nullable: false, + }, + { + Name: "long", + Type: arrow.PrimitiveTypes.Uint64, + Nullable: false, + }, + { + Name: "float", + Type: arrow.PrimitiveTypes.Float32, + Nullable: false, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + Nullable: false, + }, + { + Name: "bytes", + Type: arrow.BinaryTypes.Binary, + Nullable: false, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + Nullable: false, + }, + }, nil), + "primitives"), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + pool := memory.NewGoAllocator() + b := array.NewRecordBuilder(pool, s.ArrowSchema) + defer b.Release() + + b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) + b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) + b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) + b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) + b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) + b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) + b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) + + return NewWrappedRecord(b) + }, + err: nil, + }, + + // Not map type + { + input: []byte("\xa7compact"), + schema: schema.NewIntermediateSchema( + arrow.NewSchema([]arrow.Field{}, nil), + ""), + expected: func(s *schema.IntermediateSchema) *WrappedRecord { + return nil + }, + err: ErrUnconvertibleRecord, + }, + } + + for _, c := range cases { + actual, err := FormatMsgpackToArrow(c.schema, c.input) + + if !errors.Is(err, c.err) { + t.Errorf("expected: %v, but actual: %v\n", c.err, err) + } + + expectedRecord := c.expected(c.schema) + if !reflect.DeepEqual(actual, expectedRecord) { + t.Errorf("expected: %v, but actual: %v\n", expectedRecord, actual) + } + } +}