From 09da6d494ad361f36519706364ff01737e4a5e30 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Thu, 21 Mar 2024 17:26:33 -0700 Subject: [PATCH] simplify more --- go/vt/vtgateproxy/discovery.go | 15 ++++++------ go/vt/vtgateproxy/gate_balancer.go | 37 +++++++++++------------------- go/vt/vtgateproxy/vtgateproxy.go | 2 ++ 3 files changed, 22 insertions(+), 32 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index c5ceae7f3ed..f9744026e64 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -71,7 +71,7 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie } } - r := &resolveJSONGateConfig{ + r := &JSONGateConfigResolver{ target: target, cc: cc, jsonPath: b.JsonPath, @@ -95,7 +95,7 @@ type hostFilters = map[string]string // exampleResolver is a // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type resolveJSONGateConfig struct { +type JSONGateConfigResolver struct { target resolver.Target cc resolver.ClientConn jsonPath string @@ -106,7 +106,7 @@ type resolveJSONGateConfig struct { type matchesFilter struct{} -func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { +func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, error) { pairs := []map[string]interface{}{} fmt.Printf("Loading config %v\n", r.jsonPath) @@ -123,7 +123,6 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error addrs := []resolver.Address{} for _, pair := range pairs { - filterMatch := false for k, v := range r.filters { if pair[k] == v { @@ -145,7 +144,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error }) } - fmt.Printf("Addrs: %v\n", addrs) + fmt.Printf("Loaded addrs from discovery file: %v\n", addrs) // Shuffle to ensure every host has a different order to iterate through r.rand.Shuffle(len(addrs), func(i, j int) { @@ -161,7 +160,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error return &addrs, h.Sum(nil), nil } -func (r *resolveJSONGateConfig) start() { +func (r *JSONGateConfigResolver) start() { fmt.Print("Starting discovery checker\n") r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) @@ -219,8 +218,8 @@ func (r *resolveJSONGateConfig) start() { fmt.Printf("Loaded hosts, starting ticker\n") } -func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {} -func (r *resolveJSONGateConfig) Close() { +func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (r *JSONGateConfigResolver) Close() { r.ticker.Stop() } diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index eafcac03c62..2114e307d8b 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -7,30 +7,22 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" - "google.golang.org/grpc/grpclog" ) // Name is the name of az affinity balancer. const Name = "slack_affinity_balancer" -const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata" -const MetadataDiscoveryFilterPrefix = "grpc_discovery_filter_" - -var logger = grpclog.Component("slack_affinity_balancer") func newBuilder() balancer.Builder { - return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilder(Name, &pickerBuilder{}, base.Config{HealthCheck: true}) } func init() { balancer.Register(newBuilder()) } -type slackAZAffinityBalancer struct{} - -func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker { - logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info) - fmt.Printf("Rebuilding picker\n") +type pickerBuilder struct{} +func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } @@ -47,16 +39,13 @@ func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker } } - fmt.Printf("Filtered subcons: %v\n", len(subConnsByFiltered)) - fmt.Printf("All subcons: %v\n", len(allSubConns)) - - return &slackAZAffinityPicker{ + return &filteredAffinityPicker{ allSubConns: allSubConns, filteredSubConns: subConnsByFiltered, } } -type slackAZAffinityPicker struct { +type filteredAffinityPicker struct { // allSubConns is all subconns that were in the ready state when the picker was created allSubConns []balancer.SubConn filteredSubConns []balancer.SubConn @@ -64,7 +53,7 @@ type slackAZAffinityPicker struct { } // Pick the next in the list from the list of subconns (RR) -func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { +func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { subConnsLen := uint32(len(scList)) if subConnsLen == 0 { @@ -72,23 +61,23 @@ func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, next } sc := scList[nextIndex%subConnsLen] - fmt.Printf("Select offset: %v %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList), sc) + fmt.Printf("Select offset: iteration:(%v) mod:(%v) connLen:(%v)\n", nextIndex, nextIndex%subConnsLen, len(scList)) return balancer.PickResult{SubConn: sc}, nil } -func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - filteredSubConns := p.filteredSubConns +func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + fmt.Printf("Picking: subcons counts: filtered(%v) all(%v)\n", len(p.filteredSubConns), len(p.allSubConns)) numConnections := *numConnectionsInt - if len(filteredSubConns) == 0 { + if len(p.filteredSubConns) == 0 { fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n") return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } - if len(filteredSubConns) >= numConnections && numConnections > 0 { + if len(p.filteredSubConns) >= numConnections && numConnections > 0 { fmt.Printf("Limiting to first %v\n", numConnections) - return p.pickFromSubconns(filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) + return p.pickFromSubconns(p.filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1)) } else { - return p.pickFromSubconns(filteredSubConns, atomic.AddUint32(&p.next, 1)) + return p.pickFromSubconns(p.filteredSubConns, atomic.AddUint32(&p.next, 1)) } } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index f6932fe888c..a2a655ce0bd 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -65,6 +65,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt existingConn := proxy.targetConns[target] if existingConn != nil { proxy.mu.Unlock() + fmt.Printf("Reused connection for %v\n", target) return existingConn, nil } proxy.mu.Unlock() @@ -84,6 +85,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt } proxy.mu.Lock() + fmt.Printf("Created new connection for %v\n", target) proxy.targetConns[target] = conn proxy.mu.Unlock()