From ca4c5e2ec90a86184ec5f920ea7a6a8b884ff5f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 2 Jun 2023 16:36:13 +0200 Subject: [PATCH 01/15] introduce SerdeHeader --- pkg/sr/serde.go | 161 +++++++++++++++++++++++++++++-------------- pkg/sr/serde_test.go | 72 ++++++++++++++++++- 2 files changed, 180 insertions(+), 53 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 3f95af01..8d91f6de 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -1,19 +1,14 @@ package sr import ( + "bytes" "encoding/binary" "errors" - "io" "reflect" "sync" "sync/atomic" ) -// The wire format for encoded types is 0, then big endian uint32 of the ID, -// then the encoded message. -// -// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format - var ( // ErrNotRegistered is returned from Serde when attempting to encode a // value or decode an ID that has not been registered, or when using @@ -101,6 +96,7 @@ type Serde struct { mu sync.Mutex defaults []SerdeOpt + header SerdeHeader } var ( @@ -134,6 +130,20 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { s.defaults = opts } +// SetHeader configures which header should be used when encoding and decoding +// values. If the header is set to nil it falls back to ConfluentHeader. +func (s *Serde) SetHeader(header SerdeHeader) { + s.header = header +} + +// Header returns the configured header. +func (s *Serde) Header() SerdeHeader { + if s.header == nil { + return ConfluentHeader{} + } + return s.header +} + // Register registers a schema ID and the value it corresponds to, as well as // the encoding or decoding functions. You need to register functions depending // on whether you are only encoding, only decoding, or both. @@ -219,8 +229,8 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde { return dup } -// Encode encodes a value according to the schema registry wire format and -// returns it. If EncodeFn was not used, this returns ErrNotRegistered. +// Encode encodes a value and prepends the header according to the configured +// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered. func (s *Serde) Encode(v any) ([]byte, error) { return s.AppendEncode(nil, v) } @@ -234,23 +244,9 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { return b, ErrNotRegistered } - b = append(b, - 0, - byte(t.id>>24), - byte(t.id>>16), - byte(t.id>>8), - byte(t.id>>0), - ) - - if len(t.index) > 0 { - if len(t.index) == 1 && t.index[0] == 0 { - b = append(b, 0) // first-index shortcut (one type in the protobuf) - } else { - b = binary.AppendVarint(b, int64(len(t.index))) - for _, idx := range t.index { - b = binary.AppendVarint(b, int64(idx)) - } - } + b, err := s.Header().AppendEncode(b, int(t.id), t.index) + if err != nil { + return nil, err } if t.appendEncode != nil { @@ -316,44 +312,105 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { } func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { - if len(b) < 5 || b[0] != 0 { - return nil, tserde{}, ErrBadHeader + h := s.Header() + + id, b, err := h.DecodeID(b) + if err != nil { + return nil, tserde{}, err } - id := binary.BigEndian.Uint32(b[1:5]) - b = b[5:] - t := s.loadIDs()[int(id)] + t := s.loadIDs()[id] if len(t.subindex) > 0 { - r := bReader{b} - br := io.ByteReader(&r) - l, err := binary.ReadVarint(br) - if l == 0 { // length 0 is a shortcut for length 1, index 0 - t = t.subindex[0] - } - for err == nil && t.subindex != nil && l > 0 { - var idx int64 - idx, err = binary.ReadVarint(br) - t = t.subindex[int(idx)] - l-- - } + var index []int + index, b, err = h.DecodeIndex(b) if err != nil { - return nil, t, err + return nil, tserde{}, err + } + for _, idx := range index { + if t.subindex == nil { + return nil, tserde{}, ErrNotRegistered + } + t = t.subindex[idx] } - b = r.b } if !t.exists { - return nil, t, ErrNotRegistered + return nil, tserde{}, ErrNotRegistered } return b, t, nil } -type bReader struct{ b []byte } +// SerdeHeader encodes and decodes a message header. +type SerdeHeader interface { + AppendEncode(b []byte, id int, index []int) ([]byte, error) + DecodeID(in []byte) (id int, out []byte, err error) + DecodeIndex(in []byte) (index []int, out []byte, err error) +} -func (b *bReader) ReadByte() (byte, error) { - if len(b.b) > 0 { - r := b.b[0] - b.b = b.b[1:] - return r, nil +// ConfluentHeader is a SerdeHeader that produces the Confluent wire format. It +// starts with 0, then big endian uint32 of the ID, then index (only protobuf), +// then the encoded message. +// +// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format +type ConfluentHeader struct{} + +// AppendEncode appends an encoded header to b according to the Confluent wire +// format and returns it. Error is always nil. +func (ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) { + b = append( + b, + 0, + byte(id>>24), + byte(id>>16), + byte(id>>8), + byte(id>>0), + ) + + if len(index) > 0 { + if len(index) == 1 && index[0] == 0 { + b = append(b, 0) // first-index shortcut (one type in the protobuf) + } else { + b = binary.AppendVarint(b, int64(len(index))) + for i := len(index) - 1; i >= 0; i-- { + b = binary.AppendVarint(b, int64(index[i])) + } + } + } + + return b, nil +} + +// DecodeID strips and decodes the schema ID from b. It returns the ID alongside +// the unread bytes. If the header does not contain the magic byte or b contains +// less than 5 bytes it returns ErrBadHeader. +func (c ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { + if len(b) < 5 || b[0] != 0 { + return 0, nil, ErrBadHeader + } + id := binary.BigEndian.Uint32(b[1:5]) + return int(id), b[5:], nil +} + +// DecodeIndex strips and decodes the index from b. It returns the index +// alongside the unread bytes. It expects b to be the output of DecodeID (schema +// ID should already be stripped away). +func (c ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { + buf := bytes.NewBuffer(b) + l, err := binary.ReadVarint(buf) + if err != nil { + return nil, nil, err + } + if l == 0 { // length 0 is a shortcut for length 1, index 0 + return []int{0}, buf.Bytes(), nil + } + index := make([]int, l) + for l > 0 { + var idx int64 + idx, err = binary.ReadVarint(buf) + if err != nil { + return nil, nil, err + } + index[l-1] = int(idx) // index is stored last to first + l-- } - return 0, io.EOF + return index, buf.Bytes(), nil } diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 8667a0d2..f942beec 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -83,7 +83,7 @@ func TestSerde(t *testing.T) { }, { enc: idx4{Bingo: "bango"}, - expEnc: append([]byte{0, 0, 0, 0, 3, 6, 0, 0, 2}, `{"bingo":"bango"}`...), + expEnc: append([]byte{0, 0, 0, 0, 3, 6, 2, 0, 0}, `{"bingo":"bango"}`...), }, { enc: oneidx{Bar: "bar"}, @@ -139,3 +139,73 @@ func TestSerde(t *testing.T) { t.Errorf("got %v != exp ErrNotRegistered", err) } } + +func TestConfluentHeader(t *testing.T) { + var h ConfluentHeader + + for i, test := range []struct { + id int + index []int + expEnc []byte + }{ + {id: 1, index: nil, expEnc: []byte{0, 0, 0, 0, 1}}, + {id: 256, index: nil, expEnc: []byte{0, 0, 0, 1, 0}}, + {id: 2, index: []int{0}, expEnc: []byte{0, 0, 0, 0, 2, 0}}, + {id: 3, index: []int{1}, expEnc: []byte{0, 0, 0, 0, 3, 2, 2}}, + {id: 4, index: []int{1, 2, 3}, expEnc: []byte{0, 0, 0, 0, 4, 6, 6, 4, 2}}, + } { + b, err := h.AppendEncode(nil, test.id, test.index) + if err != nil { + t.Errorf("#%d AppendEncode: got unexpected err %v", i, err) + continue + } + if !bytes.Equal(b, test.expEnc) { + t.Errorf("#%d: AppendEncode(%v) != exp(%v)", i, b, test.expEnc) + continue + } + + if b2, _ := h.AppendEncode([]byte("foo"), test.id, test.index); !bytes.Equal(b2, append([]byte("foo"), b...)) { + t.Errorf("#%d got AppendEncode(%v) != AppendEncode(foo%v)", i, b2, b) + } + + id, b2, err := h.DecodeID(b) + if err != nil { + t.Errorf("#%d DecodeID: got unexpected err %v", i, err) + continue + } + if id != test.id { + t.Errorf("#%d: DecodeID: id(%v) != exp(%v)", i, id, test.id) + continue + } + if test.index == nil && len(b2) != 0 { + t.Errorf("#%d: DecodeID: bytes(%v) != exp([])", i, b2) + continue + } + + if test.index != nil { + index, b3, err := h.DecodeIndex(b2) + if err != nil { + t.Errorf("#%d DecodeIndex: got unexpected err %v", i, err) + continue + } + if !reflect.DeepEqual(index, test.index) { + t.Errorf("#%d: DecodeIndex: index(%v) != exp(%v)", i, index, test.index) + continue + } + if len(b3) != 0 { + t.Errorf("#%d: DecodeIndex: bytes(%v) != exp([])", i, b3) + continue + } + } + } + + if _, _, err := h.DecodeID([]byte{1, 0, 0, 0, 0, 1}); err != ErrBadHeader { + t.Errorf("got %v != exp ErrBadHeader", err) + } + if _, _, err := h.DecodeID([]byte{0, 0, 0, 0}); err != ErrBadHeader { + t.Errorf("got %v != exp ErrBadHeader", err) + } + if _, _, err := h.DecodeIndex([]byte{2}); err != io.EOF { + t.Errorf("got %v != exp io.EOF", err) + } +} From 9be504fc1eebc7b03b158bb0176446cc36799481 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 2 Jun 2023 17:19:45 +0200 Subject: [PATCH 02/15] revert reversal of index --- pkg/sr/serde.go | 9 ++++----- pkg/sr/serde_test.go | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 8d91f6de..4d6a2eee 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -370,8 +370,8 @@ func (ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, erro b = append(b, 0) // first-index shortcut (one type in the protobuf) } else { b = binary.AppendVarint(b, int64(len(index))) - for i := len(index) - 1; i >= 0; i-- { - b = binary.AppendVarint(b, int64(index[i])) + for _, idx := range index { + b = binary.AppendVarint(b, int64(idx)) } } } @@ -403,14 +403,13 @@ func (c ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { return []int{0}, buf.Bytes(), nil } index := make([]int, l) - for l > 0 { + for i := 0; i < int(l); i++ { var idx int64 idx, err = binary.ReadVarint(buf) if err != nil { return nil, nil, err } - index[l-1] = int(idx) // index is stored last to first - l-- + index[i] = int(idx) } return index, buf.Bytes(), nil } diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index f942beec..a573d2e0 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -83,7 +83,7 @@ func TestSerde(t *testing.T) { }, { enc: idx4{Bingo: "bango"}, - expEnc: append([]byte{0, 0, 0, 0, 3, 6, 2, 0, 0}, `{"bingo":"bango"}`...), + expEnc: append([]byte{0, 0, 0, 0, 3, 6, 0, 0, 2}, `{"bingo":"bango"}`...), }, { enc: oneidx{Bar: "bar"}, @@ -152,7 +152,7 @@ func TestConfluentHeader(t *testing.T) { {id: 256, index: nil, expEnc: []byte{0, 0, 0, 1, 0}}, {id: 2, index: []int{0}, expEnc: []byte{0, 0, 0, 0, 2, 0}}, {id: 3, index: []int{1}, expEnc: []byte{0, 0, 0, 0, 3, 2, 2}}, - {id: 4, index: []int{1, 2, 3}, expEnc: []byte{0, 0, 0, 0, 4, 6, 6, 4, 2}}, + {id: 4, index: []int{1, 2, 3}, expEnc: []byte{0, 0, 0, 0, 4, 6, 2, 4, 6}}, } { b, err := h.AppendEncode(nil, test.id, test.index) if err != nil { From 1982c8a1e7bb87bdd1be186e63c28cda0451c1cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 2 Jun 2023 17:27:42 +0200 Subject: [PATCH 03/15] simplify loop --- pkg/sr/serde.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 4d6a2eee..51966a36 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -403,9 +403,8 @@ func (c ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { return []int{0}, buf.Bytes(), nil } index := make([]int, l) - for i := 0; i < int(l); i++ { - var idx int64 - idx, err = binary.ReadVarint(buf) + for i := range index { + idx, err := binary.ReadVarint(buf) if err != nil { return nil, nil, err } From 0cdc3c68f3fcce4f78a8396a7dc6a462a865dc22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 2 Jun 2023 17:29:17 +0200 Subject: [PATCH 04/15] remove unused receiver --- pkg/sr/serde.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 51966a36..29f4116b 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -382,7 +382,7 @@ func (ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, erro // DecodeID strips and decodes the schema ID from b. It returns the ID alongside // the unread bytes. If the header does not contain the magic byte or b contains // less than 5 bytes it returns ErrBadHeader. -func (c ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { +func (ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { if len(b) < 5 || b[0] != 0 { return 0, nil, ErrBadHeader } @@ -393,7 +393,7 @@ func (c ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { // DecodeIndex strips and decodes the index from b. It returns the index // alongside the unread bytes. It expects b to be the output of DecodeID (schema // ID should already be stripped away). -func (c ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { +func (ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { buf := bytes.NewBuffer(b) l, err := binary.ReadVarint(buf) if err != nil { From b107644c68a2a1dfcd993d416683533f92aaf93a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 12 Jun 2023 16:38:03 +0200 Subject: [PATCH 05/15] optimize index decoding for unknown types --- pkg/sr/serde.go | 50 +++++++++++++++++++++++++++++--------------- pkg/sr/serde_test.go | 10 +++++++-- 2 files changed, 41 insertions(+), 19 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 29f4116b..fd20427b 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -75,8 +75,9 @@ type tserde struct { gen func() any typeof reflect.Type - index []int // for encoding, an optional index we use - subindex map[int]tserde // for decoding, we look up sub-indices in the payload + index []int // for encoding, an optional index we use + subindex map[int]tserde // for decoding, we look up sub-indices in the payload + subindexDepth int // for decoding, we need to know the maximum depth of the subindex map } // Serde encodes and decodes values according to the schema registry wire @@ -182,7 +183,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { // tree to find the end node we are initializing. k := id at := m[k] - for _, idx := range t.index { + max := func(i, j int) int { + if i > j { + return i + } + return j + } + for i, idx := range t.index { + at.subindexDepth = max(at.subindexDepth, len(t.index)-i) + m[k] = at + m = at.subindex k = idx at = m[k] @@ -196,15 +206,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { // Now, we initialize the end node. t = tserde{ - id: uint32(id), - exists: true, - encode: t.encode, - appendEncode: t.appendEncode, - decode: t.decode, - gen: t.gen, - typeof: typeof, - index: t.index, - subindex: at.subindex, + id: uint32(id), + exists: true, + encode: t.encode, + appendEncode: t.appendEncode, + decode: t.decode, + gen: t.gen, + typeof: typeof, + index: t.index, + subindex: at.subindex, + subindexDepth: at.subindexDepth, } m[k] = t } @@ -322,7 +333,7 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { t := s.loadIDs()[id] if len(t.subindex) > 0 { var index []int - index, b, err = h.DecodeIndex(b) + index, b, err = h.DecodeIndex(b, t.subindexDepth) if err != nil { return nil, tserde{}, err } @@ -343,7 +354,7 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { type SerdeHeader interface { AppendEncode(b []byte, id int, index []int) ([]byte, error) DecodeID(in []byte) (id int, out []byte, err error) - DecodeIndex(in []byte) (index []int, out []byte, err error) + DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) } // ConfluentHeader is a SerdeHeader that produces the Confluent wire format. It @@ -390,10 +401,12 @@ func (ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { return int(id), b[5:], nil } -// DecodeIndex strips and decodes the index from b. It returns the index +// DecodeIndex strips and decodes indices from b. It returns the index slice // alongside the unread bytes. It expects b to be the output of DecodeID (schema -// ID should already be stripped away). -func (ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { +// ID should already be stripped away). If maxLength is greater than 0 and the +// encoded data contains more indices than maxLength the function returns +// ErrNotRegistered. +func (ConfluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) { buf := bytes.NewBuffer(b) l, err := binary.ReadVarint(buf) if err != nil { @@ -402,6 +415,9 @@ func (ConfluentHeader) DecodeIndex(b []byte) ([]int, []byte, error) { if l == 0 { // length 0 is a shortcut for length 1, index 0 return []int{0}, buf.Bytes(), nil } + if maxLength > 0 && int(l) > maxLength { // index count is greater than expected + return nil, nil, ErrNotRegistered + } index := make([]int, l) for i := range index { idx, err := binary.ReadVarint(buf) diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index a573d2e0..290edb9a 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -135,6 +135,9 @@ func TestSerde(t *testing.T) { if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 3}); err != io.EOF { t.Errorf("got %v != exp io.EOF", err) } + if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 3, 8}); err != ErrNotRegistered { + t.Errorf("got %v != exp ErrNotRegistered", err) + } if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 99}); err != ErrNotRegistered { t.Errorf("got %v != exp ErrNotRegistered", err) } @@ -183,7 +186,7 @@ func TestConfluentHeader(t *testing.T) { } if test.index != nil { - index, b3, err := h.DecodeIndex(b2) + index, b3, err := h.DecodeIndex(b2, len(test.index)) if err != nil { t.Errorf("#%d DecodeIndex: got unexpected err %v", i, err) continue @@ -205,7 +208,10 @@ func TestConfluentHeader(t *testing.T) { if _, _, err := h.DecodeID([]byte{0, 0, 0, 0}); err != ErrBadHeader { t.Errorf("got %v != exp ErrBadHeader", err) } - if _, _, err := h.DecodeIndex([]byte{2}); err != io.EOF { + if _, _, err := h.DecodeIndex([]byte{2}, 1); err != io.EOF { t.Errorf("got %v != exp io.EOF", err) } + if _, _, err := h.DecodeIndex([]byte{6, 2, 4, 6}, 2); err != ErrNotRegistered { + t.Errorf("got %v != exp ErrNotRegistered", err) + } } From b3312a2b653e9f95376546065d2a22788cce966d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 11:35:31 +0200 Subject: [PATCH 06/15] SerdeHeader comments --- pkg/sr/serde.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index fd20427b..e6700bd2 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -352,8 +352,14 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { // SerdeHeader encodes and decodes a message header. type SerdeHeader interface { + // AppendEncode encodes a schema ID and optional index to b, returning the + // updated slice or an error. AppendEncode(b []byte, id int, index []int) ([]byte, error) + // DecodeID decodes an ID from in, returning the ID and the remaining bytes, + // or an error. DecodeID(in []byte) (id int, out []byte, err error) + // DecodeIndex decodes at most maxLength of a schema index from in, + // returning the index and remaining bytes, or an error. DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) } From 08ea701caea2bedf14895493f46739331f3e8290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 11:38:20 +0200 Subject: [PATCH 07/15] check for negative index length --- pkg/sr/serde.go | 3 +++ pkg/sr/serde_test.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index e6700bd2..1adbe23b 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -421,6 +421,9 @@ func (ConfluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, erro if l == 0 { // length 0 is a shortcut for length 1, index 0 return []int{0}, buf.Bytes(), nil } + if l < 0 { // index length can't be negative + return nil, nil, ErrBadHeader + } if maxLength > 0 && int(l) > maxLength { // index count is greater than expected return nil, nil, ErrNotRegistered } diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 290edb9a..4b591fa0 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -214,4 +214,7 @@ func TestConfluentHeader(t *testing.T) { if _, _, err := h.DecodeIndex([]byte{6, 2, 4, 6}, 2); err != ErrNotRegistered { t.Errorf("got %v != exp ErrNotRegistered", err) } + if _, _, err := h.DecodeIndex([]byte{1}, 2); err != ErrBadHeader { + t.Errorf("got %v != exp ErrBadHeader", err) + } } From 96dd4b5f658e3e81a5bd01e3b9f6e13cbb86c7f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 11:42:43 +0200 Subject: [PATCH 08/15] make confluent header private and a pointer --- pkg/sr/serde.go | 16 +++++++++------- pkg/sr/serde_test.go | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 1adbe23b..93d2ea0e 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -132,7 +132,7 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { } // SetHeader configures which header should be used when encoding and decoding -// values. If the header is set to nil it falls back to ConfluentHeader. +// values. If the header is set to nil it falls back to confluentHeader. func (s *Serde) SetHeader(header SerdeHeader) { s.header = header } @@ -140,7 +140,7 @@ func (s *Serde) SetHeader(header SerdeHeader) { // Header returns the configured header. func (s *Serde) Header() SerdeHeader { if s.header == nil { - return ConfluentHeader{} + return defaultSerdeHeader } return s.header } @@ -363,16 +363,18 @@ type SerdeHeader interface { DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) } -// ConfluentHeader is a SerdeHeader that produces the Confluent wire format. It +var defaultSerdeHeader = new(confluentHeader) + +// confluentHeader is a SerdeHeader that produces the Confluent wire format. It // starts with 0, then big endian uint32 of the ID, then index (only protobuf), // then the encoded message. // // https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format -type ConfluentHeader struct{} +type confluentHeader struct{} // AppendEncode appends an encoded header to b according to the Confluent wire // format and returns it. Error is always nil. -func (ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) { +func (*confluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) { b = append( b, 0, @@ -399,7 +401,7 @@ func (ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, erro // DecodeID strips and decodes the schema ID from b. It returns the ID alongside // the unread bytes. If the header does not contain the magic byte or b contains // less than 5 bytes it returns ErrBadHeader. -func (ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { +func (*confluentHeader) DecodeID(b []byte) (int, []byte, error) { if len(b) < 5 || b[0] != 0 { return 0, nil, ErrBadHeader } @@ -412,7 +414,7 @@ func (ConfluentHeader) DecodeID(b []byte) (int, []byte, error) { // ID should already be stripped away). If maxLength is greater than 0 and the // encoded data contains more indices than maxLength the function returns // ErrNotRegistered. -func (ConfluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) { +func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) { buf := bytes.NewBuffer(b) l, err := binary.ReadVarint(buf) if err != nil { diff --git a/pkg/sr/serde_test.go b/pkg/sr/serde_test.go index 4b591fa0..45287c30 100644 --- a/pkg/sr/serde_test.go +++ b/pkg/sr/serde_test.go @@ -144,7 +144,7 @@ func TestSerde(t *testing.T) { } func TestConfluentHeader(t *testing.T) { - var h ConfluentHeader + var h confluentHeader for i, test := range []struct { id int From 29e1d2e60397264af271d88592564c9a11aa8359 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 11:50:32 +0200 Subject: [PATCH 09/15] defer storing of serde ids --- pkg/sr/serde.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 93d2ea0e..ff441e86 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -163,26 +163,28 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { defer s.mu.Unlock() // Type mapping is easy: we just map the type to the final tserde. - // We defer the store because we modify the tserde below, and we - // may delete a type key. dupTypes := make(map[reflect.Type]tserde) for k, v := range s.loadTypes() { dupTypes[k] = v } + + // For IDs, we deeply clone any path that is changing. + dupIDs := tserdeMapClone(s.loadIDs(), id, t.index) + + // We defer the store because we modify the tserde below, and we + // may delete a type key. defer func() { dupTypes[typeof] = t s.types.Store(dupTypes) + s.ids.Store(dupIDs) }() - // For IDs, we deeply clone any path that is changing. - m := tserdeMapClone(s.loadIDs(), id, t.index) - s.ids.Store(m) - // Now we have a full path down index initialized (or, the top // level map if there is no index). We iterate down the index // tree to find the end node we are initializing. k := id - at := m[k] + m := dupIDs + at := dupIDs[k] max := func(i, j int) int { if i > j { return i From 0df919b4157b8a77d691be228b70cea1be3dcc20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 12:01:37 +0200 Subject: [PATCH 10/15] refactor subindex depth calculation --- pkg/sr/serde.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index ff441e86..84cd3a6f 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -191,8 +191,11 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { } return j } + myDepth := len(t.index) for i, idx := range t.index { - at.subindexDepth = max(at.subindexDepth, len(t.index)-i) + // SAFETY: tserdeMapClone deeply clones all maps through the index, so + // our modified value is being saved to a new map that is not being read. + at.subindexDepth = max(at.subindexDepth, myDepth-i) m[k] = at m = at.subindex From 27a8e66df5008c31acdd578fd5ca431ef7b5fb07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 12:36:37 +0200 Subject: [PATCH 11/15] simplify subindex depth calculation --- pkg/sr/serde.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 84cd3a6f..67dc1c65 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -185,18 +185,19 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { k := id m := dupIDs at := dupIDs[k] + depth := len(t.index) max := func(i, j int) int { if i > j { return i } return j } - myDepth := len(t.index) - for i, idx := range t.index { + for _, idx := range t.index { // SAFETY: tserdeMapClone deeply clones all maps through the index, so // our modified value is being saved to a new map that is not being read. - at.subindexDepth = max(at.subindexDepth, myDepth-i) + at.subindexDepth = max(at.subindexDepth, depth) m[k] = at + depth-- m = at.subindex k = idx From 22504ebb5399a6f352056157de19c975d498c16d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 12:47:30 +0200 Subject: [PATCH 12/15] remove Serde.SetHeader --- pkg/sr/serde.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 67dc1c65..378c9053 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -131,12 +131,6 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { s.defaults = opts } -// SetHeader configures which header should be used when encoding and decoding -// values. If the header is set to nil it falls back to confluentHeader. -func (s *Serde) SetHeader(header SerdeHeader) { - s.header = header -} - // Header returns the configured header. func (s *Serde) Header() SerdeHeader { if s.header == nil { From 08a4bb0572e746f9672e88471a24695c389c80e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 16 Jun 2023 12:55:24 +0200 Subject: [PATCH 13/15] reintroduce bReader --- pkg/sr/serde.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 378c9053..8c1bf1d3 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -1,9 +1,9 @@ package sr import ( - "bytes" "encoding/binary" "errors" + "io" "reflect" "sync" "sync/atomic" @@ -415,13 +415,14 @@ func (*confluentHeader) DecodeID(b []byte) (int, []byte, error) { // encoded data contains more indices than maxLength the function returns // ErrNotRegistered. func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) { - buf := bytes.NewBuffer(b) - l, err := binary.ReadVarint(buf) + r := bReader{b} + br := io.ByteReader(&r) + l, err := binary.ReadVarint(br) if err != nil { return nil, nil, err } if l == 0 { // length 0 is a shortcut for length 1, index 0 - return []int{0}, buf.Bytes(), nil + return []int{0}, r.b, nil } if l < 0 { // index length can't be negative return nil, nil, ErrBadHeader @@ -431,11 +432,22 @@ func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, err } index := make([]int, l) for i := range index { - idx, err := binary.ReadVarint(buf) + idx, err := binary.ReadVarint(br) if err != nil { return nil, nil, err } index[i] = int(idx) } - return index, buf.Bytes(), nil + return index, r.b, nil +} + +type bReader struct{ b []byte } + +func (b *bReader) ReadByte() (byte, error) { + if len(b.b) > 0 { + r := b.b[0] + b.b = b.b[1:] + return r, nil + } + return 0, io.EOF } From fea66962997bfa638f7d38f05a987f3eb7b40ccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Mon, 3 Jul 2023 18:04:10 +0200 Subject: [PATCH 14/15] replace Header with DecodeID and DecodeIndex --- pkg/sr/serde.go | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 8c1bf1d3..3ab9205a 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -17,7 +17,7 @@ var ( // ErrBadHeader is returned from Decode when the input slice is shorter // than five bytes, or if the first byte is not the magic 0 byte. - ErrBadHeader = errors.New("5 byte header for value is missing or does no have 0 magic byte") + ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte") ) type ( @@ -97,7 +97,7 @@ type Serde struct { mu sync.Mutex defaults []SerdeOpt - header SerdeHeader + h SerdeHeader } var ( @@ -131,12 +131,24 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { s.defaults = opts } -// Header returns the configured header. -func (s *Serde) Header() SerdeHeader { - if s.header == nil { +// DecodeID decodes an ID from in, returning the ID and the remaining bytes, +// or an error. +func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error) { + return s.header().DecodeID(b) +} + +// DecodeIndex decodes at most maxLength of a schema index from in, returning +// the index and remaining bytes, or an error. It expects b to be the output of +// DecodeID (schema ID should already be stripped away). +func (s *Serde) DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) { + return s.header().DecodeIndex(in, maxLength) +} + +func (s *Serde) header() SerdeHeader { + if s.h == nil { return defaultSerdeHeader } - return s.header + return s.h } // Register registers a schema ID and the value it corresponds to, as well as @@ -178,7 +190,7 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { // tree to find the end node we are initializing. k := id m := dupIDs - at := dupIDs[k] + at := m[k] depth := len(t.index) max := func(i, j int) int { if i > j { @@ -255,7 +267,7 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { return b, ErrNotRegistered } - b, err := s.Header().AppendEncode(b, int(t.id), t.index) + b, err := s.header().AppendEncode(b, int(t.id), t.index) if err != nil { return nil, err } @@ -323,9 +335,7 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { } func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { - h := s.Header() - - id, b, err := h.DecodeID(b) + id, b, err := s.DecodeID(b) if err != nil { return nil, tserde{}, err } @@ -333,7 +343,7 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { t := s.loadIDs()[id] if len(t.subindex) > 0 { var index []int - index, b, err = h.DecodeIndex(b, t.subindexDepth) + index, b, err = s.DecodeIndex(b, t.subindexDepth) if err != nil { return nil, tserde{}, err } From b34cf0dd4cbbd5342db6d1fa32e91bec06370f47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Sun, 9 Jul 2023 08:17:18 +0200 Subject: [PATCH 15/15] fix comment on DecodeID --- pkg/sr/serde.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sr/serde.go b/pkg/sr/serde.go index 3ab9205a..8de7b5eb 100644 --- a/pkg/sr/serde.go +++ b/pkg/sr/serde.go @@ -131,7 +131,7 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { s.defaults = opts } -// DecodeID decodes an ID from in, returning the ID and the remaining bytes, +// DecodeID decodes an ID from b, returning the ID and the remaining bytes, // or an error. func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error) { return s.header().DecodeID(b)