Skip to content

Commit

Permalink
Point in time recovery and restore: assume (and validate) MySQL56 fla…
Browse files Browse the repository at this point in the history
…vor in position arguments (#15599)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Apr 10, 2024
1 parent 680d05f commit 9c59280
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 13 deletions.
33 changes: 33 additions & 0 deletions go/mysql/replication/mysql56_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// Mysql56FlavorID is the string identifier for the Mysql56 flavor.
const Mysql56FlavorID = "MySQL56"

var (
ErrExpectMysql56Flavor = vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "expected MySQL GTID position but found a different or invalid format.")
)

// parseMysql56GTID is registered as a GTID parser.
func parseMysql56GTID(s string) (GTID, error) {
// Split into parts.
Expand Down Expand Up @@ -128,3 +132,32 @@ func (gtid Mysql56GTID) GTIDSet() GTIDSet {
func init() {
gtidParsers[Mysql56FlavorID] = parseMysql56GTID
}

// DecodePositionMySQL56 converts a string into a Position value with the MySQL56 flavor. The function returns an error if the given
// string does not translate to a MySQL56 GTID set.
// The prefix "MySQL56/" is optional in the input string. Examples of inputs strings that produce valid result:
// - "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615"
// - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615"
func DecodePositionMySQL56(s string) (rp Position, gtidSet Mysql56GTIDSet, err error) {
if s == "" {
return rp, nil, nil
}

flav, gtid, ok := strings.Cut(s, "/")
if !ok {
gtid = s
flav = Mysql56FlavorID
}
rp, err = ParsePosition(flav, gtid)
if err != nil {
return rp, nil, err
}
if !rp.MatchesFlavor(Mysql56FlavorID) {
return rp, nil, vterrors.Wrapf(ErrExpectMysql56Flavor, s)
}
gtidSet, ok = rp.GTIDSet.(Mysql56GTIDSet)
if !ok {
return rp, nil, vterrors.Wrapf(ErrExpectMysql56Flavor, s)
}
return rp, gtidSet, nil
}
47 changes: 47 additions & 0 deletions go/mysql/replication/mysql56_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,50 @@ func TestMysql56ParseGTID(t *testing.T) {
require.NoError(t, err, "unexpected error: %v", err)
assert.Equal(t, want, got, "(&mysql56{}).ParseGTID(%#v) = %#v, want %#v", input, got, want)
}

func TestDecodePositionMySQL56(t *testing.T) {
{
pos, gtidSet, err := DecodePositionMySQL56("")
assert.NoError(t, err)
assert.True(t, pos.IsZero())
assert.Nil(t, gtidSet)
}
{
pos, gtidSet, err := DecodePositionMySQL56("MySQL56/00010203-0405-0607-0809-0A0B0C0D0E0F:1-615")
assert.NoError(t, err)
assert.False(t, pos.IsZero())
assert.NotNil(t, gtidSet)
expectGTID := Mysql56GTIDSet{
SID{
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf,
}: []interval{{start: 1, end: 615}}}
assert.Equal(t, expectGTID, gtidSet)
}
{
pos, gtidSet, err := DecodePositionMySQL56("00010203-0405-0607-0809-0A0B0C0D0E0F:1-615")
assert.NoError(t, err)
assert.False(t, pos.IsZero())
assert.NotNil(t, gtidSet)
expectGTID := Mysql56GTIDSet{
SID{
0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf,
}: []interval{{start: 1, end: 615}}}
assert.Equal(t, expectGTID, gtidSet)
}
{
_, _, err := DecodePositionMySQL56("q-22b6-11ed-b765-0a43f95f28a3:1-615")
assert.Error(t, err)
}
{
_, _, err := DecodePositionMySQL56("16b1039f-22b6-11ed-b765-0a43f95f28a3")
assert.Error(t, err)
}
{
_, _, err := DecodePositionMySQL56("FilePos/mysql-bin.000001:234")
assert.Error(t, err)
}
{
_, _, err := DecodePositionMySQL56("mysql-bin.000001:234")
assert.Error(t, err)
}
}
8 changes: 8 additions & 0 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -1299,6 +1300,13 @@ func TestReplicaRestoreToPos(t *testing.T, replicaIndex int, restoreToPos replic

require.False(t, restoreToPos.IsZero())
restoreToPosArg := replication.EncodePosition(restoreToPos)
assert.Contains(t, restoreToPosArg, "MySQL56/")
if rand.Intn(2) == 0 {
// Verify that restore works whether or not the MySQL56/ prefix is present.
restoreToPosArg = strings.Replace(restoreToPosArg, "MySQL56/", "", 1)
assert.NotContains(t, restoreToPosArg, "MySQL56/")
}

output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("RestoreFromBackup", "--restore-to-pos", restoreToPosArg, replica.Alias)
if expectError != "" {
require.Errorf(t, err, "expected: %v", expectError)
Expand Down
12 changes: 11 additions & 1 deletion go/test/endtoend/backup/vtctlbackup/pitr_test_framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/rand/v2"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -227,9 +228,17 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
if tc.fromFullPosition {
incrementalFromPos = replication.EncodePosition(fullBackupPos)
}
assert.Contains(t, incrementalFromPos, "MySQL56/")
}
incrementalFromPosArg := incrementalFromPos
if tc.incrementalFrom == incrementalFromPosPosition && tc.fromFullPosition {
// Verify that backup works whether or not the MySQL56/ prefix is present.
// We arbitrarily decide to strip the prefix when "tc.fromFullPosition" is true, and keep it when false.
incrementalFromPosArg = strings.Replace(incrementalFromPosArg, "MySQL56/", "", 1)
assert.NotContains(t, incrementalFromPosArg, "MySQL56/")
}
// always use same 1st replica
manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPos, tc.expectEmpty, tc.expectError)
manifest, backupName := TestReplicaIncrementalBackup(t, 0, incrementalFromPosArg, tc.expectEmpty, tc.expectError)
if tc.expectError != "" {
return
}
Expand Down Expand Up @@ -271,6 +280,7 @@ func ExecTestIncrementalBackupAndRestoreToPos(t *testing.T, tcase *PITRTestCase)
t.Run(testName, func(t *testing.T) {
restoreToPos, err := replication.DecodePosition(pos)
require.NoError(t, err)
require.False(t, restoreToPos.IsZero())
TestReplicaRestoreToPos(t, 0, restoreToPos, "")
msgs := ReadRowsFromReplica(t, 0)
count, ok := rowsPerPosition[pos]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func getTabletPosition(t *testing.T, tablet *cluster.Vttablet) replication.Posit
require.NotNil(t, row)
gtidExecuted := row.AsString("gtid_executed", "")
require.NotEmpty(t, gtidExecuted)
pos, err := replication.DecodePositionDefaultFlavor(gtidExecuted, replication.Mysql56FlavorID)
pos, _, err := replication.DecodePositionMySQL56(gtidExecuted)
assert.NoError(t, err)
return pos
}
Expand Down
13 changes: 3 additions & 10 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,18 +223,11 @@ func (be *BuiltinBackupEngine) ExecuteBackup(ctx context.Context, params BackupP

// getIncrementalFromPosGTIDSet turns the given string into a valid Mysql56GTIDSet
func getIncrementalFromPosGTIDSet(incrementalFromPos string) (replication.Mysql56GTIDSet, error) {
pos, err := replication.DecodePositionDefaultFlavor(incrementalFromPos, replication.Mysql56FlavorID)
_, gtidSet, err := replication.DecodePositionMySQL56(incrementalFromPos)
if err != nil {
return nil, vterrors.Wrapf(err, "cannot decode position in incremental backup: %v", incrementalFromPos)
}
if !pos.MatchesFlavor(replication.Mysql56FlavorID) {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "incremental backup only supports MySQL GTID positions. Got: %v", incrementalFromPos)
}
ifPosGTIDSet, ok := pos.GTIDSet.(replication.Mysql56GTIDSet)
if !ok {
return nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "cannot get MySQL GTID value: %v", pos)
}
return ifPosGTIDSet, nil
return gtidSet, nil
}

// executeIncrementalBackup runs an incremental backup, based on given 'incremental_from_pos', which can be:
Expand Down Expand Up @@ -269,7 +262,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
params.Logger.Infof("auto evaluated incremental_from_pos: %s", params.IncrementalFromPos)
}

if _, err := replication.DecodePositionDefaultFlavor(params.IncrementalFromPos, replication.Mysql56FlavorID); err != nil {
if _, _, err := replication.DecodePositionMySQL56(params.IncrementalFromPos); err != nil {
// This does not seem to be a valid position. Maybe it's a backup name?
backupName := params.IncrementalFromPos
pos, err := findBackupPosition(ctx, params, backupName)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--restore-to-pos and --restore-to-timestamp are mutually exclusive")
}
if request.RestoreToPos != "" {
pos, err := replication.DecodePosition(request.RestoreToPos)
pos, _, err := replication.DecodePositionMySQL56(request.RestoreToPos)
if err != nil {
return vterrors.Wrapf(err, "restore failed: unable to decode --restore-to-pos: %s", request.RestoreToPos)
}
Expand Down

0 comments on commit 9c59280

Please sign in to comment.