diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..98ddd96 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,34 @@ +# v0.1.x + +## Release v0.1.0 - 2020/08/xx + +### Breaking changes + +- `Columnifier` interface stops supporting `io.WriteCloser` + +### Enhancement + +- #52 Implement stream-based input record decoders to reduce memory consumption. + + +# v0.0.x + +## Release v0.0.4 - 2020/07/29 + +### Bug fix + +- Fix #49 Capture the first error caused by columnifier + +... And some small improvements. + +## Release v0.0.3 - 2020/06/02 + +Ready to publish columnify as an OSS. + +## Release v0.0.2 - 2020/06/02 + +Ready for production. + +## Release v0.0.1 - 2020/04/19 + +Initial release. diff --git a/columnifier/columnifier.go b/columnifier/columnifier.go index 8d730d3..805cb1c 100644 --- a/columnifier/columnifier.go +++ b/columnifier/columnifier.go @@ -4,9 +4,9 @@ import "io" // Columnifier is the interface that converts input file to columnar format file. type Columnifier interface { - io.WriteCloser - + WriteFromReader(reader io.Reader) (int, error) WriteFromFiles(paths []string) (int, error) + Close() error } // NewColumnifier creates a new Columnifier. 
diff --git a/columnifier/parquet.go b/columnifier/parquet.go index 97f56a3..3148a31 100644 --- a/columnifier/parquet.go +++ b/columnifier/parquet.go @@ -1,7 +1,11 @@ package columnifier import ( + "io" "io/ioutil" + "os" + + "github.com/xitongsys/parquet-go/marshal" "github.com/reproio/columnify/record" @@ -57,6 +61,9 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi w.RowGroupSize = config.Parquet.RowGroupSize w.CompressionType = config.Parquet.CompressionCodec + // Intermediate record type is string typed JSON values + w.MarshalFunc = marshal.MarshalJSON + return &parquetColumnifier{ w: w, schema: intermediateSchema, @@ -65,36 +72,30 @@ func NewParquetColumnifier(st string, sf string, rt string, output string, confi } // Write reads, converts input binary data and write it to buffer. -func (c *parquetColumnifier) Write(data []byte) (int, error) { - // Intermediate record type is map[string]interface{} - c.w.MarshalFunc = parquet.MarshalMap - records, err := record.FormatToMap(data, c.schema, c.rt) +func (c *parquetColumnifier) WriteFromReader(reader io.Reader) (int, error) { + decoder, err := record.NewJsonStringConverter(reader, c.schema, c.rt) if err != nil { return -1, err } beforeSize := c.w.Size - for _, r := range records { - if err := c.w.Write(r); err != nil { + for { + var v string + err = decoder.Convert(&v) + if err != nil { + if err == io.EOF { + break + } else { + return -1, err + } + } + + if err := c.w.Write(v); err != nil { return -1, err } } afterSize := c.w.Size - // Intermediate record type is wrapped Apache Arrow record - // It requires Arrow Golang implementation more logical type supports - // ref. 
https://github.com/apache/arrow/blob/9c9dc2012266442d0848e4af0cf52874bc4db151/go/arrow/array/builder.go#L211 - /* - c.w.MarshalFunc = parquet.MarshalArrow - records, err := record.FormatToArrow(data, c.schema, c.rt) - if err != nil { - return err - } - if err := c.w.Write(&records); err != nil { - return err - } - */ - return int(afterSize - beforeSize), nil } @@ -103,11 +104,12 @@ func (c *parquetColumnifier) WriteFromFiles(paths []string) (int, error) { var n int for _, p := range paths { - data, err := ioutil.ReadFile(p) + f, err := os.Open(p) if err != nil { return -1, err } - if n, err = c.Write(data); err != nil { + + if n, err = c.WriteFromReader(f); err != nil { return -1, err } } diff --git a/parquet/doc.go b/parquet/doc.go deleted file mode 100644 index d1559df..0000000 --- a/parquet/doc.go +++ /dev/null @@ -1,12 +0,0 @@ -/* - Package parquetgo is an utility and marshaler with go-friendly error handling for parquet-go. - https://github.com/xitongsys/parquet-go - - xitongsys/parquet-go provides simple, high-level API to convert to Parquet. - But provided features are limited (mainly it looks main users select Go struct or JSON ), - and the error handling is sometimes too simple (panic/recovery based). - - parquetgo package enriches these points for handling Arrow based data. - -*/ -package parquet diff --git a/parquet/marshal_arrow.go b/parquet/marshal_arrow.go deleted file mode 100644 index 42a45fb..0000000 --- a/parquet/marshal_arrow.go +++ /dev/null @@ -1,266 +0,0 @@ -package parquet - -import ( - "bytes" - "encoding/base64" - "fmt" - "reflect" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/reproio/columnify/record" - "github.com/xitongsys/parquet-go/common" - "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/parquet" - "github.com/xitongsys/parquet-go/schema" - "github.com/xitongsys/parquet-go/types" -) - -// MarshalMap converts 1 arrow record to parquet tables. 
-func MarshalArrow(maybeRecord []interface{}, bgn int, end int, schemaHandler *schema.SchemaHandler) (*map[string]*layout.Table, error) { - // NOTE This marshaler expects record values aggregation has done before call - if len(maybeRecord) != 1 { - return nil, fmt.Errorf("size of records is invalid: %w", ErrInvalidParquetRecord) - } - - wrapped, recordOk := maybeRecord[0].(*record.WrappedRecord) - if !recordOk { - return nil, fmt.Errorf("unexpected input type %v: %w", reflect.TypeOf(maybeRecord[0]), ErrInvalidParquetRecord) - } - - return marshalArrowRecord(wrapped.Record, schemaHandler) -} - -func marshalArrowRecord(record array.Record, sh *schema.SchemaHandler) (*map[string]*layout.Table, error) { - tables, err := prepareTables(sh) - if err != nil { - return nil, err - } - - keys := make([]string, 0, len(record.Schema().Fields())) - for _, f := range record.Schema().Fields() { - keys = append(keys, common.HeadToUpper(f.Name)) - } - - for i, c := range record.Columns() { - childPathMap := sh.PathMap.Children[keys[i]] - data := c.Data() - tables, err = marshalArrowData(data, tables, sh, childPathMap, 0, 0) - if err != nil { - return nil, err - } - } - - return &tables, nil -} - -func marshalArrowData(data *array.Data, tables map[string]*layout.Table, sh *schema.SchemaHandler, pathMap *schema.PathMapType, rl int32, dl int32) (map[string]*layout.Table, error) { - pathStr := pathMap.Path - - var info *common.Tag - if i, ok := sh.MapIndex[pathStr]; ok { - info = sh.Infos[i] - } else { - return nil, fmt.Errorf("schema not found to path %v: %w", pathStr, ErrInvalidParquetSchema) - } - - switch data.DataType().ID() { - case arrow.BOOL: - values := array.NewBooleanData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = 
append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.UINT32: - values := array.NewUint32Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.UINT64: - values := array.NewUint64Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.FLOAT32: - values := array.NewFloat32Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.FLOAT64: - values := array.NewFloat64Data(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = 
append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.STRING: - values := array.NewStringData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.BINARY: - values := array.NewBinaryData(data) - for i := 0; i < values.Len(); i++ { - v, deltaDl, err := arrowPrimitiveToDataPageSource(values.Value(i), values.IsValid(i), info) - if err != nil { - return nil, err - } - tables[pathStr].Values = append(tables[pathStr].Values, v) - tables[pathStr].DefinitionLevels = append(tables[pathStr].DefinitionLevels, dl+deltaDl) - tables[pathStr].RepetitionLevels = append(tables[pathStr].RepetitionLevels, rl) - } - - case arrow.STRUCT: - values := array.NewStructData(data) - st, stOk := values.DataType().(*arrow.StructType) - if !stOk { - return nil, fmt.Errorf("unsupported data type %v: %w", values.DataType(), ErrInvalidParquetRecord) - } - keys := make([]string, 0, len(st.Fields())) - for _, f := range st.Fields() { - keys = append(keys, common.HeadToUpper(f.Name)) - } - deltaDl := int32(0) - if info.RepetitionType == parquet.FieldRepetitionType_OPTIONAL { - deltaDl = 1 - } - for i := 0; i < values.NumField(); i++ { - childPathMap := pathMap.Children[keys[i]] - data := values.Field(i).Data() - var err error - tables, err = marshalArrowData(data, tables, sh, childPathMap, rl, dl+deltaDl) - if err != nil { - return nil, err - } - } - - case arrow.LIST: - values := array.NewListData(data) - for i := 0; i < values.Len(); i++ { - o := i + values.Offset() - bgn := int64(values.Offsets()[o]) - end := int64(values.Offsets()[o+1]) - slice := array.NewSlice(values.ListValues(), bgn, end) - - // first - if 
slice.Len() > 0 { - first := array.NewSlice(slice, 0, 1) - var err error - tables, err = marshalArrowData(first.Data(), tables, sh, pathMap, rl, dl+1) - if err != nil { - return nil, err - } - } - - // repeated; repetition level += max repetition level - if slice.Len() > 1 { - repeated := array.NewSlice(slice, 1, int64(slice.Len())) - maxRl, err := sh.MaxRepetitionLevel(common.StrToPath(pathStr)) - if err != nil { - return nil, err - } - tables, err = marshalArrowData(repeated.Data(), tables, sh, pathMap, rl+maxRl, dl+1) - if err != nil { - return nil, err - } - - } - } - - default: - return nil, fmt.Errorf("unsupported type %v: %w", data.DataType(), ErrInvalidParquetRecord) - } - - return tables, nil -} - -func arrowPrimitiveToDataPageSource(value interface{}, isValid bool, info *common.Tag) (interface{}, int32, error) { - switch info.RepetitionType { - case parquet.FieldRepetitionType_REQUIRED: - if isValid { - if v, err := formatArrowPrimitive(value, info); err != nil { - return nil, -1, err - } else { - return v, 0, nil - } - } else { - return nil, -1, fmt.Errorf("null for required field %v: %w", info, ErrInvalidParquetRecord) - } - case parquet.FieldRepetitionType_OPTIONAL: - if isValid { - if v, err := formatArrowPrimitive(value, info); err != nil { - return nil, -1, err - } else { - return v, 1, nil - } - } else { - return nil, 0, nil - } - default: - return nil, -1, fmt.Errorf("invalid field repetition type for %v: %w", info, ErrInvalidParquetRecord) - } -} - -func formatArrowPrimitive(value interface{}, info *common.Tag) (interface{}, error) { - pT, cT := types.TypeNameToParquetType(info.Type, info.BaseType) - - var s string - if (*pT == parquet.Type_BYTE_ARRAY || *pT == parquet.Type_FIXED_LEN_BYTE_ARRAY) && cT == nil { - bin, binOk := value.([]byte) - if !binOk { - return nil, fmt.Errorf("%v is not []byte: %w", value, ErrInvalidParquetRecord) - } - - var buf bytes.Buffer - encoder := base64.NewEncoder(base64.StdEncoding, &buf) - defer func() { _ = 
encoder.Close() }() - - if _, err := encoder.Write(bin); err != nil { - return nil, err - } - s = buf.String() - } else { - s = fmt.Sprintf("%v", value) - } - - return types.StrToParquetType(s, pT, cT, int(info.Length), int(info.Scale)), nil -} diff --git a/parquet/marshal_arrow_test.go b/parquet/marshal_arrow_test.go deleted file mode 100644 index d82da5d..0000000 --- a/parquet/marshal_arrow_test.go +++ /dev/null @@ -1,527 +0,0 @@ -package parquet - -import ( - "reflect" - "testing" - - "github.com/reproio/columnify/record" - "github.com/reproio/columnify/schema" - "github.com/xitongsys/parquet-go/layout" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" -) - -func TestNewArrowSchemaFromAvroSchema(t *testing.T) { - cases := []struct { - input func(s *schema.IntermediateSchema) []interface{} - schema *schema.IntermediateSchema - expect *map[string]*layout.Table - err error - }{ - // Only primitives - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, 
- }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expect: &map[string]*layout.Table{ - "Primitives.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Nested - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - 
b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - sb := b.Field(7).(*array.StructBuilder) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "record", - Type: 
arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expect: &map[string]*layout.Table{ - "Nested.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: 
[]int32{0, 0}, - }, - "Nested.Record.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Array - { - input: func(s *schema.IntermediateSchema) []interface{} { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb := b.Field(7).(*array.ListBuilder) - sb := lb.ValueBuilder().(*array.StructBuilder) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - 
sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return []interface{}{record.NewWrappedRecord(b)} - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: 
arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expect: &map[string]*layout.Table{ - "Arrays.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Array.Boolean": { - Values: []interface{}{false, true, false, true}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Int": { - Values: []interface{}{int32(1), int32(2), int32(1), int32(2)}, - 
DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Long": { - Values: []interface{}{int64(1), int64(2), int64(1), int64(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Float": { - Values: []interface{}{float32(1.1), float32(2.2), float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Double": { - Values: []interface{}{float64(1.1), float64(2.2), float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t), base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.String": { - Values: []interface{}{"foo", "bar", "foo", "bar"}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - }, - err: nil, - }, - } - - for _, c := range cases { - sh, err := schema.NewSchemaHandlerFromArrow(*c.schema) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tables, err := MarshalArrow(c.input(c.schema), 0, 1, sh) - - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - for k, v := range *c.expect { - actual := (*tables)[k] - - if !reflect.DeepEqual(actual.Values, v.Values) { - t.Errorf("expected: %v, but actual: %v\n", v.Values, actual.Values) - } - - if !reflect.DeepEqual(actual.DefinitionLevels, v.DefinitionLevels) { - t.Errorf("expected: %v, but actual: %v\n", v.DefinitionLevels, actual.DefinitionLevels) - } - - if !reflect.DeepEqual(actual.RepetitionLevels, v.RepetitionLevels) { - t.Errorf("expected: %v, but actual: %v\n", v.RepetitionLevels, actual.RepetitionLevels) - } - } - } -} diff --git a/parquet/marshal_map.go b/parquet/marshal_map.go deleted file 
mode 100644 index 7f1a55f..0000000 --- a/parquet/marshal_map.go +++ /dev/null @@ -1,26 +0,0 @@ -package parquet - -import ( - "encoding/json" - - "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/marshal" - "github.com/xitongsys/parquet-go/schema" -) - -// MarshalMap converts []map[string]interface{} to parquet tables. -func MarshalMap(sources []interface{}, bgn int, end int, schemaHandler *schema.SchemaHandler) (*map[string]*layout.Table, error) { - jsons := make([]interface{}, 0, end-bgn) - - for _, d := range sources[bgn:end] { - e, err := json.Marshal(d) - if err != nil { - return nil, err - } - jsons = append(jsons, string(e)) - } - - // NOTE: reuse existing JSON marshaler. Implementing it ourselves is high cost - // NOTE: it requires redundant map -> json -> map conversions - return marshal.MarshalJSON(jsons, bgn, end, schemaHandler) -} diff --git a/parquet/marshal_map_test.go b/parquet/marshal_map_test.go deleted file mode 100644 index f1005b8..0000000 --- a/parquet/marshal_map_test.go +++ /dev/null @@ -1,588 +0,0 @@ -package parquet - -import ( - "bytes" - "encoding/base64" - "reflect" - "testing" - - "github.com/apache/arrow/go/arrow" - "github.com/reproio/columnify/schema" - "github.com/xitongsys/parquet-go/layout" -) - -func base64Str(d []byte, t *testing.T) string { - var buf bytes.Buffer - encoder := base64.NewEncoder(base64.StdEncoding, &buf) - - _, err := encoder.Write(d) - if err != nil { - t.Fatalf("invalid test case: %v", err) - } - - err = encoder.Close() - if err != nil { - t.Fatalf("invalid test case: %v", err) - } - - return buf.String() -} - -func TestMarshalMap(t *testing.T) { - cases := []struct { - input []interface{} - bgn int - end int - schema *schema.IntermediateSchema - expect *map[string]*layout.Table - err error - }{ - // Only primitives - { - input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": 
"foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expect: &map[string]*layout.Table{ - "Primitives.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Primitives.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Nested - { - 
input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - "record": map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - "record": map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - }, - { - Name: "record", - Type: arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expect: &map[string]*layout.Table{ - "Nested.Boolean": { - Values: []interface{}{false, true}, - 
DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Nested.Record.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - 
RepetitionLevels: []int32{0, 0}, - }, - }, - err: nil, - }, - - // Array - { - input: []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": []byte("foo"), - "double": 1.1, - "float": 1.1, - "int": 1, - "long": 1, - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": []byte("bar"), - "double": 2.2, - "float": 2.2, - "int": 2, - "long": 2, - "string": "bar", - }, - }, - }, - }, - bgn: 0, - end: 2, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - 
}, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expect: &map[string]*layout.Table{ - "Arrays.Boolean": { - Values: []interface{}{false, true}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Int": { - Values: []interface{}{int32(1), int32(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Long": { - Values: []interface{}{int64(1), int64(2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Float": { - Values: []interface{}{float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Double": { - Values: []interface{}{float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.String": { - Values: []interface{}{"foo", "bar"}, - DefinitionLevels: []int32{0, 0}, - RepetitionLevels: []int32{0, 0}, - }, - "Arrays.Array.Boolean": { - Values: []interface{}{false, true, false, true}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Int": { - Values: []interface{}{int32(1), int32(2), int32(1), int32(2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Long": { - Values: []interface{}{int64(1), int64(2), int64(1), int64(2)}, - DefinitionLevels: []int32{1, 1, 1, 
1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Float": { - Values: []interface{}{float32(1.1), float32(2.2), float32(1.1), float32(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Double": { - Values: []interface{}{float64(1.1), float64(2.2), float64(1.1), float64(2.2)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.Bytes": { - Values: []interface{}{base64Str([]byte("foo"), t), base64Str([]byte("bar"), t), base64Str([]byte("foo"), t), base64Str([]byte("bar"), t)}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - "Arrays.Array.String": { - Values: []interface{}{"foo", "bar", "foo", "bar"}, - DefinitionLevels: []int32{1, 1, 1, 1}, - RepetitionLevels: []int32{0, 1, 0, 1}, - }, - }, - err: nil, - }, - } - - for _, c := range cases { - sh, err := schema.NewSchemaHandlerFromArrow(*c.schema) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - tables, err := MarshalMap(c.input, c.bgn, c.end, sh) - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - for k, v := range *c.expect { - actual := (*tables)[k] - - if !reflect.DeepEqual(actual.Values, v.Values) { - t.Errorf("values: expected: %v, but actual: %v\n", v.Values, actual.Values) - } - - if !reflect.DeepEqual(actual.DefinitionLevels, v.DefinitionLevels) { - t.Errorf("definition levels: expected: %v, but actual: %v\n", v.DefinitionLevels, actual.DefinitionLevels) - } - if !reflect.DeepEqual(actual.RepetitionLevels, v.RepetitionLevels) { - t.Errorf("repetition levels: expected: %v, but actual: %v\n", v.RepetitionLevels, actual.RepetitionLevels) - } - } - } -} diff --git a/parquet/parquet.go b/parquet/parquet.go deleted file mode 100644 index cba88ca..0000000 --- a/parquet/parquet.go +++ /dev/null @@ -1,56 +0,0 @@ -package parquet - -import ( - "errors" - "fmt" - - "github.com/xitongsys/parquet-go/common" - 
"github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/schema" -) - -var ( - ErrInvalidParquetSchema = errors.New("invalid parquet schema") - ErrInvalidParquetRecord = errors.New("invalid parquet record") - ErrUnsupportedMethod = errors.New("unsupported method") -) - -// prepareTables returns tables from fields(non record) in schema elements. -func prepareTables(schemaHandler *schema.SchemaHandler) (map[string]*layout.Table, error) { - numSchemaElements := len(schemaHandler.SchemaElements) - if len(schemaHandler.Infos) != numSchemaElements { - return nil, fmt.Errorf("sizes of SchemaElement and Infos don't match: %w", ErrInvalidParquetSchema) - } - if len(schemaHandler.MapIndex) != numSchemaElements { - return nil, fmt.Errorf("sizes of SchemaElement and MapIndex don't match: %w", ErrInvalidParquetSchema) - } - - tables := make(map[string]*layout.Table) - for i, e := range schemaHandler.SchemaElements { - if e.GetNumChildren() == 0 { // fields(non record) - pathStr := schemaHandler.IndexMap[int32(i)] - path := common.StrToPath(pathStr) - - maxDefinitionLevel, err := schemaHandler.MaxDefinitionLevel(path) - if err != nil { - return nil, err - } - - maxRepetitionLevel, err := schemaHandler.MaxRepetitionLevel(path) - if err != nil { - return nil, err - } - - tables[pathStr] = &layout.Table{ - Path: path, - MaxDefinitionLevel: maxDefinitionLevel, - MaxRepetitionLevel: maxRepetitionLevel, - RepetitionType: e.GetRepetitionType(), - Schema: schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]], - Info: schemaHandler.Infos[i], - } - } - } - - return tables, nil -} diff --git a/parquet/stdio.go b/parquet/stdio.go index 90895e5..9ca5b9f 100644 --- a/parquet/stdio.go +++ b/parquet/stdio.go @@ -1,6 +1,7 @@ package parquet import ( + "errors" "fmt" "io" "os" @@ -8,6 +9,10 @@ import ( "github.com/xitongsys/parquet-go/source" ) +var ( + ErrUnsupportedMethod = errors.New("unsupported method") +) + // stdioFile is an implementation of ParquetFile, just 
writing data to stdout. type stdioFile struct { in io.ReadCloser diff --git a/record/arrow.go b/record/arrow.go deleted file mode 100644 index a5647ea..0000000 --- a/record/arrow.go +++ /dev/null @@ -1,227 +0,0 @@ -package record - -import ( - "fmt" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" -) - -type WrappedRecord struct { - Record array.Record -} - -func NewWrappedRecord(b *array.RecordBuilder) *WrappedRecord { - return &WrappedRecord{ - Record: b.NewRecord(), - } -} - -func formatMapToArrowRecord(s *arrow.Schema, maps []map[string]interface{}) (*WrappedRecord, error) { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s) - defer b.Release() - - for _, m := range maps { - for i, f := range s.Fields() { - if v, ok := m[f.Name]; ok { - if _, err := formatMapToArrowField(b.Field(i), f.Type, v); err != nil { - return nil, err - } - } else { - b.Field(i).AppendNull() - } - } - } - - return NewWrappedRecord(b), nil -} - -func formatMapToArrowStruct(b *array.StructBuilder, s *arrow.StructType, m map[string]interface{}) (*array.StructBuilder, error) { - for i, f := range s.Fields() { - if v, ok := m[f.Name]; ok { - if _, err := formatMapToArrowField(b.FieldBuilder(i), f.Type, v); err != nil { - return nil, err - } - } else { - b.FieldBuilder(i).AppendNull() - } - - } - - return b, nil -} - -func formatMapToArrowList(b *array.ListBuilder, l *arrow.ListType, list []interface{}) (*array.ListBuilder, error) { - for _, e := range list { - if _, err := formatMapToArrowField(b.ValueBuilder(), l.Elem(), e); err != nil { - return nil, err - } - } - - return b, nil -} - -func formatMapToArrowField(b array.Builder, t arrow.DataType, v interface{}) (array.Builder, error) { - switch t.ID() { - case arrow.BOOL: - vb, builderOk := b.(*array.BooleanBuilder) - vv, valueOk := v.(bool) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as 
bool: %w", v, ErrUnconvertibleRecord) - } - - case arrow.UINT32: - vb, builderOk := b.(*array.Uint32Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case int: - vb.Append(uint32(vv)) - case int8: - vb.Append(uint32(vv)) - case int16: - vb.Append(uint32(vv)) - case int32: - vb.Append(uint32(vv)) - case int64: - vb.Append(uint32(vv)) - case uint: - vb.Append(uint32(vv)) - case uint8: - vb.Append(uint32(vv)) - case uint16: - vb.Append(uint32(vv)) - case uint32: - vb.Append(uint32(vv)) - case uint64: - vb.Append(uint32(vv)) - case float64: - vb.Append(uint32(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as uint32: %w", v, ErrUnconvertibleRecord) - } - - case arrow.UINT64: - vb, builderOk := b.(*array.Uint64Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case int8: - vb.Append(uint64(vv)) - case int16: - vb.Append(uint64(vv)) - case int32: - vb.Append(uint64(vv)) - case int64: - vb.Append(uint64(vv)) - case uint8: - vb.Append(uint64(vv)) - case uint16: - vb.Append(uint64(vv)) - case uint32: - vb.Append(uint64(vv)) - case uint64: - vb.Append(uint64(vv)) - case float64: - vb.Append(uint64(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as uint64: %w", v, ErrUnconvertibleRecord) - } - - case arrow.FLOAT32: - vb, builderOk := b.(*array.Float32Builder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case float32: - vb.Append(float32(vv)) - case float64: - vb.Append(float32(vv)) - default: - return nil, fmt.Errorf("unexpected input %v as float32: %w", v, ErrUnconvertibleRecord) - } - - case arrow.FLOAT64: - vb, builderOk := b.(*array.Float64Builder) - vv, valueOk := v.(float64) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as float64: 
%w", v, ErrUnconvertibleRecord) - } - - case arrow.STRING: - vb, builderOk := b.(*array.StringBuilder) - vv, valueOk := v.(string) - if builderOk && valueOk { - vb.Append(vv) - } else { - return nil, fmt.Errorf("unexpected input %v as string: %w", v, ErrUnconvertibleRecord) - } - - case arrow.BINARY: - vb, builderOk := b.(*array.BinaryBuilder) - if !builderOk { - return nil, fmt.Errorf("builder %v is wrong: %w", v, ErrUnconvertibleRecord) - } - switch vv := v.(type) { - case string: - vb.Append([]byte(vv)) - case []byte: - vb.Append(vv) - default: - return nil, fmt.Errorf("unexpected input %v as binary: %w", v, ErrUnconvertibleRecord) - } - - case arrow.STRUCT: - vb, builderOk := b.(*array.StructBuilder) - st, structOk := t.(*arrow.StructType) - if builderOk && structOk { - if v != nil { - vb.Append(true) - vv, valueOk := v.(map[string]interface{}) - if !valueOk { - return nil, fmt.Errorf("unexpected input %v as struct: %w", v, ErrUnconvertibleRecord) - } else if _, err := formatMapToArrowStruct(vb, st, vv); err != nil { - return nil, err - } - } else { - vb.Append(false) - } - } else { - return nil, fmt.Errorf("unexpected input %v as struct: %w", v, ErrUnconvertibleRecord) - } - - case arrow.LIST: - vb, builderOk := b.(*array.ListBuilder) - lt, listOk := t.(*arrow.ListType) - if builderOk && listOk { - if v != nil { - vb.Append(true) - vv, valueOk := v.([]interface{}) - if !valueOk { - return nil, fmt.Errorf("unexpected input %v as list: %w", v, ErrUnconvertibleRecord) - } - if _, err := formatMapToArrowList(vb, lt, vv); err != nil { - return nil, err - } - } else { - vb.Append(false) - } - } else { - return nil, fmt.Errorf("unexpected input %v as list: %w", v, ErrUnconvertibleRecord) - } - - default: - return nil, fmt.Errorf("unconvertable type %v: %w", t.ID(), ErrUnconvertibleRecord) - } - - return b, nil -} diff --git a/record/arrow_test.go b/record/arrow_test.go deleted file mode 100644 index 02eda1d..0000000 --- a/record/arrow_test.go +++ /dev/null @@ -1,446 
+0,0 @@ -package record - -import ( - "reflect" - "testing" - - "github.com/apache/arrow/go/arrow" - "github.com/apache/arrow/go/arrow/array" - "github.com/apache/arrow/go/arrow/memory" - "github.com/reproio/columnify/schema" -) - -func TestNewArrowSchemaFromAvroSchema(t *testing.T) { - cases := []struct { - input []map[string]interface{} - schema *schema.IntermediateSchema - expected func(s *schema.IntermediateSchema) *WrappedRecord - err error - }{ - // Primitives - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }, nil), - "primitives"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - 
b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - - // Nested - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - "record": map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - "record": map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "record", - Type: arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - 
Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - ), - Nullable: false, - }, - }, - nil), - "nested"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - sb := b.Field(7).(*array.StructBuilder) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - 
sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - - // Array - { - input: []map[string]interface{}{ - { - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - { - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - "array": []interface{}{ - map[string]interface{}{ - "boolean": false, - "bytes": string([]byte("foo")), - "double": float64(1.1), - "float": float64(1.1), - "int": float64(1), - "long": float64(1), - "string": "foo", - }, - map[string]interface{}{ - "boolean": true, - "bytes": string([]byte("bar")), - "double": float64(2.2), - "float": float64(2.2), - "int": float64(2), - "long": float64(2), - "string": "bar", - }, - }, - }, - }, - schema: schema.NewIntermediateSchema( - arrow.NewSchema( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - 
Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - { - Name: "array", - Type: arrow.ListOf( - arrow.StructOf( - []arrow.Field{ - { - Name: "boolean", - Type: arrow.FixedWidthTypes.Boolean, - Nullable: false, - }, - { - Name: "int", - Type: arrow.PrimitiveTypes.Uint32, - Nullable: false, - }, - { - Name: "long", - Type: arrow.PrimitiveTypes.Uint64, - Nullable: false, - }, - { - Name: "float", - Type: arrow.PrimitiveTypes.Float32, - Nullable: false, - }, - { - Name: "double", - Type: arrow.PrimitiveTypes.Float64, - Nullable: false, - }, - { - Name: "bytes", - Type: arrow.BinaryTypes.Binary, - Nullable: false, - }, - { - Name: "string", - Type: arrow.BinaryTypes.String, - Nullable: false, - }, - }..., - )), - Nullable: false, - }, - }, nil), - "arrays"), - expected: func(s *schema.IntermediateSchema) *WrappedRecord { - pool := memory.NewGoAllocator() - b := array.NewRecordBuilder(pool, s.ArrowSchema) - - b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - b.Field(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - b.Field(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - b.Field(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - b.Field(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - b.Field(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - b.Field(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb := b.Field(7).(*array.ListBuilder) - sb := lb.ValueBuilder().(*array.StructBuilder) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - 
sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - lb.Append(true) - sb.AppendValues([]bool{true, true}) - sb.FieldBuilder(0).(*array.BooleanBuilder).AppendValues([]bool{false, true}, []bool{true, true}) - sb.FieldBuilder(1).(*array.Uint32Builder).AppendValues([]uint32{1, 2}, []bool{true, true}) - sb.FieldBuilder(2).(*array.Uint64Builder).AppendValues([]uint64{1, 2}, []bool{true, true}) - sb.FieldBuilder(3).(*array.Float32Builder).AppendValues([]float32{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(4).(*array.Float64Builder).AppendValues([]float64{1.1, 2.2}, []bool{true, true}) - sb.FieldBuilder(5).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("foo"), []byte("bar")}, []bool{true, true}) - sb.FieldBuilder(6).(*array.StringBuilder).AppendValues([]string{"foo", "bar"}, []bool{true, true}) - - return NewWrappedRecord(b) - }, - err: nil, - }, - } - - for _, c := range cases { - expectedRecord := c.expected(c.schema) - - actual, err := formatMapToArrowRecord(c.schema.ArrowSchema, c.input) - - if err != c.err { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } - - if !reflect.DeepEqual(actual, expectedRecord) { - t.Errorf("values: expected: %v, but actual: %v\n", expectedRecord, actual) - } - } -} diff --git a/record/avro.go b/record/avro.go index 156bee7..df5cee9 100644 --- a/record/avro.go +++ b/record/avro.go @@ -1,14 +1,51 @@ package record import ( - "bytes" "fmt" - - "github.com/reproio/columnify/schema" + 
"io" "github.com/linkedin/goavro/v2" ) +type avroInnerDecoder struct { + r *goavro.OCFReader +} + +func newAvroInnerDecoder(r io.Reader) (*avroInnerDecoder, error) { + reader, err := goavro.NewOCFReader(r) + if err != nil { + return nil, err + } + + return &avroInnerDecoder{ + r: reader, + }, nil +} + +func (d *avroInnerDecoder) Decode(r *map[string]interface{}) error { + if d.r.Scan() { + v, err := d.r.Read() + if err != nil { + return err + } + + m, mapOk := v.(map[string]interface{}) + if !mapOk { + return fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord) + } + + flatten := flattenAvroUnion(m) + *r = flatten + } else if d.r.RemainingBlockItems() == 0 { + if d.r.Err() != nil { + return d.r.Err() + } + return io.EOF + } + + return d.r.Err() +} + // flattenAvroUnion flattens nested map type has only 1 element. func flattenAvroUnion(in map[string]interface{}) map[string]interface{} { out := make(map[string]interface{}) @@ -32,35 +69,3 @@ func flattenAvroUnion(in map[string]interface{}) map[string]interface{} { return out } - -func FormatAvroToMap(data []byte) ([]map[string]interface{}, error) { - r, err := goavro.NewOCFReader(bytes.NewReader(data)) - if err != nil { - return nil, err - } - - maps := make([]map[string]interface{}, 0) - for r.Scan() { - v, err := r.Read() - if err != nil { - return nil, err - } - m, mapOk := v.(map[string]interface{}) - if !mapOk { - return nil, fmt.Errorf("invalid value %v: %w", v, ErrUnconvertibleRecord) - } - flatten := flattenAvroUnion(m) - maps = append(maps, flatten) - } - - return maps, nil -} - -func FormatAvroToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatAvroToMap(data) - if err != nil { - return nil, err - } - - return formatMapToArrowRecord(s.ArrowSchema, maps) -} diff --git a/record/avro_test.go b/record/avro_test.go index b68f9e2..b0f79aa 100644 --- a/record/avro_test.go +++ b/record/avro_test.go @@ -2,6 +2,7 @@ package record import ( "bytes" + "io" 
"reflect" "testing" @@ -35,11 +36,10 @@ func TestFlattenAvroUnion(t *testing.T) { } } -func TestFormatAvroToMap(t *testing.T) { +func TestAvroInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} - isErr bool }{ { input: func() []byte { @@ -113,22 +113,29 @@ func TestFormatAvroToMap(t *testing.T) { "string": "bar", }, }, - isErr: false, - }, - - // Not avro - { - input: []byte("not-valid-avro"), - expected: nil, - isErr: true, }, } for _, c := range cases { - actual, err := FormatAvroToMap(c.input) + buf := bytes.NewReader(c.input) + d, err := newAvroInnerDecoder(buf) + if err != nil { + t.Fatal(err) + } + + actual := make([]map[string]interface{}, 0) + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } - if err != nil != c.isErr { - t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + if err != nil && err != io.EOF { + t.Errorf("expected no error or io.EOF, but actual: %v\n", err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/csv.go b/record/csv.go index a12ed71..fe170de 100644 --- a/record/csv.go +++ b/record/csv.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "strconv" - "strings" "github.com/reproio/columnify/schema" ) @@ -17,82 +16,79 @@ const ( TsvDelimiter delimiter = '\t' ) -func getFieldNamesFromSchema(s *schema.IntermediateSchema) ([]string, error) { - elems := s.ArrowSchema.Fields() - - if len(elems) < 2 { - return nil, fmt.Errorf("no element is available: %w", ErrUnconvertibleRecord) - } - - names := make([]string, 0, len(elems)) - for _, e := range elems { - names = append(names, e.Name) - } - - return names, nil +type csvInnerDecoder struct { + r *csv.Reader + names []string } -func FormatCsvToMap(s *schema.IntermediateSchema, data []byte, delimiter delimiter) ([]map[string]interface{}, error) { +func newCsvInnerDecoder(r io.Reader, s *schema.IntermediateSchema, delimiter delimiter) (*csvInnerDecoder, 
error) { names, err := getFieldNamesFromSchema(s) if err != nil { return nil, err } - reader := csv.NewReader(strings.NewReader(string(data))) + reader := csv.NewReader(r) reader.Comma = rune(delimiter) - numFields := len(names) - arr := make([]map[string]interface{}, 0) - for { - values, err := reader.Read() - if err == io.EOF { - break - } - if err != nil { - return nil, err - } + return &csvInnerDecoder{ + r: reader, + names: names, + }, nil +} - if numFields != len(values) { - return nil, fmt.Errorf("incompleted value %v: %w", values, ErrUnconvertibleRecord) - } +func (d *csvInnerDecoder) Decode(r *map[string]interface{}) error { + numNames := len(d.names) + d.r.FieldsPerRecord = numNames - e := make(map[string]interface{}) - for i, v := range values { - // bool - if v != "0" && v != "1" { - if vv, err := strconv.ParseBool(v); err == nil { - e[names[i]] = vv - continue - } - } + values, err := d.r.Read() + if err != nil { + return err + } - // int - if vv, err := strconv.ParseInt(v, 10, 64); err == nil { - e[names[i]] = vv - continue - } + record := make(map[string]interface{}, numNames) + for i, v := range values { + n := d.names[i] - // float - if vv, err := strconv.ParseFloat(v, 64); err == nil { - e[names[i]] = vv + // bool + if v != "0" && v != "1" { + if vv, err := strconv.ParseBool(v); err == nil { + record[n] = vv continue } + } + + // int + if vv, err := strconv.ParseInt(v, 10, 64); err == nil { + record[n] = vv + continue + } - // others; to string - e[names[i]] = v + // float + if vv, err := strconv.ParseFloat(v, 64); err == nil { + record[n] = vv + continue } - arr = append(arr, e) + // others; to string + record[n] = v } - return arr, nil + *r = record + + return nil } -func FormatCsvToArrow(s *schema.IntermediateSchema, data []byte, delimiter delimiter) (*WrappedRecord, error) { - maps, err := FormatCsvToMap(s, data, delimiter) - if err != nil { - return nil, err +func getFieldNamesFromSchema(s *schema.IntermediateSchema) ([]string, error) { + 
elems := s.ArrowSchema.Fields() + + if len(elems) == 0 { + return nil, fmt.Errorf("no element is available: %w", ErrUnconvertibleRecord) } - return formatMapToArrowRecord(s.ArrowSchema, maps) + names := make([]string, 0, len(elems)) + for _, e := range elems { + names = append(names, e.Name) + } + + return names, nil } diff --git a/record/csv_test.go b/record/csv_test.go index a75406f..d5577e7 100644 --- a/record/csv_test.go +++ b/record/csv_test.go @@ -1,6 +1,8 @@ package record import ( + "bytes" + "io" "reflect" "testing" @@ -8,7 +10,7 @@ import ( "github.com/reproio/columnify/schema" ) -func TestFormatCsvToMap(t *testing.T) { +func TestCsvInnerDecoder_Decode(t *testing.T) { cases := []struct { schema *schema.IntermediateSchema input []byte @@ -151,36 +153,28 @@ true 2 2 2.2 2.2 bar bar`), }, isErr: false, }, - - // Not csv - { - schema: schema.NewIntermediateSchema( - arrow.NewSchema([]arrow.Field{}, nil), - "primitives", - ), - input: []byte("not-valid-csv"), - expected: nil, - isErr: true, - }, - - // Not tsv - { - schema: schema.NewIntermediateSchema( - arrow.NewSchema([]arrow.Field{}, nil), - "primitives", - ), - input: []byte("not-valid-tsv"), - delimiter: TsvDelimiter, - expected: nil, - isErr: true, - }, } for _, c := range cases { - actual, err := FormatCsvToMap(c.schema, c.input, c.delimiter) + buf := bytes.NewReader(c.input) + d, err := newCsvInnerDecoder(buf, c.schema, c.delimiter) + if err != nil { + t.Fatal(err) + } + + actual := make([]map[string]interface{}, 0) + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } - if err != nil != c.isErr { + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/jsonl.go b/record/jsonl.go index 48144cd..7ca8e32 100644 --- a/record/jsonl.go +++ b/record/jsonl.go @@ -1,38 +1,32 @@ package record import ( + "bufio" 
"encoding/json" - "strings" - - "github.com/reproio/columnify/schema" + "io" ) -func FormatJsonlToMap(data []byte) ([]map[string]interface{}, error) { - lines := strings.Split(string(data), "\n") - - records := make([]map[string]interface{}, 0) - for _, l := range lines { - if l == "" { - // skip blank line - continue - } - - var e map[string]interface{} - if err := json.Unmarshal([]byte(l), &e); err != nil { - return nil, err - } +type jsonlInnerDecoder struct { + s *bufio.Scanner +} - records = append(records, e) +func newJsonlInnerDecoder(r io.Reader) *jsonlInnerDecoder { + return &jsonlInnerDecoder{ + s: bufio.NewScanner(r), } - - return records, nil } -func FormatJsonlToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatJsonlToMap(data) - if err != nil { - return nil, err +func (d *jsonlInnerDecoder) Decode(r *map[string]interface{}) error { + if d.s.Scan() { + if err := json.Unmarshal(d.s.Bytes(), r); err != nil { + return err + } + } else { + if err := d.s.Err(); err != nil { + return err + } + return io.EOF } - return formatMapToArrowRecord(s.ArrowSchema, maps) + return d.s.Err() } diff --git a/record/jsonl_test.go b/record/jsonl_test.go index db07a9e..569b716 100644 --- a/record/jsonl_test.go +++ b/record/jsonl_test.go @@ -1,11 +1,13 @@ package record import ( + "bytes" + "io" "reflect" "testing" ) -func TestFormatJsonlToMap(t *testing.T) { +func TestJsonlInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -43,16 +45,29 @@ func TestFormatJsonlToMap(t *testing.T) { // Not JSONL { input: []byte("not-valid-json"), - expected: nil, + expected: []map[string]interface{}{}, isErr: true, }, } for _, c := range cases { - actual, err := FormatJsonlToMap(c.input) + buf := bytes.NewReader(c.input) + d := newJsonlInnerDecoder(buf) - if err != nil != c.isErr { + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = 
d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } + + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/ltsv.go b/record/ltsv.go index c76c1d9..c2f81f0 100644 --- a/record/ltsv.go +++ b/record/ltsv.go @@ -1,63 +1,64 @@ package record import ( + "bufio" + "io" "strconv" - "strings" - - "github.com/reproio/columnify/schema" "github.com/Songmu/go-ltsv" ) -func FormatLtsvToMap(data []byte) ([]map[string]interface{}, error) { - lines := strings.Split(string(data), "\n") +type ltsvInnerDecoder struct { + s *bufio.Scanner +} + +func newLtsvInnerDecoder(r io.Reader) *ltsvInnerDecoder { + return &ltsvInnerDecoder{ + s: bufio.NewScanner(r), + } +} - records := make([]map[string]interface{}, 0) - for _, l := range lines { - v := map[string]string{} +func (d *ltsvInnerDecoder) Decode(r *map[string]interface{}) error { + if d.s.Scan() { + data := d.s.Bytes() - err := ltsv.Unmarshal([]byte(l), &v) + m := map[string]string{} + err := ltsv.Unmarshal(data, &m) if err != nil { - return nil, err + return err } - m := make(map[string]interface{}) - for k, v := range v { + *r = make(map[string]interface{}) + for k, v := range m { // bool if v != "0" && v != "1" { if vv, err := strconv.ParseBool(v); err == nil { - m[k] = vv + (*r)[k] = vv continue } } // int if vv, err := strconv.ParseInt(v, 10, 64); err == nil { - m[k] = vv + (*r)[k] = vv continue } // float if vv, err := strconv.ParseFloat(v, 64); err == nil { - m[k] = vv + (*r)[k] = vv continue } // others; to string - m[k] = v + (*r)[k] = v } - - records = append(records, m) - } - - return records, nil -} - -func FormatLtsvToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatLtsvToMap(data) - if err != nil { - return nil, err + } else { + if err := d.s.Err(); err != nil { + return err + } + return io.EOF } - return 
formatMapToArrowRecord(s.ArrowSchema, maps) + return d.s.Err() } diff --git a/record/ltsv_test.go b/record/ltsv_test.go index 3b0ff18..47a071a 100644 --- a/record/ltsv_test.go +++ b/record/ltsv_test.go @@ -1,11 +1,13 @@ package record import ( + "bytes" + "io" "reflect" "testing" ) -func TestFormatLtsvToMap(t *testing.T) { +func TestLtsvInnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -41,16 +43,29 @@ boolean:true int:2 long:2 float:2.2 double:2.2 bytes:bar string:bar`), // Not LTSV { input: []byte("not-valid-ltsv"), - expected: nil, + expected: []map[string]interface{}{}, isErr: true, }, } for _, c := range cases { - actual, err := FormatLtsvToMap(c.input) + buf := bytes.NewReader(c.input) + d := newLtsvInnerDecoder(buf) - if err != nil != c.isErr { + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } + + if (err != nil && err != io.EOF) != c.isErr { t.Errorf("expected: %v, but actual: %v\n", c.isErr, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/msgpack.go b/record/msgpack.go index 7200ed2..3f1ff31 100644 --- a/record/msgpack.go +++ b/record/msgpack.go @@ -1,43 +1,33 @@ package record import ( - "bytes" "fmt" "io" - "github.com/reproio/columnify/schema" - "github.com/vmihailenco/msgpack/v4" ) -func FormatMsgpackToMap(data []byte) ([]map[string]interface{}, error) { - d := msgpack.NewDecoder(bytes.NewReader(data)) - - maps := make([]map[string]interface{}, 0) - for { - arr, err := d.DecodeInterface() - if err == io.EOF { - break - } else if err != nil { - return nil, err - } - - m, mapOk := arr.(map[string]interface{}) - if !mapOk { - return nil, fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord) - } +type msgpackInnerDecoder struct { + d *msgpack.Decoder +} - maps = append(maps, m) +func newMsgpackInnerDecoder(r io.Reader) 
*msgpackInnerDecoder { + return &msgpackInnerDecoder{ + d: msgpack.NewDecoder(r), } - - return maps, nil } -func FormatMsgpackToArrow(s *schema.IntermediateSchema, data []byte) (*WrappedRecord, error) { - maps, err := FormatMsgpackToMap(data) +func (d *msgpackInnerDecoder) Decode(r *map[string]interface{}) error { + arr, err := d.d.DecodeInterface() if err != nil { - return nil, err + return err + } + + m, mapOk := arr.(map[string]interface{}) + if !mapOk { + return fmt.Errorf("invalid input %v: %w", arr, ErrUnconvertibleRecord) } + *r = m - return formatMapToArrowRecord(s.ArrowSchema, maps) + return nil } diff --git a/record/msgpack_test.go b/record/msgpack_test.go index 7206c35..dc360af 100644 --- a/record/msgpack_test.go +++ b/record/msgpack_test.go @@ -3,11 +3,12 @@ package record import ( "bytes" "errors" + "io" "reflect" "testing" ) -func TestFormatMsgpackToMap(t *testing.T) { +func TestMsgpackInnnerDecoder_Decode(t *testing.T) { cases := []struct { input []byte expected []map[string]interface{} @@ -48,22 +49,35 @@ func TestFormatMsgpackToMap(t *testing.T) { "string": "bar", }, }, - err: nil, + err: io.EOF, }, // Not map type { input: []byte("\xa7compact"), - expected: nil, + expected: []map[string]interface{}{}, err: ErrUnconvertibleRecord, }, } for _, c := range cases { - actual, err := FormatMsgpackToMap(c.input) + buf := bytes.NewReader(c.input) + d := newMsgpackInnerDecoder(buf) + + actual := make([]map[string]interface{}, 0) + var err error + for { + var v map[string]interface{} + err = d.Decode(&v) + if err != nil { + break + } + actual = append(actual, v) + } if !errors.Is(err, c.err) { t.Errorf("expected: %v, but actual: %v\n", c.err, err) + continue } if !reflect.DeepEqual(actual, c.expected) { diff --git a/record/record.go b/record/record.go index 9941380..c917870 100644 --- a/record/record.go +++ b/record/record.go @@ -1,8 +1,10 @@ package record import ( + "encoding/json" "errors" "fmt" + "io" "github.com/reproio/columnify/schema" ) @@ -21,53 
+23,63 @@ var ( ErrUnconvertibleRecord = errors.New("input record is unable to convert") ) -func FormatToArrow(data []byte, s *schema.IntermediateSchema, recordType string) (*WrappedRecord, error) { +// innerDecoder decodes data from given Reader to the intermediate representation. +type innerDecoder interface { + // Decode reads input data via Reader and extracts it to the argument. + // If there is no data left to be read, Decode returns io.EOF. + Decode(r *map[string]interface{}) error +} + +// jsonStringConverter converts data with innerDecoder and returns JSON string value. +type jsonStringConverter struct { + inner innerDecoder +} + +func NewJsonStringConverter(r io.Reader, s *schema.IntermediateSchema, recordType string) (*jsonStringConverter, error) { + var inner innerDecoder + var err error + switch recordType { case RecordTypeAvro: - return FormatAvroToArrow(s, data) + inner, err = newAvroInnerDecoder(r) case RecordTypeCsv: - return FormatCsvToArrow(s, data, CsvDelimiter) + inner, err = newCsvInnerDecoder(r, s, CsvDelimiter) case RecordTypeJsonl: - return FormatJsonlToArrow(s, data) + inner = newJsonlInnerDecoder(r) case RecordTypeLtsv: - return FormatLtsvToArrow(s, data) + inner = newLtsvInnerDecoder(r) case RecordTypeMsgpack: - return FormatMsgpackToArrow(s, data) + inner = newMsgpackInnerDecoder(r) case RecordTypeTsv: - return FormatCsvToArrow(s, data, TsvDelimiter) + inner, err = newCsvInnerDecoder(r, s, TsvDelimiter) default: return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) } -} - -// FormatToMap converts input data to map based data with given schema. 
-func FormatToMap(data []byte, s *schema.IntermediateSchema, recordType string) ([]map[string]interface{}, error) { - switch recordType { - case RecordTypeAvro: - return FormatAvroToMap(data) - - case RecordTypeCsv: - return FormatCsvToMap(s, data, CsvDelimiter) - case RecordTypeJsonl: - return FormatJsonlToMap(data) - - case RecordTypeLtsv: - return FormatLtsvToMap(data) + return &jsonStringConverter{ + inner: inner, + }, err +} - case RecordTypeMsgpack: - return FormatMsgpackToMap(data) +func (d *jsonStringConverter) Convert(v *string) error { + var vv map[string]interface{} - case RecordTypeTsv: - return FormatCsvToMap(s, data, TsvDelimiter) + err := d.inner.Decode(&vv) + if err != nil { + return err + } - default: - return nil, fmt.Errorf("unsupported record type %s: %w", recordType, ErrUnsupportedRecord) + data, err := json.Marshal(vv) + if err != nil { + return err } + *v = string(data) + + return nil } diff --git a/record/record_test.go b/record/record_test.go index d324d82..2dde443 100644 --- a/record/record_test.go +++ b/record/record_test.go @@ -1,60 +1,44 @@ package record import ( - "errors" + "encoding/json" "testing" - - "github.com/reproio/columnify/schema" ) -func TestFormatToArrow(t *testing.T) { - cases := []struct { - input []byte - schema *schema.IntermediateSchema - recordType string - err error - }{ - // TODO valid cases +type nopInnerDecoder struct { + r map[string]interface{} + err error +} - { - input: nil, - schema: nil, - recordType: "Unknown", - err: ErrUnsupportedRecord, +func (d *nopInnerDecoder) Decode(r *map[string]interface{}) error { + *r = d.r + return d.err +} + +func TestJsonStringConverter_Convert(t *testing.T) { + inner := &nopInnerDecoder{ + r: map[string]interface{}{ + "key1": 42, + "key2": "test", }, + err: nil, } - for _, c := range cases { - _, err := FormatToArrow(c.input, c.schema, c.recordType) - - if !errors.Is(err, c.err) { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } + d := jsonStringConverter{ + 
inner: inner, } -} - -func TestFormatToMap(t *testing.T) { - cases := []struct { - input []byte - schema *schema.IntermediateSchema - recordType string - err error - }{ - // TODO valid cases - { - input: nil, - schema: nil, - recordType: "Unknown", - err: ErrUnsupportedRecord, - }, + var v string + err := d.Convert(&v) + if err != nil { + t.Fatalf("expected no error, but actual: %v\n", err) } - for _, c := range cases { - _, err := FormatToMap(c.input, c.schema, c.recordType) - - if !errors.Is(err, c.err) { - t.Errorf("expected: %v, but actual: %v\n", c.err, err) - } + data, err := json.Marshal(inner.r) + if err != nil { + t.Fatalf("expected no error, but actual: %v\n", err) + } + if v != string(data) { + t.Fatalf("expected: %v, but actual: %v\n", string(data), v) } }