Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#45 Implement arrow -> json writer #47

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ jobs:
- name: Integration test
run: make it

- name: Benchmark
run: |
make bench
go tool pprof -top cpu.out
go tool pprof -top mem.out

- uses: codecov/codecov-action@v1
with:
file: ./cover.out
Expand Down
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ it: build
./columnify -schemaType bigquery -schemaFile columnifier/testdata/schema/array.bq.json -recordType jsonl columnifier/testdata/record/array.jsonl > /dev/null
./columnify -schemaType bigquery -schemaFile columnifier/testdata/schema/array.bq.json -recordType msgpack columnifier/testdata/record/array.msgpack > /dev/null

# NOTE: A large -benchtime value (or the default) makes the benchmark run too long
.PHONY: bench
bench:
go test -bench . -v ./columnifier -benchtime 100000x -benchmem -cpuprofile cpu.out -memprofile mem.out

# Set GITHUB_TOKEN and create release git tag
.PHONY: release
release:
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,16 @@ $ parquet-tools cat -json out.parquet

Currently it has some limitations from schema/record types.

- Some logical types like Decimal are unsupported.
- Some logical types like `Decimal` are unsupported.
- If using `-recordType = avro`, it doesn't support a nested record that has only 1 sub field.
- If using `-recordType = avro`, it converts bytes fields to base64 encoded value implicitly.
- The supported values have limitations depending on the record type, e.g. if you use `jsonl`, it might not be able to handle a large value.

## Development

`Columnifier` reads input file(s), converts format based on given parameter, finally writes output files.
Format conversion is separated by schema / record. The `schema` conversion accepts input schema, then converts it to targer's via Arrow's schema. The `record` conversion is similar to schema's but intermediate is simply `map[string]interface{}`, because Arrow record isn't available as an intermediate.
`columnify` basically depends on existing modules but it contains additional modules like `avro`, `parquet` to fill insufficient features.
Format conversion is separated by schema / record. The `schema` conversion accepts an input schema and converts it to the target format's schema via Arrow's schema. The `record` conversion uses Arrow's Record as the intermediate data representation.
`columnify` basically depends on existing modules but it contains additional modules like `arrow`, `avro`, `parquet` to fill insufficient features.

## Release

Expand Down
11 changes: 11 additions & 0 deletions arrow/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
Package arrow is an extension for Go Arrow implementation.
https://github.com/apache/arrow/tree/master/go/arrow

Go Arrow package still has some missing parts which we required, so
we fill it in this package our own. The package structure considers to
Arrow official's.
see also https://github.com/apache/arrow/blob/master/docs/source/status.rst

*/
package arrow
317 changes: 317 additions & 0 deletions arrow/json/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
package json

import (
"encoding/json"
"errors"
"fmt"
"io"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
)

var (
ErrMismatchFields = errors.New("arrow/json: number of records mismatch")
ErrUnsupportedType = errors.New("arrow/json: unsupported type")
)

// JsonEncoder wraps encoding/json.Encoder and writes array.Record based on a schema.
type Encoder struct {
e *json.Encoder
schema *arrow.Schema
}

// NewWriter returns a writer that writes array.Records to the CSV file
// with the given schema.
//
// NewWriter panics if the given schema contains fields that have types that are not
// primitive types.
func NewWriter(w io.Writer, schema *arrow.Schema) *Encoder {
ww := &Encoder{
e: json.NewEncoder(w),
schema: schema,
}

return ww
}

func (e *Encoder) Schema() *arrow.Schema { return e.schema }

// Write writes a single Record as one row to the JSON file
func (e *Encoder) Write(record array.Record) error {
if !record.Schema().Equal(e.schema) {
return ErrMismatchFields
}

recs := make([]map[string]interface{}, record.NumRows())
for i := range recs {
recs[i] = make(map[string]interface{}, record.NumCols())
}

for i, col := range record.Columns() {
values, err := convertToGo(col.Data())
if err != nil {
return err
}
for j, v := range values {
recs[j][e.schema.Field(i).Name] = v
}
}

return e.e.Encode(recs)
}

// convertToGo converts Arrow values to Go typed values.
func convertToGo(data *array.Data) ([]interface{}, error) {
recs := make([]interface{}, 0, data.Len())

switch data.DataType().ID() {
case arrow.BOOL:
arr := array.NewBooleanData(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.INT8:
arr := array.NewInt8Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.INT16:
arr := array.NewInt16Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.INT32:
arr := array.NewInt32Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.INT64:
arr := array.NewInt64Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.UINT8:
arr := array.NewUint8Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.UINT16:
arr := array.NewUint16Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.UINT32:
arr := array.NewUint32Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.UINT64:
arr := array.NewUint64Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.FLOAT32:
arr := array.NewFloat32Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.FLOAT64:
arr := array.NewFloat64Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.STRING:
arr := array.NewStringData(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.BINARY:
arr := array.NewBinaryData(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.DATE32:
arr := array.NewDate32Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.DATE64:
arr := array.NewDate64Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.TIME32:
arr := array.NewTime32Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.TIME64:
arr := array.NewTime64Data(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.TIMESTAMP:
arr := array.NewTimestampData(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, arr.Value(i))
} else {
recs = append(recs, nil)
}
}

case arrow.STRUCT:
arr := array.NewStructData(data)
defer arr.Release()
st, stOk := arr.DataType().(*arrow.StructType)
if !stOk {
return nil, fmt.Errorf("unsupported data type %v: %w", arr.DataType(), ErrUnsupportedType)
}
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
recs = append(recs, make(map[string]interface{}, arr.NumField()))
} else {
recs = append(recs, nil)
}
}
for i := 0; i < arr.NumField(); i++ {
values, err := convertToGo(arr.Field(i).Data())
if err != nil {
return nil, err
}
for j, v := range values {
if arr.IsValid(j) {
if r, ok := recs[j].(map[string]interface{}); ok {
r[st.Field(i).Name] = v
}
}
}
}

case arrow.LIST:
arr := array.NewListData(data)
defer arr.Release()
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
o := i + arr.Offset()
bgn := int64(arr.Offsets()[o])
end := int64(arr.Offsets()[o+1])
slice := array.NewSlice(arr.ListValues(), bgn, end)
defer slice.Release()
values, err := convertToGo(slice.Data())
if err != nil {
return nil, err
}
recs = append(recs, values)
} else {
recs = append(recs, nil)
}
}
}

return recs, nil
}
Loading