From 4f0cbcc3706918352717fbce27316413cc6589cf Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Wed, 15 May 2024 21:03:35 -0700 Subject: [PATCH 1/5] add firstready balancer --- go/vt/vtgateproxy/firstready_balancer.go | 80 ++++++++++++++++++++++++ go/vt/vtgateproxy/vtgateproxy.go | 6 +- 2 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 go/vt/vtgateproxy/firstready_balancer.go diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go new file mode 100644 index 00000000000..eaf46353072 --- /dev/null +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -0,0 +1,80 @@ +package vtgateproxy + +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The firstready balancer implements the GRPC load balancer abstraction by +// routing all queries to the first available target in the list returned from +// discovery. +// +// Similar to the builtin "round_robin" balancer, the base functionality takes care +// of establishing subconns to all targets in the list and keeping them in the "ready" +// state, so all we have to do is pick the first available one from the set. +// +// This is in contrast to the `pick_first` balancer which only establishes the subconn +// to a single target at a time and is therefore subject to undesirable behaviors if, +// for example, the first host in the set is unreachable for some time, but not declared +// down. +// https://github.com/grpc/grpc-go/blob/master/pickfirst.go + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/grpclog" +) + +// Name is the name of first_ready balancer. +const Name = "first_ready" + +var logger = grpclog.Component("firstready") + +// newBuilder creates a new roundrobin balancer builder. +func newBuilder() balancer.Builder { + return base.NewBalancerBuilder(Name, &frPickerBuilder{}, base.Config{HealthCheck: true}) +} + +func init() { + balancer.Register(newBuilder()) +} + +type frPickerBuilder struct{} + +func (*frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { + logger.Infof("firstreadyPicker: Build called with info: %v", info) + if len(info.ReadySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + + var subConn balancer.SubConn + for sc := range info.ReadySCs { + subConn = sc + break + } + return &frPicker{ + subConn: subConn, + } +} + +type frPicker struct { + // subConn is the first ready subconn when this picker was + // created. The slice is immutable. Each Get() will do a round robin + // selection from it and return the selected SubConn. + subConn balancer.SubConn +} + +func (p *frPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{SubConn: p.subConn}, nil +} diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 1911f016a88..159e50a6a80 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -21,6 +21,7 @@ package vtgateproxy import ( "context" "flag" + "fmt" "io" "net/url" "strings" @@ -55,6 +56,7 @@ var ( affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'") addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") + balancerType = flag.String("balancer", "round_robin", "load balancing algorithm to use") timings = stats.NewTimings("Timings", "proxy timings by operation", "operation") @@ -202,8 +204,10 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn func Init() { log.V(100).Infof("Registering GRPC dial options") + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil + log.Infof("registering %s load balancer", *balancerType) + return append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, *balancerType))), nil }) RegisterJSONGateResolver( From 878bb5e97aa9082007aaeb3477db5a27db39cba8 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 21 May 2024 09:00:22 -0700 Subject: [PATCH 2/5] fail if an invalid balancer is picked --- go/vt/vtgateproxy/vtgateproxy.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 159e50a6a80..75b04c74868 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -203,10 +203,18 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn } func Init() { - log.V(100).Infof("Registering GRPC dial options") + log.Infof("registering GRPC dial options: balancer type %s", *balancerType) + + switch *balancerType { + case "round_robin": + case "first_ready": + case "pick_first": + break + default: + log.Fatalf("invalid balancer type %s", *balancerType) + } grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - log.Infof("registering %s load balancer", *balancerType) return append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, *balancerType))), nil }) From 60dc5dfbe7bd5bc6798d4f90ca0227a3728b14d9 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 21 May 2024 09:15:34 -0700 Subject: [PATCH 3/5] maintain the currentConn even if another one becomes ready --- go/vt/vtgateproxy/firstready_balancer.go | 58 +++++++++++++++++------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index eaf46353072..e5591163ee3 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -31,9 +31,12 @@ limitations under the License. // https://github.com/grpc/grpc-go/blob/master/pickfirst.go import ( + "sync" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/grpclog" + "vitess.io/vitess/go/vt/log" ) // Name is the name of first_ready balancer. @@ -41,7 +44,7 @@ const Name = "first_ready" var logger = grpclog.Component("firstready") -// newBuilder creates a new roundrobin balancer builder. +// newBuilder creates a new first_ready balancer builder. func newBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, &frPickerBuilder{}, base.Config{HealthCheck: true}) } @@ -50,31 +53,52 @@ func init() { balancer.Register(newBuilder()) } -type frPickerBuilder struct{} +// frPickerBuilder implements both the Builder and the Picker interfaces. +// +// Once a conn is chosen and is in the ready state, it will remain as the +// active subconn even if other connections become available. +type frPickerBuilder struct { + mu sync.Mutex + currentConn balancer.SubConn +} + +func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { + log.V(100).Infof("firstreadyPicker: Build called with info: %v", info) -func (*frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Infof("firstreadyPicker: Build called with info: %v", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } - var subConn balancer.SubConn + f.mu.Lock() + defer f.mu.Unlock() + + // If we've already chosen a subconn, and it is still in the ready list, then + // no need to change state + if f.currentConn != nil { + log.V(100).Infof("firstreadyPicker: currentConn is active, checking if still ready") + for sc := range info.ReadySCs { + if f.currentConn == sc { + log.V(100).Infof("firstreadyPicker: currentConn still active - not changing") + return f + } + } + } + + // Otherwise either we don't have an active conn or the conn we were using is + // no longer active, so pick an arbitrary new one out of the map. + log.V(100).Infof("firstreadyPicker: currentConn is not active, picking a new one") for sc := range info.ReadySCs { - subConn = sc + f.currentConn = sc break } - return &frPicker{ - subConn: subConn, - } -} -type frPicker struct { - // subConn is the first ready subconn when this picker was - // created. The slice is immutable. Each Get() will do a round robin - // selection from it and return the selected SubConn. - subConn balancer.SubConn + return f } -func (p *frPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { - return balancer.PickResult{SubConn: p.subConn}, nil +// Pick simply returns the currently chosen conn +func (f *frPickerBuilder) Pick(balancer.PickInfo) (balancer.PickResult, error) { + f.mu.Lock() + defer f.mu.Unlock() + + return balancer.PickResult{SubConn: f.currentConn}, nil } From 94197b5c67fcb15bba132b2cfe5fe4dfa22062f9 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 21 May 2024 09:36:14 -0700 Subject: [PATCH 4/5] remove unnecessary globals --- go/vt/vtgateproxy/firstready_balancer.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index e5591163ee3..84f9a13e00c 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -35,18 +35,12 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/grpclog" "vitess.io/vitess/go/vt/log" ) -// Name is the name of first_ready balancer. -const Name = "first_ready" - -var logger = grpclog.Component("firstready") - // newBuilder creates a new first_ready balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &frPickerBuilder{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilder("first_ready", &frPickerBuilder{}, base.Config{HealthCheck: true}) } func init() { From 23393aacd0b79a92de316179cc2229f0b80b9904 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 21 May 2024 09:43:35 -0700 Subject: [PATCH 5/5] normalize the names --- go/vt/vtgateproxy/firstready_balancer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index 84f9a13e00c..9610de97018 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -57,7 +57,7 @@ type frPickerBuilder struct { } func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - log.V(100).Infof("firstreadyPicker: Build called with info: %v", info) + log.V(100).Infof("first_ready: Build called with info: %v", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) @@ -69,10 +69,10 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { // If we've already chosen a subconn, and it is still in the ready list, then // no need to change state if f.currentConn != nil { - log.V(100).Infof("firstreadyPicker: currentConn is active, checking if still ready") + log.V(100).Infof("first_ready: currentConn is active, checking if still ready") for sc := range info.ReadySCs { if f.currentConn == sc { - log.V(100).Infof("firstreadyPicker: currentConn still active - not changing") + log.V(100).Infof("first_ready: currentConn still active - not changing") return f } } @@ -80,7 +80,7 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { // Otherwise either we don't have an active conn or the conn we were using is // no longer active, so pick an arbitrary new one out of the map. - log.V(100).Infof("firstreadyPicker: currentConn is not active, picking a new one") + log.V(100).Infof("first_ready: currentConn is not active, picking a new one") for sc := range info.ReadySCs { f.currentConn = sc break