diff --git a/go.mod b/go.mod index b68fa2736bb..e79abbe3698 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/evanphx/json-patch v4.9.0+incompatible github.com/fsnotify/fsnotify v1.4.9 github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab - github.com/go-sql-driver/mysql v1.6.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/mock v1.5.0 github.com/golang/protobuf v1.5.2 @@ -123,6 +123,7 @@ require ( require github.com/bndr/gotabulate v1.1.2 require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/go-chi/chi/v5 v5.0.7 // indirect github.com/rantav/go-grpc-channelz v0.0.4 // indirect ) diff --git a/go.sum b/go.sum index 9bd488fda9b..ba8abd98eb9 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/AdaLogics/go-fuzz-headers v0.0.0-20211102141018-f7be0cbad29c h1:9K6I0yCgGSneuHCoIlJl0O09UjqqWduCwd+ZL1nHFWc= github.com/AdaLogics/go-fuzz-headers v0.0.0-20211102141018-f7be0cbad29c/go.mod h1:WpB7kf89yJUETZxQnP1kgYPNwlT2jjdDYUCoxVggM3g= github.com/Azure/azure-pipeline-go v0.2.2 h1:6oiIS9yaG6XCCzhgAgKFfIWyo4LLCiDhZot6ltoThhY= @@ -282,6 +284,8 @@ github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85n github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/go/test/endtoend/vtgateproxytest/failure_test.go b/go/test/endtoend/vtgateproxytest/failure_test.go new file mode 100644 index 00000000000..27c50baef6a --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/failure_test.go @@ -0,0 +1,235 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyVtgateFailureRoundRobin(t *testing.T) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const vtgateCount = 4 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + } + + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + "round_robin", + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Stopping 1 vtgate") + err = vtgates[0].TearDown() + if err != nil { + t.Fatal(err) + } + + log.Info("Reading test value with one stopped vtgate") + for i := 0; i < vtgateproxyConnections; i++ { + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } +} + +func TestVtgateProxyVtgateFailureFirstReady(t *testing.T) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const vtgateCount = 4 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": "pool1", + }) + } + + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + "first_ready", + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + // First send some queries to the active vtgate + for i := 0; i < 10; i++ { + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + // Now kill the active vtgate + for i := range vtgates { + queryCount, err := getVtgateQueryCount(vtgates[i]) + if err != nil { + t.Fatal(err) + } + + if queryCount.Sum() > 0 { + err = vtgates[i].TearDown() + if err != nil { + t.Fatal(err) + } + } + } + + log.Info("Reading test value after killing the active vtgate") + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) +} diff --git a/go/test/endtoend/vtgateproxytest/main_test.go b/go/test/endtoend/vtgateproxytest/main_test.go new file mode 100644 index 00000000000..bc470085dd0 --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/main_test.go @@ -0,0 +1,461 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgateproxytest + +import ( + "context" + "database/sql" + "encoding/json" + "flag" + "fmt" + "io" + "net" + "net/http" + "os" + "os/exec" + "path" + "reflect" + "strconv" + "strings" + "syscall" + "testing" + "time" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +var ( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "commerce" + cell = "zone1" + sqlSchema = `create table product( + sku varbinary(128), + description varbinary(128), + price bigint, + primary key(sku) + ) ENGINE=InnoDB; + create table customer( + id bigint not null auto_increment, + email varchar(128), + primary key(id) + ) ENGINE=InnoDB; + create table corder( + order_id bigint not null auto_increment, + customer_id bigint, + sku varbinary(128), + price bigint, + primary key(order_id) + ) ENGINE=InnoDB;` + + vSchema = `{ + "tables": { + "product": {}, + "customer": {}, + "corder": {} + } + }` +) + +func TestMain(m *testing.M) { + defer cluster.PanicHandler(nil) + flag.Parse() + + exitCode := func() int { + clusterInstance = cluster.NewCluster(cell, "localhost") + defer clusterInstance.Teardown() + + // Start topo server + err := clusterInstance.StartTopo() + if err != nil { + return 1 + } + + // Start keyspace + keyspace := &cluster.Keyspace{ + Name: keyspaceName, + SchemaSQL: sqlSchema, + VSchema: vSchema, + } + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, true) + if err != nil { + return 1 + } + + // Start vtgate + err = clusterInstance.StartVtgate() + if err != nil { + return 1 + } + vtParams = mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + } + return m.Run() + }() + os.Exit(exitCode) +} + +func selectHelper[T any](ctx context.Context, conn *sql.DB, query string) ([]T, error) { + var result []T + + rows, err := conn.QueryContext(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var row T + v := reflect.ValueOf(&row).Elem() + + var fields []any + for i := 0; i < v.NumField(); i++ { + if v.Field(i).CanAddr() { + fields = append(fields, v.Field(i).Addr().Interface()) + } + } + + err = rows.Scan(fields...) + if err != nil { + return nil, err + } + + result = append(result, row) + } + + return result, nil +} + +type customerEntry struct { + ID int + Email string +} + +func NewVtgateProxyProcess(logDir, vtgateHostsFile, affinity, balancerType string, numConnections, httpPort, grpcPort, mySQLPort int) *VtgateProxyProcess { + return &VtgateProxyProcess{ + Name: "vtgateproxy", + Binary: "vtgateproxy", + LogDir: logDir, + VtgateHostsFile: vtgateHostsFile, + BalancerType: balancerType, + AddressField: "address", + PortField: "grpc", + PoolTypeField: "type", + AffinityField: "az_id", + AffinityValue: affinity, + NumConnections: numConnections, + HTTPPort: httpPort, + GrpcPort: grpcPort, + MySQLPort: mySQLPort, + VerifyURL: "http://" + net.JoinHostPort("localhost", strconv.Itoa(httpPort)) + "/debug/vars", + } +} + +type VtgateProxyProcess struct { + Name string + Binary string + VtgateHostsFile string + BalancerType string + AddressField string + PortField string + PoolTypeField string + AffinityField string + AffinityValue string + LogDir string + NumConnections int + HTTPPort int + GrpcPort int + MySQLPort int + ExtraArgs []string + VerifyURL string + + proc *exec.Cmd + exit chan error +} + +func (vt *VtgateProxyProcess) Setup() error { + args := []string{ + "--port", strconv.Itoa(vt.HTTPPort), + "--grpc_port", strconv.Itoa(vt.GrpcPort), + "--mysql_server_port", strconv.Itoa(vt.MySQLPort), + "--vtgate_hosts_file", vt.VtgateHostsFile, + "--balancer", vt.BalancerType, + "--address_field", vt.AddressField, + "--port_field", vt.PortField, + "--pool_type_field", vt.PoolTypeField, + "--affinity_field", vt.AffinityField, + "--affinity_value", vt.AffinityValue, + "--num_connections", strconv.Itoa(vt.NumConnections), + "--log_dir", vt.LogDir, + "--v", "999", + "--mysql_auth_server_impl", "none", + "--alsologtostderr", + "--grpc_prometheus", + "--vtgate_grpc_fail_fast", + } + args = append(args, vt.ExtraArgs...) + + vt.proc = exec.Command( + vt.Binary, + args..., + ) + vt.proc.Env = append(vt.proc.Env, os.Environ()...) + //errFile, _ := os.Create(path.Join(vt.LogDir, "vtgateproxy-stderr.txt")) + //vt.proc.Stderr = errFile + vt.proc.Stderr = os.Stderr + vt.proc.Stdout = os.Stdout + + log.Infof("Running vtgateproxy with command: %v", strings.Join(vt.proc.Args, " ")) + + err := vt.proc.Start() + if err != nil { + return err + } + vt.exit = make(chan error) + go func() { + if vt.proc != nil { + vt.exit <- vt.proc.Wait() + } + }() + + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + if vt.WaitForStatus() { + return nil + } + select { + case err := <-vt.exit: + return fmt.Errorf("process '%s' exited prematurely (err: %s)", vt.Name, err) + default: + time.Sleep(300 * time.Millisecond) + } + } + + return fmt.Errorf("process '%s' timed out after 60s (err: %s)", vt.Name, <-vt.exit) +} + +// WaitForStatus function checks if vtgateproxy process is up and running +func (vt *VtgateProxyProcess) WaitForStatus() bool { + resp, err := http.Get(vt.VerifyURL) + if err != nil { + return false + } + defer resp.Body.Close() + + return resp.StatusCode == 200 +} + +func (vt *VtgateProxyProcess) Teardown() error { + if err := vt.proc.Process.Kill(); err != nil { + log.Errorf("Failed to kill %v: %v", vt.Name, err) + } + if vt.proc == nil || vt.exit == nil { + return nil + } + vt.proc.Process.Signal(syscall.SIGTERM) + + select { + case <-vt.exit: + vt.proc = nil + return nil + + case <-time.After(30 * time.Second): + vt.proc.Process.Kill() + vt.proc = nil + return <-vt.exit + } +} + +func (vt *VtgateProxyProcess) GetMySQLConn(poolType, affinity string) (*sql.DB, error) { + // Use the go mysql driver since the vitess mysql client does not support + // connectionAttributes. + dsn := fmt.Sprintf("tcp(%v)/ks?connectionAttributes=type:%v,az_id:%v", net.JoinHostPort(clusterInstance.Hostname, strconv.Itoa(vt.MySQLPort)), poolType, affinity) + log.Infof("Using DSN %v", dsn) + + return sql.Open("mysql", dsn) +} + +// WaitForConfig waits until the proxy targets match the config sent to it. +func (vt *VtgateProxyProcess) WaitForConfig(config []map[string]string, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + timer := time.NewTicker(100 * time.Millisecond) + defer timer.Stop() + + expect := map[string]int{} + result := map[string]int{} + for _, target := range config { + expect[target["type"]]++ + if expect[target["type"]] > vt.NumConnections { + expect[target["type"]] = vt.NumConnections + } + } + +OUTER: + for { + select { + case <-ctx.Done(): + return fmt.Errorf("targets never updated to match config: %v != %v", result, expect) + case <-timer.C: + } + + result = map[string]int{} + + vars, err := vt.GetVars() + if err != nil { + return err + } + + targets, ok := vars["JsonDiscoveryTargetCount"] + if !ok { + continue OUTER + } + + for k, v := range targets.(map[string]any) { + result[k] = int(v.(float64)) + } + + if len(result) != len(expect) { + continue OUTER + } + + for k, v := range expect { + if result[k] != v { + continue OUTER + } + } + + break OUTER + } + + return nil +} + +// GetVars returns map of vars +func (vt *VtgateProxyProcess) GetVars() (map[string]any, error) { + resultMap := make(map[string]any) + resp, err := http.Get(vt.VerifyURL) + if err != nil { + return nil, fmt.Errorf("error getting response from %s", vt.VerifyURL) + } + defer resp.Body.Close() + + if resp.StatusCode == 200 { + respByte, _ := io.ReadAll(resp.Body) + err := json.Unmarshal(respByte, &resultMap) + if err != nil { + return nil, fmt.Errorf("not able to parse response body") + } + return resultMap, nil + } + return nil, fmt.Errorf("unsuccessful response") +} + +func startAdditionalVtgates(count int) ([]*cluster.VtgateProcess, error) { + var vtgates []*cluster.VtgateProcess + var err error + defer func() { + if err != nil { + teardownVtgates(vtgates) + } + }() + + for i := 0; i < count; i++ { + vtgateInstance := newVtgateInstance(i) + log.Infof("Starting additional vtgate on port %d", vtgateInstance.Port) + if err = vtgateInstance.Setup(); err != nil { + return nil, err + } + + vtgates = append(vtgates, vtgateInstance) + } + + return vtgates, nil +} + +func newVtgateInstance(i int) *cluster.VtgateProcess { + vtgateProcInstance := cluster.VtgateProcessInstance( + clusterInstance.GetAndReservePort(), + clusterInstance.GetAndReservePort(), + clusterInstance.GetAndReservePort(), + clusterInstance.Cell, + clusterInstance.Cell, + clusterInstance.Hostname, + "PRIMARY,REPLICA", + clusterInstance.TopoProcess.Port, + clusterInstance.TmpDirectory, + clusterInstance.VtGateExtraArgs, + clusterInstance.VtGatePlannerVersion, + ) + vtgateProcInstance.MySQLServerSocketPath = path.Join(clusterInstance.TmpDirectory, fmt.Sprintf("mysql%v.sock", i)) + + return vtgateProcInstance +} + +func teardownVtgates(vtgates []*cluster.VtgateProcess) error { + var err error + for _, vtgate := range vtgates { + if vErr := vtgate.TearDown(); vErr != nil { + err = vErr + } + } + + return err +} + +func getVtgateQueryCount(vtgate *cluster.VtgateProcess) (queryCount, error) { + var result queryCount + + vars, err := vtgate.GetVars() + if err != nil { + return result, err + } + + queriesProcessed, ok := vars["QueriesProcessed"] + if !ok { + return result, nil + } + + v := reflect.ValueOf(&result).Elem() + + for k, val := range queriesProcessed.(map[string]any) { + v.FieldByName(k).SetInt(int64(val.(float64))) + } + + return result, err +} + +type queryCount struct { + Begin int + Commit int + Unsharded int + InsertUnsharded int +} + +func (q queryCount) Sum() int { + var result int + v := reflect.ValueOf(q) + for i := 0; i < v.NumField(); i++ { + result += int(v.Field(i).Int()) + } + + return result +} diff --git a/go/test/endtoend/vtgateproxytest/rebalance_test.go b/go/test/endtoend/vtgateproxytest/rebalance_test.go new file mode 100644 index 00000000000..6f3e838bb9b --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/rebalance_test.go @@ -0,0 +1,215 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyRebalanceRoundRobin(t *testing.T) { + testVtgateProxyRebalance(t, "round_robin") +} + +func TestVtgateProxyRebalanceFirstReady(t *testing.T) { + testVtgateProxyRebalance(t, "first_ready") +} + +func testVtgateProxyRebalance(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + const targetAffinity = "use1-az1" + const targetPool = "pool1" + const vtgateCount = 10 + const vtgatesInAffinity = 8 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + affinity := targetAffinity + if i >= vtgatesInAffinity { + affinity = "use1-az2" + } + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": affinity, + "type": targetPool, + }) + } + + vtgateIdx := vtgateproxyConnections + b, err := json.Marshal(config[:vtgateIdx]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + loadBalancer, + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn(targetPool, targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Inserting test value") + tx, err := conn.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec("insert into customer(id, email) values(1, 'email1')") + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value while adding vtgates") + + const totalQueries = 1000 + addVtgateEveryN := totalQueries / len(vtgates) + + for i := 0; i < totalQueries; i++ { + if i%(addVtgateEveryN) == 0 && vtgateIdx <= len(vtgates) { + log.Infof("Adding vtgate %v", vtgateIdx-1) + b, err = json.Marshal(config[:vtgateIdx]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + if err := vtgateproxyProcInstance.WaitForConfig(config[:vtgateIdx], 5*time.Second); err != nil { + t.Fatal(err) + } + + vtgateIdx++ + } + + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + // No queries should be sent to vtgates outside target affinity + const expectMaxQueryCountNonAffinity = 0 + + switch loadBalancer { + case "round_robin": + // At least 1 query should be sent to every vtgate matching target + // affinity + const expectMinQueryCountAffinity = 1 + + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } + + affinity := config[i]["az_id"] + + log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) + + if affinity == targetAffinity { + assert.GreaterOrEqual(t, queryCount.Sum(), expectMinQueryCountAffinity, "vtgate %v did not recieve the expected number of queries", i) + } else { + assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) + } + } + case "first_ready": + // A single vtgate should become the target, and it should recieve all + // queries + targetVtgate := -1 + + for i, vtgate := range vtgates { + queryCount, err := getVtgateQueryCount(vtgate) + if err != nil { + t.Fatal(err) + } + + affinity := config[i]["az_id"] + + log.Infof("vtgate %v (%v) query counts: %+v", i, affinity, queryCount) + + sum := queryCount.Sum() + if sum == 0 { + continue + } + + if targetVtgate != -1 { + t.Logf("only vtgate %v should have received queries; vtgate %v got %v", targetVtgate, i, sum) + t.Fail() + } else if affinity == targetAffinity { + targetVtgate = i + } else { + assert.LessOrEqual(t, queryCount.Sum(), expectMaxQueryCountNonAffinity, "vtgate %v recieved more than the expected number of queries", i) + } + } + } +} diff --git a/go/test/endtoend/vtgateproxytest/scale_test.go b/go/test/endtoend/vtgateproxytest/scale_test.go new file mode 100644 index 00000000000..f81ad52cbcd --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/scale_test.go @@ -0,0 +1,187 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyScaleRoundRobin(t *testing.T) { + testVtgateProxyScale(t, "round_robin") +} + +func TestVtgateProxyScaleFirstReady(t *testing.T) { + testVtgateProxyScale(t, "first_ready") +} + +func testVtgateProxyScale(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + // insert test value + func() { + conn, err := mysql.Connect(context.Background(), &vtParams) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + utils.Exec(t, conn, "insert into customer(id, email) values(1, 'email1')") + }() + + const targetAffinity = "use1-az1" + const targetPool = "pool1" + const vtgateCount = 5 + const vtgateproxyConnections = 4 + + vtgates, err := startAdditionalVtgates(vtgateCount) + if err != nil { + t.Fatal(err) + } + defer teardownVtgates(vtgates) + + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + var config []map[string]string + + for i, vtgate := range vtgates { + pool := targetPool + if i == 0 { + // First vtgate is in a different pool and should not have any + // queries routed to it. + pool = "pool2" + } + + config = append(config, map[string]string{ + "host": fmt.Sprintf("vtgate%v", i), + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(vtgate.GrpcPort), + "az_id": targetAffinity, + "type": pool, + }) + } + b, err := json.Marshal(config[:1]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + targetAffinity, + loadBalancer, + vtgateproxyConnections, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn(targetPool, targetAffinity) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value while scaling vtgates") + + // Start with an empty list of vtgates, then scale up, then scale back to + // 0. We should expect to see immediate failure when there are no vtgates, + // then success at each scale, until we hit 0 vtgates again, at which point + // we should fail fast again. + i := 0 + scaleUp := true + for { + t.Logf("writing config file with %v vtgates", i) + b, err = json.Marshal(config[:i]) + if err != nil { + t.Fatal(err) + } + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + if err := vtgateproxyProcInstance.WaitForConfig(config[:i], 5*time.Second); err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := selectHelper[customerEntry](ctx, conn, "select id, email from customer") + // 0 vtgates should fail + // First vtgate is in the wrong pool, so it should also fail + if i <= 1 { + if err == nil { + t.Fatal("query should have failed with no vtgates") + } + + // In first_ready mode, we expect to fail fast and not time out. + if loadBalancer == "first_ready" && errors.Is(err, context.DeadlineExceeded) { + t.Fatal("query timed out but it should have failed fast") + } + } else if err != nil { + t.Fatalf("%v vtgates were present, but the query still failed: %v", i, err) + } else { + assert.Equal(t, []customerEntry{{1, "email1"}}, result) + } + + if scaleUp { + i++ + if i >= len(config) { + scaleUp = false + i -= 2 + } + + continue + } + + i-- + if i < 0 { + break + } + } +} diff --git a/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go b/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go new file mode 100644 index 00000000000..2f96e74ebf5 --- /dev/null +++ b/go/test/endtoend/vtgateproxytest/vtgateproxy_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This tests select/insert using the unshared keyspace added in main_test +*/ +package vtgateproxytest + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "strconv" + "testing" + + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/vt/log" +) + +func TestVtgateProxyProcessRoundRobin(t *testing.T) { + testVtgateProxyProcess(t, "round_robin") +} + +func TestVtgateProxyProcessFirstReady(t *testing.T) { + testVtgateProxyProcess(t, "first_ready") +} + +func testVtgateProxyProcess(t *testing.T, loadBalancer string) { + defer cluster.PanicHandler(t) + + config := []map[string]string{ + { + "host": "vtgate1", + "address": clusterInstance.Hostname, + "grpc": strconv.Itoa(clusterInstance.VtgateProcess.GrpcPort), + "az_id": "use1-az1", + "type": "pool1", + }, + } + b, err := json.Marshal(config) + if err != nil { + t.Fatal(err) + } + vtgateHostsFile := filepath.Join(clusterInstance.TmpDirectory, "hosts") + if err := os.WriteFile(vtgateHostsFile, b, 0644); err != nil { + t.Fatal(err) + } + + vtgateproxyHTTPPort := clusterInstance.GetAndReservePort() + vtgateproxyGrpcPort := clusterInstance.GetAndReservePort() + vtgateproxyMySQLPort := clusterInstance.GetAndReservePort() + + vtgateproxyProcInstance := NewVtgateProxyProcess( + clusterInstance.TmpDirectory, + vtgateHostsFile, + "use1-az1", + loadBalancer, + 1, + vtgateproxyHTTPPort, + vtgateproxyGrpcPort, + vtgateproxyMySQLPort, + ) + if err := vtgateproxyProcInstance.Setup(); err != nil { + t.Fatal(err) + } + defer vtgateproxyProcInstance.Teardown() + + conn, err := vtgateproxyProcInstance.GetMySQLConn("pool1", "use1-az1") + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + if err := conn.Ping(); err != nil { + t.Fatal(err) + } + + log.Info("Inserting test value") + tx, err := conn.BeginTx(context.Background(), nil) + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec("insert into customer(id, email) values(1, 'email1')") + if err != nil { + t.Fatal(err) + } + if err := tx.Commit(); err != nil { + t.Fatal(err) + } + + log.Info("Reading test value") + result, err := selectHelper[customerEntry](context.Background(), conn, "select id, email from customer") + if err != nil { + t.Fatal(err) + } + + log.Infof("Read value %v", result) + + assert.Equal(t, []customerEntry{{1, "email1"}}, result) +} diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index 2d78e00e748..e71fd14f3c9 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -71,12 +71,13 @@ func (r *JSONGateResolver) Close() { } type JSONGateResolverBuilder struct { - jsonPath string - addressField string - portField string - poolTypeField string - affinityField string - affinityValue string + jsonPath string + addressField string + portField string + poolTypeField string + affinityField string + affinityValue string + numConnections int mu sync.RWMutex targets map[string][]targetHost @@ -106,16 +107,18 @@ func RegisterJSONGateResolver( poolTypeField string, affinityField string, affinityValue string, + numConnections int, ) (*JSONGateResolverBuilder, error) { jsonDiscovery := &JSONGateResolverBuilder{ - targets: map[string][]targetHost{}, - jsonPath: jsonPath, - addressField: addressField, - portField: portField, - poolTypeField: poolTypeField, - affinityField: affinityField, - affinityValue: affinityValue, - sorter: newShuffleSorter(), + targets: map[string][]targetHost{}, + jsonPath: jsonPath, + addressField: addressField, + portField: portField, + poolTypeField: poolTypeField, + affinityField: affinityField, + affinityValue: affinityValue, + numConnections: numConnections, + sorter: newShuffleSorter(), } resolver.Register(jsonDiscovery) @@ -206,7 +209,10 @@ func (b *JSONGateResolverBuilder) start() error { // notify all the resolvers that the targets changed for _, r := range b.resolvers { - b.update(r) + err = b.update(r) + if err != nil { + log.Errorf("Failed to update resolver: %v", err) + } } } }() @@ -291,10 +297,17 @@ func (b *JSONGateResolverBuilder) parse() (bool, error) { targets[target.PoolType] = append(targets[target.PoolType], target) } + // If a pool disappears, the metric will not record this unless all counts + // are reset each time the file is parsed. If this ends up causing problems + // with the metric briefly dropping to 0, it could be done by rlocking the + // target lock and then comparing the previous targets with the current + // targets and only resetting pools which disappear. + targetCount.ResetAll() + for poolType := range targets { b.sorter.shuffleSort(targets[poolType], b.affinityField, b.affinityValue) if len(targets[poolType]) > *numConnections { - targets[poolType] = targets[poolType][:*numConnections] + targets[poolType] = targets[poolType][:b.numConnections] } targetCount.Set(poolType, int64(len(targets[poolType]))) } @@ -317,7 +330,7 @@ func (b *JSONGateResolverBuilder) GetPools() []string { return pools } -func (b *JSONGateResolverBuilder) GetTargets(poolType string) []targetHost { +func (b *JSONGateResolverBuilder) getTargets(poolType string) []targetHost { // Copy the target slice b.mu.RLock() targets := []targetHost{} @@ -365,10 +378,10 @@ func (s *shuffleSorter) shuffleSort(targets []targetHost, affinityField, affinit } // Update the current list of hosts for the given resolver -func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { +func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error { log.V(100).Infof("resolving target %s to %d connections\n", r.target.URL.String(), *numConnections) - targets := b.GetTargets(r.poolType) + targets := b.getTargets(r.poolType) var addrs []resolver.Address for _, target := range targets { @@ -377,7 +390,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) { log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) - r.clientConn.UpdateState(resolver.State{Addresses: addrs}) + return r.clientConn.UpdateState(resolver.State{Addresses: addrs}) } // Build a new Resolver to route to the given target @@ -402,7 +415,10 @@ func (b *JSONGateResolverBuilder) Build(target resolver.Target, cc resolver.Clie poolType: poolType, } - b.update(r) + err := b.update(r) + if err != nil { + return nil, err + } b.resolvers = append(b.resolvers, r) return r, nil @@ -414,7 +430,7 @@ func (b *JSONGateResolverBuilder) debugTargets() any { pools := b.GetPools() targets := map[string][]targetHost{} for pool := range b.targets { - targets[pool] = b.GetTargets(pool) + targets[pool] = b.getTargets(pool) } return struct { Pools []string diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index 27754bc936f..ed884224c64 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" + "vitess.io/vitess/go/vt/log" ) diff --git a/go/vt/vtgateproxy/vtgateproxy.go b/go/vt/vtgateproxy/vtgateproxy.go index db1abe43009..d25961e8b30 100644 --- a/go/vt/vtgateproxy/vtgateproxy.go +++ b/go/vt/vtgateproxy/vtgateproxy.go @@ -38,7 +38,10 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + + // Imported for flags _ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn" + "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) @@ -60,7 +63,7 @@ var ( timings = stats.NewTimings("Timings", "proxy timings by operation", "operation") - vtGateProxy *VTGateProxy = &VTGateProxy{ + vtGateProxy = &VTGateProxy{ targetConns: map[string]*vtgateconn.VTGateConn{}, mu: sync.RWMutex{}, } @@ -113,7 +116,7 @@ func (proxy *VTGateProxy) getConnection(ctx context.Context, target string) (*vt func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.ExecuteOptions, connectionAttributes map[string]string) (*vtgateconn.VTGateSession, error) { - targetUrl := url.URL{ + targetURL := url.URL{ Scheme: "vtgate", Host: "pool", } @@ -136,9 +139,9 @@ func (proxy *VTGateProxy) NewSession(ctx context.Context, options *querypb.Execu } } - targetUrl.RawQuery = values.Encode() + targetURL.RawQuery = values.Encode() - conn, err := proxy.getConnection(ctx, targetUrl.String()) + conn, err := proxy.getConnection(ctx, targetURL.String()) if err != nil { return nil, err } @@ -196,7 +199,10 @@ func (proxy *VTGateProxy) StreamExecute(ctx context.Context, session *vtgateconn if err != nil { return err } - callback(qr) + err = callback(qr) + if err != nil { + return err + } } return nil @@ -225,6 +231,7 @@ func Init() { *poolTypeField, *affinityField, *affinityValue, + *numConnections, ) if err != nil {