Skip to content

Commit

Permalink
Cherry-pick 94572b3 with conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
vitess-bot[bot] committed Oct 24, 2023
1 parent 31896d5 commit 1f1cf86
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"os"
"path"
"runtime"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -136,12 +137,22 @@ var (
writeMetrics WriteMetrics
)

var (
countIterations = 5
)

const (
<<<<<<< HEAD
maxTableRows = 4096
maxConcurrency = 20
singleConnectionSleepInterval = 2 * time.Millisecond
countIterations = 5
migrationWaitTimeout = 60 * time.Second
=======
maxTableRows = 4096
workloadDuration = 5 * time.Second
migrationWaitTimeout = 60 * time.Second
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
)

func resetOpOrder() {
Expand Down Expand Up @@ -371,6 +382,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

rowcount := 0
for {
queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
Expand All @@ -381,7 +395,12 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
}

select {
<<<<<<< HEAD
case <-time.After(time.Second):
=======
case <-ticker.C:
continue // Keep looping
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
case <-ctx.Done():
break
}
Expand Down Expand Up @@ -480,7 +499,11 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error {
return err
}

<<<<<<< HEAD
func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
=======
func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) {
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
log.Infof("Running single connection")
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
Expand All @@ -491,6 +514,9 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
_, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true)
require.Nil(t, err)

ticker := time.NewTicker(sleepInterval)
defer ticker.Stop()

for {
if atomic.LoadInt64(done) == 1 {
log.Infof("Terminating single connection")
Expand All @@ -504,20 +530,48 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) {
case 2:
err = generateDelete(t, conn)
}
<<<<<<< HEAD
=======
select {
case <-ctx.Done():
log.Infof("Terminating single connection")
return
case <-ticker.C:
}
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
assert.Nil(t, err)
time.Sleep(singleConnectionSleepInterval)
}
}

func runMultipleConnections(ctx context.Context, t *testing.T) {
<<<<<<< HEAD
log.Infof("Running multiple connections")
var done int64
=======
// The workload for a 16 vCPU machine is:
// - Concurrency of 16
// - 2ms interval between queries for each connection
// As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine
// we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms.
maxConcurrency := runtime.NumCPU()
sleepModifier := 16.0 / float64(maxConcurrency)
baseSleepInterval := 2 * time.Millisecond
singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier
sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds))

log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval)
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<<<<<<< HEAD
runSingleConnection(ctx, t, &done)
=======
runSingleConnection(ctx, t, sleepInterval)
>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302))
}()
}
<-ctx.Done()
Expand Down
15 changes: 11 additions & 4 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
startTime := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

lastKnownStatus := ""
for time.Since(startTime) < timeout {
for {
countMatchedShards := 0
r := VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
Expand All @@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
if countMatchedShards == len(shards) {
return schema.OnlineDDLStatus(lastKnownStatus)
}
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return schema.OnlineDDLStatus(lastKnownStatus)
case <-ticker.C:
}
}
return schema.OnlineDDLStatus(lastKnownStatus)
}

// CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts
Expand Down

0 comments on commit 1f1cf86

Please sign in to comment.