Skip to content

Commit

Permalink
VReplication: Make Target Sequence Initialization More Robust (vitess…
Browse files Browse the repository at this point in the history
…io#15289)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Feb 20, 2024
1 parent b3a2c99 commit 999f1c1
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 38 deletions.
2 changes: 1 addition & 1 deletion go/sqltypes/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func MakeTestResult(fields []*querypb.Field, rows ...string) *Result {
for i, row := range rows {
result.Rows[i] = make([]Value, len(fields))
for j, col := range split(row) {
if col == "null" {
if strings.ToLower(col) == "null" {
result.Rows[i][j] = NULL
continue
}
Expand Down
25 changes: 14 additions & 11 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3301,6 +3301,20 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
sw.cancelMigration(ctx, sm)
return handleError("failed to create the reverse vreplication streams", err)
}

// Initialize any target sequences, if there are any, before allowing new writes.
if req.InitializeTargetSequences && len(sequenceMetadata) > 0 {
ts.Logger().Infof("Initializing target sequences")
// Writes are blocked so we can safely initialize the sequence tables but
// we also want to use a shorter timeout than the parent context.
// We use at most half of the overall timeout.
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
} else {
if cancel {
return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel"))
Expand All @@ -3317,17 +3331,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
return handleError("failed to create the journal", err)
}
// Initialize any target sequences, if there are any, before allowing new writes.
if req.InitializeTargetSequences && len(sequenceMetadata) > 0 {
// Writes are blocked so we can safely initialize the sequence tables but
// we also want to use a shorter timeout than the parent context.
// We use up at most half of the overall timeout.
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
if err := sw.allowTargetWrites(ctx); err != nil {
return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
}
Expand Down
17 changes: 11 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -1432,13 +1433,17 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
MaxRows: 1,
})
if terr != nil || len(qr.Rows) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, terr)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
maxID, terr := sqltypes.Proto3ToResult(qr).Rows[0][0].ToInt64()
if terr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, terr)
rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0]
maxID := int64(0)
if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max
maxID, terr = rawVal.ToInt64()
if terr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
}
srMu.Lock()
defer srMu.Unlock()
Expand Down
42 changes: 25 additions & 17 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -619,6 +620,20 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
sw.cancelMigration(ctx, sm)
return handleError("failed to create the reverse vreplication streams", err)
}

// Initialize any target sequences, if there are any, before allowing new writes.
if initializeTargetSequences && len(sequenceMetadata) > 0 {
ts.Logger().Infof("Initializing target sequences")
// Writes are blocked so we can safely initialize the sequence tables but
// we also want to use a shorter timeout than the parent context.
// We use at most half of the overall timeout.
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
sw.cancelMigration(ctx, sm)
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
} else {
if cancel {
return handleError("invalid cancel", fmt.Errorf("traffic switching has reached the point of no return, cannot cancel"))
Expand All @@ -635,17 +650,6 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
if err := sw.createJournals(ctx, sourceWorkflows); err != nil {
return handleError("failed to create the journal", err)
}
// Initialize any target sequences, if there are any, before allowing new writes.
if initializeTargetSequences && len(sequenceMetadata) > 0 {
// Writes are blocked so we can safely initialize the sequence tables but
// we also want to use a shorter timeout than the parent context.
// We use up at most half of the overall timeout.
initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2)
defer cancel()
if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil {
return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err)
}
}
if err := sw.allowTargetWrites(ctx); err != nil {
return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err)
}
Expand Down Expand Up @@ -2197,13 +2201,17 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
)
qr, terr := ts.wr.ExecuteFetchAsApp(ictx, primary.GetAlias(), true, query.Query, 1)
if terr != nil || len(qr.Rows) != 1 {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, terr)
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
maxID, terr := sqltypes.Proto3ToResult(qr).Rows[0][0].ToInt64()
if terr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, terr)
rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0]
maxID := int64(0)
if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max
maxID, terr = rawVal.ToInt64()
if terr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v",
ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr)
}
}
srMu.Lock()
defer srMu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
"maxval",
"int64",
),
"5",
"NULL",
),
)
tme.tmeDB.AddQuery(fmt.Sprintf(maxValForSequence, "ks2", "t2"),
Expand All @@ -274,7 +274,7 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
// Now tell the fakesqldb used by the global keyspace tablets to expect
// the sequence management related queries against the target keyspace.
gfdb.AddQuery(
sqlparser.BuildParsedQuery(sqlInitSequenceTable, sqlescape.EscapeID("vt_global"), sqlescape.EscapeID("t1_seq"), 6, 6, 6).Query,
sqlparser.BuildParsedQuery(sqlInitSequenceTable, sqlescape.EscapeID("vt_global"), sqlescape.EscapeID("t1_seq"), 1, 1, 1).Query,
&sqltypes.Result{RowsAffected: 0},
)
gfdb.AddQuery(
Expand Down
2 changes: 1 addition & 1 deletion go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,8 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {
"\tKeyspace ks1, Shard 0 at Position MariaDB/5-456-892",
"Wait for VReplication on stopped streams to catchup for up to 1s",
"Create reverse replication workflow test_reverse",
"Create journal entries on source databases",
"The following sequence backing tables used by tables being moved will be initialized: t1_seq,t2_seq",
"Create journal entries on source databases",
"Enable writes on keyspace ks2 tables [t1,t2]",
"Switch routing from keyspace ks1 to keyspace ks2",
"Routing rules for tables [t1,t2] will be updated",
Expand Down

0 comments on commit 999f1c1

Please sign in to comment.