Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-16.0] copy over existing vreplication rows copied to local counter if resuming from another tablet (#13949) #13962

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
if pos == "" {
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
Expand All @@ -190,6 +191,7 @@ func getExpectedVreplicationQueries(t *testing.T, pos string) []string {
}
return []string{
"/insert into _vt.vreplication",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
}
}
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestJournalOneToOne(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down Expand Up @@ -145,6 +146,8 @@ func TestJournalOneToMany(t *testing.T) {
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
"/update _vt.vreplication set state='Running', message='' where id.*",
))
Expand Down Expand Up @@ -207,6 +210,7 @@ func TestJournalTablePresent(t *testing.T) {
fmt.Sprintf("delete from _vt.vreplication where id=%d", firstID),
"commit",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running', message='' where id.*",
))

Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ func testPlayerCopyCharPK(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
"insert into dst(idc,val) values ('a\\0',1)",
Expand Down Expand Up @@ -278,6 +279,7 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -401,6 +403,7 @@ func testPlayerCopyVarcharCompositePKCaseSensitiveCollation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// Copy mode.
Expand Down Expand Up @@ -475,6 +478,7 @@ func testPlayerCopyTablesWithFK(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"select @@foreign_key_checks;",
// Create the list of tables to copy and transition to Copying state.
"begin",
Expand Down Expand Up @@ -607,6 +611,7 @@ func testPlayerCopyTables(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -746,6 +751,7 @@ func testPlayerCopyBigTable(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
// The first fast-forward has no starting point. So, it just saves the current position.
"/update _vt.vreplication set state='Copying'",
Expand Down Expand Up @@ -876,6 +882,7 @@ func testPlayerCopyWildcardRule(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1037,6 +1044,7 @@ func testPlayerCopyTableContinuation(t *testing.T) {
expectNontxQueries(t, qh.Expect(
// Catchup
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"insert into dst1(id,val) select 1, 'insert in' from dual where (1,1) <= (6,6)",
"insert into dst1(id,val) select 7, 'insert out' from dual where (7,7) <= (6,6)",
"update dst1 set val='updated' where id=3 and (3,3) <= (6,6)",
Expand Down Expand Up @@ -1167,6 +1175,7 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
).Then(func(expect qh.ExpectationSequencer) qh.ExpectationSequencer {
if !optimizeInsertsEnabled {
expect = expect.Then(qh.Immediately("insert into dst(id,val) select 4, 'new' from dual where (4) <= (2)"))
Expand Down Expand Up @@ -1263,6 +1272,7 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) {
"/insert into _vt.vreplication",
"/update _vt.vreplication set state = 'Copying'",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Copy
"insert into dst(id,val) values (3,'uncopied'), (4,'new')",
`/insert into _vt.copy_state .*`,
Expand Down Expand Up @@ -1316,6 +1326,7 @@ func testPlayerCopyTablesNone(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"begin",
"/update _vt.vreplication set state='Stopped'",
"commit",
Expand Down Expand Up @@ -1370,6 +1381,7 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1454,6 +1466,7 @@ func testPlayerCopyTableCancel(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1542,6 +1555,7 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down Expand Up @@ -1614,6 +1628,7 @@ func testCopyTablesWithInvalidDates(t *testing.T) {
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
// Create the list of tables to copy and transition to Copying state.
"begin",
"/insert into _vt.copy_state",
Expand Down Expand Up @@ -1711,6 +1726,7 @@ func testCopyInvisibleColumns(t *testing.T) {
expectNontxQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message=",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/insert into _vt.copy_state",
"/update _vt.vreplication set state='Copying'",
// The first fast-forward has no starting point. So, it just saves the current position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,7 @@ func TestPlayerDDL(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
fmt.Sprintf("/update.*'%s'", pos2),
Expand Down Expand Up @@ -1787,6 +1788,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
"insert into yes(id,val) values (1,'aaa')",
Expand All @@ -1812,6 +1814,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"begin",
// Since 'no' generates empty transactions that are skipped by
Expand All @@ -1830,6 +1833,7 @@ func TestPlayerStopPos(t *testing.T) {
"/update.*'Running'",
// Second update is from vreplicator.
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update.*'Running'",
"/update.*'Stopped'.*already reached",
))
Expand Down Expand Up @@ -2451,6 +2455,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
"begin",
"insert into t1(id,val) values (2,'aaa')",
Expand Down Expand Up @@ -2906,6 +2911,7 @@ func startVReplication(t *testing.T, bls *binlogdatapb.BinlogSource, pos string)
expectDBClientQueries(t, qh.Expect(
"/insert into _vt.vreplication",
"/update _vt.vreplication set message='Picked source tablet.*",
"/SELECT rows_copied FROM _vt.vreplication WHERE id=.+",
"/update _vt.vreplication set state='Running'",
))
var once sync.Once
Expand Down
40 changes: 39 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame
log.Warningf("The supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d",
vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval)
}
return &vreplicator{
vr := &vreplicator{
vre: vre,
id: id,
source: source,
Expand All @@ -164,6 +164,8 @@ func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreame

throttleUpdatesRateLimiter: timer.NewRateLimiter(time.Second),
}
vr.setExistingRowsCopied()
return vr
}

// Replicate starts a vreplication stream. It can be in one of three phases:
Expand Down Expand Up @@ -991,3 +993,39 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err
}
return dbClient, nil
}

// setExistingRowsCopied deals with the case where another tablet started
// the workflow and a reparent occurred, and now that we manage the
// workflow, we need to read the rows_copied that already exists and add
// them to our counter, otherwise it will look like the reparent wiped all the
// rows_copied. So in the event that our CopyRowCount counter is zero, and
// the existing rows_copied in the vreplication table is not, copy the value of
// vreplication.rows_copied into our CopyRowCount.
func (vr *vreplicator) setExistingRowsCopied() {
if vr.stats.CopyRowCount.Get() == 0 {
rowsCopiedExisting, err := vr.readExistingRowsCopied(vr.id)
if err != nil {
log.Warningf("Failed to read existing rows copied value for %s worfklow: %v", vr.WorkflowName, err)
} else if rowsCopiedExisting != 0 {
log.Infof("Resuming the %s vreplication workflow started on another tablet, setting rows copied counter to %v", vr.WorkflowName, rowsCopiedExisting)
vr.stats.CopyRowCount.Set(rowsCopiedExisting)
}
}
}

func (vr *vreplicator) readExistingRowsCopied(id uint32) (int64, error) {
query, err := sqlparser.ParseAndBind(`SELECT rows_copied FROM _vt.vreplication WHERE id=%a`,
sqltypes.Uint32BindVariable(id),
)
if err != nil {
return 0, err
}
r, err := vr.dbClient.Execute(query)
if err != nil {
return 0, err
}
if len(r.Rows) != 1 {
return 0, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get expected single row value when getting rows_copied for workflow id: %d", id)
}
return r.Rows[0][0].ToInt64()
}
51 changes: 51 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,57 @@ func TestCancelledDeferSecondaryKeys(t *testing.T) {
require.Equal(t, 1, len(res.Rows))
}

// TestResumingFromPreviousWorkflowKeepingRowsCopied tests that when you
// resume a workflow started by another tablet (eg. a reparent occurred),
// the rows_copied does not reset to zero but continues along from where
// it left off.
func TestResumingFromPreviousWorkflowKeepingRowsCopied(t *testing.T) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
tablet := addTablet(100)
defer deleteTablet(tablet)
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
}
// The test env uses the same factory for both dba and
// filtered connections.
dbconfigs.GlobalDBConfigs.Filtered.User = "vt_dba"
id := uint32(1)

vsclient := newTabletConnector(tablet)
stats := binlogplayer.NewStats()

dbaconn := playerEngine.dbClientFactoryDba()
err := dbaconn.Connect()
require.NoError(t, err)
defer dbaconn.Close()

dbClient := playerEngine.dbClientFactoryFiltered()
err = dbClient.Connect()
require.NoError(t, err)
defer dbClient.Close()

dbName := dbClient.DBName()
rowsCopied := int64(500000)
// Ensure there's an existing vreplication workflow
_, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, rows_copied) values (%d, 'test', '', '', 99999, 99999, 0, 0, 'Running', '%s', %v) on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s', rows_copied=%v",
id, dbName, rowsCopied, dbName, rowsCopied), 1)
require.NoError(t, err)
defer func() {
_, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
require.NoError(t, err)
}()
vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine)
assert.Equal(t, rowsCopied, vr.stats.CopyRowCount.Get())
}

// stripCruft removes all whitespace unicode chars and backticks.
func stripCruft(in string) string {
out := strings.Builder{}
Expand Down