Skip to content

Commit

Permalink
copy over existing vreplication rows copied to local counter if resum…
Browse files Browse the repository at this point in the history
…ing from another tablet (#13949)

Signed-off-by: Olga Shestopalova <[email protected]>
Signed-off-by: Olga Shestopalova <[email protected]>
Co-authored-by: Olga Shestopalova <[email protected]>
Co-authored-by: Matt Lord <[email protected]>
Co-authored-by: Shlomi Noach <[email protected]>
  • Loading branch information
4 people authored Sep 13, 2023
1 parent fe70462 commit a159f18
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 1 deletion.
21 changes: 21 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ const (
initSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
deleteWorkflow = "delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'"
updatePickedSourceTablet = `update _vt.vreplication set message='Picked source tablet: cell:\"%s\" uid:%d' where id=1`
getRowsCopied = "SELECT rows_copied FROM _vt.vreplication WHERE id=1"
)

var (
Expand Down Expand Up @@ -322,6 +323,16 @@ func TestMoveTables(t *testing.T) {
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(updatePickedSourceTablet, tenv.cells[0], sourceTabletUID), &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
ftc.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"rows_copied",
"int64",
),
"0",
),
nil,
)
ftc.vrdbClient.ExpectRequest(getWorkflowState, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"pos|stop_pos|max_tps|max_replication_lag|state|workflow_type|workflow|workflow_sub_type|defer_secondary_keys",
Expand Down Expand Up @@ -657,6 +668,16 @@ func TestFailedMoveTablesCreateCleanup(t *testing.T) {
&sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setSessionTZ, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(setNames, &sqltypes.Result{}, nil)
targetTablet.vrdbClient.ExpectRequest(getRowsCopied,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"rows_copied",
"int64",
),
"0",
),
nil,
)
targetTablet.vrdbClient.ExpectRequest(getWorkflowState,
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
if pos == "" {
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
Expand All @@ -190,6 +191,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
}
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down Expand Up @@ -145,6 +146,8 @@ func TestJournalOneToMany(t *testing.T) {
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
"/update _vt.vreplication set state='Running', message='' where id.*",
))
Expand Down Expand Up @@ -207,6 +210,7 @@ func TestJournalTablePresent(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down
17 changes: 17 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func testPlayerCopyCharPK(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
Expand Down Expand Up @@ -280,6 +281,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -403,6 +405,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -477,6 +480,7 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"select @@foreign_key_checks",
// Create the list of tables to copy and transition to Copying state.
"begin",
Expand Down Expand Up @@ -609,6 +613,7 @@ func testPlayerCopyTables(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -751,6 +756,7 @@ func testPlayerCopyBigTable(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set state='Copying'",
Expand Down Expand Up @@ -881,6 +887,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1042,6 +1049,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
// Catchup
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
Expand Down Expand Up @@ -1172,6 +1180,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
if !optimizeInsertsEnabled {
expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)"))
Expand Down Expand Up @@ -1268,6 +1277,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
`/insert into _vt.copy_state .*`,
Expand Down Expand Up @@ -1321,6 +1331,7 @@ func testPlayerCopyTablesNone(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/update _vt.vreplication set state='Stopped'",
"commit",
Expand Down Expand Up @@ -1375,6 +1386,7 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1464,6 +1476,7 @@ func testPlayerCopyTablesGIPK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1563,6 +1576,7 @@ func testPlayerCopyTableCancel(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1645,6 +1659,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1717,6 +1732,7 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1813,6 +1829,7 @@ func testCopyInvisibleColumns(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
fmt.Sprintf("/update.*'%s'", pos2),
Expand Down Expand Up @@ -1919,6 +1920,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
Expand All @@ -1944,6 +1946,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
// Since 'no' generates empty transactions that are skipped by
Expand All @@ -1962,6 +1965,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"/update.*'Stopped'.*already reached",
))
Expand Down Expand Up @@ -2583,6 +2587,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
"begin",
"insert into t1(id,val) values (2,'aaa')",
Expand Down Expand Up @@ -3155,6 +3160,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string)
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
))
var once sync.Once
Expand Down
40 changes: 39 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vr := &vreplicator{
vre: vre,
id: id,
source: source,
Expand All @@ -151,6 +151,8 @@ func newVReplicator(id int32, source *binlogdatapb.BinlogSource, sourceVStreamer
dbClient: newVDBClient(dbClient, stats),
mysqld: mysqld,
}
vr.setExistingRowsCopied()
return vr
}

// Replicate starts a vreplication stream. It can be in one of three phases:
Expand Down Expand Up @@ -1030,3 +1032,39 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err
}
return dbClient, nil
}

// setExistingRowsCopied seeds the in-memory CopyRowCount counter from the
// rows_copied value already persisted in _vt.vreplication for this stream.
//
// This covers the case where another tablet started the workflow and a
// reparent occurred: now that this tablet manages the workflow, its local
// counter starts at zero, which would otherwise make it look as if the
// reparent wiped all of the rows copied so far.
//
// The persisted value is only adopted when the local counter is zero and the
// persisted value is non-zero. A failure to read the persisted value is logged
// as a warning and otherwise ignored, leaving the counter untouched.
func (vr *vreplicator) setExistingRowsCopied() {
	// A non-zero local counter means we already have a value; do not overwrite it.
	if vr.stats.CopyRowCount.Get() != 0 {
		return
	}
	rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id)
	if err != nil {
		// Best effort: the counter is informational, so only warn.
		log.Warningf("Failed to read existing rows copied value for %s workflow: %v", vr.WorkflowName, err)
		return
	}
	if rowsCopiedExisting != 0 {
		log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting)
		vr.stats.CopyRowCount.Set(rowsCopiedExisting)
	}
}

// readExistingRowsCopied looks up the rows_copied column of the
// _vt.vreplication row with the given id using the vreplicator's dbClient.
//
// It returns an error if the query cannot be built, if executing it fails, or
// if the result does not contain exactly one row. Otherwise it returns the
// first column of that row converted to an int64.
func (vr *vreplicator) readExistingRowsCopied(id int32) (int64, error) {
	const rowsCopiedQuery = `SELECT rows_copied FROM _vt.vreplication WHERE id=%a`

	boundQuery, err := sqlparser.ParseAndBind(rowsCopiedQuery, sqltypes.Int32BindVariable(id))
	if err != nil {
		return 0, err
	}

	result, err := vr.dbClient.Execute(boundQuery)
	if err != nil {
		return 0, err
	}

	// The id is expected to identify exactly one stream.
	if rowCount := len(result.Rows); rowCount != 1 {
		return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id)
	}

	rowsCopied := result.Rows[0][0]
	return rowsCopied.ToInt64()
}
52 changes: 52 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,58 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) {
require.Equal(t, 1, len(res.Rows))
}

// TestResumingFromPreviousWorkflowKeepingRowsCopied verifies that a
// vreplicator created for a workflow that already has a non-zero rows_copied
// value in _vt.vreplication (e.g. one started by another tablet before a
// reparent) picks that value up in its CopyRowCount counter instead of
// starting again from zero.
func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) {
	_, cancel := context.WithCancel(context.Background())
	defer cancel()

	tablet := addTablet(100)
	defer deleteTablet(tablet)

	source := &binlogdatapb.BinlogSource{
		Keyspace: env.KeyspaceName,
		Shard:    env.ShardName,
		Filter: &binlogdatapb.Filter{
			Rules: []*binlogdatapb.Rule{{
				Match: "t1",
			}},
		},
	}

	// The test env uses the same factory for both dba and
	// filtered connections.
	dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba"

	var (
		workflowID     = int32(1)
		wantRowsCopied = int64(500000)
	)

	connector := newTabletConnector(tablet)
	playerStats := binlogplayer.NewStats()
	defer playerStats.Stop()

	dbaConn := playerEngine.dbClientFactoryDba()
	require.NoError(t, dbaConn.Connect())
	defer dbaConn.Close()

	filteredConn := playerEngine.dbClientFactoryFiltered()
	require.NoError(t, filteredConn.Connect())
	defer filteredConn.Close()

	targetDB := filteredConn.DBName()

	// Ensure there's an existing vreplication workflow row carrying the
	// rows_copied value we expect the new vreplicator to adopt.
	upsert := fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v",
		workflowID, targetDB, wantRowsCopied, targetDB, wantRowsCopied)
	_, err := filteredConn.ExecuteFetch(upsert, 1)
	require.NoError(t, err)

	// Remove the row again once the test is done.
	defer func() {
		_, cleanupErr := filteredConn.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", workflowID), 1)
		require.NoError(t, cleanupErr)
	}()

	vr := newVReplicator(workflowID, source, connector, playerStats, filteredConn, env.Mysqld, playerEngine)
	assert.Equal(t, wantRowsCopied, vr.stats.CopyRowCount.Get())
}

// stripCruft removes all whitespace unicode chars and backticks.
func stripCruft(in string) string {
out := strings.Builder{}
Expand Down

0 comments on commit a159f18

Please sign in to comment.