Skip to content

Commit

Permalink
Fix: Errant GTID detection on the replicas when they set replication …
Browse files Browse the repository at this point in the history
…source (#16833)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Oct 3, 2024
1 parent 24212e7 commit eab262e
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 57 deletions.
9 changes: 9 additions & 0 deletions changelog/21.0/21.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- **[Dynamic VReplication Configuration](#dynamic-vreplication-configuration)**
- **[Reference Table Materialization](#reference-table-materialization)**
- **[New VEXPLAIN Modes: TRACE and KEYS](#new-vexplain-modes)**
- **[Errant GTID Detection on Vttablets](#errant-gtid-vttablet)**

## <a id="major-changes"/>Major Changes

Expand Down Expand Up @@ -197,3 +198,11 @@ The KEYS mode for VEXPLAIN offers a concise summary of query structure, highligh
KEYS mode analyzes the query structure without executing it, providing JSON output that includes grouping columns, join columns, filter columns (potential candidates for indexes, primary keys, or sharding keys), and the statement type.

These new VEXPLAIN modes enhance Vitess's query analysis capabilities, allowing for more informed decisions about sharding strategies and query optimization.

### <a id="errant-gtid-vttablet"/>Errant GTID Detection on Vttablets

Vttablets now run an errant GTID detection logic before they join the replication stream. So, if a replica has an errant GTID, it will
not start replicating from the primary. It will fail the call the set its replication source because of the errant GTID. This prevents us
from running into situations from which recovery is very hard.

For users running with the vitess operator on kubernetes, this change means that the replicas with errant GTIDs will have broken replication and will report as unready. The users will need to manually clean up these errant replica tablets.
27 changes: 27 additions & 0 deletions go/mysql/replication/mysql56_gtid_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,33 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}

// ErrantGTIDsOnReplica gets the errant GTIDs on the replica by comparing against the primary position and UUID.
func ErrantGTIDsOnReplica(replicaPosition Position, primaryPosition Position) (string, error) {
replicaGTIDSet, replicaOk := replicaPosition.GTIDSet.(Mysql56GTIDSet)
primaryGTIDSet, primaryOk := primaryPosition.GTIDSet.(Mysql56GTIDSet)

// Currently we only support errant GTID detection for MySQL 56 flavour.
if !replicaOk || !primaryOk {
return "", nil
}

// Calculate the difference between the replica and primary GTID sets.
diffSet := replicaGTIDSet.Difference(primaryGTIDSet)
return diffSet.String(), nil
}

// RemoveUUID removes a specific UUID from the gtid set.
func (set Mysql56GTIDSet) RemoveUUID(uuid SID) Mysql56GTIDSet {
newSet := make(Mysql56GTIDSet)
for sid, intervals := range set {
if sid == uuid {
continue
}
newSet[sid] = intervals
}
return newSet
}

// Difference will supply the difference between the receiver and supplied Mysql56GTIDSets, and supply the result
// as a Mysql56GTIDSet.
func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
Expand Down
85 changes: 85 additions & 0 deletions go/mysql/replication/mysql56_gtid_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,3 +704,88 @@ func BenchmarkMySQL56GTIDParsing(b *testing.B) {
}
}
}

func TestErrantGTIDsOnReplica(t *testing.T) {
tests := []struct {
name string
replicaPosition string
primaryPosition string
errantGtidWanted string
wantErr string
}{
{
name: "Empty replica position",
replicaPosition: "MySQL56/",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
errantGtidWanted: "",
}, {
name: "Empty primary position",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
primaryPosition: "MySQL56/",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8",
}, {
name: "Empty primary position - with multiple errant gtids",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
primaryPosition: "MySQL56/",
errantGtidWanted: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1",
}, {
name: "Single errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
}, {
name: "Multiple errant GTID",
replicaPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-35",
primaryPosition: "MySQL56/8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-50,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1-30,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:34",
errantGtidWanted: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:31-32,8bc65cca-3fe4-11ed-bbfb-091034d48bd3:3-33:35",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
replPos, err := DecodePosition(tt.replicaPosition)
require.NoError(t, err)
primaryPos, err := DecodePosition(tt.primaryPosition)
require.NoError(t, err)
errantGTIDs, err := ErrantGTIDsOnReplica(replPos, primaryPos)
if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
require.EqualValues(t, tt.errantGtidWanted, errantGTIDs)
}

})
}
}

func TestMysql56GTIDSet_RemoveUUID(t *testing.T) {
tests := []struct {
name string
initialSet string
uuid string
wantSet string
}{
{
name: "Remove unknown UUID",
initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
uuid: "8bc65c84-3fe4-11ed-a912-257f0fcde6c9",
wantSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
},
{
name: "Remove a single UUID",
initialSet: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9:1-8,8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
uuid: "8bc65c84-3fe4-11ed-a912-257f0fcdd6c9",
wantSet: "8bc65cca-3fe4-11ed-bbfb-091034d48b3e:1:4-24",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gtidSet, err := ParseMysql56GTIDSet(tt.initialSet)
require.NoError(t, err)
sid, err := ParseSID(tt.uuid)
require.NoError(t, err)
gtidSet = gtidSet.RemoveUUID(sid)
require.EqualValues(t, tt.wantSet, gtidSet.String())
})
}
}
4 changes: 4 additions & 0 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type FakeMysqlDaemon struct {
// and ReplicationStatus.
CurrentPrimaryPosition replication.Position

// CurrentRelayLogPosition is returned by ReplicationStatus.
CurrentRelayLogPosition replication.Position

// CurrentSourceFilePosition is used to determine the executed
// file based positioning of the replication source.
CurrentSourceFilePosition replication.Position
Expand Down Expand Up @@ -313,6 +316,7 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
return replication.ReplicationStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: fmd.CurrentSourceFilePosition,
RelayLogPosition: fmd.CurrentRelayLogPosition,
RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition,
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// Implemented as AND to avoid changing all tests that were
Expand Down
48 changes: 14 additions & 34 deletions go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,15 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstats"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// This file handles the initial backup restore upon startup.
Expand Down Expand Up @@ -326,7 +323,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
} else if keyspaceInfo.KeyspaceType == topodatapb.KeyspaceType_NORMAL {
// Reconnect to primary only for "NORMAL" keyspaces
params.Logger.Infof("Restore: starting replication at position %v", pos)
if err := tm.startReplication(context.Background(), pos, originalType); err != nil {
if err := tm.startReplication(ctx, pos, originalType); err != nil {
return err
}
}
Expand Down Expand Up @@ -577,47 +574,30 @@ func (tm *TabletManager) disableReplication(ctx context.Context) error {
}

func (tm *TabletManager) startReplication(ctx context.Context, pos replication.Position, tabletType topodatapb.TabletType) error {
if err := tm.MysqlDaemon.StopReplication(ctx, nil); err != nil {
// The first three steps of stopping replication, and setting the replication position,
// we want to do even if the context expires, so we use a background context for these tasks.
if err := tm.MysqlDaemon.StopReplication(context.Background(), nil); err != nil {
return vterrors.Wrap(err, "failed to stop replication")
}
if err := tm.MysqlDaemon.ResetReplicationParameters(ctx); err != nil {
if err := tm.MysqlDaemon.ResetReplicationParameters(context.Background()); err != nil {
return vterrors.Wrap(err, "failed to reset replication")
}

// Set the position at which to resume from the primary.
if err := tm.MysqlDaemon.SetReplicationPosition(ctx, pos); err != nil {
if err := tm.MysqlDaemon.SetReplicationPosition(context.Background(), pos); err != nil {
return vterrors.Wrap(err, "failed to set replication position")
}

primary, err := tm.initializeReplication(ctx, tabletType)
primaryPosStr, err := tm.initializeReplication(ctx, tabletType)
// If we ran into an error while initializing replication, then there is no point in waiting for catch-up.
// Also, if there is no primary tablet in the shard, we don't need to proceed further.
if err != nil || primary == nil {
if err != nil || primaryPosStr == "" {
return err
}

// wait for reliable replication_lag_seconds
// we have pos where we want to resume from
// if PrimaryPosition is the same, that means no writes
// have happened to primary, so we are up-to-date
// otherwise, wait for replica's Position to change from
// the initial pos before proceeding
tmc := tmclient.NewTabletManagerClient()
defer tmc.Close()
remoteCtx, remoteCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer remoteCancel()
posStr, err := tmc.PrimaryPosition(remoteCtx, primary.Tablet)
if err != nil {
// It is possible that though PrimaryAlias is set, the primary tablet is unreachable
// Log a warning and let tablet restore in that case
// If we had instead considered this fatal, all tablets would crash-loop
// until a primary appears, which would make it impossible to elect a primary.
log.Warningf("Can't get primary replication position after restore: %v", err)
return nil
}
primaryPos, err := replication.DecodePosition(posStr)
primaryPos, err := replication.DecodePosition(primaryPosStr)
if err != nil {
return vterrors.Wrapf(err, "can't decode primary replication position: %q", posStr)
return vterrors.Wrapf(err, "can't decode primary replication position: %q", primaryPos)
}

if !pos.Equal(primaryPos) {
Expand Down
30 changes: 30 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
wasReplicating := false
shouldbeReplicating := false
status, err := tm.MysqlDaemon.ReplicationStatus(ctx)
replicaPosition := status.RelayLogPosition
if err == mysql.ErrNotReplica {
// This is a special error that means we actually succeeded in reading
// the status, but the status is empty because replication is not
Expand All @@ -717,6 +718,12 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
// Since we continue in the case of this error, make sure 'status' is
// in a known, empty state.
status = replication.ReplicationStatus{}
// The replica position we use for the errant GTID detection should be the executed
// GTID set since this tablet is not running replication at all.
replicaPosition, err = tm.MysqlDaemon.PrimaryPosition(ctx)
if err != nil {
return err
}
} else if err != nil {
// Abort on any other non-nil error.
return err
Expand Down Expand Up @@ -748,12 +755,35 @@ func (tm *TabletManager) setReplicationSourceLocked(ctx context.Context, parentA
if err != nil {
return err
}

host := parent.Tablet.MysqlHostname
port := parent.Tablet.MysqlPort
// If host is empty, then we shouldn't even attempt the reparent. That tablet has already shutdown.
if host == "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, "Shard primary has empty mysql hostname")
}
// Errant GTID detection.
{
// Find the executed GTID set of the tablet that we are reparenting to.
// We will then compare our own position against it to verify that we don't
// have an errant GTID. If we find any GTID that we have, but the primary doesn't,
// we will not enter the replication graph and instead fail replication.
primaryPositionStr, err := tm.tmc.PrimaryPosition(ctx, parent.Tablet)
if err != nil {
return err
}
primaryPosition, err := replication.DecodePosition(primaryPositionStr)
if err != nil {
return err
}
errantGtid, err := replication.ErrantGTIDsOnReplica(replicaPosition, primaryPosition)
if err != nil {
return err
}
if errantGtid != "" {
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("Errant GTID detected - %s; Primary GTID - %s, Replica GTID - %s", errantGtid, primaryPosition, replicaPosition.String()))
}
}
if status.SourceHost != host || status.SourcePort != port || heartbeatInterval != 0 {
// This handles both changing the address and starting replication.
if err := tm.MysqlDaemon.SetReplicationSource(ctx, host, port, heartbeatInterval, wasReplicating, shouldbeReplicating); err != nil {
Expand Down
Loading

0 comments on commit eab262e

Please sign in to comment.