Skip to content

Commit

Permalink
Online DDL: ALTER VITESS_MIGRATION CLEANUP ALL (vitessio#16314)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jul 24, 2024
1 parent 134a7dc commit 36f7193
Show file tree
Hide file tree
Showing 12 changed files with 4,923 additions and 4,670 deletions.
12 changes: 5 additions & 7 deletions go/cmd/vtctldclient/command/onlineddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var (
RunE: commandOnlineDDLCancel,
}
OnlineDDLCleanup = &cobra.Command{
Use: "cleanup <keyspace> <uuid>",
Short: "Mark a given schema migration ready for artifact cleanup.",
Use: "cleanup <keyspace> <uuid|all>",
Short: "Mark a given schema migration, or all complete/failed/cancelled migrations, ready for artifact cleanup.",
Example: "OnlineDDL cleanup test_keyspace 82fa54ac_e83e_11ea_96b7_f875a4d24e90",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(2),
Expand Down Expand Up @@ -168,12 +168,10 @@ func commandOnlineDDLCancel(cmd *cobra.Command, args []string) error {
}

func commandOnlineDDLCleanup(cmd *cobra.Command, args []string) error {
keyspace := cmd.Flags().Arg(0)
uuid := cmd.Flags().Arg(1)
if !schema.IsOnlineDDLUUID(uuid) {
return fmt.Errorf("%s is not a valid UUID", uuid)
keyspace, uuid, err := analyzeOnlineDDLCommandWithUuidOrAllArgument(cmd)
if err != nil {
return err
}

cli.FinishedParsing(cmd)

resp, err := client.CleanupSchemaMigration(commandCtx, &vtctldatapb.CleanupSchemaMigrationRequest{
Expand Down
134 changes: 126 additions & 8 deletions go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func testScheduler(t *testing.T) {
commitTransactionChan := make(chan any)
transactionErrorChan := make(chan error)
t.Run("locking table rows", func(t *testing.T) {
go runInTransaction(t, ctx, shards[0].Vttablets[0], "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
go runInTransaction(t, ctx, primaryTablet, "select * from t1_test for update", commitTransactionChan, transactionErrorChan)
})
t.Run("injecting heartbeats asynchronously", func(t *testing.T) {
go func() {
Expand Down Expand Up @@ -1068,6 +1068,22 @@ func testScheduler(t *testing.T) {
})
})

readCleanupsTimetamps := func(t *testing.T, migrationsLike string) (rows int64, cleanedUp int64, needCleanup int64, artifacts []string) {
rs := onlineddl.ReadMigrations(t, &vtParams, migrationsLike)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
rows++
if row["cleanup_timestamp"].IsNull() {
needCleanup++
} else {
cleanedUp++
}
migrationArtifacts := textutil.SplitDelimitedList(row.AsString("artifacts", ""))
artifacts = append(artifacts, migrationArtifacts...)
}
return
}

t.Run("Cleanup artifacts", func(t *testing.T) {
// Create a migration with a low --retain-artifacts value.
// We will cancel the migration and expect the artifact to be cleaned.
Expand Down Expand Up @@ -1104,14 +1120,14 @@ func testScheduler(t *testing.T) {
defer cancel()

for {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)
if !row["cleanup_timestamp"].IsNull() {
rows, cleanedUp, needCleanup, _ := readCleanupsTimetamps(t, t1uuid)
assert.EqualValues(t, 1, rows)
if cleanedUp == 1 {
// This is what we've been waiting for
break
}
assert.EqualValues(t, 0, cleanedUp)
assert.EqualValues(t, 1, needCleanup)
select {
case <-ctx.Done():
assert.Fail(t, "timeout waiting for cleanup")
Expand All @@ -1125,6 +1141,108 @@ func testScheduler(t *testing.T) {
})
})

t.Run("cleanup artifacts with CLEANUP ALL", func(t *testing.T) {
// First, cleanup any existing migrations. We don't have an exact track of how many we've had so far.
t.Run("initial cleanup all", func(t *testing.T) {
t.Run("validate migrations exist that need cleanup", func(t *testing.T) {
_, _, needCleanup, _ := readCleanupsTimetamps(t, "%")
assert.Greater(t, needCleanup, int64(1))
})
t.Run("issue cleanup all", func(t *testing.T) {
cleanedUp := onlineddl.CheckCleanupAllMigrations(t, &vtParams, -1)
t.Logf("marked %d migrations for cleanup", cleanedUp)
})
t.Run("wait for all migrations cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime)
defer cancel()

for {
rows, cleanedUp, needCleanup, artifacts := readCleanupsTimetamps(t, "%")
if needCleanup == 0 {
// This is what we've been waiting for
assert.NotZero(t, rows)
assert.Equal(t, rows, cleanedUp)
assert.Empty(t, artifacts)
t.Logf("rows needing cleanup: %v", needCleanup)
return
}
select {
case <-ctx.Done():
assert.Fail(t, "timeout waiting for cleanup", "rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
return
case <-time.After(time.Second):
}
t.Logf("rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
}
})
})
// Create a migration with a low --retain-artifacts value.
// We will cancel the migration and expect the artifact to be cleaned.
t.Run("start migration", func(t *testing.T) {
// Intentionally set `--retain-artifacts=1h` which is a long time. Then we will issue
// `ALTER VITESS_MIGRATION CLEANUP ALL` and expect the artifact to be cleaned.
t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy+" --postpone-completion --retain-artifacts=1h", "vtctl", "", "", true)) // skip wait
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
})
t.Run("wait for ready_to_complete", func(t *testing.T) {
waitForReadyToComplete(t, t1uuid, true)
})
var artifacts []string
t.Run("validate artifact exists", func(t *testing.T) {
rs := onlineddl.ReadMigrations(t, &vtParams, t1uuid)
require.NotNil(t, rs)
row := rs.Named().Row()
require.NotNil(t, row)

artifacts = textutil.SplitDelimitedList(row.AsString("artifacts", ""))
require.Len(t, artifacts, 1)
checkTable(t, artifacts[0], true)

retainArtifactsSeconds := row.AsInt64("retain_artifacts_seconds", 0)
assert.EqualValues(t, 3600, retainArtifactsSeconds) // due to --retain-artifacts=1h
})
t.Run("check needs cleanup", func(t *testing.T) {
_, _, needCleanup, _ := readCleanupsTimetamps(t, "%")
assert.EqualValues(t, 1, needCleanup)
})
t.Run("complete migration", func(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusCancelled)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
t.Run("cleanup all", func(t *testing.T) {
onlineddl.CheckCleanupAllMigrations(t, &vtParams, 1)
})
t.Run("wait for migration cleanup", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), extendedWaitTime)
defer cancel()

for {
rows, cleanedUp, needCleanup, artifacts := readCleanupsTimetamps(t, "%")
if needCleanup == 0 {
// This is what we've been waiting for
assert.NotZero(t, rows)
assert.Equal(t, rows, cleanedUp)
assert.Empty(t, artifacts)
t.Logf("rows needing cleanup: %v", needCleanup)
return
}
select {
case <-ctx.Done():
assert.Fail(t, "timeout waiting for cleanup", "rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
return
case <-time.After(time.Second):
}
t.Logf("rows needing cleanup: %v. artifacts: %v", needCleanup, artifacts)
}
})

t.Run("validate artifact does not exist", func(t *testing.T) {
checkTable(t, artifacts[0], false)
})
})

checkConstraintCapable, err := capableOf(capabilities.CheckConstraintsCapability) // 8.0.16 and above
require.NoError(t, err)
if checkConstraintCapable {
Expand All @@ -1138,7 +1256,7 @@ func testScheduler(t *testing.T) {
// name it `with_constraint_chk_1`. But we expect Online DDL to explicitly
// modify the constraint name, specifically to get rid of the <table-name> prefix,
// so that we don't get into https://bugs.mysql.com/bug.php?id=107772 situation.
createStatement := getCreateTableStatement(t, shards[0].Vttablets[0], "with_constraint")
createStatement := getCreateTableStatement(t, primaryTablet, "with_constraint")
assert.NotContains(t, createStatement, "with_constraint_chk")
})
})
Expand Down Expand Up @@ -2467,7 +2585,7 @@ func testForeignKeys(t *testing.T) {
//
// In this stress test, we enable Online DDL if the variable 'rename_table_preserve_foreign_key' is present. The Online DDL mechanism will in turn
// query for this variable, and manipulate it, when starting the migration and when cutting over.
rs, err := shards[0].Vttablets[0].VttabletProcess.QueryTablet("show global variables like 'rename_table_preserve_foreign_key'", keyspaceName, false)
rs, err := primaryTablet.VttabletProcess.QueryTablet("show global variables like 'rename_table_preserve_foreign_key'", keyspaceName, false)
require.NoError(t, err)
fkOnlineDDLPossible = len(rs.Rows) > 0
t.Logf("MySQL support for 'rename_table_preserve_foreign_key': %v", fkOnlineDDLPossible)
Expand Down
12 changes: 12 additions & 0 deletions go/test/endtoend/onlineddl/vtgate_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,18 @@ func CheckCancelAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCo
}
}

// CheckCleanupAllMigrations cleans up all applicable migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckCleanupAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) uint64 {
cleanupQuery := "alter vitess_migration cleanup all"
r := VtgateExecQuery(t, vtParams, cleanupQuery, "")

if expectCount >= 0 {
assert.Equal(t, expectCount, int(r.RowsAffected))
}
return r.RowsAffected
}

// CheckLaunchAllMigrations launches all queued posponed migrations and expect number of affected rows
// A negative value for expectCount indicates "don't care, no need to check"
func CheckLaunchAllMigrations(t *testing.T, vtParams *mysql.ConnParams, expectCount int) {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/ast_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ func (node *AlterMigration) Format(buf *TrackedBuffer) {
alterType = "retry"
case CleanupMigrationType:
alterType = "cleanup"
case CleanupAllMigrationType:
alterType = "cleanup all"
case LaunchMigrationType:
alterType = "launch"
case LaunchAllMigrationType:
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/ast_format_fast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/sqlparser/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ const (
CancelMigrationType
CancelAllMigrationType
CleanupMigrationType
CleanupAllMigrationType
ThrottleMigrationType
ThrottleAllMigrationType
UnthrottleMigrationType
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,8 @@ var (
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' retry",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' cleanup",
}, {
input: "alter vitess_migration cleanup all",
}, {
input: "alter vitess_migration '9748c3b7_7fdb_11eb_ac2c_f875a4d24e90' launch",
}, {
Expand Down
Loading

0 comments on commit 36f7193

Please sign in to comment.