-
-
Notifications
You must be signed in to change notification settings - Fork 197
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serde options refactor #506
Closed
Closed
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
0bc4564
rename Opt to ClientOpt
lovromazgon ad5657f
rename SerdeOpt to EncodingOpt, introduce different SerdeOpt
lovromazgon 7544ce4
Add EncodingOpt to Encode and related methods, introduce ID option
lovromazgon 1ade586
Add EncodingOpt to Decode and related methods
lovromazgon 6da9cf4
Merge remote-tracking branch 'upstream/master' into serde-options
lovromazgon 1bb340b
serde header option
lovromazgon 8aaf14f
introduce DynEncode and DynAppendEncode, simplify options
lovromazgon 8591c09
remove option ID
lovromazgon 18b30d9
rename append to encode
lovromazgon f0fd058
Merge branch 'master' into serde-options
lovromazgon 8038cd3
Merge remote-tracking branch 'upstream/master' into serde-options
lovromazgon 172db90
remove DynEncode method and replace it with a normal function
lovromazgon 2ffcf74
remove unused field in test cases
lovromazgon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,52 +20,6 @@ var ( | |
ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte") | ||
) | ||
|
||
type ( | ||
// SerdeOpt is an option to configure a Serde. | ||
SerdeOpt interface{ apply(*tserde) } | ||
serdeOpt struct{ fn func(*tserde) } | ||
) | ||
|
||
func (o serdeOpt) apply(t *tserde) { o.fn(t) } | ||
|
||
// EncodeFn allows Serde to encode a value. | ||
func EncodeFn(fn func(any) ([]byte, error)) SerdeOpt { | ||
return serdeOpt{func(t *tserde) { t.encode = fn }} | ||
} | ||
|
||
// AppendEncodeFn allows Serde to encode a value to an existing slice. This | ||
// can be more efficient than EncodeFn; this function is used if it exists. | ||
func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) SerdeOpt { | ||
return serdeOpt{func(t *tserde) { t.appendEncode = fn }} | ||
} | ||
|
||
// DecodeFn allows Serde to decode into a value. | ||
func DecodeFn(fn func([]byte, any) error) SerdeOpt { | ||
return serdeOpt{func(t *tserde) { t.decode = fn }} | ||
} | ||
|
||
// GenerateFn returns a new(Value) that can be decoded into. This function can | ||
// be used to control the instantiation of a new type for DecodeNew. | ||
func GenerateFn(fn func() any) SerdeOpt { | ||
return serdeOpt{func(t *tserde) { t.gen = fn }} | ||
} | ||
|
||
// Index attaches a message index to a value. A single schema ID can be | ||
// registered multiple times with different indices. | ||
// | ||
// This option supports schemas that encode many different values from the same | ||
// schema (namely, protobuf). The index into the schema to encode a | ||
// particular message is specified with `index`. | ||
// | ||
// NOTE: this option must be used for protobuf schemas. | ||
// | ||
// For more information, see where `message-indexes` are described in: | ||
// | ||
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format | ||
func Index(index ...int) SerdeOpt { | ||
return serdeOpt{func(t *tserde) { t.index = index }} | ||
} | ||
|
||
type tserde struct { | ||
id uint32 | ||
exists bool | ||
|
@@ -96,7 +50,7 @@ type Serde struct { | |
types atomic.Value // map[reflect.Type]tserde | ||
mu sync.Mutex | ||
|
||
defaults []SerdeOpt | ||
defaults []EncodingOpt | ||
h SerdeHeader | ||
} | ||
|
||
|
@@ -105,6 +59,25 @@ var ( | |
noTypes = make(map[reflect.Type]tserde) | ||
) | ||
|
||
// NewSerde returns a new Serde using the supplied default options, which are | ||
// applied to every registered type. These options are always applied first, so | ||
// you can override them as necessary when registering. | ||
// | ||
// This can be useful if you always want to use the same encoding or decoding | ||
// functions. | ||
func NewSerde(opts ...SerdeOrEncodingOpt) *Serde { | ||
var s Serde | ||
for _, opt := range opts { | ||
switch opt := opt.(type) { | ||
case SerdeOpt: | ||
opt.apply(&s) | ||
case EncodingOpt: | ||
s.defaults = append(s.defaults, opt) | ||
} | ||
} | ||
return &s | ||
} | ||
|
||
func (s *Serde) loadIDs() map[int]tserde { | ||
ids := s.ids.Load() | ||
if ids == nil { | ||
|
@@ -121,16 +94,6 @@ func (s *Serde) loadTypes() map[reflect.Type]tserde { | |
return types.(map[reflect.Type]tserde) | ||
} | ||
|
||
// SetDefaults sets default options to apply to every registered type. These | ||
// options are always applied first, so you can override them as necessary when | ||
// registering. | ||
// | ||
// This can be useful if you always want to use the same encoding or decoding | ||
// functions. | ||
func (s *Serde) SetDefaults(opts ...SerdeOpt) { | ||
s.defaults = opts | ||
} | ||
|
||
// DecodeID decodes an ID from b, returning the ID and the remaining bytes, | ||
// or an error. | ||
func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error) { | ||
|
@@ -154,7 +117,7 @@ func (s *Serde) header() SerdeHeader { | |
// Register registers a schema ID and the value it corresponds to, as well as | ||
// the encoding or decoding functions. You need to register functions depending | ||
// on whether you are only encoding, only decoding, or both. | ||
func (s *Serde) Register(id int, v any, opts ...SerdeOpt) { | ||
func (s *Serde) Register(id int, v any, opts ...EncodingOpt) { | ||
var t tserde | ||
for _, opt := range s.defaults { | ||
opt.apply(&t) | ||
|
@@ -258,28 +221,31 @@ func (s *Serde) Encode(v any) ([]byte, error) { | |
return s.AppendEncode(nil, v) | ||
} | ||
|
||
// AppendEncode appends an encoded value to b according to the schema registry | ||
// wire format and returns it. If EncodeFn was not used, this returns | ||
// ErrNotRegistered. | ||
// AppendEncode encodes a value and prepends the header according to the | ||
// configured SerdeHeader, appends it to b and returns b. If EncodeFn was not | ||
// registered, this returns ErrNotRegistered. | ||
func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error) { | ||
t, ok := s.loadTypes()[reflect.TypeOf(v)] | ||
if !ok || (t.encode == nil && t.appendEncode == nil) { | ||
return b, ErrNotRegistered | ||
} | ||
// Load tserde based on the registered type. | ||
t := s.loadTypes()[reflect.TypeOf(v)] | ||
|
||
b, err := s.header().AppendEncode(b, int(t.id), t.index) | ||
if err != nil { | ||
return nil, err | ||
// Check if we loaded a valid tserde. | ||
if !t.exists || (t.encode == nil && t.appendEncode == nil) { | ||
return nil, ErrNotRegistered | ||
} | ||
|
||
if t.appendEncode != nil { | ||
return t.appendEncode(b, v) | ||
} | ||
encoded, err := t.encode(v) | ||
if err != nil { | ||
return nil, err | ||
appendEncode := t.appendEncode | ||
if appendEncode == nil { | ||
// Fallback to t.encode. | ||
appendEncode = func(b []byte, v any) ([]byte, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this causes an extra alloc in the encode path. |
||
encoded, err := t.encode(v) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return append(b, encoded...), nil | ||
} | ||
} | ||
return append(b, encoded...), nil | ||
|
||
return AppendEncode(b, v, int(t.id), t.index, s.header(), appendEncode) | ||
} | ||
|
||
// MustEncode returns the value of Encode, panicking on error. This is a | ||
|
@@ -328,8 +294,10 @@ func (s *Serde) DecodeNew(b []byte) (any, error) { | |
var v any | ||
if t.gen != nil { | ||
v = t.gen() | ||
} else { | ||
} else if t.typeof != nil { | ||
v = reflect.New(t.typeof).Interface() | ||
} else { | ||
return nil, ErrNotRegistered | ||
} | ||
return v, t.decode(b, v) | ||
} | ||
|
@@ -339,7 +307,6 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { | |
if err != nil { | ||
return nil, tserde{}, err | ||
} | ||
|
||
t := s.loadIDs()[id] | ||
if len(t.subindex) > 0 { | ||
var index []int | ||
|
@@ -354,12 +321,37 @@ func (s *Serde) decodeFind(b []byte) ([]byte, tserde, error) { | |
t = t.subindex[idx] | ||
} | ||
} | ||
if !t.exists { | ||
|
||
// Check if we loaded a valid tserde. | ||
if !t.exists || t.decode == nil { | ||
return nil, tserde{}, ErrNotRegistered | ||
} | ||
|
||
return b, t, nil | ||
} | ||
|
||
// Encode encodes a value and prepends the header. If the encoding function | ||
// fails, this returns an error. | ||
func Encode(v any, id int, index []int, h SerdeHeader, enc func(any) ([]byte, error)) ([]byte, error) { | ||
return AppendEncode(nil, v, id, index, h, func(b []byte, val any) ([]byte, error) { | ||
encoded, err := enc(val) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return append(b, encoded...), nil | ||
}) | ||
} | ||
|
||
// AppendEncode encodes a value and prepends the header, appends it to b and | ||
// returns b. If the encoding function fails, this returns an error. | ||
func AppendEncode(b []byte, v any, id int, index []int, h SerdeHeader, enc func([]byte, any) ([]byte, error)) ([]byte, error) { | ||
b, err := h.AppendEncode(b, id, index) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return enc(b, v) | ||
} | ||
|
||
// SerdeHeader encodes and decodes a message header. | ||
type SerdeHeader interface { | ||
// AppendEncode encodes a schema ID and optional index to b, returning the | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this was lost (and not added in one of the new top level functions)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, it's used in the new call to AppendEncode below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might rather have a little bit of duplication, not sure yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep leaning duplication; I rebased on master and squashed all of your commits and did a few touchups in #742.