diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 40df37f376b..04cc94b1185 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -23,6 +23,7 @@ import ( "io" "math/rand" "os" + "sort" "time" "google.golang.org/grpc/resolver" @@ -60,8 +61,9 @@ type JSONGateResolverBuilder struct { portField string poolTypeField string affinityField string + affinityValue string - targets []targetHost + targets map[string][]targetHost resolvers []*JSONGateResolver rand *rand.Rand @@ -96,13 +98,16 @@ func RegisterJSONGateResolver( portField string, poolTypeField string, affinityField string, + affinityValue string, ) (*JSONGateResolverBuilder, error) { jsonDiscovery := &JSONGateResolverBuilder{ + targets: map[string][]targetHost{}, jsonPath: jsonPath, addressField: addressField, portField: portField, poolTypeField: poolTypeField, affinityField: affinityField, + affinityValue: affinityValue, } resolver.Register(jsonDiscovery) @@ -138,17 +143,19 @@ func (b *JSONGateResolverBuilder) start() error { poolTypes := map[string]int{} affinityTypes := map[string]int{} - for _, t := range b.targets { - count := poolTypes[t.poolType] - poolTypes[t.poolType] = count + 1 + for _, ts := range b.targets { + for _, t := range ts { + count := poolTypes[t.poolType] + poolTypes[t.poolType] = count + 1 - count = affinityTypes[t.affinity] - affinityTypes[t.affinity] = count + 1 + count = affinityTypes[t.affinity] + affinityTypes[t.affinity] = count + 1 + } } buildCount.Add(1) - log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes) + log.Infof("loaded targets, pool types %v, affinity groups %v", poolTypes, affinityTypes) // Start a config watcher b.ticker = time.NewTicker(1 * time.Second) @@ -217,7 +224,7 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err) } - var targets []targetHost + var targets = map[string][]targetHost{} for _, host := range hosts { address, hasAddress := host[b.addressField] port, hasPort := host[b.portField] @@ -258,8 +265,21 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port) } - targets = append(targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}) + target := targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)} + targets[target.poolType] = append(targets[target.poolType], target) + } + + for poolType := range targets { + if b.affinityField != "" { + sort.Slice(targets[poolType], func(i, j int) bool { + return b.affinityValue == targets[poolType][i].affinity + }) + } + if len(targets[poolType]) > *numConnections { + targets[poolType] = targets[poolType][:*numConnections] + } } + b.targets = targets return true, nil @@ -270,17 +290,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) - // filter to only targets that match the pool type. if unset, this will just be a copy - // of the full target list. - targets := []targetHost{} - for _, target := range b.targets { - if r.poolType == target.poolType { - targets = append(targets, target) - log.V(1000).Infof("matched target %v with type %s", target, r.poolType) - } else { - log.V(1000).Infof("skipping host %v with type %s", target, r.poolType) - } - } + targets := b.targets[r.poolType] // Shuffle to ensure every host has a different order to iterate through, putting // the affinity matching (e.g. same az) hosts at the front and the non-matching ones @@ -302,32 +312,12 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { } } - // Grab the first N addresses, and voila! var addrs []resolver.Address - targets = targets[:min(*numConnections, len(targets))] for _, target := range targets { addrs = append(addrs, resolver.Address{Addr: target.addr}) } - // Count some metrics - var unknown, local, remote int64 - for _, target := range targets { - if r.affinity == "" { - unknown++ - } else if r.affinity == target.affinity { - local++ - } else { - remote++ - } - } - if unknown != 0 { - affinityCount.Add("unknown", unknown) - } - affinityCount.Add("local", local) - affinityCount.Add("remote", remote) - poolTypeCount.Add(r.poolType, int64(len(targets))) - - log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote) + log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } @@ -346,19 +336,13 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie } } - // Affinity on the other hand is just an optimization - affinity := "" - if b.affinityField != "" { - affinity = attrs.Get(b.affinityField) - } - - log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, b.affinityValue) r := &JSONGateResolver{ target: target, clientConn: cc, poolType: poolType, - affinity: affinity, + affinity: b.affinityValue, } b.update(r) @@ -372,14 +356,3 @@ func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} func (r *JSONGateResolver) Close() { log.Infof("Closing resolver for target %s", r.target.URL.String()) } - -// Utilities -func min(a, b int) int { - if a < b { - return a - } - return b -} - -func init() { -} diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index bdf44348450..86ebb6350ff 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -43,7 +43,8 @@ var ( vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution") numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts") - affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityField = flag.String("affinity_field", "", "Attribute (JSON file) used to specify the routing affinity , e.g. 'az_id'") + affinityValue = flag.String("affinity_value", "", "Value to match for routing affinity , e.g. 'use-az1'") addressField = flag.String("address_field", "address", "field name in the json file containing the address") portField = flag.String("port_field", "port", "field name in the json file containing the port") @@ -194,5 +195,6 @@ func Init() { *portField, *poolTypeField, *affinityField, + *affinityValue, ) }