From b549cbf0fcd221139d183c72f50893621bca4e3d Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Sun, 10 May 2020 23:53:24 +0900 Subject: [PATCH 1/2] Update parquet-go to latest --- go.mod | 2 +- go.sum | 4 ++-- parquet/parquet.go | 14 +------------- 3 files changed, 4 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 338015e..864a00a 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,6 @@ require ( github.com/apache/arrow/go/arrow v0.0.0-20200504153628-d13e8f3ed647 github.com/linkedin/goavro/v2 v2.9.7 github.com/vmihailenco/msgpack/v4 v4.3.11 - github.com/xitongsys/parquet-go v1.5.1 + github.com/xitongsys/parquet-go v1.5.2 github.com/xitongsys/parquet-go-source v0.0.0-20200225073416-429277801fe4 ) diff --git a/go.sum b/go.sum index 28a8902..82dae0b 100644 --- a/go.sum +++ b/go.sum @@ -107,8 +107,8 @@ github.com/vmihailenco/msgpack/v4 v4.3.11 h1:Q47CePddpNGNhk4GCnAx9DDtASi2rasatE0 github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/tagparser v0.1.1 h1:quXMXlA39OCbd2wAdTsGDlK9RkOk6Wuw+x37wVyIuWY= github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI= -github.com/xitongsys/parquet-go v1.5.1 h1:GFjQXrFmqI2XvmAaj7k73QtW3eECFVwaLX2/Mv3Fnuo= -github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx0K/GyB0o2bww= +github.com/xitongsys/parquet-go v1.5.2 h1:t8kVBM+7jPIbM+9ptrpZajWV1lOyHHVIQkTRUTlbK84= +github.com/xitongsys/parquet-go v1.5.2/go.mod h1:90swTgY6VkNM4MkMDsNxq8h30m6Yj1Arv9UMEl5V5DM= github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA= github.com/xitongsys/parquet-go-source v0.0.0-20200225073416-429277801fe4 h1:KvGGKrTAA489Xkfw1xwz59bj3hH50hC6HjG3Sby+aa4= github.com/xitongsys/parquet-go-source v0.0.0-20200225073416-429277801fe4/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA= diff --git a/parquet/parquet.go b/parquet/parquet.go index 8ca24b3..cba88ca 100644 --- a/parquet/parquet.go +++ b/parquet/parquet.go @@ -6,7 +6,6 @@ import ( "github.com/xitongsys/parquet-go/common" "github.com/xitongsys/parquet-go/layout" - "github.com/xitongsys/parquet-go/parquet" "github.com/xitongsys/parquet-go/schema" ) @@ -42,23 +41,12 @@ func prepareTables(schemaHandler *schema.SchemaHandler) (map[string]*layout.Tabl return nil, err } - var tpe parquet.Type - if index, ok := schemaHandler.MapIndex[pathStr]; ok { - if int(index) < len(schemaHandler.SchemaElements) { - tpe = schemaHandler.SchemaElements[index].GetType() - } else { - return nil, fmt.Errorf("invalid index %v to schema elements %v: %w", index, schemaHandler.SchemaElements, ErrInvalidParquetSchema) - } - } else { - return nil, fmt.Errorf("invalid schema key %v: %w", pathStr, ErrInvalidParquetSchema) - } - tables[pathStr] = &layout.Table{ Path: path, MaxDefinitionLevel: maxDefinitionLevel, MaxRepetitionLevel: maxRepetitionLevel, RepetitionType: e.GetRepetitionType(), - Type: tpe, + Schema: schemaHandler.SchemaElements[schemaHandler.MapIndex[pathStr]], Info: schemaHandler.Infos[i], } } From ee00c83f46005c9c51b33b454d53137d5894ed24 Mon Sep 17 00:00:00 2001 From: Ryo Okubo Date: Mon, 11 May 2020 00:35:25 +0900 Subject: [PATCH 2/2] Add schema/record mismatching case --- Makefile | 4 +++- columnifier/parquet_test.go | 16 ++++++++++++++++ testdata/mismatch.avsc | 7 +++++++ 3 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 testdata/mismatch.avsc diff --git a/Makefile b/Makefile index fa422ad..3b76e5f 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,11 @@ clean: fmt: gofmt -w **/*.go +# TODO Enable -race after we resolve data race in parquet-go +# ref. https://github.com/xitongsys/parquet-go/issues/256 .PHONY: test test: - go test -race -cover ./... + go test -cover ./... .PHONY: it it: build diff --git a/columnifier/parquet_test.go b/columnifier/parquet_test.go index 7dcd86e..6f878c8 100644 --- a/columnifier/parquet_test.go +++ b/columnifier/parquet_test.go @@ -95,6 +95,22 @@ func TestWriteClose(t *testing.T) { isErr: true, }, + // Mismatch schema & record + { + st: schema.SchemaTypeAvro, + sf: "../testdata/mismatch.avsc", + rt: record.RecordTypeJsonl, + config: Config{ + Parquet: Parquet{ + PageSize: 8 * 1024, + RowGroupSize: 128 * 1024 * 1024, + CompressionCodec: parquet.CompressionCodec_SNAPPY, + }, + }, + input: "../testdata/primitives.jsonl", + isErr: true, + }, + // Valid { st: schema.SchemaTypeAvro, diff --git a/testdata/mismatch.avsc b/testdata/mismatch.avsc new file mode 100644 index 0000000..506e63a --- /dev/null +++ b/testdata/mismatch.avsc @@ -0,0 +1,7 @@ +{ + "type": "record", + "name": "Mismatch", + "fields" : [ + {"name": "f", "type": "bytes"} + ] +}