-
-
Notifications
You must be signed in to change notification settings - Fork 196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serde Header #467
Serde Header #467
Changes from 8 commits
ca4c5e2
9be504f
1982c8a
0cdc3c6
b107644
b3312a2
08ea701
96dd4b5
29e1d2e
0df919b
27a8e66
22504eb
08a4bb0
4c9231f
fea6696
b34cf0d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,14 @@ | ||
package sr | ||
|
||
import ( | ||
"bytes" | ||
"encoding/binary" | ||
"errors" | ||
"io" | ||
"reflect" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
// The wire format for encoded types is 0, then big endian uint32 of the ID, | ||
// then the encoded message. | ||
// | ||
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format | ||
|
||
var ( | ||
// ErrNotRegistered is returned from Serde when attempting to encode a | ||
// value or decode an ID that has not been registered, or when using | ||
|
@@ -80,8 +75,9 @@ type tserde struct { | |
gen func() any | ||
typeof reflect.Type | ||
|
||
index []int // for encoding, an optional index we use | ||
subindex map[int]tserde // for decoding, we look up sub-indices in the payload | ||
index []int // for encoding, an optional index we use | ||
subindex map[int]tserde // for decoding, we look up sub-indices in the payload | ||
subindexDepth int // for decoding, we need to know the maximum depth of the subindex map | ||
} | ||
|
||
// Serde encodes and decodes values according to the schema registry wire | ||
|
@@ -101,6 +97,7 @@ type Serde struct { | |
mu sync.Mutex | ||
|
||
defaults []SerdeOpt | ||
header SerdeHeader | ||
} | ||
|
||
var ( | ||
|
@@ -134,6 +131,20 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { | |
s.defaults = opts | ||
} | ||
|
||
// SetHeader configures which header should be used when encoding and decoding | ||
// values. If the header is set to nil it falls back to confluentHeader. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
func (s *Serde) SetHeader(header SerdeHeader) { | ||
s.header = header | ||
} | ||
|
||
// Header returns the configured header. | ||
func (s *Serde) Header() SerdeHeader { | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if s.header == nil { | ||
return defaultSerdeHeader | ||
} | ||
return s.header | ||
} | ||
|
||
// Register registers a schema ID and the value it corresponds to, as well as | ||
// the encoding or decoding functions. You need to register functions depending | ||
// on whether you are only encoding, only decoding, or both. | ||
|
@@ -172,7 +183,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { | |
// tree to find the end node we are initializing. | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
k := id | ||
at := m[k] | ||
for _, idx := range t.index { | ||
max := func(i, j int) int { | ||
if i > j { | ||
return i | ||
} | ||
return j | ||
} | ||
for i, idx := range t.index { | ||
at.subindexDepth = max(at.subindexDepth, len(t.index)-i) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
m[k] = at | ||
|
||
m = at.subindex | ||
k = idx | ||
at = m[k] | ||
|
@@ -186,15 +206,16 @@ func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { | |
|
||
// Now, we initialize the end node. | ||
t = tserde{ | ||
id: uint32(id), | ||
exists: true, | ||
encode: t.encode, | ||
appendEncode: t.appendEncode, | ||
decode: t.decode, | ||
gen: t.gen, | ||
typeof: typeof, | ||
index: t.index, | ||
subindex: at.subindex, | ||
id: uint32(id), | ||
exists: true, | ||
encode: t.encode, | ||
appendEncode: t.appendEncode, | ||
decode: t.decode, | ||
gen: t.gen, | ||
typeof: typeof, | ||
index: t.index, | ||
subindex: at.subindex, | ||
subindexDepth: at.subindexDepth, | ||
} | ||
m[k] = t | ||
} | ||
|
@@ -219,8 +240,8 @@ func tserdeMapClone(m map[int]tserde, at int, index []int) map[int]tserde { | |
return dup | ||
} | ||
|
||
// Encode encodes a value according to the schema registry wire format and | ||
// returns it. If EncodeFn was not used, this returns ErrNotRegistered. | ||
// Encode encodes a value and prepends the header according to the configured | ||
// SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered. | ||
func (s *Serde) Encode(v any) ([]byte, error) { | ||
return s.AppendEncode(nil, v) | ||
} | ||
|
@@ -234,23 +255,9 @@ func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { | |
return b, ErrNotRegistered | ||
} | ||
|
||
b = append(b, | ||
0, | ||
byte(t.id>>24), | ||
byte(t.id>>16), | ||
byte(t.id>>8), | ||
byte(t.id>>0), | ||
) | ||
|
||
if len(t.index) > 0 { | ||
if len(t.index) == 1 && t.index[0] == 0 { | ||
b = append(b, 0) // first-index shortcut (one type in the protobuf) | ||
} else { | ||
b = binary.AppendVarint(b, int64(len(t.index))) | ||
for _, idx := range t.index { | ||
b = binary.AppendVarint(b, int64(idx)) | ||
} | ||
} | ||
b, err := s.Header().AppendEncode(b, int(t.id), t.index) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if t.appendEncode != nil { | ||
|
@@ -316,44 +323,119 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { | |
} | ||
|
||
func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { | ||
if len(b) < 5 || b[0] != 0 { | ||
return nil, tserde{}, ErrBadHeader | ||
h := s.Header() | ||
|
||
id, b, err := h.DecodeID(b) | ||
if err != nil { | ||
return nil, tserde{}, err | ||
} | ||
id := binary.BigEndian.Uint32(b[1:5]) | ||
b = b[5:] | ||
|
||
t := s.loadIDs()[int(id)] | ||
t := s.loadIDs()[id] | ||
if len(t.subindex) > 0 { | ||
r := bReader{b} | ||
br := io.ByteReader(&r) | ||
l, err := binary.ReadVarint(br) | ||
if l == 0 { // length 0 is a shortcut for length 1, index 0 | ||
t = t.subindex[0] | ||
} | ||
for err == nil && t.subindex != nil && l > 0 { | ||
var idx int64 | ||
idx, err = binary.ReadVarint(br) | ||
t = t.subindex[int(idx)] | ||
l-- | ||
} | ||
var index []int | ||
index, b, err = h.DecodeIndex(b, t.subindexDepth) | ||
if err != nil { | ||
return nil, t, err | ||
return nil, tserde{}, err | ||
} | ||
for _, idx := range index { | ||
if t.subindex == nil { | ||
return nil, tserde{}, ErrNotRegistered | ||
} | ||
t = t.subindex[idx] | ||
} | ||
b = r.b | ||
} | ||
if !t.exists { | ||
return nil, t, ErrNotRegistered | ||
return nil, tserde{}, ErrNotRegistered | ||
} | ||
return b, t, nil | ||
} | ||
|
||
type bReader struct{ b []byte } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if there was a specific reason for the custom reader type, I replaced it with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I often misremember that bytes.Reader is relatively small, and mix it up with bytes.Buffer being a pretty big struct -- but it looks like that's not even true anymore, since the |
||
// SerdeHeader encodes and decodes a message header. | ||
type SerdeHeader interface { | ||
// AppendEncode encodes a schema ID and optional index to b, returning the | ||
// updated slice or an error. | ||
AppendEncode(b []byte, id int, index []int) ([]byte, error) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// DecodeID decodes an ID from in, returning the ID and the remaining bytes, | ||
// or an error. | ||
DecodeID(in []byte) (id int, out []byte, err error) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// DecodeIndex decodes at most maxLength of a schema index from in, | ||
// returning the index and remaining bytes, or an error. | ||
DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
var defaultSerdeHeader = new(confluentHeader) | ||
|
||
// confluentHeader is a SerdeHeader that produces the Confluent wire format. It | ||
// starts with 0, then big endian uint32 of the ID, then index (only protobuf), | ||
// then the encoded message. | ||
// | ||
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format | ||
type confluentHeader struct{} | ||
|
||
// AppendEncode appends an encoded header to b according to the Confluent wire | ||
// format and returns it. Error is always nil. | ||
func (*confluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error) { | ||
b = append( | ||
b, | ||
0, | ||
byte(id>>24), | ||
byte(id>>16), | ||
byte(id>>8), | ||
byte(id>>0), | ||
) | ||
|
||
if len(index) > 0 { | ||
if len(index) == 1 && index[0] == 0 { | ||
b = append(b, 0) // first-index shortcut (one type in the protobuf) | ||
} else { | ||
b = binary.AppendVarint(b, int64(len(index))) | ||
for _, idx := range index { | ||
b = binary.AppendVarint(b, int64(idx)) | ||
} | ||
} | ||
} | ||
|
||
func (b *bReader) ReadByte() (byte, error) { | ||
if len(b.b) > 0 { | ||
r := b.b[0] | ||
b.b = b.b[1:] | ||
return r, nil | ||
return b, nil | ||
} | ||
|
||
// DecodeID strips and decodes the schema ID from b. It returns the ID alongside | ||
// the unread bytes. If the header does not contain the magic byte or b contains | ||
// less than 5 bytes it returns ErrBadHeader. | ||
func (*confluentHeader) DecodeID(b []byte) (int, []byte, error) { | ||
if len(b) < 5 || b[0] != 0 { | ||
return 0, nil, ErrBadHeader | ||
} | ||
id := binary.BigEndian.Uint32(b[1:5]) | ||
return int(id), b[5:], nil | ||
} | ||
|
||
// DecodeIndex strips and decodes indices from b. It returns the index slice | ||
// alongside the unread bytes. It expects b to be the output of DecodeID (schema | ||
// ID should already be stripped away). If maxLength is greater than 0 and the | ||
// encoded data contains more indices than maxLength the function returns | ||
// ErrNotRegistered. | ||
func (*confluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error) { | ||
buf := bytes.NewBuffer(b) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
l, err := binary.ReadVarint(buf) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if l == 0 { // length 0 is a shortcut for length 1, index 0 | ||
return []int{0}, buf.Bytes(), nil | ||
} | ||
if l < 0 { // index length can't be negative | ||
return nil, nil, ErrBadHeader | ||
} | ||
if maxLength > 0 && int(l) > maxLength { // index count is greater than expected | ||
return nil, nil, ErrNotRegistered | ||
} | ||
index := make([]int, l) | ||
twmb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i := range index { | ||
idx, err := binary.ReadVarint(buf) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
index[i] = int(idx) | ||
} | ||
return 0, io.EOF | ||
return index, buf.Bytes(), nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wdyt about this this being
SetGlobal
, and thenSerdeHeader
being aGlobalOpt
-- i.e. something that applies to all serdes? y/n?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, it would make it easy to add more options in the future. I think naming could be better though. IMO
SerdeOpt
sounds like a better fit for configuringSerde
(i.e. global options), while the current options could be something likeRegisterOpt
(since they are used inRegister
). This naming would play well with the changes I proposed in the other PR, which introducesEncodeOpt
, we end up with:SerdeOpt
configuresSerde
RegisterOpt
configuresSerde.Register
EncodeOpt
configuresSerde.Encode
While we're at it, it might make sense to change
Opt
toClientOpt
and introduceNewSerde(opts ...SerdeOpt)
to matchNewClient
. WDYT? I can removeSetHeader
from this PR and introduce the new options in a separate PR, to not increase the scope too much.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I'm sorry this has been such a slow review cycle, to put it short: I was involved with something that was time consuming sun up to sun down and also went on for ~3w (coincidentally, right when you opened the PR) -- and I'm trying to not be inside on the weekends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries! I'll get back to this PR in a day or two 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might have time on Thursday to take this over if you're not able to get to it (I admit that this review cycle has gone on for ... quite a while).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the situation turned around, because I'm pretty busy now and I'm not sure I'll get a chance to look into this before the weekend. If you get the time, feel free to take over the work that's left.