Skip to content

Commit

Permalink
Honor num connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheinblum committed Nov 7, 2023
1 parent cf9c6bd commit d3c7462
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 16 deletions.
26 changes: 15 additions & 11 deletions go/vt/vtgateproxy/gate_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"

Expand All @@ -16,12 +17,12 @@ import (
// Name is the name of az affinity balancer.
const Name = "slack_affinity_balancer"
const MetadataAZKey = "grpc-slack-az-metadata"
const MetadataGateTypeKey = "grpc-slack-gate-type-metadata"
const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata"

var logger = grpclog.Component("slack_affinity_balancer")

func WithSlackAZAffinityContext(ctx context.Context, azID string, gateType string) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataGateTypeKey, gateType)
func WithSlackAZAffinityContext(ctx context.Context, azID string, numConnections string) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataHostAffinityCount, numConnections)
return ctx
}

Expand Down Expand Up @@ -82,21 +83,24 @@ func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, next

func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
hdrs, _ := metadata.FromOutgoingContext(info.Ctx)
fmt.Printf("Headers: %v %v\n", hdrs, info)
numConnections := 0
keys := hdrs.Get(MetadataAZKey)
if len(keys) < 1 {
fmt.Printf("uh oh - missing keys: %v %v %v\n", keys, hdrs, info.Ctx)
fmt.Printf("no header - pick from anywhere\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}
az := keys[0]

if az == "" {
fmt.Printf("Header unset, pick from anywhere\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}

fmt.Printf("Selecting from az: %v\n", az)
keys = hdrs.Get(MetadataHostAffinityCount)
if len(keys) > 0 {
if i, err := strconv.Atoi(keys[0]); err != nil {
numConnections = i
}
}

subConns := p.subConnsByAZ[az]
if len(subConns) == 0 {
fmt.Printf("No subconns in az and gate type, pick from anywhere\n")
Expand All @@ -106,9 +110,9 @@ func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResul
ptr := val.(*uint32)
atomic.AddUint32(ptr, 1)

if len(subConns) >= 2 {
fmt.Printf("Limiting to first 2\n")
return p.pickFromSubconns(subConns[0:2], *ptr)
if len(subConns) >= numConnections {
fmt.Printf("Limiting to first %v\n", numConnections)
return p.pickFromSubconns(subConns[0:numConnections], *ptr)
} else {
return p.pickFromSubconns(subConns, *ptr)
}
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ var (
)

type VTGateProxy struct {
targetConns map[string]*vtgateconn.VTGateConn
mu sync.Mutex
azID string
gateType string
targetConns map[string]*vtgateconn.VTGateConn
mu sync.Mutex
azID string
gateType string
numConnections string
}

func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) {
Expand All @@ -64,6 +65,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
}

proxy.azID = targetURL.Query().Get("az_id")
proxy.numConnections = targetURL.Query().Get("num_connections")
proxy.gateType = targetURL.Host

fmt.Printf("Getting connection for %v in %v\n", target, proxy.azID)
Expand All @@ -86,7 +88,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt
return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil
})

conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), "grpc", target)
conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.numConnections), "grpc", target)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d3c7462

Please sign in to comment.