diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index 24e6cb0dbc3..e2b1b8bf100 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -181,6 +181,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string { if pos == "" { return []string{ "/insert into _vt.vreplication", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "begin", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", @@ -190,6 +191,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string { } return []string{ "/insert into _vt.vreplication", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", } } diff --git a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go index 9dfdee766d1..611f0f4ac3b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) { fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), "commit", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", )) @@ -145,6 +146,8 @@ func TestJournalOneToMany(t *testing.T) { "commit", "/update _vt.vreplication set message='Picked source tablet.*", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", "/update _vt.vreplication set state='Running', message='' where id.*", )) @@ -207,6 +210,7 @@ func TestJournalTablePresent(t *testing.T) { fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID), "commit", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running', message='' where id.*", )) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 1c977d6287c..bee7b28d04e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -173,6 +173,7 @@ func testPlayerCopyCharPK(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", "insert into dst(idc,val) values ('a\\0',1)", @@ -280,6 +281,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", // Copy mode. @@ -403,6 +405,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", // Copy mode. @@ -477,6 +480,7 @@ func testPlayerCopyTablesWithFK(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "select @@foreign_key_checks;", // Create the list of tables to copy and transition to Copying state. "begin", @@ -609,6 +613,7 @@ func testPlayerCopyTables(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -751,6 +756,7 @@ func testPlayerCopyBigTable(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set state='Copying'", @@ -881,6 +887,7 @@ func testPlayerCopyWildcardRule(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", // The first fast-forward has no starting point. So, it just saves the current position. @@ -1042,6 +1049,7 @@ func testPlayerCopyTableContinuation(t *testing.T) { expectNontxQueries(t, qh.Expect( // Catchup "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)", "insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)", "update dst1 set val='updated' where id=3 and (3,3) <= (6,6)", @@ -1172,6 +1180,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) { "/insert into _vt.vreplication", "/update _vt.vreplication set state = 'Copying'", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", ).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer { if !optimizeInsertsEnabled { expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)")) @@ -1268,6 +1277,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) { "/insert into _vt.vreplication", "/update _vt.vreplication set state = 'Copying'", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Copy "insert into dst(id,val) values (3,'uncopied'), (4,'new')", `/insert into _vt.copy_state .*`, @@ -1321,6 +1331,7 @@ func testPlayerCopyTablesNone(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "begin", "/update _vt.vreplication set state='Stopped'", "commit", @@ -1375,6 +1386,7 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1464,6 +1476,7 @@ func testPlayerCopyTablesGIPK(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1563,6 +1576,7 @@ func testPlayerCopyTableCancel(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1645,6 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message=", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state", // The first fast-forward has no starting point. So, it just saves the current position. @@ -1717,6 +1732,7 @@ func testCopyTablesWithInvalidDates(t *testing.T) { expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", // Create the list of tables to copy and transition to Copying state. "begin", "/insert into _vt.copy_state", @@ -1813,6 +1829,7 @@ func testCopyInvisibleColumns(t *testing.T) { expectNontxQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message=", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/insert into _vt.copy_state", "/update _vt.vreplication set state='Copying'", // The first fast-forward has no starting point. So, it just saves the current position. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 7b291f6a99c..9e468021e44 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -1696,6 +1696,7 @@ func TestPlayerDDL(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", fmt.Sprintf("/update.*'%s'", pos2), @@ -1862,6 +1863,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", "insert into yes(id,val) values (1,'aaa')", @@ -1887,6 +1889,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "begin", // Since 'no' generates empty transactions that are skipped by @@ -1905,6 +1908,7 @@ func TestPlayerStopPos(t *testing.T) { "/update.*'Running'", // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update.*'Running'", "/update.*'Stopped'.*already reached", )) @@ -2526,6 +2530,7 @@ func TestRestartOnVStreamEnd(t *testing.T) { }) expectDBClientQueries(t, qh.Expect( "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", "begin", "insert into t1(id,val) values (2,'aaa')", @@ -3098,6 +3103,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string) expectDBClientQueries(t, qh.Expect( "/insert into _vt.vreplication", "/update _vt.vreplication set message='Picked source tablet.*", + "/SELECT rows_copied FROM _vt.vreplication WHERE id=.+", "/update _vt.vreplication set state='Running'", )) var once sync.Once diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index fd8117a6b5f..33e3174ab41 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -142,7 +142,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d", vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval) } - return &vreplicator{ + vr := &vreplicator{ vre: vre, id: id, source: source, @@ -151,6 +151,8 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer dbClient: newVDBClient(dbClient, stats), mysqld: mysqld, } + vr.setExistingRowsCopied() + return vr } // Replicate starts a vreplication stream. It can be in one of three phases: @@ -1021,3 +1023,39 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err } return dbClient, nil } + +// setExistingRowsCopied deals with the case where another tablet started +// the workflow and a reparent occurred, and now that we manage the +// workflow, we need to read the rows_copied that already exists and add +// them to our counter, otherwise it will look like the reparent wiped all the +// rows_copied. So in the event that our CopyRowCount counter is zero, and +// the existing rows_copied in the vreplication table is not, copy the value of +// vreplication.rows_copied into our CopyRowCount. +func (vr *vreplicator) setExistingRowsCopied() { + if vr.stats.CopyRowCount.Get() == 0 { + rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id) + if err != nil { + log.Warningf("Failed to read existing rows copied value for %s worfklow: %v", vr.WorkflowName, err) + } else if rowsCopiedExisting != 0 { + log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting) + vr.stats.CopyRowCount.Set(rowsCopiedExisting) + } + } +} + +func (vr *vreplicator) readExistingRowsCopied(id int32) (int64, error) { + query, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`, + sqltypes.Int32BindVariable(id), + ) + if err != nil { + return 0, err + } + r, err := vr.dbClient.Execute(query) + if err != nil { + return 0, err + } + if len(r.Rows) != 1 { + return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id) + } + return r.Rows[0][0].ToInt64() +} diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go index 66591bbcb81..6bf4e0897ba 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go @@ -626,6 +626,57 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) { require.Equal(t, 1, len(res.Rows)) } +// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you +// resume a workflow started by another tablet (eg. a reparent occurred), +// the rows_copied does not reset to zero but continues along from where +// it left off. +func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) { + _, cancel := context.WithCancel(context.Background()) + defer cancel() + tablet := addTablet(100) + defer deleteTablet(tablet) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + }}, + } + bls := &binlogdatapb.BinlogSource{ + Keyspace: env.KeyspaceName, + Shard: env.ShardName, + Filter: filter, + } + // The test env uses the same factory for both dba and + // filtered connections. + dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba" + id := int32(1) + + vsclient := newTabletConnector(tablet) + stats := binlogplayer.NewStats() + + dbaconn := playerEngine.dbClientFactoryDba() + err := dbaconn.Connect() + require.NoError(t, err) + defer dbaconn.Close() + + dbClient := playerEngine.dbClientFactoryFiltered() + err = dbClient.Connect() + require.NoError(t, err) + defer dbClient.Close() + + dbName := dbClient.DBName() + rowsCopied := int64(500000) + // Ensure there's an existing vreplication workflow + _, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v", + id, dbName, rowsCopied, dbName, rowsCopied), 1) + require.NoError(t, err) + defer func() { + _, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1) + require.NoError(t, err) + }() + vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine) + assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get()) +} + // stripCruft removes all whitespace unicode chars and backticks. func stripCruft(in string) string { out := strings.Builder{}