Skip to content

Commit

Permalink
Invert the service discovery (#268)
Browse files Browse the repository at this point in the history
* Draft: very messy and doesn't compile

* Simplifyy

* less log, plz

* simplify more

* Simplified by a lot - much simpler

now pick fewer addresses

* fixy

* Account for infinite

* copyright nonsense

* clean up debug logging

* round_robin works!

* use rw mutex to serialize creation

* rework the filtering to make everything parameterized and more explicit

Change all the config so that instead of hard coded constants we set the
various connection attributes, json field names, etc using command line flags.

Then make the pool type and affinity arguments more explicit and less generic.

* no longer needed

* update comments

* only pass through the URL params we need

* affinity is actually optional

---------

Co-authored-by: Michael Demmer <[email protected]>
  • Loading branch information
jscheinblum and demmer authored Mar 23, 2024
1 parent ffa2e7f commit de9cbc1
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 240 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/vtgateconn/vtgateconn.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2019 The Vitess Authors.
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
194 changes: 116 additions & 78 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vtgateproxy

import (
Expand All @@ -9,15 +24,18 @@ import (
"io"
"math/rand"
"os"
"strconv"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"

"vitess.io/vitess/go/vt/log"
)

var (
jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
)

// File based discovery for vtgate grpc endpoints
Expand Down Expand Up @@ -55,122 +73,144 @@ type JSONGateConfigDiscovery struct {
}

func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
fmt.Printf("Start registration for target: %v\n", target.URL.String())
queryOpts := target.URL.Query()
queryParamCount := queryOpts.Get("num_connections")
queryAZID := queryOpts.Get("az_id")
num_connections := 0

gateType := target.URL.Host

if queryParamCount != "" {
num_connections, _ = strconv.Atoi(queryParamCount)
attrs := target.URL.Query()

// If the config specifies a pool type attribute, then the caller must supply it in the connection
// attributes, otherwise reject the request.
poolType := ""
if *poolTypeAttr != "" {
poolType = attrs.Get(*poolTypeAttr)
if poolType == "" {
return nil, fmt.Errorf("pool type attribute %s not in target", *poolTypeAttr)
}
}

filters := resolveFilters{
gate_type: gateType,
// Affinity on the other hand is just an optimization
affinity := ""
if *affinityAttr != "" {
affinity = attrs.Get(*affinityAttr)
}

if queryAZID != "" {
filters.az_id = queryAZID
}
log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity)

r := &resolveJSONGateConfig{
target: target,
cc: cc,
jsonPath: b.JsonPath,
num_connections: num_connections,
filters: filters,
r := &JSONGateConfigResolver{
target: target,
cc: cc,
jsonPath: b.JsonPath,
poolType: poolType,
affinity: affinity,
}
r.start()
return r, nil
}
func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" }

func RegisterJsonDiscovery() {
fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig)
jsonDiscovery := &JSONGateConfigDiscovery{
JsonPath: *jsonDiscoveryConfig,
}
resolver.Register(jsonDiscovery)
fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme())
log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig)
}

type resolveFilters struct {
gate_type string
az_id string
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type JSONGateConfigResolver struct {
target resolver.Target
cc resolver.ClientConn
jsonPath string
poolType string
affinity string

ticker *time.Ticker
rand *rand.Rand // safe for concurrent use.
}

// exampleResolver is a
// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
type resolveJSONGateConfig struct {
target resolver.Target
cc resolver.ClientConn
jsonPath string
ticker *time.Ticker
rand *rand.Rand // safe for concurrent use.
num_connections int
filters resolveFilters
func min(a, b int) int {
if a < b {
return a
}
return b
}

func jsonDump(data interface{}) string {
json, _ := json.Marshal(data)
return string(json)
}

type discoverySlackAZ struct{}
type discoverySlackType struct{}
func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error) {

func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) {
config := []DiscoveryHost{}
fmt.Printf("Loading config %v\n", r.jsonPath)
log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections)

data, err := os.ReadFile(r.jsonPath)
if err != nil {
return nil, nil, err
}

err = json.Unmarshal(data, &config)
hosts := []map[string]interface{}{}
err = json.Unmarshal(data, &hosts)
if err != nil {
fmt.Printf("parse err: %v\n", err)
log.Errorf("error parsing JSON discovery file %s: %v\n", r.jsonPath, err)
return nil, nil, err
}

addrs := []resolver.Address{}
for _, s := range config {
az := attributes.New(discoverySlackAZ{}, s.AZId).WithValue(discoverySlackType{}, s.Type)
// optionally filter to only hosts that match the pool type
if r.poolType != "" {
candidates := []map[string]interface{}{}
for _, host := range hosts {
hostType, ok := host[*poolTypeAttr]
if ok && hostType == r.poolType {
candidates = append(candidates, host)
log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType)
} else {
log.V(1000).Infof("skipping host %s with type %s", jsonDump(host), hostType)
}
}
hosts = candidates
}

// Filter hosts to this gate type
if r.filters.gate_type != "" {
if r.filters.gate_type != s.Type {
continue
// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(hosts), func(i, j int) {
hosts[i], hosts[j] = hosts[j], hosts[i]
})

// If affinity is specified, then shuffle those hosts to the front
if r.affinity != "" {
i := 0
for j := 0; j < len(hosts); j++ {
hostAffinity, ok := hosts[j][*affinityAttr]
if ok && hostAffinity == r.affinity {
hosts[i], hosts[j] = hosts[j], hosts[i]
i++
}
}
}

// Add matching hosts to registration list
// Grab the first N addresses, and voila!
var addrs []resolver.Address
hosts = hosts[:min(*numConnections, len(hosts))]
for _, host := range hosts {
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc),
BalancerAttributes: az,
Addr: fmt.Sprintf("%s:%s", host[*addressField], host[*portField]),
})
}

fmt.Printf("Addrs: %v\n", addrs)

// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})

h := sha256.New()
if _, err := io.Copy(h, bytes.NewReader(data)); err != nil {
return nil, nil, err
}
sum := h.Sum(nil)

log.V(100).Infof("resolved %s to hosts %s addrs: 0x%x, %v\n", r.target.URL.String(), jsonDump(hosts), sum, addrs)

fmt.Printf("Returning discovery: %d hosts checksum %x\n", len(addrs), h.Sum(nil))
return &addrs, h.Sum(nil), nil
return &addrs, sum, nil
}

func (r *resolveJSONGateConfig) start() {
fmt.Print("Starting discovery checker\n")
func (r *JSONGateConfigResolver) start() {
log.V(100).Infof("Starting discovery checker\n")
r.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

// Immediately load the initial config
addrs, hash, err := r.loadConfig()
addrs, hash, err := r.resolve()
if err == nil {
// if we parse ok, populate the local address store
r.cc.UpdateState(resolver.State{Addresses: *addrs})
Expand All @@ -186,7 +226,7 @@ func (r *resolveJSONGateConfig) start() {
for range r.ticker.C {
checkFileStat, err := os.Stat(r.jsonPath)
if err != nil {
fmt.Printf("Error stat'ing config %v\n", err)
log.Errorf("Error stat'ing config %v\n", err)
continue
}
isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime()
Expand All @@ -196,35 +236,33 @@ func (r *resolveJSONGateConfig) start() {
}

fileStat = checkFileStat
fmt.Printf("Detected config change\n")
log.V(100).Infof("Detected config change\n")

addrs, newHash, err := r.loadConfig()
addrs, newHash, err := r.resolve()
if err != nil {
// better luck next loop
// TODO: log this
fmt.Print("Can't load config: %v\n", err)
log.Errorf("Error resolving config: %v\n", err)
continue
}

// Make sure this wasn't a spurious change by checking the hash
if bytes.Compare(hash, newHash) == 0 && newHash != nil {
fmt.Printf("No content changed in discovery file... ignoring\n")
if bytes.Equal(hash, newHash) && newHash != nil {
log.V(100).Infof("No content changed in discovery file... ignoring\n")
continue
}

hash = newHash

fmt.Printf("Loaded %d hosts\n", len(*addrs))
fmt.Printf("Loaded %v", addrs)
r.cc.UpdateState(resolver.State{Addresses: *addrs})
}
}()

fmt.Printf("Loaded hosts, starting ticker\n")
log.V(100).Infof("Loaded hosts, starting ticker\n")

}
func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *resolveJSONGateConfig) Close() {
func (r *JSONGateConfigResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *JSONGateConfigResolver) Close() {
r.ticker.Stop()
}

Expand Down
Loading

0 comments on commit de9cbc1

Please sign in to comment.