Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serde Header #467

Merged
merged 16 commits into from
Jul 10, 2023
Merged

Serde Header #467

merged 16 commits into from
Jul 10, 2023

Conversation

lovromazgon
Copy link
Contributor

@lovromazgon lovromazgon commented Jun 2, 2023

This PR introduces SerdeHeader and defines ConfluentHeader as the default implementation. The behavior of Serde is the same, one small but important difference is a fixed bug in the encoding of the message index (see the second part of the PR description). The changes are mostly just a refactoring and extraction of the header encoding/decoding logic. It makes Serde a bit more flexible and future-proof, since the header format can be switched to a different implementation.

The motivation behind this change is to expose the logic for encoding/decoding the Confluent wire format header. Using this change the caller can now decode the ID manually if needed and fetch the schema from the schema registry.

val, err := serde.DecodeNew(b)
if errors.Is(err, sr.ErrNotRegistered) {
	id, _ := serde.Header().DecodeID(b)

	// fetch schema from schema registry using the ID...

	serde.Register(id, myVal, sr.DecodeFn(func(b []byte, v any) error {
		// decode using the schema ...
	})

	// retry decoding
	val, err = serde.DecodeNew(b)
}

The code above is just an illustration, I'm aware that the code would need to take care of request collapsing.

Related to the discussion in #453.

}
return b, t, nil
}

type bReader struct{ b []byte }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there was a specific reason for the custom reader type, I replaced it with bytes.Buffer to simplify.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I often misremember that bytes.Reader is relatively small, and mix it up with bytes.Buffer being a pretty big struct -- but it looks like that's not even true anymore, since the bootstrap field was removed in 2018: golang/go@9c2be4c

@twmb
Copy link
Owner

twmb commented Jun 2, 2023

The documentation is worded confusingly and I got tripped up by that the first time I read it. The "reading the array backwards" describes the following sentence itself, not the wire encoding. The following sentence first describes the inner struct -- the 0th index of the inner struct -- and then describes the outer struct -- the 1st index of the outer struct. The paragraph beneath this describes the actual encoding of the array, " So the above array [1, 0] is encoded as the variable length ints 2,1,0 where the first 2 is the length". I don't know why they wrote it this way.

@lovromazgon
Copy link
Contributor Author

lovromazgon commented Jun 2, 2023

Oh you're right, I read it again carefully. I double checked what they do in confluent-kafka-go and seems like the order was correct as it was.

I'll revert this part 👍 EDIT: Done.

pkg/sr/serde.go Show resolved Hide resolved
pkg/sr/serde.go Outdated Show resolved Hide resolved
pkg/sr/serde.go Outdated
@@ -134,6 +131,20 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) {
s.defaults = opts
}

// SetHeader configures which header should be used when encoding and decoding
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt about this this being SetGlobal, and then SerdeHeader being a GlobalOpt -- i.e. something that applies to all serdes? y/n?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, it would make it easy to add more options in the future. I think naming could be better though. IMO SerdeOpt sounds like a better fit for configuring Serde (i.e. global options), while the current options could be something like RegisterOpt (since they are used in Register). This naming would play well with the changes I proposed in the other PR, which introduces EncodeOpt, we end up with:

  • SerdeOpt configures Serde
  • RegisterOpt configures Serde.Register
  • EncodeOpt configures Serde.Encode

While we're at it, it might make sense to change Opt to ClientOpt and introduce NewSerde(opts ...SerdeOpt) to match NewClient. WDYT? I can remove SetHeader from this PR and introduce the new options in a separate PR, to not increase the scope too much.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • +1 to adding NewSerde, which also removes the need for SetDefaults (so that can / should be removed).
  • +1 to changing Opt to ClientOpt
  • +1 to changing the current register opts to RegisterOpt, and then changing the new opt introduced here to SerdeOpt however all RegisterOpts need to implement SerdeOpt so that they can be used as defaults.
  • I'm not sure about the difference between RegisterOpt and EncodeOpt. I need to look at the other PR. I remember glancing at it and not being sure why it was split. Also I wonder if EncodingOpt is better than both RegisterOpt and EncodeOpt (although I could favor EncodeOpt for brevity)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I'm sorry this has been such a slow review cycle, to put it short: I was involved with something that was time consuming sun up to sun down and also went on for ~3w (coincidentally, right when you opened the PR) -- and I'm trying to not be inside on the weekends.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries! I'll get back to this PR in a day or two 👍

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have time on Thursday to take this over if you're not able to get to it (I admit that this review cycle has gone on for ... quite a while).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the situation turned around, because I'm pretty busy now and I'm not sure I'll get a chance to look into this before the weekend. If you get the time, feel free to take over the work that's left.

pkg/sr/serde.go Outdated Show resolved Hide resolved
pkg/sr/serde.go Outdated Show resolved Hide resolved
pkg/sr/serde.go Outdated Show resolved Hide resolved
pkg/sr/serde.go Show resolved Hide resolved
pkg/sr/serde.go Show resolved Hide resolved
pkg/sr/serde.go Show resolved Hide resolved
pkg/sr/serde.go Show resolved Hide resolved
pkg/sr/serde.go Show resolved Hide resolved
@twmb
Copy link
Owner

twmb commented Jun 15, 2023

Sorry for the delay, finally did a deep review. I don't think I have any comments besides what I've posted now.

pkg/sr/serde.go Outdated
// then the encoded message.
//
// https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format
type ConfluentHeader struct{}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be private? Not sure it needs to be public at the moment -- we can default internally to the private confluent header, but then the user can override.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, if we do that, this can be something like

type tConfluentHeader struct{}

var confluentHeader = new(tConfluentHeader)

and then line 143 above can return confluentHeader the pointer variable (per the pointer methods comment below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the type private, but used different names to avoid the t prefix, hope that's fine:

type confluentHeader struct{}

var defaultSerdeHeader = new(confluentHeader)

Although if that's a convention you are using for private types (i.e. tserde, tConfluentHeader etc.) let me know.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The t in this case means type, I do like it but I'm ok either way.

@lovromazgon
Copy link
Contributor Author

Thanks for the thorough review! 👍 I tried to address all comments in one way or another.

pkg/sr/serde.go Outdated
@@ -132,15 +132,15 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) {
}

// SetHeader configures which header should be used when encoding and decoding
// values. If the header is set to nil it falls back to ConfluentHeader.
// values. If the header is set to nil it falls back to confluentHeader.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

back to the default Confluent header.

pkg/sr/serde.go Outdated Show resolved Hide resolved
pkg/sr/serde.go Outdated
@@ -134,6 +131,26 @@ func (s *Serde) SetDefaults(opts ...SerdeOpt) {
s.defaults = opts
}

// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decodes an ID from b but I can sneak this into a quickfix PR that I plan to do

@twmb
Copy link
Owner

twmb commented Jul 8, 2023

This PR LGTM, I say squash and merge it.
This is now missing the ability to set a header but I think per this comment chain: #467 (comment) that will be in a follow up PR.

@twmb twmb mentioned this pull request Jul 8, 2023
@lovromazgon
Copy link
Contributor Author

Sounds good to me! I'll take care of the follow-up in a second. I already have a separate branch with the changes, I just had something urgent come up so I didn't get to open the PR, sorry about that.

@twmb twmb merged commit df66c94 into twmb:master Jul 10, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants