diff --git a/arrow/doc.go b/arrow/doc.go
new file mode 100644
index 0000000..c4aee8b
--- /dev/null
+++ b/arrow/doc.go
@@ -0,0 +1,10 @@
+/*
+Package arrow extends the official Go implementation of Apache Arrow.
+https://github.com/apache/arrow/tree/master/go/arrow
+
+The official Go package still lacks some features that we require, so this
+package provides them. Its layout mirrors that of the official package.
+
+See also https://github.com/apache/arrow/blob/master/docs/source/status.rst
+*/
+package arrow
diff --git a/arrow/json/writer.go b/arrow/json/writer.go
new file mode 100644
index 0000000..fa0a867
--- /dev/null
+++ b/arrow/json/writer.go
@@ -0,0 +1,216 @@
+package json
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+)
+
+var (
+	// ErrMismatchFields is returned by Encoder.Write when the schema of the
+	// record differs from the schema the Encoder was created with.
+	ErrMismatchFields = errors.New("arrow/json: number of records mismatch")
+
+	// ErrUnsupportedType is returned when a column, or a value nested inside
+	// one, has an Arrow data type that this package cannot convert to JSON.
+	ErrUnsupportedType = errors.New("arrow/json: unsupported type")
+)
+
+// Encoder wraps an encoding/json.Encoder and writes array.Records that match
+// a fixed schema.
+type Encoder struct {
+	e      *json.Encoder
+	schema *arrow.Schema
+}
+
+// NewWriter returns an Encoder that writes array.Records with the given
+// schema to w as JSON.
+//
+// The schema is not validated here; Write returns ErrUnsupportedType when it
+// meets a column whose type it cannot convert.
+func NewWriter(w io.Writer, schema *arrow.Schema) *Encoder {
+	return &Encoder{
+		e:      json.NewEncoder(w),
+		schema: schema,
+	}
+}
+
+// Schema returns the schema the Encoder was created with.
+func (e *Encoder) Schema() *arrow.Schema { return e.schema }
+
+// Write encodes record as one JSON array holding an object per row, keyed by
+// column name. Null values are written as JSON null.
+//
+// It returns ErrMismatchFields if the schema of record is not equal to the
+// schema of the Encoder, and ErrUnsupportedType (possibly wrapped) if a column
+// cannot be converted.
+func (e *Encoder) Write(record array.Record) error {
+	if !record.Schema().Equal(e.schema) {
+		return ErrMismatchFields
+	}
+
+	rows := make([]map[string]interface{}, record.NumRows())
+	for i := range rows {
+		rows[i] = make(map[string]interface{}, record.NumCols())
+	}
+
+	for c, col := range record.Columns() {
+		vals, err := columnValues(col.Data())
+		if err != nil {
+			return err
+		}
+		name := e.schema.Field(c).Name
+		for r, v := range vals {
+			rows[r][name] = v
+		}
+	}
+
+	return e.e.Encode(rows)
+}
+
+// nullable is the subset of the Arrow array API needed to walk an array and
+// tell null slots from valid ones.
+type nullable interface {
+	Len() int
+	IsValid(i int) bool
+}
+
+// collect returns one entry per slot of arr: value(i) for valid slots and nil,
+// which encodes as JSON null, for null slots.
+func collect(arr nullable, value func(i int) interface{}) []interface{} {
+	out := make([]interface{}, arr.Len())
+	for i := range out {
+		if arr.IsValid(i) {
+			out[i] = value(i)
+		}
+	}
+	return out
+}
+
+// columnValues converts data into a slice holding one JSON-encodable value per
+// row. Null rows become nil, structs become map[string]interface{} keyed by
+// field name, and lists become []interface{}; both are converted recursively.
+//
+// It returns ErrUnsupportedType, possibly wrapped, if data or one of its
+// children has a type that is not handled below.
+func columnValues(data *array.Data) ([]interface{}, error) {
+	switch data.DataType().ID() {
+	case arrow.BOOL:
+		arr := array.NewBooleanData(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.INT8:
+		arr := array.NewInt8Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.INT16:
+		arr := array.NewInt16Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.INT32:
+		arr := array.NewInt32Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.INT64:
+		arr := array.NewInt64Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.UINT8:
+		arr := array.NewUint8Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.UINT16:
+		arr := array.NewUint16Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.UINT32:
+		arr := array.NewUint32Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.UINT64:
+		arr := array.NewUint64Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.FLOAT32:
+		arr := array.NewFloat32Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.FLOAT64:
+		arr := array.NewFloat64Data(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.STRING:
+		arr := array.NewStringData(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.BINARY:
+		arr := array.NewBinaryData(data)
+		defer arr.Release()
+		return collect(arr, func(i int) interface{} { return arr.Value(i) }), nil
+
+	case arrow.STRUCT:
+		arr := array.NewStructData(data)
+		defer arr.Release()
+		st, ok := arr.DataType().(*arrow.StructType)
+		if !ok {
+			return nil, fmt.Errorf("unsupported data type %v: %w", arr.DataType(), ErrUnsupportedType)
+		}
+
+		// Convert every field once for all rows, then assemble one object per
+		// row. NOTE(review): this assumes arr.Field(f) is aligned with the
+		// rows of arr (no extra offset) - confirm for sliced struct arrays.
+		fields := make([][]interface{}, arr.NumField())
+		for f := range fields {
+			vals, err := columnValues(arr.Field(f).Data())
+			if err != nil {
+				return nil, err
+			}
+			if len(vals) < arr.Len() {
+				return nil, fmt.Errorf("arrow/json: struct field %q has %d values, want %d", st.Field(f).Name, len(vals), arr.Len())
+			}
+			fields[f] = vals
+		}
+		return collect(arr, func(i int) interface{} {
+			obj := make(map[string]interface{}, len(fields))
+			for f, vals := range fields {
+				obj[st.Field(f).Name] = vals[i]
+			}
+			return obj
+		}), nil
+
+	case arrow.LIST:
+		arr := array.NewListData(data)
+		defer arr.Release()
+
+		// Convert the flattened child values once; each row is then the
+		// window of them delimited by two consecutive offsets.
+		elems, err := columnValues(arr.ListValues().Data())
+		if err != nil {
+			return nil, err
+		}
+		offsets := arr.Offsets()
+		base := arr.Offset()
+		return collect(arr, func(i int) interface{} {
+			return elems[offsets[base+i]:offsets[base+i+1]]
+		}), nil
+
+	default:
+		return nil, ErrUnsupportedType
+	}
+}
diff --git a/arrow/json/writer_test.go b/arrow/json/writer_test.go
new file mode 100644
index 0000000..e3d33f2
--- /dev/null
+++ b/arrow/json/writer_test.go
@@ -0,0 +1,181 @@
+package json
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"strings"
+	"testing"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/memory"
+)
+
+// Example_writer writes a ten-row record of primitive columns with NewWriter.
+func Example_writer() {
+	f := new(bytes.Buffer)
+
+	pool := memory.NewGoAllocator()
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.Int64Builder).AppendValues([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
+	b.Field(1).(*array.Float64Builder).AppendValues([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
+	b.Field(2).(*array.StringBuilder).AppendValues([]string{"str-0", "str-1", "str-2", "str-3", "str-4", "str-5", "str-6", "str-7", "str-8", "str-9"}, nil)
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(f, schema)
+	err := w.Write(rec)
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+
+// TestJsonWriter runs the writer test cases as subtests.
+func TestJsonWriter(t *testing.T) {
+	tests := []struct {
+		name string
+	}{{
+		name: "Primitives",
+	}}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			testJsonWriter(t)
+		})
+	}
+}
+
+// testJsonWriter encodes three rows of primitive values plus one all-null row
+// and compares the output with the expected JSON.
+func testJsonWriter(t *testing.T) {
+	f := new(bytes.Buffer)
+
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{true, false, true}, nil)
+	b.Field(1).(*array.Int8Builder).AppendValues([]int8{-1, 0, 1}, nil)
+	b.Field(2).(*array.Int16Builder).AppendValues([]int16{-1, 0, 1}, nil)
+	b.Field(3).(*array.Int32Builder).AppendValues([]int32{-1, 0, 1}, nil)
+	b.Field(4).(*array.Int64Builder).AppendValues([]int64{-1, 0, 1}, nil)
+	b.Field(5).(*array.Uint8Builder).AppendValues([]uint8{0, 1, 2}, nil)
+	b.Field(6).(*array.Uint16Builder).AppendValues([]uint16{0, 1, 2}, nil)
+	b.Field(7).(*array.Uint32Builder).AppendValues([]uint32{0, 1, 2}, nil)
+	b.Field(8).(*array.Uint64Builder).AppendValues([]uint64{0, 1, 2}, nil)
+	b.Field(9).(*array.Float32Builder).AppendValues([]float32{0.0, 0.1, 0.2}, nil)
+	b.Field(10).(*array.Float64Builder).AppendValues([]float64{0.0, 0.1, 0.2}, nil)
+	b.Field(11).(*array.StringBuilder).AppendValues([]string{"str-0", "str-1", "str-2"}, nil)
+
+	for _, field := range b.Fields() {
+		field.AppendNull()
+	}
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(f, schema)
+	err := w.Write(rec)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	want := strings.ReplaceAll(`[
+{"bool":true,"f32":0,"f64":0,"i16":-1,"i32":-1,"i64":-1,"i8":-1,"str":"str-0","u16":0,"u32":0,"u64":0,"u8":0},
+{"bool":false,"f32":0.1,"f64":0.1,"i16":0,"i32":0,"i64":0,"i8":0,"str":"str-1","u16":1,"u32":1,"u64":1,"u8":1},
+{"bool":true,"f32":0.2,"f64":0.2,"i16":1,"i32":1,"i64":1,"i8":1,"str":"str-2","u16":2,"u32":2,"u64":2,"u8":2},
+{"bool":null,"f32":null,"f64":null,"i16":null,"i32":null,"i64":null,"i8":null,"str":null,"u16":null,"u32":null,"u64":null,"u8":null}]
+`, "\n", "") + "\n"
+
+	if got, want := f.String(), want; strings.Compare(got, want) != 0 {
+		t.Fatalf("invalid output:\ngot=%s\nwant=%s\n", got, want)
+	}
+}
+
+// BenchmarkWrite measures Write on a 1000-row record, discarding the output.
+func BenchmarkWrite(b *testing.B) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(b, 0)
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	bldr := array.NewRecordBuilder(pool, schema)
+	defer bldr.Release()
+
+	const N = 1000
+	for i := 0; i < N; i++ {
+		bldr.Field(0).(*array.BooleanBuilder).Append(i%10 == 0)
+		bldr.Field(1).(*array.Int8Builder).Append(int8(i))
+		bldr.Field(2).(*array.Int16Builder).Append(int16(i))
+		bldr.Field(3).(*array.Int32Builder).Append(int32(i))
+		bldr.Field(4).(*array.Int64Builder).Append(int64(i))
+		bldr.Field(5).(*array.Uint8Builder).Append(uint8(i))
+		bldr.Field(6).(*array.Uint16Builder).Append(uint16(i))
+		bldr.Field(7).(*array.Uint32Builder).Append(uint32(i))
+		bldr.Field(8).(*array.Uint64Builder).Append(uint64(i))
+		bldr.Field(9).(*array.Float32Builder).Append(float32(i))
+		bldr.Field(10).(*array.Float64Builder).Append(float64(i))
+		bldr.Field(11).(*array.StringBuilder).Append(fmt.Sprintf("str-%d", i))
+	}
+
+	rec := bldr.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(ioutil.Discard, schema)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := w.Write(rec)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}