Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: upgrade to 1.66.2 and use Codec v2 #16790

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/golang/glog v1.2.2
github.com/golang/protobuf v1.5.4
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
Expand Down
70 changes: 41 additions & 29 deletions go/vt/servenv/grpc_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,64 @@ limitations under the License.
package servenv

import (
"fmt"

// use the original golang/protobuf package we can continue serializing
// messages from our dependencies, particularly from etcd
"github.com/golang/protobuf/proto" //nolint

"google.golang.org/grpc/encoding"
"google.golang.org/grpc/mem"

// Guarantee that the built-in proto is called registered before this one
// so that it can be replaced.
_ "google.golang.org/grpc/encoding/proto" // nolint:revive
)

// Name is the name registered for the proto compressor.
const Name = "proto"

type vtprotoCodec struct{}

type vtprotoMessage interface {
MarshalVT() ([]byte, error)
MarshalToSizedBufferVT(data []byte) (int, error)
UnmarshalVT([]byte) error
SizeVT() int
}

func (vtprotoCodec) Marshal(v any) ([]byte, error) {
switch v := v.(type) {
case vtprotoMessage:
return v.MarshalVT()
case proto.Message:
return proto.Marshal(v)
default:
return nil, fmt.Errorf("failed to marshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
}
type Codec struct {
fallback encoding.CodecV2
}

func (vtprotoCodec) Unmarshal(data []byte, v any) error {
switch v := v.(type) {
case vtprotoMessage:
return v.UnmarshalVT(data)
case proto.Message:
return proto.Unmarshal(data, v)
default:
return fmt.Errorf("failed to unmarshal, message is %T, must satisfy the vtprotoMessage interface or want proto.Message", v)
func (Codec) Name() string { return Name }

var defaultBufferPool = mem.DefaultBufferPool()

func (c *Codec) Marshal(v any) (mem.BufferSlice, error) {
if m, ok := v.(vtprotoMessage); ok {
size := m.SizeVT()
if mem.IsBelowBufferPoolingThreshold(size) {
buf := make([]byte, size)
if _, err := m.MarshalToSizedBufferVT(buf[:size]); err != nil {
return nil, err
}
return mem.BufferSlice{mem.SliceBuffer(buf)}, nil
}
buf := defaultBufferPool.Get(size)
if _, err := m.MarshalToSizedBufferVT((*buf)[:size]); err != nil {
defaultBufferPool.Put(buf)
return nil, err
}
return mem.BufferSlice{mem.NewBuffer(buf, defaultBufferPool)}, nil
}

return c.fallback.Marshal(v)
}

func (vtprotoCodec) Name() string {
return Name
func (c *Codec) Unmarshal(data mem.BufferSlice, v any) error {
if m, ok := v.(vtprotoMessage); ok {
buf := data.MaterializeToBuffer(defaultBufferPool)
defer buf.Free()
return m.UnmarshalVT(buf.ReadOnlyData())
}

return c.fallback.Unmarshal(data, v)
}

func init() {
encoding.RegisterCodec(vtprotoCodec{})
encoding.RegisterCodecV2(&Codec{
fallback: encoding.GetCodecV2("proto"),
})
}
Loading