Skip to content

Commit

Permalink
[release-19.0] SwitchTraffic: use separate context while canceling a …
Browse files Browse the repository at this point in the history
…migration (#17340) (#17364)

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
Co-authored-by: Rohit Nayak <[email protected]>
  • Loading branch information
3 people authored Jan 12, 2025
1 parent c9653d6 commit e0caf4a
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,30 +994,45 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
return ts.TopoServer().RebuildSrvVSchema(ctx, nil)
}

// cancelMigration attempts to revert all changes made during the migration so that we can get back to the
// state when traffic switching (or reversing) was initiated.
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) {
var err error

if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
}

// We create a new context while canceling the migration, so that we are independent of the original
// context being cancelled prior to or during the cancel operation.
cmTimeout := 60 * time.Second
cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout)
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.changeTableSourceWrites(ctx, allowWrites)
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
ts.Logger().Errorf("Cancel migration failed: %v", err)
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
}

sm.CancelStreamMigrations(ctx)
sm.CancelStreamMigrations(cmCtx)

err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
_, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query)
_, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query)
return err
})
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
}

err = ts.deleteReverseVReplication(ctx)
err = ts.deleteReverseVReplication(cmCtx)
if err != nil {
ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err)
}
Expand Down

0 comments on commit e0caf4a

Please sign in to comment.