-
-
Notifications
You must be signed in to change notification settings - Fork 196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Serde Header #467
Serde Header #467
Conversation
} | ||
return b, t, nil | ||
} | ||
|
||
type bReader struct{ b []byte } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if there was a specific reason for the custom reader type, I replaced it with bytes.Buffer
to simplify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I often misremember that bytes.Reader is relatively small, and mix it up with bytes.Buffer being a pretty big struct -- but it looks like that's not even true anymore, since the bootstrap
field was removed in 2018: golang/go@9c2be4c
The documentation is worded confusingly and I got tripped up by that the first time I read it. The "reading the array backwards" describes the following sentence itself, not the wire encoding. The following sentence first describes the inner struct -- the 0th index of the inner struct -- and then describes the outer struct -- the 1st index of the outer struct. The paragraph beneath this describes the actual encoding of the array, " So the above array [1, 0] is encoded as the variable length ints 2,1,0 where the first 2 is the length". I don't know why they wrote it this way. |
Oh you're right, I read it again carefully. I double checked what they do in confluent-kafka-go and seems like the order was correct as it was. I'll revert this part 👍 EDIT: Done. |
pkg/sr/serde.go
Outdated
@@ -134,6 +131,20 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { | |||
s.defaults = opts | |||
} | |||
|
|||
// SetHeader configures which header should be used when encoding and decoding |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wdyt about this this being SetGlobal
, and then SerdeHeader
being a GlobalOpt
-- i.e. something that applies to all serdes? y/n?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, it would make it easy to add more options in the future. I think naming could be better though. IMO SerdeOpt
sounds like a better fit for configuring Serde
(i.e. global options), while the current options could be something like RegisterOpt
(since they are used in Register
). This naming would play well with the changes I proposed in the other PR, which introduces EncodeOpt
, we end up with:
SerdeOpt
configuresSerde
RegisterOpt
configuresSerde.Register
EncodeOpt
configuresSerde.Encode
While we're at it, it might make sense to change Opt
to ClientOpt
and introduce NewSerde(opts ...SerdeOpt)
to match NewClient
. WDYT? I can remove SetHeader
from this PR and introduce the new options in a separate PR, to not increase the scope too much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- +1 to adding NewSerde, which also removes the need for SetDefaults (so that can / should be removed).
- +1 to changing Opt to ClientOpt
- +1 to changing the current register opts to RegisterOpt, and then changing the new opt introduced here to SerdeOpt however all RegisterOpts need to implement SerdeOpt so that they can be used as defaults.
- I'm not sure about the difference between RegisterOpt and EncodeOpt. I need to look at the other PR. I remember glancing at it and not being sure why it was split. Also I wonder if EncodingOpt is better than both RegisterOpt and EncodeOpt (although I could favor EncodeOpt for brevity)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I'm sorry this has been such a slow review cycle, to put it short: I was involved with something that was time consuming sun up to sun down and also went on for ~3w (coincidentally, right when you opened the PR) -- and I'm trying to not be inside on the weekends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries! I'll get back to this PR in a day or two 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might have time on Thursday to take this over if you're not able to get to it (I admit that this review cycle has gone on for ... quite a while).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the situation turned around, because I'm pretty busy now and I'm not sure I'll get a chance to look into this before the weekend. If you get the time, feel free to take over the work that's left.
Sorry for the delay, finally did a deep review. I don't think I have any comments besides what I've posted now. |
pkg/sr/serde.go
Outdated
// then the encoded message. | ||
// | ||
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format | ||
type ConfluentHeader struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be private? Not sure it needs to be public at the moment -- we can default internally to the private confluent header, but then the user can override.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And, if we do that, this can be something like
type tConfluentHeader struct{}
var confluentHeader = new(tConfluentHeader)
and then line 143 above can return confluentHeader
the pointer variable (per the pointer methods comment below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made the type private, but used different names to avoid the t
prefix, hope that's fine:
type confluentHeader struct{}
var defaultSerdeHeader = new(confluentHeader)
Although if that's a convention you are using for private types (i.e. tserde
, tConfluentHeader
etc.) let me know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The t in this case means type, I do like it but I'm ok either way.
Thanks for the thorough review! 👍 I tried to address all comments in one way or another. |
pkg/sr/serde.go
Outdated
@@ -132,15 +132,15 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { | |||
} | |||
|
|||
// SetHeader configures which header should be used when encoding and decoding | |||
// values. If the header is set to nil it falls back to ConfluentHeader. | |||
// values. If the header is set to nil it falls back to confluentHeader. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
back to the default Confluent header.
pkg/sr/serde.go
Outdated
@@ -134,6 +131,26 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) { | |||
s.defaults = opts | |||
} | |||
|
|||
// DecodeID decodes an ID from in, returning the ID and the remaining bytes, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
decodes an ID from b
but I can sneak this into a quickfix PR that I plan to do
This PR LGTM, I say squash and merge it. |
Sounds good to me! I'll take care of the follow-up in a second. I already have a separate branch with the changes, I just had something urgent come up so I didn't get to open the PR, sorry about that. |
This PR introduces
SerdeHeader
and definesConfluentHeader
as the default implementation. The behavior ofSerde
is the same,one small but important difference is a fixed bug in the encoding of the message index (see the second part of the PR description). The changes are mostly just a refactoring and extraction of the header encoding/decoding logic. It makesSerde
a bit more flexible and future-proof, since the header format can be switched to a different implementation.The motivation behind this change is to expose the logic for encoding/decoding the Confluent wire format header. Using this change the caller can now decode the ID manually if needed and fetch the schema from the schema registry.
The code above is just an illustration, I'm aware that the code would need to take care of request collapsing.
Related to the discussion in #453.