Skip to content

Commit

Permalink
first draft of az affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheinblum authored and brirams committed Mar 8, 2024
1 parent 439252f commit 37cfc73
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 31 deletions.
20 changes: 11 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,21 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/mod v0.5.1 // indirect
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b
golang.org/x/text v0.3.7
golang.org/x/sync v0.1.0
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0
golang.org/x/text v0.8.0
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
golang.org/x/tools v0.1.9
golang.org/x/tools v0.6.0
google.golang.org/api v0.45.0
google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9 // indirect
google.golang.org/grpc v1.45.0
google.golang.org/grpc v1.48.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0
google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
gopkg.in/DataDog/dd-trace-go.v1 v1.17.0
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/gcfg.v1 v1.2.3
Expand All @@ -122,6 +122,8 @@ require (

require github.com/bndr/gotabulate v1.1.2

require github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca // indirect

require (
cloud.google.com/go v0.81.0 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
Expand Down
22 changes: 22 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnht
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codegangsta/cli v1.20.0/go.mod h1:/qJNoX69yVSKu5o4jLyXAENLRyk1uhi7zkbQ3slBdOA=
Expand Down Expand Up @@ -201,6 +202,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0=
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
Expand Down Expand Up @@ -481,6 +483,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca h1:TJgLEmFwX/27nDuhpIfHMKw7++XIYsIOJrXcWdMxcoc=
github.com/kazegusuri/channelzcli v0.0.0-20230307031351-17bac34c51ca/go.mod h1:oXvXMzId9TWNmutG45QL9t6hVhR15Sr5v6X6nfPpy2E=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
Expand Down Expand Up @@ -840,6 +844,8 @@ golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -892,6 +898,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -920,6 +928,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -992,9 +1002,13 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand All @@ -1006,6 +1020,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand Down Expand Up @@ -1074,6 +1090,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8=
golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1181,6 +1199,8 @@ google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M=
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w=
google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/grpc/examples v0.0.0-20210430044426-28078834f35b h1:D/GTYPo6I1oEo08Bfpuj3xl5XE+UGHj7//5fVyKxhsQ=
Expand All @@ -1200,6 +1220,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/DataDog/dd-trace-go.v1 v1.17.0 h1:j9vAp9Re9bbtA/QFehkJpNba/6W2IbJtNuXZophCa54=
gopkg.in/DataDog/dd-trace-go.v1 v1.17.0/go.mod h1:DVp8HmDh8PuTu2Z0fVVlBsyWaC++fzwVCaGWylTe3tg=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
12 changes: 12 additions & 0 deletions go/cmd/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package main

import (
"log"
"math/rand"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/channelz/service"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vtgateproxy"
Expand All @@ -38,6 +42,14 @@ func main() {
servenv.ParseFlags("vtgateproxy")
servenv.Init()

lis, err := net.Listen("tcp", "localhost:8153")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
service.RegisterChannelzServiceToServer(s)
go s.Serve(lis)

servenv.OnRun(func() {
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
vtgateproxy.RegisterJsonDiscovery()
Expand Down
22 changes: 10 additions & 12 deletions go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ type resolveJSONGateConfig struct {
filters resolveFilters
}

type discoverySlackAZ struct{}
type discoverySlackType struct{}

func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error) {
config := []DiscoveryHost{}
fmt.Printf("Loading config %v\n", r.jsonPath)
Expand All @@ -130,35 +133,29 @@ func (r *resolveJSONGateConfig) loadConfig() (*[]resolver.Address, []byte, error

addrs := []resolver.Address{}
for _, s := range config {
// Apply filters
az := attributes.New(discoverySlackAZ{}, s.AZId).WithValue(discoverySlackType{}, s.Type)

// Filter hosts to this gate type
if r.filters.gate_type != "" {
if r.filters.gate_type != s.Type {
continue
}
}

if r.filters.az_id != "" {
if r.filters.az_id != s.AZId {
continue
}
}
// Add matching hosts to registration list
addrs = append(addrs, resolver.Address{
Addr: fmt.Sprintf("%s:%s", s.NebulaAddress, s.Grpc),
BalancerAttributes: attributes.New("az", s.AZId),
BalancerAttributes: az,
})
}

fmt.Printf("Addrs: %v\n", addrs)

// Shuffle to ensure every host has a different order to iterate through
r.rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})

// Slice off the first N hosts, optionally
if r.num_connections > 0 && r.num_connections <= len(addrs) {
addrs = addrs[0:r.num_connections]
}

h := sha256.New()
if _, err := io.Copy(h, bytes.NewReader(data)); err != nil {
return nil, nil, err
Expand Down Expand Up @@ -218,6 +215,7 @@ func (r *resolveJSONGateConfig) start() {
hash = newHash

fmt.Printf("Loaded %d hosts\n", len(*addrs))
fmt.Printf("Loaded %v", addrs)
r.cc.UpdateState(resolver.State{Addresses: *addrs})
}
}()
Expand Down
115 changes: 115 additions & 0 deletions go/vt/vtgateproxy/gate_balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package vtgateproxy

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
)

// Name is the name of az affinity balancer.
const Name = "slack_affinity_balancer"
const MetadataAZKey = "grpc-slack-az-metadata"
const MetadataGateTypeKey = "grpc-slack-gate-type-metadata"

var logger = grpclog.Component("slack_affinity_balancer")

func WithSlackAZAffinityContext(ctx context.Context, azID string, gateType string) context.Context {
ctx = metadata.AppendToOutgoingContext(ctx, MetadataAZKey, azID, MetadataGateTypeKey, gateType)
return ctx
}

func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &slackAZAffinityBalancer{}, base.Config{HealthCheck: true})
}

func init() {
balancer.Register(newBuilder())
}

type slackAZAffinityBalancer struct{}

func (*slackAZAffinityBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("slackAZAffinityBalancer: Build called with info: %v", info)
fmt.Printf("Rebuilding picker\n")

if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
allSubConns := []balancer.SubConn{}
subConnsByAZ := map[string][]balancer.SubConn{}

for sc := range info.ReadySCs {
subConnInfo, _ := info.ReadySCs[sc]
az := subConnInfo.Address.BalancerAttributes.Value(discoverySlackAZ{}).(string)

allSubConns = append(allSubConns, sc)
subConnsByAZ[az] = append(subConnsByAZ[az], sc)
}
return &slackAZAffinityPicker{
allSubConns: allSubConns,
subConnsByAZ: subConnsByAZ,
}
}

type slackAZAffinityPicker struct {
// allSubConns is all subconns that were in the ready state when the picker was created
allSubConns []balancer.SubConn
subConnsByAZ map[string][]balancer.SubConn
nextByAZ sync.Map
next uint32
}

// Pick the next in the list from the list of subconns (RR)
func (p *slackAZAffinityPicker) pickFromSubconns(scList []balancer.SubConn, nextIndex uint32) (balancer.PickResult, error) {
subConnsLen := uint32(len(scList))

if subConnsLen == 0 {
return balancer.PickResult{}, errors.New("No hosts in list")
}

fmt.Printf("Select offset: %v %v %v\n", nextIndex, nextIndex%subConnsLen, len(scList))

sc := scList[nextIndex%subConnsLen]
return balancer.PickResult{SubConn: sc}, nil
}

func (p *slackAZAffinityPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
hdrs, _ := metadata.FromOutgoingContext(info.Ctx)
fmt.Printf("Headers: %v %v\n", hdrs, info)
keys := hdrs.Get(MetadataAZKey)
if len(keys) < 1 {
fmt.Printf("uh oh - missing keys: %v %v %v\n", keys, hdrs, info.Ctx)
fmt.Printf("no header - pick from anywhere\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}
az := keys[0]

if az == "" {
fmt.Printf("Header unset, pick from anywhere\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}

fmt.Printf("Selecting from az: %v\n", az)
subConns := p.subConnsByAZ[az]
if len(subConns) == 0 {
fmt.Printf("No subconns in az and gate type, pick from anywhere\n")
return p.pickFromSubconns(p.allSubConns, atomic.AddUint32(&p.next, 1))
}
val, _ := p.nextByAZ.LoadOrStore(az, new(uint32))
ptr := val.(*uint32)
atomic.AddUint32(ptr, 1)

if len(subConns) >= 2 {
fmt.Printf("Limiting to first 2\n")
return p.pickFromSubconns(subConns[0:2], *ptr)
} else {
return p.pickFromSubconns(subConns, *ptr)
}
}
Loading

0 comments on commit 37cfc73

Please sign in to comment.