Skip to content

Commit

Permalink
[release-16.0] VDiff: wait for shard streams of one table diff to com…
Browse files Browse the repository at this point in the history
…plete before starting that of the next table (vitessio#14345) (vitessio#14380)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Nov 11, 2023
1 parent c3bd611 commit 12d93c7
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vdiff/vdiff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS _vt.vdiff
`started_at` timestamp NULL DEFAULT NULL,
`liveness_timestamp` timestamp NULL DEFAULT NULL,
`completed_at` timestamp NULL DEFAULT NULL,
`last_error` varbinary(512) DEFAULT NULL,
`last_error` varbinary(1024) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`vdiff_uuid`),
KEY `state` (`state`),
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestEngineOpen(t *testing.T) {
), nil)

// Now let's short circuit the vdiff as we know that the open has worked as expected.
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)

vdenv.vde.Open(context.Background(), vdiffenv.vre)
defer vdenv.vde.Close()
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestVDiff(t *testing.T) {
),
fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName),
), nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestVDiff(t *testing.T) {
vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil)

vdenv.vde.mu.Lock()
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) {
), nil)

// At this point we know that we kicked off the expected retry so we can short circuit the vdiff.
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)

expectedControllerCnt++
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func resetBinlogClient() {
// has verified the necessary behavior.
func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) {
// Register the caller's query paired with a "Short circuiting test" error;
// presumably the mock hands that error back when the query is seen — confirm
// against MockDBClient.ExpectRequest.
dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test"))
// NOTE(review): the next two expectations differ only in the left(..., 1024)
// wrapper around last_error. This hunk is listed as "1 addition & 1 deletion",
// so they look like the before/after forms of a single statement rather than
// two separate expectations — confirm against the real file.
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil)
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil)
// Expected vdiff_log rows for the state change and the error message. Like the
// update above, these are hard-coded to vdiff id 1 and do not depend on the id
// that appears in the query argument.
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil)
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil)
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ const (
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.id = %d`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`.
// It also truncates the error if needed to ensure that we can save the state when the error text is very long.
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down
34 changes: 22 additions & 12 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand All @@ -39,14 +34,19 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// how long to wait for background operations to complete
Expand All @@ -73,6 +73,12 @@ type tableDiffer struct {
sourceQuery string
table *tabletmanagerdatapb.TableDefinition
lastPK *querypb.QueryResult

// wgShardStreamers is used, with a cancellable context, to wait for all shard streamers
// to finish after each diff is complete.
wgShardStreamers sync.WaitGroup
shardStreamsCtx context.Context
shardStreamsCancel context.CancelFunc
}

func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer {
Expand Down Expand Up @@ -122,19 +128,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx)

if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
return err
}
if err := td.startSourceDataStreams(ctx); err != nil {
if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil {
return err
}
if err := td.syncTargetStreams(ctx); err != nil {
return err
}
if err := td.startTargetDataStream(ctx); err != nil {
if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil {
return err
}
td.setupRowSorters()
Expand Down Expand Up @@ -204,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
targetTablet *topodatapb.Tablet
)

// The cells from the vdiff record are a comma separated list.
Expand Down Expand Up @@ -255,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ts, cells, keyspace, shard, tabletTypes)
if err != nil {
return nil, err
Expand Down Expand Up @@ -355,10 +363,12 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err

func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) {
log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query)
td.wgShardStreamers.Add(1)
defer func() {
log.Infof("streamOneShard End on %s", participant.tablet.Alias.String())
close(participant.result)
close(gtidch)
td.wgShardStreamers.Done()
}()
participant.err = func() error {
conn, err := tabletconn.GetDialer()(participant.tablet, false)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
}

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
if td.shardStreamsCancel != nil {
td.shardStreamsCancel()
}
// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}()

select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
Expand Down

0 comments on commit 12d93c7

Please sign in to comment.