Skip to content

Commit

Permalink
VReplication: Enable VPlayerBatching in unit tests (#17339)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Dec 6, 2024
1 parent 4675244 commit f9acb77
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,_binary'a')",
"insert into tab1(id,val) values (2,_binary'b')",
"insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')",
"/update _vt.vreplication set pos=",
"commit",
}, pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) {
resetBinlogClient()

vttablet.InitVReplicationConfigDefaults()
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0

// Engines cannot be initialized in testenv because it introduces circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down Expand Up @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down
23 changes: 14 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"vitess.io/vitess/go/vt/vterrors"
)

const beginStmtLen = int64(len("begin;"))

// vdbClient is a wrapper on binlogplayer.DBClient.
// It allows us to retry a failed transactions on lock errors.
type vdbClient struct {
Expand All @@ -56,16 +58,19 @@ func (vc *vdbClient) Begin() error {
if vc.InTransaction {
return nil
}
if err := vc.DBClient.Begin(); err != nil {
return err
if vc.maxBatchSize > 0 {
// We are batching the contents of the transaction, which
// starts with the BEGIN and ends with the COMMIT, so we
// do not send a BEGIN down the wire ahead of time.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = beginStmtLen
} else {
// We're not batching so we start the transaction here
// by sending the BEGIN down the wire.
if err := vc.DBClient.Begin(); err != nil {
return err
}
}

// If we're batching, we only batch the contents of the
// transaction, which starts with the begin and ends with
// the commit.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon

vc.queries = append(vc.queries, "begin")
vc.InTransaction = true
vc.startTime = time.Now()
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return vr.dbClient.Commit()
}
batchMode := false
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
// We only do batching in the running/replicating phase.
if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
batchMode = true
}
if batchMode {
Expand Down
31 changes: 21 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)
Expand Down Expand Up @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
output: qh.Expect(
"begin",
"insert into dst4(id1,val) values (1,_binary'aaa')",
"insert into dst4(id1,val) values (3,_binary'ccc')",
"insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand All @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
output: qh.Expect(
"begin",
"insert into dst5(id1,val) values (1,_binary'abc')",
"insert into dst5(id1,val) values (4,_binary'abc')",
"insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand Down Expand Up @@ -1495,17 +1492,15 @@ func TestPlayerRowMove(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 3)
validateQueryCountStat(t, "replicate", 1)

execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
Expand All @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) {
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 5)
validateQueryCountStat(t, "replicate", 3)
}

func TestPlayerTypes(t *testing.T) {
Expand Down Expand Up @@ -2179,6 +2174,14 @@ func TestPlayerSplitTransaction(t *testing.T) {
func TestPlayerLockErrors(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -2258,6 +2261,14 @@ func TestPlayerLockErrors(t *testing.T) {
func TestPlayerCancelOnLock(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,14 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
}
vr.stats.State.Store(state.String())
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
// If we're batching a transaction, then include the state update
// in the current transaction batch.
if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 {
vr.dbClient.AddQueryToTrxBatch(query)
} else { // Otherwise, send it down the wire
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
}
if state == vr.state {
return nil
Expand Down

0 comments on commit f9acb77

Please sign in to comment.