diff --git a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go index 644def2d4c2..c6f393d236c 100644 --- a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go +++ b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go @@ -347,6 +347,7 @@ func TestSchemaChange(t *testing.T) { // ALTER VIEW t.Run("ALTER VIEW where view exists", func(t *testing.T) { // The view exists + checkTable(t, viewName, true) uuid := testOnlineDDLStatementForView(t, alterViewStatement, ddlStrategy, "vtgate", "success_alter") uuids = append(uuids, uuid) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) @@ -627,7 +628,8 @@ func TestSchemaChange(t *testing.T) { checkMigratedTable(t, tableName, alterHints[0]) testSelectTableMetrics(t) }) - t.Run("postponed revert", func(t *testing.T) { + testPostponedRevert := func(t *testing.T, expectStatuses ...schema.OnlineDDLStatus) { + require.NotEmpty(t, expectStatuses) ctx, cancel := context.WithCancel(context.Background()) defer cancel() var wg sync.WaitGroup @@ -636,22 +638,56 @@ func TestSchemaChange(t *testing.T) { defer wg.Done() runMultipleConnections(ctx, t) }() - uuid := testRevertMigration(t, uuids[len(uuids)-1], ddlStrategy+" -postpone-completion") + uuid := testRevertMigration(t, uuids[len(uuids)-1], ddlStrategy+" --postpone-completion") uuids = append(uuids, uuid) // Should be still running! - onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, expectStatuses...) // Issue a complete and wait for successful completion onlineddl.CheckCompleteMigration(t, &vtParams, shards, uuid, true) - // This part may take a while, because we depend on vreplicatoin polling + // This part may take a while, because we depend on vreplication polling status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 60*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) cancel() // will cause runMultipleConnections() to terminate wg.Wait() + } + t.Run("postponed revert", func(t *testing.T) { + testPostponedRevert(t, schema.OnlineDDLStatusRunning) checkMigratedTable(t, tableName, alterHints[1]) testSelectTableMetrics(t) }) + t.Run("postponed revert view", func(t *testing.T) { + t.Run("CREATE VIEW again", func(t *testing.T) { + // The view does not exist + uuid := testOnlineDDLStatementForView(t, createViewStatement, ddlStrategy, "vtgate", "success_create") + uuids = append(uuids, uuid) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewName, true) + testRevertedUUID(t, uuid, "") + }) + t.Run("ALTER VIEW, postpone completion", func(t *testing.T) { + // Technically this test better fits in `onlineddl_scheduler_test.go`, but since we've already laid the grounds here, this is where it landed. + // The view exists + checkTable(t, viewName, true) + uuid := testOnlineDDLStatementForView(t, alterViewStatement, ddlStrategy+" --postpone-completion", "vtgate", "success_create") + uuids = append(uuids, uuid) + + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + // Issue a complete and wait for successful completion + onlineddl.CheckCompleteMigration(t, &vtParams, shards, uuid, true) + // This part may take a while, because we depend on vreplication polling + status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 60*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) + onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) + checkTable(t, viewName, true) + testRevertedUUID(t, uuid, "") + }) + // now verify that the revert for ALTER VIEW respects `--postpone-completion` + testPostponedRevert(t, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) + checkTable(t, viewName, true) + }) + // INSTANT DDL t.Run("INSTANT DDL: add column", func(t *testing.T) { uuid := testOnlineDDLStatementForTable(t, "alter table stress_test add column i_instant int not null default 0", ddlStrategy+" --fast-over-revertible", "vtgate", "i_instant") diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index 05e1217a670..dca084103b8 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -41,8 +41,9 @@ var ( shards []cluster.Shard vtParams mysql.ConnParams - normalWaitTime = 20 * time.Second - extendedWaitTime = 60 * time.Second + normalWaitTime = 20 * time.Second + extendedWaitTime = 60 * time.Second + ensureStateNotChangedTime = 5 * time.Second hostname = "localhost" keyspaceName = "ks" @@ -79,6 +80,9 @@ var ( trivialAlterT2Statement = ` ALTER TABLE t2_test ENGINE=InnoDB; ` + instantAlterT1Statement = ` + ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0; + ` dropT1Statement = ` DROP TABLE IF EXISTS t1_test ` @@ -161,6 +165,9 @@ func TestSchemaChange(t *testing.T) { shards = clusterInstance.Keyspaces[0].Shards require.Equal(t, 1, len(shards)) + mysqlVersion := onlineddl.GetMySQLVersion(t, clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet()) + require.NotEmpty(t, mysqlVersion) + var t1uuid string var t2uuid string @@ -313,7 +320,7 @@ func TestSchemaChange(t *testing.T) { onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) // now that t1 is running, let's unblock t2. We expect it to remain queued. onlineddl.CheckCompleteMigration(t, &vtParams, shards, t2uuid, true) - time.Sleep(5 * time.Second) + time.Sleep(ensureStateNotChangedTime) // t1 should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) // non-concurrent -- should be queued! @@ -345,7 +352,7 @@ func TestSchemaChange(t *testing.T) { t.Run("expect both running", func(t *testing.T) { onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning) onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusRunning) - time.Sleep(5 * time.Second) + time.Sleep(ensureStateNotChangedTime) // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning) @@ -384,7 +391,7 @@ func TestSchemaChange(t *testing.T) { // since all migrations are throttled, t1 migration is not ready_to_complete, hence // t2 should not be running onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) - time.Sleep(5 * time.Second) + time.Sleep(ensureStateNotChangedTime) // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady) @@ -393,7 +400,7 @@ func TestSchemaChange(t *testing.T) { onlineddl.UnthrottleAllMigrations(t, &vtParams) // t1 should now be ready_to_complete, hence t2 should start running onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, extendedWaitTime, schema.OnlineDDLStatusRunning) - time.Sleep(5 * time.Second) + time.Sleep(ensureStateNotChangedTime) // both should be still running! onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning) @@ -566,7 +573,7 @@ func TestSchemaChange(t *testing.T) { }) drop1uuid := testOnlineDDLStatement(t, dropT1Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", true) // skip wait t.Run("t1drop blocked", func(t *testing.T) { - time.Sleep(5 * time.Second) + time.Sleep(ensureStateNotChangedTime) // drop1 migration should block. It can run concurrently to t1, but conflicts on table name onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusReady) }) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 7cdb082b847..4663722983c 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -2130,7 +2130,7 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error { postponeLaunch := row.AsBool("postpone_launch", false) postponeCompletion := row.AsBool("postpone_completion", false) readyToComplete := row.AsBool("ready_to_complete", false) - ddlAction := row["ddl_action"].ToString() + isImmediateOperation := row.AsBool("is_immediate_operation", false) if postponeLaunch { // We don't even look into this migration until its postpone_launch flag is cleared @@ -2138,18 +2138,19 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error { } if !readyToComplete { - // Whether postponsed or not, CREATE and DROP operations are inherently "ready to complete" - // because their operation is instantaneous. - switch ddlAction { - case sqlparser.CreateStr, sqlparser.DropStr: + // see if we need to update ready_to_complete + if isImmediateOperation { + // Whether postponed or not, CREATE and DROP operations, as well as VIEW operations, + // are inherently "ready to complete" because their operation is immediate. if err := e.updateMigrationReadyToComplete(ctx, uuid, true); err != nil { return err } } } - if ddlAction == sqlparser.AlterStr || !postponeCompletion { + + if !(isImmediateOperation && postponeCompletion) { // Any non-postponed migration can be scheduled - // postponed ALTER can be scheduled + // postponed ALTER can be scheduled (because gh-ost or vreplication will postpone the cut-over) // We only schedule a single migration in the execution of this function onlyScheduleOneMigration.Do(func() { err = e.updateMigrationStatus(ctx, uuid, schema.OnlineDDLStatusReady) @@ -2163,70 +2164,137 @@ func (e *Executor) scheduleNextMigration(ctx context.Context) error { return err } -// reviewQueuedMigrations iterates queued migrations and sees if any information needs to be updated +// reviewEmptyTableRevertMigrations reviews a queued REVERT migration. Such a migration has the following SQL: +// "REVERT VITESS_MIGRATION '...'" +// There's nothing in this SQL to indicate: +// - which table is involved? +// - is this a table or a view? +// - Are we reverting a CREATE? A DROP? An ALTER? +// This function fills in the blanks and updates the database row. +func (e *Executor) reviewEmptyTableRevertMigrations(ctx context.Context, onlineDDL *schema.OnlineDDL) (changesMade bool, err error) { + if onlineDDL.Table != "" { + return false, nil + } + // Table name is empty. Let's populate it. + + // Try to update table name and ddl_action + // Failure to do so fails the migration + revertUUID, err := onlineDDL.GetRevertUUID() + if err != nil { + return false, e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot analyze revert UUID for revert migration %s: %v", onlineDDL.UUID, err)) + } + revertedMigration, revertedRow, err := e.readMigration(ctx, revertUUID) + if err != nil { + return false, e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot read migration %s reverted by migration %s: %s", revertUUID, onlineDDL.UUID, err)) + } + revertedActionStr := revertedRow["ddl_action"].ToString() + + mimickedActionStr := "" + switch revertedActionStr { + case sqlparser.CreateStr: + mimickedActionStr = sqlparser.DropStr + case sqlparser.DropStr: + mimickedActionStr = sqlparser.CreateStr + case sqlparser.AlterStr: + mimickedActionStr = sqlparser.AlterStr + default: + return false, e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot run migration %s reverting %s: unexpected action %s", onlineDDL.UUID, revertedMigration.UUID, revertedActionStr)) + } + if err := e.updateDDLAction(ctx, onlineDDL.UUID, mimickedActionStr); err != nil { + return false, err + } + if err := e.updateMigrationIsView(ctx, onlineDDL.UUID, revertedRow.AsBool("is_view", false)); err != nil { + return false, err + } + if err := e.updateMySQLTable(ctx, onlineDDL.UUID, revertedMigration.Table); err != nil { + return false, err + } + return true, nil +} + +// reviewImmediateOperations reviews a queued migration and determines whether it is an "immediate operation". +// Immediate operations are ones that can be performed within a split second, or rather, do not require long +// running processes. Immediate operations are: +// - CREATE TABLE +// - DROP TABLE (which we convert into RENAME) +// - All VIEW operations +// - An INSTANT DDL accompanied by relevant ddl strategy flags +// Non immediate operations are: +// - A gh-ost migration +// - A vitess (vreplication) migration +func (e *Executor) reviewImmediateOperations(ctx context.Context, capableOf mysql.CapableOf, onlineDDL *schema.OnlineDDL, ddlAction string, isView bool) error { + isImmediateOperation := false + switch ddlAction { + case sqlparser.CreateStr, sqlparser.DropStr: + isImmediateOperation = true + case sqlparser.AlterStr: + if isView { + isImmediateOperation = true + } else { + specialPlan, err := e.analyzeSpecialAlterPlan(ctx, onlineDDL, capableOf) + if err != nil { + return err + } + if specialPlan != nil { + isImmediateOperation = true + } + } + } + if isImmediateOperation { + if err := e.updateMigrationSetImmediateOperation(ctx, onlineDDL.UUID); err != nil { + return err + } + } + return nil +} + +// reviewQueuedMigrations iterates through queued migrations and sees if any information needs to be updated. +// The function analyzes the queued migration and fills in some blanks: +// - If this is a REVERT migration, what table is affected? What's the operation? +// - Is this migration an "immediate operation"? func (e *Executor) reviewQueuedMigrations(ctx context.Context) error { + conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB()) + if err != nil { + return err + } + defer conn.Close() + _, capableOf, _ := mysql.GetFlavor(conn.ServerVersion, nil) + e.migrationMutex.Lock() defer e.migrationMutex.Unlock() - // Review REVERT migrations - // These migrations are submitted with some details missing. This is because the statement - // REVERT VITESS_MIGRATION '' - // doesn't have much detail, we need to extract the info from the reverted migration. Missing details: - // - What table is affected? - // - What ddl action (CREATE, DROP, ALTER) is being reverted, or what is the counter-operation to be executed? - - r, err := e.execQuery(ctx, sqlSelectQueuedRevertMigrations) + r, err := e.execQuery(ctx, sqlSelectQueuedUnreviewedMigrations) if err != nil { return err } - for _, row := range r.Named().Rows { - uuid := row["migration_uuid"].ToString() - onlineDDL, _, err := e.readMigration(ctx, uuid) + for _, uuidRow := range r.Named().Rows { + uuid := uuidRow["migration_uuid"].ToString() + onlineDDL, row, err := e.readMigration(ctx, uuid) if err != nil { return err } - reviewEmptyTableRevertMigrations := func() error { - if onlineDDL.Table != "" { - return nil - } - // Table name is empty. Let's populate it. - - // Try to update table name and ddl_action - // Failure to do so fails the migration - revertUUID, err := onlineDDL.GetRevertUUID() - if err != nil { - return e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot analyze revert UUID for revert migration %s: %v", onlineDDL.UUID, err)) - } - revertedMigration, row, err := e.readMigration(ctx, revertUUID) + // handle REVERT migrations: populate table name and update ddl action and is_view: + ddlAction := row["ddl_action"].ToString() + if ddlAction == schema.RevertActionStr { + rowModified, err := e.reviewEmptyTableRevertMigrations(ctx, onlineDDL) if err != nil { - return e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot read migration %s reverted by migration %s: %s", revertUUID, onlineDDL.UUID, err)) - } - revertedActionStr := row["ddl_action"].ToString() - mimickedActionStr := "" - - switch revertedActionStr { - case sqlparser.CreateStr: - mimickedActionStr = sqlparser.DropStr - case sqlparser.DropStr: - mimickedActionStr = sqlparser.CreateStr - case sqlparser.AlterStr: - mimickedActionStr = sqlparser.AlterStr - default: - return e.failMigration(ctx, onlineDDL, fmt.Errorf("cannot run migration %s reverting %s: unexpected action %s", onlineDDL.UUID, revertedMigration.UUID, revertedActionStr)) - } - if err := e.updateDDLAction(ctx, onlineDDL.UUID, mimickedActionStr); err != nil { return err } - if err := e.updateMigrationIsView(ctx, onlineDDL.UUID, row.AsBool("is_view", false)); err != nil { - return err - } - if err := e.updateMySQLTable(ctx, onlineDDL.UUID, revertedMigration.Table); err != nil { - return err + if rowModified { + // re-read migration and entire row + onlineDDL, row, err = e.readMigration(ctx, uuid) + if err != nil { + return err + } + ddlAction = row["ddl_action"].ToString() } - return nil } - if err := reviewEmptyTableRevertMigrations(); err != nil { + isView := row.AsBool("is_view", false) + if err := e.reviewImmediateOperations(ctx, capableOf, onlineDDL, ddlAction, isView); err != nil { + return err + } + if err := e.updateMigrationTimestamp(ctx, "reviewed_timestamp", uuid); err != nil { return err } } @@ -2804,6 +2872,7 @@ func (e *Executor) executeSpecialAlterDDLActionMigrationIfApplicable(ctx context if specialPlan == nil { return false, nil } + switch specialPlan.operation { case instantDDLSpecialOperation: e.addInstantAlgorithm(specialPlan.alterTable) @@ -4151,6 +4220,17 @@ func (e *Executor) updateMigrationIsView(ctx context.Context, uuid string, isVie return err } +func (e *Executor) updateMigrationSetImmediateOperation(ctx context.Context, uuid string) error { + query, err := sqlparser.ParseAndBind(sqlUpdateMigrationSetImmediateOperation, + sqltypes.StringBindVariable(uuid), + ) + if err != nil { + return err + } + _, err = e.execQuery(ctx, query) + return err +} + func (e *Executor) updateMigrationReadyToComplete(ctx context.Context, uuid string, isReady bool) error { query, err := sqlparser.ParseAndBind(sqlUpdateMigrationReadyToComplete, sqltypes.BoolBindVariable(isReady), diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index b6a7532378a..41449572508 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -83,6 +83,8 @@ const ( alterSchemaMigrationsTablePostponeLaunch = "ALTER TABLE _vt.schema_migrations add column postpone_launch tinyint unsigned NOT NULL DEFAULT 0" alterSchemaMigrationsStage = "ALTER TABLE _vt.schema_migrations add column stage text not null" alterSchemaMigrationsCutoverAttempts = "ALTER TABLE _vt.schema_migrations add column cutover_attempts int unsigned NOT NULL DEFAULT 0" + alterSchemaMigrationsTableImmediateOperation = "ALTER TABLE _vt.schema_migrations add column is_immediate_operation tinyint unsigned NOT NULL DEFAULT 0" + alterSchemaMigrationsReviewedTimestamp = "ALTER TABLE _vt.schema_migrations add column reviewed_timestamp timestamp NULL DEFAULT NULL" sqlInsertMigration = `INSERT IGNORE INTO _vt.schema_migrations ( migration_uuid, @@ -111,6 +113,8 @@ const ( sqlSelectQueuedMigrations = `SELECT migration_uuid, ddl_action, + is_view, + is_immediate_operation, postpone_launch, postpone_completion, ready_to_complete @@ -153,6 +157,11 @@ const ( WHERE migration_uuid=%a ` + sqlUpdateMigrationSetImmediateOperation = `UPDATE _vt.schema_migrations + SET is_immediate_operation=1 + WHERE + migration_uuid=%a + ` sqlUpdateMigrationReadyToComplete = `UPDATE _vt.schema_migrations SET ready_to_complete=%a WHERE @@ -381,12 +390,12 @@ const ( WHERE migration_status IN ('queued', 'ready', 'running') ` - sqlSelectQueuedRevertMigrations = `SELECT + sqlSelectQueuedUnreviewedMigrations = `SELECT migration_uuid FROM _vt.schema_migrations WHERE migration_status='queued' - AND ddl_action='revert' + AND reviewed_timestamp IS NULL ` sqlSelectUncollectedArtifacts = `SELECT migration_uuid, @@ -682,4 +691,6 @@ var ApplyDDL = []string{ alterSchemaMigrationsTablePostponeLaunch, alterSchemaMigrationsStage, alterSchemaMigrationsCutoverAttempts, + alterSchemaMigrationsTableImmediateOperation, + alterSchemaMigrationsReviewedTimestamp, }