diff --git a/go/cmd/vtgateproxy/vtgateproxy.go b/go/cmd/vtgateproxy/vtgateproxy.go index 96ec4cb1d91..9a827d30f6c 100644 --- a/go/cmd/vtgateproxy/vtgateproxy.go +++ b/go/cmd/vtgateproxy/vtgateproxy.go @@ -17,9 +17,6 @@ limitations under the License. package main import ( - "math/rand" - "time" - "vitess.io/vitess/go/exit" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vtgateproxy" @@ -28,7 +25,6 @@ import ( var () func init() { - rand.Seed(time.Now().UnixNano()) servenv.RegisterDefaultFlags() } diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 74fc6cd1469..0ed473d5228 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -9,7 +9,7 @@ You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieb. See the License for the specific language governing permissions and limitations under the License. */ @@ -41,147 +41,243 @@ import ( // { // "address": "10.4.56.194", // "az_id": "use1-az1", -// "grpc": "15999", +// "port": 15999, // "type": "aux" // }, // // URKL scheme: -// vtgate://?num_connections=&az_id= +// vtgate://?az_id= // // num_connections: Option number of hosts to open connections to for round-robin selection // az_id: Filter to just hosts in this az (optional) // type: Only select from hosts of this type (required) // -type JSONGateConfigDiscovery struct { +type JSONGateResolverBuilder struct { jsonPath string - addressField string + addressField string // XXX FOR NOW portField string poolTypeField string affinityField string -} -// Build a new Resolver to route to the given target -func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - attrs := target.URL.Query() - - // If the config specifies a pool type attribute, then the caller must supply it in the connection - // attributes, otherwise reject the request. - poolType := "" - if b.poolTypeField != "" { - poolType = attrs.Get(b.poolTypeField) - if poolType == "" { - return nil, fmt.Errorf("pool type attribute %s not in target", b.poolTypeField) - } - } + targets []targetHost + resolvers []*JSONGateResolver - // Affinity on the other hand is just an optimization - affinity := "" - if b.affinityField != "" { - affinity = attrs.Get(b.affinityField) - } + rand *rand.Rand + ticker *time.Ticker + checksum []byte +} - log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) +type targetHost struct { + addr string + poolType string + affinity string +} - r := &JSONGateConfigResolver{ - target: target, - cc: cc, - jsonPath: b.jsonPath, - poolType: poolType, - affinity: affinity, - } - r.start() - return r, nil +// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). +type JSONGateResolver struct { + target resolver.Target + clientConn resolver.ClientConn + poolType string + affinity string } -func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" } -func RegisterJsonDiscovery( +func RegisterJSONGateResolver( jsonPath string, addressField string, portField string, poolTypeField string, affinityField string, -) *JSONGateConfigDiscovery { - jsonDiscovery := &JSONGateConfigDiscovery{ - jsonPath, - addressField, - portField, - poolTypeField, - affinityField, +) (*JSONGateResolverBuilder, error) { + jsonDiscovery := &JSONGateResolverBuilder{ + jsonPath: jsonPath, + addressField: addressField, + portField: portField, + poolTypeField: poolTypeField, + affinityField: affinityField, } + resolver.Register(jsonDiscovery) log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), jsonPath) - return jsonDiscovery + err := jsonDiscovery.start() + if err != nil { + return nil, err + } + + return jsonDiscovery, nil } -// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). -type JSONGateConfigResolver struct { - target resolver.Target - cc resolver.ClientConn - jsonPath string - poolType string - affinity string +func (*JSONGateResolverBuilder) Scheme() string { return "vtgate" } - ticker *time.Ticker - rand *rand.Rand // safe for concurrent use. -} +// Parse and validate the format of the file and start watching for changes +func (b *JSONGateResolverBuilder) start() error { -func min(a, b int) int { - if a < b { - return a + b.rand = rand.New(rand.NewSource(time.Now().UnixNano())) + + // Perform the initial parse + _, err := b.parse() + if err != nil { + return err } - return b -} -func jsonDump(data interface{}) string { - json, _ := json.Marshal(data) - return string(json) -} + // Validate some stats + if len(b.targets) == 0 { + return fmt.Errorf("no valid targets in file %s", b.jsonPath) + } -func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) { + // Log some stats on startup + poolTypes := map[string]int{} + affinityTypes := map[string]int{} - log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) + for _, t := range b.targets { + count := poolTypes[t.poolType] + poolTypes[t.poolType] = count + 1 + + count = affinityTypes[t.affinity] + affinityTypes[t.affinity] = count + 1 + } + + log.Infof("loaded %d targets, pool types %v, affinity groups %v", len(b.targets), poolTypes, affinityTypes) - data, err := os.ReadFile(r.jsonPath) + // Start a config watcher + b.ticker = time.NewTicker(100 * time.Millisecond) + fileStat, err := os.Stat(b.jsonPath) if err != nil { - return nil, nil, err + return err } + go func() { + for range b.ticker.C { + checkFileStat, err := os.Stat(b.jsonPath) + if err != nil { + log.Errorf("Error stat'ing config %v\n", err) + continue + } + isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() + if isUnchanged { + // no change + continue + } + + fileStat = checkFileStat + + contentsChanged, err := b.parse() + if err != nil || !contentsChanged { + continue + } + + // notify all the resolvers that the targets changed + for _, r := range b.resolvers { + b.update(r) + } + } + }() + + return nil +} + +// parse the file and build the target host list, returning whether or not the list was +// updated since the last parse, or if the checksum matched +func (b *JSONGateResolverBuilder) parse() (bool, error) { + data, err := os.ReadFile(b.jsonPath) + if err != nil { + return false, err + } + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { + return false, err + } + sum := h.Sum(nil) + + if bytes.Equal(sum, b.checksum) { + log.V(100).Infof("file did not change (checksum %x), skipping re-parse", sum) + return false, nil + } + b.checksum = sum + log.V(100).Infof("detected file change (checksum %x), parsing", sum) + hosts := []map[string]interface{}{} err = json.Unmarshal(data, &hosts) if err != nil { - log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err) - return nil, nil, err + return false, fmt.Errorf("error parsing JSON discovery file %s: %v", b.jsonPath, err) } - // optionally filter to only hosts that match the pool type - if r.poolType != "" { - candidates := []map[string]interface{}{} - for _, host := range hosts { - hostType, ok := host[*poolTypeField] - if ok && hostType == r.poolType { - candidates = append(candidates, host) - log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType) - } else { - log.V(1000).Infof("skipping host %s with type %s", jsonDump(host), hostType) - } + for _, host := range hosts { + address, hasAddress := host[b.addressField] + port, hasPort := host[b.portField] + poolType, hasPoolType := host[b.poolTypeField] + affinity, hasAffinity := host[b.affinityField] + + if !hasAddress { + return false, fmt.Errorf("error parsing JSON discovery file %s: address field %s not present", b.jsonPath, b.addressField) + } + + if !hasPort { + return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s not present", b.jsonPath, b.portField) + } + + if b.poolTypeField != "" && !hasPoolType { + return false, fmt.Errorf("error parsing JSON discovery file %s: pool type field %s not present", b.jsonPath, b.poolTypeField) + } + + if b.affinityField != "" && !hasAffinity { + return false, fmt.Errorf("error parsing JSON discovery file %s: affinity field %s not present", b.jsonPath, b.affinityField) + } + + if b.poolTypeField == "" { + poolType = "" + } + + if b.affinityField == "" { + affinity = "" + } + + // Handle both int and string values for port + switch port.(type) { + case int: + port = fmt.Sprintf("%d", port) + case string: + // nothing to do + default: + return false, fmt.Errorf("error parsing JSON discovery file %s: port field %s has invalid value %v", b.jsonPath, b.portField, port) + } + + b.targets = append(b.targets, targetHost{fmt.Sprintf("%s:%s", address, port), poolType.(string), affinity.(string)}) + } + + return true, nil +} + +// Update the current list of hosts for the given resolver +func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { + + log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) + + // filter to only targets that match the pool type. if unset, this will just be a copy + // of the full target list. + targets := []targetHost{} + for _, target := range b.targets { + if r.poolType == target.poolType { + targets = append(targets, target) + log.V(1000).Infof("matched target %v with type %s", target, r.poolType) + } else { + log.V(1000).Infof("skipping host %v with type %s", target, r.poolType) } - hosts = candidates } // Shuffle to ensure every host has a different order to iterate through - r.rand.Shuffle(len(hosts), func(i, j int) { - hosts[i], hosts[j] = hosts[j], hosts[i] + b.rand.Shuffle(len(targets), func(i, j int) { + targets[i], targets[j] = targets[j], targets[i] }) // If affinity is specified, then shuffle those hosts to the front if r.affinity != "" { i := 0 - for j := 0; j < len(hosts); j++ { - hostAffinity, ok := hosts[j][*affinityField] - if ok && hostAffinity == r.affinity { - hosts[i], hosts[j] = hosts[j], hosts[i] + for j := 0; j < len(targets); j++ { + if r.affinity == targets[j].affinity { + targets[i], targets[j] = targets[j], targets[i] i++ } } @@ -189,91 +285,64 @@ func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) // Grab the first N addresses, and voila! var addrs []resolver.Address - hosts = hosts[:min(*numConnections, len(hosts))] - for _, host := range hosts { - addrs = append(addrs, resolver.Address{ - Addr: fmt.Sprintf("%s:%s", host[*addressField], host[*portField]), - }) + targets = targets[:min(*numConnections, len(targets))] + for _, target := range targets { + addrs = append(addrs, resolver.Address{Addr: target.addr}) } - h := sha256.New() - if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { - return nil, nil, err - } - sum := h.Sum(nil) + log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) - if log.V(100) { - log.Infof("resolved %s to:", r.target.URL.String()) - for _, host := range hosts { - log.Infof(" %s:%s (type: %s) (affinity: %s)", host[*addressField], host[*portField], host[*poolTypeField], host[*affinityField]) - } - } - - return &addrs, sum, nil + r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } -func (r *JSONGateConfigResolver) start() { - log.V(100).Infof("Starting discovery checker\n") - r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) +// Build a new Resolver to route to the given target +func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + attrs := target.URL.Query() - // Immediately load the initial config - addrs, hash, err := r.resolve() - if err == nil { - // if we parse ok, populate the local address store - r.cc.UpdateState(resolver.State{Addresses: *addrs}) + // If the config specifies a pool type attribute, then the caller must supply it in the connection + // attributes, otherwise reject the request. + poolType := "" + if b.poolTypeField != "" { + poolType = attrs.Get(b.poolTypeField) + if poolType == "" { + return nil, fmt.Errorf("pool type attribute %s not in target", b.poolTypeField) + } } - // Start a config watcher - r.ticker = time.NewTicker(100 * time.Millisecond) - fileStat, err := os.Stat(r.jsonPath) - if err != nil { - return + // Affinity on the other hand is just an optimization + affinity := "" + if b.affinityField != "" { + affinity = attrs.Get(b.affinityField) } - go func() { - for range r.ticker.C { - checkFileStat, err := os.Stat(r.jsonPath) - if err != nil { - log.Errorf("Error stat'ing config %v\n", err) - continue - } - isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() - if isUnchanged { - // no change - continue - } - fileStat = checkFileStat - log.V(100).Infof("Detected config change\n") - - addrs, newHash, err := r.resolve() - if err != nil { - // better luck next loop - // TODO: log this - log.Errorf("Error resolving config: %v\n", err) - continue - } + log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity) - // Make sure this wasn't a spurious change by checking the hash - if bytes.Equal(hash, newHash) && newHash != nil { - log.V(100).Infof("No content changed in discovery file... ignoring\n") - continue - } + r := &JSONGateResolver{ + target: target, + clientConn: cc, + poolType: poolType, + affinity: affinity, + } - hash = newHash + b.update(r) + b.resolvers = append(b.resolvers, r) - r.cc.UpdateState(resolver.State{Addresses: *addrs}) - } - }() + return r, nil +} - log.V(100).Infof("Loaded hosts, starting ticker\n") +func (r *JSONGateResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (r *JSONGateResolver) Close() { + log.Infof("Closing resolver for target %s", r.target.URL.String()) } -func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {} -func (r *JSONGateConfigResolver) Close() { - r.ticker.Stop() + +// Utilities +func min(a, b int) int { + if a < b { + return a + } + return b } func init() { - // Register the example ResolverBuilder. This is usually done in a package's - // init() function. } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index e4565d930e2..09a7415f404 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -75,12 +75,6 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt proxy.mu.RUnlock() proxy.mu.Lock() - // Otherwise create a new connection after dropping the lock, allowing multiple requests to - // race to create the conn for now. - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil - }) - conn, err := vtgateconn.DialProtocol(ctx, "grpc", target) if err != nil { return nil, err @@ -178,7 +172,14 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn } func Init() { - RegisterJsonDiscovery( + + // Otherwise create a new connection after dropping the lock, allowing multiple requests to + // race to create the conn for now. + grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { + return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil + }) + + RegisterJSONGateResolver( *vtgateHostsFile, *addressField, *portField,