Skip to content

Commit cc68dd5

Browse files
authored
VReplication: Return lock error everywhere that LockName fails (#16560)
Signed-off-by: Matt Lord <[email protected]>
1 parent 61959f6 commit cc68dd5

File tree

4 files changed

+440
-43
lines changed

4 files changed

+440
-43
lines changed

go/vt/vtctl/workflow/framework_test.go

+108
Original file line number | Diff line number | Diff line change
@@ -229,6 +229,23 @@ func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
229229
delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid))
230230
}
231231

232+
func (env *testEnv) confirmRoutingAllTablesToTarget(t *testing.T) {
233+
t.Helper()
234+
env.tmc.mu.Lock()
235+
defer env.tmc.mu.Unlock()
236+
wantRR := make(map[string][]string)
237+
for _, sd := range env.tmc.schema {
238+
for _, td := range sd.TableDefinitions {
239+
for _, tt := range []string{"", "@rdonly", "@replica"} {
240+
wantRR[td.Name+tt] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
241+
wantRR[fmt.Sprintf("%s.%s", env.sourceKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
242+
wantRR[fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
243+
}
244+
}
245+
}
246+
checkRouting(t, env.ws, wantRR)
247+
}
248+
232249
type testTMClient struct {
233250
tmclient.TabletManagerClient
234251
schema map[string]*tabletmanagerdatapb.SchemaDefinition
@@ -240,6 +257,7 @@ type testTMClient struct {
240257

241258
env *testEnv // For access to the env config from tmc methods.
242259
reverse atomic.Bool // Are we reversing traffic?
260+
frozen atomic.Bool // Are the workflows frozen?
243261
}
244262

245263
func newTestTMClient(env *testEnv) *testTMClient {
@@ -306,6 +324,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
306324
},
307325
},
308326
}
327+
if tmc.frozen.Load() {
328+
stream.Message = Frozen
329+
}
309330
res.Streams = append(res.Streams, stream)
310331
}
311332

@@ -503,3 +524,90 @@ func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb
503524
func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error {
504525
return nil
505526
}
527+
528+
//
529+
// Utility / helper functions.
530+
//
531+
532+
func checkRouting(t *testing.T, ws *Server, want map[string][]string) {
533+
t.Helper()
534+
ctx := context.Background()
535+
got, err := topotools.GetRoutingRules(ctx, ws.ts)
536+
require.NoError(t, err)
537+
require.EqualValues(t, got, want, "routing rules don't match: got: %v, want: %v", got, want)
538+
cells, err := ws.ts.GetCellInfoNames(ctx)
539+
require.NoError(t, err)
540+
for _, cell := range cells {
541+
checkCellRouting(t, ws, cell, want)
542+
}
543+
}
544+
545+
func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]string) {
546+
t.Helper()
547+
ctx := context.Background()
548+
svs, err := ws.ts.GetSrvVSchema(ctx, cell)
549+
require.NoError(t, err)
550+
got := make(map[string][]string, len(svs.RoutingRules.Rules))
551+
for _, rr := range svs.RoutingRules.Rules {
552+
got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...)
553+
}
554+
require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want)
555+
}
556+
557+
func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) {
558+
t.Helper()
559+
ctx := context.Background()
560+
si, err := ts.GetShard(ctx, keyspace, shard)
561+
require.NoError(t, err)
562+
tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY)
563+
var got []string
564+
if tc != nil {
565+
got = tc.DeniedTables
566+
}
567+
require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want)
568+
}
569+
570+
func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) {
571+
t.Helper()
572+
ctx := context.Background()
573+
si, err := ts.GetShard(ctx, keyspace, shard)
574+
require.NoError(t, err)
575+
servedTypes, err := ts.GetShardServingTypes(ctx, si)
576+
require.NoError(t, err)
577+
require.Equal(t, want, len(servedTypes), "shard %s/%s has wrong served types: got: %v, want: %v",
578+
keyspace, shard, len(servedTypes), want)
579+
}
580+
581+
func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspace, shard, cell string, want int) {
582+
t.Helper()
583+
ctx := context.Background()
584+
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
585+
require.NoError(t, err)
586+
count := 0
587+
outer:
588+
for _, partition := range srvKeyspace.GetPartitions() {
589+
for _, ref := range partition.ShardReferences {
590+
if ref.Name == shard {
591+
count++
592+
continue outer
593+
}
594+
}
595+
}
596+
require.Equal(t, want, count, "serving types for %s/%s in cell %s: got: %d, want: %d", keyspace, shard, cell, count, want)
597+
}
598+
599+
func checkIfPrimaryServing(t *testing.T, ts *topo.Server, keyspace, shard string, want bool) {
600+
t.Helper()
601+
ctx := context.Background()
602+
si, err := ts.GetShard(ctx, keyspace, shard)
603+
require.NoError(t, err)
604+
require.Equal(t, want, si.IsPrimaryServing, "primary serving for %s/%s: got: %v, want: %v", keyspace, shard, si.IsPrimaryServing, want)
605+
}
606+
607+
func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace, table string) bool {
608+
vschema, err := ts.GetVSchema(ctx, keyspace)
609+
require.NoError(t, err)
610+
require.NotNil(t, vschema)
611+
_, ok := vschema.Tables[table]
612+
return ok
613+
}

go/vt/vtctl/workflow/server.go

+28-35
Original file line number | Diff line number | Diff line change
@@ -2581,20 +2581,18 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
25812581
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
25822582
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets")
25832583
if lockErr != nil {
2584-
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
2584+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
25852585
}
25862586
defer workflowUnlock(&err)
25872587
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets")
25882588
if lockErr != nil {
2589-
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
2590-
return nil, lockErr
2589+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
25912590
}
25922591
defer sourceUnlock(&err)
25932592
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
25942593
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets")
25952594
if lockErr != nil {
2596-
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
2597-
return nil, lockErr
2595+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
25982596
}
25992597
defer targetUnlock(&err)
26002598
ctx = lockCtx
@@ -2779,20 +2777,18 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
27792777
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
27802778
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources")
27812779
if lockErr != nil {
2782-
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
2780+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
27832781
}
27842782
defer workflowUnlock(&err)
27852783
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
27862784
if lockErr != nil {
2787-
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
2788-
return nil, lockErr
2785+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
27892786
}
27902787
defer sourceUnlock(&err)
27912788
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
27922789
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
27932790
if lockErr != nil {
2794-
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
2795-
return nil, lockErr
2791+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
27962792
}
27972793
defer targetUnlock(&err)
27982794
ctx = lockCtx
@@ -3020,13 +3016,12 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche
30203016
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
30213017
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow")
30223018
if lockErr != nil {
3023-
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
3019+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
30243020
}
30253021
defer workflowUnlock(&err)
30263022
ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow")
30273023
if lockErr != nil {
3028-
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
3029-
return nil, lockErr
3024+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
30303025
}
30313026
defer targetUnlock(&err)
30323027

@@ -3193,16 +3188,10 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
31933188

31943189
cellsStr := strings.Join(req.Cells, ",")
31953190

3196-
// Consistently handle errors by logging and returning them.
3197-
handleError := func(message string, err error) (*[]string, error) {
3198-
werr := vterrors.Wrapf(err, message)
3199-
ts.Logger().Error(werr)
3200-
return nil, werr
3201-
}
3202-
32033191
log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
32043192
if !switchReplica && !switchRdonly {
3205-
return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
3193+
return defaultErrorHandler(ts.Logger(), "invalid tablet types",
3194+
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
32063195
}
32073196
// For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types
32083197
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
@@ -3214,24 +3203,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
32143203
trafficSwitchingIsAllOrNothing = true
32153204
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration():
32163205
if direction == DirectionBackward {
3217-
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting reversal of read traffic for multi-tenant migrations is not supported"))
3206+
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
3207+
"requesting reversal of read traffic for multi-tenant migrations is not supported"))
32183208
}
32193209
// For multi-tenant migrations, we only support switching traffic to all cells at once
32203210
allCells, err := ts.TopoServer().GetCellInfoNames(ctx)
32213211
if err != nil {
32223212
return nil, err
32233213
}
32243214
if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) {
3225-
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting read traffic for multi-tenant migrations must include all cells"))
3215+
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
3216+
"requesting read traffic for multi-tenant migrations must include all cells"))
32263217
}
32273218
}
32283219

32293220
if !trafficSwitchingIsAllOrNothing {
32303221
if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
3231-
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
3222+
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
3223+
"requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
32323224
}
32333225
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
3234-
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
3226+
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
3227+
"requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
32353228
}
32363229
}
32373230

@@ -3253,7 +3246,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
32533246
// If journals exist notify user and fail.
32543247
journalsExist, _, err := ts.checkJournals(ctx)
32553248
if err != nil {
3256-
return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
3249+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
32573250
}
32583251
if journalsExist {
32593252
log.Infof("Found a previous journal entry for %d", ts.id)
@@ -3266,7 +3259,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
32663259
}
32673260

32683261
if err := ts.validate(ctx); err != nil {
3269-
return handleError("workflow validation failed", err)
3262+
return defaultErrorHandler(ts.Logger(), "workflow validation failed", err)
32703263
}
32713264

32723265
// For switching reads, locking the source keyspace is sufficient.
@@ -3282,7 +3275,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
32823275
// For reads, locking the source keyspace is sufficient.
32833276
ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL))
32843277
if lockErr != nil {
3285-
return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
3278+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
32863279
}
32873280
defer unlock(&err)
32883281
confirmKeyspaceLocksHeld := func() error {
@@ -3297,7 +3290,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
32973290

32983291
// Remove mirror rules for the specified tablet types.
32993292
if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil {
3300-
return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
3293+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
33013294
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
33023295
}
33033296

@@ -3306,36 +3299,36 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
33063299
case ts.IsMultiTenantMigration():
33073300
err := sw.switchKeyspaceReads(ctx, roTabletTypes)
33083301
if err != nil {
3309-
return handleError(fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
3302+
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
33103303
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
33113304
}
33123305
case ts.isPartialMigration:
33133306
ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
33143307
default:
33153308
err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction)
33163309
if err != nil {
3317-
return handleError("failed to switch read traffic for the tables", err)
3310+
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the tables", err)
33183311
}
33193312
}
33203313
return sw.logs(), nil
33213314
}
33223315

33233316
if err := confirmKeyspaceLocksHeld(); err != nil {
3324-
return handleError("locks were lost", err)
3317+
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
33253318
}
33263319
ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
33273320
if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil {
3328-
return handleError("failed to switch read traffic for the shards", err)
3321+
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the shards", err)
33293322
}
33303323

33313324
if err := confirmKeyspaceLocksHeld(); err != nil {
3332-
return handleError("locks were lost", err)
3325+
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
33333326
}
33343327
ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
33353328
if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil {
33363329
err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
33373330
ts.targetKeyspace, cellsStr)
3338-
return handleError("failed to validate SrvKeyspace record", err2)
3331+
return defaultErrorHandler(ts.Logger(), "failed to validate SrvKeyspace record", err2)
33393332
}
33403333
return sw.logs(), nil
33413334
}

0 commit comments

Comments
 (0)