diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go index e00a5578171..c671d2a086d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector_test.go @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) { expectDBClientAndVreplicationQueries(t, []string{ "begin", - "insert into tab1(id,val) values (1,_binary'a')", - "insert into tab1(id,val) values (2,_binary'b')", + "insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')", "/update _vt.vreplication set pos=", "commit", }, pos) diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index fe8b62d3cef..12a05a69dbc 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) { resetBinlogClient() vttablet.InitVReplicationConfigDefaults() - vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 // Engines cannot be initialized in testenv because it introduces circular dependencies. streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0]) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index a95e0bf17c5..a7e4794ba76 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) { reset := vstreamer.AdjustPacketSize(1) defer reset() + // The test is written to match the behavior w/o + // VReplicationExperimentalFlagOptimizeInserts enabled. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration // copyPhaseDuration should be low enough to have time to send one row. vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond diff --git a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go index 8a4409db06c..63538be881d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vdbclient.go @@ -30,6 +30,8 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +const beginStmtLen = int64(len("begin;")) + // vdbClient is a wrapper on binlogplayer.DBClient. // It allows us to retry a failed transactions on lock errors. type vdbClient struct { @@ -56,16 +58,19 @@ func (vc *vdbClient) Begin() error { if vc.InTransaction { return nil } - if err := vc.DBClient.Begin(); err != nil { - return err + if vc.maxBatchSize > 0 { + // We are batching the contents of the transaction, which + // starts with the BEGIN and ends with the COMMIT, so we + // do not send a BEGIN down the wire ahead of time. + vc.queriesPos = int64(len(vc.queries)) + vc.batchSize = beginStmtLen + } else { + // We're not batching so we start the transaction here + // by sending the BEGIN down the wire. + if err := vc.DBClient.Begin(); err != nil { + return err + } } - - // If we're batching, we only batch the contents of the - // transaction, which starts with the begin and ends with - // the commit. - vc.queriesPos = int64(len(vc.queries)) - vc.batchSize = 6 // begin and semicolon - vc.queries = append(vc.queries, "begin") vc.InTransaction = true vc.startTime = time.Now() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index db2f3f341ac..98e36119622 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map return vr.dbClient.Commit() } batchMode := false - if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { + // We only do batching in the running/replicating phase. + if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 { batchMode = true } if batchMode { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 0cc568c1cf1..50d93e60e5a 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { // It does not work when filter is enabled output := qh.Expect( - "begin", "rollback", fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')", output: qh.Expect( "begin", - "insert into dst4(id1,val) values (1,_binary'aaa')", - "insert into dst4(id1,val) values (3,_binary'ccc')", + "insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')", "/update _vt.vreplication set pos=", "commit", ), @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) { input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')", output: qh.Expect( "begin", - "insert into dst5(id1,val) values (1,_binary'abc')", - "insert into dst5(id1,val) values (4,_binary'abc')", + "insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')", "/update _vt.vreplication set pos=", "commit", ), @@ -1495,9 +1492,7 @@ func TestPlayerRowMove(t *testing.T) { }) expectDBClientQueries(t, qh.Expect( "begin", - "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", - "insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", + "insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1", "/update _vt.vreplication set pos=", "commit", )) @@ -1505,7 +1500,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "1", "1"}, {"2", "5", "2"}, }) - validateQueryCountStat(t, "replicate", 3) + validateQueryCountStat(t, "replicate", 1) execStatements(t, []string{ "update src set val1=1, val2=4 where id=3", @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) { {"1", "5", "2"}, {"2", "2", "1"}, }) - validateQueryCountStat(t, "replicate", 5) + validateQueryCountStat(t, "replicate", 3) } func TestPlayerTypes(t *testing.T) { @@ -2179,6 +2174,14 @@ func TestPlayerSplitTransaction(t *testing.T) { func TestPlayerLockErrors(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), @@ -2258,6 +2261,14 @@ func TestPlayerLockErrors(t *testing.T) { func TestPlayerCancelOnLock(t *testing.T) { defer deleteTablet(addTablet(100)) + // The immediate retry behavior does not apply when doing + // VPlayer Batching. + origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags + vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0 + defer func() { + vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags + }() + execStatements(t, []string{ "create table t1(id int, val varchar(128), primary key(id))", fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb), diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 9ec274ab0ea..42701288a44 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -508,8 +508,14 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me } vr.stats.State.Store(state.String()) query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id) - if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil { - return fmt.Errorf("could not set state: %v: %v", query, err) + // If we're batching a transaction, then include the state update + // in the current transaction batch. + if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 { + vr.dbClient.AddQueryToTrxBatch(query) + } else { // Otherwise, send it down the wire + if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil { + return fmt.Errorf("could not set state: %v: %v", query, err) + } } if state == vr.state { return nil