Skip to content

Commit

Permalink
Cleanup discovery. Prep for load balancing
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheinblum authored and brirams committed Mar 8, 2024
1 parent 82258ab commit 439252f
Showing 1 changed file with 43 additions and 21 deletions.
64 changes: 43 additions & 21 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package vtgateproxy

import (
"bytes"
"crypto/sha256"
"encoding/json"
"flag"
"fmt"
"io"
"math/rand"
"os"
"strconv"
"time"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

Expand Down Expand Up @@ -51,6 +55,7 @@ type JSONGateConfigDiscovery struct {
}

func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
fmt.Printf("Start registration for target: %v\n", target.URL.String())
queryOpts := target.URL.Query()
queryParamCount := queryOpts.Get("num_connections")
queryAZID := queryOpts.Get("az_id")
Expand Down Expand Up @@ -84,9 +89,11 @@ func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" }

func RegisterJsonDiscovery() {
fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig)
resolver.Register(&JSONGateConfigDiscovery{
jsonDiscovery := &JSONGateConfigDiscovery{
JsonPath: *jsonDiscoveryConfig,
})
}
resolver.Register(jsonDiscovery)
fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme())
}

type resolveFilters struct {
Expand All @@ -106,41 +113,40 @@ type resolveJSONGateConfig struct {
filters resolveFilters
}

func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, error) {
func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) {
config := []DiscoveryHost{}
fmt.Printf("Loading config %v\n", r.jsonPath)

data, err := os.ReadFile(r.jsonPath)
if err != nil {
return nil, err
return nil, nil, err
}

err = json.Unmarshal(data, &config)
if err != nil {
fmt.Printf("parse err: %v\n", err)
return nil, err
return nil, nil, err
}

fmt.Printf("%v\n", config)

addrs := []resolver.Address{}
for _, s := range config {
// Apply filters
if r.filters.gate_type != "" {
if r.filters.gate_type != s.Type {
// fmt.Printf("Dropped non matching type: %v\n", s.Type)
continue
}
}

if r.filters.az_id != "" {
if r.filters.az_id != s.AZId {
fmt.Printf("Dropped non matching az: %v\n", s.AZId)
continue
}
}
// Add matching hosts to registration list
fmt.Printf("selected host for discovery: %v %v\n", fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), s)
addrs = append(addrs, resolver.Address{Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc)})
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc),
BalancerAttributes: attributes.New("az", s.AZId),
})
}

// Shuffle to ensure every host has a different order to iterate through
Expand All @@ -153,17 +159,21 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, error) {
addrs = addrs[0:r.num_connections]
}

fmt.Printf("Returning discovery: %v\n", addrs)
h := sha256.New()
if _, err := io.Copy(h, bytes.NewReader(data)); err != nil {
return nil, nil, err
}

return &addrs, nil
fmt.Printf("Returning discovery: %d hosts checksum %x\n", len(addrs), h.Sum(nil))
return &addrs, h.Sum(nil), nil
}

func (r *resolveJSONGateConfig) start() {
fmt.Print("Starting discovery checker\n")
r.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

// Immediately load the initial config
addrs, err := r.loadConfig()
addrs, hash, err := r.loadConfig()
if err == nil {
// if we parse ok, populate the local address store
r.cc.UpdateState(resolver.State{Addresses: *addrs})
Expand All @@ -175,33 +185,45 @@ func (r *resolveJSONGateConfig) start() {
if err != nil {
return
}
lastLoaded := time.Now()

go func() {
for range r.ticker.C {
checkFileStat, err := os.Stat(r.jsonPath)
if err != nil {
fmt.Printf("Error stat'ing config %v\n", err)
continue
}
isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime()
isNotExpired := time.Since(lastLoaded) < 1*time.Minute
if isUnchanged && isNotExpired {
if isUnchanged {
// no change
continue
}
lastLoaded = time.Now()

fileStat = checkFileStat
fmt.Printf("Detected config change\n")

addrs, err := r.loadConfig()
addrs, newHash, err := r.loadConfig()
if err != nil {
// better luck next loop
// TODO: log this
fmt.Print("oh no\n")
fmt.Print("Can't load config: %v\n", err)
continue
}

// Make sure this wasn't a spurious change by checking the hash
if bytes.Compare(hash, newHash) == 0 && newHash != nil {
fmt.Printf("No content changed in discovery file... ignoring\n")
continue
}

hash = newHash

fmt.Printf("Loaded %d hosts\n", len(*addrs))
r.cc.UpdateState(resolver.State{Addresses: *addrs})
}
}()

fmt.Printf("Loaded hosts, starting ticker\n")

}
func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {}
func (r *resolveJSONGateConfig) Close() {
Expand Down

0 comments on commit 439252f

Please sign in to comment.