From 439252fac0dd3451eb94a04ff77ac25571c45816 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Fri, 3 Nov 2023 23:45:14 -0700 Subject: [PATCH 1/8] Cleanup discovery. Prep for load balancing --- go/vt/vtgateproxy/discovery.go | 64 +++++++++++++++++++++++----------- 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 055a2a1f677..e8e7c4970f8 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -1,14 +1,18 @@ package vtgateproxy import ( + "bytes" + "crypto/sha256" "encoding/json" "flag" "fmt" + "io" "math/rand" "os" "strconv" "time" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" ) @@ -51,6 +55,7 @@ type JSONGateConfigDiscovery struct { } func (b *JSONGateConfigDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + fmt.Printf("Start registration for target: %v\n", target.URL.String()) queryOpts := target.URL.Query() queryParamCount := queryOpts.Get("num_connections") queryAZID := queryOpts.Get("az_id") @@ -84,9 +89,11 @@ func (*JSONGateConfigDiscovery) Scheme() string { return "vtgate" } func RegisterJsonDiscovery() { fmt.Printf("Registering: %v\n", *jsonDiscoveryConfig) - resolver.Register(&JSONGateConfigDiscovery{ + jsonDiscovery := &JSONGateConfigDiscovery{ JsonPath: *jsonDiscoveryConfig, - }) + } + resolver.Register(jsonDiscovery) + fmt.Printf("Registered %v scheme\n", jsonDiscovery.Scheme()) } type resolveFilters struct { @@ -106,41 +113,40 @@ type resolveJSONGateConfig struct { filters resolveFilters } -func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, error) { +func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { config := []DiscoveryHost{} + fmt.Printf("Loading config %v\n", r.jsonPath) data, err := os.ReadFile(r.jsonPath) if err != nil { - return nil, err + return nil, nil, err } err = json.Unmarshal(data, &config) if err != nil { fmt.Printf("parse err: %v\n", err) - return nil, err + return nil, nil, err } - fmt.Printf("%v\n", config) - addrs := []resolver.Address{} for _, s := range config { // Apply filters if r.filters.gate_type != "" { if r.filters.gate_type != s.Type { - // fmt.Printf("Dropped non matching type: %v\n", s.Type) continue } } if r.filters.az_id != "" { if r.filters.az_id != s.AZId { - fmt.Printf("Dropped non matching az: %v\n", s.AZId) continue } } // Add matching hosts to registration list - fmt.Printf("selected host for discovery: %v %v\n", fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), s) - addrs = append(addrs, resolver.Address{Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc)}) + addrs = append(addrs, resolver.Address{ + Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), + BalancerAttributes: attributes.New("az", s.AZId), + }) } // Shuffle to ensure every host has a different order to iterate through @@ -153,9 +159,13 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, error) { addrs = addrs[0:r.num_connections] } - fmt.Printf("Returning discovery: %v\n", addrs) + h := sha256.New() + if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { + return nil, nil, err + } - return &addrs, nil + fmt.Printf("Returning discovery: %d hosts checksum %x\n", len(addrs), h.Sum(nil)) + return &addrs, h.Sum(nil), nil } func (r *resolveJSONGateConfig) start() { @@ -163,7 +173,7 @@ func (r *resolveJSONGateConfig) start() { r.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // Immediately load the initial config - addrs, err := r.loadConfig() + addrs, hash, err := r.loadConfig() if err == nil { // if we parse ok, populate the local address store r.cc.UpdateState(resolver.State{Addresses: *addrs}) @@ -175,33 +185,45 @@ func (r *resolveJSONGateConfig) start() { if err != nil { return } - lastLoaded := time.Now() - go func() { for range r.ticker.C { checkFileStat, err := os.Stat(r.jsonPath) + if err != nil { + fmt.Printf("Error stat'ing config %v\n", err) + continue + } isUnchanged := checkFileStat.Size() == fileStat.Size() || checkFileStat.ModTime() == fileStat.ModTime() - isNotExpired := time.Since(lastLoaded) < 1*time.Minute - if isUnchanged && isNotExpired { + if isUnchanged { // no change continue } - lastLoaded = time.Now() fileStat = checkFileStat fmt.Printf("Detected config change\n") - addrs, err := r.loadConfig() + addrs, newHash, err := r.loadConfig() if err != nil { // better luck next loop // TODO: log this - fmt.Print("oh no\n") + fmt.Print("Can't load config: %v\n", err) + continue + } + + // Make sure this wasn't a spurious change by checking the hash + if bytes.Compare(hash, newHash) == 0 && newHash != nil { + fmt.Printf("No content changed in discovery file... ignoring\n") continue } + hash = newHash + + fmt.Printf("Loaded %d hosts\n", len(*addrs)) r.cc.UpdateState(resolver.State{Addresses: *addrs}) } }() + + fmt.Printf("Loaded hosts, starting ticker\n") + } func (r *resolveJSONGateConfig) ResolveNow(o resolver.ResolveNowOptions) {} func (r *resolveJSONGateConfig) Close() { From 37cfc73d2cf1866790f319997e869b7c10273550 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Sat, 4 Nov 2023 02:52:19 -0700 Subject: [PATCH 2/8] first draft of az affinity --- go.mod | 20 ++--- go.sum | 22 ++++++ go/cmd/vtgateproxy/vtgateproxy.go | 12 +++ go/vt/vtgateproxy/discovery.go | 22 +++--- go/vt/vtgateproxy/gate_balancer.go | 115 +++++++++++++++++++++++++++++ go/vt/vtgateproxy/vtgateproxy.go | 39 +++++++--- 6 files changed, 199 insertions(+), 31 deletions(-) create mode 100644 go/vt/vtgateproxy/gate_balancer.go diff --git a/go.mod b/go.mod index f75f4ad2af7..90fbadeb3c9 100644 --- a/go.mod +++ b/go.mod @@ -91,21 +91,21 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 - golang.org/x/mod v0.5.1 // indirect - golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f + golang.org/x/mod v0.8.0 // indirect + golang.org/x/net v0.8.0 golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c - golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect - golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b - golang.org/x/text v0.3.7 + golang.org/x/sync v0.1.0 + golang.org/x/sys v0.6.0 // indirect + golang.org/x/term v0.6.0 + golang.org/x/text v0.8.0 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac - golang.org/x/tools v0.1.9 + golang.org/x/tools v0.6.0 google.golang.org/api v0.45.0 google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9 // indirect - google.golang.org/grpc v1.45.0 + google.golang.org/grpc v1.48.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b - google.golang.org/protobuf v1.28.0 + google.golang.org/protobuf v1.28.1 gopkg.in/DataDog/dd-trace-go.v1 v1.17.0 gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect gopkg.in/gcfg.v1 v1.2.3 @@ -122,6 +122,8 @@ require ( require github.com/bndr/gotabulate v1.1.2 +require github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca // indirect + require ( cloud.google.com/go v0.81.0 // indirect github.com/BurntSushi/toml v0.3.1 // indirect diff --git a/go.sum b/go.sum index 4f42b4b9b69..bc4688388a2 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,7 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA= @@ -201,6 +202,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -481,6 +483,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca h1:TJgLEmFwX/27nDuhpIfHMKw7++XIYsIOJrXcWdMxcoc= +github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca/go.mod h1:oXvXMzId9TWNmutG45QL9t6hVhR15Sr5v6X6nfPpy2E= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -840,6 +844,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -892,6 +898,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -920,6 +928,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -992,9 +1002,13 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1006,6 +1020,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1074,6 +1090,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1181,6 +1199,8 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= +google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= +google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b h1:D/GTYPo6I1oEo08Bfpuj3xl5XE+UGHj7//5fVyKxhsQ= @@ -1200,6 +1220,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/DataDog/dd-trace-go.v1 v1.17.0 h1:j9vAp9Re9bbtA/QFehkJpNba/6W2IbJtNuXZophCa54= gopkg.in/DataDog/dd-trace-go.v1 v1.17.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzwVCaGWylTe3tg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/go/cmd/vtgateproxy/vtgateproxy.go b/go/cmd/vtgateproxy/vtgateproxy.go index a2763e2a6c7..22d556b5a44 100644 --- a/go/cmd/vtgateproxy/vtgateproxy.go +++ b/go/cmd/vtgateproxy/vtgateproxy.go @@ -17,9 +17,13 @@ limitations under the License. package main import ( + "log" "math/rand" + "net" "time" + "google.golang.org/grpc" + "google.golang.org/grpc/channelz/service" "vitess.io/vitess/go/exit" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/vtgateproxy" @@ -38,6 +42,14 @@ func main() { servenv.ParseFlags("vtgateproxy") servenv.Init() + lis, err := net.Listen("tcp", "localhost:8153") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + service.RegisterChannelzServiceToServer(s) + go s.Serve(lis) + servenv.OnRun(func() { // Flags are parsed now. Parse the template using the actual flag value and overwrite the current template. vtgateproxy.RegisterJsonDiscovery() diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index e8e7c4970f8..b282abc9edb 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -113,6 +113,9 @@ type resolveJSONGateConfig struct { filters resolveFilters } +type discoverySlackAZ struct{} +type discoverySlackType struct{} + func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) { config := []DiscoveryHost{} fmt.Printf("Loading config %v\n", r.jsonPath) @@ -130,35 +133,29 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error addrs := []resolver.Address{} for _, s := range config { - // Apply filters + az := attributes.New(discoverySlackAZ{}, s.AZId).WithValue(discoverySlackType{}, s.Type) + + // Filter hosts to this gate type if r.filters.gate_type != "" { if r.filters.gate_type != s.Type { continue } } - if r.filters.az_id != "" { - if r.filters.az_id != s.AZId { - continue - } - } // Add matching hosts to registration list addrs = append(addrs, resolver.Address{ Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc), - BalancerAttributes: attributes.New("az", s.AZId), + BalancerAttributes: az, }) } + fmt.Printf("Addrs: %v\n", addrs) + // Shuffle to ensure every host has a different order to iterate through r.rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] }) - // Slice off the first N hosts, optionally - if r.num_connections > 0 && r.num_connections <= len(addrs) { - addrs = addrs[0:r.num_connections] - } - h := sha256.New() if _, err := io.Copy(h, bytes.NewReader(data)); err != nil { return nil, nil, err @@ -218,6 +215,7 @@ func (r *resolveJSONGateConfig) start() { hash = newHash fmt.Printf("Loaded %d hosts\n", len(*addrs)) + fmt.Printf("Loaded %v", addrs) r.cc.UpdateState(resolver.State{Addresses: *addrs}) } }() diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go new file mode 100644 index 00000000000..523e4c4c44b --- /dev/null +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -0,0 +1,115 @@ +package vtgateproxy + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" +) + +// Name is the name of az affinity balancer. +const Name = "slack_affinity_balancer" +const MetadataAZKey = "grpc-slack-az-metadata" +const MetadataGateTypeKey = "grpc-slack-gate-type-metadata" + +var logger = grpclog.Component("slack_affinity_balancer") + +func WithSlackAZAffinityContext(ctx context.Context, azID string, gateType string) context.Context { + ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataGateTypeKey, gateType) + return ctx +} + +func newBuilder() balancer.Builder { + return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true}) +} + +func init() { + balancer.Register(newBuilder()) +} + +type slackAZAffinityBalancer struct{} + +func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker { + logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info) + fmt.Printf("Rebuilding picker\n") + + if len(info.ReadySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + allSubConns := []balancer.SubConn{} + subConnsByAZ := map[string][]balancer.SubConn{} + + for sc := range info.ReadySCs { + subConnInfo, _ := info.ReadySCs[sc] + az := subConnInfo.Address.BalancerAttributes.Value(discoverySlackAZ{}).(string) + + allSubConns = append(allSubConns, sc) + subConnsByAZ[az] = append(subConnsByAZ[az], sc) + } + return &slackAZAffinityPicker{ + allSubConns: allSubConns, + subConnsByAZ: subConnsByAZ, + } +} + +type slackAZAffinityPicker struct { + // allSubConns is all subconns that were in the ready state when the picker was created + allSubConns []balancer.SubConn + subConnsByAZ map[string][]balancer.SubConn + nextByAZ sync.Map + next uint32 +} + +// Pick the next in the list from the list of subconns (RR) +func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) { + subConnsLen := uint32(len(scList)) + + if subConnsLen == 0 { + return balancer.PickResult{}, errors.New("No hosts in list") + } + + fmt.Printf("Select offset: %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList)) + + sc := scList[nextIndex%subConnsLen] + return balancer.PickResult{SubConn: sc}, nil +} + +func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + hdrs, _ := metadata.FromOutgoingContext(info.Ctx) + fmt.Printf("Headers: %v %v\n", hdrs, info) + keys := hdrs.Get(MetadataAZKey) + if len(keys) < 1 { + fmt.Printf("uh oh - missing keys: %v %v %v\n", keys, hdrs, info.Ctx) + fmt.Printf("no header - pick from anywhere\n") + return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) + } + az := keys[0] + + if az == "" { + fmt.Printf("Header unset, pick from anywhere\n") + return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) + } + + fmt.Printf("Selecting from az: %v\n", az) + subConns := p.subConnsByAZ[az] + if len(subConns) == 0 { + fmt.Printf("No subconns in az and gate type, pick from anywhere\n") + return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) + } + val, _ := p.nextByAZ.LoadOrStore(az, new(uint32)) + ptr := val.(*uint32) + atomic.AddUint32(ptr, 1) + + if len(subConns) >= 2 { + fmt.Printf("Limiting to first 2\n") + return p.pickFromSubconns(subConns[0:2], *ptr) + } else { + return p.pickFromSubconns(subConns, *ptr) + } +} diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index be5d91d430b..2c565d67022 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -22,6 +22,7 @@ import ( "context" "flag" "io" + "net/url" "strings" "sync" "time" @@ -53,15 +54,27 @@ var ( type VTGateProxy struct { targetConns map[string]*vtgateconn.VTGateConn mu sync.Mutex + azID string + gateType string } func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { + targetURL, err := url.Parse(target) + if err != nil { + return nil, err + } + + proxy.azID = targetURL.Query().Get("az_id") + proxy.gateType = targetURL.Host + + fmt.Printf("Getting connection for %v in %v\n", target, proxy.azID) + // If the connection exists, return it proxy.mu.Lock() - conn, _ := proxy.targetConns[target] - if conn != nil { + existingConn, _ := proxy.targetConns[target] + if existingConn != nil { proxy.mu.Unlock() - return conn, nil + return existingConn, nil } proxy.mu.Unlock() @@ -70,12 +83,11 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt // grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { // return append(opts, grpc.WithBlock()), nil // }) - grpcclient.RegisterGRPCDialOptions(func(opts []grpc.DialOption) ([]grpc.DialOption, error) { - return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`)), nil + return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil }) - conn, err := vtgateconn.DialProtocol(ctx, "grpc", target) + conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), "grpc", target) if err != nil { return nil, err } @@ -105,7 +117,7 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu // same effect as if a "rollback" statement was executed, but does not affect the query // statistics. func (proxy *VTGateProxy) CloseSession(ctx context.Context, session *vtgateconn.VTGateSession) error { - return session.CloseSession(ctx) + return session.CloseSession(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType)) } // ResolveTransaction resolves the specified 2PC transaction. @@ -119,7 +131,6 @@ func (proxy *VTGateProxy) Prepare(ctx context.Context, session *vtgateconn.VTGat } func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable) (qr *sqltypes.Result, err error) { - // Intercept "use" statements since they just have to update the local session if strings.HasPrefix(sql, "use ") { targetString := sqlescape.UnescapeID(sql[4:]) @@ -127,11 +138,19 @@ func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGat return &sqltypes.Result{}, nil } - return session.Execute(ctx, sql, bindVariables) + t := time.Now() + qr, err = session.Execute(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), sql, bindVariables) + logSql := sql + if len(logSql) > 40 { + logSql = logSql[:40] + } + + fmt.Printf("Execute %s [%s]\n", logSql, time.Since(t)) + return qr, err } func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn.VTGateSession, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) error { - stream, err := session.StreamExecute(ctx, sql, bindVariables) + stream, err := session.StreamExecute(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), sql, bindVariables) if err != nil { return err } From d61ee1d2401a75f8ca92d4f47e483ca847e1a147 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Tue, 7 Nov 2023 06:17:54 -0800 Subject: [PATCH 3/8] Restore git deps --- go.mod | 20 +++++++++----------- go.sum | 22 ---------------------- 2 files changed, 9 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index 90fbadeb3c9..f75f4ad2af7 100644 --- a/go.mod +++ b/go.mod @@ -91,21 +91,21 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 - golang.org/x/mod v0.8.0 // indirect - golang.org/x/net v0.8.0 + golang.org/x/mod v0.5.1 // indirect + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f - golang.org/x/sync v0.1.0 - golang.org/x/sys v0.6.0 // indirect - golang.org/x/term v0.6.0 - golang.org/x/text v0.8.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect + golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b + golang.org/x/text v0.3.7 golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac - golang.org/x/tools v0.6.0 + golang.org/x/tools v0.1.9 google.golang.org/api v0.45.0 google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9 // indirect - google.golang.org/grpc v1.48.0 + google.golang.org/grpc v1.45.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b - google.golang.org/protobuf v1.28.1 + google.golang.org/protobuf v1.28.0 gopkg.in/DataDog/dd-trace-go.v1 v1.17.0 gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect gopkg.in/gcfg.v1 v1.2.3 @@ -122,8 +122,6 @@ require ( require github.com/bndr/gotabulate v1.1.2 -require github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca // indirect - require ( cloud.google.com/go v0.81.0 // indirect github.com/BurntSushi/toml v0.3.1 // indirect diff --git a/go.sum b/go.sum index bc4688388a2..4f42b4b9b69 100644 --- a/go.sum +++ b/go.sum @@ -151,7 +151,6 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA= @@ -202,7 +201,6 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -483,8 +481,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= -github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca h1:TJgLEmFwX/27nDuhpIfHMKw7++XIYsIOJrXcWdMxcoc= -github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca/go.mod h1:oXvXMzId9TWNmutG45QL9t6hVhR15Sr5v6X6nfPpy2E= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -844,8 +840,6 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -898,8 +892,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -928,8 +920,6 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1002,13 +992,9 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= -golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1020,8 +1006,6 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= -golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1090,8 +1074,6 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= -golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1199,8 +1181,6 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= -google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= -google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b h1:D/GTYPo6I1oEo08Bfpuj3xl5XE+UGHj7//5fVyKxhsQ= @@ -1220,8 +1200,6 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/DataDog/dd-trace-go.v1 v1.17.0 h1:j9vAp9Re9bbtA/QFehkJpNba/6W2IbJtNuXZophCa54= gopkg.in/DataDog/dd-trace-go.v1 v1.17.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzwVCaGWylTe3tg= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= From 7e292fad580bb6433c92acd3c2be4c4bf7c3c5cf Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Tue, 7 Nov 2023 06:23:36 -0800 Subject: [PATCH 4/8] Honor num connections --- go/vt/vtgateproxy/gate_balancer.go | 26 +++++++++++++++----------- go/vt/vtgateproxy/vtgateproxy.go | 12 +++++++----- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 523e4c4c44b..494df3b0578 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "sync/atomic" @@ -16,12 +17,12 @@ import ( // Name is the name of az affinity balancer. const Name = "slack_affinity_balancer" const MetadataAZKey = "grpc-slack-az-metadata" -const MetadataGateTypeKey = "grpc-slack-gate-type-metadata" +const MetadataHostAffinityCount = "grpc-slack-num-connections-metadata" var logger = grpclog.Component("slack_affinity_balancer") -func WithSlackAZAffinityContext(ctx context.Context, azID string, gateType string) context.Context { - ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataGateTypeKey, gateType) +func WithSlackAZAffinityContext(ctx context.Context, azID string, numConnections string) context.Context { + ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataHostAffinityCount, numConnections) return ctx } @@ -82,21 +83,24 @@ func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, next func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { hdrs, _ := metadata.FromOutgoingContext(info.Ctx) - fmt.Printf("Headers: %v %v\n", hdrs, info) + numConnections := 0 keys := hdrs.Get(MetadataAZKey) if len(keys) < 1 { - fmt.Printf("uh oh - missing keys: %v %v %v\n", keys, hdrs, info.Ctx) - fmt.Printf("no header - pick from anywhere\n") return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } az := keys[0] if az == "" { - fmt.Printf("Header unset, pick from anywhere\n") return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1)) } - fmt.Printf("Selecting from az: %v\n", az) + keys = hdrs.Get(MetadataHostAffinityCount) + if len(keys) > 0 { + if i, err := strconv.Atoi(keys[0]); err != nil { + numConnections = i + } + } + subConns := p.subConnsByAZ[az] if len(subConns) == 0 { fmt.Printf("No subconns in az and gate type, pick from anywhere\n") @@ -106,9 +110,9 @@ func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResul ptr := val.(*uint32) atomic.AddUint32(ptr, 1) - if len(subConns) >= 2 { - fmt.Printf("Limiting to first 2\n") - return p.pickFromSubconns(subConns[0:2], *ptr) + if len(subConns) >= numConnections { + fmt.Printf("Limiting to first %v\n", numConnections) + return p.pickFromSubconns(subConns[0:numConnections], *ptr) } else { return p.pickFromSubconns(subConns, *ptr) } diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 2c565d67022..997e3d419f6 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -52,10 +52,11 @@ var ( ) type VTGateProxy struct { - targetConns map[string]*vtgateconn.VTGateConn - mu sync.Mutex - azID string - gateType string + targetConns map[string]*vtgateconn.VTGateConn + mu sync.Mutex + azID string + gateType string + numConnections string } func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vtgateconn.VTGateConn, error) { @@ -65,6 +66,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt } proxy.azID = targetURL.Query().Get("az_id") + proxy.numConnections = targetURL.Query().Get("num_connections") proxy.gateType = targetURL.Host fmt.Printf("Getting connection for %v in %v\n", target, proxy.azID) @@ -87,7 +89,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt return append(opts, grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"slack_affinity_balancer":{}}]}`)), nil }) - conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.gateType), "grpc", target) + conn, err := vtgateconn.DialProtocol(WithSlackAZAffinityContext(ctx, proxy.azID, proxy.numConnections), "grpc", target) if err != nil { return nil, err } From d2e999adf733487cf3d7c558f49128dabf987004 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Tue, 7 Nov 2023 07:17:19 -0800 Subject: [PATCH 5/8] fix bugs --- go/vt/vtgateproxy/gate_balancer.go | 2 +- go/vt/vtgateproxy/vtgateproxy.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgateproxy/gate_balancer.go b/go/vt/vtgateproxy/gate_balancer.go index 494df3b0578..77f8de98c19 100644 --- a/go/vt/vtgateproxy/gate_balancer.go +++ b/go/vt/vtgateproxy/gate_balancer.go @@ -110,7 +110,7 @@ func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResul ptr := val.(*uint32) atomic.AddUint32(ptr, 1) - if len(subConns) >= numConnections { + if len(subConns) >= numConnections && numConnections > 0 { fmt.Printf("Limiting to first %v\n", numConnections) return p.pickFromSubconns(subConns[0:numConnections], *ptr) } else { diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 997e3d419f6..315921ecc00 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -69,7 +69,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt proxy.numConnections = targetURL.Query().Get("num_connections") proxy.gateType = targetURL.Host - fmt.Printf("Getting connection for %v in %v\n", target, proxy.azID) + fmt.Printf("Getting connection for %v in %v with %v connections\n", target, proxy.azID, proxy.numConnections) // If the connection exists, return it proxy.mu.Lock() From 5f83b13d35f75a94a81bf28ed5e27d29675f28cf Mon Sep 17 00:00:00 2001 From: Brian Ramos Date: Fri, 8 Mar 2024 11:43:50 -0800 Subject: [PATCH 6/8] Empty-Commit From 7a9cb6deb8584a1443262181b6318d9b8f579556 Mon Sep 17 00:00:00 2001 From: Jamie Scheinblum Date: Fri, 8 Mar 2024 12:40:35 -0800 Subject: [PATCH 7/8] Update vtgateproxy.go Add fmt import Signed-off-by: Jamie Scheinblum --- go/vt/vtgateproxy/vtgateproxy.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index 315921ecc00..08b19f8c256 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -21,6 +21,7 @@ package vtgateproxy import ( "context" "flag" + "fmt" "io" "net/url" "strings" From ead1d6626d90c7d84af22fd9d39f11e370e3ea81 Mon Sep 17 00:00:00 2001 From: Brian Ramos Date: Fri, 8 Mar 2024 15:01:42 -0800 Subject: [PATCH 8/8] Another empty commit to get this build to pass