From e0caf4ac55f2e50f9bd09a395624cb120fa57224 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Sun, 12 Jan 2025 19:35:01 +0100 Subject: [PATCH] [release-19.0] SwitchTraffic: use separate context while canceling a migration (#17340) (#17364) Signed-off-by: Rohit Nayak Co-authored-by: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Co-authored-by: Rohit Nayak --- go/vt/vtctl/workflow/traffic_switcher.go | 25 +++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 2f6e0659025..c3327017010 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -994,30 +994,45 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a return ts.TopoServer().RebuildSrvVSchema(ctx, nil) } +// cancelMigration attempts to revert all changes made during the migration so that we can get back to the +// state when traffic switching (or reversing) was initiated. func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { var err error + + if ctx.Err() != nil { + // Even though we create a new context later on we still record any context error: + // for forensics in case of failures. + ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err()) + } + + // We create a new context while canceling the migration, so that we are independent of the original + // context being cancelled prior to or during the cancel operation. + cmTimeout := 60 * time.Second + cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout) + defer cmCancel() + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { err = ts.changeTableSourceWrites(ctx, allowWrites) } else { - err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) + err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed: %v", err) + ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) } - sm.CancelStreamMigrations(ctx) + sm.CancelStreamMigrations(cmCtx) err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) - _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) + _, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query) return err }) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } - err = ts.deleteReverseVReplication(ctx) + err = ts.deleteReverseVReplication(cmCtx) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) }