From f2b93b4cfbe22b06f6b901799a32d8c5a7116e38 Mon Sep 17 00:00:00 2001
From: Ryo Okubo
Date: Fri, 10 Jul 2020 13:14:55 +0900
Subject: [PATCH] wip

---
 arrow/json/writer.go      | 113 +++++++++++++++++++++
 arrow/json/writer_test.go | 211 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 324 insertions(+)
 create mode 100644 arrow/json/writer.go
 create mode 100644 arrow/json/writer_test.go

diff --git a/arrow/json/writer.go b/arrow/json/writer.go
new file mode 100644
index 0000000..381d0b4
--- /dev/null
+++ b/arrow/json/writer.go
@@ -0,0 +1,113 @@
+package json
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+)
+
+var (
+	// ErrMismatchFields is returned by Write when the schema of the record
+	// differs from the schema the Encoder was created with.
+	//
+	// NOTE(review): the message says "number of records", but the condition
+	// that produces it is a schema comparison.
+	ErrMismatchFields = errors.New("arrow/json: number of records mismatch")
+)
+
+// Encoder wraps encoding/json.Encoder and writes array.Records that match a
+// fixed schema.
+type Encoder struct {
+	e      *json.Encoder
+	schema *arrow.Schema
+}
+
+// NewWriter returns an Encoder that writes array.Records with the given
+// schema to w as JSON.
+//
+// The schema is not validated here: a column whose type is not one of the
+// supported ones (boolean, signed and unsigned integers, floats and strings)
+// makes Write return an error.
+func NewWriter(w io.Writer, schema *arrow.Schema) *Encoder {
+	return &Encoder{
+		e:      json.NewEncoder(w),
+		schema: schema,
+	}
+}
+
+// Schema returns the schema the Encoder was created with.
+func (e *Encoder) Schema() *arrow.Schema { return e.schema }
+
+// Write encodes record as one JSON array holding one object per row, keyed by
+// field name. Null values are written as JSON null.
+//
+// It returns ErrMismatchFields if the schema of record differs from the schema
+// of the Encoder, and an error naming the offending field if a column has an
+// unsupported type. In both cases nothing is written.
+func (e *Encoder) Write(record array.Record) error {
+	if !record.Schema().Equal(e.schema) {
+		return ErrMismatchFields
+	}
+
+	recs := make([]map[string]interface{}, record.NumRows())
+	for i := range recs {
+		recs[i] = make(map[string]interface{}, record.NumCols())
+	}
+
+	for j, col := range record.Columns() {
+		field := e.schema.Field(j)
+		value, ok := columnValues(col)
+		if !ok {
+			// Dropping the column silently would produce incomplete rows.
+			return fmt.Errorf("arrow/json: unsupported type %s for field %q", field.Type.Name(), field.Name)
+		}
+		for i := 0; i < col.Len(); i++ {
+			if col.IsValid(i) {
+				recs[i][field.Name] = value(i)
+			} else {
+				recs[i][field.Name] = nil
+			}
+		}
+	}
+
+	return e.e.Encode(recs)
+}
+
+// columnValues returns an accessor for the elements of col as values that
+// encoding/json can marshal. The boolean result is false if the concrete array
+// type of col is not supported.
+func columnValues(col array.Interface) (func(i int) interface{}, bool) {
+	switch arr := col.(type) {
+	case *array.Boolean:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Int8:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Int16:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Int32:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Int64:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Uint8:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Uint16:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Uint32:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Uint64:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Float32:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.Float64:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	case *array.String:
+		return func(i int) interface{} { return arr.Value(i) }, true
+	}
+
+	// TODO more types
+	return nil, false
+}
diff --git a/arrow/json/writer_test.go b/arrow/json/writer_test.go
new file mode 100644
index 0000000..d808b38
--- /dev/null
+++ b/arrow/json/writer_test.go
@@ -0,0 +1,211 @@
+package json
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"strings"
+	"testing"
+
+	"github.com/apache/arrow/go/arrow"
+	"github.com/apache/arrow/go/arrow/array"
+	"github.com/apache/arrow/go/arrow/memory"
+)
+
+func Example_writer() {
+	f := new(bytes.Buffer)
+
+	pool := memory.NewGoAllocator()
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.Int64Builder).AppendValues([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
+	b.Field(1).(*array.Float64Builder).AppendValues([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
+	b.Field(2).(*array.StringBuilder).AppendValues([]string{"str-0", "str-1", "str-2", "str-3", "str-4", "str-5", "str-6", "str-7", "str-8", "str-9"}, nil)
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(f, schema)
+	if err := w.Write(rec); err != nil {
+		log.Fatal(err)
+	}
+	fmt.Print(f.String())
+
+	// Output:
+	// [{"f64":0,"i64":0,"str":"str-0"},{"f64":1,"i64":1,"str":"str-1"},{"f64":2,"i64":2,"str":"str-2"},{"f64":3,"i64":3,"str":"str-3"},{"f64":4,"i64":4,"str":"str-4"},{"f64":5,"i64":5,"str":"str-5"},{"f64":6,"i64":6,"str":"str-6"},{"f64":7,"i64":7,"str":"str-7"},{"f64":8,"i64":8,"str":"str-8"},{"f64":9,"i64":9,"str":"str-9"}]
+}
+
+// TestJSONWriter runs the writer checks as named subtests.
+func TestJSONWriter(t *testing.T) {
+	tests := []struct {
+		name string
+	}{{
+		name: "Primitives",
+	}}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			testJSONWriter(t)
+		})
+	}
+}
+
+// testJSONWriter encodes every supported type, plus a row of nulls, and checks the output.
+func testJSONWriter(t *testing.T) {
+	f := new(bytes.Buffer)
+
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.BooleanBuilder).AppendValues([]bool{true, false, true}, nil)
+	b.Field(1).(*array.Int8Builder).AppendValues([]int8{-1, 0, 1}, nil)
+	b.Field(2).(*array.Int16Builder).AppendValues([]int16{-1, 0, 1}, nil)
+	b.Field(3).(*array.Int32Builder).AppendValues([]int32{-1, 0, 1}, nil)
+	b.Field(4).(*array.Int64Builder).AppendValues([]int64{-1, 0, 1}, nil)
+	b.Field(5).(*array.Uint8Builder).AppendValues([]uint8{0, 1, 2}, nil)
+	b.Field(6).(*array.Uint16Builder).AppendValues([]uint16{0, 1, 2}, nil)
+	b.Field(7).(*array.Uint32Builder).AppendValues([]uint32{0, 1, 2}, nil)
+	b.Field(8).(*array.Uint64Builder).AppendValues([]uint64{0, 1, 2}, nil)
+	b.Field(9).(*array.Float32Builder).AppendValues([]float32{0.0, 0.1, 0.2}, nil)
+	b.Field(10).(*array.Float64Builder).AppendValues([]float64{0.0, 0.1, 0.2}, nil)
+	b.Field(11).(*array.StringBuilder).AppendValues([]string{"str-0", "str-1", "str-2"}, nil)
+
+	for _, field := range b.Fields() {
+		field.AppendNull()
+	}
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(f, schema)
+	err := w.Write(rec)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	want := strings.ReplaceAll(`[
+{"bool":true,"f32":0,"f64":0,"i16":-1,"i32":-1,"i64":-1,"i8":-1,"str":"str-0","u16":0,"u32":0,"u64":0,"u8":0},
+{"bool":false,"f32":0.1,"f64":0.1,"i16":0,"i32":0,"i64":0,"i8":0,"str":"str-1","u16":1,"u32":1,"u64":1,"u8":1},
+{"bool":true,"f32":0.2,"f64":0.2,"i16":1,"i32":1,"i64":1,"i8":1,"str":"str-2","u16":2,"u32":2,"u64":2,"u8":2},
+{"bool":null,"f32":null,"f64":null,"i16":null,"i32":null,"i64":null,"i8":null,"str":null,"u16":null,"u32":null,"u64":null,"u8":null}]
+`, "\n", "") + "\n"
+
+	if got := f.String(); got != want {
+		t.Fatalf("invalid output:\ngot=%s\nwant=%s\n", got, want)
+	}
+}
+
+// TestWriteUnsupportedType checks that an unsupported column is an error, not a dropped field.
+func TestWriteUnsupportedType(t *testing.T) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(t, 0)
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bin", Type: arrow.BinaryTypes.Binary},
+		},
+		nil,
+	)
+
+	b := array.NewRecordBuilder(pool, schema)
+	defer b.Release()
+
+	b.Field(0).(*array.BinaryBuilder).Append([]byte("bin-0"))
+
+	rec := b.NewRecord()
+	defer rec.Release()
+
+	f := new(bytes.Buffer)
+	if err := NewWriter(f, schema).Write(rec); err == nil {
+		t.Fatal("expected an error for an unsupported column type")
+	}
+	if f.Len() != 0 {
+		t.Fatalf("unexpected output: %q", f.String())
+	}
+}
+
+// BenchmarkWrite measures Write on a 1000-row record covering every supported type.
+func BenchmarkWrite(b *testing.B) {
+	pool := memory.NewCheckedAllocator(memory.NewGoAllocator())
+	defer pool.AssertSize(b, 0)
+
+	schema := arrow.NewSchema(
+		[]arrow.Field{
+			{Name: "bool", Type: arrow.FixedWidthTypes.Boolean},
+			{Name: "i8", Type: arrow.PrimitiveTypes.Int8},
+			{Name: "i16", Type: arrow.PrimitiveTypes.Int16},
+			{Name: "i32", Type: arrow.PrimitiveTypes.Int32},
+			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
+			{Name: "u8", Type: arrow.PrimitiveTypes.Uint8},
+			{Name: "u16", Type: arrow.PrimitiveTypes.Uint16},
+			{Name: "u32", Type: arrow.PrimitiveTypes.Uint32},
+			{Name: "u64", Type: arrow.PrimitiveTypes.Uint64},
+			{Name: "f32", Type: arrow.PrimitiveTypes.Float32},
+			{Name: "f64", Type: arrow.PrimitiveTypes.Float64},
+			{Name: "str", Type: arrow.BinaryTypes.String},
+		},
+		nil,
+	)
+
+	bldr := array.NewRecordBuilder(pool, schema)
+	defer bldr.Release()
+
+	const N = 1000
+	for i := 0; i < N; i++ {
+		bldr.Field(0).(*array.BooleanBuilder).Append(i%10 == 0)
+		bldr.Field(1).(*array.Int8Builder).Append(int8(i))
+		bldr.Field(2).(*array.Int16Builder).Append(int16(i))
+		bldr.Field(3).(*array.Int32Builder).Append(int32(i))
+		bldr.Field(4).(*array.Int64Builder).Append(int64(i))
+		bldr.Field(5).(*array.Uint8Builder).Append(uint8(i))
+		bldr.Field(6).(*array.Uint16Builder).Append(uint16(i))
+		bldr.Field(7).(*array.Uint32Builder).Append(uint32(i))
+		bldr.Field(8).(*array.Uint64Builder).Append(uint64(i))
+		bldr.Field(9).(*array.Float32Builder).Append(float32(i))
+		bldr.Field(10).(*array.Float64Builder).Append(float64(i))
+		bldr.Field(11).(*array.StringBuilder).Append(fmt.Sprintf("str-%d", i))
+	}
+
+	rec := bldr.NewRecord()
+	defer rec.Release()
+
+	w := NewWriter(ioutil.Discard, schema)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		err := w.Write(rec)
+		if err != nil {
+			b.Fatal(err)
+		}
+	}
+}