Skip to content

Commit

Permalink
Enhance support for restore and add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <[email protected]>
  • Loading branch information
frouioui committed Nov 25, 2024
1 parent 3875fd1 commit 7db26bc
Show file tree
Hide file tree
Showing 2 changed files with 334 additions and 43 deletions.
314 changes: 283 additions & 31 deletions go/vt/mysqlctl/backup_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"strconv"
Expand Down Expand Up @@ -590,34 +591,42 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) {
}
}

type writeCloseFailFirstWrite struct {
type rwCloseFailFirstCall struct {
*bytes.Buffer
firstWriteDone bool
firstDone bool
}

func (w *writeCloseFailFirstWrite) Write(p []byte) (n int, err error) {
if w.firstWriteDone {
func (w *rwCloseFailFirstCall) Write(p []byte) (n int, err error) {
if w.firstDone {
return w.Buffer.Write(p)
}
w.firstWriteDone = true
w.firstDone = true
return 0, errors.New("failing first write")
}

func (w *writeCloseFailFirstWrite) Close() error {
func (w *rwCloseFailFirstCall) Read(p []byte) (n int, err error) {
if w.firstDone {
return w.Buffer.Read(p)
}
w.firstDone = true
return 0, errors.New("failing first read")
}

func (w *rwCloseFailFirstCall) Close() error {
return nil
}

func newWriteCloseFailFirstWrite(firstWriteDone bool) *writeCloseFailFirstWrite {
return &writeCloseFailFirstWrite{
Buffer: bytes.NewBuffer(nil),
firstWriteDone: firstWriteDone,
func newWriteCloseFailFirstWrite(firstWriteDone bool) *rwCloseFailFirstCall {
return &rwCloseFailFirstCall{
Buffer: bytes.NewBuffer(nil),
firstDone: firstWriteDone,
}
}

func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) {
ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2)

bufferPerFiles := make(map[string]*writeCloseFailFirstWrite)
bufferPerFiles := make(map[string]*rwCloseFailFirstCall)
be := &mysqlctl.BuiltinBackupEngine{}
bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
Expand All @@ -626,22 +635,6 @@ func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) {
_, isRetry := bufferPerFiles[filename]
newBuffer := newWriteCloseFailFirstWrite(isRetry)
bufferPerFiles[filename] = newBuffer

// if filename == "MANIFEST" {
// return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: writerCloserBuffer}
// }
//
// count := 0
// for _, call := range bh.AddFileCalls {
// if call.Filename == filename {
// count++
// }
// }
// // if it is the first time we call AddFile for this file, let's fail the call
// if count == 1 {
// return mysqlctl.FakeBackupHandleAddFileReturn{Err: fmt.Errorf("failed to AddFile on %s", filename)}
// }
// if it is the second time we call AddFile for this file, let's make it pass
return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer}
}

Expand Down Expand Up @@ -694,10 +687,10 @@ func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) {
require.Equal(t, mysqlctl.BackupUsable, backupResult)
}

func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) {
func TestExecuteBackupFailToWriteFileTwice(t *testing.T) {
ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 1, 1)

bufferPerFiles := make(map[string]*writeCloseFailFirstWrite)
bufferPerFiles := make(map[string]*rwCloseFailFirstCall)
be := &mysqlctl.BuiltinBackupEngine{}
bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
Expand All @@ -716,6 +709,7 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) {
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

logger := logutil.NewMemoryLogger()
fakeStats := backupstats.NewFakeStats()
backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{
Logger: logger,
Mysqld: mysqld,
Expand All @@ -724,7 +718,7 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) {
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
},
Stats: backupstats.NewFakeStats(),
Stats: fakeStats,
Concurrency: 1,
HookExtraEnv: map[string]string{},
TopoServer: ts,
Expand All @@ -737,13 +731,271 @@ func TestExecuteBackupFailToWriteFileEachTwice(t *testing.T) {
"Backing up file: test1/0.ibd (attempt 1/2)",
"Backing up file: test1/0.ibd (attempt 2/2)",
}

assertLogs(t, expectedLogs, logger)

ss := getStats(fakeStats)
require.Equal(t, 2, ss.destinationCloseStats)
require.Equal(t, 2, ss.destinationOpenStats)
require.Equal(t, 2, ss.destinationWriteStats)
require.Equal(t, 2, ss.sourceCloseStats)
require.Equal(t, 2, ss.sourceOpenStats)
require.Equal(t, 2, ss.sourceReadStats)

require.ErrorContains(t, err, "failing first write")
require.Equal(t, mysqlctl.BackupUnusable, backupResult)
}

func TestExecuteRestoreFailToReadEachFileOnlyOnce(t *testing.T) {
ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2)

be := &mysqlctl.BuiltinBackupEngine{}
bufferPerFiles := make(map[string]*rwCloseFailFirstCall)
bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
// let's never make it fail for now
newBuffer := newWriteCloseFailFirstWrite(true)
bufferPerFiles[filename] = newBuffer
return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer}
}

// Spin up a fake daemon to be used in backups. It needs to be allowed to receive:
// "STOP REPLICA", "START REPLICA", in that order.
fakedb := fakesqldb.New(t)
defer fakedb.Close()
mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb)
defer mysqld.Close()
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{
Logger: logutil.NewConsoleLogger(),
Mysqld: mysqld,
Cnf: &mysqlctl.Mycnf{
InnodbDataHomeDir: path.Join(backupRoot, "innodb"),
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
},
Stats: backupstats.NewFakeStats(),
Concurrency: 1,
HookExtraEnv: map[string]string{},
TopoServer: ts,
Keyspace: keyspace,
Shard: shard,
MysqlShutdownTimeout: mysqlShutdownTimeout,
}, bh)

require.NoError(t, err)
require.Equal(t, mysqlctl.BackupUsable, backupResult)

// let's mark each file in the buffer as if it is their first read
for key := range bufferPerFiles {
bufferPerFiles[key].firstDone = false
}

// Now try to restore the above backup.
fakeBh := &mysqlctl.FakeBackupHandle{}
fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) {
return bufferPerFiles[filename], nil
}

fakedb = fakesqldb.New(t)
defer fakedb.Close()
mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb)
defer mysqld.Close()
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

fakeStats := backupstats.NewFakeStats()
logger := logutil.NewMemoryLogger()

restoreParams := mysqlctl.RestoreParams{
Cnf: &mysqlctl.Mycnf{
InnodbDataHomeDir: path.Join(backupRoot, "innodb"),
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
BinLogPath: path.Join(backupRoot, "binlog"),
RelayLogPath: path.Join(backupRoot, "relaylog"),
RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"),
RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"),
},
Logger: logger,
Mysqld: mysqld,
Concurrency: 1,
HookExtraEnv: map[string]string{},
DeleteBeforeRestore: false,
DbName: "test",
Keyspace: "test",
Shard: "-",
StartTime: time.Now(),
RestoreToPos: replication.Position{},
RestoreToTimestamp: time.Time{},
DryRun: false,
Stats: fakeStats,
MysqlShutdownTimeout: mysqlShutdownTimeout,
}

// Successful restore.
bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh)
assert.NoError(t, err)
assert.NotNil(t, bm)

ss := getStats(fakeStats)
require.Equal(t, 8, ss.destinationCloseStats)
require.Equal(t, 8, ss.destinationOpenStats)
require.Equal(t, 4, ss.destinationWriteStats)
require.Equal(t, 8, ss.sourceCloseStats)
require.Equal(t, 8, ss.sourceOpenStats)
require.Equal(t, 8, ss.sourceReadStats)
}

func TestExecuteRestoreFailToReadEachFileTwice(t *testing.T) {
ctx, backupRoot, keyspace, shard, ts := setupCluster(t, 2, 2)

be := &mysqlctl.BuiltinBackupEngine{}
bufferPerFiles := make(map[string]*rwCloseFailFirstCall)
bh := &mysqlctl.FakeBackupHandle{}
bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn {
// let's never make it fail for now
newBuffer := newWriteCloseFailFirstWrite(true)
bufferPerFiles[filename] = newBuffer
return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer}
}

// Spin up a fake daemon to be used in backups. It needs to be allowed to receive:
// "STOP REPLICA", "START REPLICA", in that order.
fakedb := fakesqldb.New(t)
defer fakedb.Close()
mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb)
defer mysqld.Close()
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{
Logger: logutil.NewConsoleLogger(),
Mysqld: mysqld,
Cnf: &mysqlctl.Mycnf{
InnodbDataHomeDir: path.Join(backupRoot, "innodb"),
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
},
Stats: backupstats.NewFakeStats(),
Concurrency: 1,
HookExtraEnv: map[string]string{},
TopoServer: ts,
Keyspace: keyspace,
Shard: shard,
MysqlShutdownTimeout: mysqlShutdownTimeout,
}, bh)

require.NoError(t, err)
require.Equal(t, mysqlctl.BackupUsable, backupResult)

// Now try to restore the above backup.
fakeBh := &mysqlctl.FakeBackupHandle{}
fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) {
// always make it fail, expect if it is the MANIFEST file, otherwise we won't start restoring the other files
buffer := bufferPerFiles[filename]
if filename != "MANIFEST" {
buffer.firstDone = false
}
return buffer, nil
}

fakedb = fakesqldb.New(t)
defer fakedb.Close()
mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb)
defer mysqld.Close()
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"}

fakeStats := backupstats.NewFakeStats()
logger := logutil.NewMemoryLogger()

restoreParams := mysqlctl.RestoreParams{
Cnf: &mysqlctl.Mycnf{
InnodbDataHomeDir: path.Join(backupRoot, "innodb"),
InnodbLogGroupHomeDir: path.Join(backupRoot, "log"),
DataDir: path.Join(backupRoot, "datadir"),
BinLogPath: path.Join(backupRoot, "binlog"),
RelayLogPath: path.Join(backupRoot, "relaylog"),
RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"),
RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"),
},
Logger: logger,
Mysqld: mysqld,
Concurrency: 1,
HookExtraEnv: map[string]string{},
DeleteBeforeRestore: false,
DbName: "test",
Keyspace: "test",
Shard: "-",
StartTime: time.Now(),
RestoreToPos: replication.Position{},
RestoreToTimestamp: time.Time{},
DryRun: false,
Stats: fakeStats,
MysqlShutdownTimeout: mysqlShutdownTimeout,
}

// Successful restore.
bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh)
assert.ErrorContains(t, err, "failing first read")
assert.Nil(t, bm)

expectedLogs := []string{
"Failed restoring \"test2/1.ibd\" (attempt 1/2)",
"Failed restoring \"test2/1.ibd\" (attempt 2/2)",
}
assertLogs(t, expectedLogs, logger)

ss := getStats(fakeStats)
require.Equal(t, 2, ss.destinationCloseStats)
require.Equal(t, 2, ss.destinationOpenStats)
require.Equal(t, 0, ss.destinationWriteStats)
require.Equal(t, 2, ss.sourceCloseStats)
require.Equal(t, 2, ss.sourceOpenStats)
require.Equal(t, 2, ss.sourceReadStats)
}

type statSummary struct {
destinationCloseStats int
destinationOpenStats int
destinationWriteStats int
sourceCloseStats int
sourceOpenStats int
sourceReadStats int
}

func getStats(stats *backupstats.FakeStats) statSummary {
var ss statSummary

for _, sr := range stats.ScopeReturns {
switch sr.ScopeV[backupstats.ScopeOperation] {
case "Destination:Close":
if len(sr.TimedIncrementCalls) > 0 {
ss.destinationCloseStats++
}
case "Destination:Open":
if len(sr.TimedIncrementCalls) > 0 {
ss.destinationOpenStats++
}
case "Destination:Write":
if len(sr.TimedIncrementBytesCalls) > 0 {
ss.destinationWriteStats++
}
case "Source:Close":
if len(sr.TimedIncrementCalls) > 0 {
ss.sourceCloseStats++
}
case "Source:Open":
if len(sr.TimedIncrementCalls) > 0 {
ss.sourceOpenStats++
}
case "Source:Read":
if len(sr.TimedIncrementBytesCalls) > 0 {
ss.sourceReadStats++
}
}
}
return ss
}

func assertLogs(t *testing.T, expectedLogs []string, logger *logutil.MemoryLogger) {
for _, log := range expectedLogs {
var found bool
Expand Down
Loading

0 comments on commit 7db26bc

Please sign in to comment.