diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index e00a5578171..c671d2a086d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) { expectDBClientAndVreplicationQueries(t, []string{ "begin", - "insert into tab1(id,val) values (1,_binary'a')", - "insert into tab1(id,val) values (2,_binary'b')", + "insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')", "/update _vt.vreplication set pos=", "commit", }, pos) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index fe8b62d3cef..12a05a69dbc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) { resetBinlogClient() vttablet.InitVReplicationConfigDefaults() - vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 // Engines cannot be initialized in testenv because it introduces circular dependencies. streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index a95e0bf17c5..1f2bca6400a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -766,7 +766,6 @@ func testPlayerCopyBigTable(t *testing.T) { "insert into dst(id,val) values (1,'aaa')", `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"id" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:"1"}'.*`, // The next catchup executes the new row insert, but will be a no-op. - "insert into dst(id,val) select 3, 'ccc' from dual where (3) <= (1)", // fastForward has nothing to add. Just saves position. // Back to copy mode. // Inserts can happen out-of-order. @@ -895,7 +894,6 @@ func testPlayerCopyWildcardRule(t *testing.T) { "insert into src(id,val) values (1,'aaa')", `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \(_binary'fields:{name:"id" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:"1"}'.*`, // The next catchup executes the new row insert, but will be a no-op. - "insert into src(id,val) select 3, 'ccc' from dual where (3) <= (1)", // fastForward has nothing to add. Just saves position. // Return to copy mode. // Inserts can happen out-of-order. diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 8a4409db06c..3f927f6233e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -56,16 +56,18 @@ func (vc *vdbClient) Begin() error { if vc.InTransaction { return nil } - if err := vc.DBClient.Begin(); err != nil { - return err + if vc.maxBatchSize == 0 { + // We're not batching so we BEGIN the transaction here. + if err := vc.DBClient.Begin(); err != nil { + return err + } + } else { + // If we're batching, we only batch the contents of the + // transaction, which starts with the BEGIN and ends with + // the COMMIT. + vc.queriesPos = int64(len(vc.queries)) + vc.batchSize = 6 // begin and semicolon } - - // If we're batching, we only batch the contents of the - // transaction, which starts with the begin and ends with - // the commit. - vc.queriesPos = int64(len(vc.queries)) - vc.batchSize = 6 // begin and semicolon - vc.queries = append(vc.queries, "begin") vc.InTransaction = true vc.startTime = time.Now() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..4131ced5b62 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vr.dbClient.Commit() } batchMode := false - if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + // We only do batching in the running phase. + if copyState == nil && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { batchMode = true } if batchMode { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 0cc568c1cf1..5d437e63166 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { // It does not work when filter is enabled output := qh.Expect( - "begin", "rollback", fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')", output: qh.Expect( "begin", - "insert into dst4(id1,val) values (1,_binary'aaa')", - "insert into dst4(id1,val) values (3,_binary'ccc')", + "insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')", "/update _vt.vreplication set pos=", "commit", ), @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')", output: qh.Expect( "begin", - "insert into dst5(id1,val) values (1,_binary'abc')", - "insert into dst5(id1,val) values (4,_binary'abc')", + "insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')", "/update _vt.vreplication set pos=", "commit", ), @@ -1495,9 +1492,7 @@ func TestPlayerRowMove(t *testing.T) { }) expectDBClientQueries(t, qh.Expect( "begin", - "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", + "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", "/update _vt.vreplication set pos=", "commit", )) @@ -1505,7 +1500,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "1", "1"}, {"2", "5", "2"}, }) - validateQueryCountStat(t, "replicate", 3) + validateQueryCountStat(t, "replicate", 1) execStatements(t, []string{ "update src set val1=1, val2=4 where id=3", @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "5", "2"}, {"2", "2", "1"}, }) - validateQueryCountStat(t, "replicate", 5) + validateQueryCountStat(t, "replicate", 3) } func TestPlayerTypes(t *testing.T) { @@ -1737,6 +1732,7 @@ func TestPlayerDDL(t *testing.T) { pos1 := primaryPosition(t) // The stop position must be the GTID of the first DDL expectDBClientQueries(t, qh.Expect( + "/update _vt.vreplication set state='Stopped'", "begin", fmt.Sprintf("/update _vt.vreplication set pos='%s'", pos1), "/update _vt.vreplication set state='Stopped'", @@ -1757,6 +1753,7 @@ func TestPlayerDDL(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update _vt.vreplication set state='Stopped'", "begin", fmt.Sprintf("/update.*'%s'", pos2), "/update _vt.vreplication set state='Stopped'", @@ -1922,6 +1919,7 @@ func TestPlayerStopPos(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update.*'Stopped'", "begin", "insert into yes(id,val) values (1,'aaa')", fmt.Sprintf("/update.*compress.*'%s'", stopPos), @@ -1947,6 +1945,7 @@ func TestPlayerStopPos(t *testing.T) { // Second update is from vreplicator. "/update _vt.vreplication set message='Picked source tablet.*", "/update.*'Running'", + "/update.*'Stopped'", "begin", // Since 'no' generates empty transactions that are skipped by // vplayer, a commit is done only for the stop position event. @@ -2179,6 +2178,14 @@ func TestPlayerSplitTransaction(t *testing.T) { func TestPlayerLockErrors(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), @@ -2245,7 +2252,8 @@ func TestPlayerLockErrors(t *testing.T) { )) // Release the lock, and watch the retry go through. - _, _ = vconn.ExecuteFetch("rollback", 1) + _, err := vconn.ExecuteFetch("rollback", 1) + require.NoError(t, err) expectDBClientQueries(t, qh.Expect( "begin", "update t1 set val='ccc' where id=1", @@ -2258,6 +2266,14 @@ func TestPlayerLockErrors(t *testing.T) { func TestPlayerCancelOnLock(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),