Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Invert the service discovery #268

Merged
merged 16 commits into from
Mar 23, 2024
2 changes: 1 addition & 1 deletion go/vt/vtgate/vtgateconn/vtgateconn.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Vitess Authors.
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
194 changes: 116 additions & 78 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtgateproxy

import (
Expand All @@ -9,15 +24,18 @@ import (
"io"
"math/rand"
"os"
"strconv"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"

"vitess.io/vitess/go/vt/log"
)

var (
jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
)

// File based discovery for vtgate grpc endpoints
Expand Down Expand Up @@ -55,122 +73,144 @@ type JSONGateConfigDiscovery struct {
}

func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
fmt.Printf("Start registration for target: %v\n", target.URL.String())
queryOpts := target.URL.Query()
queryParamCount := queryOpts.Get("num_connections")
queryAZID := queryOpts.Get("az_id")
num_connections := 0

gateType := target.URL.Host

if queryParamCount != "" {
num_connections, _ = strconv.Atoi(queryParamCount)
attrs := target.URL.Query()

// If the config specifies a pool type attribute, then the caller must supply it in the connection
// attributes, otherwise reject the request.
poolType := ""
if *poolTypeAttr != "" {
poolType = attrs.Get(*poolTypeAttr)
if poolType == "" {
return nil, fmt.Errorf("pool type attribute %s not in target", *poolTypeAttr)
}
}

filters := resolveFilters{
gate_type: gateType,
// Affinity on the other hand is just an optimization
affinity := ""
if *affinityAttr != "" {
affinity = attrs.Get(*affinityAttr)
}

if queryAZID != "" {
filters.az_id = queryAZID
}
log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity)

r := &resolveJSONGateConfig{
target: target,
cc: cc,
jsonPath: b.JsonPath,
num_connections: num_connections,
filters: filters,
r := &JSONGateConfigResolver{
target: target,
cc: cc,
jsonPath: b.JsonPath,
poolType: poolType,
affinity: affinity,
}
r.start()
return r, nil
}
func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" }

func RegisterJsonDiscovery() {
fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig)
jsonDiscovery := &JSONGateConfigDiscovery{
JsonPath: *jsonDiscoveryConfig,
}
resolver.Register(jsonDiscovery)
fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme())
log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig)
}

type resolveFilters struct {
gate_type string
az_id string
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateConfigResolver struct {
target resolver.Target
cc resolver.ClientConn
jsonPath string
poolType string
affinity string

ticker *time.Ticker
rand *rand.Rand // safe for concurrent use.
}

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type resolveJSONGateConfig struct {
target resolver.Target
cc resolver.ClientConn
jsonPath string
ticker *time.Ticker
rand *rand.Rand // safe for concurrent use.
num_connections int
filters resolveFilters
func min(a, b int) int {
if a < b {
return a
}
return b
}

func jsonDump(data interface{}) string {
json, _ := json.Marshal(data)
return string(json)
}

type discoverySlackAZ struct{}
type discoverySlackType struct{}
func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) {

func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) {
config := []DiscoveryHost{}
fmt.Printf("Loading config %v\n", r.jsonPath)
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

data, err := os.ReadFile(r.jsonPath)
if err != nil {
return nil, nil, err
}

err = json.Unmarshal(data, &config)
hosts := []map[string]interface{}{}
err = json.Unmarshal(data, &hosts)
if err != nil {
fmt.Printf("parse err: %v\n", err)
log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err)
return nil, nil, err
}

addrs := []resolver.Address{}
for _, s := range config {
az := attributes.New(discoverySlackAZ{}, s.AZId).WithValue(discoverySlackType{}, s.Type)
// optionally filter to only hosts that match the pool type
if r.poolType != "" {
candidates := []map[string]interface{}{}
for _, host := range hosts {
hostType, ok := host[*poolTypeAttr]
if ok && hostType == r.poolType {
candidates = append(candidates, host)
log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType)
} else {
log.V(1000).Infof("skipping host %s with type %s", jsonDump(host), hostType)
}
}
hosts = candidates
}

// Filter hosts to this gate type
if r.filters.gate_type != "" {
if r.filters.gate_type != s.Type {
continue
// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})

// If affinity is specified, then shuffle those hosts to the front
if r.affinity != "" {
i := 0
for j := 0; j < len(hosts); j++ {
hostAffinity, ok := hosts[j][*affinityAttr]
if ok && hostAffinity == r.affinity {
hosts[i], hosts[j] = hosts[j], hosts[i]
i++
}
}
}

// Add matching hosts to registration list
// Grab the first N addresses, and voila!
var addrs []resolver.Address
hosts = hosts[:min(*numConnections, len(hosts))]
for _, host := range hosts {
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc),
BalancerAttributes: az,
Addr: fmt.Sprintf("%s:%s", host[*addressField], host[*portField]),
})
}

fmt.Printf("Addrs: %v\n", addrs)

// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})

h := sha256.New()
if _, err := io.Copy(h, bytes.NewReader(data)); err != nil {
return nil, nil, err
}
sum := h.Sum(nil)

log.V(100).Infof("resolved %s to hosts %s addrs: 0x%x, %v\n", r.target.URL.String(), jsonDump(hosts), sum, addrs)

fmt.Printf("Returning discovery: %d hosts checksum %x\n", len(addrs), h.Sum(nil))
return &addrs, h.Sum(nil), nil
return &addrs, sum, nil
}

func (r *resolveJSONGateConfig) start() {
fmt.Print("Starting discovery checker\n")
func (r *JSONGateConfigResolver) start() {
log.V(100).Infof("Starting discovery checker\n")
r.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

// Immediately load the initial config
addrs, hash, err := r.loadConfig()
addrs, hash, err := r.resolve()
if err == nil {
// if we parse ok, populate the local address store
r.cc.UpdateState(resolver.State{Addresses: *addrs})
Expand All @@ -186,7 +226,7 @@ func (r *resolveJSONGateConfig) start() {
for range r.ticker.C {
checkFileStat, err := os.Stat(r.jsonPath)
if err != nil {
fmt.Printf("Error stat'ing config %v\n", err)
log.Errorf("Error stat'ing config %v\n", err)
continue
}
isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime()
Expand All @@ -196,35 +236,33 @@ func (r *resolveJSONGateConfig) start() {
}

fileStat = checkFileStat
fmt.Printf("Detected config change\n")
log.V(100).Infof("Detected config change\n")

addrs, newHash, err := r.loadConfig()
addrs, newHash, err := r.resolve()
if err != nil {
// better luck next loop
// TODO: log this
fmt.Print("Can't load config: %v\n", err)
log.Errorf("Error resolving config: %v\n", err)
continue
}

// Make sure this wasn't a spurious change by checking the hash
if bytes.Compare(hash, newHash) == 0 && newHash != nil {
fmt.Printf("No content changed in discovery file... ignoring\n")
if bytes.Equal(hash, newHash) && newHash != nil {
log.V(100).Infof("No content changed in discovery file... ignoring\n")
continue
}

hash = newHash

fmt.Printf("Loaded %d hosts\n", len(*addrs))
fmt.Printf("Loaded %v", addrs)
r.cc.UpdateState(resolver.State{Addresses: *addrs})
}
}()

fmt.Printf("Loaded hosts, starting ticker\n")
log.V(100).Infof("Loaded hosts, starting ticker\n")

}
func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *resolveJSONGateConfig) Close() {
func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *JSONGateConfigResolver) Close() {
r.ticker.Stop()
}

Expand Down
Loading
Loading