From e730345c08c6d9413de8240b7df9dddf1a6949d7 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Thu, 6 Aug 2020 19:44:01 +0900 Subject: [PATCH 1/6] Implement stream based record decoders instead of batch based --- columnifier/columnifier.go | 4 +-- columnifier/parquet.go | 28 ++++++++++++----- record/avro.go | 40 ++++++++++++++++++++++-- record/avro_test.go | 33 ++++++++++++-------- record/csv.go | 63 +++++++++++++++++++++++++++++++++++++- record/csv_test.go | 46 ++++++++++++---------------- record/jsonl.go | 24 +++++++++++++++ record/jsonl_test.go | 23 +++++++++++--- record/ltsv.go | 54 ++++++++++++++++++++++++++++++++ record/ltsv_test.go | 23 +++++++++++--- record/msgpack.go | 25 +++++++++++++++ record/msgpack_test.go | 22 ++++++++++--- record/record.go | 63 ++++++++++++++++++++++++++++++++++++++ record/record_test.go | 39 +++++++++++++++++++++++ 14 files changed, 424 insertions(+), 63 deletions(-) diff --git a/columnifier/columnifier.go b/columnifier/columnifier.go index 8d730d3..805cb1c 100644 --- a/columnifier/columnifier.go +++ b/columnifier/columnifier.go @@ -4,9 +4,9 @@ import "io" // Columnifier is the interface that converts input file to columnar format file. type Columnifier interface { - io.WriteCloser - + WriteFromReader(reader io.Reader) (int, error) WriteFromFiles(paths []string) (int, error) + Close() error } // NewColumnifier creates a new Columnifier. diff --git a/columnifier/parquet.go b/columnifier/parquet.go index 97f56a3..28f01b0 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -1,13 +1,16 @@ package columnifier import ( + "io" "io/ioutil" + "os" "github.com/reproio/columnify/record" "github.com/reproio/columnify/parquet" "github.com/reproio/columnify/schema" "github.com/xitongsys/parquet-go-source/local" + "github.com/xitongsys/parquet-go/marshal" parquetSource "github.com/xitongsys/parquet-go/source" "github.com/xitongsys/parquet-go/writer" ) @@ -65,17 +68,27 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi } // Write reads, converts input binary data and write it to buffer. -func (c *parquetColumnifier) Write(data []byte) (int, error) { +func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) { // Intermediate record type is map[string]interface{} - c.w.MarshalFunc = parquet.MarshalMap - records, err := record.FormatToMap(data, c.schema, c.rt) + c.w.MarshalFunc = marshal.MarshalJSON + decoder, err := record.NewJsonDecoder(reader, c.schema, c.rt) if err != nil { return -1, err } beforeSize := c.w.Size - for _, r := range records { - if err := c.w.Write(r); err != nil { + for { + var v string + err = decoder.Decode(&v) + if err != nil { + if err == io.EOF { + break + } else { + return -1, err + } + } + + if err := c.w.Write(v); err != nil { return -1, err } } @@ -103,11 +116,12 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) { var n int for _, p := range paths { - data, err := ioutil.ReadFile(p) + f, err := os.Open(p) if err != nil { return -1, err } - if n, err = c.Write(data); err != nil { + + if n, err = c.WriteFromReader(f); err != nil { return -1, err } } diff --git a/record/avro.go b/record/avro.go index 156bee7..2350ff2 100644 --- a/record/avro.go +++ b/record/avro.go @@ -3,12 +3,48 @@ package record import ( "bytes" "fmt" - - "github.com/reproio/columnify/schema" + "io" "github.com/linkedin/goavro/v2" + "github.com/reproio/columnify/schema" ) +type avroInnerDecoder struct { + r *goavro.OCFReader +} + +func newAvroInnerDecoder(r io.Reader) (*avroInnerDecoder, error) { + reader, err := goavro.NewOCFReader(r) + if err != nil { + return nil, err + } + + return &avroInnerDecoder{ + r: reader, + }, nil +} + +func (d *avroInnerDecoder) Decode(r *map[string]interface{}) error { + if d.r.Scan() { + v, err := d.r.Read() + if err != nil { + return err + } + + m, mapOk := v.(map[string]interface{}) + if !mapOk { + return fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord) + } + + flatten := flattenAvroUnion(m) + *r = flatten + } else if d.r.RemainingBlockItems() == 0 { + return io.EOF + } + + return d.r.Err() +} + // flattenAvroUnion flattens nested map type has only 1 element. func flattenAvroUnion(in map[string]interface{}) map[string]interface{} { out := make(map[string]interface{}) diff --git a/record/avro_test.go b/record/avro_test.go index b68f9e2..b0f79aa 100644 --- a/record/avro_test.go +++ b/record/avro_test.go @@ -2,6 +2,7 @@ package record import ( "bytes" + "io" "reflect" "testing" @@ -35,11 +36,10 @@ func TestFlattenAvroUnion(t *testing.T) { } } -func TestFormatAvroToMap(t *testing.T) { +func TestAvroInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} - isErr bool }{ { input: func() []byte { @@ -113,22 +113,29 @@ func TestFormatAvroToMap(t *testing.T) { "string": "bar", }, }, - isErr: false, - }, - - // Not avro - { - input: []byte("not-valid-avro"), - expected: nil, - isErr: true, }, } for _, c := range cases { - actual, err := FormatAvroToMap(c.input) + buf := bytes.NewReader(c.input) + d, err := newAvroInnerDecoder(buf) + if err != nil { + t.Fatal(err) + } + + actual := make([]map[string]interface{}, 0) + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } - if err != nil != c.isErr { - t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + if err != nil && err != io.EOF { + t.Errorf("expected no error or io.EOF, but actual: %v\n", err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/csv.go b/record/csv.go index a12ed71..af184f2 100644 --- a/record/csv.go +++ b/record/csv.go @@ -17,10 +17,71 @@ const ( TsvDelimiter delimiter = '\t' ) +type csvInnerDecoder struct { + r *csv.Reader + names []string +} + +func newCsvInnerDecoder(r io.Reader, s *schema.IntermediateSchema, delimiter delimiter) (*csvInnerDecoder, error) { + names, err := getFieldNamesFromSchema(s) + if err != nil { + return nil, err + } + + reader := csv.NewReader(r) + reader.Comma = rune(delimiter) + + return &csvInnerDecoder{ + r: reader, + names: names, + }, nil +} + +func (d *csvInnerDecoder) Decode(r *map[string]interface{}) error { + values, err := d.r.Read() + if err != nil { + return err + } + + if len(d.names) != len(values) { + return fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord) + } + + *r = make(map[string]interface{}) + for i, v := range values { + n := d.names[i] + + // bool + if v != "0" && v != "1" { + if vv, err := strconv.ParseBool(v); err == nil { + (*r)[n] = vv + continue + } + } + + // int + if vv, err := strconv.ParseInt(v, 10, 64); err == nil { + (*r)[n] = vv + continue + } + + // float + if vv, err := strconv.ParseFloat(v, 64); err == nil { + (*r)[n] = vv + continue + } + + // others; to string + (*r)[n] = v + } + + return nil +} + func getFieldNamesFromSchema(s *schema.IntermediateSchema) ([]string, error) { elems := s.ArrowSchema.Fields() - if len(elems) < 2 { + if len(elems) == 0 { return nil, fmt.Errorf("no element is available: %w", ErrUnconvertibleRecord) } diff --git a/record/csv_test.go b/record/csv_test.go index a75406f..d5577e7 100644 --- a/record/csv_test.go +++ b/record/csv_test.go @@ -1,6 +1,8 @@ package record import ( + "bytes" + "io" "reflect" "testing" @@ -8,7 +10,7 @@ import ( "github.com/reproio/columnify/schema" ) -func TestFormatCsvToMap(t *testing.T) { +func TestCsvInnerDecoder_Decode(t *testing.T) { cases := []struct { schema *schema.IntermediateSchema input []byte @@ -151,36 +153,28 @@ true 2 2 2.2 2.2 bar bar`), }, isErr: false, }, - - // Not csv - { - schema: schema.NewIntermediateSchema( - arrow.NewSchema([]arrow.Field{}, nil), - "primitives", - ), - input: []byte("not-valid-csv"), - expected: nil, - isErr: true, - }, - - // Not tsv - { - schema: schema.NewIntermediateSchema( - arrow.NewSchema([]arrow.Field{}, nil), - "primitives", - ), - input: []byte("not-valid-tsv"), - delimiter: TsvDelimiter, - expected: nil, - isErr: true, - }, } for _, c := range cases { - actual, err := FormatCsvToMap(c.schema, c.input, c.delimiter) + buf := bytes.NewReader(c.input) + d, err := newCsvInnerDecoder(buf, c.schema, c.delimiter) + if err != nil { + t.Fatal(err) + } + + actual := make([]map[string]interface{}, 0) + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } - if err != nil != c.isErr { + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/jsonl.go b/record/jsonl.go index 48144cd..3e0736f 100644 --- a/record/jsonl.go +++ b/record/jsonl.go @@ -1,12 +1,36 @@ package record import ( + "bufio" "encoding/json" + "io" "strings" "github.com/reproio/columnify/schema" ) +type jsonlInnerDecoder struct { + s *bufio.Scanner +} + +func newJsonlInnerDecoder(r io.Reader) *jsonlInnerDecoder { + return &jsonlInnerDecoder{ + s: bufio.NewScanner(r), + } +} + +func (d *jsonlInnerDecoder) Decode(r *map[string]interface{}) error { + if d.s.Scan() { + if err := json.Unmarshal(d.s.Bytes(), r); err != nil { + return err + } + } else { + return io.EOF + } + + return d.s.Err() +} + func FormatJsonlToMap(data []byte) ([]map[string]interface{}, error) { lines := strings.Split(string(data), "\n") diff --git a/record/jsonl_test.go b/record/jsonl_test.go index db07a9e..569b716 100644 --- a/record/jsonl_test.go +++ b/record/jsonl_test.go @@ -1,11 +1,13 @@ package record import ( + "bytes" + "io" "reflect" "testing" ) -func TestFormatJsonlToMap(t *testing.T) { +func TestJsonlInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -43,16 +45,29 @@ func TestFormatJsonlToMap(t *testing.T) { // Not JSONL { input: []byte("not-valid-json"), - expected: nil, + expected: []map[string]interface{}{}, isErr: true, }, } for _, c := range cases { - actual, err := FormatJsonlToMap(c.input) + buf := bytes.NewReader(c.input) + d := newJsonlInnerDecoder(buf) - if err != nil != c.isErr { + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } + + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/ltsv.go b/record/ltsv.go index c76c1d9..356884f 100644 --- a/record/ltsv.go +++ b/record/ltsv.go @@ -1,6 +1,8 @@ package record import ( + "bufio" + "io" "strconv" "strings" @@ -9,6 +11,58 @@ import ( "github.com/Songmu/go-ltsv" ) +type ltsvInnerDecoder struct { + s *bufio.Scanner +} + +func newLtsvInnerDecoder(r io.Reader) *ltsvInnerDecoder { + return <svInnerDecoder{ + s: bufio.NewScanner(r), + } +} + +func (d *ltsvInnerDecoder) Decode(r *map[string]interface{}) error { + if d.s.Scan() { + data := d.s.Bytes() + + m := map[string]string{} + err := ltsv.Unmarshal(data, &m) + if err != nil { + return err + } + + *r = make(map[string]interface{}) + for k, v := range m { + // bool + if v != "0" && v != "1" { + if vv, err := strconv.ParseBool(v); err == nil { + (*r)[k] = vv + continue + } + } + + // int + if vv, err := strconv.ParseInt(v, 10, 64); err == nil { + (*r)[k] = vv + continue + } + + // float + if vv, err := strconv.ParseFloat(v, 64); err == nil { + (*r)[k] = vv + continue + } + + // others; to string + (*r)[k] = v + } + } else { + return io.EOF + } + + return d.s.Err() +} + func FormatLtsvToMap(data []byte) ([]map[string]interface{}, error) { lines := strings.Split(string(data), "\n") diff --git a/record/ltsv_test.go b/record/ltsv_test.go index 3b0ff18..47a071a 100644 --- a/record/ltsv_test.go +++ b/record/ltsv_test.go @@ -1,11 +1,13 @@ package record import ( + "bytes" + "io" "reflect" "testing" ) -func TestFormatLtsvToMap(t *testing.T) { +func TestLtsvInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -41,16 +43,29 @@ boolean:true int:2 long:2 float:2.2 double:2.2 bytes:bar string:bar`), // Not LTSV { input: []byte("not-valid-ltsv"), - expected: nil, + expected: []map[string]interface{}{}, isErr: true, }, } for _, c := range cases { - actual, err := FormatLtsvToMap(c.input) + buf := bytes.NewReader(c.input) + d := newLtsvInnerDecoder(buf) - if err != nil != c.isErr { + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } + + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/msgpack.go b/record/msgpack.go index 7200ed2..d45524d 100644 --- a/record/msgpack.go +++ b/record/msgpack.go @@ -10,6 +10,31 @@ import ( "github.com/vmihailenco/msgpack/v4" ) +type msgpackInnerDecoder struct { + d *msgpack.Decoder +} + +func newMsgpackInnerDecoder(r io.Reader) *msgpackInnerDecoder { + return &msgpackInnerDecoder{ + d: msgpack.NewDecoder(r), + } +} + +func (d *msgpackInnerDecoder) Decode(r *map[string]interface{}) error { + arr, err := d.d.DecodeInterface() + if err != nil { + return err + } + + m, mapOk := arr.(map[string]interface{}) + if !mapOk { + return fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord) + } + *r = m + + return nil +} + func FormatMsgpackToMap(data []byte) ([]map[string]interface{}, error) { d := msgpack.NewDecoder(bytes.NewReader(data)) diff --git a/record/msgpack_test.go b/record/msgpack_test.go index 7206c35..dc360af 100644 --- a/record/msgpack_test.go +++ b/record/msgpack_test.go @@ -3,11 +3,12 @@ package record import ( "bytes" "errors" + "io" "reflect" "testing" ) -func TestFormatMsgpackToMap(t *testing.T) { +func TestMsgpackInnnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -48,22 +49,35 @@ func TestFormatMsgpackToMap(t *testing.T) { "string": "bar", }, }, - err: nil, + err: io.EOF, }, // Not map type { input: []byte("\xa7compact"), - expected: nil, + expected: []map[string]interface{}{}, err: ErrUnconvertibleRecord, }, } for _, c := range cases { - actual, err := FormatMsgpackToMap(c.input) + buf := bytes.NewReader(c.input) + d := newMsgpackInnerDecoder(buf) + + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } if !errors.Is(err, c.err) { t.Errorf("expected: %v, but actual: %v\n", c.err, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/record.go b/record/record.go index 9941380..2db111d 100644 --- a/record/record.go +++ b/record/record.go @@ -1,8 +1,10 @@ package record import ( + "encoding/json" "errors" "fmt" + "io" "github.com/reproio/columnify/schema" ) @@ -21,6 +23,67 @@ var ( ErrUnconvertibleRecord = errors.New("input record is unable to convert") ) +// innerDecoder decodes data from given Reader to the intermediate representation. +type innerDecoder interface { + // Decode reads input data via Reader and extract it to the argument. + // If there is no data left to be read, Read returns nil, io.EOF. + Decode(r *map[string]interface{}) error +} + +// jsonDecoder decodes data with innerDecoder and returns JSON string value. +type jsonDecoder struct { + inner innerDecoder +} + +func NewJsonDecoder(r io.Reader, s *schema.IntermediateSchema, recordType string) (*jsonDecoder, error) { + var inner innerDecoder + var err error + + switch recordType { + case RecordTypeAvro: + inner, err = newAvroInnerDecoder(r) + + case RecordTypeCsv: + inner, err = newCsvInnerDecoder(r, s, CsvDelimiter) + + case RecordTypeJsonl: + inner = newJsonlInnerDecoder(r) + + case RecordTypeLtsv: + inner = newLtsvInnerDecoder(r) + + case RecordTypeMsgpack: + inner = newMsgpackInnerDecoder(r) + + case RecordTypeTsv: + inner, err = newCsvInnerDecoder(r, s, TsvDelimiter) + + default: + return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) + } + + return &jsonDecoder{ + inner: inner, + }, err +} + +func (d *jsonDecoder) Decode(v *string) error { + var vv map[string]interface{} + + err := d.inner.Decode(&vv) + if err != nil { + return err + } + + data, err := json.Marshal(vv) + if err != nil { + return err + } + *v = string(data) + + return nil +} + func FormatToArrow(data []byte, s *schema.IntermediateSchema, recordType string) (*WrappedRecord, error) { switch recordType { case RecordTypeAvro: diff --git a/record/record_test.go b/record/record_test.go index d324d82..48461f2 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -1,12 +1,51 @@ package record import ( + "encoding/json" "errors" "testing" "github.com/reproio/columnify/schema" ) +type nopInnerDecoder struct { + r map[string]interface{} + err error +} + +func (d *nopInnerDecoder) Decode(r *map[string]interface{}) error { + *r = d.r + return d.err +} + +func TestJsonDecoder_Decode(t *testing.T) { + inner := &nopInnerDecoder{ + r: map[string]interface{}{ + "key1": 42, + "key2": "test", + }, + err: nil, + } + + d := jsonDecoder{ + inner: inner, + } + + var v string + err := d.Decode(&v) + if err != nil { + t.Errorf("expected no error, but actual: %v\n", err) + } + + data, err := json.Marshal(inner.r) + if err != nil { + t.Errorf("expected no error, but actual: %v\n", err) + } + if v != string(data) { + t.Errorf("expected: %v, but actual: %v\n", string(data), v) + } +} + func TestFormatToArrow(t *testing.T) { cases := []struct { input []byte From 5de7542b32d14aefa0777ae06ba2d1a26f9bcb2a Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Thu, 6 Aug 2020 22:50:55 +0900 Subject: [PATCH 2/6] Remove batch based formatters --- columnifier/parquet.go | 14 - parquet/doc.go | 12 - parquet/marshal_arrow.go | 266 --------------- parquet/marshal_arrow_test.go | 527 ------------------------------ parquet/marshal_map.go | 26 -- parquet/marshal_map_test.go | 588 ---------------------------------- parquet/parquet.go | 56 ---- parquet/stdio.go | 5 + record/arrow.go | 227 ------------- record/arrow_test.go | 446 -------------------------- record/avro.go | 34 -- record/csv.go | 66 ---- record/jsonl.go | 33 -- record/ltsv.go | 56 ---- record/msgpack.go | 35 -- record/record.go | 51 --- record/record_test.go | 55 ---- 17 files changed, 5 insertions(+), 2492 deletions(-) delete mode 100644 parquet/doc.go delete mode 100644 parquet/marshal_arrow.go delete mode 100644 parquet/marshal_arrow_test.go delete mode 100644 parquet/marshal_map.go delete mode 100644 parquet/marshal_map_test.go delete mode 100644 parquet/parquet.go delete mode 100644 record/arrow.go delete mode 100644 record/arrow_test.go diff --git a/columnifier/parquet.go b/columnifier/parquet.go index 28f01b0..4fff479 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -94,20 +94,6 @@ func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) { } afterSize := c.w.Size - // Intermediate record type is wrapped Apache Arrow record - // It requires Arrow Golang implementation more logical type supports - // ref. https://github.com/apache/arrow/blob/9c9dc2012266442d0848e4af0cf52874bc4db151/go/arrow/array/builder.go#L211 - /* - c.w.MarshalFunc = parquet.MarshalArrow - records, err := record.FormatToArrow(data, c.schema, c.rt) - if err != nil { - return err - } - if err := c.w.Write(&records); err != nil { - return err - } - */ - return int(afterSize - beforeSize), nil } diff --git a/parquet/doc.go b/parquet/doc.go deleted file mode 100644 index d1559df..0000000 --- a/parquet/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -/* - Package parquetgo is an utility and marshaler with go-friendly error handling for parquet-go. - https://github.com/xitongsys/parquet-go - - xitongsys/parquet-go provides simple, high-level API to convert to Parquet. - But provided features are limited (mainly it looks main users select Go struct or JSON ), - and the error handling is sometimes too simple (panic/recovery based). - - parquetgo package enriches these points for handling Arrow based data. - -*/ -package parquet diff --git a/parquet/marshal_arrow.go b/parquet/marshal_arrow.go deleted file mode 100644 index 42a45fb..0000000 --- a/parquet/marshal_arrow.go +++ /dev/null @@ -1,266 +0,0 @@ -package parquet - -import ( - "bytes" - "encoding/base64" - "fmt" - "reflect" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/reproio/columnify/record" - "github.com/xitongsys/parquet-go/common" - "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/parquet" - "github.com/xitongsys/parquet-go/schema" - "github.com/xitongsys/parquet-go/types" -) - -// MarshalMap converts 1 arrow record to parquet tables. -func MarshalArrow(maybeRecord []interface{}, bgn int, end int, schemaHandler *schema.SchemaHandler) (*map[string]*layout.Table, error) { - // NOTE This marshaler expects record values aggregation has done before call - if len(maybeRecord) != 1 { - return nil, fmt.Errorf("size of records is invalid: %w", ErrInvalidParquetRecord) - } - - wrapped, recordOk := maybeRecord[0].(*record.WrappedRecord) - if !recordOk { - return nil, fmt.Errorf("unexpected input type %v: %w", reflect.TypeOf(maybeRecord[0]), ErrInvalidParquetRecord) - } - - return marshalArrowRecord(wrapped.Record, schemaHandler) -} - -func marshalArrowRecord(record array.Record, sh *schema.SchemaHandler) (*map[string]*layout.Table, error) { - tables, err := prepareTables(sh) - if err != nil { - return nil, err - } - - keys := make([]string, 0, len(record.Schema().Fields())) - for _, f := range record.Schema().Fields() { - keys = append(keys, common.HeadToUpper(f.Name)) - } - - for i, c := range record.Columns() { - childPathMap := sh.PathMap.Children[keys[i]] - data := c.Data() - tables, err = marshalArrowData(data, tables, sh, childPathMap, 0, 0) - if err != nil { - return nil, err - } - } - - return &tables, nil -} - -func marshalArrowData(data *array.Data, tables map[string]*layout.Table, sh *schema.SchemaHandler, pathMap *schema.PathMapType, rl int32, dl int32) (map[string]*layout.Table, error) { - pathStr := pathMap.Path - - var info *common.Tag - if i, ok := sh.MapIndex[pathStr]; ok { - info = sh.Infos[i] - } else { - return nil, fmt.Errorf("schema not found to path %v: %w", pathStr, ErrInvalidParquetSchema) - } - - switch data.DataType().ID() { - case arrow.BOOL: - values := array.NewBooleanData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.UINT32: - values := array.NewUint32Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.UINT64: - values := array.NewUint64Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.FLOAT32: - values := array.NewFloat32Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.FLOAT64: - values := array.NewFloat64Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.STRING: - values := array.NewStringData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.BINARY: - values := array.NewBinaryData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.STRUCT: - values := array.NewStructData(data) - st, stOk := values.DataType().(*arrow.StructType) - if !stOk { - return nil, fmt.Errorf("unsupported data type %v: %w", values.DataType(), ErrInvalidParquetRecord) - } - keys := make([]string, 0, len(st.Fields())) - for _, f := range st.Fields() { - keys = append(keys, common.HeadToUpper(f.Name)) - } - deltaDl := int32(0) - if info.RepetitionType == parquet.FieldRepetitionType_OPTIONAL { - deltaDl = 1 - } - for i := 0; i < values.NumField(); i++ { - childPathMap := pathMap.Children[keys[i]] - data := values.Field(i).Data() - var err error - tables, err = marshalArrowData(data, tables, sh, childPathMap, rl, dl+deltaDl) - if err != nil { - return nil, err - } - } - - case arrow.LIST: - values := array.NewListData(data) - for i := 0; i < values.Len(); i++ { - o := i + values.Offset() - bgn := int64(values.Offsets()[o]) - end := int64(values.Offsets()[o+1]) - slice := array.NewSlice(values.ListValues(), bgn, end) - - // first - if slice.Len() > 0 { - first := array.NewSlice(slice, 0, 1) - var err error - tables, err = marshalArrowData(first.Data(), tables, sh, pathMap, rl, dl+1) - if err != nil { - return nil, err - } - } - - // repeated; repetition level += max repetition level - if slice.Len() > 1 { - repeated := array.NewSlice(slice, 1, int64(slice.Len())) - maxRl, err := sh.MaxRepetitionLevel(common.StrToPath(pathStr)) - if err != nil { - return nil, err - } - tables, err = marshalArrowData(repeated.Data(), tables, sh, pathMap, rl+maxRl, dl+1) - if err != nil { - return nil, err - } - - } - } - - default: - return nil, fmt.Errorf("unsupported type %v: %w", data.DataType(), ErrInvalidParquetRecord) - } - - return tables, nil -} - -func arrowPrimitiveToDataPageSource(value interface{}, isValid bool, info *common.Tag) (interface{}, int32, error) { - switch info.RepetitionType { - case parquet.FieldRepetitionType_REQUIRED: - if isValid { - if v, err := formatArrowPrimitive(value, info); err != nil { - return nil, -1, err - } else { - return v, 0, nil - } - } else { - return nil, -1, fmt.Errorf("null for required field %v: %w", info, ErrInvalidParquetRecord) - } - case parquet.FieldRepetitionType_OPTIONAL: - if isValid { - if v, err := formatArrowPrimitive(value, info); err != nil { - return nil, -1, err - } else { - return v, 1, nil - } - } else { - return nil, 0, nil - } - default: - return nil, -1, fmt.Errorf("invalid field repetition type for %v: %w", info, ErrInvalidParquetRecord) - } -} - -func formatArrowPrimitive(value interface{}, info *common.Tag) (interface{}, error) { - pT, cT := types.TypeNameToParquetType(info.Type, info.BaseType) - - var s string - if (*pT == parquet.Type_BYTE_ARRAY || *pT == parquet.Type_FIXED_LEN_BYTE_ARRAY) && cT == nil { - bin, binOk := value.([]byte) - if !binOk { - return nil, fmt.Errorf("%v is not []byte: %w", value, ErrInvalidParquetRecord) - } - - var buf bytes.Buffer - encoder := base64.NewEncoder(base64.StdEncoding, &buf) - defer func() { _ = encoder.Close() }() - - if _, err := encoder.Write(bin); err != nil { - return nil, err - } - s = buf.String() - } else { - s = fmt.Sprintf("%v", value) - } - - return types.StrToParquetType(s, pT, cT, int(info.Length), int(info.Scale)), nil -} diff --git a/parquet/marshal_arrow_test.go b/parquet/marshal_arrow_test.go deleted file mode 100644 index d82da5d..0000000 --- a/parquet/marshal_arrow_test.go +++ /dev/null @@ -1,527 +0,0 @@ -package parquet - -import ( - "reflect" - "testing" - - "github.com/reproio/columnify/record" - "github.com/reproio/columnify/schema" - "github.com/xitongsys/parquet-go/layout" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" -) - -func TestNewArrowSchemaFromAvroSchema(t *testing.T) { - cases := []struct { - input func(s *schema.IntermediateSchema) []interface{} - schema *schema.IntermediateSchema - expect *map[string]*layout.Table - err error - }{ - // Only primitives - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expect: &map[string]*layout.Table{ - "Primitives.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Nested - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - sb := b.Field(7).(*array.StructBuilder) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "record", - Type: arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expect: &map[string]*layout.Table{ - "Nested.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Array - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb := b.Field(7).(*array.ListBuilder) - sb := lb.ValueBuilder().(*array.StructBuilder) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expect: &map[string]*layout.Table{ - "Arrays.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Array.Boolean": { - Values: []interface{}{false, true, false, true}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Int": { - Values: []interface{}{int32(1), int32(2), int32(1), int32(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Long": { - Values: []interface{}{int64(1), int64(2), int64(1), int64(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Float": { - Values: []interface{}{float32(1.1), float32(2.2), float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Double": { - Values: []interface{}{float64(1.1), float64(2.2), float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t), base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.String": { - Values: []interface{}{"foo", "bar", "foo", "bar"}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - }, - err: nil, - }, - } - - for _, c := range cases { - sh, err := schema.NewSchemaHandlerFromArrow(*c.schema) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tables, err := MarshalArrow(c.input(c.schema), 0, 1, sh) - - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - for k, v := range *c.expect { - actual := (*tables)[k] - - if !reflect.DeepEqual(actual.Values, v.Values) { - t.Errorf("expected: %v, but actual: %v\n", v.Values, actual.Values) - } - - if !reflect.DeepEqual(actual.DefinitionLevels, v.DefinitionLevels) { - t.Errorf("expected: %v, but actual: %v\n", v.DefinitionLevels, actual.DefinitionLevels) - } - - if !reflect.DeepEqual(actual.RepetitionLevels, v.RepetitionLevels) { - t.Errorf("expected: %v, but actual: %v\n", v.RepetitionLevels, actual.RepetitionLevels) - } - } - } -} diff --git a/parquet/marshal_map.go b/parquet/marshal_map.go deleted file mode 100644 index 7f1a55f..0000000 --- a/parquet/marshal_map.go +++ /dev/null @@ -1,26 +0,0 @@ -package parquet - -import ( - "encoding/json" - - "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/marshal" - "github.com/xitongsys/parquet-go/schema" -) - -// MarshalMap converts []map[string]interface{} to parquet tables. -func MarshalMap(sources []interface{}, bgn int, end int, schemaHandler *schema.SchemaHandler) (*map[string]*layout.Table, error) { - jsons := make([]interface{}, 0, end-bgn) - - for _, d := range sources[bgn:end] { - e, err := json.Marshal(d) - if err != nil { - return nil, err - } - jsons = append(jsons, string(e)) - } - - // NOTE: reuse existing JSON marshaler. Implementing it ourselves is high cost - // NOTE: it requires redundant map -> json -> map conversions - return marshal.MarshalJSON(jsons, bgn, end, schemaHandler) -} diff --git a/parquet/marshal_map_test.go b/parquet/marshal_map_test.go deleted file mode 100644 index f1005b8..0000000 --- a/parquet/marshal_map_test.go +++ /dev/null @@ -1,588 +0,0 @@ -package parquet - -import ( - "bytes" - "encoding/base64" - "reflect" - "testing" - - "github.com/apache/arrow/go/arrow" - "github.com/reproio/columnify/schema" - "github.com/xitongsys/parquet-go/layout" -) - -func base64Str(d []byte, t *testing.T) string { - var buf bytes.Buffer - encoder := base64.NewEncoder(base64.StdEncoding, &buf) - - _, err := encoder.Write(d) - if err != nil { - t.Fatalf("invalid test case: %v", err) - } - - err = encoder.Close() - if err != nil { - t.Fatalf("invalid test case: %v", err) - } - - return buf.String() -} - -func TestMarshalMap(t *testing.T) { - cases := []struct { - input []interface{} - bgn int - end int - schema *schema.IntermediateSchema - expect *map[string]*layout.Table - err error - }{ - // Only primitives - { - input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expect: &map[string]*layout.Table{ - "Primitives.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Nested - { - input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - "record": map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - "record": map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - }, - { - Name: "record", - Type: arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expect: &map[string]*layout.Table{ - "Nested.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Array - { - input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expect: &map[string]*layout.Table{ - "Arrays.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Array.Boolean": { - Values: []interface{}{false, true, false, true}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Int": { - Values: []interface{}{int32(1), int32(2), int32(1), int32(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Long": { - Values: []interface{}{int64(1), int64(2), int64(1), int64(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Float": { - Values: []interface{}{float32(1.1), float32(2.2), float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Double": { - Values: []interface{}{float64(1.1), float64(2.2), float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t), base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.String": { - Values: []interface{}{"foo", "bar", "foo", "bar"}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - }, - err: nil, - }, - } - - for _, c := range cases { - sh, err := schema.NewSchemaHandlerFromArrow(*c.schema) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tables, err := MarshalMap(c.input, c.bgn, c.end, sh) - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - for k, v := range *c.expect { - actual := (*tables)[k] - - if !reflect.DeepEqual(actual.Values, v.Values) { - t.Errorf("values: expected: %v, but actual: %v\n", v.Values, actual.Values) - } - - if !reflect.DeepEqual(actual.DefinitionLevels, v.DefinitionLevels) { - t.Errorf("definition levels: expected: %v, but actual: %v\n", v.DefinitionLevels, actual.DefinitionLevels) - } - if !reflect.DeepEqual(actual.RepetitionLevels, v.RepetitionLevels) { - t.Errorf("repetition levels: expected: %v, but actual: %v\n", v.RepetitionLevels, actual.RepetitionLevels) - } - } - } -} diff --git a/parquet/parquet.go b/parquet/parquet.go deleted file mode 100644 index cba88ca..0000000 --- a/parquet/parquet.go +++ /dev/null @@ -1,56 +0,0 @@ -package parquet - -import ( - "errors" - "fmt" - - "github.com/xitongsys/parquet-go/common" - "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/schema" -) - -var ( - ErrInvalidParquetSchema = errors.New("invalid parquet schema") - ErrInvalidParquetRecord = errors.New("invalid parquet record") - ErrUnsupportedMethod = errors.New("unsupported method") -) - -// prepareTables returns tables from fields(non record) in schema elements. -func prepareTables(schemaHandler *schema.SchemaHandler) (map[string]*layout.Table, error) { - numSchemaElements := len(schemaHandler.SchemaElements) - if len(schemaHandler.Infos) != numSchemaElements { - return nil, fmt.Errorf("sizes of SchemaElement and Infos don't match: %w", ErrInvalidParquetSchema) - } - if len(schemaHandler.MapIndex) != numSchemaElements { - return nil, fmt.Errorf("sizes of SchemaElement and MapIndex don't match: %w", ErrInvalidParquetSchema) - } - - tables := make(map[string]*layout.Table) - for i, e := range schemaHandler.SchemaElements { - if e.GetNumChildren() == 0 { // fields(non record) - pathStr := schemaHandler.IndexMap[int32(i)] - path := common.StrToPath(pathStr) - - maxDefinitionLevel, err := schemaHandler.MaxDefinitionLevel(path) - if err != nil { - return nil, err - } - - maxRepetitionLevel, err := schemaHandler.MaxRepetitionLevel(path) - if err != nil { - return nil, err - } - - tables[pathStr] = &layout.Table{ - Path: path, - MaxDefinitionLevel: maxDefinitionLevel, - MaxRepetitionLevel: maxRepetitionLevel, - RepetitionType: e.GetRepetitionType(), - Schema: schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]], - Info: schemaHandler.Infos[i], - } - } - } - - return tables, nil -} diff --git a/parquet/stdio.go b/parquet/stdio.go index 90895e5..9ca5b9f 100644 --- a/parquet/stdio.go +++ b/parquet/stdio.go @@ -1,6 +1,7 @@ package parquet import ( + "errors" "fmt" "io" "os" @@ -8,6 +9,10 @@ import ( "github.com/xitongsys/parquet-go/source" ) +var ( + ErrUnsupportedMethod = errors.New("unsupported method") +) + // stdioFile is an implementation of ParquetFile, just writing data to stdout. type stdioFile struct { in io.ReadCloser diff --git a/record/arrow.go b/record/arrow.go deleted file mode 100644 index a5647ea..0000000 --- a/record/arrow.go +++ /dev/null @@ -1,227 +0,0 @@ -package record - -import ( - "fmt" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" -) - -type WrappedRecord struct { - Record array.Record -} - -func NewWrappedRecord(b *array.RecordBuilder) *WrappedRecord { - return &WrappedRecord{ - Record: b.NewRecord(), - } -} - -func formatMapToArrowRecord(s *arrow.Schema, maps []map[string]interface{}) (*WrappedRecord, error) { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s) - defer b.Release() - - for _, m := range maps { - for i, f := range s.Fields() { - if v, ok := m[f.Name]; ok { - if _, err := formatMapToArrowField(b.Field(i), f.Type, v); err != nil { - return nil, err - } - } else { - b.Field(i).AppendNull() - } - } - } - - return NewWrappedRecord(b), nil -} - -func formatMapToArrowStruct(b *array.StructBuilder, s *arrow.StructType, m map[string]interface{}) (*array.StructBuilder, error) { - for i, f := range s.Fields() { - if v, ok := m[f.Name]; ok { - if _, err := formatMapToArrowField(b.FieldBuilder(i), f.Type, v); err != nil { - return nil, err - } - } else { - b.FieldBuilder(i).AppendNull() - } - - } - - return b, nil -} - -func formatMapToArrowList(b *array.ListBuilder, l *arrow.ListType, list []interface{}) (*array.ListBuilder, error) { - for _, e := range list { - if _, err := formatMapToArrowField(b.ValueBuilder(), l.Elem(), e); err != nil { - return nil, err - } - } - - return b, nil -} - -func formatMapToArrowField(b array.Builder, t arrow.DataType, v interface{}) (array.Builder, error) { - switch t.ID() { - case arrow.BOOL: - vb, builderOk := b.(*array.BooleanBuilder) - vv, valueOk := v.(bool) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as bool: %w", v, ErrUnconvertibleRecord) - } - - case arrow.UINT32: - vb, builderOk := b.(*array.Uint32Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case int: - vb.Append(uint32(vv)) - case int8: - vb.Append(uint32(vv)) - case int16: - vb.Append(uint32(vv)) - case int32: - vb.Append(uint32(vv)) - case int64: - vb.Append(uint32(vv)) - case uint: - vb.Append(uint32(vv)) - case uint8: - vb.Append(uint32(vv)) - case uint16: - vb.Append(uint32(vv)) - case uint32: - vb.Append(uint32(vv)) - case uint64: - vb.Append(uint32(vv)) - case float64: - vb.Append(uint32(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as uint32: %w", v, ErrUnconvertibleRecord) - } - - case arrow.UINT64: - vb, builderOk := b.(*array.Uint64Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case int8: - vb.Append(uint64(vv)) - case int16: - vb.Append(uint64(vv)) - case int32: - vb.Append(uint64(vv)) - case int64: - vb.Append(uint64(vv)) - case uint8: - vb.Append(uint64(vv)) - case uint16: - vb.Append(uint64(vv)) - case uint32: - vb.Append(uint64(vv)) - case uint64: - vb.Append(uint64(vv)) - case float64: - vb.Append(uint64(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as uint64: %w", v, ErrUnconvertibleRecord) - } - - case arrow.FLOAT32: - vb, builderOk := b.(*array.Float32Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case float32: - vb.Append(float32(vv)) - case float64: - vb.Append(float32(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as float32: %w", v, ErrUnconvertibleRecord) - } - - case arrow.FLOAT64: - vb, builderOk := b.(*array.Float64Builder) - vv, valueOk := v.(float64) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as float64: %w", v, ErrUnconvertibleRecord) - } - - case arrow.STRING: - vb, builderOk := b.(*array.StringBuilder) - vv, valueOk := v.(string) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as string: %w", v, ErrUnconvertibleRecord) - } - - case arrow.BINARY: - vb, builderOk := b.(*array.BinaryBuilder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case string: - vb.Append([]byte(vv)) - case []byte: - vb.Append(vv) - default: - return nil, fmt.Errorf("unexpected input %v as binary: %w", v, ErrUnconvertibleRecord) - } - - case arrow.STRUCT: - vb, builderOk := b.(*array.StructBuilder) - st, structOk := t.(*arrow.StructType) - if builderOk && structOk { - if v != nil { - vb.Append(true) - vv, valueOk := v.(map[string]interface{}) - if !valueOk { - return nil, fmt.Errorf("unexpected input %v as struct: %w", v, ErrUnconvertibleRecord) - } else if _, err := formatMapToArrowStruct(vb, st, vv); err != nil { - return nil, err - } - } else { - vb.Append(false) - } - } else { - return nil, fmt.Errorf("unexpected input %v as struct: %w", v, ErrUnconvertibleRecord) - } - - case arrow.LIST: - vb, builderOk := b.(*array.ListBuilder) - lt, listOk := t.(*arrow.ListType) - if builderOk && listOk { - if v != nil { - vb.Append(true) - vv, valueOk := v.([]interface{}) - if !valueOk { - return nil, fmt.Errorf("unexpected input %v as list: %w", v, ErrUnconvertibleRecord) - } - if _, err := formatMapToArrowList(vb, lt, vv); err != nil { - return nil, err - } - } else { - vb.Append(false) - } - } else { - return nil, fmt.Errorf("unexpected input %v as list: %w", v, ErrUnconvertibleRecord) - } - - default: - return nil, fmt.Errorf("unconvertable type %v: %w", t.ID(), ErrUnconvertibleRecord) - } - - return b, nil -} diff --git a/record/arrow_test.go b/record/arrow_test.go deleted file mode 100644 index 02eda1d..0000000 --- a/record/arrow_test.go +++ /dev/null @@ -1,446 +0,0 @@ -package record - -import ( - "reflect" - "testing" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" - "github.com/reproio/columnify/schema" -) - -func TestNewArrowSchemaFromAvroSchema(t *testing.T) { - cases := []struct { - input []map[string]interface{} - schema *schema.IntermediateSchema - expected func(s *schema.IntermediateSchema) *WrappedRecord - err error - }{ - // Primitives - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - - // Nested - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - "record": map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - "record": map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "record", - Type: arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - sb := b.Field(7).(*array.StructBuilder) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - - // Array - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb := b.Field(7).(*array.ListBuilder) - sb := lb.ValueBuilder().(*array.StructBuilder) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - } - - for _, c := range cases { - expectedRecord := c.expected(c.schema) - - actual, err := formatMapToArrowRecord(c.schema.ArrowSchema, c.input) - - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - if !reflect.DeepEqual(actual, expectedRecord) { - t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, actual) - } - } -} diff --git a/record/avro.go b/record/avro.go index 2350ff2..97e7ca4 100644 --- a/record/avro.go +++ b/record/avro.go @@ -1,12 +1,10 @@ package record import ( - "bytes" "fmt" "io" "github.com/linkedin/goavro/v2" - "github.com/reproio/columnify/schema" ) type avroInnerDecoder struct { @@ -68,35 +66,3 @@ func flattenAvroUnion(in map[string]interface{}) map[string]interface{} { return out } - -func FormatAvroToMap(data []byte) ([]map[string]interface{}, error) { - r, err := goavro.NewOCFReader(bytes.NewReader(data)) - if err != nil { - return nil, err - } - - maps := make([]map[string]interface{}, 0) - for r.Scan() { - v, err := r.Read() - if err != nil { - return nil, err - } - m, mapOk := v.(map[string]interface{}) - if !mapOk { - return nil, fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord) - } - flatten := flattenAvroUnion(m) - maps = append(maps, flatten) - } - - return maps, nil -} - -func FormatAvroToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatAvroToMap(data) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/csv.go b/record/csv.go index af184f2..c38c6e0 100644 --- a/record/csv.go +++ b/record/csv.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "strconv" - "strings" "github.com/reproio/columnify/schema" ) @@ -92,68 +91,3 @@ func getFieldNamesFromSchema(s *schema.IntermediateSchema) ([]string, error) { return names, nil } - -func FormatCsvToMap(s *schema.IntermediateSchema, data []byte, delimiter delimiter) ([]map[string]interface{}, error) { - names, err := getFieldNamesFromSchema(s) - if err != nil { - return nil, err - } - - reader := csv.NewReader(strings.NewReader(string(data))) - reader.Comma = rune(delimiter) - - numFields := len(names) - arr := make([]map[string]interface{}, 0) - for { - values, err := reader.Read() - if err == io.EOF { - break - } - if err != nil { - return nil, err - } - - if numFields != len(values) { - return nil, fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord) - } - - e := make(map[string]interface{}) - for i, v := range values { - // bool - if v != "0" && v != "1" { - if vv, err := strconv.ParseBool(v); err == nil { - e[names[i]] = vv - continue - } - } - - // int - if vv, err := strconv.ParseInt(v, 10, 64); err == nil { - e[names[i]] = vv - continue - } - - // float - if vv, err := strconv.ParseFloat(v, 64); err == nil { - e[names[i]] = vv - continue - } - - // others; to string - e[names[i]] = v - } - - arr = append(arr, e) - } - - return arr, nil -} - -func FormatCsvToArrow(s *schema.IntermediateSchema, data []byte, delimiter delimiter) (*WrappedRecord, error) { - maps, err := FormatCsvToMap(s, data, delimiter) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/jsonl.go b/record/jsonl.go index 3e0736f..323feb6 100644 --- a/record/jsonl.go +++ b/record/jsonl.go @@ -4,9 +4,6 @@ import ( "bufio" "encoding/json" "io" - "strings" - - "github.com/reproio/columnify/schema" ) type jsonlInnerDecoder struct { @@ -30,33 +27,3 @@ func (d *jsonlInnerDecoder) Decode(r *map[string]interface{}) error { return d.s.Err() } - -func FormatJsonlToMap(data []byte) ([]map[string]interface{}, error) { - lines := strings.Split(string(data), "\n") - - records := make([]map[string]interface{}, 0) - for _, l := range lines { - if l == "" { - // skip blank line - continue - } - - var e map[string]interface{} - if err := json.Unmarshal([]byte(l), &e); err != nil { - return nil, err - } - - records = append(records, e) - } - - return records, nil -} - -func FormatJsonlToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatJsonlToMap(data) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/ltsv.go b/record/ltsv.go index 356884f..ac4a93c 100644 --- a/record/ltsv.go +++ b/record/ltsv.go @@ -4,9 +4,6 @@ import ( "bufio" "io" "strconv" - "strings" - - "github.com/reproio/columnify/schema" "github.com/Songmu/go-ltsv" ) @@ -62,56 +59,3 @@ func (d *ltsvInnerDecoder) Decode(r *map[string]interface{}) error { return d.s.Err() } - -func FormatLtsvToMap(data []byte) ([]map[string]interface{}, error) { - lines := strings.Split(string(data), "\n") - - records := make([]map[string]interface{}, 0) - for _, l := range lines { - v := map[string]string{} - - err := ltsv.Unmarshal([]byte(l), &v) - if err != nil { - return nil, err - } - - m := make(map[string]interface{}) - for k, v := range v { - // bool - if v != "0" && v != "1" { - if vv, err := strconv.ParseBool(v); err == nil { - m[k] = vv - continue - } - } - - // int - if vv, err := strconv.ParseInt(v, 10, 64); err == nil { - m[k] = vv - continue - } - - // float - if vv, err := strconv.ParseFloat(v, 64); err == nil { - m[k] = vv - continue - } - - // others; to string - m[k] = v - } - - records = append(records, m) - } - - return records, nil -} - -func FormatLtsvToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatLtsvToMap(data) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/msgpack.go b/record/msgpack.go index d45524d..3f1ff31 100644 --- a/record/msgpack.go +++ b/record/msgpack.go @@ -1,12 +1,9 @@ package record import ( - "bytes" "fmt" "io" - "github.com/reproio/columnify/schema" - "github.com/vmihailenco/msgpack/v4" ) @@ -34,35 +31,3 @@ func (d *msgpackInnerDecoder) Decode(r *map[string]interface{}) error { return nil } - -func FormatMsgpackToMap(data []byte) ([]map[string]interface{}, error) { - d := msgpack.NewDecoder(bytes.NewReader(data)) - - maps := make([]map[string]interface{}, 0) - for { - arr, err := d.DecodeInterface() - if err == io.EOF { - break - } else if err != nil { - return nil, err - } - - m, mapOk := arr.(map[string]interface{}) - if !mapOk { - return nil, fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord) - } - - maps = append(maps, m) - } - - return maps, nil -} - -func FormatMsgpackToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatMsgpackToMap(data) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/record.go b/record/record.go index 2db111d..34846ad 100644 --- a/record/record.go +++ b/record/record.go @@ -83,54 +83,3 @@ func (d *jsonDecoder) Decode(v *string) error { return nil } - -func FormatToArrow(data []byte, s *schema.IntermediateSchema, recordType string) (*WrappedRecord, error) { - switch recordType { - case RecordTypeAvro: - return FormatAvroToArrow(s, data) - - case RecordTypeCsv: - return FormatCsvToArrow(s, data, CsvDelimiter) - - case RecordTypeJsonl: - return FormatJsonlToArrow(s, data) - - case RecordTypeLtsv: - return FormatLtsvToArrow(s, data) - - case RecordTypeMsgpack: - return FormatMsgpackToArrow(s, data) - - case RecordTypeTsv: - return FormatCsvToArrow(s, data, TsvDelimiter) - - default: - return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) - } -} - -// FormatToMap converts input data to map based data with given schema. -func FormatToMap(data []byte, s *schema.IntermediateSchema, recordType string) ([]map[string]interface{}, error) { - switch recordType { - case RecordTypeAvro: - return FormatAvroToMap(data) - - case RecordTypeCsv: - return FormatCsvToMap(s, data, CsvDelimiter) - - case RecordTypeJsonl: - return FormatJsonlToMap(data) - - case RecordTypeLtsv: - return FormatLtsvToMap(data) - - case RecordTypeMsgpack: - return FormatMsgpackToMap(data) - - case RecordTypeTsv: - return FormatCsvToMap(s, data, TsvDelimiter) - - default: - return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) - } -} diff --git a/record/record_test.go b/record/record_test.go index 48461f2..d892afa 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -2,10 +2,7 @@ package record import ( "encoding/json" - "errors" "testing" - - "github.com/reproio/columnify/schema" ) type nopInnerDecoder struct { @@ -45,55 +42,3 @@ func TestJsonDecoder_Decode(t *testing.T) { t.Errorf("expected: %v, but actual: %v\n", string(data), v) } } - -func TestFormatToArrow(t *testing.T) { - cases := []struct { - input []byte - schema *schema.IntermediateSchema - recordType string - err error - }{ - // TODO valid cases - - { - input: nil, - schema: nil, - recordType: "Unknown", - err: ErrUnsupportedRecord, - }, - } - - for _, c := range cases { - _, err := FormatToArrow(c.input, c.schema, c.recordType) - - if !errors.Is(err, c.err) { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - } -} - -func TestFormatToMap(t *testing.T) { - cases := []struct { - input []byte - schema *schema.IntermediateSchema - recordType string - err error - }{ - // TODO valid cases - - { - input: nil, - schema: nil, - recordType: "Unknown", - err: ErrUnsupportedRecord, - }, - } - - for _, c := range cases { - _, err := FormatToMap(c.input, c.schema, c.recordType) - - if !errors.Is(err, c.err) { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - } -} From c19f8b0c36e29793b8a2bbf454fe2a2852aa710f Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Fri, 7 Aug 2020 19:48:23 +0900 Subject: [PATCH 3/6] Add CHANGELOG --- CHANGELOG.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..98ddd96 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,34 @@ +# v0.1.x + +## Release v0.1.0 - 2020/08/xx + +### Breaking changes + +- `Columnifier` interface stops supporting `io.WriterCloser` + +### Enhancement + +- #52 Implement stream-based input record decoder's to reduce memory consumption. + + +# v0.0.x + +## Release v0.0.4 - 2020/07/29 + +### Bug fix + +- Fix #49 Capture the first error caused by columnifier + +... And some small improvements. + +## Release v0.0.3 - 2020/06/02 + +Ready to publish columnify as an OSS. + +## Release v0.0.2 - 2020/06/02 + +Ready for production. + +## Release v0.0.1 - 2020/04/19 + +Initial release. From 75d4a0e55e905acfb3771bdc0e747eea6d9aafe5 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Sun, 16 Aug 2020 22:15:48 +0900 Subject: [PATCH 4/6] jsonDecoder -> jsonStringConverter --- columnifier/parquet.go | 12 +++++++----- record/record.go | 10 +++++----- record/record_test.go | 6 +++--- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/columnifier/parquet.go b/columnifier/parquet.go index 4fff479..3148a31 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -5,12 +5,13 @@ import ( "io/ioutil" "os" + "github.com/xitongsys/parquet-go/marshal" + "github.com/reproio/columnify/record" "github.com/reproio/columnify/parquet" "github.com/reproio/columnify/schema" "github.com/xitongsys/parquet-go-source/local" - "github.com/xitongsys/parquet-go/marshal" parquetSource "github.com/xitongsys/parquet-go/source" "github.com/xitongsys/parquet-go/writer" ) @@ -60,6 +61,9 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi w.RowGroupSize = config.Parquet.RowGroupSize w.CompressionType = config.Parquet.CompressionCodec + // Intermediate record type is string typed JSON values + w.MarshalFunc = marshal.MarshalJSON + return &parquetColumnifier{ w: w, schema: intermediateSchema, @@ -69,9 +73,7 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi // Write reads, converts input binary data and write it to buffer. func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) { - // Intermediate record type is map[string]interface{} - c.w.MarshalFunc = marshal.MarshalJSON - decoder, err := record.NewJsonDecoder(reader, c.schema, c.rt) + decoder, err := record.NewJsonStringConverter(reader, c.schema, c.rt) if err != nil { return -1, err } @@ -79,7 +81,7 @@ func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) { beforeSize := c.w.Size for { var v string - err = decoder.Decode(&v) + err = decoder.Convert(&v) if err != nil { if err == io.EOF { break diff --git a/record/record.go b/record/record.go index 34846ad..c917870 100644 --- a/record/record.go +++ b/record/record.go @@ -30,12 +30,12 @@ type innerDecoder interface { Decode(r *map[string]interface{}) error } -// jsonDecoder decodes data with innerDecoder and returns JSON string value. -type jsonDecoder struct { +// jsonStringConverter converts data with innerDecoder and returns JSON string value. +type jsonStringConverter struct { inner innerDecoder } -func NewJsonDecoder(r io.Reader, s *schema.IntermediateSchema, recordType string) (*jsonDecoder, error) { +func NewJsonStringConverter(r io.Reader, s *schema.IntermediateSchema, recordType string) (*jsonStringConverter, error) { var inner innerDecoder var err error @@ -62,12 +62,12 @@ func NewJsonDecoder(r io.Reader, s *schema.IntermediateSchema, recordType string return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) } - return &jsonDecoder{ + return &jsonStringConverter{ inner: inner, }, err } -func (d *jsonDecoder) Decode(v *string) error { +func (d *jsonStringConverter) Convert(v *string) error { var vv map[string]interface{} err := d.inner.Decode(&vv) diff --git a/record/record_test.go b/record/record_test.go index d892afa..447263e 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -15,7 +15,7 @@ func (d *nopInnerDecoder) Decode(r *map[string]interface{}) error { return d.err } -func TestJsonDecoder_Decode(t *testing.T) { +func TestJsonStringConverter_Convert(t *testing.T) { inner := &nopInnerDecoder{ r: map[string]interface{}{ "key1": 42, @@ -24,12 +24,12 @@ func TestJsonDecoder_Decode(t *testing.T) { err: nil, } - d := jsonDecoder{ + d := jsonStringConverter{ inner: inner, } var v string - err := d.Decode(&v) + err := d.Convert(&v) if err != nil { t.Errorf("expected no error, but actual: %v\n", err) } From baf29c06b79556ea68639eb8731d49fccbdfc206 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Sun, 16 Aug 2020 22:32:57 +0900 Subject: [PATCH 5/6] Check errors more in record pkg --- record/avro.go | 3 +++ record/csv.go | 19 ++++++++++--------- record/jsonl.go | 3 +++ record/ltsv.go | 3 +++ 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/record/avro.go b/record/avro.go index 97e7ca4..df5cee9 100644 --- a/record/avro.go +++ b/record/avro.go @@ -37,6 +37,9 @@ func (d *avroInnerDecoder) Decode(r *map[string]interface{}) error { flatten := flattenAvroUnion(m) *r = flatten } else if d.r.RemainingBlockItems() == 0 { + if d.r.Err() != nil { + return d.r.Err() + } return io.EOF } diff --git a/record/csv.go b/record/csv.go index c38c6e0..fe170de 100644 --- a/record/csv.go +++ b/record/csv.go @@ -37,43 +37,44 @@ func newCsvInnerDecoder(r io.Reader, s *schema.IntermediateSchema, delimiter del } func (d *csvInnerDecoder) Decode(r *map[string]interface{}) error { + numNames := len(d.names) + d.r.FieldsPerRecord = numNames + values, err := d.r.Read() if err != nil { return err } - if len(d.names) != len(values) { - return fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord) - } - - *r = make(map[string]interface{}) + record := make(map[string]interface{}, numNames) for i, v := range values { n := d.names[i] // bool if v != "0" && v != "1" { if vv, err := strconv.ParseBool(v); err == nil { - (*r)[n] = vv + record[n] = vv continue } } // int if vv, err := strconv.ParseInt(v, 10, 64); err == nil { - (*r)[n] = vv + record[n] = vv continue } // float if vv, err := strconv.ParseFloat(v, 64); err == nil { - (*r)[n] = vv + record[n] = vv continue } // others; to string - (*r)[n] = v + record[n] = v } + *r = record + return nil } diff --git a/record/jsonl.go b/record/jsonl.go index 323feb6..7ca8e32 100644 --- a/record/jsonl.go +++ b/record/jsonl.go @@ -22,6 +22,9 @@ func (d *jsonlInnerDecoder) Decode(r *map[string]interface{}) error { return err } } else { + if err := d.s.Err(); err != nil { + return err + } return io.EOF } diff --git a/record/ltsv.go b/record/ltsv.go index ac4a93c..c2f81f0 100644 --- a/record/ltsv.go +++ b/record/ltsv.go @@ -54,6 +54,9 @@ func (d *ltsvInnerDecoder) Decode(r *map[string]interface{}) error { (*r)[k] = v } } else { + if err := d.s.Err(); err != nil { + return err + } return io.EOF } From d2d859132be38f1ede0185ce2161eb98b693b135 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Sun, 16 Aug 2020 22:52:42 +0900 Subject: [PATCH 6/6] Use Fatal --- record/record_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/record/record_test.go b/record/record_test.go index 447263e..2dde443 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -31,14 +31,14 @@ func TestJsonStringConverter_Convert(t *testing.T) { var v string err := d.Convert(&v) if err != nil { - t.Errorf("expected no error, but actual: %v\n", err) + t.Fatalf("expected no error, but actual: %v\n", err) } data, err := json.Marshal(inner.r) if err != nil { - t.Errorf("expected no error, but actual: %v\n", err) + t.Fatalf("expected no error, but actual: %v\n", err) } if v != string(data) { - t.Errorf("expected: %v, but actual: %v\n", string(data), v) + t.Fatalf("expected: %v, but actual: %v\n", string(data), v) } }