From ab21ae780e565f08c43e1fa281bb8880c04f199f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 5 Feb 2024 09:26:29 +0000 Subject: [PATCH 1/2] go.mod: bump github.com/hamba/avro/v2 from 2.18.0 to 2.19.0 Bumps [github.com/hamba/avro/v2](https://github.com/hamba/avro) from 2.18.0 to 2.19.0. - [Release notes](https://github.com/hamba/avro/releases) - [Commits](https://github.com/hamba/avro/compare/v2.18.0...v2.19.0) --- updated-dependencies: - dependency-name: github.com/hamba/avro/v2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 42959d4e0..9f342dbed 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 - github.com/hamba/avro/v2 v2.18.0 + github.com/hamba/avro/v2 v2.19.0 github.com/hashicorp/go-hclog v1.6.2 github.com/hashicorp/go-plugin v1.6.0 github.com/jackc/pgx/v5 v5.5.3 diff --git a/go.sum b/go.sum index 1c36b7a41..06c889d2d 100644 --- a/go.sum +++ b/go.sum @@ -1494,8 +1494,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4Zs github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= -github.com/hamba/avro/v2 v2.18.0 h1:U7T0xI8MGw9+m3SS48E2KHUxas/Hb0EvS0CpkmVcLoI= -github.com/hamba/avro/v2 v2.18.0/go.mod h1:dEG+AHrykTpkXvBYsc+XXTuRlvGC645Ix5d2qR8EdEs= +github.com/hamba/avro/v2 v2.19.0 h1:jITwvb03UMLfTFHFKdvaMyU/G96iVWS5EiMsqo3flfE= +github.com/hamba/avro/v2 v2.19.0/go.mod h1:72DkWmMmAyZA+qHoI89u4RMCQ3X54vpEb1ap80iCIBg= 
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok= github.com/hanwen/go-fuse/v2 v2.1.0/go.mod h1:oRyA5eK+pvJyv5otpO/DgccS8y/RvYMaO00GgRLGryc= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= From 1b9479482e3219b42b0346824f8e1994c966b256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 5 Feb 2024 15:04:40 +0100 Subject: [PATCH 2/2] do not treat unions of primitive types in union resolver --- .../schemaregistry/avro/schema_test.go | 42 ++++++++++++++++++- pkg/processor/schemaregistry/avro/union.go | 32 +++++++++++++- .../schemaregistry/avro/union_test.go | 32 +++++++++++++- 3 files changed, 101 insertions(+), 5 deletions(-) diff --git a/pkg/processor/schemaregistry/avro/schema_test.go b/pkg/processor/schemaregistry/avro/schema_test.go index cef11a540..c68acf741 100644 --- a/pkg/processor/schemaregistry/avro/schema_test.go +++ b/pkg/processor/schemaregistry/avro/schema_test.go @@ -176,7 +176,47 @@ func TestSchema_MarshalUnmarshal(t *testing.T) { }, wantSchema: avro.NewMapSchema(avro.NewPrimitiveSchema(avro.Int, nil)), }, { - name: "map[string]any (with data)", + name: "map[string]any (with primitive data)", + haveValue: map[string]any{ + "foo": "bar", + "foo2": "bar2", + "bar": 1, + "baz": true, + }, + wantValue: map[string]any{ + "foo": "bar", + "foo2": "bar2", + "bar": 1, + "baz": true, + }, + wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ + &avro.NullSchema{}, + avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.String, nil), + avro.NewPrimitiveSchema(avro.Boolean, nil), + }))), + }, { + name: "map[string]any (with primitive array)", + haveValue: map[string]any{ + "foo": "bar", + "foo2": "bar2", + "bar": 1, + "baz": []int{1, 2, 3}, + }, + wantValue: map[string]any{ + "foo": "bar", + "foo2": "bar2", + "bar": 1, + "baz": []any{1, 2, 3}, + }, + wantSchema: avro.NewMapSchema(must(avro.NewUnionSchema([]avro.Schema{ + 
&avro.NullSchema{}, + avro.NewPrimitiveSchema(avro.Int, nil), + avro.NewPrimitiveSchema(avro.String, nil), + avro.NewArraySchema(avro.NewPrimitiveSchema(avro.Int, nil)), + }))), + }, { + name: "map[string]any (with union array)", haveValue: map[string]any{ "foo": "bar", "foo2": "bar2", diff --git a/pkg/processor/schemaregistry/avro/union.go b/pkg/processor/schemaregistry/avro/union.go index 0dd69ec43..da8659651 100644 --- a/pkg/processor/schemaregistry/avro/union.go +++ b/pkg/processor/schemaregistry/avro/union.go @@ -337,12 +337,40 @@ func (r UnionResolver) resolveNameForType(v any, us *avro.UnionSchema) (string, func isMapUnion(schema avro.Schema) bool { s, ok := schema.(*avro.MapSchema) - return ok && s.Values().Type() == avro.Union + if !ok { + return false + } + us, ok := s.Values().(*avro.UnionSchema) + if !ok { + return false + } + for _, s := range us.Types() { + // at least one of the types in the union must be a map or array for this + // to count as a map with a union type + if s.Type() == avro.Array || s.Type() == avro.Map { + return true + } + } + return false } func isArrayUnion(schema avro.Schema) bool { s, ok := schema.(*avro.ArraySchema) - return ok && s.Items().Type() == avro.Union + if !ok { + return false + } + us, ok := s.Items().(*avro.UnionSchema) + if !ok { + return false + } + for _, s := range us.Types() { + // at least one of the types in the union must be a map or array for this + // to count as an array with a union type + if s.Type() == avro.Array || s.Type() == avro.Map { + return true + } + } + return false } type substitution interface { diff --git a/pkg/processor/schemaregistry/avro/union_test.go b/pkg/processor/schemaregistry/avro/union_test.go index 7afd1243f..48ae3f0ef 100644 --- a/pkg/processor/schemaregistry/avro/union_test.go +++ b/pkg/processor/schemaregistry/avro/union_test.go @@ -15,6 +15,7 @@ package avro import ( + "reflect" "testing" "github.com/conduitio/conduit/pkg/record" @@ -70,6 +71,15 @@ func TestUnionResolver(t 
*testing.T) { want: map[string]any{"array": []bool(nil)}, }} + isSlice := func(a any) bool { + if a == nil { + return false + } + // returns true if the type is a slice and not a byte slice + t := reflect.TypeOf(a) + return t.Kind() == reflect.Slice && !t.Elem().AssignableTo(byteType) + } + for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) @@ -96,14 +106,32 @@ func TestUnionResolver(t *testing.T) { "foo2": tc.want, "map2": map[string]any{ "map": map[string]any{ - "foo3": tc.want, + "foo3": func() any { + // if the original value is a slice, we consider + // the type a union and wrap it in a map, otherwise + // we keep the original value + if isSlice(tc.have) { + return tc.want + } + return tc.have + }(), }, }, }, "arr1": []any{ tc.want, map[string]any{ - "array": []any{tc.want}, + "array": []any{ + func() any { + // if the original value is a slice, we consider + // the type a union and wrap it in a map, otherwise + // we keep the original value + if isSlice(tc.have) { + return tc.want + } + return tc.have + }(), + }, }, }, }