[release-18.0] OnlineDDL: reduce vrepl_stress workload in forks (#14302) (#14349)

Signed-off-by: Shlomi Noach <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Shlomi Noach <[email protected]>
vitess-bot[bot] and shlomi-noach authored Oct 24, 2023
1 parent e8fb5ae commit 6103c37
Showing 2 changed files with 58 additions and 29 deletions.
@@ -23,9 +23,9 @@ import (
"math/rand"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -136,12 +136,14 @@ var (
writeMetrics WriteMetrics
)

var (
countIterations = 5
)

const (
maxTableRows = 4096
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
maxTableRows = 4096
workloadDuration = 5 * time.Second
migrationWaitTimeout = 60 * time.Second
)

func resetOpOrder() {
@@ -227,6 +229,8 @@ func TestMain(m *testing.M) {
func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)

ctx := context.Background()

shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))

@@ -251,16 +255,17 @@ func TestSchemaChange(t *testing.T) {
// that our testing/metrics logic is sound in the first place.
testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations)
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
initTable(t)

ctx, cancel := context.WithTimeout(ctx, workloadDuration)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
runMultipleConnections(ctx, t)
}()
time.Sleep(5 * time.Second)
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
testSelectTableMetrics(t)
})
@@ -285,14 +290,17 @@ func TestSchemaChange(t *testing.T) {
// the vreplication/ALTER TABLE did not corrupt our data and we are happy.
testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations)
t.Run(testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx := context.Background()
t.Run("create schema", func(t *testing.T) {
testWithInitialSchema(t)
})
t.Run("init table", func(t *testing.T) {
initTable(t)
})
t.Run("migrate", func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -302,7 +310,7 @@ func TestSchemaChange(t *testing.T) {
hint := fmt.Sprintf("hint-alter-with-workload-%d", i)
uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
cancel() // will cause runMultipleConnections() to terminate
cancel() // Now that the migration is complete, we can stop the workload.
wg.Wait()
})
t.Run("validate metrics", func(t *testing.T) {
@@ -371,6 +379,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

rowcount := 0

for {
@@ -382,7 +393,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
}

select {
case <-time.After(time.Second):
case <-ticker.C:
continue // Keep looping
case <-ctx.Done():
// Break below to the assertion
@@ -485,7 +496,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
return err
}

func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) {
log.Infof("Running single connection")
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
@@ -496,11 +507,10 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
_, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true)
require.Nil(t, err)

ticker := time.NewTicker(sleepInterval)
defer ticker.Stop()

for {
if atomic.LoadInt64(done) == 1 {
log.Infof("Terminating single connection")
return
}
switch rand.Int31n(3) {
case 0:
err = generateInsert(t, conn)
@@ -509,27 +519,39 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
case 2:
err = generateDelete(t, conn)
}
select {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-ticker.C:
}
assert.Nil(t, err)
time.Sleep(singleConnectionSleepInterval)
}
}

func runMultipleConnections(ctx context.Context, t *testing.T) {
log.Infof("Running multiple connections")
var done int64
// The workload for a 16 vCPU machine is:
// - Concurrency of 16
// - 2ms interval between queries for each connection
// As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine
// we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms.
maxConcurrency := runtime.NumCPU()
sleepModifier := 16.0 / float64(maxConcurrency)
baseSleepInterval := 2 * time.Millisecond
singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier
sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds))

log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval)
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
runSingleConnection(ctx, t, &done)
runSingleConnection(ctx, t, sleepInterval)
}()
}
<-ctx.Done()
atomic.StoreInt64(&done, 1)
log.Infof("Running multiple connections: done")
wg.Wait()
log.Infof("All connections cancelled")
log.Infof("Running multiple connections: done")
}

func initTable(t *testing.T) {
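
For reference, here is a minimal standalone sketch (not part of the diff) of the scaling rule the new runMultipleConnections applies: concurrency follows runtime.NumCPU(), and the per-connection sleep interval is stretched proportionally from the 2ms baseline tuned for a 16 vCPU machine. The helper name scaledWorkloadParams is hypothetical.

package main

import (
	"fmt"
	"runtime"
	"time"
)

// scaledWorkloadParams is an illustrative sketch (not part of the commit) that
// mirrors the logic in runMultipleConnections: a 16 vCPU machine runs 16
// connections at a 2ms interval; machines with fewer vCPUs run fewer
// connections with proportionally longer intervals.
func scaledWorkloadParams() (concurrency int, sleepInterval time.Duration) {
	concurrency = runtime.NumCPU()
	sleepModifier := 16.0 / float64(concurrency)
	baseSleepInterval := 2 * time.Millisecond
	nanos := float64(baseSleepInterval.Nanoseconds()) * sleepModifier
	return concurrency, time.Duration(int64(nanos))
}

func main() {
	concurrency, interval := scaledWorkloadParams()
	// On a 4 vCPU fork runner this prints: concurrency=4 interval=8ms
	fmt.Printf("concurrency=%d interval=%v\n", concurrency, interval)
}
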
go/test/endtoend/onlineddl/vtgate_util.go (15 changes: 11 additions & 4 deletions)
@@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for time.Since(startTime) < timeout {
for {
countMatchedShards := 0
r := VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
@@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
return schema.OnlineDDLStatus(lastKnownStatus)
}

// CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts
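
Both files also replace sleep-based polling (time.Sleep or time.After inside the loop) with a time.Ticker driven by a deadline context. A minimal sketch of that loop shape, assuming a hypothetical condition() predicate standing in for the per-shard status query in WaitForMigrationStatus:

package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil is an illustrative sketch (not part of the commit): it evaluates
// the condition immediately, then re-checks on every tick until it holds or
// the timeout expires, whichever comes first.
func pollUntil(timeout time.Duration, condition func() bool) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		if condition() {
			return true
		}
		select {
		case <-ctx.Done():
			return false // timed out; report the last known (failing) state
		case <-ticker.C:
			// keep polling
		}
	}
}

func main() {
	start := time.Now()
	met := pollUntil(5*time.Second, func() bool { return time.Since(start) > 2*time.Second })
	fmt.Println("condition met:", met)
}
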
