Skip to content

Commit

Permalink
[release-21.0] Flaky TestMoveTables(Un)sharded: Handle race condition (
Browse files Browse the repository at this point in the history
…#17440) (#17455)

Signed-off-by: Rohit Nayak <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] authored Jan 2, 2025
1 parent 3f4de79 commit e7af629
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 3 deletions.
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletconntest/fakequeryservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,8 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer

// VStream is part of the queryservice.QueryService interface
func (f *FakeQueryService) VStream(ctx context.Context, request *binlogdatapb.VStreamRequest, send func([]*binlogdatapb.VEvent) error) error {
panic("not implemented")
// This is called as part of vreplication unit tests, so we don't panic here.
return fmt.Errorf("VStream not implemented")
}

// VStreamRows is part of the QueryService interface.
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/constants/sidecar"
Expand Down Expand Up @@ -1764,6 +1766,14 @@ func addInvariants(dbClient *binlogplayer.MockDBClient, vreplID, sourceTabletUID
))
dbClient.AddInvariant(fmt.Sprintf(updatePickedSourceTablet, cell, sourceTabletUID, vreplID), &sqltypes.Result{})
dbClient.AddInvariant("update _vt.vreplication set state='Running', message='' where id=1", &sqltypes.Result{})
dbClient.AddInvariant(vreplication.SqlMaxAllowedPacket, sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"max_allowed_packet",
"int64",
),
"65536",
))
dbClient.AddInvariant("update _vt.vreplication set message", &sqltypes.Result{})
}

func addMaterializeSettingsTablesToSchema(ms *vtctldatapb.MaterializeSettings, tenv *testEnv, venv *vtenv.Environment) {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}

log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
vr.id, settings.StartPos, settings.StopPos, vr.source.Filter)
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
Expand All @@ -141,7 +142,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
maxAllowedPacket := int64(vr.workflowConfig.RelayLogMaxSize)
// We explicitly do NOT want to batch this, we want to send it down the wire
// immediately so we use ExecuteFetch directly.
res, err := vr.dbClient.ExecuteFetch("select @@session.max_allowed_packet as max_allowed_packet", 1)
res, err := vr.dbClient.ExecuteFetch(SqlMaxAllowedPacket, 1)
if err != nil {
log.Errorf("Error getting max_allowed_packet, will use the relay_log_max_size value of %d bytes: %v", vr.workflowConfig.RelayLogMaxSize, err)
} else {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const (
json_unquote(json_extract(action, '$.type'))=%a and vrepl_id=%a and table_name=%a`
sqlDeletePostCopyAction = `delete from _vt.post_copy_action where vrepl_id=%a and
table_name=%a and id=%a`
SqlMaxAllowedPacket = "select @@session.max_allowed_packet as max_allowed_packet"
)

// vreplicator provides the core logic to start vreplication streams
Expand Down

0 comments on commit e7af629

Please sign in to comment.