ci: pool-related test flakyness (#14076)
Signed-off-by: Vicent Marti <[email protected]>
vmg authored Sep 22, 2023
1 parent e790f2e commit 71cd149
Showing 5 changed files with 71 additions and 62 deletions.
39 changes: 21 additions & 18 deletions go/test/endtoend/cluster/cluster_process.go
@@ -32,6 +32,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@@ -189,18 +190,20 @@ func (shard *Shard) Replica() *Vttablet {
return nil
}

// CtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) CtrlCHandler() {
// SetupCtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) SetupCtrlCHandler() {
cluster.Context, cluster.CancelFunc = context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
go func() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
}()
}

// StartTopo starts topology server
@@ -715,7 +718,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
// NewBareCluster instantiates a new cluster and does not assume existence of any of the vitess processes
func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname, mx: new(sync.Mutex), DefaultCharset: "utf8mb4"}
go cluster.CtrlCHandler()
cluster.SetupCtrlCHandler()

cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT")
cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort()))
@@ -941,28 +944,28 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
return err
}

conditionSuccess := false
timeoutExceeded := false
var conditionSuccess atomic.Bool
var timeoutExceeded atomic.Bool
go func() {
time.Sleep(timeout)
timeoutExceeded = true
timeoutExceeded.Store(true)
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
conditionSuccess.Store(true)
}
if timeoutExceeded || conditionSuccess {
if timeoutExceeded.Load() || conditionSuccess.Load() {
return io.EOF
}
return nil
})

if conditionSuccess {
if conditionSuccess.Load() {
return nil
}

if timeoutExceeded {
if timeoutExceeded.Load() {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
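
Both changes in this file address the same class of problem. SetupCtrlCHandler now spawns its own goroutine, so callers invoke it synchronously (the call site in NewBareCluster drops the go prefix), and the flags in StreamTabletHealthUntil become sync/atomic.Bool values because a timer goroutine and the StreamHealth callback read and write them concurrently. The following is a minimal, self-contained sketch of both patterns, using generic stand-ins (teardown, done, and a plain integer stream callback) rather than the actual Vitess APIs:

package example

import (
	"errors"
	"io"
	"os"
	"os/signal"
	"sync/atomic"
	"syscall"
	"time"
)

// setupCtrlCHandler returns immediately; the signal wait runs in its own
// goroutine, so callers no longer need to launch it with a go statement.
func setupCtrlCHandler(teardown func(), done <-chan struct{}) {
	go func() {
		c := make(chan os.Signal, 2)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM)
		select {
		case <-c:
			teardown() // tear down on ctrl-c / SIGTERM
			os.Exit(0)
		case <-done:
			// normal shutdown already happened; nothing to do
		}
	}()
}

// streamUntil invokes stream with a callback until cond succeeds or the
// timeout elapses. Both flags are atomic.Bool because the timer goroutine
// and the stream callback touch them concurrently.
func streamUntil(stream func(func(pkt int) error) error, cond func(pkt int) bool, timeout time.Duration) error {
	var conditionSuccess, timeoutExceeded atomic.Bool

	go func() {
		time.Sleep(timeout)
		timeoutExceeded.Store(true)
	}()

	err := stream(func(pkt int) error {
		if cond(pkt) {
			conditionSuccess.Store(true)
		}
		if timeoutExceeded.Load() || conditionSuccess.Load() {
			return io.EOF // ends the stream cleanly
		}
		return nil
	})

	if conditionSuccess.Load() {
		return nil
	}
	if timeoutExceeded.Load() {
		return errors.New("timeout exceeded while waiting for the condition")
	}
	return err
}

With plain bools these accesses form a data race under the Go memory model; atomic.Bool keeps the logic identical while making each read and write a synchronized operation.
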
24 changes: 14 additions & 10 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -23,6 +23,7 @@ import (
"net/http"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

@@ -250,17 +251,19 @@ func TestHealthCheckSchemaChangeSignal(t *testing.T) {

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
var streamErr error
wg := sync.WaitGroup{}
var wg sync.WaitGroup
var ranOnce atomic.Bool
var finished atomic.Bool

wg.Add(1)
ranOnce := false
finished := false
ch := make(chan *querypb.StreamHealthResponse)

go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
ranOnce.Store(true)
// If we are finished, then close the channel and end the stream.
if finished {
if finished.Load() {
close(ch)
return true
}
@@ -272,13 +275,14 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
for i := 0; i < 30 && !ranOnce.Load(); i++ {
time.Sleep(1 * time.Second)
}

if !ranOnce.Load() {
t.Fatalf("HealthCheck did not ran?")
}

verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
@@ -288,7 +292,7 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")

finished = true
finished.Store(true)
wg.Wait()
require.NoError(t, streamErr)
}
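
In this test the shared ranOnce and finished flags become atomic.Bool for the same reason, and the polling loop now fails the test explicitly if the stream never delivers a packet. A hedged sketch of that wait-for-the-first-packet guard follows; waitForFirstPacket and its polling budget are illustrative names, not Vitess test helpers:

package example

import (
	"sync/atomic"
	"time"
)

// waitForFirstPacket polls a flag that the stream callback sets on its first
// invocation, so the caller only runs DDL once the health stream is known to
// be delivering packets.
func waitForFirstPacket(ranOnce *atomic.Bool, budget, step time.Duration) bool {
	deadline := time.Now().Add(budget)
	for time.Now().Before(deadline) {
		if ranOnce.Load() {
			return true
		}
		time.Sleep(step)
	}
	return ranOnce.Load()
}

In the test above the budget is thirty one-second sleeps, after which t.Fatalf reports that the health stream never ran.
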
1 change: 0 additions & 1 deletion go/test/endtoend/vtgate/unsharded/main_test.go
@@ -131,7 +131,6 @@ BEGIN
insert into allDefaults () values ();
select * from allDefaults;
delete from allDefaults;
set autocommit = 0;
END;
CREATE PROCEDURE in_parameter(IN val int)
57 changes: 24 additions & 33 deletions go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go
@@ -17,23 +17,31 @@ limitations under the License.
package streamtimeout

import (
"fmt"
"context"
"slices"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

// TestSchemaChangeTimedout ensures that the timeout functionality is working properly
// to prevent queries from hanging up and causing a mutex to be locked forever.
func TestSchemaChangeTimedout(t *testing.T) {
const TableName = "vitess_healthstream"

client := framework.NewClient()
reloadEstimatedTime := 2 * time.Second

err := cluster.SimulateMySQLHang()
require.NoError(t, err)

defer cluster.StopSimulateMySQLHang()

ch := make(chan []string, 100)
go func(ch chan []string) {
client.StreamHealth(func(response *querypb.StreamHealthResponse) error {
@@ -44,39 +52,22 @@ func TestSchemaChangeTimedout(t *testing.T) {
})
}(ch)

// We will set up the MySQLHang simulation.
// To avoid flakiness, we will retry the setup if the health_streamer sends a notification before the MySQLHang is simulated.
attempt := 1
var tableName string
loop:
for {
tableName = fmt.Sprintf("vitess_sc%d", attempt)

// change the schema to trigger the health_streamer to send a notification at a later time.
_, err := client.Execute("create table "+tableName+"(id bigint primary key)", nil)
require.NoError(t, err)
// get a clean connection that skips toxyproxy to be able to change the schema in the underlying DB
cleanParams := cluster.MySQLCleanConnParams()
cleanConn, err := mysql.Connect(context.Background(), &cleanParams)
require.NoError(t, err)
defer cleanConn.Close()

// start simulating a mysql stall until a query issued by the health_streamer would hang.
err = cluster.SimulateMySQLHang()
require.NoError(t, err)
// change the schema to trigger the health_streamer to send a notification at a later time.
_, err = cleanConn.ExecuteFetch("create table "+TableName+"(id bigint primary key)", -1, false)
require.NoError(t, err)

select {
case <-ch: // get the schema notification
// The health_streamer can send a notification between the time the schema is changed and the mysql stall is simulated.
// In this rare case, we must retry the same setup again.
cluster.StopSimulateMySQLHang()
attempt++

if attempt > 5 {
t.Errorf("failed to setup MySQLHang even after several attempts")
return
}
t.Logf("retrying setup for attempt %d", attempt)
case <-time.After(reloadEstimatedTime):
break loop
}
select {
case <-ch: // get the schema notification
t.Fatalf("received an schema change event from the HealthStreamer (is toxyproxy working?)")
case <-time.After(reloadEstimatedTime):
// Good, continue
}
defer cluster.StopSimulateMySQLHang()

// We will wait for the health_streamer to attempt sending a notification.
// It's important to keep in mind that the total wait time after the simulation should be shorter than the reload timeout.
@@ -87,7 +78,7 @@
time.Sleep(reloadInterval)

// pause simulating the mysql stall to allow the health_streamer to resume.
err := cluster.PauseSimulateMySQLHang()
err = cluster.PauseSimulateMySQLHang()
require.NoError(t, err)

// wait for the health_streamer to complete retrying the notification.
@@ -97,7 +88,7 @@
for {
select {
case res := <-ch: // get the schema notification
if slices.Contains(res, tableName) {
if slices.Contains(res, TableName) {
return
}
case <-timeout:
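
The rewritten test no longer retries its setup. It stalls MySQL through toxiproxy first, issues the schema change over a clean connection that bypasses the proxy (MySQLCleanConnParams, added below), and then asserts that no schema-change notification arrives within the reload window before letting the health streamer resume. A small sketch of that negative assertion; assertNoEvent, ch, and window are placeholders for the test's notification channel and reloadEstimatedTime:

package example

import (
	"testing"
	"time"
)

// assertNoEvent fails the test if anything arrives on ch before the window
// elapses; silence means the stall is actually blocking the health streamer.
func assertNoEvent(t *testing.T, ch <-chan []string, window time.Duration) {
	t.Helper()
	select {
	case ev := <-ch:
		t.Fatalf("unexpected schema change event: %v", ev)
	case <-time.After(window):
		// nothing arrived within the window, as expected
	}
}
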
12 changes: 12 additions & 0 deletions go/vt/vttest/local_cluster.go
Expand Up @@ -290,6 +290,18 @@ func (db *LocalCluster) MySQLAppDebugConnParams() mysql.ConnParams {
return connParams
}

// MySQLCleanConnParams returns connection params that can be used to connect
// directly to MySQL, even if there's a toxyproxy instance on the way.
func (db *LocalCluster) MySQLCleanConnParams() mysql.ConnParams {
mysqlctl := db.mysql
if toxiproxy, ok := mysqlctl.(*Toxiproxyctl); ok {
mysqlctl = toxiproxy.mysqlctl
}
connParams := mysqlctl.Params(db.DbName())
connParams.Charset = db.Config.Charset
return connParams
}

// SimulateMySQLHang simulates a scenario where the backend MySQL stops all data from flowing through.
// Please ensure to `defer db.StopSimulateMySQLHang()` after calling this method.
func (db *LocalCluster) SimulateMySQLHang() error {
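
MySQLCleanConnParams unwraps the Toxiproxyctl layer when one is configured, so a test can reach the backing MySQL directly even while the proxied path is stalled. A hedged usage sketch mirroring the call site in healthstream_test.go above; the vttest import path and the connectDirect helper are assumptions for illustration, not part of this commit:

package example

import (
	"context"

	"vitess.io/vitess/go/mysql"
	"vitess.io/vitess/go/vt/vttest"
)

// connectDirect opens a MySQL connection that skips any toxiproxy instance
// configured for the cluster, using the params helper added in this commit.
func connectDirect(ctx context.Context, db *vttest.LocalCluster) (*mysql.Conn, error) {
	params := db.MySQLCleanConnParams()
	return mysql.Connect(ctx, &params)
}
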
