Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Enable VPlayerBatching in unit tests #17339

Merged
merged 5 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ func TestExternalConnectorPlay(t *testing.T) {

expectDBClientAndVreplicationQueries(t, []string{
"begin",
"insert into tab1(id,val) values (1,_binary'a')",
"insert into tab1(id,val) values (2,_binary'b')",
"insert into tab1(id,val) values (1,_binary'a'), (2,_binary'b')",
"/update _vt.vreplication set pos=",
"commit",
}, pos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func setup(ctx context.Context) (func(), int) {
resetBinlogClient()

vttablet.InitVReplicationConfigDefaults()
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the flags were disabled for all of the unit tests and I did not notice it previously.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want this PR to be able to stand on its own, I could change this line to vttablet.DefaultVReplicationConfig.ExperimentalFlags = 7 rather than removing it.


// Engines cannot be initialized in testenv because it introduces circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,14 @@ func testPlayerCopyBigTable(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down Expand Up @@ -814,6 +822,14 @@ func testPlayerCopyWildcardRule(t *testing.T) {
reset := vstreamer.AdjustPacketSize(1)
defer reset()

// The test is written to match the behavior w/o
// VReplicationExperimentalFlagOptimizeInserts enabled.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

savedCopyPhaseDuration := vttablet.DefaultVReplicationConfig.CopyPhaseDuration
// copyPhaseDuration should be low enough to have time to send one row.
vttablet.DefaultVReplicationConfig.CopyPhaseDuration = 500 * time.Millisecond
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ func (vc *vdbClient) Begin() error {
if vc.InTransaction {
return nil
}
if err := vc.DBClient.Begin(); err != nil {
return err
if vc.maxBatchSize > 0 {
// We are batching the contents of the transaction, which
// starts with the BEGIN and ends with the COMMIT, so we
// do not send a BEGIN down the wire ahead of time.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it's not new here and moved, but should we avoid the magic constant here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a magic constant, it's just the known length of "begin;"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it clearer in the comment:

Suggested change
vc.batchSize = 6 // begin and semicolon
vc.batchSize = 6 // equal to `len("begin;")`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed it here: 26f93d4

} else {
// We're not batching so we start the transaction here
// by sending the BEGIN down the wire.
if err := vc.DBClient.Begin(); err != nil {
return err
}
mattlord marked this conversation as resolved.
Show resolved Hide resolved
}

// If we're batching, we only batch the contents of the
// transaction, which starts with the begin and ends with
// the commit.
vc.queriesPos = int64(len(vc.queries))
vc.batchSize = 6 // begin and semicolon

vc.queries = append(vc.queries, "begin")
vc.InTransaction = true
vc.startTime = time.Now()
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
return vr.dbClient.Commit()
}
batchMode := false
if vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
// We only do batching in the running/replicating phase.
if len(copyState) == 0 && vr.workflowConfig.ExperimentalFlags&vttablet.VReplicationExperimentalFlagVPlayerBatching != 0 {
batchMode = true
}
if batchMode {
Expand Down
31 changes: 21 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,6 @@ func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)
Expand Down Expand Up @@ -975,8 +974,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src4 values (1,100,'aaa'),(2,200,'bbb'),(3,100,'ccc')",
output: qh.Expect(
"begin",
"insert into dst4(id1,val) values (1,_binary'aaa')",
"insert into dst4(id1,val) values (3,_binary'ccc')",
"insert into dst4(id1,val) values (1,_binary'aaa'), (3,_binary'ccc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand All @@ -987,8 +985,7 @@ func TestPlayerFilters(t *testing.T) {
input: "insert into src5 values (1,100,'abc'),(2,200,'xyz'),(3,100,'xyz'),(4,300,'abc'),(5,200,'xyz')",
output: qh.Expect(
"begin",
"insert into dst5(id1,val) values (1,_binary'abc')",
"insert into dst5(id1,val) values (4,_binary'abc')",
"insert into dst5(id1,val) values (1,_binary'abc'), (4,_binary'abc')",
"/update _vt.vreplication set pos=",
"commit",
),
Expand Down Expand Up @@ -1495,17 +1492,15 @@ func TestPlayerRowMove(t *testing.T) {
})
expectDBClientQueries(t, qh.Expect(
"begin",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(2, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"insert into dst(val1,sval2,rcount) values (1,ifnull(1, 0),1), (2,ifnull(2, 0),1), (2,ifnull(3, 0),1) on duplicate key update sval2=sval2+ifnull(values(sval2), 0), rcount=rcount+1",
"/update _vt.vreplication set pos=",
"commit",
))
expectData(t, "dst", [][]string{
{"1", "1", "1"},
{"2", "5", "2"},
})
validateQueryCountStat(t, "replicate", 3)
validateQueryCountStat(t, "replicate", 1)

execStatements(t, []string{
"update src set val1=1, val2=4 where id=3",
Expand All @@ -1521,7 +1516,7 @@ func TestPlayerRowMove(t *testing.T) {
{"1", "5", "2"},
{"2", "2", "1"},
})
validateQueryCountStat(t, "replicate", 5)
validateQueryCountStat(t, "replicate", 3)
}

func TestPlayerTypes(t *testing.T) {
Expand Down Expand Up @@ -2179,6 +2174,14 @@ func TestPlayerSplitTransaction(t *testing.T) {
func TestPlayerLockErrors(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down Expand Up @@ -2258,6 +2261,14 @@ func TestPlayerLockErrors(t *testing.T) {
func TestPlayerCancelOnLock(t *testing.T) {
defer deleteTablet(addTablet(100))

// The immediate retry behavior does not apply when doing
// VPlayer Batching.
origExperimentalFlags := vttablet.DefaultVReplicationConfig.ExperimentalFlags
vttablet.DefaultVReplicationConfig.ExperimentalFlags = 0
defer func() {
vttablet.DefaultVReplicationConfig.ExperimentalFlags = origExperimentalFlags
}()

execStatements(t, []string{
"create table t1(id int, val varchar(128), primary key(id))",
fmt.Sprintf("create table %s.t1(id int, val varchar(128), primary key(id))", vrepldb),
Expand Down
10 changes: 8 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,8 +508,14 @@ func (vr *vreplicator) setState(state binlogdatapb.VReplicationWorkflowState, me
}
vr.stats.State.Store(state.String())
query := fmt.Sprintf("update _vt.vreplication set state='%v', message=%v where id=%v", state, encodeString(binlogplayer.MessageTruncate(message)), vr.id)
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
// If we're batching a transaction, then include the state update
// in the current transaction batch.
if vr.dbClient.InTransaction && vr.dbClient.maxBatchSize > 0 {
vr.dbClient.AddQueryToTrxBatch(query)
} else { // Otherwise, send it down the wire
if _, err := vr.dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("could not set state: %v: %v", query, err)
}
Comment on lines +511 to +518
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for the noted extraneous state update that surfaced in the unit tests. This caused no known issues, but was unnecessary and corrected here.

}
if state == vr.state {
return nil
Expand Down
Loading