Skip to content

Commit

Permalink
Simplified by a lot - much simpler
Browse files Browse the repository at this point in the history
now pick fewer addresses
  • Loading branch information
jscheinblum committed Mar 22, 2024
1 parent 09da6d4 commit 6584d3c
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 40 deletions.
51 changes: 38 additions & 13 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,27 +121,52 @@ func (r *JSONGateConfigResolver) loadConfig() (*[]resolver.Address, []byte, erro
return nil, nil, err
}

allAddrs := []resolver.Address{}
filteredAddrs := []resolver.Address{}
addrs := []resolver.Address{}
for _, pair := range pairs {
filterMatch := false
matchesAll := true
for k, v := range r.filters {
if pair[k] == v {
filterMatch = true
} else {
filterMatch = false
if pair[k] != v {
matchesAll = false
}
}

attrs := attributes.New(matchesFilter{}, "nomatch")
if filterMatch {
attrs = attributes.New(matchesFilter{}, "match")
if matchesAll {
filteredAddrs = append(filteredAddrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]),
BalancerAttributes: attributes.New(matchesFilter{}, "match"),
})
}

// Add matching hosts to registration list
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]),
BalancerAttributes: attrs,
})
// Must filter by type
t, ok := r.filters["type"]
if ok {
if pair["type"] == t {
// Add matching hosts to registration list
allAddrs = append(allAddrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", pair["nebula_address"], pair["grpc"]),
BalancerAttributes: attributes.New(matchesFilter{}, "nomatch"),
})
}
}
}

fmt.Printf("-----\n")
fmt.Printf("filtered: %v\n", filteredAddrs)
fmt.Printf("-----\n")
fmt.Printf("all: %v\n", allAddrs)
fmt.Printf("----\n")

// Nothing in the filtered list? Get them all
if len(filteredAddrs) == 0 {
addrs = allAddrs
} else if len(filteredAddrs) > *numConnectionsInt {
addrs = filteredAddrs[0:*numConnectionsInt]
} else if len(allAddrs) > *numConnectionsInt {
addrs = allAddrs[0:*numConnectionsInt]
} else {
addrs = allAddrs
}

fmt.Printf("Loaded addrs from discovery file: %v\n", addrs)
Expand Down
34 changes: 7 additions & 27 deletions go/vt/vtgateproxy/gate_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,22 @@ func (*pickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
allSubConns := []balancer.SubConn{}
subConnsByFiltered := []balancer.SubConn{}

for sc := range info.ReadySCs {
subConnInfo := info.ReadySCs[sc]
matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string)

//subConnInfo := info.ReadySCs[sc]
//matchesFilter := subConnInfo.Address.BalancerAttributes.Value(matchesFilter{}).(string)
allSubConns = append(allSubConns, sc)
if matchesFilter == "match" {
subConnsByFiltered = append(subConnsByFiltered, sc)
}
}

return &filteredAffinityPicker{
allSubConns: allSubConns,
filteredSubConns: subConnsByFiltered,
subConns: allSubConns,
}
}

type filteredAffinityPicker struct {
// allSubConns is all subconns that were in the ready state when the picker was created
allSubConns []balancer.SubConn
filteredSubConns []balancer.SubConn
next uint32
subConns []balancer.SubConn
next uint32
}

// Pick the next in the list from the list of subconns (RR)
Expand All @@ -61,23 +54,10 @@ func (p *filteredAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nex
}

sc := scList[nextIndex%subConnsLen]
fmt.Printf("Select offset: iteration:(%v) mod:(%v) connLen:(%v)\n", nextIndex, nextIndex%subConnsLen, len(scList))

return balancer.PickResult{SubConn: sc}, nil
}

func (p *filteredAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
fmt.Printf("Picking: subcons counts: filtered(%v) all(%v)\n", len(p.filteredSubConns), len(p.allSubConns))
numConnections := *numConnectionsInt
if len(p.filteredSubConns) == 0 {
fmt.Printf("No subconns in the filtered list, pick from anywhere in pool\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}

if len(p.filteredSubConns) >= numConnections && numConnections > 0 {
fmt.Printf("Limiting to first %v\n", numConnections)
return p.pickFromSubconns(p.filteredSubConns[0:numConnections], atomic.AddUint32(&p.next, 1))
} else {
return p.pickFromSubconns(p.filteredSubConns, atomic.AddUint32(&p.next, 1))
}
fmt.Printf("Picking: subcons counts: len(%v)\n", len(p.subConns))
return p.pickFromSubconns(p.subConns, atomic.AddUint32(&p.next, 1))
}

0 comments on commit 6584d3c

Please sign in to comment.