From 3d620ab5da5835b1b0eef2f6705a8d1880f5f18e Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 19 Sep 2023 19:40:16 +0300 Subject: [PATCH] Cherry-pick 0461fafbd22f09668f6c2c327a7af0783e07cc92 with conflicts --- .../scheduler/onlineddl_scheduler_test.go | 1306 +++++++++++++++++ go/vt/schema/ddl_strategy.go | 87 ++ go/vt/schema/ddl_strategy_test.go | 182 +++ go/vt/vttablet/onlineddl/executor.go | 5 + go/vt/vttablet/onlineddl/schema.go | 2 +- 5 files changed, 1581 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index d5d7cffab08..ee7fbfb4f7c 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -649,6 +649,1312 @@ func TestSchemaChange(t *testing.T) { t.Run("summary: validate completed_timestamp", func(t *testing.T) { onlineddl.ValidateCompletedTimestamp(t, &vtParams) }) +<<<<<<< HEAD +======= + + t.Run("Cleanup artifacts", func(t *testing.T) { + // Create a migration with a low --retain-artifacts value. + // We will cancel the migration and expect the artifact to be cleaned. + t.Run("start migration", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1s", "vtctl", "", "", true)) // skip wait + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) + }) + var artifacts []string + t.Run("validate artifact exists", func(t *testing.T) { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + + artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", "")) + assert.NotEmpty(t, artifacts) + assert.Equal(t, 1, len(artifacts)) + checkTable(t, artifacts[0], true) + + retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0) + assert.Equal(t, int64(1), retainArtifactsSeconds) // due to --retain-artifacts=1s + }) + t.Run("cancel migration", func(t *testing.T) { + onlineddl.CheckCancelMigration(t, &vtParams, shards, t1uuid, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusCancelled) + }) + t.Run("wait for cleanup", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), normalWaitTime) + defer cancel() + + for { + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + if !row["cleanup_timestamp"].IsNull() { + // This is what we've been waiting for + break + } + select { + case <-ctx.Done(): + assert.Fail(t, "timeout waiting for cleanup") + return + case <-time.After(time.Second): + } + } + }) + t.Run("validate artifact does not exist", func(t *testing.T) { + checkTable(t, artifacts[0], false) + }) + }) + + // INSTANT DDL + instantDDLCapable, err := capableOf(mysql.InstantAddLastColumnFlavorCapability) + require.NoError(t, err) + if instantDDLCapable { + t.Run("INSTANT DDL: postpone-completion", func(t *testing.T) { + t1uuid := testOnlineDDLStatement(t, createParams(instantAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --postpone-completion", "vtgate", "", "", true)) + + t.Run("expect t1 queued", func(t *testing.T) { + // we want to validate that the migration remains queued even after some time passes. It must not move beyond 'queued' + time.Sleep(ensureStateNotChangedTime) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + }) + t.Run("complete t1", func(t *testing.T) { + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) + } + // 'mysql' strategy + t.Run("mysql strategy", func(t *testing.T) { + t.Run("declarative", func(t *testing.T) { + t1uuid = testOnlineDDLStatement(t, createParams(createT1Statement, "mysql --declarative", "vtgate", "just-created", "", false)) + + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + checkTable(t, t1Name, true) + }) + + t.Run("fail postpone-completion", func(t *testing.T) { + t1uuid := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, "mysql --postpone-completion", "vtgate", "", "", true)) + + // --postpone-completion not supported in mysql strategy + time.Sleep(ensureStateNotChangedTime) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusFailed) + }) + t.Run("trivial", func(t *testing.T) { + t1uuid := testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, "mysql", "vtgate", "", "", true)) + + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + + rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + artifacts := row.AsString("artifacts", "-") + assert.Empty(t, artifacts) + } + }) + t.Run("instant", func(t *testing.T) { + t1uuid := testOnlineDDLStatement(t, createParams(instantAlterT1Statement, "mysql", "vtgate", "", "", true)) + + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + }) + // in-order-completion + t.Run("in-order-completion: multiple drops for nonexistent tables and views", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + + sqls := []string{ + fmt.Sprintf("drop table if exists t4_%s", u), + fmt.Sprintf("drop view if exists t1_%s", u), + fmt.Sprintf("drop table if exists t2_%s", u), + fmt.Sprintf("drop view if exists t3_%s", u), + } + sql := strings.Join(sqls, ";") + var vuuids []string + t.Run("drop multiple tables and views, in-order-completion", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Equal(t, 4, len(vuuids)) + for _, uuid := range vuuids { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + require.Equal(t, 4, len(vuuids)) + for i := range vuuids { + if i > 0 { + testTableCompletionTimes(t, vuuids[i-1], vuuids[i]) + } + } + }) + t.Run("in-order-completion: two new views, one depends on the other", func(t *testing.T) { + u, err := schema.CreateOnlineDDLUUID() + require.NoError(t, err) + v2name := fmt.Sprintf("v2_%s", u) + createv2 := fmt.Sprintf("create view %s as select id from t1_test", v2name) + v1name := fmt.Sprintf("v1_%s", u) + createv1 := fmt.Sprintf("create view %s as select id from %s", v1name, v2name) + + sql := fmt.Sprintf("%s; %s;", createv2, createv1) + var vuuids []string + t.Run("create two views, expect both complete", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(sql, ddlStrategy+" --allow-concurrent --in-order-completion", "vtctl", "", "", true)) // skip wait + vuuids = strings.Split(uuidList, "\n") + assert.Equal(t, 2, len(vuuids)) + for _, uuid := range vuuids { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + require.Equal(t, 2, len(vuuids)) + testTableCompletionTimes(t, vuuids[0], vuuids[1]) + }) + t.Run("in-order-completion: new table column, new view depends on said column", func(t *testing.T) { + // The VIEW creation can only succeed when the ALTER has completed and the table has the new column + t1uuid = testOnlineDDLStatement(t, createParams(alterExtraColumn, ddlStrategy+" --allow-concurrent --postpone-completion --in-order-completion", "vtctl", "", "", true)) // skip wait + v1uuid := testOnlineDDLStatement(t, createParams(createViewDependsOnExtraColumn, ddlStrategy+" --allow-concurrent --postpone-completion --in-order-completion", "vtctl", "", "", true)) // skip wait + + testAllowConcurrent(t, "t1", t1uuid, 1) + testAllowConcurrent(t, "v1", v1uuid, 1) + t.Run("expect table running, expect view ready", func(t *testing.T) { + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, v1uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + time.Sleep(ensureStateNotChangedTime) + // nothing should change + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, v1uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + }) + t.Run("complete both", func(t *testing.T) { + onlineddl.CheckCompleteAllMigrations(t, &vtParams, len(shards)*2) + }) + t.Run("expect table success", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("expect view success", func(t *testing.T) { + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, v1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, v1uuid, schema.OnlineDDLStatusComplete) + }) + testTableCompletionTimes(t, t1uuid, v1uuid) + }) +} + +func testSingleton(t *testing.T) { + defer cluster.PanicHandler(t) + shards = clusterInstance.Keyspaces[0].Shards + require.Equal(t, 1, len(shards)) + + createParams := func(ddlStatement string, ddlStrategy string, executeStrategy string, migrationContext string, expectHint string, expectError string, skipWait bool) *testOnlineDDLStatementParams { + return &testOnlineDDLStatementParams{ + ddlStatement: ddlStatement, + ddlStrategy: ddlStrategy, + executeStrategy: executeStrategy, + migrationContext: migrationContext, + expectHint: expectHint, + expectError: expectError, + skipWait: skipWait, + } + } + + createRevertParams := func(revertUUID string, ddlStrategy string, executeStrategy string, migrationContext string, expectError string, skipWait bool) *testRevertMigrationParams { + return &testRevertMigrationParams{ + revertUUID: revertUUID, + executeStrategy: executeStrategy, + ddlStrategy: ddlStrategy, + migrationContext: migrationContext, + expectError: expectError, + skipWait: skipWait, + } + } + + var ( + tableName = `stress_test` + onlineSingletonDDLStrategy = "online --singleton" + onlineSingletonContextDDLStrategy = "online --singleton-context" + createStatement = ` + CREATE TABLE stress_test ( + id bigint(20) not null, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default 'just-created', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB + ` + // We will run this query with "gh-ost --max-load=Threads_running=1" + alterTableThrottlingStatement = ` + ALTER TABLE stress_test DROP COLUMN created_timestamp + ` + multiAlterTableThrottlingStatement = ` + ALTER TABLE stress_test ENGINE=InnoDB; + ALTER TABLE stress_test ENGINE=InnoDB; + ALTER TABLE stress_test ENGINE=InnoDB; + ` + // A trivial statement which must succeed and does not change the schema + alterTableTrivialStatement = ` + ALTER TABLE stress_test ENGINE=InnoDB + ` + dropStatement = ` + DROP TABLE stress_test +` + dropIfExistsStatement = ` +DROP TABLE IF EXISTS stress_test +` + dropNonexistentTableStatement = ` + DROP TABLE IF EXISTS t_non_existent + ` + multiDropStatements = `DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; DROP TABLE IF EXISTS t3;` + ) + + var uuids []string + // init-cleanup + t.Run("init DROP TABLE", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(dropIfExistsStatement, onlineSingletonDDLStrategy, "vtgate", "", "", "", false)) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + + // CREATE + t.Run("CREATE TABLE", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDLStatement(t, createParams(createStatement, onlineSingletonDDLStrategy, "vtgate", "", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + t.Run("revert CREATE TABLE", func(t *testing.T) { + // The table existed, so it will now be dropped (renamed) + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], onlineSingletonDDLStrategy, "vtgate", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert revert CREATE TABLE", func(t *testing.T) { + // Table was dropped (renamed) so it will now be restored + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], onlineSingletonDDLStrategy, "vtgate", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + + var throttledUUID string + t.Run("throttled migration", func(t *testing.T) { + throttledUUID = testOnlineDDLStatement(t, createParams(alterTableThrottlingStatement, "gh-ost --singleton --max-load=Threads_running=1", "vtgate", "", "hint_col", "", false)) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusRunning) + }) + t.Run("failed singleton migration, vtgate", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableThrottlingStatement, "gh-ost --singleton --max-load=Threads_running=1", "vtgate", "", "hint_col", "rejected", true)) + assert.Empty(t, uuid) + }) + t.Run("failed singleton migration, vtctl", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableThrottlingStatement, "gh-ost --singleton --max-load=Threads_running=1", "vtctl", "", "hint_col", "rejected", true)) + assert.Empty(t, uuid) + }) + t.Run("failed revert migration", func(t *testing.T) { + uuid := testRevertMigration(t, createRevertParams(throttledUUID, onlineSingletonDDLStrategy, "vtgate", "", "rejected", true)) + assert.Empty(t, uuid) + }) + t.Run("terminate throttled migration", func(t *testing.T) { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusRunning) + onlineddl.CheckCancelMigration(t, &vtParams, shards, throttledUUID, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, throttledUUID, 20*time.Second, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, throttledUUID, schema.OnlineDDLStatusCancelled) + }) + t.Run("successful gh-ost alter, vtctl", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableTrivialStatement, "gh-ost --singleton", "vtctl", "", "hint_col", "", false)) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) + onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) + }) + t.Run("successful gh-ost alter, vtgate", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableTrivialStatement, "gh-ost --singleton", "vtgate", "", "hint_col", "", false)) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) + onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) + }) + + t.Run("successful online alter, vtgate", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableTrivialStatement, onlineSingletonDDLStrategy, "vtgate", "", "hint_col", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false) + onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false) + checkTable(t, tableName, true) + }) + t.Run("revert ALTER TABLE, vttablet", func(t *testing.T) { + // The table existed, so it will now be dropped (renamed) + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], onlineSingletonDDLStrategy, "vtctl", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + + var throttledUUIDs []string + // singleton-context + t.Run("throttled migrations, singleton-context", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(multiAlterTableThrottlingStatement, "gh-ost --singleton-context --max-load=Threads_running=1", "vtctl", "", "hint_col", "", false)) + throttledUUIDs = strings.Split(uuidList, "\n") + assert.Equal(t, 3, len(throttledUUIDs)) + for _, uuid := range throttledUUIDs { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady, schema.OnlineDDLStatusRunning) + } + }) + t.Run("failed migrations, singleton-context", func(t *testing.T) { + _ = testOnlineDDLStatement(t, createParams(multiAlterTableThrottlingStatement, "gh-ost --singleton-context --max-load=Threads_running=1", "vtctl", "", "hint_col", "rejected", false)) + }) + t.Run("terminate throttled migrations", func(t *testing.T) { + for _, uuid := range throttledUUIDs { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady, schema.OnlineDDLStatusRunning) + onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true) + } + time.Sleep(2 * time.Second) + for _, uuid := range throttledUUIDs { + uuid = strings.TrimSpace(uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + } + }) + + t.Run("successful multiple statement, singleton-context, vtctl", func(t *testing.T) { + uuidList := testOnlineDDLStatement(t, createParams(multiDropStatements, onlineSingletonContextDDLStrategy, "vtctl", "", "", "", false)) + uuidSlice := strings.Split(uuidList, "\n") + assert.Equal(t, 3, len(uuidSlice)) + for _, uuid := range uuidSlice { + uuid = strings.TrimSpace(uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } + }) + + //DROP + + t.Run("online DROP TABLE", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("revert DROP TABLE", func(t *testing.T) { + // This will recreate the table (well, actually, rename it back into place) + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], onlineSingletonDDLStrategy, "vttablet", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + }) + + t.Run("fail concurrent singleton, vtgate", func(t *testing.T) { + uuid := testOnlineDDLStatement(t, createParams(alterTableTrivialStatement, "vitess --postpone-completion --singleton", "vtgate", "", "hint_col", "", true)) + uuids = append(uuids, uuid) + _ = testOnlineDDLStatement(t, createParams(dropNonexistentTableStatement, "vitess --singleton", "vtgate", "", "hint_col", "rejected", true)) + onlineddl.CheckCompleteAllMigrations(t, &vtParams, len(shards)) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 20*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("fail concurrent singleton-context with revert", func(t *testing.T) { + revertUUID := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], "vitess --allow-concurrent --postpone-completion --singleton-context", "vtctl", "rev:ctx", "", false)) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusRunning) + // revert is running + _ = testOnlineDDLStatement(t, createParams(dropNonexistentTableStatement, "vitess --allow-concurrent --singleton-context", "vtctl", "migrate:ctx", "", "rejected", true)) + onlineddl.CheckCancelMigration(t, &vtParams, shards, revertUUID, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusCancelled) + }) + t.Run("success concurrent singleton-context with no-context revert", func(t *testing.T) { + revertUUID := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1], "vitess --allow-concurrent --postpone-completion", "vtctl", "rev:ctx", "", false)) + onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusRunning) + // revert is running but has no --singleton-context. Our next migration should be able to run. + uuid := testOnlineDDLStatement(t, createParams(dropNonexistentTableStatement, "vitess --allow-concurrent --singleton-context", "vtctl", "migrate:ctx", "", "", false)) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckCancelMigration(t, &vtParams, shards, revertUUID, true) + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, revertUUID, 20*time.Second, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, revertUUID, schema.OnlineDDLStatusCancelled) + }) +} +func testDeclarative(t *testing.T) { + defer cluster.PanicHandler(t) + shards = clusterInstance.Keyspaces[0].Shards + require.Equal(t, 1, len(shards)) + + var ( + tableName = `stress_test` + viewBaseTableName = `view_base_table_test` + viewName = `view_test` + migrationContext = "1111-2222-3333" + createStatement1 = ` + CREATE TABLE stress_test ( + id bigint(20) not null, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default 'create1', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB + ` + createStatement2 = ` + CREATE TABLE stress_test ( + id bigint(20) not null, + rand_val varchar(32) null default '', + hint_col varchar(64) not null default 'create2', + created_timestamp timestamp not null default current_timestamp, + updates int unsigned not null default 0, + PRIMARY KEY (id), + key created_idx(created_timestamp), + key updates_idx(updates) + ) ENGINE=InnoDB + ` + createIfNotExistsStatement = ` + CREATE TABLE IF NOT EXISTS stress_test ( + id bigint(20) not null, + PRIMARY KEY (id) + ) ENGINE=InnoDB + ` + createStatementZeroDate = ` + CREATE TABLE zerodate_test ( + id bigint(20) not null, + hint_col varchar(64) not null default 'create_with_zero', + zero_datetime datetime NOT NULL DEFAULT '0000-00-00 00:00:00', + PRIMARY KEY (id) + ) ENGINE=InnoDB + ` + createStatementZeroDate2 = ` + CREATE TABLE zerodate_test ( + id bigint(20) not null, + i int not null default 0, + hint_col varchar(64) not null default 'create_with_zero2', + zero_datetime datetime NOT NULL DEFAULT '0000-00-00 00:00:00', + zero_datetime2 datetime NOT NULL DEFAULT '0000-00-00 00:00:00', + PRIMARY KEY (id) + ) ENGINE=InnoDB + ` + dropZeroDateStatement = ` + DROP TABLE zerodate_test + ` + dropStatement = ` + DROP TABLE stress_test + ` + dropIfExistsStatement = ` + DROP TABLE IF EXISTS stress_test + ` + alterStatement = ` + ALTER TABLE stress_test modify hint_col varchar(64) not null default 'this-should-fail' + ` + trivialAlterStatement = ` + ALTER TABLE stress_test ENGINE=InnoDB + ` + dropViewBaseTableStatement = ` + DROP TABLE IF EXISTS view_base_table_test + ` + createViewBaseTableStatement = ` + CREATE TABLE view_base_table_test (id INT PRIMARY KEY) + ` + createViewStatement1 = ` + CREATE VIEW view_test AS SELECT 'success_create1' AS msg FROM view_base_table_test + ` + createViewStatement2 = ` + CREATE VIEW view_test AS SELECT 'success_create2' AS msg FROM view_base_table_test + ` + createOrReplaceViewStatement = ` + CREATE OR REPLACE VIEW view_test AS SELECT 'success_replace' AS msg FROM view_base_table_test + ` + alterViewStatement = ` + ALTER VIEW view_test AS SELECT 'success_alter' AS msg FROM view_base_table_test + ` + dropViewStatement = ` + DROP VIEW view_test + ` + dropViewIfExistsStatement = ` + DROP VIEW IF EXISTS view_test + ` + insertRowStatement = ` + INSERT IGNORE INTO stress_test (id, rand_val) VALUES (%d, left(md5(rand()), 8)) + ` + updateRowStatement = ` + UPDATE stress_test SET updates=updates+1 WHERE id=%d + ` + deleteRowStatement = ` + DELETE FROM stress_test WHERE id=%d AND updates=1 + ` + // We use CAST(SUM(updates) AS SIGNED) because SUM() returns a DECIMAL datatype, and we want to read a SIGNED INTEGER type + selectCountRowsStatement = ` + SELECT COUNT(*) AS num_rows, CAST(SUM(updates) AS SIGNED) AS sum_updates FROM stress_test + ` + truncateStatement = ` + TRUNCATE TABLE stress_test + ` + writeMetrics WriteMetrics + maxTableRows = 4096 + ) + + declarativeStrategy := "online -declarative" + var uuids []string + + generateInsert := func(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(insertRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.insertsAttempts++ + if err != nil { + writeMetrics.insertsFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.insertsNoops++ + return + } + writeMetrics.inserts++ + }() + return err + } + + generateUpdate := func(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(updateRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.updatesAttempts++ + if err != nil { + writeMetrics.updatesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.updatesNoops++ + return + } + writeMetrics.updates++ + }() + return err + } + + generateDelete := func(t *testing.T, conn *mysql.Conn) error { + id := rand.Int31n(int32(maxTableRows)) + query := fmt.Sprintf(deleteRowStatement, id) + qr, err := conn.ExecuteFetch(query, 1000, true) + + func() { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + writeMetrics.deletesAttempts++ + if err != nil { + writeMetrics.deletesFailures++ + return + } + assert.Less(t, qr.RowsAffected, uint64(2)) + if qr.RowsAffected == 0 { + writeMetrics.deletesNoops++ + return + } + writeMetrics.deletes++ + }() + return err + } + + initTable := func(t *testing.T) { + log.Infof("initTable begin") + defer log.Infof("initTable complete") + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + writeMetrics.Clear() + _, err = conn.ExecuteFetch(truncateStatement, 1000, true) + require.Nil(t, err) + + for i := 0; i < maxTableRows/2; i++ { + generateInsert(t, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateUpdate(t, conn) + } + for i := 0; i < maxTableRows/4; i++ { + generateDelete(t, conn) + } + } + + testSelectTableMetrics := func(t *testing.T) { + writeMetrics.mu.Lock() + defer writeMetrics.mu.Unlock() + + log.Infof("%s", writeMetrics.String()) + + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.Nil(t, err) + defer conn.Close() + + rs, err := conn.ExecuteFetch(selectCountRowsStatement, 1000, true) + require.Nil(t, err) + + row := rs.Named().Row() + require.NotNil(t, row) + log.Infof("testSelectTableMetrics, row: %v", row) + numRows := row.AsInt64("num_rows", 0) + sumUpdates := row.AsInt64("sum_updates", 0) + + assert.NotZero(t, numRows) + assert.NotZero(t, sumUpdates) + assert.NotZero(t, writeMetrics.inserts) + assert.NotZero(t, writeMetrics.deletes) + assert.NotZero(t, writeMetrics.updates) + assert.Equal(t, writeMetrics.inserts-writeMetrics.deletes, numRows) + assert.Equal(t, writeMetrics.updates-writeMetrics.deletes, sumUpdates) // because we DELETE WHERE updates=1 + } + + testOnlineDDL := func(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectHint string, expectError string) (uuid string) { + params := &testOnlineDDLStatementParams{ + ddlStatement: alterStatement, + ddlStrategy: ddlStrategy, + executeStrategy: executeStrategy, + expectHint: expectHint, + expectError: expectError, + } + if executeStrategy != "vtgate" { + params.migrationContext = migrationContext + } + return testOnlineDDLStatement(t, params) + } + createRevertParams := func(revertUUID string) *testRevertMigrationParams { + return &testRevertMigrationParams{ + revertUUID: revertUUID, + executeStrategy: "vtctl", + ddlStrategy: string(schema.DDLStrategyOnline), + } + } + + // init-cleaup + t.Run("init: drop table", func(t *testing.T) { + // IF EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, dropIfExistsStatement, "online", "vtgate", "", "") + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + t.Run("init: drop view base table", func(t *testing.T) { + // IF EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, dropViewBaseTableStatement, "online", "vtgate", "", "") + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + + // VIEWS + t.Run("create base table for view", func(t *testing.T) { + uuid := testOnlineDDL(t, createViewBaseTableStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewBaseTableName, true) + }) + // CREATE VIEW 1 + t.Run("declarative CREATE VIEW where table does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDL(t, createViewStatement1, declarativeStrategy, "vtgate", "success_create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, viewName, true) + }) + // CREATE VIEW 1 again, noop + t.Run("declarative CREATE VIEW with no changes where view exists", func(t *testing.T) { + // The exists with exact same schema + uuid := testOnlineDDL(t, createViewStatement1, declarativeStrategy, "vtgate", "success_create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, false) + checkTable(t, viewName, true) + }) + t.Run("revert CREATE VIEW expecting noop", func(t *testing.T) { + // Reverting a noop changes nothing + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, viewName, "success_create1") + checkTable(t, viewName, true) + }) + // CREATE OR REPLACE VIEW + t.Run("CREATE OR REPLACE VIEW expecting failure", func(t *testing.T) { + // IF NOT EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, createOrReplaceViewStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, viewName, "success_create1") + checkTable(t, viewName, true) + }) + t.Run("ALTER VIEW expecting failure", func(t *testing.T) { + // IF NOT EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, alterViewStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, viewName, "success_create1") + checkTable(t, viewName, true) + }) + t.Run("DROP VIEW IF EXISTS expecting failure", func(t *testing.T) { + // IF NOT EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, dropViewIfExistsStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, viewName, "success_create1") + checkTable(t, viewName, true) + }) + t.Run("declarative DROP VIEW", func(t *testing.T) { + uuid := testOnlineDDL(t, dropViewStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, viewName, false) + }) + // View dropped. Let's start afresh. + + // CREATE VIEW1 + t.Run("declarative CREATE VIEW where view does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDL(t, createViewStatement1, declarativeStrategy, "vtgate", "success_create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, viewName, true) + }) + // CREATE VIEW2: Change view + t.Run("declarative CREATE VIEW with changes where view exists", func(t *testing.T) { + // The table exists with different schema + uuid := testOnlineDDL(t, createViewStatement2, declarativeStrategy, "vtgate", "success_create2", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, viewName, true) + }) + t.Run("revert CREATE VIEW expecting previous schema", func(t *testing.T) { + // Reverting back to 1st version + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, viewName, "success_create1") + checkTable(t, viewName, true) + }) + t.Run("declarative DROP VIEW", func(t *testing.T) { + // Table exists + uuid := testOnlineDDL(t, dropViewStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, viewName, false) + }) + t.Run("revert DROP VIEW", func(t *testing.T) { + // This will recreate the table (well, actually, rename it back into place) + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewName, true) + checkMigratedTable(t, viewName, "success_create1") + }) + t.Run("revert revert DROP VIEW", func(t *testing.T) { + // This will reapply DROP VIEW + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewName, false) + }) + t.Run("declarative DROP VIEW where view does not exist", func(t *testing.T) { + uuid := testOnlineDDL(t, dropViewStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, false) + checkTable(t, viewName, false) + }) + t.Run("revert DROP VIEW where view did not exist", func(t *testing.T) { + // Table will not be recreated because it didn't exist during the previous DROP VIEW + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewName, false) + }) + // View dropped. Let's start afresh. + + // TABLES + + // CREATE1 + t.Run("declarative CREATE TABLE where table does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDL(t, createStatement1, declarativeStrategy, "vtgate", "create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + initTable(t) + testSelectTableMetrics(t) + }) + // CREATE1 again, noop + t.Run("declarative CREATE TABLE with no changes where table exists", func(t *testing.T) { + // The exists with exact same schema + uuid := testOnlineDDL(t, createStatement1, declarativeStrategy, "vtgate", "create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, false) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("revert CREATE TABLE expecting noop", func(t *testing.T) { + // Reverting a noop changes nothing + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, tableName, "create1") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("declarative DROP TABLE", func(t *testing.T) { + uuid := testOnlineDDL(t, dropStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, false) + }) + // Table dropped. Let's start afresh. + + // CREATE1 + t.Run("declarative CREATE TABLE where table does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDL(t, createStatement1, declarativeStrategy, "vtgate", "create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + initTable(t) + testSelectTableMetrics(t) + }) + // CREATE2: Change schema + t.Run("declarative CREATE TABLE with changes where table exists", func(t *testing.T) { + // The table exists with different schema + uuid := testOnlineDDL(t, createStatement2, declarativeStrategy, "vtgate", "create2", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("revert CREATE TABLE expecting previous schema", func(t *testing.T) { + // Reverting back to 1st version + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, tableName, "create1") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("declarative DROP TABLE", func(t *testing.T) { + // Table exists + uuid := testOnlineDDL(t, dropStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, false) + }) + t.Run("revert DROP TABLE", func(t *testing.T) { + // This will recreate the table (well, actually, rename it back into place) + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, true) + checkMigratedTable(t, tableName, "create1") + testSelectTableMetrics(t) + }) + t.Run("revert revert DROP TABLE", func(t *testing.T) { + // This will reapply DROP TABLE + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + t.Run("declarative DROP TABLE where table does not exist", func(t *testing.T) { + uuid := testOnlineDDL(t, dropStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, false) + checkTable(t, tableName, false) + }) + t.Run("revert DROP TABLE where table did not exist", func(t *testing.T) { + // Table will not be recreated because it didn't exist during the previous DROP TABLE + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, tableName, false) + }) + // Table dropped. Let's start afresh. + + // CREATE1 + t.Run("declarative CREATE TABLE where table does not exist", func(t *testing.T) { + // The table does not exist + uuid := testOnlineDDL(t, createStatement1, declarativeStrategy, "vtgate", "create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + initTable(t) + testSelectTableMetrics(t) + }) + // CREATE2 + t.Run("declarative CREATE TABLE with changes where table exists", func(t *testing.T) { + // The table exists but with different schema + uuid := testOnlineDDL(t, createStatement2, declarativeStrategy, "vtgate", "create2", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + // CREATE1 again + t.Run("declarative CREATE TABLE again with changes where table exists", func(t *testing.T) { + // The table exists but with different schema + uuid := testOnlineDDL(t, createStatement1, declarativeStrategy, "vtgate", "create1", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + onlineddl.CheckMigrationArtifacts(t, &vtParams, shards, uuid, true) + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("revert CREATE TABLE expecting previous schema", func(t *testing.T) { + // Reverting back to previous version + uuid := testRevertMigration(t, createRevertParams(uuids[len(uuids)-1])) + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, tableName, "create2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("ALTER TABLE expecting failure", func(t *testing.T) { + // ALTER is not supported in -declarative + uuid := testOnlineDDL(t, alterStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, tableName, "create2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("CREATE TABLE IF NOT EXISTS expecting failure", func(t *testing.T) { + // IF NOT EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, createIfNotExistsStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, tableName, "create2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("DROP TABLE IF EXISTS expecting failure", func(t *testing.T) { + // IF EXISTS is not supported in -declarative + uuid := testOnlineDDL(t, dropIfExistsStatement, declarativeStrategy, "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + checkMigratedTable(t, tableName, "create2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("CREATE TABLE IF NOT EXISTS non-declarative is successful", func(t *testing.T) { + // IF NOT EXISTS is supported in non-declarative mode. Just verifying that the statement itself is good, + // so that the failure we tested for, above, actually tests the "declarative" logic, rather than some + // unrelated error. + uuid := testOnlineDDL(t, createIfNotExistsStatement, "online", "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + // the table existed, so we expect no changes in this non-declarative DDL + checkMigratedTable(t, tableName, "create2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("CREATE TABLE with zero date and --allow-zero-in-date is successful", func(t *testing.T) { + uuid := testOnlineDDL(t, createStatementZeroDate, "online --allow-zero-in-date", "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, "zerodate_test", "create_with_zero") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("CREATE TABLE with zero date and --allow-zero-in-date is successful", func(t *testing.T) { + uuid := testOnlineDDL(t, createStatementZeroDate, "online -declarative --allow-zero-in-date", "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, "zerodate_test", "create_with_zero") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + t.Run("CREATE TABLE with zero date and --allow-zero-in-date is successful", func(t *testing.T) { + uuid := testOnlineDDL(t, createStatementZeroDate2, "online -declarative --allow-zero-in-date", "vtgate", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkMigratedTable(t, "zerodate_test", "create_with_zero2") + checkTable(t, tableName, true) + testSelectTableMetrics(t) + }) + + // ### The following tests are not strictly 'declarative' but are best served under this endtoend test + + // Test duplicate context/SQL + t.Run("Trivial statement with request context is successful", func(t *testing.T) { + uuid := testOnlineDDL(t, trivialAlterStatement, "online", "vtctl", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + // the table existed, so we expect no changes in this non-declarative DDL + checkTable(t, tableName, true) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + require.NotContains(t, message, "duplicate DDL") + } + }) + t.Run("Duplicate trivial statement with request context is successful", func(t *testing.T) { + uuid := testOnlineDDL(t, trivialAlterStatement, "online", "vtctl", "", "") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + // the table existed, so we expect no changes in this non-declarative DDL + checkTable(t, tableName, true) + + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + for _, row := range rs.Named().Rows { + message := row["message"].ToString() + // Message suggests that the migration was identified as duplicate + require.Contains(t, message, "duplicate DDL") + } + }) + // Piggyride this test suite, let's also test --allow-zero-in-date for 'direct' strategy + t.Run("drop non_online", func(t *testing.T) { + _ = testOnlineDDL(t, dropZeroDateStatement, "direct", "vtctl", "", "") + checkTable(t, "zerodate_test", false) + }) + t.Run("CREATE TABLE with zero date fails in 'direct' strategy", func(t *testing.T) { + _ = testOnlineDDL(t, createStatementZeroDate, "direct", "vtctl", "", "Invalid default value for") + checkTable(t, "zerodate_test", false) + }) + t.Run("CREATE TABLE with zero date and --allow-zero-in-date succeeds in 'direct' strategy", func(t *testing.T) { + _ = testOnlineDDL(t, createStatementZeroDate, "direct --allow-zero-in-date", "vtctl", "", "") + checkTable(t, "zerodate_test", true) + }) +} + +func testForeignKeys(t *testing.T) { + defer cluster.PanicHandler(t) + + var ( + createStatements = []string{ + ` + CREATE TABLE parent_table ( + id INT NOT NULL, + parent_hint_col INT NOT NULL DEFAULT 0, + PRIMARY KEY (id) + ) + `, + ` + CREATE TABLE child_table ( + id INT NOT NULL auto_increment, + parent_id INT, + child_hint_col INT NOT NULL DEFAULT 0, + PRIMARY KEY (id), + KEY parent_id_idx (parent_id), + CONSTRAINT child_parent_fk FOREIGN KEY (parent_id) REFERENCES parent_table(id) ON DELETE CASCADE + ) + `, + ` + CREATE TABLE child_nofk_table ( + id INT NOT NULL auto_increment, + parent_id INT, + child_hint_col INT NOT NULL DEFAULT 0, + PRIMARY KEY (id), + KEY parent_id_idx (parent_id) + ) + `, + } + insertStatements = []string{ + "insert into parent_table (id) values(43)", + "insert into child_table (id, parent_id) values(1,43)", + "insert into child_table (id, parent_id) values(2,43)", + "insert into child_table (id, parent_id) values(3,43)", + "insert into child_table (id, parent_id) values(4,43)", + } + ddlStrategy = "online --allow-zero-in-date" + ddlStrategyAllowFK = ddlStrategy + " --unsafe-allow-foreign-keys" + ) + + type testCase struct { + name string + sql string + allowForeignKeys bool + expectHint string + } + var testCases = []testCase{ + { + name: "modify parent, not allowed", + sql: "alter table parent_table engine=innodb", + allowForeignKeys: false, + }, + { + name: "modify child, not allowed", + sql: "alter table child_table engine=innodb", + allowForeignKeys: false, + }, + { + name: "add foreign key to child, not allowed", + sql: "alter table child_table add CONSTRAINT another_fk FOREIGN KEY (parent_id) REFERENCES parent_table(id) ON DELETE CASCADE", + allowForeignKeys: false, + }, + { + name: "add foreign key to table which wasn't a child before, not allowed", + sql: "alter table child_nofk_table add CONSTRAINT new_fk FOREIGN KEY (parent_id) REFERENCES parent_table(id) ON DELETE CASCADE", + allowForeignKeys: false, + }, + { + // on vanilla MySQL, this migration ends with the child_table referencing the old, original table, and not to the new table now called parent_table. + // This is a fundamental foreign key limitation, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/ + // However, this tests is still valid in the sense that it lets us modify the parent table in the first place. + name: "modify parent, trivial", + sql: "alter table parent_table engine=innodb", + allowForeignKeys: true, + expectHint: "parent_hint_col", + }, + { + // on vanilla MySQL, this migration ends with two tables, the original and the new child_table, both referencing parent_table. This has + // the unwanted property of then limiting actions on the parent_table based on what rows exist or do not exist on the now stale old + // child table. + // This is a fundamental foreign key limitation, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/ + // However, this tests is still valid in the sense that it lets us modify the child table in the first place. + // A valid use case: using FOREIGN_KEY_CHECKS=0 at all times. + name: "modify child, trivial", + sql: "alter table child_table engine=innodb", + allowForeignKeys: true, + expectHint: "REFERENCES `parent_table`", + }, + { + // on vanilla MySQL, this migration ends with two tables, the original and the new child_table, both referencing parent_table. This has + // the unwanted property of then limiting actions on the parent_table based on what rows exist or do not exist on the now stale old + // child table. + // This is a fundamental foreign key limitation, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/ + // However, this tests is still valid in the sense that it lets us modify the child table in the first place. + // A valid use case: using FOREIGN_KEY_CHECKS=0 at all times. + name: "add foreign key to child", + sql: "alter table child_table add CONSTRAINT another_fk FOREIGN KEY (parent_id) REFERENCES parent_table(id) ON DELETE CASCADE", + allowForeignKeys: true, + expectHint: "another_fk", + }, + { + name: "add foreign key to table which wasn't a child before", + sql: "alter table child_nofk_table add CONSTRAINT new_fk FOREIGN KEY (parent_id) REFERENCES parent_table(id) ON DELETE CASCADE", + allowForeignKeys: true, + expectHint: "new_fk", + }, + } + + createParams := func(ddlStatement string, ddlStrategy string, executeStrategy string, expectHint string, expectError string, skipWait bool) *testOnlineDDLStatementParams { + return &testOnlineDDLStatementParams{ + ddlStatement: ddlStatement, + ddlStrategy: ddlStrategy, + executeStrategy: executeStrategy, + expectHint: expectHint, + expectError: expectError, + skipWait: skipWait, + } + } + + testStatement := func(t *testing.T, sql string, ddlStrategy string, expectHint string, expectError bool) (uuid string) { + errorHint := "" + if expectError { + errorHint = anyErrorIndicator + } + return testOnlineDDLStatement(t, createParams(sql, ddlStrategy, "vtctl", expectHint, errorHint, false)) + } + for _, testcase := range testCases { + t.Run(testcase.name, func(t *testing.T) { + t.Run("create tables", func(t *testing.T) { + for _, statement := range createStatements { + t.Run(statement, func(t *testing.T) { + uuid := testStatement(t, statement, ddlStrategyAllowFK, "", false) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + }) + } + }) + t.Run("populate tables", func(t *testing.T) { + for _, statement := range insertStatements { + t.Run(statement, func(t *testing.T) { + onlineddl.VtgateExecQuery(t, &vtParams, statement, "") + }) + } + }) + var uuid string + t.Run("run migration", func(t *testing.T) { + if testcase.allowForeignKeys { + uuid = testStatement(t, testcase.sql, ddlStrategyAllowFK, testcase.expectHint, false) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + } else { + uuid = testStatement(t, testcase.sql, ddlStrategy, "", true) + if uuid != "" { + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed) + } + } + }) + t.Run("cleanup", func(t *testing.T) { + var artifacts []string + if uuid != "" { + rs := onlineddl.ReadMigrations(t, &vtParams, uuid) + require.NotNil(t, rs) + row := rs.Named().Row() + require.NotNil(t, row) + + artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", "")) + } + + artifacts = append(artifacts, "child_table", "child_nofk_table", "parent_table") + // brute force drop all tables. In MySQL 8.0 you can do a single `DROP TABLE ... ` + // which auto-resovled order. But in 5.7 you can't. + droppedTables := map[string]bool{} + for range artifacts { + for _, artifact := range artifacts { + if droppedTables[artifact] { + continue + } + statement := fmt.Sprintf("DROP TABLE IF EXISTS %s", artifact) + _, err := clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, statement, cluster.VtctlClientParams{DDLStrategy: "direct"}) + if err == nil { + droppedTables[artifact] = true + } + } + } + statement := fmt.Sprintf("DROP TABLE IF EXISTS %s", strings.Join(artifacts, ",")) + t.Run(statement, func(t *testing.T) { + testStatement(t, statement, "direct", "", false) + }) + }) + }) + } +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) } // testOnlineDDLStatement runs an online DDL, ALTER statement diff --git a/go/vt/schema/ddl_strategy.go b/go/vt/schema/ddl_strategy.go index 05e2e15c1f5..a857805380f 100644 --- a/go/vt/schema/ddl_strategy.go +++ b/go/vt/schema/ddl_strategy.go @@ -24,7 +24,13 @@ import ( ) var ( +<<<<<<< HEAD strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`) +======= + strategyParserRegexp = regexp.MustCompile(`^([\S]+)\s+(.*)$`) + cutOverThresholdFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, cutOverThresholdFlag)) + retainArtifactsFlagRegexp = regexp.MustCompile(fmt.Sprintf(`^[-]{1,2}%s=(.*?)$`, retainArtifactsFlag)) +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) ) const ( @@ -38,6 +44,11 @@ const ( allowConcurrentFlag = "allow-concurrent" fastOverRevertibleFlag = "fast-over-revertible" fastRangeRotationFlag = "fast-range-rotation" +<<<<<<< HEAD +======= + cutOverThresholdFlag = "cut-over-threshold" + retainArtifactsFlag = "retain-artifacts" +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) vreplicationTestSuite = "vreplication-test-suite" ) @@ -98,6 +109,15 @@ func ParseDDLStrategy(strategyVariable string) (*DDLStrategySetting, error) { default: return nil, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy) } +<<<<<<< HEAD +======= + if _, err := setting.CutOverThreshold(); err != nil { + return nil, err + } + if _, err := setting.RetainArtifactsDuration(); err != nil { + return nil, err + } +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) return setting, nil } @@ -168,7 +188,65 @@ func (setting *DDLStrategySetting) IsFastRangeRotationFlag() bool { return setting.hasFlag(fastRangeRotationFlag) } +<<<<<<< HEAD // IsVreplicationTestSuite checks if strategy options include -vreplicatoin-test-suite +======= +// isCutOverThresholdFlag returns true when given option denotes a `--cut-over-threshold=[...]` flag +func isCutOverThresholdFlag(opt string) (string, bool) { + submatch := cutOverThresholdFlagRegexp.FindStringSubmatch(opt) + if len(submatch) == 0 { + return "", false + } + return submatch[1], true +} + +// isRetainArtifactsFlag returns true when given option denotes a `--retain-artifacts=[...]` flag +func isRetainArtifactsFlag(opt string) (string, bool) { + submatch := retainArtifactsFlagRegexp.FindStringSubmatch(opt) + if len(submatch) == 0 { + return "", false + } + return submatch[1], true +} + +// CutOverThreshold returns a the duration threshold indicated by --cut-over-threshold +func (setting *DDLStrategySetting) CutOverThreshold() (d time.Duration, err error) { + // We do some ugly manual parsing of --cut-over-threshold value + opts, _ := shlex.Split(setting.Options) + for _, opt := range opts { + if val, isCutOver := isCutOverThresholdFlag(opt); isCutOver { + // value is possibly quoted + if s, err := strconv.Unquote(val); err == nil { + val = s + } + if val != "" { + d, err = time.ParseDuration(val) + } + } + } + return d, err +} + +// RetainArtifactsDuration returns a the duration indicated by --retain-artifacts +func (setting *DDLStrategySetting) RetainArtifactsDuration() (d time.Duration, err error) { + // We do some ugly manual parsing of --retain-artifacts + opts, _ := shlex.Split(setting.Options) + for _, opt := range opts { + if val, isRetainArtifacts := isRetainArtifactsFlag(opt); isRetainArtifacts { + // value is possibly quoted + if s, err := strconv.Unquote(val); err == nil { + val = s + } + if val != "" { + d, err = time.ParseDuration(val) + } + } + } + return d, err +} + +// IsVreplicationTestSuite checks if strategy options include --vreplicatoin-test-suite +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) func (setting *DDLStrategySetting) IsVreplicationTestSuite() bool { return setting.hasFlag(vreplicationTestSuite) } @@ -178,6 +256,15 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string { opts, _ := shlex.Split(setting.Options) validOpts := []string{} for _, opt := range opts { +<<<<<<< HEAD +======= + if _, ok := isCutOverThresholdFlag(opt); ok { + continue + } + if _, ok := isRetainArtifactsFlag(opt); ok { + continue + } +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) switch { case isFlag(opt, declarativeFlag): case isFlag(opt, skipTopoFlag): diff --git a/go/vt/schema/ddl_strategy_test.go b/go/vt/schema/ddl_strategy_test.go index 8a700655e51..8ac628b0eea 100644 --- a/go/vt/schema/ddl_strategy_test.go +++ b/go/vt/schema/ddl_strategy_test.go @@ -37,6 +37,135 @@ func TestIsDirect(t *testing.T) { assert.True(t, DDLStrategy("something").IsDirect()) } +<<<<<<< HEAD +======= +func TestIsCutOverThresholdFlag(t *testing.T) { + tt := []struct { + s string + expect bool + val string + d time.Duration + }{ + { + s: "something", + }, + { + s: "-cut-over-threshold", + }, + { + s: "--cut-over-threshold", + }, + { + s: "--cut-over-threshold=", + expect: true, + }, + { + s: "--cut-over-threshold=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "-cut-over-threshold=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "--cut-over-threshold=1m", + expect: true, + val: "1m", + d: time.Minute, + }, + { + s: `--cut-over-threshold="1m"`, + expect: true, + val: `"1m"`, + d: time.Minute, + }, + } + for _, ts := range tt { + t.Run(ts.s, func(t *testing.T) { + setting, err := ParseDDLStrategy("online " + ts.s) + assert.NoError(t, err) + + val, isCutOver := isCutOverThresholdFlag(ts.s) + assert.Equal(t, ts.expect, isCutOver) + assert.Equal(t, ts.val, val) + + if ts.expect { + d, err := setting.CutOverThreshold() + assert.NoError(t, err) + assert.Equal(t, ts.d, d) + } + }) + } +} + +func TestIsExpireArtifactsFlag(t *testing.T) { + tt := []struct { + s string + expect bool + val string + d time.Duration + }{ + { + s: "something", + }, + { + s: "-retain-artifacts", + }, + { + s: "--retain-artifacts", + }, + { + s: "--retain-artifacts=", + expect: true, + }, + { + s: "--retain-artifacts=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "-retain-artifacts=0", + expect: true, + val: "0", + d: 0, + }, + { + s: "--retain-artifacts=1m", + expect: true, + val: "1m", + d: time.Minute, + }, + { + s: `--retain-artifacts="1m"`, + expect: true, + val: `"1m"`, + d: time.Minute, + }, + } + for _, ts := range tt { + t.Run(ts.s, func(t *testing.T) { + setting, err := ParseDDLStrategy("online " + ts.s) + assert.NoError(t, err) + + val, isRetainArtifacts := isRetainArtifactsFlag(ts.s) + assert.Equal(t, ts.expect, isRetainArtifacts) + assert.Equal(t, ts.val, val) + + if ts.expect { + d, err := setting.RetainArtifactsDuration() + assert.NoError(t, err) + assert.Equal(t, ts.d, d) + } + }) + } +} + +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) func TestParseDDLStrategy(t *testing.T) { tt := []struct { strategyVariable string @@ -49,6 +178,13 @@ func TestParseDDLStrategy(t *testing.T) { isAllowConcurrent bool fastOverRevertible bool fastRangeRotation bool +<<<<<<< HEAD +======= + allowForeignKeys bool + analyzeTable bool + cutOverThreshold time.Duration + expireArtifacts time.Duration +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) runtimeOptions string err error }{ @@ -145,6 +281,37 @@ func TestParseDDLStrategy(t *testing.T) { runtimeOptions: "", fastRangeRotation: true, }, +<<<<<<< HEAD +======= + { + strategyVariable: "vitess --unsafe-allow-foreign-keys", + strategy: DDLStrategyVitess, + options: "--unsafe-allow-foreign-keys", + runtimeOptions: "", + allowForeignKeys: true, + }, + { + strategyVariable: "vitess --cut-over-threshold=5m", + strategy: DDLStrategyVitess, + options: "--cut-over-threshold=5m", + runtimeOptions: "", + cutOverThreshold: 5 * time.Minute, + }, + { + strategyVariable: "vitess --retain-artifacts=4m", + strategy: DDLStrategyVitess, + options: "--retain-artifacts=4m", + runtimeOptions: "", + expireArtifacts: 4 * time.Minute, + }, + { + strategyVariable: "vitess --analyze-table", + strategy: DDLStrategyVitess, + options: "--analyze-table", + runtimeOptions: "", + analyzeTable: true, + }, +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) } for _, ts := range tt { setting, err := ParseDDLStrategy(ts.strategyVariable) @@ -166,4 +333,19 @@ func TestParseDDLStrategy(t *testing.T) { _, err := ParseDDLStrategy("other") assert.Error(t, err) } +<<<<<<< HEAD +======= + { + _, err := ParseDDLStrategy("online --cut-over-threshold=X") + assert.Error(t, err) + } + { + _, err := ParseDDLStrategy("online --cut-over-threshold=3") + assert.Error(t, err) + } + { + _, err := ParseDDLStrategy("online --retain-artifacts=3") + assert.Error(t, err) + } +>>>>>>> 0461fafbd2 (OnlineDDL: cleanup cancelled migration artifacts; support `--retain-artifacts=` DDL strategy flag (#14029)) } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index d64ee55e7b7..dfd4525d60a 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -4527,6 +4527,11 @@ func (e *Executor) SubmitMigration( revertedUUID, _ := onlineDDL.GetRevertUUID() // Empty value if the migration is not actually a REVERT. Safe to ignore error. retainArtifactsSeconds := int64((retainOnlineDDLTables).Seconds()) + if retainArtifacts, _ := onlineDDL.StrategySetting().RetainArtifactsDuration(); retainArtifacts != 0 { + // Explicit retention indicated by `--retain-artifact` DDL strategy flag for this migration. Override! + retainArtifactsSeconds = int64((retainArtifacts).Seconds()) + } + _, allowConcurrentMigration := e.allowConcurrentMigration(onlineDDL) query, err := sqlparser.ParseAndBind(sqlInsertMigration, sqltypes.StringBindVariable(onlineDDL.UUID), diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 833148939b0..276eb19b738 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -404,7 +404,7 @@ const ( log_path FROM _vt.schema_migrations WHERE - migration_status IN ('complete', 'failed') + migration_status IN ('complete', 'cancelled', 'failed') AND cleanup_timestamp IS NULL AND completed_timestamp <= IF(retain_artifacts_seconds=0, NOW() - INTERVAL %a SECOND,