Skip to content

Commit

Permalink
refactor a bit more to consolidate the command line flags
Browse files Browse the repository at this point in the history
  • Loading branch information
demmer committed Mar 29, 2024
1 parent de9cbc1 commit 7193ac9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 41 deletions.
2 changes: 0 additions & 2 deletions go/cmd/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ func main() {
servenv.Init()

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
vtgateproxy.RegisterJsonDiscovery()
vtgateproxy.Init()
})

Expand Down
67 changes: 37 additions & 30 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"bytes"
"crypto/sha256"
"encoding/json"
"flag"
"fmt"
"io"
"math/rand"
Expand All @@ -31,14 +30,8 @@ import (
"vitess.io/vitess/go/vt/log"
)

var (
jsonDiscoveryConfig = flag.String("json_config", "", "json file describing the host list to use fot vitess://vtgate resolution")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
)

// File based discovery for vtgate grpc endpoints
//
// This loads the list of hosts from json and watches for changes to the list of hosts. It will select N connection to maintain to backend vtgates.
// Connections will rebalance every 5 minutes
//
Expand All @@ -52,51 +45,48 @@ var (
// "type": "aux"
// },
//
// Naming scheme:
// URKL scheme:
// vtgate://<type>?num_connections=<int>&az_id=<string>
//
// num_connections: Option number of hosts to open connections to for round-robin selection
// az_id: Filter to just hosts in this az (optional)
// type: Only select from hosts of this type (required)
//

type DiscoveryHost struct {
Address string
NebulaAddress string `json:"nebula_address"`
Grpc string
AZId string `json:"az_id"`
Type string
}

type JSONGateConfigDiscovery struct {
JsonPath string
jsonPath string
addressField string
portField string
poolTypeField string
affinityField string
}

// Build a new Resolver to route to the given target
func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
attrs := target.URL.Query()

// If the config specifies a pool type attribute, then the caller must supply it in the connection
// attributes, otherwise reject the request.
poolType := ""
if *poolTypeAttr != "" {
poolType = attrs.Get(*poolTypeAttr)
if b.poolTypeField != "" {
poolType = attrs.Get(b.poolTypeField)
if poolType == "" {
return nil, fmt.Errorf("pool type attribute %s not in target", *poolTypeAttr)
return nil, fmt.Errorf("pool type attribute %s not in target", b.poolTypeField)
}
}

// Affinity on the other hand is just an optimization
affinity := ""
if *affinityAttr != "" {
affinity = attrs.Get(*affinityAttr)
if b.affinityField != "" {
affinity = attrs.Get(b.affinityField)
}

log.V(100).Infof("Start discovery for target %v poolType %s affinity %s\n", target.URL.String(), poolType, affinity)

r := &JSONGateConfigResolver{
target: target,
cc: cc,
jsonPath: b.JsonPath,
jsonPath: b.jsonPath,
poolType: poolType,
affinity: affinity,
}
Expand All @@ -105,12 +95,24 @@ func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.Clie
}
func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" }

func RegisterJsonDiscovery() {
func RegisterJsonDiscovery(
jsonPath string,
addressField string,
portField string,
poolTypeField string,
affinityField string,
) *JSONGateConfigDiscovery {
jsonDiscovery := &JSONGateConfigDiscovery{
JsonPath: *jsonDiscoveryConfig,
jsonPath,
addressField,
portField,
poolTypeField,
affinityField,
}
resolver.Register(jsonDiscovery)
log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), *jsonDiscoveryConfig)
log.Infof("Registered JSON discovery scheme %v to watch: %v\n", jsonDiscovery.Scheme(), jsonPath)

return jsonDiscovery
}

// Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver).
Expand Down Expand Up @@ -157,7 +159,7 @@ func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error)
if r.poolType != "" {
candidates := []map[string]interface{}{}
for _, host := range hosts {
hostType, ok := host[*poolTypeAttr]
hostType, ok := host[*poolTypeField]
if ok && hostType == r.poolType {
candidates = append(candidates, host)
log.V(1000).Infof("matched host %s with type %s", jsonDump(host), hostType)
Expand All @@ -177,7 +179,7 @@ func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error)
if r.affinity != "" {
i := 0
for j := 0; j < len(hosts); j++ {
hostAffinity, ok := hosts[j][*affinityAttr]
hostAffinity, ok := hosts[j][*affinityField]
if ok && hostAffinity == r.affinity {
hosts[i], hosts[j] = hosts[j], hosts[i]
i++
Expand All @@ -200,7 +202,12 @@ func (r *JSONGateConfigResolver) resolve() (*[]resolver.Address, []byte, error)
}
sum := h.Sum(nil)

log.V(100).Infof("resolved %s to hosts %s addrs: 0x%x, %v\n", r.target.URL.String(), jsonDump(hosts), sum, addrs)
if log.V(100) {
log.Infof("resolved %s to:", r.target.URL.String())
for _, host := range hosts {
log.Infof(" %s:%s (type: %s) (affinity: %s)", host[*addressField], host[*portField], host[*poolTypeField], host[*affinityField])
}
}

return &addrs, sum, nil
}
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ import (
)

var (
poolTypeAttr = flag.String("pool_type_attr", "", "Attribute (both mysql connection and JSON file) used to specify the target vtgate type and filter the hosts, e.g. 'type'")
affinityAttr = flag.String("affinity_attr", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'")
vtgateHostsFile = flag.String("vtgate_hosts_file", "", "json file describing the host list to use for vtgate:// resolution")
numConnections = flag.Int("num_connections", 4, "number of outbound GPRC connections to maintain")
poolTypeField = flag.String("pool_type_field", "", "Field name used to specify the target vtgate type and filter the hosts")
affinityField = flag.String("affinity_field", "", "Attribute (both mysql protocol connection and JSON file) used to specify the routing affinity , e.g. 'az_id'")
addressField = flag.String("address_field", "address", "field name in the json file containing the address")
portField = flag.String("port_field", "port", "field name in the json file containing the port")

vtGateProxy *VTGateProxy = &VTGateProxy{
targetConns: map[string]*vtgateconn.VTGateConn{},
Expand Down Expand Up @@ -98,19 +102,19 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu

values := url.Values{}

if *poolTypeAttr != "" {
poolType, ok := connectionAttributes[*poolTypeAttr]
if *poolTypeField != "" {
poolType, ok := connectionAttributes[*poolTypeField]
if ok {
values.Set(*poolTypeAttr, poolType)
values.Set(*poolTypeField, poolType)
} else {
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeAttr)
return nil, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "pool type attribute %s not supplied by client", *poolTypeField)
}
}

if *affinityAttr != "" {
affinity, ok := connectionAttributes[*affinityAttr]
if *affinityField != "" {
affinity, ok := connectionAttributes[*affinityField]
if ok {
values.Set(*affinityAttr, affinity)
values.Set(*affinityField, affinity)
}
}

Expand Down Expand Up @@ -174,4 +178,11 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn
}

func Init() {
RegisterJsonDiscovery(
*vtgateHostsFile,
*addressField,
*portField,
*poolTypeField,
*affinityField,
)
}

0 comments on commit 7193ac9

Please sign in to comment.