From a84742e01f37f692b7a5f2c3f081fb1d93d1616e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 18 Oct 2023 10:42:15 +0300 Subject: [PATCH 1/7] OnlineDDL: reduce vrepl_stress workload in forks Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl_vrepl_mini_stress_test.go | 33 +++++++++++++++---- go/test/endtoend/onlineddl/vtgate_util.go | 15 ++++++--- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 983739a976d..f9cf63eef7a 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -135,13 +135,16 @@ var ( writeMetrics WriteMetrics ) -const ( - maxTableRows = 4096 - workloadDuration = 5 * time.Second +var ( maxConcurrency = 20 singleConnectionSleepInterval = 2 * time.Millisecond countIterations = 5 - migrationWaitTimeout = 60 * time.Second +) + +const ( + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second ) func resetOpOrder() { @@ -157,6 +160,18 @@ func nextOpOrder() int64 { return opOrder } +func TestInitialSetup(t *testing.T) { + repo, _ := os.LookupEnv("GITHUB_REPOSITORY") + t.Logf("==== repo=%v", repo) + if repo != "vitessio/vitess" { + // `vitessio/vitess` repository enjoys faster runners. Otherwise, GitHub CI has much slower runners + // and we have to reduce the workload + maxConcurrency = maxConcurrency / 2 + singleConnectionSleepInterval = singleConnectionSleepInterval * 2 + } + t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) +} + func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() @@ -377,6 +392,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + rowcount := 0 for { @@ -388,7 +406,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { - case <-time.After(time.Second): + case <-ticker.C: continue // Keep looping case <-ctx.Done(): // Break below to the assertion @@ -502,6 +520,9 @@ func runSingleConnection(ctx context.Context, t *testing.T) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(singleConnectionSleepInterval) + defer ticker.Stop() + for { switch rand.Int31n(3) { case 0: @@ -515,7 +536,7 @@ func runSingleConnection(ctx context.Context, t *testing.T) { case <-ctx.Done(): log.Infof("Terminating single connection") return - case <-time.After(singleConnectionSleepInterval): + case <-ticker.C: } assert.Nil(t, err) } diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 3d99a2cef92..693523cec48 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -247,9 +247,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c for _, status := range expectStatuses { statusesMap[string(status)] = true } - startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + lastKnownStatus := "" - for time.Since(startTime) < timeout { + for { countMatchedShards := 0 r := VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { @@ -266,9 +270,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } } - return schema.OnlineDDLStatus(lastKnownStatus) } // CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts From a826b896840d6181677afff52398127a59c348cd Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 18 Oct 2023 10:46:40 +0300 Subject: [PATCH 2/7] verify we're running within GitHub CI Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index f9cf63eef7a..2bc22bcd4e1 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -161,9 +161,9 @@ func nextOpOrder() int64 { } func TestInitialSetup(t *testing.T) { - repo, _ := os.LookupEnv("GITHUB_REPOSITORY") + repo, ok := os.LookupEnv("GITHUB_REPOSITORY") // `ok` tells us the env variable exists, hence that we are running in GitHub CI. t.Logf("==== repo=%v", repo) - if repo != "vitessio/vitess" { + if ok && repo != "vitessio/vitess" { // `vitessio/vitess` repository enjoys faster runners. Otherwise, GitHub CI has much slower runners // and we have to reduce the workload maxConcurrency = maxConcurrency / 2 From 5e06b605b0a0975c3ae9ff38fd73c7ab41f0c8ea Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 19 Oct 2023 09:58:43 +0300 Subject: [PATCH 3/7] vcpu-based workload Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl_vrepl_mini_stress_test.go | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 2bc22bcd4e1..f00006a38b1 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -20,9 +20,11 @@ import ( "context" "flag" "fmt" + "math" "math/rand" "os" "path" + "runtime" "strings" "sync" "testing" @@ -161,15 +163,21 @@ func nextOpOrder() int64 { } func TestInitialSetup(t *testing.T) { - repo, ok := os.LookupEnv("GITHUB_REPOSITORY") // `ok` tells us the env variable exists, hence that we are running in GitHub CI. - t.Logf("==== repo=%v", repo) - if ok && repo != "vitessio/vitess" { - // `vitessio/vitess` repository enjoys faster runners. Otherwise, GitHub CI has much slower runners - // and we have to reduce the workload - maxConcurrency = maxConcurrency / 2 - singleConnectionSleepInterval = singleConnectionSleepInterval * 2 - } - t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) + // repo, ok := os.LookupEnv("GITHUB_REPOSITORY") // `ok` tells us the env variable exists, hence that we are running in GitHub CI. + // t.Logf("==== repo=%v", repo) + // if ok && repo != "vitessio/vitess" { + // // `vitessio/vitess` repository enjoys faster runners. Otherwise, GitHub CI has much slower runners + // // and we have to reduce the workload + // maxConcurrency = maxConcurrency / 2 + // singleConnectionSleepInterval = singleConnectionSleepInterval * 2 + // } + // t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) + + vCPUs := runtime.NumCPU() + maxConcurrency = vCPUs + sleepModifier := int(math.Max(float64(4-(vCPUs/4)), 1)) + singleConnectionSleepInterval = time.Duration((int(singleConnectionSleepInterval.Milliseconds()) * sleepModifier) * 1000) // ms to us + t.Logf("==== test setup: runtime.NumCPU()=%v, sleepModifier=%v, singleConnectionSleepInterval=%v", runtime.NumCPU(), sleepModifier, singleConnectionSleepInterval) } func TestMain(m *testing.M) { From 60e39cb156b8255767809a18954fba6084d8b91e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 19 Oct 2023 10:28:06 +0300 Subject: [PATCH 4/7] improved modifiers Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index f00006a38b1..c7a4442902f 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -20,7 +20,6 @@ import ( "context" "flag" "fmt" - "math" "math/rand" "os" "path" @@ -175,12 +174,13 @@ func TestInitialSetup(t *testing.T) { vCPUs := runtime.NumCPU() maxConcurrency = vCPUs - sleepModifier := int(math.Max(float64(4-(vCPUs/4)), 1)) - singleConnectionSleepInterval = time.Duration((int(singleConnectionSleepInterval.Milliseconds()) * sleepModifier) * 1000) // ms to us + sleepModifier := 16.0 / float64(vCPUs) + singleConnectionSleepIntervalNanoseconds := float64(singleConnectionSleepInterval.Nanoseconds()) * sleepModifier + singleConnectionSleepInterval = time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) t.Logf("==== test setup: runtime.NumCPU()=%v, sleepModifier=%v, singleConnectionSleepInterval=%v", runtime.NumCPU(), sleepModifier, singleConnectionSleepInterval) } -func TestMain(m *testing.M) { +func _TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() From b999acdac2ea08b572cf3d94c43c525d6db1275f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 19 Oct 2023 10:29:35 +0300 Subject: [PATCH 5/7] temporarily hide main tests Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index c7a4442902f..09c7cb88add 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -247,7 +247,7 @@ func _TestMain(m *testing.M) { } -func TestSchemaChange(t *testing.T) { +func _TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() From bcfa7eacff5c3d7571885930f85870f7bd5325c5 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 19 Oct 2023 11:37:05 +0300 Subject: [PATCH 6/7] refactor Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl_vrepl_mini_stress_test.go | 41 ++++++------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 09c7cb88add..d64463ff92a 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -137,9 +137,7 @@ var ( ) var ( - maxConcurrency = 20 - singleConnectionSleepInterval = 2 * time.Millisecond - countIterations = 5 + countIterations = 5 ) const ( @@ -161,26 +159,7 @@ func nextOpOrder() int64 { return opOrder } -func TestInitialSetup(t *testing.T) { - // repo, ok := os.LookupEnv("GITHUB_REPOSITORY") // `ok` tells us the env variable exists, hence that we are running in GitHub CI. - // t.Logf("==== repo=%v", repo) - // if ok && repo != "vitessio/vitess" { - // // `vitessio/vitess` repository enjoys faster runners. Otherwise, GitHub CI has much slower runners - // // and we have to reduce the workload - // maxConcurrency = maxConcurrency / 2 - // singleConnectionSleepInterval = singleConnectionSleepInterval * 2 - // } - // t.Logf("==== test setup: maxConcurrency=%v, singleConnectionSleepInterval=%v", maxConcurrency, singleConnectionSleepInterval) - - vCPUs := runtime.NumCPU() - maxConcurrency = vCPUs - sleepModifier := 16.0 / float64(vCPUs) - singleConnectionSleepIntervalNanoseconds := float64(singleConnectionSleepInterval.Nanoseconds()) * sleepModifier - singleConnectionSleepInterval = time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) - t.Logf("==== test setup: runtime.NumCPU()=%v, sleepModifier=%v, singleConnectionSleepInterval=%v", runtime.NumCPU(), sleepModifier, singleConnectionSleepInterval) -} - -func _TestMain(m *testing.M) { +func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() @@ -247,7 +226,7 @@ func _TestMain(m *testing.M) { } -func _TestSchemaChange(t *testing.T) { +func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() @@ -517,7 +496,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -func runSingleConnection(ctx context.Context, t *testing.T) { +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -528,7 +507,7 @@ func runSingleConnection(ctx context.Context, t *testing.T) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) - ticker := time.NewTicker(singleConnectionSleepInterval) + ticker := time.NewTicker(sleepInterval) defer ticker.Stop() for { @@ -551,13 +530,19 @@ func runSingleConnection(ctx context.Context, t *testing.T) { } func runMultipleConnections(ctx context.Context, t *testing.T) { - log.Infof("Running multiple connections") + maxConcurrency := runtime.NumCPU() + sleepModifier := 16.0 / float64(maxConcurrency) + baseSleepInterval := 2 * time.Millisecond + singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() - runSingleConnection(ctx, t) + runSingleConnection(ctx, t, sleepInterval) }() } wg.Wait() From 4fbf52ae9430bc3349a369ff31e0e97b0f62a662 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 19 Oct 2023 12:33:55 +0300 Subject: [PATCH 7/7] comment explanining the calculation Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index d64463ff92a..7f560a24f9e 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -530,6 +530,11 @@ func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.D } func runMultipleConnections(ctx context.Context, t *testing.T) { + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. maxConcurrency := runtime.NumCPU() sleepModifier := 16.0 / float64(maxConcurrency) baseSleepInterval := 2 * time.Millisecond