Skip to content

Commit

Permalink
Remove BinlogFilePos code
Browse files Browse the repository at this point in the history
In the end it wasn't worth it as we only used it for one variable
and it made backporting more risky/difficult.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 7, 2025
1 parent 17f4f6c commit 4e54f15
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 152 deletions.
72 changes: 0 additions & 72 deletions go/mysql/replication/binlog_file_position.go

This file was deleted.

20 changes: 10 additions & 10 deletions go/mysql/replication/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ReplicationStatus struct {
// it is the executed GTID set. For file replication implementation, it is same as
// FilePosition
Position Position
// RelayLogPosition is the GTID Position that the replica would be at if it
// RelayLogPosition is the Position that the replica would be at if it
// were to finish executing everything that's currently in its relay log.
// However, some MySQL flavors don't expose this information,
// in which case RelayLogPosition.IsZero() will be true.
Expand All @@ -41,14 +41,14 @@ type ReplicationStatus struct {
// executed GTID set and retrieved GTID set. For file replication implementation,
// it is same as RelayLogSourceBinlogEquivalentPosition
RelayLogPosition Position
// FilePosition represents the server's file/pos based replication pseudo GTID position.
// FilePosition stores the position of the source tablets binary log
// upto which the SQL thread of the replica has run.
FilePosition Position
// RelayLogSourceBinlogEquivalentPosition stores the file/pos based replication pseudo
// GTID position up to which the IO thread has read and added to the relay log.
// RelayLogSourceBinlogEquivalentPosition stores the position of the source tablets binary log
// upto which the IO thread has read and added to the relay log
RelayLogSourceBinlogEquivalentPosition Position
// RelayLogFilePosition stores the actual binlog file and position (not any pseudo GTID
// based position) in the relay log file.
RelayLogFilePosition BinlogFilePos
// RelayLogFilePosition stores the position in the relay log file
RelayLogFilePosition Position
SourceServerID uint32
IOState ReplicationState
LastIOError string
Expand Down Expand Up @@ -103,7 +103,7 @@ func ReplicationStatusToProto(s ReplicationStatus) *replicationdatapb.Status {
ReplicationLagSeconds: s.ReplicationLagSeconds,
ReplicationLagUnknown: s.ReplicationLagUnknown,
SqlDelay: s.SQLDelay,
RelayLogFilePosition: s.RelayLogFilePosition.String(),
RelayLogFilePosition: EncodePosition(s.RelayLogFilePosition),
SourceHost: s.SourceHost,
SourceUser: s.SourceUser,
SourcePort: s.SourcePort,
Expand Down Expand Up @@ -139,7 +139,7 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogSourceBinlogEquivalentPosition"))
}
relayFilePos, err := ParseBinlogFilePos(s.RelayLogFilePosition)
relayFilePos, err := DecodePosition(s.RelayLogFilePosition)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode RelayLogFilePosition"))
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func ParseReplicationStatus(fields map[string]string, replica bool) ReplicationS
relayPosStr := fields["Relay_Log_Pos"]
file = fields["Relay_Log_File"]
if file != "" && relayPosStr != "" {
status.RelayLogFilePosition, err = ParseBinlogFilePos(fmt.Sprintf("%s:%s", file, relayPosStr))
status.RelayLogFilePosition.GTIDSet, err = ParseFilePosGTIDSet(fmt.Sprintf("%s:%s", file, relayPosStr))
if err != nil {
log.Warningf("Error parsing GTID set %s:%s: %v", file, relayPosStr, err)
}
Expand Down
31 changes: 15 additions & 16 deletions go/mysql/replication/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func TestMysqlShouldGetPosition(t *testing.T) {
}
got, err := ParseMysqlPrimaryStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.Position.GTIDSet.String(), want.Position.GTIDSet.String(), "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition.GTIDSet.String(), want.FilePosition.GTIDSet.String(), "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)
}

func TestMysqlRetrieveMasterServerId(t *testing.T) {
Expand Down Expand Up @@ -181,14 +181,13 @@ func TestMysqlRetrieveFileBasedPositions(t *testing.T) {
want := ReplicationStatus{
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}},
}
got, err := ParseMysqlReplicationStatus(resultMap, false)
require.NoError(t, err)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition)
assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v",
got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)
assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)
assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet)
assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet)
}

func TestMysqlShouldGetLegacyRelayLogPosition(t *testing.T) {
Expand Down Expand Up @@ -257,13 +256,13 @@ func TestMariadbRetrieveFileBasedPositions(t *testing.T) {
want := ReplicationStatus{
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}},
}
got, err := ParseMariadbReplicationStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition)
assert.Equal(t, got.FilePosition, want.FilePosition, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition))
assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition))
assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet)
assert.Equal(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, fmt.Sprintf("got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet))
assert.Equal(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, fmt.Sprintf("got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet))
}

func TestMariadbShouldGetNilRelayLogPosition(t *testing.T) {
Expand Down Expand Up @@ -305,15 +304,15 @@ func TestFilePosRetrieveExecutedPosition(t *testing.T) {
RelayLogPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
FilePosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000002", Pos: 1307}},
RelayLogSourceBinlogEquivalentPosition: Position{GTIDSet: FilePosGTID{File: "master-bin.000003", Pos: 1308}},
RelayLogFilePosition: BinlogFilePos{File: "relay-bin.000004", Pos: 1309},
RelayLogFilePosition: Position{GTIDSet: FilePosGTID{File: "relay-bin.000004", Pos: 1309}},
}
got, err := ParseFilePosReplicationStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet, "got RelayLogPosition: %v; want RelayLogPosition: %v", got.RelayLogPosition.GTIDSet, want.RelayLogPosition.GTIDSet)
assert.Equalf(t, got.RelayLogFilePosition, want.RelayLogFilePosition, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition, want.RelayLogFilePosition)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition, want.RelayLogSourceBinlogEquivalentPosition)
assert.Equalf(t, got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet, "got RelayLogFilePosition: %v; want RelayLogFilePosition: %v", got.RelayLogFilePosition.GTIDSet, want.RelayLogFilePosition.GTIDSet)
assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)
assert.Equalf(t, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "got RelayLogSourceBinlogEquivalentPosition: %v; want RelayLogSourceBinlogEquivalentPosition: %v", got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, want.RelayLogSourceBinlogEquivalentPosition.GTIDSet)
assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
assert.Equalf(t, got.RelayLogPosition.GTIDSet, got.RelayLogSourceBinlogEquivalentPosition.GTIDSet, "RelayLogPosition and RelayLogSourceBinlogEquivalentPosition don't match when they should for the FilePos flavor")
}
Expand All @@ -331,6 +330,6 @@ func TestFilePosShouldGetPosition(t *testing.T) {
got, err := ParseFilePosPrimaryStatus(resultMap)
require.NoError(t, err)
assert.Equalf(t, got.Position.GTIDSet, want.Position.GTIDSet, "got Position: %v; want Position: %v", got.Position.GTIDSet, want.Position.GTIDSet)
assert.Equalf(t, got.FilePosition, want.FilePosition, "got FilePosition: %v; want FilePosition: %v", got.FilePosition, want.FilePosition)
assert.Equalf(t, got.FilePosition.GTIDSet, want.FilePosition.GTIDSet, "got FilePosition: %v; want FilePosition: %v", got.FilePosition.GTIDSet, want.FilePosition.GTIDSet)
assert.Equalf(t, got.Position.GTIDSet, got.FilePosition.GTIDSet, "FilePosition and Position don't match when they should for the FilePos flavor")
}
12 changes: 5 additions & 7 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type FakeMysqlDaemon struct {

// CurrentSourceFilePosition is used to determine the executed
// file based positioning of the replication source.
CurrentSourceFilePosition replication.BinlogFilePos
CurrentSourceFilePosition replication.Position

// ReplicationStatusError is used by ReplicationStatus.
ReplicationStatusError error
Expand Down Expand Up @@ -315,20 +315,19 @@ func (fmd *FakeMysqlDaemon) ReplicationStatus(ctx context.Context) (replication.
}
fmd.mu.Lock()
defer fmd.mu.Unlock()
filePos, err := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition()
return replication.ReplicationStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: filePos,
FilePosition: fmd.CurrentSourceFilePosition,
RelayLogPosition: fmd.CurrentRelayLogPosition,
RelayLogSourceBinlogEquivalentPosition: filePos,
RelayLogSourceBinlogEquivalentPosition: fmd.CurrentSourceFilePosition,
ReplicationLagSeconds: fmd.ReplicationLagSeconds,
// Implemented as AND to avoid changing all tests that were
// previously using Replicating = false.
IOState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating && fmd.IOThreadRunning)),
SQLState: replication.ReplicationStatusToState(fmt.Sprintf("%v", fmd.Replicating)),
SourceHost: fmd.CurrentSourceHost,
SourcePort: fmd.CurrentSourcePort,
}, err
}, nil
}

// PrimaryStatus is part of the MysqlDaemon interface.
Expand All @@ -339,10 +338,9 @@ func (fmd *FakeMysqlDaemon) PrimaryStatus(ctx context.Context) (replication.Prim
return replication.PrimaryStatus{}, fmd.PrimaryStatusError
}
serverUUID, _ := fmd.GetServerUUID(ctx)
filePos, _ := fmd.CurrentSourceFilePosition.ConvertToFlavorPosition()
return replication.PrimaryStatus{
Position: fmd.CurrentPrimaryPosition,
FilePosition: filePos,
FilePosition: fmd.CurrentSourceFilePosition,
ServerUUID: serverUUID,
}, nil
}
Expand Down
17 changes: 1 addition & 16 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func ReadTopologyInstanceBufferable(tabletAlias string, latency *stopwatch.Named
errorChan <- err
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()

binlogPos, err = getBinlogCoordinatesFromString(fs.ReplicationStatus.FilePosition)
binlogPos, err = getBinlogCoordinatesFromPositionString(fs.ReplicationStatus.FilePosition)
instance.RelaylogCoordinates = binlogPos
instance.RelaylogCoordinates.Type = RelayLog
errorChan <- err
Expand Down Expand Up @@ -450,21 +450,6 @@ func getKeyspaceShardName(keyspace, shard string) string {
return fmt.Sprintf("%v:%v", keyspace, shard)
}

// getBinlogCoordinatesFromString is a bridge function to support upgrades and
// downgrades when a given string may be in one of two formats depending on
// the component versions:
// 1. GTID position, e.g. FilePos/relay-bin.000001:12345
// 2. A simple binlog file position, e.g. relay-bin.000001:12345
func getBinlogCoordinatesFromString(s string) (BinlogCoordinates, error) {
if strings.Contains(s, "/") {
// We have a GTID position which includes the flavor.
return getBinlogCoordinatesFromPositionString(s)
}
// We have a simple binlog file position.
binLogCoordinates, err := ParseBinlogCoordinates(s)
return *binLogCoordinates, err
}

func getBinlogCoordinatesFromPositionString(position string) (BinlogCoordinates, error) {
pos, err := replication.DecodePosition(position)
if err != nil || pos.GTIDSet == nil {
Expand Down
Loading

0 comments on commit 4e54f15

Please sign in to comment.