From 29970a6042d421aa5289f23a408c6650d2cb9031 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 24 Oct 2023 11:17:41 +0300 Subject: [PATCH 1/3] Cherry-pick 94572b3f0c42d18c01d365f30803e3472ea75e0d with conflicts --- .../onlineddl_vrepl_mini_stress_test.go | 54 +++++++++++++++++++ go/test/endtoend/onlineddl/vtgate_util.go | 15 ++++-- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index 7201fa70652..af321066d99 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path" + "runtime" "strings" "sync" "sync/atomic" @@ -135,12 +136,22 @@ var ( writeMetrics WriteMetrics ) +var ( + countIterations = 5 +) + const ( +<<<<<<< HEAD maxTableRows = 4096 maxConcurrency = 20 singleConnectionSleepInterval = 2 * time.Millisecond countIterations = 5 migrationWaitTimeout = 60 * time.Second +======= + maxTableRows = 4096 + workloadDuration = 5 * time.Second + migrationWaitTimeout = 60 * time.Second +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) ) func resetOpOrder() { @@ -371,6 +382,9 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + rowcount := 0 for { queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) @@ -381,7 +395,12 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { +<<<<<<< HEAD case <-time.After(time.Second): +======= + case <-ticker.C: + continue // Keep looping +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) case <-ctx.Done(): break } @@ -480,7 +499,11 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } +<<<<<<< HEAD func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { +======= +func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -491,6 +514,9 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { _, err = conn.ExecuteFetch("set transaction isolation level read committed", 1000, true) require.Nil(t, err) + ticker := time.NewTicker(sleepInterval) + defer ticker.Stop() + for { if atomic.LoadInt64(done) == 1 { log.Infof("Terminating single connection") @@ -504,20 +530,48 @@ func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { case 2: err = generateDelete(t, conn) } +<<<<<<< HEAD +======= + select { + case <-ctx.Done(): + log.Infof("Terminating single connection") + return + case <-ticker.C: + } +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) assert.Nil(t, err) time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { +<<<<<<< HEAD log.Infof("Running multiple connections") var done int64 +======= + // The workload for a 16 vCPU machine is: + // - Concurrency of 16 + // - 2ms interval between queries for each connection + // As the number of vCPUs decreases, so do we decrease concurrency, and increase intervals. For example, on a 8 vCPU machine + // we run concurrency of 8 and interval of 4ms. On a 4 vCPU machine we run concurrency of 4 and interval of 8ms. + maxConcurrency := runtime.NumCPU() + sleepModifier := 16.0 / float64(maxConcurrency) + baseSleepInterval := 2 * time.Millisecond + singleConnectionSleepIntervalNanoseconds := float64(baseSleepInterval.Nanoseconds()) * sleepModifier + sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) + + log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() +<<<<<<< HEAD runSingleConnection(ctx, t, &done) +======= + runSingleConnection(ctx, t, sleepInterval) +>>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) }() } <-ctx.Done() diff --git a/go/test/endtoend/onlineddl/vtgate_util.go b/go/test/endtoend/onlineddl/vtgate_util.go index 5052065082b..b93c43bb7eb 100644 --- a/go/test/endtoend/onlineddl/vtgate_util.go +++ b/go/test/endtoend/onlineddl/vtgate_util.go @@ -246,9 +246,13 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c for _, status := range expectStatuses { statusesMap[string(status)] = true } - startTime := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + lastKnownStatus := "" - for time.Since(startTime) < timeout { + for { countMatchedShards := 0 r := VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { @@ -265,9 +269,12 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c if countMatchedShards == len(shards) { return schema.OnlineDDLStatus(lastKnownStatus) } - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } } - return schema.OnlineDDLStatus(lastKnownStatus) } // CheckMigrationArtifacts verifies given migration exists, and checks if it has artifacts From b7c19893c2f2e6cde41cd64bdbd89fa211ae980b Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:24:24 +0300 Subject: [PATCH 2/3] resolved conflict Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl_vrepl_mini_stress_test.go | 70 +++++-------------- 1 file changed, 19 insertions(+), 51 deletions(-) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index af321066d99..d92e6ed95c1 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -26,7 +26,6 @@ import ( "runtime" "strings" "sync" - "sync/atomic" "testing" "time" @@ -141,17 +140,9 @@ var ( ) const ( -<<<<<<< HEAD - maxTableRows = 4096 - maxConcurrency = 20 - singleConnectionSleepInterval = 2 * time.Millisecond - countIterations = 5 - migrationWaitTimeout = 60 * time.Second -======= maxTableRows = 4096 workloadDuration = 5 * time.Second migrationWaitTimeout = 60 * time.Second ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) ) func resetOpOrder() { @@ -184,13 +175,10 @@ func TestMain(m *testing.M) { clusterInstance.VtctldExtraArgs = []string{ "--schema_change_dir", schemaChangeDirectory, "--schema_change_controller", "local", - "--schema_change_check_interval", "1", + "--schema_change_check_interval", "1s", } clusterInstance.VtTabletExtraArgs = []string{ - "--enable-lag-throttler", - "--throttle_threshold", "1s", - "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s", @@ -231,15 +219,15 @@ func TestMain(m *testing.M) { if err != nil { fmt.Printf("%v\n", err) os.Exit(1) - } else { - os.Exit(exitcode) } - + os.Exit(exitcode) } func TestSchemaChange(t *testing.T) { defer cluster.PanicHandler(t) + ctx := context.Background() + shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) @@ -262,16 +250,17 @@ func TestSchemaChange(t *testing.T) { // that our testing/metrics logic is sound in the first place. testName := fmt.Sprintf("workload without ALTER TABLE %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) initTable(t) + + ctx, cancel := context.WithTimeout(ctx, workloadDuration) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() runMultipleConnections(ctx, t) }() - time.Sleep(5 * time.Second) - cancel() // will cause runMultipleConnections() to terminate wg.Wait() testSelectTableMetrics(t) }) @@ -296,7 +285,7 @@ func TestSchemaChange(t *testing.T) { // the vreplication/ALTER TABLE did not corrupt our data and we are happy. testName := fmt.Sprintf("ALTER TABLE with workload %d/%d", (i + 1), countIterations) t.Run(testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() t.Run("create schema", func(t *testing.T) { testWithInitialSchema(t) }) @@ -304,6 +293,9 @@ func TestSchemaChange(t *testing.T) { initTable(t) }) t.Run("migrate", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup wg.Add(1) go func() { @@ -313,7 +305,7 @@ func TestSchemaChange(t *testing.T) { hint := fmt.Sprintf("hint-alter-with-workload-%d", i) uuid := testOnlineDDLStatement(t, fmt.Sprintf(alterHintStatement, hint), onlineDDLStrategy, "vtgate", hint) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) - cancel() // will cause runMultipleConnections() to terminate + cancel() // Now that the migration is complete, we can stop the workload. wg.Wait() }) t.Run("validate metrics", func(t *testing.T) { @@ -386,6 +378,7 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri defer ticker.Stop() rowcount := 0 + for { queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true) require.Nil(t, err) @@ -395,16 +388,15 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri } select { -<<<<<<< HEAD - case <-time.After(time.Second): -======= case <-ticker.C: continue // Keep looping ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) case <-ctx.Done(): - break + // Break below to the assertion } + + break } + assert.Equal(t, expectCount, rowcount) } @@ -499,11 +491,7 @@ func generateDelete(t *testing.T, conn *mysql.Conn) error { return err } -<<<<<<< HEAD -func runSingleConnection(ctx context.Context, t *testing.T, done *int64) { -======= func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.Duration) { ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) log.Infof("Running single connection") conn, err := mysql.Connect(ctx, &vtParams) require.Nil(t, err) @@ -518,10 +506,6 @@ func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.D defer ticker.Stop() for { - if atomic.LoadInt64(done) == 1 { - log.Infof("Terminating single connection") - return - } switch rand.Int31n(3) { case 0: err = generateInsert(t, conn) @@ -530,25 +514,17 @@ func runSingleConnection(ctx context.Context, t *testing.T, sleepInterval time.D case 2: err = generateDelete(t, conn) } -<<<<<<< HEAD -======= select { case <-ctx.Done(): log.Infof("Terminating single connection") return case <-ticker.C: } ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) assert.Nil(t, err) - time.Sleep(singleConnectionSleepInterval) } } func runMultipleConnections(ctx context.Context, t *testing.T) { -<<<<<<< HEAD - log.Infof("Running multiple connections") - var done int64 -======= // The workload for a 16 vCPU machine is: // - Concurrency of 16 // - 2ms interval between queries for each connection @@ -561,24 +537,16 @@ func runMultipleConnections(ctx context.Context, t *testing.T) { sleepInterval := time.Duration(int64(singleConnectionSleepIntervalNanoseconds)) log.Infof("Running multiple connections: maxConcurrency=%v, sleep interval=%v", maxConcurrency, sleepInterval) ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) var wg sync.WaitGroup for i := 0; i < maxConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() -<<<<<<< HEAD - runSingleConnection(ctx, t, &done) -======= runSingleConnection(ctx, t, sleepInterval) ->>>>>>> 94572b3f0c (OnlineDDL: reduce vrepl_stress workload in forks (#14302)) }() } - <-ctx.Done() - atomic.StoreInt64(&done, 1) - log.Infof("Running multiple connections: done") wg.Wait() - log.Infof("All connections cancelled") + log.Infof("Running multiple connections: done") } func initTable(t *testing.T) { From fc143663ead87592a83af5673f8c46cdc8cd6226 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:25:48 +0300 Subject: [PATCH 3/3] throttler flags Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go index d92e6ed95c1..9af1a2ba5ab 100644 --- a/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go +++ b/go/test/endtoend/onlineddl/vrepl_stress/onlineddl_vrepl_mini_stress_test.go @@ -179,6 +179,9 @@ func TestMain(m *testing.M) { } clusterInstance.VtTabletExtraArgs = []string{ + "--enable-lag-throttler", + "--throttle_threshold", "1s", + "--heartbeat_enable", "--heartbeat_interval", "250ms", "--heartbeat_on_demand_duration", "5s", "--migration_check_interval", "5s",