Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serde Header #467

Merged
merged 16 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 146 additions & 64 deletions pkg/sr/serde.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package sr

import (
"bytes"
"encoding/binary"
"errors"
"io"
"reflect"
"sync"
"sync/atomic"
)

// The wire format for encoded types is 0, then big endian uint32 of the ID,
// then the encoded message.
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

var (
// ErrNotRegistered is returned from Serde when attempting to encode a
// value or decode an ID that has not been registered, or when using
Expand Down Expand Up @@ -80,8 +75,9 @@ type tserde struct {
gen func() any
typeof reflect.Type

index []int // for encoding, an optional index we use
subindex map[int]tserde // for decoding, we look up sub-indices in the payload
index []int // for encoding, an optional index we use
subindex map[int]tserde // for decoding, we look up sub-indices in the payload
subindexDepth int // for decoding, we need to know the maximum depth of the subindex map
}

// Serde encodes and decodes values according to the schema registry wire
Expand All @@ -101,6 +97,7 @@ type Serde struct {
mu sync.Mutex

defaults []SerdeOpt
header SerdeHeader
}

var (
Expand Down Expand Up @@ -134,6 +131,20 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) {
s.defaults = opts
}

// SetHeader configures which header should be used when encoding and decoding
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt about this this being SetGlobal, and then SerdeHeader being a GlobalOpt -- i.e. something that applies to all serdes? y/n?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it would make it easy to add more options in the future. I think naming could be better though. IMO SerdeOpt sounds like a better fit for configuring Serde (i.e. global options), while the current options could be something like RegisterOpt (since they are used in Register). This naming would play well with the changes I proposed in the other PR, which introduces EncodeOpt, we end up with:

  • SerdeOpt configures Serde
  • RegisterOpt configures Serde.Register
  • EncodeOpt configures Serde.Encode

While we're at it, it might make sense to change Opt to ClientOpt and introduce NewSerde(opts ...SerdeOpt) to match NewClient. WDYT? I can remove SetHeader from this PR and introduce the new options in a separate PR, to not increase the scope too much.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • +1 to adding NewSerde, which also removes the need for SetDefaults (so that can / should be removed).
  • +1 to changing Opt to ClientOpt
  • +1 to changing the current register opts to RegisterOpt, and then changing the new opt introduced here to SerdeOpt however all RegisterOpts need to implement SerdeOpt so that they can be used as defaults.
  • I'm not sure about the difference between RegisterOpt and EncodeOpt. I need to look at the other PR. I remember glancing at it and not being sure why it was split. Also I wonder if EncodingOpt is better than both RegisterOpt and EncodeOpt (although I could favor EncodeOpt for brevity)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I'm sorry this has been such a slow review cycle, to put it short: I was involved with something that was time consuming sun up to sun down and also went on for ~3w (coincidentally, right when you opened the PR) -- and I'm trying to not be inside on the weekends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries! I'll get back to this PR in a day or two 👍

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have time on Thursday to take this over if you're not able to get to it (I admit that this review cycle has gone on for ... quite a while).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the situation turned around, because I'm pretty busy now and I'm not sure I'll get a chance to look into this before the weekend. If you get the time, feel free to take over the work that's left.

// values. If the header is set to nil it falls back to confluentHeader.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

back to the default Confluent header.

func (s *Serde) SetHeader(header SerdeHeader) {
s.header = header
}

// Header returns the configured header.
func (s *Serde) Header() SerdeHeader {
twmb marked this conversation as resolved.
Show resolved Hide resolved
if s.header == nil {
return defaultSerdeHeader
}
return s.header
}

// Register registers a schema ID and the value it corresponds to, as well as
// the encoding or decoding functions. You need to register functions depending
// on whether you are only encoding, only decoding, or both.
Expand Down Expand Up @@ -172,7 +183,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) {
// tree to find the end node we are initializing.
twmb marked this conversation as resolved.
Show resolved Hide resolved
k := id
at := m[k]
for _, idx := range t.index {
max := func(i, j int) int {
if i > j {
return i
}
return j
}
for i, idx := range t.index {
at.subindexDepth = max(at.subindexDepth, len(t.index)-i)
twmb marked this conversation as resolved.
Show resolved Hide resolved
m[k] = at

m = at.subindex
k = idx
at = m[k]
Expand All @@ -186,15 +206,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) {

// Now, we initialize the end node.
t = tserde{
id: uint32(id),
exists: true,
encode: t.encode,
appendEncode: t.appendEncode,
decode: t.decode,
gen: t.gen,
typeof: typeof,
index: t.index,
subindex: at.subindex,
id: uint32(id),
exists: true,
encode: t.encode,
appendEncode: t.appendEncode,
decode: t.decode,
gen: t.gen,
typeof: typeof,
index: t.index,
subindex: at.subindex,
subindexDepth: at.subindexDepth,
}
m[k] = t
}
Expand All @@ -219,8 +240,8 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde {
return dup
}

// Encode encodes a value according to the schema registry wire format and
// returns it. If EncodeFn was not used, this returns ErrNotRegistered.
// Encode encodes a value and prepends the header according to the configured
// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered.
func (s *Serde) Encode(v any) ([]byte, error) {
return s.AppendEncode(nil, v)
}
Expand All @@ -234,23 +255,9 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) {
return b, ErrNotRegistered
}

b = append(b,
0,
byte(t.id>>24),
byte(t.id>>16),
byte(t.id>>8),
byte(t.id>>0),
)

if len(t.index) > 0 {
if len(t.index) == 1 && t.index[0] == 0 {
b = append(b, 0) // first-index shortcut (one type in the protobuf)
} else {
b = binary.AppendVarint(b, int64(len(t.index)))
for _, idx := range t.index {
b = binary.AppendVarint(b, int64(idx))
}
}
b, err := s.Header().AppendEncode(b, int(t.id), t.index)
if err != nil {
return nil, err
}

if t.appendEncode != nil {
Expand Down Expand Up @@ -316,44 +323,119 @@ func (s *Serde) DecodeNew(b []byte) (any, error) {
}

func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) {
if len(b) < 5 || b[0] != 0 {
return nil, tserde{}, ErrBadHeader
h := s.Header()

id, b, err := h.DecodeID(b)
if err != nil {
return nil, tserde{}, err
}
id := binary.BigEndian.Uint32(b[1:5])
b = b[5:]

t := s.loadIDs()[int(id)]
t := s.loadIDs()[id]
if len(t.subindex) > 0 {
r := bReader{b}
br := io.ByteReader(&r)
l, err := binary.ReadVarint(br)
if l == 0 { // length 0 is a shortcut for length 1, index 0
t = t.subindex[0]
}
for err == nil && t.subindex != nil && l > 0 {
var idx int64
idx, err = binary.ReadVarint(br)
t = t.subindex[int(idx)]
l--
}
var index []int
index, b, err = h.DecodeIndex(b, t.subindexDepth)
if err != nil {
return nil, t, err
return nil, tserde{}, err
}
for _, idx := range index {
if t.subindex == nil {
return nil, tserde{}, ErrNotRegistered
}
t = t.subindex[idx]
}
b = r.b
}
if !t.exists {
return nil, t, ErrNotRegistered
return nil, tserde{}, ErrNotRegistered
}
return b, t, nil
}

type bReader struct{ b []byte }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there was a specific reason for the custom reader type, I replaced it with bytes.Buffer to simplify.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I often misremember that bytes.Reader is relatively small, and mix it up with bytes.Buffer being a pretty big struct -- but it looks like that's not even true anymore, since the bootstrap field was removed in 2018: golang/go@9c2be4c

// SerdeHeader encodes and decodes a message header.
type SerdeHeader interface {
// AppendEncode encodes a schema ID and optional index to b, returning the
// updated slice or an error.
AppendEncode(b []byte, id int, index []int) ([]byte, error)
twmb marked this conversation as resolved.
Show resolved Hide resolved
// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
// or an error.
DecodeID(in []byte) (id int, out []byte, err error)
twmb marked this conversation as resolved.
Show resolved Hide resolved
// DecodeIndex decodes at most maxLength of a schema index from in,
// returning the index and remaining bytes, or an error.
DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)
twmb marked this conversation as resolved.
Show resolved Hide resolved
}

var defaultSerdeHeader = new(confluentHeader)

// confluentHeader is a SerdeHeader that produces the Confluent wire format. It
// starts with 0, then big endian uint32 of the ID, then index (only protobuf),
// then the encoded message.
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
type confluentHeader struct{}

// AppendEncode appends an encoded header to b according to the Confluent wire
// format and returns it. Error is always nil.
func (*confluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) {
b = append(
b,
0,
byte(id>>24),
byte(id>>16),
byte(id>>8),
byte(id>>0),
)

if len(index) > 0 {
if len(index) == 1 && index[0] == 0 {
b = append(b, 0) // first-index shortcut (one type in the protobuf)
} else {
b = binary.AppendVarint(b, int64(len(index)))
for _, idx := range index {
b = binary.AppendVarint(b, int64(idx))
}
}
}

func (b *bReader) ReadByte() (byte, error) {
if len(b.b) > 0 {
r := b.b[0]
b.b = b.b[1:]
return r, nil
return b, nil
}

// DecodeID strips and decodes the schema ID from b. It returns the ID alongside
// the unread bytes. If the header does not contain the magic byte or b contains
// less than 5 bytes it returns ErrBadHeader.
func (*confluentHeader) DecodeID(b []byte) (int, []byte, error) {
if len(b) < 5 || b[0] != 0 {
return 0, nil, ErrBadHeader
}
id := binary.BigEndian.Uint32(b[1:5])
return int(id), b[5:], nil
}

// DecodeIndex strips and decodes indices from b. It returns the index slice
// alongside the unread bytes. It expects b to be the output of DecodeID (schema
// ID should already be stripped away). If maxLength is greater than 0 and the
// encoded data contains more indices than maxLength the function returns
// ErrNotRegistered.
func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) {
buf := bytes.NewBuffer(b)
twmb marked this conversation as resolved.
Show resolved Hide resolved
l, err := binary.ReadVarint(buf)
if err != nil {
return nil, nil, err
}
twmb marked this conversation as resolved.
Show resolved Hide resolved
if l == 0 { // length 0 is a shortcut for length 1, index 0
return []int{0}, buf.Bytes(), nil
}
if l < 0 { // index length can't be negative
return nil, nil, ErrBadHeader
}
if maxLength > 0 && int(l) > maxLength { // index count is greater than expected
return nil, nil, ErrNotRegistered
}
index := make([]int, l)
twmb marked this conversation as resolved.
Show resolved Hide resolved
for i := range index {
idx, err := binary.ReadVarint(buf)
if err != nil {
return nil, nil, err
}
index[i] = int(idx)
}
return 0, io.EOF
return index, buf.Bytes(), nil
}
79 changes: 79 additions & 0 deletions pkg/sr/serde_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,86 @@ func TestSerde(t *testing.T) {
if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 3}); err != io.EOF {
t.Errorf("got %v != exp io.EOF", err)
}
if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 3, 8}); err != ErrNotRegistered {
t.Errorf("got %v != exp ErrNotRegistered", err)
}
if _, err := serde.DecodeNew([]byte{0, 0, 0, 0, 99}); err != ErrNotRegistered {
t.Errorf("got %v != exp ErrNotRegistered", err)
}
}

func TestConfluentHeader(t *testing.T) {
var h confluentHeader

for i, test := range []struct {
id int
index []int
expEnc []byte
}{
{id: 1, index: nil, expEnc: []byte{0, 0, 0, 0, 1}},
{id: 256, index: nil, expEnc: []byte{0, 0, 0, 1, 0}},
{id: 2, index: []int{0}, expEnc: []byte{0, 0, 0, 0, 2, 0}},
{id: 3, index: []int{1}, expEnc: []byte{0, 0, 0, 0, 3, 2, 2}},
{id: 4, index: []int{1, 2, 3}, expEnc: []byte{0, 0, 0, 0, 4, 6, 2, 4, 6}},
} {
b, err := h.AppendEncode(nil, test.id, test.index)
if err != nil {
t.Errorf("#%d AppendEncode: got unexpected err %v", i, err)
continue
}
if !bytes.Equal(b, test.expEnc) {
t.Errorf("#%d: AppendEncode(%v) != exp(%v)", i, b, test.expEnc)
continue
}

if b2, _ := h.AppendEncode([]byte("foo"), test.id, test.index); !bytes.Equal(b2, append([]byte("foo"), b...)) {
t.Errorf("#%d got AppendEncode(%v) != AppendEncode(foo%v)", i, b2, b)
}

id, b2, err := h.DecodeID(b)
if err != nil {
t.Errorf("#%d DecodeID: got unexpected err %v", i, err)
continue
}
if id != test.id {
t.Errorf("#%d: DecodeID: id(%v) != exp(%v)", i, id, test.id)
continue
}
if test.index == nil && len(b2) != 0 {
t.Errorf("#%d: DecodeID: bytes(%v) != exp([])", i, b2)
continue
}

if test.index != nil {
index, b3, err := h.DecodeIndex(b2, len(test.index))
if err != nil {
t.Errorf("#%d DecodeIndex: got unexpected err %v", i, err)
continue
}
if !reflect.DeepEqual(index, test.index) {
t.Errorf("#%d: DecodeIndex: index(%v) != exp(%v)", i, index, test.index)
continue
}
if len(b3) != 0 {
t.Errorf("#%d: DecodeIndex: bytes(%v) != exp([])", i, b3)
continue
}
}
}

if _, _, err := h.DecodeID([]byte{1, 0, 0, 0, 0, 1}); err != ErrBadHeader {
t.Errorf("got %v != exp ErrBadHeader", err)
}
if _, _, err := h.DecodeID([]byte{0, 0, 0, 0}); err != ErrBadHeader {
t.Errorf("got %v != exp ErrBadHeader", err)
}
if _, _, err := h.DecodeIndex([]byte{2}, 1); err != io.EOF {
t.Errorf("got %v != exp io.EOF", err)
}
if _, _, err := h.DecodeIndex([]byte{6, 2, 4, 6}, 2); err != ErrNotRegistered {
t.Errorf("got %v != exp ErrNotRegistered", err)
}
if _, _, err := h.DecodeIndex([]byte{1}, 2); err != ErrBadHeader {
t.Errorf("got %v != exp ErrBadHeader", err)
}
}