ci: pool-related test flakyness #14076

Merged (4 commits, Sep 22, 2023)
39 changes: 21 additions & 18 deletions go/test/endtoend/cluster/cluster_process.go
@@ -32,6 +32,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
@@ -189,18 +190,20 @@ func (shard *Shard) Replica() *Vttablet {
return nil
}

// CtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) CtrlCHandler() {
// SetupCtrlCHandler handles the teardown for the ctrl-c.
func (cluster *LocalProcessCluster) SetupCtrlCHandler() {
cluster.Context, cluster.CancelFunc = context.WithCancel(context.Background())

c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
go func() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
select {
case <-c:
cluster.Teardown()
os.Exit(0)
case <-cluster.Done():
}
}()
}

// StartTopo starts topology server
@@ -715,7 +718,7 @@ func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
// NewBareCluster instantiates a new cluster and does not assume existence of any of the vitess processes
func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
cluster := &LocalProcessCluster{Cell: cell, Hostname: hostname, mx: new(sync.Mutex), DefaultCharset: "utf8mb4"}
go cluster.CtrlCHandler()
cluster.SetupCtrlCHandler()
Review comment (Collaborator): 😆


cluster.OriginalVTDATAROOT = os.Getenv("VTDATAROOT")
cluster.CurrentVTDATAROOT = path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("vtroot_%d", cluster.GetAndReservePort()))
@@ -941,28 +944,28 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
return err
}

conditionSuccess := false
timeoutExceeded := false
var conditionSuccess atomic.Bool
var timeoutExceeded atomic.Bool
go func() {
time.Sleep(timeout)
timeoutExceeded = true
timeoutExceeded.Store(true)
}()

err = conn.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
if condition(shr) {
conditionSuccess = true
conditionSuccess.Store(true)
}
if timeoutExceeded || conditionSuccess {
if timeoutExceeded.Load() || conditionSuccess.Load() {
return io.EOF
}
return nil
})

if conditionSuccess {
if conditionSuccess.Load() {
return nil
}

if timeoutExceeded {
if timeoutExceeded.Load() {
return errors.New("timeout exceed while waiting for the condition in StreamHealth")
}
return err
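
The handler change above moves the blocking signal wait into a goroutine, so SetupCtrlCHandler returns immediately and NewBareCluster no longer has to launch it with `go` at the call site. A minimal standalone sketch of the same pattern; the function name and teardown callback below are illustrative, not the Vitess API:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// setupCtrlCHandler installs a SIGINT/SIGTERM handler without blocking the caller.
// The goroutine exits either when a signal arrives or when ctx is cancelled.
func setupCtrlCHandler(ctx context.Context, teardown func()) {
	go func() {
		c := make(chan os.Signal, 2)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM)
		select {
		case <-c:
			teardown()
			os.Exit(0)
		case <-ctx.Done():
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	setupCtrlCHandler(ctx, func() { fmt.Println("tearing down") })
	// ... run the cluster; the handler no longer needs to be started with `go` by the caller.
}
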
24 changes: 14 additions & 10 deletions go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -23,6 +23,7 @@ import (
"net/http"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

@@ -250,17 +251,19 @@ func TestHealthCheckSchemaChangeSignal(t *testing.T) {

func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn, primaryTablet *cluster.Vttablet, viewsEnabled bool) {
var streamErr error
wg := sync.WaitGroup{}
var wg sync.WaitGroup
var ranOnce atomic.Bool
var finished atomic.Bool

wg.Add(1)
ranOnce := false
finished := false
ch := make(chan *querypb.StreamHealthResponse)

go func() {
defer wg.Done()
streamErr = clusterInstance.StreamTabletHealthUntil(context.Background(), primaryTablet, 30*time.Second, func(shr *querypb.StreamHealthResponse) bool {
ranOnce = true
ranOnce.Store(true)
// If we are finished, then close the channel and end the stream.
if finished {
if finished.Load() {
close(ch)
return true
}
@@ -272,13 +275,14 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
// The test becomes flaky if we run the DDL immediately after starting the above go routine because the client for the Stream
// sometimes isn't registered by the time DDL runs, and it misses the update we get. To prevent this situation, we wait for one Stream packet
// to have returned. Once we know we received a Stream packet, then we know that we are registered for the health stream and can execute the DDL.
for i := 0; i < 30; i++ {
if ranOnce {
break
}
for i := 0; i < 30 && !ranOnce.Load(); i++ {
time.Sleep(1 * time.Second)
}

if !ranOnce.Load() {
Review comment (Collaborator): nitpick of nitpicks - github.com/stretchr/testify are our friends.
require.True(ranOnce.Load(), "HealthCheck did not ran?")

t.Fatalf("HealthCheck did not ran?")
}

verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area` (`id` int NOT NULL, `country` varchar(30), PRIMARY KEY (`id`))", "area")
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE TABLE `area2` (`id` int NOT NULL, PRIMARY KEY (`id`))", "area2")
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "CREATE VIEW v2 as select * from t1", viewsEnabled)
@@ -288,7 +292,7 @@ func verifyHealthStreamSchemaChangeSignals(t *testing.T, vtgateConn *mysql.Conn,
verifyViewDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP VIEW v2", viewsEnabled)
verifyTableDDLSchemaChangeSignal(t, vtgateConn, ch, "DROP TABLE `area`", "area")

finished = true
finished.Store(true)
wg.Wait()
require.NoError(t, streamErr)
}
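
The test change above combines two fixes: the flags shared between the health-stream callback and the test goroutine become sync/atomic values (so concurrent reads and writes are race-free), and the DDL only runs after at least one stream packet has been observed. A rough, self-contained sketch of that shape, with placeholder timing and a stand-in goroutine rather than the real StreamTabletHealthUntil call:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

func main() {
	var ranOnce, finished atomic.Bool
	var wg sync.WaitGroup

	// Stand-in for the goroutine that consumes the tablet health stream.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for !finished.Load() {
			ranOnce.Store(true) // a "packet" has been seen at least once
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Bounded wait: only proceed once the stream has delivered something,
	// otherwise the work below could run before the subscription is registered.
	for i := 0; i < 30 && !ranOnce.Load(); i++ {
		time.Sleep(100 * time.Millisecond)
	}
	if !ranOnce.Load() {
		fmt.Println("health stream never delivered a packet")
		return
	}

	// ... issue DDL and assert on the notifications here ...

	finished.Store(true)
	wg.Wait()
}
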
1 change: 0 additions & 1 deletion go/test/endtoend/vtgate/unsharded/main_test.go
@@ -131,7 +131,6 @@ BEGIN
insert into allDefaults () values ();
select * from allDefaults;
delete from allDefaults;
set autocommit = 0;
END;

CREATE PROCEDURE in_parameter(IN val int)
57 changes: 24 additions & 33 deletions go/vt/vttablet/endtoend/streamtimeout/healthstream_test.go
@@ -17,23 +17,31 @@ limitations under the License.
package streamtimeout

import (
"fmt"
"context"
"slices"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
)

// TestSchemaChangeTimedout ensures that the timeout functionality is working properly
// to prevent queries from hanging up and causing a mutex to be locked forever.
func TestSchemaChangeTimedout(t *testing.T) {
const TableName = "vitess_healthstream"

client := framework.NewClient()
reloadEstimatedTime := 2 * time.Second

err := cluster.SimulateMySQLHang()
require.NoError(t, err)

defer cluster.StopSimulateMySQLHang()

ch := make(chan []string, 100)
go func(ch chan []string) {
client.StreamHealth(func(response *querypb.StreamHealthResponse) error {
@@ -44,39 +52,22 @@ func TestSchemaChangeTimedout(t *testing.T) {
})
}(ch)

// We will set up the MySQLHang simulation.
// To avoid flakiness, we will retry the setup if the health_streamer sends a notification before the MySQLHang is simulated.
attempt := 1
var tableName string
loop:
for {
tableName = fmt.Sprintf("vitess_sc%d", attempt)

// change the schema to trigger the health_streamer to send a notification at a later time.
_, err := client.Execute("create table "+tableName+"(id bigint primary key)", nil)
require.NoError(t, err)
// get a clean connection that skips toxyproxy to be able to change the schema in the underlying DB
cleanParams := cluster.MySQLCleanConnParams()
cleanConn, err := mysql.Connect(context.Background(), &cleanParams)
require.NoError(t, err)
defer cleanConn.Close()

// start simulating a mysql stall until a query issued by the health_streamer would hang.
err = cluster.SimulateMySQLHang()
require.NoError(t, err)
// change the schema to trigger the health_streamer to send a notification at a later time.
_, err = cleanConn.ExecuteFetch("create table "+TableName+"(id bigint primary key)", -1, false)
require.NoError(t, err)

select {
case <-ch: // get the schema notification
// The health_streamer can send a notification between the time the schema is changed and the mysql stall is simulated.
// In this rare case, we must retry the same setup again.
cluster.StopSimulateMySQLHang()
attempt++

if attempt > 5 {
t.Errorf("failed to setup MySQLHang even after several attempts")
return
}
t.Logf("retrying setup for attempt %d", attempt)
case <-time.After(reloadEstimatedTime):
break loop
}
select {
case <-ch: // get the schema notification
t.Fatalf("received an schema change event from the HealthStreamer (is toxyproxy working?)")
case <-time.After(reloadEstimatedTime):
// Good, continue
}
defer cluster.StopSimulateMySQLHang()

// We will wait for the health_streamer to attempt sending a notification.
// It's important to keep in mind that the total wait time after the simulation should be shorter than the reload timeout.
@@ -87,7 +78,7 @@
time.Sleep(reloadInterval)

// pause simulating the mysql stall to allow the health_streamer to resume.
err := cluster.PauseSimulateMySQLHang()
err = cluster.PauseSimulateMySQLHang()
require.NoError(t, err)

// wait for the health_streamer to complete retrying the notification.
@@ -97,7 +88,7 @@
for {
select {
case res := <-ch: // get the schema notification
if slices.Contains(res, tableName) {
if slices.Contains(res, TableName) {
return
}
case <-timeout:
12 changes: 12 additions & 0 deletions go/vt/vttest/local_cluster.go
@@ -290,6 +290,18 @@ func (db *LocalCluster) MySQLAppDebugConnParams() mysql.ConnParams {
return connParams
}

// MySQLCleanConnParams returns connection params that can be used to connect
// directly to MySQL, even if there's a toxyproxy instance on the way.
func (db *LocalCluster) MySQLCleanConnParams() mysql.ConnParams {
mysqlctl := db.mysql
if toxiproxy, ok := mysqlctl.(*Toxiproxyctl); ok {
mysqlctl = toxiproxy.mysqlctl
}
connParams := mysqlctl.Params(db.DbName())
connParams.Charset = db.Config.Charset
return connParams
}

// SimulateMySQLHang simulates a scenario where the backend MySQL stops all data from flowing through.
// Please ensure to `defer db.StopSimulateMySQLHang()` after calling this method.
func (db *LocalCluster) SimulateMySQLHang() error {
Expand Down