Skip to content

Commit

Permalink
SwitchTraffic: use separate context while canceling a migration (vite…
Browse files Browse the repository at this point in the history
…ssio#17340)

Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps authored Dec 10, 2024
1 parent 0d84a49 commit e4dc872
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,30 +1135,45 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error {
return nil
}

// cancelMigration attempts to revert all changes made during the migration so that we can get back to the
// state when traffic switching (or reversing) was initiated.
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
var err error

if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
}

// We create a new context while canceling the migration, so that we are independent of the original
// context being cancelled prior to or during the cancel operation.
cmTimeout := 60 * time.Second
cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(ctx)
err = ts.switchDeniedTables(cmCtx)
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
ts.Logger().Errorf("Cancel migration failed: %v", err)
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

sm.CancelStreamMigrations(ctx)
sm.CancelStreamMigrations(cmCtx)

err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
_, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query)
return err
})
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

err = ts.deleteReverseVReplication(ctx)
err = ts.deleteReverseVReplication(cmCtx)
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
}
Expand Down

0 comments on commit e4dc872

Please sign in to comment.