diff --git a/go/cmd/vtgateproxy/vtgateproxy.go b/go/cmd/vtgateproxy/vtgateproxy.go index 13378d445a..eacdb0c13f 100644 --- a/go/cmd/vtgateproxy/vtgateproxy.go +++ b/go/cmd/vtgateproxy/vtgateproxy.go @@ -17,9 +17,6 @@ limitations under the License. package main import ( - "math/rand" - "time" - "vitess.io/vitess/go/exit" "vitess.io/vitess/go/stats/prometheusbackend" "vitess.io/vitess/go/vt/servenv" @@ -27,7 +24,6 @@ import ( ) func init() { - rand.Seed(time.Now().UnixNano()) servenv.RegisterDefaultFlags() } @@ -40,8 +36,6 @@ func main() { prometheusbackend.Init("vtgateproxy") servenv.OnRun(func() { - // Flags are parsed now. Parse the template using the actual flag value and overwrite the current template. - vtgateproxy.RegisterJsonDiscovery() vtgateproxy.Init() }) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index ec708e3a46..28826507e6 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -19,7 +19,6 @@ import ( "bytes" "crypto/sha256" "encoding/json" - "flag" "fmt" "io" "math/rand" @@ -28,17 +27,12 @@ import ( "google.golang.org/grpc/resolver" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" ) -var ( - jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution") - addressField = flag.String("address_field", "address", "field name in the json file containing the address") - portField = flag.String("port_field", "port", "field name in the json file containing the port") - numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") -) - // File based discovery for vtgate grpc endpoints +// // This loads the list of hosts from json and watches for changes to the list of hosts. It will select N connection to maintain to backend vtgates. // Connections will rebalance every 5 minutes // @@ -48,225 +42,342 @@ var ( // { // "address": "10.4.56.194", // "az_id": "use1-az1", -// "grpc": "15999", +// "port": 15999, // "type": "aux" // }, // -// Naming scheme: -// vtgate://?num_connections=&az_id= +// URL scheme: +// vtgate://?az_id= // // num_connections: Option number of hosts to open connections to for round-robin selection // az_id: Filter to just hosts in this az (optional) // type: Only select from hosts of this type (required) // -type DiscoveryHost struct { - Address string - NebulaAddress string `json:"nebula_address"` - Grpc string - AZId string `json:"az_id"` - Type string +type JSONGateResolverBuilder struct { + jsonPath string + addressField string + portField string + poolTypeField string + affinityField string + + targets []targetHost + resolvers []*JSONGateResolver + + rand *rand.Rand + ticker *time.Ticker + checksum []byte } -type JSONGateConfigDiscovery struct { - JsonPath string +type targetHost struct { + addr string + poolType string + affinity string } -func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - attrs := target.URL.Query() +// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). +type JSONGateResolver struct { + target resolver.Target + clientConn resolver.ClientConn + poolType string + affinity string +} - // If the config specifies a pool type attribute, then the caller must supply it in the connection - // attributes, otherwise reject the request. - poolType := "" - if *poolTypeAttr != "" { - poolType = attrs.Get(*poolTypeAttr) - if poolType == "" { - return nil, fmt.Errorf("pool type attribute %s not in target", *poolTypeAttr) - } - } +var ( + buildCount = stats.NewCounter("JsonDiscoveryBuild", "JSON host discovery rebuilt the host list") + unchangedCount = stats.NewCounter("JsonDiscoveryUnchanged", "JSON host discovery parsed and determined no change to the file") + affinityCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostAffinity", "Count of hosts returned from discovery by AZ affinity", "affinity") + poolTypeCount = stats.NewCountersWithSingleLabel("JsonDiscoveryHostPoolType", "Count of hosts returned from discovery by pool type", "type") +) - // Affinity on the other hand is just an optimization - affinity := "" - if *affinityAttr != "" { - affinity = attrs.Get(*affinityAttr) +func RegisterJSONGateResolver( + jsonPath string, + addressField string, + portField string, + poolTypeField string, + affinityField string, +) (*JSONGateResolverBuilder, error) { + jsonDiscovery := &JSONGateResolverBuilder{ + jsonPath: jsonPath, + addressField: addressField, + portField: portField, + poolTypeField: poolTypeField, + affinityField: affinityField, } - log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) + resolver.Register(jsonDiscovery) + log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), jsonPath) - r := &JSONGateConfigResolver{ - target: target, - cc: cc, - jsonPath: b.JsonPath, - poolType: poolType, - affinity: affinity, + err := jsonDiscovery.start() + if err != nil { + return nil, err } - r.start() - return r, nil + + return jsonDiscovery, nil } -func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" } -func RegisterJsonDiscovery() { - jsonDiscovery := &JSONGateConfigDiscovery{ - JsonPath: *jsonDiscoveryConfig, +func (*JSONGateResolverBuilder) Scheme() string { return "vtgate" } + +// Parse and validate the format of the file and start watching for changes +func (b *JSONGateResolverBuilder) start() error { + + b.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + + // Perform the initial parse + _, err := b.parse() + if err != nil { + return err } - resolver.Register(jsonDiscovery) - log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig) -} -// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type JSONGateConfigResolver struct { - target resolver.Target - cc resolver.ClientConn - jsonPath string - poolType string - affinity string + // Validate some stats + if len(b.targets) == 0 { + return fmt.Errorf("no valid targets in file %s", b.jsonPath) + } - ticker *time.Ticker - rand *rand.Rand // safe for concurrent use. -} + // Log some stats on startup + poolTypes := map[string]int{} + affinityTypes := map[string]int{} -func min(a, b int) int { - if a < b { - return a + for _, t := range b.targets { + count := poolTypes[t.poolType] + poolTypes[t.poolType] = count + 1 + + count = affinityTypes[t.affinity] + affinityTypes[t.affinity] = count + 1 } - return b -} -func jsonDump(data interface{}) string { - json, _ := json.Marshal(data) - return string(json) -} + buildCount.Add(1) -func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) { + log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes) - log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) + // Start a config watcher + b.ticker = time.NewTicker(1 * time.Second) + fileStat, err := os.Stat(b.jsonPath) + if err != nil { + return err + } + + go func() { + for range b.ticker.C { + checkFileStat, err := os.Stat(b.jsonPath) + if err != nil { + log.Errorf("Error stat'ing config %v\n", err) + continue + } + isUnchanged := checkFileStat.Size() == fileStat.Size() && checkFileStat.ModTime() == fileStat.ModTime() + if isUnchanged { + // no change + continue + } + + fileStat = checkFileStat + + contentsChanged, err := b.parse() + if err != nil || !contentsChanged { + unchangedCount.Add(1) + continue + } + + buildCount.Add(1) - data, err := os.ReadFile(r.jsonPath) + // notify all the resolvers that the targets changed + for _, r := range b.resolvers { + b.update(r) + } + } + }() + + return nil +} + +// parse the file and build the target host list, returning whether or not the list was +// updated since the last parse, or if the checksum matched +func (b *JSONGateResolverBuilder) parse() (bool, error) { + data, err := os.ReadFile(b.jsonPath) if err != nil { - return nil, nil, err + return false, err + } + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { + return false, err } + sum := h.Sum(nil) + + if bytes.Equal(sum, b.checksum) { + log.V(100).Infof("file did not change (checksum %x), skipping re-parse", sum) + return false, nil + } + b.checksum = sum + log.V(100).Infof("detected file change (checksum %x), parsing", sum) hosts := []map[string]interface{}{} err = json.Unmarshal(data, &hosts) if err != nil { - log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err) - return nil, nil, err + return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err) } - // optionally filter to only hosts that match the pool type - if r.poolType != "" { - candidates := []map[string]interface{}{} - for _, host := range hosts { - hostType, ok := host[*poolTypeAttr] - if ok && hostType == r.poolType { - candidates = append(candidates, host) - log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType) - } else { - log.V(1000).Infof("skipping host %s with type %s", jsonDump(host), hostType) - } + for _, host := range hosts { + address, hasAddress := host[b.addressField] + port, hasPort := host[b.portField] + poolType, hasPoolType := host[b.poolTypeField] + affinity, hasAffinity := host[b.affinityField] + + if !hasAddress { + return false, fmt.Errorf("error parsing JSON discovery file %s: address field %s not present", b.jsonPath, b.addressField) + } + + if !hasPort { + return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s not present", b.jsonPath, b.portField) + } + + if b.poolTypeField != "" && !hasPoolType { + return false, fmt.Errorf("error parsing JSON discovery file %s: pool type field %s not present", b.jsonPath, b.poolTypeField) + } + + if b.affinityField != "" && !hasAffinity { + return false, fmt.Errorf("error parsing JSON discovery file %s: affinity field %s not present", b.jsonPath, b.affinityField) + } + + if b.poolTypeField == "" { + poolType = "" + } + + if b.affinityField == "" { + affinity = "" + } + + // Handle both int and string values for port + switch port.(type) { + case int: + port = fmt.Sprintf("%d", port) + case string: + // nothing to do + default: + return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port) } - hosts = candidates + + b.targets = append(b.targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}) } - // Shuffle to ensure every host has a different order to iterate through - r.rand.Shuffle(len(hosts), func(i, j int) { - hosts[i], hosts[j] = hosts[j], hosts[i] - }) - - // If affinity is specified, then shuffle those hosts to the front - if r.affinity != "" { - i := 0 - for j := 0; j < len(hosts); j++ { - hostAffinity, ok := hosts[j][*affinityAttr] - if ok && hostAffinity == r.affinity { - hosts[i], hosts[j] = hosts[j], hosts[i] - i++ - } + return true, nil +} + +// Update the current list of hosts for the given resolver +func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { + + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) + + // filter to only targets that match the pool type. if unset, this will just be a copy + // of the full target list. + targets := []targetHost{} + for _, target := range b.targets { + if r.poolType == target.poolType { + targets = append(targets, target) + log.V(1000).Infof("matched target %v with type %s", target, r.poolType) + } else { + log.V(1000).Infof("skipping host %v with type %s", target, r.poolType) + } + } + + // Shuffle to ensure every host has a different order to iterate through, putting + // the affinity matching (e.g. same az) hosts at the front and the non-matching ones + // at the end. + // + // Only need to do n-1 swaps since the last host is always in the right place. + n := len(targets) + head := 0 + tail := n - 1 + for i := 0; i < n-1; i++ { + j := head + b.rand.Intn(tail-head+1) + + if r.affinity == "" || r.affinity == targets[j].affinity { + targets[head], targets[j] = targets[j], targets[head] + head++ + } else { + targets[tail], targets[j] = targets[j], targets[tail] + tail-- } } // Grab the first N addresses, and voila! var addrs []resolver.Address - hosts = hosts[:min(*numConnections, len(hosts))] - for _, host := range hosts { - addrs = append(addrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", host[*addressField], host[*portField]), - }) + targets = targets[:min(*numConnections, len(targets))] + for _, target := range targets { + addrs = append(addrs, resolver.Address{Addr: target.addr}) } - h := sha256.New() - if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { - return nil, nil, err + // Count some metrics + var unknown, local, remote int64 + for _, target := range targets { + if r.affinity == "" { + unknown++ + } else if r.affinity == target.affinity { + local++ + } else { + remote++ + } } - sum := h.Sum(nil) + if unknown != 0 { + affinityCount.Add("unknown", unknown) + } + affinityCount.Add("local", local) + affinityCount.Add("remote", remote) + poolTypeCount.Add(r.poolType, int64(len(targets))) - log.V(100).Infof("resolved %s to hosts %s addrs: 0x%x, %v\n", r.target.URL.String(), jsonDump(hosts), sum, addrs) + log.V(100).Infof("updated targets for %s to %v (local %d / remote %d)", r.target.URL.String(), targets, local, remote) - return &addrs, sum, nil + r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } -func (r *JSONGateConfigResolver) start() { - log.V(100).Infof("Starting discovery checker\n") - r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) +// Build a new Resolver to route to the given target +func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + attrs := target.URL.Query() - // Immediately load the initial config - addrs, hash, err := r.resolve() - if err == nil { - // if we parse ok, populate the local address store - r.cc.UpdateState(resolver.State{Addresses: *addrs}) + // If the config specifies a pool type attribute, then the caller must supply it in the connection + // attributes, otherwise reject the request. + poolType := "" + if b.poolTypeField != "" { + poolType = attrs.Get(b.poolTypeField) + if poolType == "" { + return nil, fmt.Errorf("pool type attribute %s not in target", b.poolTypeField) + } } - // Start a config watcher - r.ticker = time.NewTicker(100 * time.Millisecond) - fileStat, err := os.Stat(r.jsonPath) - if err != nil { - return + // Affinity on the other hand is just an optimization + affinity := "" + if b.affinityField != "" { + affinity = attrs.Get(b.affinityField) } - go func() { - for range r.ticker.C { - checkFileStat, err := os.Stat(r.jsonPath) - if err != nil { - log.Errorf("Error stat'ing config %v\n", err) - continue - } - isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() - if isUnchanged { - // no change - continue - } - - fileStat = checkFileStat - log.V(100).Infof("Detected config change\n") - addrs, newHash, err := r.resolve() - if err != nil { - // better luck next loop - // TODO: log this - log.Errorf("Error resolving config: %v\n", err) - continue - } + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) - // Make sure this wasn't a spurious change by checking the hash - if bytes.Equal(hash, newHash) && newHash != nil { - log.V(100).Infof("No content changed in discovery file... ignoring\n") - continue - } + r := &JSONGateResolver{ + target: target, + clientConn: cc, + poolType: poolType, + affinity: affinity, + } - hash = newHash + b.update(r) + b.resolvers = append(b.resolvers, r) - r.cc.UpdateState(resolver.State{Addresses: *addrs}) - } - }() + return r, nil +} - log.V(100).Infof("Loaded hosts, starting ticker\n") +func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (r *JSONGateResolver) Close() { + log.Infof("Closing resolver for target %s", r.target.URL.String()) } -func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {} -func (r *JSONGateConfigResolver) Close() { - r.ticker.Stop() + +// Utilities +func min(a, b int) int { + if a < b { + return a + } + return b } func init() { - // Register the example ResolverBuilder. This is usually done in a package's - // init() function. } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 8a30dd5efa..bdf4434845 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -40,8 +40,12 @@ import ( ) var ( - poolTypeAttr = flag.String("pool_type_attr", "", "Attribute (both mysql connection and JSON file) used to specify the target vtgate type and filter the hosts, e.g. 'type'") - affinityAttr = flag.String("affinity_attr", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") + vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution") + numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain") + poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts") + affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'") + addressField = flag.String("address_field", "address", "field name in the json file containing the address") + portField = flag.String("port_field", "port", "field name in the json file containing the port") vtGateProxy *VTGateProxy = &VTGateProxy{ targetConns: map[string]*vtgateconn.VTGateConn{}, @@ -103,19 +107,19 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu values := url.Values{} - if *poolTypeAttr != "" { - poolType, ok := connectionAttributes[*poolTypeAttr] + if *poolTypeField != "" { + poolType, ok := connectionAttributes[*poolTypeField] if ok { - values.Set(*poolTypeAttr, poolType) + values.Set(*poolTypeField, poolType) } else { - return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeAttr) + return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeField) } } - if *affinityAttr != "" { - affinity, ok := connectionAttributes[*affinityAttr] + if *affinityField != "" { + affinity, ok := connectionAttributes[*affinityField] if ok { - values.Set(*affinityAttr, affinity) + values.Set(*affinityField, affinity) } } @@ -183,4 +187,12 @@ func Init() { grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil }) + + RegisterJSONGateResolver( + *vtgateHostsFile, + *addressField, + *portField, + *poolTypeField, + *affinityField, + ) }