Skip to content

Commit

Permalink
Fix Errant GTID detection logic in SetReplicationSource (#17031)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 22, 2024
1 parent e598a87 commit c429d51
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 95 deletions.
10 changes: 8 additions & 2 deletions go/mysql/replication/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
}

// ErrantGTIDsOnReplica gets the errant GTIDs on the replica by comparing against the primary position and UUID.
func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position) (string, error) {
func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position, primarySid SID) (string, error) {
replicaGTIDSet, replicaOk := replicaPosition.GTIDSet.(Mysql56GTIDSet)
primaryGTIDSet, primaryOk := primaryPosition.GTIDSet.(Mysql56GTIDSet)

Expand All @@ -478,7 +478,13 @@ func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position) (s
}

// Calculate the difference between the replica and primary GTID sets.
diffSet := replicaGTIDSet.Difference(primaryGTIDSet)
// We discount the writes from the primary server first. This is required because sometimes
// the replica might advertise itself as more advanced than the primary, and we don't want to
// incorrectly mark GTIDs errant.
// For example, it is perfectly valid for the replica GTID set to be `ff8ecd9a-8f92-11ef-b369-733dd679dde6:1-33`
// while that of the primary is `ff8ecd9a-8f92-11ef-b369-733dd679dde6:1-29` when `ff8ecd9a-8f92-11ef-b369-733dd679dde6`
// is the primary server UUID. In this case, the replica has no errant GTIDs.
diffSet := replicaGTIDSet.RemoveUUID(primarySid).Difference(primaryGTIDSet)
return diffSet.String(), nil
}

Expand Down
21 changes: 20 additions & 1 deletion go/mysql/replication/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,34 +756,51 @@ func TestErrantGTIDsOnReplica(t *testing.T) {
name string
replicaPosition string
primaryPosition string
primarySID string
errantGtidWanted string
wantErr string
}{
{
name: "Empty replica position",
replicaPosition: "MySQL56/",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
primarySID: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
errantGtidWanted: "",
}, {
name: "Empty primary position",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
primaryPosition: "MySQL56/",
primarySID: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
}, {
name: "Primary seen as lagging for its own writes",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-33",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-29",
primarySID: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
}, {
name: "Empty primary position - with multiple errant gtids",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
primaryPosition: "MySQL56/",
primarySID: "8bc65cca-3fe4-11ed-bbfb-091034d49c4f",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
}, {
name: "Single errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30",
primarySID: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
}, {
name: "Multiple errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
primarySID: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:31-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35",
}, {
name: "Multiple errant GTID after discounting primary writes",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-10,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
primarySID: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:9-10,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35",
},
}
for _, tt := range tests {
Expand All @@ -792,7 +809,9 @@ func TestErrantGTIDsOnReplica(t *testing.T) {
require.NoError(t, err)
primaryPos, err := DecodePosition(tt.primaryPosition)
require.NoError(t, err)
errantGTIDs, err := ErrantGTIDsOnReplica(replPos, primaryPos)
primarySID, err := ParseSID(tt.primarySID)
require.NoError(t, err)
errantGTIDs, err := ErrantGTIDsOnReplica(replPos, primaryPos, primarySID)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
Expand Down
3 changes: 3 additions & 0 deletions go/mysql/replication/primary_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ type PrimaryStatus struct {
Position Position
// FilePosition represents the server's file based position.
FilePosition Position
// ServerUUID is the UUID of the server.
ServerUUID string
}

// PrimaryStatusToProto converts a PrimaryStatus into its proto3 message form.
// Both positions are serialized with EncodePosition, and the server UUID is
// copied through unchanged.
func PrimaryStatusToProto(s PrimaryStatus) *replicationdatapb.PrimaryStatus {
	pb := new(replicationdatapb.PrimaryStatus)
	pb.Position = EncodePosition(s.Position)
	pb.FilePosition = EncodePosition(s.FilePosition)
	pb.ServerUuid = s.ServerUUID
	return pb
}

Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/utils/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,13 @@ func TestPrimaryStatus(t *testing.T) {
assert.NoError(t, err)

assert.True(t, res.Position.Equal(r.Position), "primary replication status should be same as replication status here")

suuid, err := mysqld.GetServerUUID(context.Background())
assert.NoError(t, err)
assert.NotEmpty(t, suuid)

// The server UUID read from primary status and GetServerUUID should match
assert.Equal(t, suuid, res.ServerUUID)
}

func TestReplicationConfiguration(t *testing.T) {
Expand Down
10 changes: 9 additions & 1 deletion go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type FakeMysqlDaemon struct {
// return an error.
MysqlPort atomic.Int32

// ServerUUID is the server's UUID.
ServerUUID string

// Replicating is updated when calling StartReplication /
// StopReplication (it is not used at all when calling
// ReplicationStatus; it is the test owner's responsibility
Expand Down Expand Up @@ -299,7 +302,10 @@ func (fmd *FakeMysqlDaemon) GetServerID(ctx context.Context) (uint32, error) {

// GetServerUUID is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) GetServerUUID(ctx context.Context) (string, error) {
return "000000", nil
if fmd.ServerUUID != "" {
return fmd.ServerUUID, nil
}
return "00000000-0000-0000-0000-000000000000", nil
}

// ReplicationStatus is part of the MysqlDaemon interface.
Expand Down Expand Up @@ -331,9 +337,11 @@ func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.Prim
if fmd.PrimaryStatusError != nil {
return replication.PrimaryStatus{}, fmd.PrimaryStatusError
}
serverUUID, _ := fmd.GetServerUUID(ctx)
return replication.PrimaryStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
ServerUUID: serverUUID,
}, nil
}

Expand Down
10 changes: 9 additions & 1 deletion go/vt/mysqlctl/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,15 @@ func (mysqld *Mysqld) PrimaryStatus(ctx context.Context) (replication.PrimarySta
}
defer conn.Recycle()

return conn.Conn.ShowPrimaryStatus()
primaryStatus, err := conn.Conn.ShowPrimaryStatus()
if err != nil {
return replication.PrimaryStatus{}, err
}
primaryStatus.ServerUUID, err = conn.Conn.GetServerUUID()
if err != nil {
return replication.PrimaryStatus{}, err
}
return primaryStatus, nil
}

func (mysqld *Mysqld) ReplicationConfiguration(ctx context.Context) (*replicationdata.Configuration, error) {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/mysqlctl/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func TestPrimaryStatus(t *testing.T) {

db.AddQuery("SELECT 1", &sqltypes.Result{})
db.AddQuery("SHOW MASTER STATUS", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "test_status"))
db.AddQuery("SELECT @@global.server_uuid", sqltypes.MakeTestResult(sqltypes.MakeTestFields("test_field", "varchar"), "test_uuid"))

testMysqld := NewMysqld(dbc)
defer testMysqld.Close()
Expand All @@ -295,6 +296,7 @@ func TestPrimaryStatus(t *testing.T) {
res, err := testMysqld.PrimaryStatus(ctx)
assert.NoError(t, err)
assert.NotNil(t, res)
assert.EqualValues(t, "test_uuid", res.ServerUUID)

db.AddQuery("SHOW MASTER STATUS", &sqltypes.Result{})
_, err = testMysqld.PrimaryStatus(ctx)
Expand Down
Loading

0 comments on commit c429d51

Please sign in to comment.