-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Online DDL: Fail a --in-order-completion migration, if a _prior_ migration within the same context is 'failed' or 'cancelled' #16071
Merged
shlomi-noach
merged 9 commits into
vitessio:main
from
planetscale:onlineddl-in-order-execution
Jun 6, 2024
Merged
Changes from 8 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
a5f2c54
refactor getNonConflictingMigration function
shlomi-noach 768d426
early break
shlomi-noach f621067
Merge branch 'main' into onlineddl-in-order-execution
shlomi-noach e73ce7a
Adding scheduler tests for sequential in-order completion, expect ear…
shlomi-noach a24634e
add query to select migrations by context
shlomi-noach d2eff24
Test that a running in-order migration fails if a prior migration is…
shlomi-noach 7d6cee0
all in one 'sqlSelectFailedCancelledMigrationsInContextBeforeMigratio…
shlomi-noach 1cf18be
Fail a --in-order-completion migration, if a _prior_ migration within…
shlomi-noach c429499
use assert.Len, require.Len
shlomi-noach File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2854,6 +2854,31 @@ func (e *Executor) getCompletedMigrationByContextAndSQL(ctx context.Context, onl | |
return completedUUID, nil | ||
} | ||
|
||
// readFailedCancelledMigrationsInContextBeforeMigration returns UUIDs for migrations that are failed/cancelled | ||
// and are in the same context as given migration and _precede_ it chronologically (have lower `id` value) | ||
func (e *Executor) readFailedCancelledMigrationsInContextBeforeMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (uuids []string, err error) { | ||
if onlineDDL.MigrationContext == "" { | ||
// only applies to migrations with an explicit context | ||
return nil, nil | ||
} | ||
query, err := sqlparser.ParseAndBind(sqlSelectFailedCancelledMigrationsInContextBeforeMigration, | ||
sqltypes.StringBindVariable(onlineDDL.MigrationContext), | ||
sqltypes.StringBindVariable(onlineDDL.UUID), | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
r, err := e.execQuery(ctx, query) | ||
if err != nil { | ||
return uuids, err | ||
} | ||
for _, row := range r.Named().Rows { | ||
uuid := row["migration_uuid"].ToString() | ||
uuids = append(uuids, uuid) | ||
} | ||
return uuids, err | ||
} | ||
|
||
// failMigration marks a migration as failed | ||
func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDDL, withError error) error { | ||
defer e.triggerNextCheckInterval() | ||
|
@@ -2865,6 +2890,23 @@ func (e *Executor) failMigration(ctx context.Context, onlineDDL *schema.OnlineDD | |
return withError | ||
} | ||
|
||
// validateInOrderMigration checks whether an in-order migration should be forced to fail, either before running or | ||
// while running. | ||
// This may happen if a prior migration in the same context has failed or was cancelled. | ||
func (e *Executor) validateInOrderMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) (wasFailed bool, err error) { | ||
if !onlineDDL.StrategySetting().IsInOrderCompletion() { | ||
return false, nil | ||
} | ||
uuids, err := e.readFailedCancelledMigrationsInContextBeforeMigration(ctx, onlineDDL) | ||
if err != nil { | ||
return false, err | ||
} | ||
if len(uuids) == 0 { | ||
return false, err | ||
} | ||
return true, e.failMigration(ctx, onlineDDL, fmt.Errorf("migration %v cannot run because prior migration %v in same context has failed/was cancelled", onlineDDL.UUID, uuids[0])) | ||
} | ||
|
||
// analyzeDropDDLActionMigration analyzes a DROP <TABLE|VIEW> migration. | ||
func (e *Executor) analyzeDropDDLActionMigration(ctx context.Context, onlineDDL *schema.OnlineDDL) error { | ||
// Schema analysis: | ||
|
@@ -3384,6 +3426,58 @@ func (e *Executor) executeMigration(ctx context.Context, onlineDDL *schema.Onlin | |
return nil | ||
} | ||
|
||
// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. | ||
// Conflicts are: | ||
// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent | ||
// - a migration is 'ready' but there's another migration 'running' on the exact same table | ||
func (e *Executor) getNonConflictingMigration(ctx context.Context) (*schema.OnlineDDL, error) { | ||
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
r, err := e.execQuery(ctx, sqlSelectReadyMigrations) | ||
if err != nil { | ||
return nil, err | ||
} | ||
for _, row := range r.Named().Rows { | ||
uuid := row["migration_uuid"].ToString() | ||
onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) | ||
if err != nil { | ||
return nil, err | ||
} | ||
isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) | ||
|
||
if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { | ||
continue // this migration conflicts with a running one | ||
} | ||
if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { | ||
return nil, nil // too many running migrations | ||
} | ||
if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { | ||
// This migration is immediate: if we run it now, it will complete within a second or two at most. | ||
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { | ||
continue | ||
} | ||
} | ||
// We will fail an in-order migration if there's _prior_ migrations within the same migration-context | ||
// which have failed. | ||
if onlineDDL.StrategySetting().IsInOrderCompletion() { | ||
wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if wasFailed { | ||
continue | ||
} | ||
} | ||
Comment on lines
+3462
to
+3472
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most of this function is just an extract/refactor out of |
||
// This migration seems good to go | ||
return onlineDDL, err | ||
} | ||
// no non-conflicting migration found... | ||
// Either all ready migrations are conflicting, or there are no ready migrations... | ||
return nil, nil | ||
} | ||
|
||
// runNextMigration picks up to one 'ready' migration that is able to run, and executes it. | ||
// Possible scenarios: | ||
// - no migration is in 'ready' state -- nothing to be done | ||
|
@@ -3405,47 +3499,7 @@ func (e *Executor) runNextMigration(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// getNonConflictingMigration finds a single 'ready' migration which does not conflict with running migrations. | ||
// Conflicts are: | ||
// - a migration is 'ready' but is not set to run _concurrently_, and there's a running migration that is also non-concurrent | ||
// - a migration is 'ready' but there's another migration 'running' on the exact same table | ||
getNonConflictingMigration := func() (*schema.OnlineDDL, error) { | ||
pendingMigrationsUUIDs, err := e.readPendingMigrationsUUIDs(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
r, err := e.execQuery(ctx, sqlSelectReadyMigrations) | ||
if err != nil { | ||
return nil, err | ||
} | ||
for _, row := range r.Named().Rows { | ||
uuid := row["migration_uuid"].ToString() | ||
onlineDDL, migrationRow, err := e.readMigration(ctx, uuid) | ||
if err != nil { | ||
return nil, err | ||
} | ||
isImmediateOperation := migrationRow.AsBool("is_immediate_operation", false) | ||
|
||
if conflictFound, _ := e.isAnyConflictingMigrationRunning(onlineDDL); conflictFound { | ||
continue // this migration conflicts with a running one | ||
} | ||
if e.countOwnedRunningMigrations() >= maxConcurrentOnlineDDLs { | ||
continue // too many running migrations | ||
} | ||
if isImmediateOperation && onlineDDL.StrategySetting().IsInOrderCompletion() { | ||
// This migration is immediate: if we run it now, it will complete within a second or two at most. | ||
if len(pendingMigrationsUUIDs) > 0 && pendingMigrationsUUIDs[0] != onlineDDL.UUID { | ||
continue | ||
} | ||
} | ||
// This migration seems good to go | ||
return onlineDDL, err | ||
} | ||
// no non-conflicting migration found... | ||
// Either all ready migrations are conflicting, or there are no ready migrations... | ||
return nil, nil | ||
} | ||
onlineDDL, err := getNonConflictingMigration() | ||
onlineDDL, err := e.getNonConflictingMigration(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -3792,6 +3846,19 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i | |
_ = e.updateMigrationETASecondsByProgress(ctx, uuid) | ||
_ = e.updateMigrationLastThrottled(ctx, uuid, time.Unix(s.timeThrottled, 0), s.componentThrottled) | ||
|
||
if onlineDDL.StrategySetting().IsInOrderCompletion() { | ||
// We will fail an in-order migration if there's _prior_ migrations within the same migration-context | ||
// which have failed. | ||
wasFailed, err := e.validateInOrderMigration(ctx, onlineDDL) | ||
if err != nil { | ||
return err | ||
} | ||
if wasFailed { | ||
return nil | ||
} | ||
} | ||
|
||
// Check if the migration is ready to cut-over, and proceed to do so if it is. | ||
isReady, err := e.isVReplMigrationReadyToCutOver(ctx, onlineDDL, s) | ||
if err != nil { | ||
_ = e.updateMigrationMessage(ctx, uuid, err.Error()) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you didn't already know, you can use `assert.Len` too/instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not know!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done