Skip to content

Commit

Permalink
simplify more
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheinblum committed Mar 22, 2024
1 parent 9e12ae1 commit 09da6d4
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 32 deletions.
15 changes: 7 additions & 8 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie
}
}

r := &resolveJSONGateConfig{
r := &JSONGateConfigResolver{
target: target,
cc: cc,
jsonPath: b.JsonPath,
Expand All @@ -95,7 +95,7 @@ type hostFilters = map[string]string

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type resolveJSONGateConfig struct {
type JSONGateConfigResolver struct {
target resolver.Target
cc resolver.ClientConn
jsonPath string
Expand All @@ -106,7 +106,7 @@ type resolveJSONGateConfig struct {

type matchesFilter struct{}

func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) {
func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, error) {
pairs := []map[string]interface{}{}
fmt.Printf("Loading config %v\n", r.jsonPath)

Expand All @@ -123,7 +123,6 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error

addrs := []resolver.Address{}
for _, pair := range pairs {

filterMatch := false
for k, v := range r.filters {
if pair[k] == v {
Expand All @@ -145,7 +144,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error
})
}

fmt.Printf("Addrs: %v\n", addrs)
fmt.Printf("Loaded addrs from discovery file: %v\n", addrs)

// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(addrs), func(i, j int) {
Expand All @@ -161,7 +160,7 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error
return &addrs, h.Sum(nil), nil
}

func (r *resolveJSONGateConfig) start() {
func (r *JSONGateConfigResolver) start() {
fmt.Print("Starting discovery checker\n")
r.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

Expand Down Expand Up @@ -219,8 +218,8 @@ func (r *resolveJSONGateConfig) start() {
fmt.Printf("Loaded hosts, starting ticker\n")

}
func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *resolveJSONGateConfig) Close() {
func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *JSONGateConfigResolver) Close() {
r.ticker.Stop()
}

Expand Down
37 changes: 13 additions & 24 deletions go/vt/vtgateproxy/gate_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,22 @@ import (

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
)

// Name is the name of az affinity balancer.
const Name = "slack_affinity_balancer"
const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata"
const MetadataDiscoveryFilterPrefix = "grpc_discovery_filter_"

var logger = grpclog.Component("slack_affinity_balancer")

func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true})
return base.NewBalancerBuilder(Name, &pickerBuilder{}, base.Config{HealthCheck: true})
}

func init() {
balancer.Register(newBuilder())
}

type slackAZAffinityBalancer struct{}

func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info)
fmt.Printf("Rebuilding picker\n")
type pickerBuilder struct{}

func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
Expand All @@ -47,48 +39,45 @@ func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker
}
}

fmt.Printf("Filtered subcons: %v\n", len(subConnsByFiltered))
fmt.Printf("All subcons: %v\n", len(allSubConns))

return &slackAZAffinityPicker{
return &filteredAffinityPicker{
allSubConns: allSubConns,
filteredSubConns: subConnsByFiltered,
}
}

type slackAZAffinityPicker struct {
type filteredAffinityPicker struct {
// allSubConns is all subconns that were in the ready state when the picker was created
allSubConns []balancer.SubConn
filteredSubConns []balancer.SubConn
next uint32
}

// Pick the next in the list from the list of subconns (RR)
func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) {
func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) {
subConnsLen := uint32(len(scList))

if subConnsLen == 0 {
return balancer.PickResult{}, errors.New("no hosts in list")
}

sc := scList[nextIndex%subConnsLen]
fmt.Printf("Select offset: %v %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList), sc)
fmt.Printf("Select offset: iteration:(%v) mod:(%v) connLen:(%v)\n", nextIndex, nextIndex%subConnsLen, len(scList))

return balancer.PickResult{SubConn: sc}, nil
}

func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
filteredSubConns := p.filteredSubConns
func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
fmt.Printf("Picking: subcons counts: filtered(%v) all(%v)\n", len(p.filteredSubConns), len(p.allSubConns))
numConnections := *numConnectionsInt
if len(filteredSubConns) == 0 {
if len(p.filteredSubConns) == 0 {
fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}

if len(filteredSubConns) >= numConnections && numConnections > 0 {
if len(p.filteredSubConns) >= numConnections && numConnections > 0 {
fmt.Printf("Limiting to first %v\n", numConnections)
return p.pickFromSubconns(filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1))
return p.pickFromSubconns(p.filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1))
} else {
return p.pickFromSubconns(filteredSubConns, atomic.AddUint32(&p.next, 1))
return p.pickFromSubconns(p.filteredSubConns, atomic.AddUint32(&p.next, 1))
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
existingConn := proxy.targetConns[target]
if existingConn != nil {
proxy.mu.Unlock()
fmt.Printf("Reused connection for %v\n", target)
return existingConn, nil
}
proxy.mu.Unlock()
Expand All @@ -84,6 +85,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
}

proxy.mu.Lock()
fmt.Printf("Created new connection for %v\n", target)
proxy.targetConns[target] = conn
proxy.mu.Unlock()

Expand Down

0 comments on commit 09da6d4

Please sign in to comment.