Skip to content

Commit

Permalink
[release-17.0] copy over existing vreplication rows copied to local c…
Browse files Browse the repository at this point in the history
…ounter if resuming from another tablet (#13949) (#13963)

Signed-off-by: Matt Lord <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Lord <[email protected]>
  • Loading branch information
vitess-bot[bot] and mattlord authored Sep 14, 2023
1 parent 4a7aca2 commit 4ef052f
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
if pos == "" {
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
Expand All @@ -190,6 +191,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
}
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down Expand Up @@ -145,6 +146,8 @@ func TestJournalOneToMany(t *testing.T) {
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
"/update _vt.vreplication set state='Running', message='' where id.*",
))
Expand Down Expand Up @@ -207,6 +210,7 @@ func TestJournalTablePresent(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func testPlayerCopyCharPK(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
Expand Down Expand Up @@ -280,6 +281,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -403,6 +405,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -477,6 +480,7 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"select @@foreign_key_checks;",
// Create the list of tables to copy and transition to Copying state.
"begin",
Expand Down Expand Up @@ -609,6 +613,7 @@ func testPlayerCopyTables(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -751,6 +756,7 @@ func testPlayerCopyBigTable(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set state='Copying'",
Expand Down Expand Up @@ -881,6 +887,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1042,6 +1049,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
// Catchup
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
Expand Down Expand Up @@ -1172,6 +1180,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
if !optimizeInsertsEnabled {
expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)"))
Expand Down Expand Up @@ -1268,6 +1277,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
`/insert into _vt.copy_state .*`,
Expand Down Expand Up @@ -1321,6 +1331,7 @@ func testPlayerCopyTablesNone(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/update _vt.vreplication set state='Stopped'",
"commit",
Expand Down Expand Up @@ -1375,6 +1386,7 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1464,6 +1476,7 @@ func testPlayerCopyTablesGIPK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1563,6 +1576,7 @@ func testPlayerCopyTableCancel(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1645,6 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1717,6 +1732,7 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1813,6 +1829,7 @@ func testCopyInvisibleColumns(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,7 @@ func TestPlayerDDL(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
fmt.Sprintf("/update.*'%s'", pos2),
Expand Down Expand Up @@ -1862,6 +1863,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
Expand All @@ -1887,6 +1889,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
// Since 'no' generates empty transactions that are skipped by
Expand All @@ -1905,6 +1908,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"/update.*'Stopped'.*already reached",
))
Expand Down Expand Up @@ -2526,6 +2530,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
"begin",
"insert into t1(id,val) values (2,'aaa')",
Expand Down Expand Up @@ -3098,6 +3103,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string)
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
))
var once sync.Once
Expand Down
40 changes: 39 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vr := &vreplicator{
vre: vre,
id: id,
source: source,
Expand All @@ -151,6 +151,8 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
dbClient: newVDBClient(dbClient, stats),
mysqld: mysqld,
}
vr.setExistingRowsCopied()
return vr
}

// Replicate starts a vreplication stream. It can be in one of three phases:
Expand Down Expand Up @@ -1021,3 +1023,39 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err
}
return dbClient, nil
}

// setExistingRowsCopied seeds this vreplicator's CopyRowCount stat from the
// rows_copied value already stored in the _vt.vreplication table.
//
// It deals with the case where another tablet started the workflow and a
// reparent occurred: now that we manage the workflow, we need to pick up the
// rows_copied that already exists, otherwise it will look like the reparent
// wiped all the rows_copied.
//
// The stored value is only applied when our CopyRowCount is currently zero
// and the stored value is non-zero. A failure to read the stored value is
// logged as a warning and otherwise ignored, leaving the counter untouched.
func (vr *vreplicator) setExistingRowsCopied() {
	// A non-zero counter is left alone; there is nothing to restore.
	if vr.stats.CopyRowCount.Get() != 0 {
		return
	}
	rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id)
	if err != nil {
		// Best effort: the counter simply stays at zero.
		log.Warningf("Failed to read existing rows copied value for %s workflow: %v", vr.WorkflowName, err)
		return
	}
	if rowsCopiedExisting == 0 {
		return
	}
	log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting)
	vr.stats.CopyRowCount.Set(rowsCopiedExisting)
}

// readExistingRowsCopied looks up the rows_copied column of the
// _vt.vreplication row identified by id, using this vreplicator's dbClient.
//
// It returns an error if the query cannot be built or executed, if the
// result does not contain exactly one row, or if the value cannot be
// converted to an int64.
func (vr *vreplicator) readExistingRowsCopied(id int32) (int64, error) {
	idBindVar := sqltypes.Int32BindVariable(id)
	stmt, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`, idBindVar)
	if err != nil {
		return 0, err
	}

	result, err := vr.dbClient.Execute(stmt)
	if err != nil {
		return 0, err
	}

	// The id is expected to match exactly one workflow row.
	if rowCount := len(result.Rows); rowCount != 1 {
		return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id)
	}

	onlyRow := result.Rows[0]
	return onlyRow[0].ToInt64()
}
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,57 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) {
require.Equal(t, 1, len(res.Rows))
}

// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you
// resume a workflow started by another tablet (eg. a reparent occurred),
// the rows_copied does not reset to zero but continues along from where
// it left off.
//
// It seeds a _vt.vreplication row with a non-zero rows_copied, builds a
// vreplicator for that row's id with freshly created stats, and asserts
// that the CopyRowCount stat equals the seeded value.
func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) {
// NOTE(review): the context itself is discarded; only cancel is kept and deferred.
_, cancel := context.WithCancel(context.Background())
defer cancel()
tablet := addTablet(100)
defer deleteTablet(tablet)
// Binlog source with a single rule matching table t1.
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
}
// The test env uses the same factory for both dba and
// filtered connections.
dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba"
// id of the _vt.vreplication row seeded below and passed to newVReplicator.
id := int32(1)

vsclient := newTabletConnector(tablet)
// New stats object for the vreplicator under test.
// NOTE(review): presumably CopyRowCount starts at zero here — confirm in binlogplayer.NewStats.
stats := binlogplayer.NewStats()

// NOTE(review): this dba connection is opened and closed but not otherwise
// used in this test — confirm whether it is still needed.
dbaconn := playerEngine.dbClientFactoryDba()
err := dbaconn.Connect()
require.NoError(t, err)
defer dbaconn.Close()

// Connection used both to seed the row and as the vreplicator's db client.
dbClient := playerEngine.dbClientFactoryFiltered()
err = dbClient.Connect()
require.NoError(t, err)
defer dbClient.Close()

dbName := dbClient.DBName()
rowsCopied := int64(500000)
// Ensure there's an existing vreplication workflow
// The statement is an upsert (on duplicate key update), so it sets
// rows_copied whether or not a row with this id already exists.
_, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v",
id, dbName, rowsCopied, dbName, rowsCopied), 1)
require.NoError(t, err)
// Delete the seeded row when the test finishes.
defer func() {
_, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
require.NoError(t, err)
}()
// The CopyRowCount stat of the new vreplicator should match the seeded rows_copied.
vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine)
assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get())
}

// stripCruft removes all whitespace unicode chars and backticks.
func stripCruft(in string) string {
out := strings.Builder{}
Expand Down

0 comments on commit 4ef052f

Please sign in to comment.