From be512ec55a041b3c3f91ab8bad414858738d87e0 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 13:01:35 -0700 Subject: [PATCH 01/16] Draft: very messy and doesn't compile --- go/vt/vtgateproxy/discovery.go | 70 +++++++++++++++--------------- go/vt/vtgateproxy/gate_balancer.go | 41 ++++++++++++----- go/vt/vtgateproxy/vtgateproxy.go | 38 ++++++++-------- 3 files changed, 80 insertions(+), 69 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index b282abc9edb..532823e8330 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -9,7 +9,7 @@ import ( "io" "math/rand" "os" - "strconv" + "strings" "time" "google.golang.org/grpc/attributes" @@ -18,6 +18,7 @@ import ( var ( jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution") + numConnectionsInt = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") ) // File based discovery for vtgate grpc endpoints @@ -54,33 +55,27 @@ type JSONGateConfigDiscovery struct { JsonPath string } +const queryParamFilterPrefix = "filter_" + func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { fmt.Printf("Start registration for target: %v\n", target.URL.String()) queryOpts := target.URL.Query() - queryParamCount := queryOpts.Get("num_connections") - queryAZID := queryOpts.Get("az_id") - num_connections := 0 - gateType := target.URL.Host - if queryParamCount != "" { - num_connections, _ = strconv.Atoi(queryParamCount) - } - - filters := resolveFilters{ - gate_type: gateType, - } - - if queryAZID != "" { - filters.az_id = queryAZID + filters := hostFilters{} + filters["type"] = gateType + for k, _ := range queryOpts { + if strings.HasPrefix(k, queryParamFilterPrefix) { + filteredPrefix := strings.TrimPrefix(k, queryParamFilterPrefix) + filters[filteredPrefix] = queryOpts.Get(k) + } } r := &resolveJSONGateConfig{ - target: target, - cc: cc, - jsonPath: b.JsonPath, - num_connections: num_connections, - filters: filters, + target: target, + cc: cc, + jsonPath: b.JsonPath, + filters: filters, } r.start() return r, nil @@ -101,23 +96,25 @@ type resolveFilters struct { az_id string } +type hostFilters = map[string]string + // exampleResolver is a // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type resolveJSONGateConfig struct { - target resolver.Target - cc resolver.ClientConn - jsonPath string - ticker *time.Ticker - rand *rand.Rand // safe for concurrent use. - num_connections int - filters resolveFilters + target resolver.Target + cc resolver.ClientConn + jsonPath string + ticker *time.Ticker + rand *rand.Rand // safe for concurrent use. + filters hostFilters } type discoverySlackAZ struct{} type discoverySlackType struct{} +type matchesFilter struct{} func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { - config := []DiscoveryHost{} + pairs := []map[string]string{} fmt.Printf("Loading config %v\n", r.jsonPath) data, err := os.ReadFile(r.jsonPath) @@ -125,27 +122,28 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error return nil, nil, err } - err = json.Unmarshal(data, &config) + err = json.Unmarshal(data, &pairs) if err != nil { fmt.Printf("parse err: %v\n", err) return nil, nil, err } addrs := []resolver.Address{} - for _, s := range config { - az := attributes.New(discoverySlackAZ{}, s.AZId).WithValue(discoverySlackType{}, s.Type) + for _, pair := range pairs { + attributes := attributes.New(matchesFilter{}, true) - // Filter hosts to this gate type - if r.filters.gate_type != "" { - if r.filters.gate_type != s.Type { + for k, v := range r.filters { + if pair[k] != v { + fmt.Printf("Filtering out %v", pair) + attributes.WithValue(matchesFilter{}, false) continue } } // Add matching hosts to registration list addrs = append(addrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), - BalancerAttributes: az, + Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), + BalancerAttributes: attributes, }) } diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 77f8de98c19..5045622f407 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "strings" "sync" "sync/atomic" @@ -16,13 +17,14 @@ import ( // Name is the name of az affinity balancer. const Name = "slack_affinity_balancer" -const MetadataAZKey = "grpc-slack-az-metadata" const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata" +const MetadataDiscoveryFilterPrefix = "grpc_discovery_filter_" var logger = grpclog.Component("slack_affinity_balancer") -func WithSlackAZAffinityContext(ctx context.Context, azID string, numConnections string) context.Context { - ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataHostAffinityCount, numConnections) +func WithSlackAZAffinityContext(ctx context.Context, numConnections string, filters metadata.MD) context.Context { + metadata.NewOutgoingContext(ctx, filters) + ctx = metadata.AppendToOutgoingContext(ctx, MetadataHostAffinityCount, numConnections) return ctx } @@ -44,27 +46,30 @@ func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } allSubConns := []balancer.SubConn{} - subConnsByAZ := map[string][]balancer.SubConn{} + subConnsByFiltered := []balancer.SubConn{} for sc := range info.ReadySCs { subConnInfo, _ := info.ReadySCs[sc] - az := subConnInfo.Address.BalancerAttributes.Value(discoverySlackAZ{}).(string) + matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(bool) allSubConns = append(allSubConns, sc) - subConnsByAZ[az] = append(subConnsByAZ[az], sc) + if matchesFilter { + subConnsByFiltered = append(subConnsByFiltered, sc) + } + } return &slackAZAffinityPicker{ - allSubConns: allSubConns, - subConnsByAZ: subConnsByAZ, + allSubConns: allSubConns, + filteredSubConns: subConnsByFiltered, } } type slackAZAffinityPicker struct { // allSubConns is all subconns that were in the ready state when the picker was created - allSubConns []balancer.SubConn - subConnsByAZ map[string][]balancer.SubConn - nextByAZ sync.Map - next uint32 + allSubConns []balancer.SubConn + filteredSubConns []balancer.SubConn + nextByAZ sync.Map + next uint32 } // Pick the next in the list from the list of subconns (RR) @@ -90,6 +95,18 @@ func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResul } az := keys[0] + filteredSubconns := p.allSubConns + for k, v := range hdrs { + if strings.HasPrefix(k, MetadataDiscoveryFilterPrefix) { + filterName := strings.TrimPrefix(k, MetadataDiscoveryFilterPrefix) + filterValue := v + } + } + + for _, s := range v { + + } + if az == "" { return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 68869dbd323..0ac2885adaa 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -23,12 +23,13 @@ import ( "flag" "fmt" "io" - "net/url" + "strconv" "strings" "sync" "time" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" @@ -41,8 +42,7 @@ import ( ) var ( - dialTimeout = flag.Duration("dial_timeout", 5*time.Second, "dialer timeout for the GRPC connection") - + dialTimeout = flag.Duration("dial_timeout", 5*time.Second, "dialer timeout for the GRPC connection") defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") sysVarSetEnabled = flag.Bool("enable_system_settings", true, "This will enable the system settings to be changed per session at the database connection level") @@ -53,24 +53,13 @@ var ( ) type VTGateProxy struct { - targetConns map[string]*vtgateconn.VTGateConn - mu sync.Mutex - azID string - gateType string - numConnections string + targetConns map[string]*vtgateconn.VTGateConn + mu sync.Mutex } -func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { - targetURL, err := url.Parse(target) - if err != nil { - return nil, err - } - - proxy.azID = targetURL.Query().Get("az_id") - proxy.numConnections = targetURL.Query().Get("num_connections") - proxy.gateType = targetURL.Host - - fmt.Printf("Getting connection for %v in %v with %v connections\n", target, proxy.azID, proxy.numConnections) +func (proxy *VTGateProxy) getConnection(ctx context.Context, target string, filters metadata.MD) (*vtgateconn.VTGateConn, error) { + numConnectionsString := strconv.Itoa(*numConnectionsInt) + fmt.Printf("Getting connection for %v in %v with %v filters\n", target, filters) // If the connection exists, return it proxy.mu.Lock() @@ -90,7 +79,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil }) - conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.numConnections), "grpc", target) + conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, numConnectionsString, filters), "grpc", target) if err != nil { return nil, err } @@ -108,7 +97,14 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no target string supplied by client") } - conn, err := proxy.getConnection(ctx, target) + filters := metadata.Pairs() + for k, v := range connectionAttributes { + if strings.HasPrefix(k, MetadataDiscoveryFilterPrefix) { + filters.Append(k, v) + } + } + + conn, err := proxy.getConnection(ctx, target, filters) if err != nil { return nil, err } From 15fc1395275af587e5d2202e1a47a74c42155100 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 14:50:02 -0700 Subject: [PATCH 02/16] Simplifyy --- go/vt/vtgateproxy/discovery.go | 30 ++++++------ go/vt/vtgateproxy/gate_balancer.go | 78 +++++++----------------------- go/vt/vtgateproxy/vtgateproxy.go | 29 ++++++----- 3 files changed, 50 insertions(+), 87 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 532823e8330..c5ceae7f3ed 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -64,7 +64,7 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie filters := hostFilters{} filters["type"] = gateType - for k, _ := range queryOpts { + for k := range queryOpts { if strings.HasPrefix(k, queryParamFilterPrefix) { filteredPrefix := strings.TrimPrefix(k, queryParamFilterPrefix) filters[filteredPrefix] = queryOpts.Get(k) @@ -91,11 +91,6 @@ func RegisterJsonDiscovery() { fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme()) } -type resolveFilters struct { - gate_type string - az_id string -} - type hostFilters = map[string]string // exampleResolver is a @@ -109,12 +104,10 @@ type resolveJSONGateConfig struct { filters hostFilters } -type discoverySlackAZ struct{} -type discoverySlackType struct{} type matchesFilter struct{} func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { - pairs := []map[string]string{} + pairs := []map[string]interface{}{} fmt.Printf("Loading config %v\n", r.jsonPath) data, err := os.ReadFile(r.jsonPath) @@ -130,20 +123,25 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error addrs := []resolver.Address{} for _, pair := range pairs { - attributes := attributes.New(matchesFilter{}, true) + filterMatch := false for k, v := range r.filters { - if pair[k] != v { - fmt.Printf("Filtering out %v", pair) - attributes.WithValue(matchesFilter{}, false) - continue + if pair[k] == v { + filterMatch = true + } else { + filterMatch = false } } + attrs := attributes.New(matchesFilter{}, "nomatch") + if filterMatch { + attrs = attributes.New(matchesFilter{}, "match") + } + // Add matching hosts to registration list addrs = append(addrs, resolver.Address{ Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), - BalancerAttributes: attributes, + BalancerAttributes: attrs, }) } @@ -205,7 +203,7 @@ func (r *resolveJSONGateConfig) start() { } // Make sure this wasn't a spurious change by checking the hash - if bytes.Compare(hash, newHash) == 0 && newHash != nil { + if bytes.Equal(hash, newHash) && newHash != nil { fmt.Printf("No content changed in discovery file... ignoring\n") continue } diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 5045622f407..0a6fbc2b139 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -1,18 +1,13 @@ package vtgateproxy import ( - "context" "errors" "fmt" - "strconv" - "strings" - "sync" "sync/atomic" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/metadata" ) // Name is the name of az affinity balancer. @@ -22,12 +17,6 @@ const MetadataDiscoveryFilterPrefix = "grpc_discovery_filter_" var logger = grpclog.Component("slack_affinity_balancer") -func WithSlackAZAffinityContext(ctx context.Context, numConnections string, filters metadata.MD) context.Context { - metadata.NewOutgoingContext(ctx, filters) - ctx = metadata.AppendToOutgoingContext(ctx, MetadataHostAffinityCount, numConnections) - return ctx -} - func newBuilder() balancer.Builder { return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true}) } @@ -40,7 +29,7 @@ type slackAZAffinityBalancer struct{} func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker { logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info) - fmt.Printf("Rebuilding picker\n") + fmt.Printf("Rebuilding picker: %v\n", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) @@ -49,15 +38,18 @@ func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker subConnsByFiltered := []balancer.SubConn{} for sc := range info.ReadySCs { - subConnInfo, _ := info.ReadySCs[sc] - matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(bool) + subConnInfo := info.ReadySCs[sc] + matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string) allSubConns = append(allSubConns, sc) - if matchesFilter { + if matchesFilter == "match" { subConnsByFiltered = append(subConnsByFiltered, sc) } - } + + fmt.Printf("Filtered subcons: %v\n", len(subConnsByFiltered)) + fmt.Printf("All subcons: %v\n", len(allSubConns)) + return &slackAZAffinityPicker{ allSubConns: allSubConns, filteredSubConns: subConnsByFiltered, @@ -68,7 +60,6 @@ type slackAZAffinityPicker struct { // allSubConns is all subconns that were in the ready state when the picker was created allSubConns []balancer.SubConn filteredSubConns []balancer.SubConn - nextByAZ sync.Map next uint32 } @@ -77,60 +68,27 @@ func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, next subConnsLen := uint32(len(scList)) if subConnsLen == 0 { - return balancer.PickResult{}, errors.New("No hosts in list") + return balancer.PickResult{}, errors.New("no hosts in list") } - fmt.Printf("Select offset: %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList)) - sc := scList[nextIndex%subConnsLen] + fmt.Printf("Select offset: %v %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList), sc) + return balancer.PickResult{SubConn: sc}, nil } func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - hdrs, _ := metadata.FromOutgoingContext(info.Ctx) - numConnections := 0 - keys := hdrs.Get(MetadataAZKey) - if len(keys) < 1 { - return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) - } - az := keys[0] - - filteredSubconns := p.allSubConns - for k, v := range hdrs { - if strings.HasPrefix(k, MetadataDiscoveryFilterPrefix) { - filterName := strings.TrimPrefix(k, MetadataDiscoveryFilterPrefix) - filterValue := v - } - } - - for _, s := range v { - - } - - if az == "" { - return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) - } - - keys = hdrs.Get(MetadataHostAffinityCount) - if len(keys) > 0 { - if i, err := strconv.Atoi(keys[0]); err != nil { - numConnections = i - } - } - - subConns := p.subConnsByAZ[az] - if len(subConns) == 0 { - fmt.Printf("No subconns in az and gate type, pick from anywhere\n") + filteredSubConns := p.filteredSubConns + numConnections := *numConnectionsInt + if len(filteredSubConns) == 0 { + fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n") return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } - val, _ := p.nextByAZ.LoadOrStore(az, new(uint32)) - ptr := val.(*uint32) - atomic.AddUint32(ptr, 1) - if len(subConns) >= numConnections && numConnections > 0 { + if len(filteredSubConns) >= numConnections && numConnections > 0 { fmt.Printf("Limiting to first %v\n", numConnections) - return p.pickFromSubconns(subConns[0:numConnections], *ptr) + return p.pickFromSubconns(filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) } else { - return p.pickFromSubconns(subConns, *ptr) + return p.pickFromSubconns(filteredSubConns, atomic.AddUint32(&p.next, 1)) } } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 0ac2885adaa..f6932fe888c 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -23,7 +23,7 @@ import ( "flag" "fmt" "io" - "strconv" + "net/url" "strings" "sync" "time" @@ -57,13 +57,12 @@ type VTGateProxy struct { mu sync.Mutex } -func (proxy *VTGateProxy) getConnection(ctx context.Context, target string, filters metadata.MD) (*vtgateconn.VTGateConn, error) { - numConnectionsString := strconv.Itoa(*numConnectionsInt) - fmt.Printf("Getting connection for %v in %v with %v filters\n", target, filters) +func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { + fmt.Printf("Getting connection for %v\n", target) // If the connection exists, return it proxy.mu.Lock() - existingConn, _ := proxy.targetConns[target] + existingConn := proxy.targetConns[target] if existingConn != nil { proxy.mu.Unlock() return existingConn, nil @@ -79,7 +78,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string, filt return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil }) - conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, numConnectionsString, filters), "grpc", target) + conn, err := vtgateconn.DialProtocol(ctx, "grpc", target) if err != nil { return nil, err } @@ -97,14 +96,22 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no target string supplied by client") } + targetUrl := url.URL{ + Scheme: "vtgate", + Host: target, + } + filters := metadata.Pairs() + values := url.Values{} for k, v := range connectionAttributes { - if strings.HasPrefix(k, MetadataDiscoveryFilterPrefix) { + if strings.HasPrefix(k, queryParamFilterPrefix) { filters.Append(k, v) + values.Set(k, v) } } + targetUrl.RawQuery = values.Encode() - conn, err := proxy.getConnection(ctx, target, filters) + conn, err := proxy.getConnection(ctx, targetUrl.String()) if err != nil { return nil, err } @@ -116,7 +123,7 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu // same effect as if a "rollback" statement was executed, but does not affect the query // statistics. func (proxy *VTGateProxy) CloseSession(ctx context.Context, session *vtgateconn.VTGateSession) error { - return session.CloseSession(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType)) + return session.CloseSession(ctx) } // ResolveTransaction resolves the specified 2PC transaction. @@ -138,11 +145,11 @@ func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGat return &sqltypes.Result{}, nil } - return session.Execute(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), sql, bindVariables) + return session.Execute(ctx, sql, bindVariables) } func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { - stream, err := session.StreamExecute(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), sql, bindVariables) + stream, err := session.StreamExecute(ctx, sql, bindVariables) if err != nil { return err } From 9e12ae1e86cedce59a38863dcd45b713a3ddfb83 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 14:51:55 -0700 Subject: [PATCH 03/16] less log, plz --- go/vt/vtgateproxy/gate_balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 0a6fbc2b139..eafcac03c62 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -29,7 +29,7 @@ type slackAZAffinityBalancer struct{} func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker { logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info) - fmt.Printf("Rebuilding picker: %v\n", info) + fmt.Printf("Rebuilding picker\n") if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) From 09da6d494ad361f36519706364ff01737e4a5e30 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 17:26:33 -0700 Subject: [PATCH 04/16] simplify more --- go/vt/vtgateproxy/discovery.go | 15 ++++++------ go/vt/vtgateproxy/gate_balancer.go | 37 +++++++++++------------------- go/vt/vtgateproxy/vtgateproxy.go | 2 ++ 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index c5ceae7f3ed..f9744026e64 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -71,7 +71,7 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie } } - r := &resolveJSONGateConfig{ + r := &JSONGateConfigResolver{ target: target, cc: cc, jsonPath: b.JsonPath, @@ -95,7 +95,7 @@ type hostFilters = map[string]string // exampleResolver is a // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type resolveJSONGateConfig struct { +type JSONGateConfigResolver struct { target resolver.Target cc resolver.ClientConn jsonPath string @@ -106,7 +106,7 @@ type resolveJSONGateConfig struct { type matchesFilter struct{} -func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { +func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, error) { pairs := []map[string]interface{}{} fmt.Printf("Loading config %v\n", r.jsonPath) @@ -123,7 +123,6 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error addrs := []resolver.Address{} for _, pair := range pairs { - filterMatch := false for k, v := range r.filters { if pair[k] == v { @@ -145,7 +144,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error }) } - fmt.Printf("Addrs: %v\n", addrs) + fmt.Printf("Loaded addrs from discovery file: %v\n", addrs) // Shuffle to ensure every host has a different order to iterate through r.rand.Shuffle(len(addrs), func(i, j int) { @@ -161,7 +160,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error return &addrs, h.Sum(nil), nil } -func (r *resolveJSONGateConfig) start() { +func (r *JSONGateConfigResolver) start() { fmt.Print("Starting discovery checker\n") r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -219,8 +218,8 @@ func (r *resolveJSONGateConfig) start() { fmt.Printf("Loaded hosts, starting ticker\n") } -func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {} -func (r *resolveJSONGateConfig) Close() { +func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (r *JSONGateConfigResolver) Close() { r.ticker.Stop() } diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index eafcac03c62..2114e307d8b 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -7,30 +7,22 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/grpclog" ) // Name is the name of az affinity balancer. const Name = "slack_affinity_balancer" -const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata" -const MetadataDiscoveryFilterPrefix = "grpc_discovery_filter_" - -var logger = grpclog.Component("slack_affinity_balancer") func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilder(Name, &pickerBuilder{}, base.Config{HealthCheck: true}) } func init() { balancer.Register(newBuilder()) } -type slackAZAffinityBalancer struct{} - -func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info) - fmt.Printf("Rebuilding picker\n") +type pickerBuilder struct{} +func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } @@ -47,16 +39,13 @@ func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker } } - fmt.Printf("Filtered subcons: %v\n", len(subConnsByFiltered)) - fmt.Printf("All subcons: %v\n", len(allSubConns)) - - return &slackAZAffinityPicker{ + return &filteredAffinityPicker{ allSubConns: allSubConns, filteredSubConns: subConnsByFiltered, } } -type slackAZAffinityPicker struct { +type filteredAffinityPicker struct { // allSubConns is all subconns that were in the ready state when the picker was created allSubConns []balancer.SubConn filteredSubConns []balancer.SubConn @@ -64,7 +53,7 @@ type slackAZAffinityPicker struct { } // Pick the next in the list from the list of subconns (RR) -func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { +func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { subConnsLen := uint32(len(scList)) if subConnsLen == 0 { @@ -72,23 +61,23 @@ func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, next } sc := scList[nextIndex%subConnsLen] - fmt.Printf("Select offset: %v %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList), sc) + fmt.Printf("Select offset: iteration:(%v) mod:(%v) connLen:(%v)\n", nextIndex, nextIndex%subConnsLen, len(scList)) return balancer.PickResult{SubConn: sc}, nil } -func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - filteredSubConns := p.filteredSubConns +func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + fmt.Printf("Picking: subcons counts: filtered(%v) all(%v)\n", len(p.filteredSubConns), len(p.allSubConns)) numConnections := *numConnectionsInt - if len(filteredSubConns) == 0 { + if len(p.filteredSubConns) == 0 { fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n") return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } - if len(filteredSubConns) >= numConnections && numConnections > 0 { + if len(p.filteredSubConns) >= numConnections && numConnections > 0 { fmt.Printf("Limiting to first %v\n", numConnections) - return p.pickFromSubconns(filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) + return p.pickFromSubconns(p.filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) } else { - return p.pickFromSubconns(filteredSubConns, atomic.AddUint32(&p.next, 1)) + return p.pickFromSubconns(p.filteredSubConns, atomic.AddUint32(&p.next, 1)) } } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index f6932fe888c..a2a655ce0bd 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -65,6 +65,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt existingConn := proxy.targetConns[target] if existingConn != nil { proxy.mu.Unlock() + fmt.Printf("Reused connection for %v\n", target) return existingConn, nil } proxy.mu.Unlock() @@ -84,6 +85,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt } proxy.mu.Lock() + fmt.Printf("Created new connection for %v\n", target) proxy.targetConns[target] = conn proxy.mu.Unlock() From 6584d3c650018f3784e06adfa48ec1c0c23860a6 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 22:19:10 -0700 Subject: [PATCH 05/16] Simplified by a lot - much simpler now pick fewer addresses --- go/vt/vtgateproxy/discovery.go | 51 ++++++++++++++++++++++-------- go/vt/vtgateproxy/gate_balancer.go | 34 ++++---------------- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index f9744026e64..cb418e0dfb9 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -121,27 +121,52 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro return nil, nil, err } + allAddrs := []resolver.Address{} + filteredAddrs := []resolver.Address{} addrs := []resolver.Address{} for _, pair := range pairs { - filterMatch := false + matchesAll := true for k, v := range r.filters { - if pair[k] == v { - filterMatch = true - } else { - filterMatch = false + if pair[k] != v { + matchesAll = false } } - attrs := attributes.New(matchesFilter{}, "nomatch") - if filterMatch { - attrs = attributes.New(matchesFilter{}, "match") + if matchesAll { + filteredAddrs = append(filteredAddrs, resolver.Address{ + Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), + BalancerAttributes: attributes.New(matchesFilter{}, "match"), + }) } - // Add matching hosts to registration list - addrs = append(addrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), - BalancerAttributes: attrs, - }) + // Must filter by type + t, ok := r.filters["type"] + if ok { + if pair["type"] == t { + // Add matching hosts to registration list + allAddrs = append(allAddrs, resolver.Address{ + Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), + BalancerAttributes: attributes.New(matchesFilter{}, "nomatch"), + }) + } + } + } + + fmt.Printf("-----\n") + fmt.Printf("filtered: %v\n", filteredAddrs) + fmt.Printf("-----\n") + fmt.Printf("all: %v\n", allAddrs) + fmt.Printf("----\n") + + // Nothing in the filtered list? Get them all + if len(filteredAddrs) == 0 { + addrs = allAddrs + } else if len(filteredAddrs) > *numConnectionsInt { + addrs = filteredAddrs[0:*numConnectionsInt] + } else if len(allAddrs) > *numConnectionsInt { + addrs = allAddrs[0:*numConnectionsInt] + } else { + addrs = allAddrs } fmt.Printf("Loaded addrs from discovery file: %v\n", addrs) diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 2114e307d8b..0e334b64206 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -27,29 +27,22 @@ func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } allSubConns := []balancer.SubConn{} - subConnsByFiltered := []balancer.SubConn{} for sc := range info.ReadySCs { - subConnInfo := info.ReadySCs[sc] - matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string) - + //subConnInfo := info.ReadySCs[sc] + //matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string) allSubConns = append(allSubConns, sc) - if matchesFilter == "match" { - subConnsByFiltered = append(subConnsByFiltered, sc) - } } return &filteredAffinityPicker{ - allSubConns: allSubConns, - filteredSubConns: subConnsByFiltered, + subConns: allSubConns, } } type filteredAffinityPicker struct { // allSubConns is all subconns that were in the ready state when the picker was created - allSubConns []balancer.SubConn - filteredSubConns []balancer.SubConn - next uint32 + subConns []balancer.SubConn + next uint32 } // Pick the next in the list from the list of subconns (RR) @@ -61,23 +54,10 @@ func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nex } sc := scList[nextIndex%subConnsLen] - fmt.Printf("Select offset: iteration:(%v) mod:(%v) connLen:(%v)\n", nextIndex, nextIndex%subConnsLen, len(scList)) - return balancer.PickResult{SubConn: sc}, nil } func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - fmt.Printf("Picking: subcons counts: filtered(%v) all(%v)\n", len(p.filteredSubConns), len(p.allSubConns)) - numConnections := *numConnectionsInt - if len(p.filteredSubConns) == 0 { - fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n") - return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) - } - - if len(p.filteredSubConns) >= numConnections && numConnections > 0 { - fmt.Printf("Limiting to first %v\n", numConnections) - return p.pickFromSubconns(p.filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) - } else { - return p.pickFromSubconns(p.filteredSubConns, atomic.AddUint32(&p.next, 1)) - } + fmt.Printf("Picking: subcons counts: len(%v)\n", len(p.subConns)) + return p.pickFromSubconns(p.subConns, atomic.AddUint32(&p.next, 1)) } From 578c4e8932737b777ae1128dee83f42c10046cd3 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 22:27:13 -0700 Subject: [PATCH 06/16] fixy --- go/vt/vtgateproxy/discovery.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index cb418e0dfb9..003f1f9feb7 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -108,7 +108,7 @@ type matchesFilter struct{} func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, error) { pairs := []map[string]interface{}{} - fmt.Printf("Loading config %v\n", r.jsonPath) + fmt.Printf("Loading config %v config for %v connections\n", r.jsonPath, *numConnectionsInt) data, err := os.ReadFile(r.jsonPath) if err != nil { @@ -123,7 +123,7 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro allAddrs := []resolver.Address{} filteredAddrs := []resolver.Address{} - addrs := []resolver.Address{} + var addrs []resolver.Address for _, pair := range pairs { matchesAll := true for k, v := range r.filters { From 06538a552010335446aaf6f75ba633cf98697193 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 22:28:09 -0700 Subject: [PATCH 07/16] Account for infinite --- go/vt/vtgateproxy/discovery.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 003f1f9feb7..99bbc255d2d 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -161,6 +161,8 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro // Nothing in the filtered list? Get them all if len(filteredAddrs) == 0 { addrs = allAddrs + } else if *numConnectionsInt == 0 { + addrs = allAddrs } else if len(filteredAddrs) > *numConnectionsInt { addrs = filteredAddrs[0:*numConnectionsInt] } else if len(allAddrs) > *numConnectionsInt { From ebe2075f9f83c06f7209481503d2ed23d6a9dbab Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 14:22:11 -0700 Subject: [PATCH 08/16] copyright nonsense --- go/vt/vtgate/vtgateconn/vtgateconn.go | 2 +- go/vt/vtgateproxy/discovery.go | 15 +++++++++++++++ go/vt/vtgateproxy/mysql_server.go | 2 +- go/vt/vtgateproxy/vtgateproxy.go | 2 +- 4 files changed, 18 insertions(+), 3 deletions(-) diff --git a/go/vt/vtgate/vtgateconn/vtgateconn.go b/go/vt/vtgate/vtgateconn/vtgateconn.go index e1c49877d1d..3057acc3fcf 100644 --- a/go/vt/vtgate/vtgateconn/vtgateconn.go +++ b/go/vt/vtgate/vtgateconn/vtgateconn.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Vitess Authors. +Copyright 2024 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 99bbc255d2d..689d51b2b49 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -1,3 +1,18 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package vtgateproxy import ( diff --git a/go/vt/vtgateproxy/mysql_server.go b/go/vt/vtgateproxy/mysql_server.go index 27ad82d187b..e07b4ff109f 100644 --- a/go/vt/vtgateproxy/mysql_server.go +++ b/go/vt/vtgateproxy/mysql_server.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Vitess Authors. +Copyright 2024 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index a2a655ce0bd..0b24d579d59 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Vitess Authors. +Copyright 2024 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From dbccdda788f2391fbe805375dcea7aefd55c0b53 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 15:21:55 -0700 Subject: [PATCH 09/16] clean up debug logging --- go/vt/vtgateproxy/discovery.go | 46 +++++++++++++----------------- go/vt/vtgateproxy/gate_balancer.go | 2 -- go/vt/vtgateproxy/vtgateproxy.go | 8 +++--- 3 files changed, 24 insertions(+), 32 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 689d51b2b49..b0497628b49 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -29,6 +29,8 @@ import ( "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" + + "vitess.io/vitess/go/vt/log" ) var ( @@ -73,7 +75,7 @@ type JSONGateConfigDiscovery struct { const queryParamFilterPrefix = "filter_" func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - fmt.Printf("Start registration for target: %v\n", target.URL.String()) + log.V(100).Infof("Start registration for target: %v\n", target.URL.String()) queryOpts := target.URL.Query() gateType := target.URL.Host @@ -98,12 +100,11 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" } func RegisterJsonDiscovery() { - fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig) jsonDiscovery := &JSONGateConfigDiscovery{ JsonPath: *jsonDiscoveryConfig, } resolver.Register(jsonDiscovery) - fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme()) + log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig) } type hostFilters = map[string]string @@ -121,9 +122,10 @@ type JSONGateConfigResolver struct { type matchesFilter struct{} -func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, error) { +func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) { pairs := []map[string]interface{}{} - fmt.Printf("Loading config %v config for %v connections\n", r.jsonPath, *numConnectionsInt) + + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnectionsInt) data, err := os.ReadFile(r.jsonPath) if err != nil { @@ -132,7 +134,7 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro err = json.Unmarshal(data, &pairs) if err != nil { - fmt.Printf("parse err: %v\n", err) + log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err) return nil, nil, err } @@ -167,12 +169,6 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro } } - fmt.Printf("-----\n") - fmt.Printf("filtered: %v\n", filteredAddrs) - fmt.Printf("-----\n") - fmt.Printf("all: %v\n", allAddrs) - fmt.Printf("----\n") - // Nothing in the filtered list? Get them all if len(filteredAddrs) == 0 { addrs = allAddrs @@ -186,8 +182,6 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro addrs = allAddrs } - fmt.Printf("Loaded addrs from discovery file: %v\n", addrs) - // Shuffle to ensure every host has a different order to iterate through r.rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] @@ -197,17 +191,19 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { return nil, nil, err } + sum := h.Sum(nil) + + log.V(100).Infof("resolved %s to addrs: 0x%x, %v\n", r.target.URL.String(), sum, addrs) - fmt.Printf("Returning discovery: %d hosts checksum %x\n", len(addrs), h.Sum(nil)) - return &addrs, h.Sum(nil), nil + return &addrs, sum, nil } func (r *JSONGateConfigResolver) start() { - fmt.Print("Starting discovery checker\n") + log.V(100).Infof("Starting discovery checker\n") r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // Immediately load the initial config - addrs, hash, err := r.loadConfig() + addrs, hash, err := r.resolve() if err == nil { // if we parse ok, populate the local address store r.cc.UpdateState(resolver.State{Addresses: *addrs}) @@ -223,7 +219,7 @@ func (r *JSONGateConfigResolver) start() { for range r.ticker.C { checkFileStat, err := os.Stat(r.jsonPath) if err != nil { - fmt.Printf("Error stat'ing config %v\n", err) + log.Errorf("Error stat'ing config %v\n", err) continue } isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() @@ -233,31 +229,29 @@ func (r *JSONGateConfigResolver) start() { } fileStat = checkFileStat - fmt.Printf("Detected config change\n") + log.V(100).Infof("Detected config change\n") - addrs, newHash, err := r.loadConfig() + addrs, newHash, err := r.resolve() if err != nil { // better luck next loop // TODO: log this - fmt.Print("Can't load config: %v\n", err) + log.Errorf("Error resolving config: %v\n", err) continue } // Make sure this wasn't a spurious change by checking the hash if bytes.Equal(hash, newHash) && newHash != nil { - fmt.Printf("No content changed in discovery file... ignoring\n") + log.V(100).Infof("No content changed in discovery file... ignoring\n") continue } hash = newHash - fmt.Printf("Loaded %d hosts\n", len(*addrs)) - fmt.Printf("Loaded %v", addrs) r.cc.UpdateState(resolver.State{Addresses: *addrs}) } }() - fmt.Printf("Loaded hosts, starting ticker\n") + log.V(100).Infof("Loaded hosts, starting ticker\n") } func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {} diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 0e334b64206..cee07e33f1c 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -2,7 +2,6 @@ package vtgateproxy import ( "errors" - "fmt" "sync/atomic" "google.golang.org/grpc/balancer" @@ -58,6 +57,5 @@ func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nex } func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - fmt.Printf("Picking: subcons counts: len(%v)\n", len(p.subConns)) return p.pickFromSubconns(p.subConns, atomic.AddUint32(&p.next, 1)) } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 0b24d579d59..7bdd96117b3 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -21,7 +21,6 @@ package vtgateproxy import ( "context" "flag" - "fmt" "io" "net/url" "strings" @@ -33,6 +32,7 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/schema" @@ -58,14 +58,14 @@ type VTGateProxy struct { } func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { - fmt.Printf("Getting connection for %v\n", target) + log.V(100).Infof("Getting connection for %v\n", target) // If the connection exists, return it proxy.mu.Lock() existingConn := proxy.targetConns[target] if existingConn != nil { proxy.mu.Unlock() - fmt.Printf("Reused connection for %v\n", target) + log.V(100).Infof("Reused connection for %v\n", target) return existingConn, nil } proxy.mu.Unlock() @@ -84,8 +84,8 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt return nil, err } + log.V(100).Infof("Created new connection for %v\n", target) proxy.mu.Lock() - fmt.Printf("Created new connection for %v\n", target) proxy.targetConns[target] = conn proxy.mu.Unlock() From 0359ecdcb020e695bdf53f048bd1e00ecfb71cf4 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 15:28:10 -0700 Subject: [PATCH 10/16] round_robin works! --- go/vt/vtgateproxy/gate_balancer.go | 61 ------------------------------ go/vt/vtgateproxy/vtgateproxy.go | 5 +-- 2 files changed, 1 insertion(+), 65 deletions(-) delete mode 100644 go/vt/vtgateproxy/gate_balancer.go diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go deleted file mode 100644 index cee07e33f1c..00000000000 --- a/go/vt/vtgateproxy/gate_balancer.go +++ /dev/null @@ -1,61 +0,0 @@ -package vtgateproxy - -import ( - "errors" - "sync/atomic" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/base" -) - -// Name is the name of az affinity balancer. -const Name = "slack_affinity_balancer" - -func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &pickerBuilder{}, base.Config{HealthCheck: true}) -} - -func init() { - balancer.Register(newBuilder()) -} - -type pickerBuilder struct{} - -func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { - if len(info.ReadySCs) == 0 { - return base.NewErrPicker(balancer.ErrNoSubConnAvailable) - } - allSubConns := []balancer.SubConn{} - - for sc := range info.ReadySCs { - //subConnInfo := info.ReadySCs[sc] - //matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string) - allSubConns = append(allSubConns, sc) - } - - return &filteredAffinityPicker{ - subConns: allSubConns, - } -} - -type filteredAffinityPicker struct { - // allSubConns is all subconns that were in the ready state when the picker was created - subConns []balancer.SubConn - next uint32 -} - -// Pick the next in the list from the list of subconns (RR) -func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { - subConnsLen := uint32(len(scList)) - - if subConnsLen == 0 { - return balancer.PickResult{}, errors.New("no hosts in list") - } - - sc := scList[nextIndex%subConnsLen] - return balancer.PickResult{SubConn: sc}, nil -} - -func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - return p.pickFromSubconns(p.subConns, atomic.AddUint32(&p.next, 1)) -} diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 7bdd96117b3..a831afde6d7 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -72,11 +72,8 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt // Otherwise create a new connection after dropping the lock, allowing multiple requests to // race to create the conn for now. - // grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - // return append(opts, grpc.WithBlock()), nil - // }) grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil + return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil }) conn, err := vtgateconn.DialProtocol(ctx, "grpc", target) From 1b62a712f5a5674513dc0ff00c039917b83774a8 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 16:41:50 -0700 Subject: [PATCH 11/16] use rw mutex to serialize creation --- go/vt/vtgateproxy/vtgateproxy.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index a831afde6d7..bc142cb0d5d 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -48,27 +48,32 @@ var ( vtGateProxy *VTGateProxy = &VTGateProxy{ targetConns: map[string]*vtgateconn.VTGateConn{}, - mu: sync.Mutex{}, + mu: sync.RWMutex{}, } ) type VTGateProxy struct { targetConns map[string]*vtgateconn.VTGateConn - mu sync.Mutex + mu sync.RWMutex } func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { log.V(100).Infof("Getting connection for %v\n", target) // If the connection exists, return it - proxy.mu.Lock() + proxy.mu.RLock() existingConn := proxy.targetConns[target] if existingConn != nil { - proxy.mu.Unlock() + proxy.mu.RUnlock() log.V(100).Infof("Reused connection for %v\n", target) return existingConn, nil } - proxy.mu.Unlock() + + // No luck, need to create a new one. Serialize new additions so we don't create multiple + // for a given target. + log.V(100).Infof("Need to create connection for %v\n", target) + proxy.mu.RUnlock() + proxy.mu.Lock() // Otherwise create a new connection after dropping the lock, allowing multiple requests to // race to create the conn for now. @@ -82,7 +87,6 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt } log.V(100).Infof("Created new connection for %v\n", target) - proxy.mu.Lock() proxy.targetConns[target] = conn proxy.mu.Unlock() From 0f25fa672e3e782b0eac3f0a7e624e14332c7055 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 16:41:55 -0700 Subject: [PATCH 12/16] rework the filtering to make everything parameterized and more explicit Change all the config so that instead of hard coded constants we set the various connection attributes, json field names, etc using command line flags. Then make the pool type and affinity arguments more explicit and less generic. --- go/vt/vtgateproxy/discovery.go | 132 +++++++++++++++++-------------- go/vt/vtgateproxy/vtgateproxy.go | 25 +++--- 2 files changed, 81 insertions(+), 76 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index b0497628b49..258d8d56cb8 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -24,10 +24,8 @@ import ( "io" "math/rand" "os" - "strings" "time" - "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" "vitess.io/vitess/go/vt/log" @@ -35,7 +33,9 @@ import ( var ( jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution") - numConnectionsInt = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") + addressField = flag.String("address_field", "address", "field name in the json file containing the address") + portField = flag.String("port_field", "port", "field name in the json file containing the port") + numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") ) // File based discovery for vtgate grpc endpoints @@ -75,24 +75,30 @@ type JSONGateConfigDiscovery struct { const queryParamFilterPrefix = "filter_" func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - log.V(100).Infof("Start registration for target: %v\n", target.URL.String()) - queryOpts := target.URL.Query() - gateType := target.URL.Host - - filters := hostFilters{} - filters["type"] = gateType - for k := range queryOpts { - if strings.HasPrefix(k, queryParamFilterPrefix) { - filteredPrefix := strings.TrimPrefix(k, queryParamFilterPrefix) - filters[filteredPrefix] = queryOpts.Get(k) + attrs := target.URL.Query() + + poolType := "" + if *poolTypeAttr != "" { + poolType = attrs.Get(*poolTypeAttr) + if poolType == "" { + return nil, fmt.Errorf("pool type attribute %s not in target", *poolTypeAttr) } } + // affinity is optional + affinity := "" + if *affinityAttr != "" { + affinity = attrs.Get(*affinityAttr) + } + + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) + r := &JSONGateConfigResolver{ target: target, cc: cc, jsonPath: b.JsonPath, - filters: filters, + poolType: poolType, + affinity: affinity, } r.start() return r, nil @@ -115,85 +121,89 @@ type JSONGateConfigResolver struct { target resolver.Target cc resolver.ClientConn jsonPath string - ticker *time.Ticker - rand *rand.Rand // safe for concurrent use. - filters hostFilters + poolType string + affinity string + + ticker *time.Ticker + rand *rand.Rand // safe for concurrent use. } -type matchesFilter struct{} +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func jsonDump(data interface{}) string { + json, _ := json.Marshal(data) + return string(json) +} func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) { - pairs := []map[string]interface{}{} - log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnectionsInt) + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) data, err := os.ReadFile(r.jsonPath) if err != nil { return nil, nil, err } - err = json.Unmarshal(data, &pairs) + hosts := []map[string]interface{}{} + err = json.Unmarshal(data, &hosts) if err != nil { log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err) return nil, nil, err } - allAddrs := []resolver.Address{} - filteredAddrs := []resolver.Address{} - var addrs []resolver.Address - for _, pair := range pairs { - matchesAll := true - for k, v := range r.filters { - if pair[k] != v { - matchesAll = false + // optionally filter to only hosts that match the pool type + if r.poolType != "" { + candidates := []map[string]interface{}{} + for _, host := range hosts { + hostType, ok := host[*poolTypeAttr] + if ok && hostType == r.poolType { + candidates = append(candidates, host) + log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType) + } else { + log.V(1000).Infof("skipping host %s with type %s", jsonDump(host), hostType) } } + hosts = candidates + } - if matchesAll { - filteredAddrs = append(filteredAddrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), - BalancerAttributes: attributes.New(matchesFilter{}, "match"), - }) - } + // Shuffle to ensure every host has a different order to iterate through + r.rand.Shuffle(len(hosts), func(i, j int) { + hosts[i], hosts[j] = hosts[j], hosts[i] + }) - // Must filter by type - t, ok := r.filters["type"] - if ok { - if pair["type"] == t { - // Add matching hosts to registration list - allAddrs = append(allAddrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]), - BalancerAttributes: attributes.New(matchesFilter{}, "nomatch"), - }) + // If affinity is specified, then shuffle those hosts to the front + if r.affinity != "" { + i := 0 + for j := 0; j < len(hosts); j++ { + hostAffinity, ok := hosts[j][*affinityAttr] + if ok && hostAffinity == r.affinity { + hosts[i], hosts[j] = hosts[j], hosts[i] + i++ } } } - // Nothing in the filtered list? Get them all - if len(filteredAddrs) == 0 { - addrs = allAddrs - } else if *numConnectionsInt == 0 { - addrs = allAddrs - } else if len(filteredAddrs) > *numConnectionsInt { - addrs = filteredAddrs[0:*numConnectionsInt] - } else if len(allAddrs) > *numConnectionsInt { - addrs = allAddrs[0:*numConnectionsInt] - } else { - addrs = allAddrs + // Grab the first N addresses, and voila! + var addrs []resolver.Address + hosts = hosts[:min(*numConnections, len(hosts))] + for _, host := range hosts { + addrs = append(addrs, resolver.Address{ + Addr: fmt.Sprintf("%s:%s", host[*addressField], host[*portField]), + }) } - // Shuffle to ensure every host has a different order to iterate through - r.rand.Shuffle(len(addrs), func(i, j int) { - addrs[i], addrs[j] = addrs[j], addrs[i] - }) - h := sha256.New() if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { return nil, nil, err } sum := h.Sum(nil) - log.V(100).Infof("resolved %s to addrs: 0x%x, %v\n", r.target.URL.String(), sum, addrs) + log.V(100).Infof("resolved %s to hosts %s addrs: 0x%x, %v\n", r.target.URL.String(), jsonDump(hosts), sum, addrs) return &addrs, sum, nil } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index bc142cb0d5d..c91000a90df 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -25,26 +25,22 @@ import ( "net/url" "strings" "sync" - "time" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vterrors" _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) var ( - dialTimeout = flag.Duration("dial_timeout", 5*time.Second, "dialer timeout for the GRPC connection") - defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable") - sysVarSetEnabled = flag.Bool("enable_system_settings", true, "This will enable the system settings to be changed per session at the database connection level") + poolTypeAttr = flag.String("pool_type_attr", "", "Attribute (both mysql connection and JSON file) used to specify the target vtgate type and filter the hosts, e.g. 'type'") + affinityAttr = flag.String("affinity_attr", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") vtGateProxy *VTGateProxy = &VTGateProxy{ targetConns: map[string]*vtgateconn.VTGateConn{}, @@ -94,23 +90,22 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt } func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { - target, ok := connectionAttributes["target"] - if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "no target string supplied by client") + + if *poolTypeAttr != "" { + _, ok := connectionAttributes[*poolTypeAttr] + if !ok { + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeAttr) + } } targetUrl := url.URL{ Scheme: "vtgate", - Host: target, + Host: "pool", } - filters := metadata.Pairs() values := url.Values{} for k, v := range connectionAttributes { - if strings.HasPrefix(k, queryParamFilterPrefix) { - filters.Append(k, v) - values.Set(k, v) - } + values.Set(k, v) } targetUrl.RawQuery = values.Encode() From 8dd5c0f61b4b19bfefe10072f02f166790b1bfca Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 16:45:44 -0700 Subject: [PATCH 13/16] no longer needed --- go/vt/vtgateproxy/discovery.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 258d8d56cb8..857390e7e45 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -72,8 +72,6 @@ type JSONGateConfigDiscovery struct { JsonPath string } -const queryParamFilterPrefix = "filter_" - func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { attrs := target.URL.Query() From bb3a83ccc40c25db18d811fea378c62e8c9fb1f4 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 16:48:07 -0700 Subject: [PATCH 14/16] update comments --- go/vt/vtgateproxy/discovery.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 857390e7e45..be34ca3ebd4 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -75,6 +75,8 @@ type JSONGateConfigDiscovery struct { func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { attrs := target.URL.Query() + // If the config specifies a pool type attribute, then the caller must supply it in the connection + // attributes, otherwise reject the request. poolType := "" if *poolTypeAttr != "" { poolType = attrs.Get(*poolTypeAttr) @@ -83,7 +85,7 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie } } - // affinity is optional + // Affinity on the other hand is just an optimization affinity := "" if *affinityAttr != "" { affinity = attrs.Get(*affinityAttr) From 19d06e2e02e2696de2f847573eba0cd14486e12d Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 17:00:14 -0700 Subject: [PATCH 15/16] only pass through the URL params we need --- go/vt/vtgateproxy/discovery.go | 3 --- go/vt/vtgateproxy/vtgateproxy.go | 27 ++++++++++++++++++--------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index be34ca3ebd4..ec708e3a46a 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -113,9 +113,6 @@ func RegisterJsonDiscovery() { log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig) } -type hostFilters = map[string]string - -// exampleResolver is a // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type JSONGateConfigResolver struct { target resolver.Target diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index c91000a90df..d1c28ebce9f 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -91,22 +91,31 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { - if *poolTypeAttr != "" { - _, ok := connectionAttributes[*poolTypeAttr] - if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeAttr) - } - } - targetUrl := url.URL{ Scheme: "vtgate", Host: "pool", } values := url.Values{} - for k, v := range connectionAttributes { - values.Set(k, v) + + if *poolTypeAttr != "" { + poolType, ok := connectionAttributes[*poolTypeAttr] + if ok { + values.Set(*poolTypeAttr, poolType) + } else { + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeAttr) + } + } + + if *affinityAttr != "" { + affinity, ok := connectionAttributes[*affinityAttr] + if ok { + values.Set(*affinityAttr, affinity) + } else { + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *affinityAttr) + } } + targetUrl.RawQuery = values.Encode() conn, err := proxy.getConnection(ctx, targetUrl.String()) From 7e1900fb3f065527eb0d69bd4f90e91dad7eb8a2 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 22 Mar 2024 17:01:06 -0700 Subject: [PATCH 16/16] affinity is actually optional --- go/vt/vtgateproxy/vtgateproxy.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index d1c28ebce9f..5f4a50c8ea1 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -111,8 +111,6 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu affinity, ok := connectionAttributes[*affinityAttr] if ok { values.Set(*affinityAttr, affinity) - } else { - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *affinityAttr) } }