From e4dc8729ec6b987c8b582fcf0e2d7255e28d0694 Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Tue, 10 Dec 2024 21:18:10 +0100 Subject: [PATCH] SwitchTraffic: use separate context while canceling a migration (#17340) Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/traffic_switcher.go | 27 ++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 937dffe70b3..4fc34992b0f 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1135,30 +1135,45 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context) error { return nil } +// cancelMigration attempts to revert all changes made during the migration so that we can get back to the +// state when traffic switching (or reversing) was initiated. func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { var err error + + if ctx.Err() != nil { + // Even though we create a new context later on we still record any context error: + // for forensics in case of failures. + ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err()) + } + + // We create a new context while canceling the migration, so that we are independent of the original + // context being cancelled prior to or during the cancel operation. + cmTimeout := 60 * time.Second + cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout) + defer cmCancel() + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { - err = ts.switchDeniedTables(ctx) + err = ts.switchDeniedTables(cmCtx) } else { - err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) + err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed: %v", err) + ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) } - sm.CancelStreamMigrations(ctx) + sm.CancelStreamMigrations(cmCtx) err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) - _, err := ts.TabletManagerClient().VReplicationExec(ctx, target.GetPrimary().Tablet, query) + _, err := ts.TabletManagerClient().VReplicationExec(cmCtx, target.GetPrimary().Tablet, query) return err }) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } - err = ts.deleteReverseVReplication(ctx) + err = ts.deleteReverseVReplication(cmCtx) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) }