diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 46efd8dc974..ade8d4e7ec1 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -32,6 +32,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"testing"
 	"time"
@@ -189,18 +190,20 @@ func (shard *Shard) Replica() *Vttablet {
 	return nil
 }
 
-// CtrlCHandler handles the teardown for the ctrl-c.
-func (cluster *LocalProcessCluster) CtrlCHandler() {
+// SetupCtrlCHandler installs a handler that tears down the cluster on ctrl-c (SIGINT) or SIGTERM.
+func (cluster *LocalProcessCluster) SetupCtrlCHandler() {
 	cluster.Context, cluster.CancelFunc = context.WithCancel(context.Background())
 
-	c := make(chan os.Signal, 2)
-	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
-	select {
-	case <-c:
-		cluster.Teardown()
-		os.Exit(0)
-	case <-cluster.Done():
-	}
+	go func() {
+		c := make(chan os.Signal, 2)
+		signal.Notify(c, os.Interrupt, syscall.SIGTERM)
+		select {
+		case <-c:
+			cluster.Teardown()
+			os.Exit(0)
+		case <-cluster.Done():
+		}
+	}()
 }
 
 // StartTopo starts topology server
@@ -715,7 +718,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
 // NewBareCluster instantiates a new cluster and does not assume existence of any of the vitess processes
 func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
 	cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname, mx: new(sync.Mutex), DefaultCharset: "utf8mb4"}
-	go cluster.CtrlCHandler()
+	cluster.SetupCtrlCHandler()
 	cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT")
 	cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort()))
 
@@ -941,28 +944,28 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
 		return err
 	}
 
-	conditionSuccess := false
-	timeoutExceeded := false
+	var conditionSuccess atomic.Bool
+	var timeoutExceeded atomic.Bool
 
 	go func() {
 		time.Sleep(timeout)
-		timeoutExceeded = true
+		timeoutExceeded.Store(true)
 	}()
 
 	err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
 		if condition(shr) {
-			conditionSuccess = true
+			conditionSuccess.Store(true)
 		}
-		if timeoutExceeded || conditionSuccess {
+		if timeoutExceeded.Load() || conditionSuccess.Load() {
 			return io.EOF
 		}
 		return nil
 	})
 
-	if conditionSuccess {
+	if conditionSuccess.Load() {
 		return nil
 	}
-	if timeoutExceeded {
+	if timeoutExceeded.Load() {
 		return errors.New("timeout exceed while waiting for the condition in StreamHealth")
 	}
 	return err
diff --git a/go/test/endtoend/tabletmanager/tablet_health_test.go b/go/test/endtoend/tabletmanager/tablet_health_test.go
index 83a3ce08cfb..7dc4bcd97d2 100644
--- a/go/test/endtoend/tabletmanager/tablet_health_test.go
+++ b/go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -23,6 +23,7 @@ import (
 	"net/http"
 	"slices"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -250,17 +251,19 @@ func TestHealthCheckSchemaChangeSignal(t *testing.T) {
 
 func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
 	var streamErr error
-	wg := sync.WaitGroup{}
+	var wg sync.WaitGroup
+	var ranOnce atomic.Bool
+	var finished atomic.Bool
+
 	wg.Add(1)
-	ranOnce := false
-	finished := false
 	ch := make(chan *querypb.StreamHealthResponse)
+
 	go func() {
 		defer wg.Done()
 		streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
-			ranOnce = true
+			ranOnce.Store(true)
 			// If we are finished, then close the channel and end the stream.
-			if finished {
+			if finished.Load() {
 				close(ch)
 				return true
 			}
@@ -272,13 +275,14 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
 	// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
 	// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
 	// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
-	for i := 0; i < 30; i++ {
-		if ranOnce {
-			break
-		}
+	for i := 0; i < 30 && !ranOnce.Load(); i++ {
 		time.Sleep(1 * time.Second)
 	}
 
+	if !ranOnce.Load() {
+		t.Fatalf("HealthCheck stream did not run")
+	}
+
 	verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
 	verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
 	verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
@@ -288,7 +292,7 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
 	verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
 	verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")
 
-	finished = true
+	finished.Store(true)
 	wg.Wait()
 	require.NoError(t, streamErr)
 }
diff --git a/go/test/endtoend/vtgate/unsharded/main_test.go b/go/test/endtoend/vtgate/unsharded/main_test.go
index 1f25db161ef..f772fabecc1 100644
--- a/go/test/endtoend/vtgate/unsharded/main_test.go
+++ b/go/test/endtoend/vtgate/unsharded/main_test.go
@@ -131,7 +131,6 @@ BEGIN
 insert into allDefaults () values ();
 select * from allDefaults;
 delete from allDefaults;
-set autocommit = 0;
 END;
 
 CREATE PROCEDURE in_parameter(IN val int)
diff --git a/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go b/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go
index d13c4ea9e67..d69ce193ef9 100644
--- a/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go
+++ b/go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go
@@ -17,13 +17,14 @@ limitations under the License.
 package streamtimeout
 
 import (
-	"fmt"
+	"context"
 	"slices"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/mysql"
 	querypb "vitess.io/vitess/go/vt/proto/query"
 	"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
 )
@@ -31,9 +32,16 @@ import (
 // TestSchemaChangeTimedout ensures that the timeout functionality is working properly
 // to prevent queries from hanging up and causing a mutex to be locked forever.
 func TestSchemaChangeTimedout(t *testing.T) {
+	const TableName = "vitess_healthstream"
+
 	client := framework.NewClient()
 	reloadEstimatedTime := 2 * time.Second
 
+	err := cluster.SimulateMySQLHang()
+	require.NoError(t, err)
+
+	defer cluster.StopSimulateMySQLHang()
+
 	ch := make(chan []string, 100)
 	go func(ch chan []string) {
 		client.StreamHealth(func(response *querypb.StreamHealthResponse) error {
@@ -44,39 +52,22 @@ func TestSchemaChangeTimedout(t *testing.T) {
 		})
 	}(ch)
 
-	// We will set up the MySQLHang simulation.
-	// To avoid flakiness, we will retry the setup if the health_streamer sends a notification before the MySQLHang is simulated.
-	attempt := 1
-	var tableName string
-loop:
-	for {
-		tableName = fmt.Sprintf("vitess_sc%d", attempt)
-
-		// change the schema to trigger the health_streamer to send a notification at a later time.
-		_, err := client.Execute("create table "+tableName+"(id bigint primary key)", nil)
-		require.NoError(t, err)
+	// get a clean connection that bypasses toxiproxy so we can change the schema in the underlying DB
+	cleanParams := cluster.MySQLCleanConnParams()
+	cleanConn, err := mysql.Connect(context.Background(), &cleanParams)
+	require.NoError(t, err)
+	defer cleanConn.Close()
 
-		// start simulating a mysql stall until a query issued by the health_streamer would hang.
-		err = cluster.SimulateMySQLHang()
-		require.NoError(t, err)
+	// change the schema to trigger the health_streamer to send a notification at a later time.
+	_, err = cleanConn.ExecuteFetch("create table "+TableName+"(id bigint primary key)", -1, false)
+	require.NoError(t, err)
 
-		select {
-		case <-ch: // get the schema notification
-			// The health_streamer can send a notification between the time the schema is changed and the mysql stall is simulated.
-			// In this rare case, we must retry the same setup again.
-			cluster.StopSimulateMySQLHang()
-			attempt++
-
-			if attempt > 5 {
-				t.Errorf("failed to setup MySQLHang even after several attempts")
-				return
-			}
-			t.Logf("retrying setup for attempt %d", attempt)
-		case <-time.After(reloadEstimatedTime):
-			break loop
-		}
+	select {
+	case <-ch: // get the schema notification
+		t.Fatalf("received a schema change event from the HealthStreamer (is toxiproxy working?)")
+	case <-time.After(reloadEstimatedTime):
+		// Good, continue
 	}
-	defer cluster.StopSimulateMySQLHang()
 
 	// We will wait for the health_streamer to attempt sending a notification.
 	// It's important to keep in mind that the total wait time after the simulation should be shorter than the reload timeout.
@@ -87,7 +78,7 @@ loop:
 	time.Sleep(reloadInterval)
 
 	// pause simulating the mysql stall to allow the health_streamer to resume.
-	err := cluster.PauseSimulateMySQLHang()
+	err = cluster.PauseSimulateMySQLHang()
 	require.NoError(t, err)
 
 	// wait for the health_streamer to complete retrying the notification.
@@ -97,7 +88,7 @@ loop:
 	for {
 		select {
 		case res := <-ch: // get the schema notification
-			if slices.Contains(res, tableName) {
+			if slices.Contains(res, TableName) {
 				return
 			}
 		case <-timeout:
diff --git a/go/vt/vttest/local_cluster.go b/go/vt/vttest/local_cluster.go
index fc9a455ba6f..86b8079a9c8 100644
--- a/go/vt/vttest/local_cluster.go
+++ b/go/vt/vttest/local_cluster.go
@@ -290,6 +290,18 @@ func (db *LocalCluster) MySQLAppDebugConnParams() mysql.ConnParams {
 	return connParams
 }
 
+// MySQLCleanConnParams returns connection params that can be used to connect
+// directly to MySQL, even if there's a toxiproxy instance in the way.
+func (db *LocalCluster) MySQLCleanConnParams() mysql.ConnParams {
+	mysqlctl := db.mysql
+	if toxiproxy, ok := mysqlctl.(*Toxiproxyctl); ok {
+		mysqlctl = toxiproxy.mysqlctl
+	}
+	connParams := mysqlctl.Params(db.DbName())
+	connParams.Charset = db.Config.Charset
+	return connParams
+}
+
 // SimulateMySQLHang simulates a scenario where the backend MySQL stops all data from flowing through.
 // Please ensure to `defer db.StopSimulateMySQLHang()` after calling this method.
 func (db *LocalCluster) SimulateMySQLHang() error {