diff --git a/.github/workflows/cluster_endtoend_21.yml b/.github/workflows/cluster_endtoend_21.yml index 5d703099a3d..e6bccee0004 100644 --- a/.github/workflows/cluster_endtoend_21.yml +++ b/.github/workflows/cluster_endtoend_21.yml @@ -121,6 +121,13 @@ jobs: # install JUnit report formatter go install github.com/vitessio/go-junit-report@HEAD + - name: Install Minio + if: steps.skip-workflow.outputs.skip-workflow == 'false' + run: | + wget https://dl.min.io/server/minio/release/linux-amd64/minio + chmod +x minio + mv minio /usr/local/bin + - name: Setup launchable dependencies if: steps.skip-workflow.outputs.is_draft == 'false' && steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true' && github.base_ref == 'main' run: | diff --git a/go/test/endtoend/backup/s3/s3_builtin_test.go b/go/test/endtoend/backup/s3/s3_builtin_test.go new file mode 100644 index 00000000000..9c83e5f8fec --- /dev/null +++ b/go/test/endtoend/backup/s3/s3_builtin_test.go @@ -0,0 +1,434 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package s3 + +import ( + "context" + "io" + "os" + "os/exec" + "path" + "strconv" + "strings" + "testing" + "time" + + "log" + + "github.com/minio/minio-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" + "vitess.io/vitess/go/vt/mysqlctl/blackbox" + "vitess.io/vitess/go/vt/mysqlctl/s3backupstorage" +) + +/* + These tests use Minio to emulate AWS S3. It allows us to run the tests on + GitHub Actions without having the security burden of carrying out AWS secrets + in our GitHub repo. + + Minio is almost a drop-in replacement for AWS S3, if you want to run these + tests against a true AWS S3 Bucket, you can do so by not running the TestMain + and setting the 'AWS_*' environment variables to your own values. + + This package and file are named 'endtoend', but it's more an integration test. + However, we don't want our CI infra to mistake this for a regular unit-test, + hence the rename to 'endtoend'. +*/ + +func TestMain(m *testing.M) { + f := func() int { + minioPath, err := exec.LookPath("minio") + if err != nil { + log.Fatalf("minio binary not found: %v", err) + } + + dataDir, err := os.MkdirTemp("", "") + if err != nil { + log.Fatalf("could not create temporary directory: %v", err) + } + err = os.MkdirAll(dataDir, 0755) + if err != nil { + log.Fatalf("failed to create MinIO data directory: %v", err) + } + + cmd := exec.Command(minioPath, "server", dataDir, "--console-address", ":9001") + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + err = cmd.Start() + if err != nil { + log.Fatalf("failed to start MinIO: %v", err) + } + defer func() { + cmd.Process.Kill() + }() + + // Local MinIO credentials + accessKey := "minioadmin" + secretKey := "minioadmin" + minioEndpoint := "http://localhost:9000" + bucketName := "test-bucket" + region := "us-east-1" + + client, err := minio.New("localhost:9000", accessKey, secretKey, false) + if err != nil { + log.Fatalf("failed to create MinIO client: %v", err) + } + waitForMinio(client) + + err = client.MakeBucket(bucketName, region) + if err != nil { + log.Fatalf("failed to create test bucket: %v", err) + } + + // Same env variables that are used between AWS S3 and Minio + os.Setenv("AWS_ACCESS_KEY_ID", accessKey) + os.Setenv("AWS_SECRET_ACCESS_KEY", secretKey) + os.Setenv("AWS_BUCKET", bucketName) + os.Setenv("AWS_ENDPOINT", minioEndpoint) + os.Setenv("AWS_REGION", region) + + return m.Run() + } + + os.Exit(f()) +} + +func waitForMinio(client *minio.Client) { + for i := 0; i < 60; i++ { + _, err := client.ListBuckets() + if err == nil { + return + } + time.Sleep(1 * time.Second) + } + log.Fatalf("MinIO server did not become ready in time") +} + +func checkEnvForS3(t *testing.T) { + // We never want to skip the tests if we are running on CI. + // We will always run these tests on CI with the TestMain and Minio. + // There should not be a need to skip the tests due to missing ENV vars. + if os.Getenv("GITHUB_ACTIONS") != "" { + return + } + + envRequired := []string{ + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_BUCKET", + "AWS_ENDPOINT", + "AWS_REGION", + } + + var missing []string + for _, s := range envRequired { + if os.Getenv(s) == "" { + missing = append(missing, s) + } + } + if len(missing) > 0 { + t.Skipf("missing AWS secrets to run this test: please set: %s", strings.Join(missing, ", ")) + } +} + +type backupTestConfig struct { + concurrency int + addFileReturnFn func(s3 *s3backupstorage.S3BackupHandle, ctx context.Context, filename string, filesize int64, firstAdd bool) (io.WriteCloser, error) + checkCleanupError bool + expectedResult mysqlctl.BackupResult + expectedStats blackbox.StatSummary +} + +func runBackupTest(t *testing.T, cfg backupTestConfig) { + checkEnvForS3(t) + s3backupstorage.InitFlag(s3backupstorage.FakeConfig{ + Region: os.Getenv("AWS_REGION"), + Endpoint: os.Getenv("AWS_ENDPOINT"), + Bucket: os.Getenv("AWS_BUCKET"), + ForcePath: true, + }) + + ctx := context.Background() + backupRoot, keyspace, shard, ts := blackbox.SetupCluster(ctx, t, 2, 2) + + be := &mysqlctl.BuiltinBackupEngine{} + + // Configure a tight deadline to force a timeout + oldDeadline := blackbox.SetBuiltinBackupMysqldDeadline(time.Second) + defer blackbox.SetBuiltinBackupMysqldDeadline(oldDeadline) + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + bh, err := s3backupstorage.NewFakeS3BackupHandle(ctx, t.Name(), time.Now().Format(mysqlctl.BackupTimestampFormat), logger, fakeStats) + require.NoError(t, err) + t.Cleanup(func() { + err := bh.AbortBackup(ctx) + if cfg.checkCleanupError { + require.NoError(t, err) + } + }) + bh.AddFileReturnF = cfg.addFileReturnFn + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logger, + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Concurrency: cfg.concurrency, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + Stats: fakeStats, + MysqlShutdownTimeout: blackbox.MysqlShutdownTimeout, + }, bh) + + require.Equal(t, cfg.expectedResult, backupResult) + switch cfg.expectedResult { + case mysqlctl.BackupUsable: + require.NoError(t, err) + case mysqlctl.BackupUnusable, mysqlctl.BackupEmpty: + require.Error(t, err) + } + + ss := blackbox.GetStats(fakeStats) + require.Equal(t, cfg.expectedStats.DestinationCloseStats, ss.DestinationCloseStats) + require.Equal(t, cfg.expectedStats.DestinationOpenStats, ss.DestinationOpenStats) + require.Equal(t, cfg.expectedStats.DestinationWriteStats, ss.DestinationWriteStats) + require.Equal(t, cfg.expectedStats.SourceCloseStats, ss.SourceCloseStats) + require.Equal(t, cfg.expectedStats.SourceOpenStats, ss.SourceOpenStats) + require.Equal(t, cfg.expectedStats.SourceReadStats, ss.SourceReadStats) +} + +func TestExecuteBackupS3FailEachFileOnce(t *testing.T) { + runBackupTest(t, backupTestConfig{ + concurrency: 2, + + // Modify the fake S3 storage to always fail when trying to write a file for the first time + addFileReturnFn: s3backupstorage.FailFirstWrite, + checkCleanupError: true, + expectedResult: mysqlctl.BackupUsable, + + // Even though we have 4 files, we expect '8' for all the values below as we re-do every file once. + expectedStats: blackbox.StatSummary{ + DestinationCloseStats: 8, + DestinationOpenStats: 8, + DestinationWriteStats: 8, + SourceCloseStats: 8, + SourceOpenStats: 8, + SourceReadStats: 8, + }, + }) +} + +func TestExecuteBackupS3FailEachFileTwice(t *testing.T) { + runBackupTest(t, backupTestConfig{ + concurrency: 1, + + // Modify the fake S3 storage to always fail when trying to write a file for the first time + addFileReturnFn: s3backupstorage.FailAllWrites, + + // If the code works as expected by this test, no files will be created on S3 and AbortBackup will + // fail, for this reason, let's not check the error return. + // We still call AbortBackup anyway in the event that the code is not behaving as expected and some + // files were created by mistakes, we delete them. + checkCleanupError: false, + expectedResult: mysqlctl.BackupUnusable, + + // All stats here must be equal to 5, we have four files, we go each of them, they all fail. + // The logic decides to retry each file once, we retry the first failed file, it fails again + // but since it has reached the limit of retries, the backup will fail anyway, thus we don't + // retry the other 3 files. + expectedStats: blackbox.StatSummary{ + DestinationCloseStats: 5, + DestinationOpenStats: 5, + DestinationWriteStats: 5, + SourceCloseStats: 5, + SourceOpenStats: 5, + SourceReadStats: 5, + }, + }) +} + +type restoreTestConfig struct { + readFileReturnFn func(s3 *s3backupstorage.S3BackupHandle, ctx context.Context, filename string, firstRead bool) (io.ReadCloser, error) + expectSuccess bool + expectedStats blackbox.StatSummary +} + +func runRestoreTest(t *testing.T, cfg restoreTestConfig) { + checkEnvForS3(t) + s3backupstorage.InitFlag(s3backupstorage.FakeConfig{ + Region: os.Getenv("AWS_REGION"), + Endpoint: os.Getenv("AWS_ENDPOINT"), + Bucket: os.Getenv("AWS_BUCKET"), + ForcePath: true, + }) + + ctx := context.Background() + backupRoot, keyspace, shard, ts := blackbox.SetupCluster(ctx, t, 2, 2) + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + be := &mysqlctl.BuiltinBackupEngine{} + dirName := time.Now().Format(mysqlctl.BackupTimestampFormat) + name := t.Name() + "-" + strconv.Itoa(int(time.Now().Unix())) + bh, err := s3backupstorage.NewFakeS3BackupHandle(ctx, name, dirName, logger, fakeStats) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, bh.AbortBackup(ctx)) + }) + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: blackbox.MysqlShutdownTimeout, + }, bh) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) + + // Backup is done, let's move on to the restore now + + restoreBh, err := s3backupstorage.NewFakeS3RestoreHandle(ctx, name, logger, fakeStats) + require.NoError(t, err) + restoreBh.ReadFileReturnF = cfg.readFileReturnFn + + fakedb = fakesqldb.New(t) + defer fakedb.Close() + mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logger, + Mysqld: mysqld, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + MysqlShutdownTimeout: blackbox.MysqlShutdownTimeout, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, restoreBh) + + if cfg.expectSuccess { + assert.NoError(t, err) + assert.NotNil(t, bm) + } else { + assert.Error(t, err) + } + + ss := blackbox.GetStats(fakeStats) + require.Equal(t, cfg.expectedStats.DestinationCloseStats, ss.DestinationCloseStats) + require.Equal(t, cfg.expectedStats.DestinationOpenStats, ss.DestinationOpenStats) + require.Equal(t, cfg.expectedStats.DestinationWriteStats, ss.DestinationWriteStats) + require.Equal(t, cfg.expectedStats.SourceCloseStats, ss.SourceCloseStats) + require.Equal(t, cfg.expectedStats.SourceOpenStats, ss.SourceOpenStats) + require.Equal(t, cfg.expectedStats.SourceReadStats, ss.SourceReadStats) +} + +func TestExecuteRestoreS3FailEachFileOnce(t *testing.T) { + runRestoreTest(t, restoreTestConfig{ + readFileReturnFn: s3backupstorage.FailFirstRead, + expectSuccess: true, + expectedStats: blackbox.StatSummary{ + DestinationCloseStats: 8, + DestinationOpenStats: 8, + DestinationWriteStats: 4, // 4, because on the first attempt, we fail to read before writing to the filesystem + SourceCloseStats: 8, + SourceOpenStats: 8, + SourceReadStats: 8, + }, + }) +} + +func TestExecuteRestoreS3FailEachFileTwice(t *testing.T) { + runRestoreTest(t, restoreTestConfig{ + readFileReturnFn: s3backupstorage.FailAllReadExpectManifest, + expectSuccess: false, + + // Everything except destination writes must be equal to 5: + // +1 for every file on the first attempt (= 4), and +1 for the first file we try for the second time. + // Since we fail early as soon as a second-attempt-file fails, we won't see a value above 5. + expectedStats: blackbox.StatSummary{ + DestinationCloseStats: 5, + DestinationOpenStats: 5, + DestinationWriteStats: 0, // 0, because on the both attempts, we fail to read before writing to the filesystem + SourceCloseStats: 5, + SourceOpenStats: 5, + SourceReadStats: 5, + }, + }) +} diff --git a/go/vt/mysqlctl/azblobbackupstorage/azblob.go b/go/vt/mysqlctl/azblobbackupstorage/azblob.go index 3ba6b187a2f..dbd146495e8 100644 --- a/go/vt/mysqlctl/azblobbackupstorage/azblob.go +++ b/go/vt/mysqlctl/azblobbackupstorage/azblob.go @@ -32,8 +32,9 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/mysqlctl/errors" + "vitess.io/vitess/go/viperutil" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/servenv" @@ -203,9 +204,9 @@ type AZBlobBackupHandle struct { name string readOnly bool waitGroup sync.WaitGroup - errors concurrency.AllErrorRecorder ctx context.Context cancel context.CancelFunc + errors.PerFileErrorRecorder } // Directory implements BackupHandle. @@ -218,21 +219,6 @@ func (bh *AZBlobBackupHandle) Name() string { return bh.name } -// RecordError is part of the concurrency.ErrorRecorder interface. -func (bh *AZBlobBackupHandle) RecordError(err error) { - bh.errors.RecordError(err) -} - -// HasErrors is part of the concurrency.ErrorRecorder interface. -func (bh *AZBlobBackupHandle) HasErrors() bool { - return bh.errors.HasErrors() -} - -// Error is part of the concurrency.ErrorRecorder interface. -func (bh *AZBlobBackupHandle) Error() error { - return bh.errors.Error() -} - // AddFile implements BackupHandle. func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { @@ -263,7 +249,7 @@ func (bh *AZBlobBackupHandle) AddFile(ctx context.Context, filename string, file }) if err != nil { reader.CloseWithError(err) - bh.RecordError(err) + bh.RecordError(filename, err) } }() diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go index fb3d0e2d125..eeb14039d01 100644 --- a/go/vt/mysqlctl/backupengine.go +++ b/go/vt/mysqlctl/backupengine.go @@ -272,7 +272,7 @@ func getBackupManifestInto(ctx context.Context, backup backupstorage.BackupHandl if err := json.NewDecoder(file).Decode(outManifest); err != nil { return vterrors.Wrap(err, "can't decode MANIFEST") } - return nil + return backup.Error() } // IncrementalBackupDetails lists some incremental backup specific information diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go index 92bc71d63aa..4fd37b3163a 100644 --- a/go/vt/mysqlctl/backupstorage/interface.go +++ b/go/vt/mysqlctl/backupstorage/interface.go @@ -25,7 +25,8 @@ import ( "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/concurrency" + "vitess.io/vitess/go/vt/mysqlctl/errors" + "vitess.io/vitess/go/vt/servenv" ) @@ -89,9 +90,9 @@ type BackupHandle interface { // ReadCloser is closed. ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) - // concurrency.ErrorRecorder is embedded here to coordinate reporting and - // handling of errors among all the components involved in taking a backup. - concurrency.ErrorRecorder + // BackupErrorRecorder is embedded here to coordinate reporting and + // handling of errors among all the components involved in taking/restoring a backup. + errors.BackupErrorRecorder } // BackupStorage is the interface to the storage system diff --git a/go/vt/mysqlctl/backup_blackbox_race_test.go b/go/vt/mysqlctl/blackbox/backup_race_test.go similarity index 97% rename from go/vt/mysqlctl/backup_blackbox_race_test.go rename to go/vt/mysqlctl/blackbox/backup_race_test.go index 5414ebc5fa6..fd39dfe4b06 100644 --- a/go/vt/mysqlctl/backup_blackbox_race_test.go +++ b/go/vt/mysqlctl/blackbox/backup_race_test.go @@ -16,8 +16,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package mysqlctl_test is the blackbox tests for package mysqlctl. -package mysqlctl_test +// Package blackbox is the blackbox tests for package mysqlctl. +package blackbox import ( "fmt" @@ -75,7 +75,7 @@ func TestExecuteBackupWithFailureOnLastFile(t *testing.T) { require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) defer os.RemoveAll(backupRoot) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { fpath := path.Join("log", mysql.DynamicRedoLogSubdir) @@ -144,7 +144,7 @@ func TestExecuteBackupWithFailureOnLastFile(t *testing.T) { TopoServer: ts, Keyspace: keyspace, Shard: shard, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) require.ErrorContains(t, err, "cannot add file: 3") diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/blackbox/backup_test.go similarity index 57% rename from go/vt/mysqlctl/backup_blackbox_test.go rename to go/vt/mysqlctl/blackbox/backup_test.go index 15244fb8782..b7e35304904 100644 --- a/go/vt/mysqlctl/backup_blackbox_test.go +++ b/go/vt/mysqlctl/blackbox/backup_test.go @@ -14,12 +14,15 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package mysqlctl_test is the blackbox tests for package mysqlctl. -package mysqlctl_test +// Package blackbox is the blackbox tests for package mysqlctl. +package blackbox import ( + "bytes" "context" + "errors" "fmt" + "io" "os" "path" "strings" @@ -31,7 +34,6 @@ import ( "vitess.io/vitess/go/test/utils" - "vitess.io/vitess/go/mysql/capabilities" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -48,40 +50,6 @@ import ( "vitess.io/vitess/go/vt/topo/memorytopo" ) -const mysqlShutdownTimeout = 1 * time.Minute - -func setBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { - old := mysqlctl.BuiltinBackupMysqldTimeout - mysqlctl.BuiltinBackupMysqldTimeout = t - - return old -} - -func createBackupDir(root string, dirs ...string) error { - for _, dir := range dirs { - if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { - return err - } - } - - return nil -} - -func createBackupFiles(root string, fileCount int, ext string) error { - for i := 0; i < fileCount; i++ { - f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) - if err != nil { - return err - } - if _, err := f.Write([]byte("hello, world!")); err != nil { - return err - } - defer f.Close() - } - - return nil -} - func TestExecuteBackup(t *testing.T) { ctx := utils.LeakCheckContext(t) @@ -97,7 +65,7 @@ func TestExecuteBackup(t *testing.T) { require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) defer os.RemoveAll(backupRoot) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { fpath := path.Join("log", mysql.DynamicRedoLogSubdir) @@ -133,8 +101,8 @@ func TestExecuteBackup(t *testing.T) { be := &mysqlctl.BuiltinBackupEngine{} // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) + oldDeadline := SetBuiltinBackupMysqldDeadline(time.Second) + defer SetBuiltinBackupMysqldDeadline(oldDeadline) bh := filebackupstorage.NewBackupHandle(nil, "", "", false) @@ -163,7 +131,7 @@ func TestExecuteBackup(t *testing.T) { Keyspace: keyspace, Shard: shard, Stats: fakeStats, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) require.NoError(t, err) @@ -221,7 +189,7 @@ func TestExecuteBackup(t *testing.T) { TopoServer: ts, Keyspace: keyspace, Shard: shard, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) assert.Error(t, err) @@ -243,7 +211,7 @@ func TestExecuteBackupWithSafeUpgrade(t *testing.T) { require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) defer os.RemoveAll(backupRoot) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { fpath := path.Join("log", mysql.DynamicRedoLogSubdir) @@ -279,8 +247,8 @@ func TestExecuteBackupWithSafeUpgrade(t *testing.T) { be := &mysqlctl.BuiltinBackupEngine{} // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) + oldDeadline := SetBuiltinBackupMysqldDeadline(time.Second) + defer SetBuiltinBackupMysqldDeadline(oldDeadline) bh := filebackupstorage.NewBackupHandle(nil, "", "", false) @@ -310,7 +278,7 @@ func TestExecuteBackupWithSafeUpgrade(t *testing.T) { Shard: shard, Stats: backupstats.NewFakeStats(), UpgradeSafe: true, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) require.NoError(t, err) @@ -336,7 +304,7 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) { require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) defer os.RemoveAll(backupRoot) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { fpath := path.Join("log", mysql.DynamicRedoLogSubdir) @@ -397,12 +365,12 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) { TopoServer: ts, Keyspace: keyspace, Shard: shard, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) require.Error(t, err) // all four files will fail - require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") + require.ErrorContains(t, err, "context canceled; context canceled; context canceled; context canceled") assert.Equal(t, mysqlctl.BackupUnusable, backupResult) } @@ -425,7 +393,7 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) { require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) defer os.RemoveAll(backupRoot) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { fpath := path.Join("log", mysql.DynamicRedoLogSubdir) @@ -482,7 +450,7 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) { TopoServer: ts, Keyspace: keyspace, Shard: shard, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, }, bh) require.NoError(t, err) @@ -521,7 +489,7 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) { RestoreToTimestamp: time.Time{}, DryRun: false, Stats: fakeStats, - MysqlShutdownTimeout: mysqlShutdownTimeout, + MysqlShutdownTimeout: MysqlShutdownTimeout, } // Successful restore. @@ -587,24 +555,374 @@ func TestExecuteRestoreWithTimedOutContext(t *testing.T) { } } -// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. -// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the -// (/. by default) called "#innodb_redo". See: -// -// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity -func needInnoDBRedoLogSubdir() (needIt bool, err error) { - mysqldVersionStr, err := mysqlctl.GetVersionString() - if err != nil { - return needIt, err +type rwCloseFailFirstCall struct { + *bytes.Buffer + firstDone bool +} + +func (w *rwCloseFailFirstCall) Write(p []byte) (n int, err error) { + if w.firstDone { + return w.Buffer.Write(p) } - _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) - if err != nil { - return needIt, err + w.firstDone = true + return 0, errors.New("failing first write") +} + +func (w *rwCloseFailFirstCall) Read(p []byte) (n int, err error) { + if w.firstDone { + return w.Buffer.Read(p) } - versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) - capableOf := mysql.ServerVersionCapableOf(versionStr) - if capableOf == nil { - return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + w.firstDone = true + return 0, errors.New("failing first read") +} + +func (w *rwCloseFailFirstCall) Close() error { + return nil +} + +func newWriteCloseFailFirstWrite(firstWriteDone bool) *rwCloseFailFirstCall { + return &rwCloseFailFirstCall{ + Buffer: bytes.NewBuffer(nil), + firstDone: firstWriteDone, } - return capableOf(capabilities.DynamicRedoLogCapacityFlavorCapability) +} + +func TestExecuteBackupFailToWriteEachFileOnlyOnce(t *testing.T) { + ctx := utils.LeakCheckContext(t) + backupRoot, keyspace, shard, ts := SetupCluster(ctx, t, 2, 2) + + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + be := &mysqlctl.BuiltinBackupEngine{} + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + // This mimics what happens with the other BackupHandles where doing AddFile will either truncate or override + // any existing data if the same filename already exists. + _, isRetry := bufferPerFiles[filename] + newBuffer := newWriteCloseFailFirstWrite(isRetry) + bufferPerFiles[filename] = newBuffer + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + logger := logutil.NewMemoryLogger() + ctx, cancel := context.WithCancel(ctx) + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logger, + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: MysqlShutdownTimeout, + }, bh) + cancel() + + expectedLogs := []string{ + "Backing up file: test1/0.ibd (attempt 1/2)", + "Backing up file: test1/1.ibd (attempt 1/2)", + "Backing up file: test2/0.ibd (attempt 1/2)", + "Backing up file: test2/1.ibd (attempt 1/2)", + + "Backing up file: test1/0.ibd (attempt 2/2)", + "Backing up file: test1/1.ibd (attempt 2/2)", + "Backing up file: test2/0.ibd (attempt 2/2)", + "Backing up file: test2/1.ibd (attempt 2/2)", + + "Backing up file MANIFEST (attempt 1/2)", + "Failed backing up MANIFEST (attempt 1/2)", + "Backing up file MANIFEST (attempt 2/2)", + "Completed backing up MANIFEST (attempt 2/2)", + } + + // Sleep just long enough for everything to complete. + // It's not flaky, the race detector detects a race when there isn't, + // the machine is just too slow to propagate the ctxCancel() to all goroutines. + time.Sleep(2 * time.Second) + AssertLogs(t, expectedLogs, logger) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) +} + +func TestExecuteBackupFailToWriteFileTwice(t *testing.T) { + ctx := utils.LeakCheckContext(t) + backupRoot, keyspace, shard, ts := SetupCluster(ctx, t, 1, 1) + + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + be := &mysqlctl.BuiltinBackupEngine{} + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + newBuffer := newWriteCloseFailFirstWrite(false) + bufferPerFiles[filename] = newBuffer + + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + logger := logutil.NewMemoryLogger() + fakeStats := backupstats.NewFakeStats() + ctx, cancel := context.WithCancel(ctx) + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logger, + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: fakeStats, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: MysqlShutdownTimeout, + }, bh) + cancel() + + // Sleep just long enough for everything to complete. + // It's not flaky, the race detector detects a race when there isn't, + // the machine is just too slow to propagate the ctxCancel() to all goroutines. + time.Sleep(2 * time.Second) + + expectedLogs := []string{ + "Backing up file: test1/0.ibd (attempt 1/2)", + "Backing up file: test1/0.ibd (attempt 2/2)", + } + AssertLogs(t, expectedLogs, logger) + + ss := GetStats(fakeStats) + require.Equal(t, 2, ss.DestinationCloseStats) + require.Equal(t, 2, ss.DestinationOpenStats) + require.Equal(t, 2, ss.DestinationWriteStats) + require.Equal(t, 2, ss.SourceCloseStats) + require.Equal(t, 2, ss.SourceOpenStats) + require.Equal(t, 2, ss.SourceReadStats) + require.ErrorContains(t, err, "failing first write") + require.Equal(t, mysqlctl.BackupUnusable, backupResult) +} + +func TestExecuteRestoreFailToReadEachFileOnlyOnce(t *testing.T) { + ctx := utils.LeakCheckContext(t) + backupRoot, keyspace, shard, ts := SetupCluster(ctx, t, 2, 2) + + be := &mysqlctl.BuiltinBackupEngine{} + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + // let's never make it fail for now + newBuffer := newWriteCloseFailFirstWrite(true) + bufferPerFiles[filename] = newBuffer + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: MysqlShutdownTimeout, + }, bh) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) + + // let's mark each file in the buffer as if it is their first read + for key := range bufferPerFiles { + bufferPerFiles[key].firstDone = false + } + + // Now try to restore the above backup. + fakeBh := &mysqlctl.FakeBackupHandle{} + fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) { + return bufferPerFiles[filename], nil + } + + fakedb = fakesqldb.New(t) + defer fakedb.Close() + mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logger, + Mysqld: mysqld, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + MysqlShutdownTimeout: MysqlShutdownTimeout, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh) + assert.NoError(t, err) + assert.NotNil(t, bm) + + ss := GetStats(fakeStats) + require.Equal(t, 8, ss.DestinationCloseStats) + require.Equal(t, 8, ss.DestinationOpenStats) + require.Equal(t, 4, ss.DestinationWriteStats) + require.Equal(t, 8, ss.SourceCloseStats) + require.Equal(t, 8, ss.SourceOpenStats) + require.Equal(t, 8, ss.SourceReadStats) +} + +func TestExecuteRestoreFailToReadEachFileTwice(t *testing.T) { + ctx := utils.LeakCheckContext(t) + backupRoot, keyspace, shard, ts := SetupCluster(ctx, t, 2, 2) + + be := &mysqlctl.BuiltinBackupEngine{} + bufferPerFiles := make(map[string]*rwCloseFailFirstCall) + bh := &mysqlctl.FakeBackupHandle{} + bh.AddFileReturnF = func(filename string) mysqlctl.FakeBackupHandleAddFileReturn { + // let's never make it fail for now + newBuffer := newWriteCloseFailFirstWrite(true) + bufferPerFiles[filename] = newBuffer + return mysqlctl.FakeBackupHandleAddFileReturn{WriteCloser: newBuffer} + } + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP REPLICA", "START REPLICA", in that order. + fakedb := fakesqldb.New(t) + defer fakedb.Close() + mysqld := mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + backupResult, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 1, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + MysqlShutdownTimeout: MysqlShutdownTimeout, + }, bh) + + require.NoError(t, err) + require.Equal(t, mysqlctl.BackupUsable, backupResult) + + // Now try to restore the above backup. + fakeBh := &mysqlctl.FakeBackupHandle{} + fakeBh.ReadFileReturnF = func(ctx context.Context, filename string) (io.ReadCloser, error) { + // always make it fail, expect if it is the MANIFEST file, otherwise we won't start restoring the other files + buffer := bufferPerFiles[filename] + if filename != "MANIFEST" { + buffer.firstDone = false + } + return buffer, nil + } + + fakedb = fakesqldb.New(t) + defer fakedb.Close() + mysqld = mysqlctl.NewFakeMysqlDaemon(fakedb) + defer mysqld.Close() + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP REPLICA", "START REPLICA"} + + fakeStats := backupstats.NewFakeStats() + logger := logutil.NewMemoryLogger() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logger, + Mysqld: mysqld, + Concurrency: 1, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: replication.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + MysqlShutdownTimeout: MysqlShutdownTimeout, + } + + // Successful restore. + bm, err := be.ExecuteRestore(ctx, restoreParams, fakeBh) + assert.ErrorContains(t, err, "failing first read") + assert.Nil(t, bm) + + ss := GetStats(fakeStats) + require.Equal(t, 5, ss.DestinationCloseStats) + require.Equal(t, 5, ss.DestinationOpenStats) + require.Equal(t, 0, ss.DestinationWriteStats) + require.Equal(t, 5, ss.SourceCloseStats) + require.Equal(t, 5, ss.SourceOpenStats) + require.Equal(t, 5, ss.SourceReadStats) } diff --git a/go/vt/mysqlctl/blackbox/utils.go b/go/vt/mysqlctl/blackbox/utils.go new file mode 100644 index 00000000000..e4e3f11fb3c --- /dev/null +++ b/go/vt/mysqlctl/blackbox/utils.go @@ -0,0 +1,196 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package blackbox + +import ( + "context" + "fmt" + "os" + "path" + "slices" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/capabilities" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" + "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" + logutilpb "vitess.io/vitess/go/vt/proto/logutil" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +type StatSummary struct { + DestinationCloseStats int + DestinationOpenStats int + DestinationWriteStats int + SourceCloseStats int + SourceOpenStats int + SourceReadStats int +} + +func GetStats(stats *backupstats.FakeStats) StatSummary { + var ss StatSummary + + for _, sr := range stats.ScopeReturns { + switch sr.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + ss.DestinationCloseStats += len(sr.TimedIncrementCalls) + case "Destination:Open": + ss.DestinationOpenStats += len(sr.TimedIncrementCalls) + case "Destination:Write": + if len(sr.TimedIncrementBytesCalls) > 0 { + ss.DestinationWriteStats++ + } + case "Source:Close": + ss.SourceCloseStats += len(sr.TimedIncrementCalls) + case "Source:Open": + ss.SourceOpenStats += len(sr.TimedIncrementCalls) + case "Source:Read": + if len(sr.TimedIncrementBytesCalls) > 0 { + ss.SourceReadStats++ + } + } + } + return ss +} + +func AssertLogs(t *testing.T, expectedLogs []string, logger *logutil.MemoryLogger) { + for _, log := range expectedLogs { + require.Truef(t, slices.ContainsFunc(logger.Events, func(event *logutilpb.Event) bool { + return event.GetValue() == log + }), "%s is missing from the logs", log) + } +} + +func SetupCluster(ctx context.Context, t *testing.T, dirs, filesPerDir int) (backupRoot string, keyspace string, shard string, ts *topo.Server) { + // Set up local backup directory + id := fmt.Sprintf("%d", time.Now().UnixNano()) + backupRoot = fmt.Sprintf("testdata/builtinbackup_test_%s", id) + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to execute semaphore acquire inside + // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). + for dirI := range dirs { + dirName := "test" + strconv.Itoa(dirI+1) + require.NoError(t, createBackupDir(dataDir, dirName)) + require.NoError(t, createBackupFiles(path.Join(dataDir, dirName), filesPerDir, "ibd")) + } + t.Cleanup(func() { + require.NoError(t, os.RemoveAll(backupRoot)) + }) + + needIt, err := NeedInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard = "mykeyspace", "-" + ts = memorytopo.NewServer(ctx, "cell1") + t.Cleanup(func() { + ts.Close() + }) + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + return backupRoot, keyspace, shard, ts +} + +// NeedInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. +// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the +// (/. by default) called "#innodb_redo". See: +// +// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity +func NeedInnoDBRedoLogSubdir() (needIt bool, err error) { + mysqldVersionStr, err := mysqlctl.GetVersionString() + if err != nil { + return needIt, err + } + _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) + if err != nil { + return needIt, err + } + versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) + capableOf := mysql.ServerVersionCapableOf(versionStr) + if capableOf == nil { + return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + } + return capableOf(capabilities.DynamicRedoLogCapacityFlavorCapability) +} + +const MysqlShutdownTimeout = 1 * time.Minute + +func SetBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { + old := mysqlctl.BuiltinBackupMysqldTimeout + mysqlctl.BuiltinBackupMysqldTimeout = t + + return old +} + +func createBackupDir(root string, dirs ...string) error { + for _, dir := range dirs { + if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { + return err + } + } + + return nil +} + +func createBackupFiles(root string, fileCount int, ext string) error { + for i := 0; i < fileCount; i++ { + f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) + if err != nil { + return err + } + if _, err := f.Write([]byte("hello, world!")); err != nil { + return err + } + defer f.Close() + } + + return nil +} diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 5aa759f6f7a..2046a238400 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -29,18 +29,17 @@ import ( "os" "path" "path/filepath" - "sync" + "strconv" "sync/atomic" "time" "github.com/spf13/pflag" - "golang.org/x/sync/semaphore" + "golang.org/x/sync/errgroup" "vitess.io/vitess/go/ioutil" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/protoutil" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" stats "vitess.io/vitess/go/vt/mysqlctl/backupstats" @@ -60,6 +59,8 @@ const ( builtinBackupEngineName = "builtin" AutoIncrementalFromPos = "auto" dataDictionaryFile = "mysql.ibd" + + maxRetriesPerFile = 1 ) var ( @@ -149,6 +150,13 @@ type FileEntry struct { // ParentPath is an optional prefix to the Base path. If empty, it is ignored. Useful // for writing files in a temporary directory ParentPath string + + // RetryCount specifies how many times we retried restoring/backing up this FileEntry. + // If we fail to restore/backup this FileEntry, we will retry up to maxRetriesPerFile times. + // Every time the builtin backup engine retries this file, we increment this field by 1. + // We don't care about adding this information to the MANIFEST and also to not cause any compatibility issue + // we are adding the - json tag to let Go know it can ignore the field. + RetryCount int `json:"-"` } func init() { @@ -585,6 +593,11 @@ func (be *BuiltinBackupEngine) backupFiles( mysqlVersion string, incrDetails *IncrementalBackupDetails, ) (finalErr error) { + // backupFiles always wait for AddFiles to finish its work before returning, unless there has been a + // non-recoverable error in the process, in both cases we can cancel the context safely. + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Get the files to backup. // We don't care about totalSize because we add each file separately. var fes []FileEntry @@ -599,43 +612,82 @@ func (be *BuiltinBackupEngine) backupFiles( } params.Logger.Infof("found %v files to backup", len(fes)) - // Backup with the provided concurrency. - sema := semaphore.NewWeighted(int64(params.Concurrency)) - wg := sync.WaitGroup{} + // The error here can be ignored safely. Failed FileEntry's are handled in the next 'if' statement. + _ = be.backupFileEntries(ctx, fes, bh, params) + // BackupHandle supports the BackupErrorRecorder interface for tracking errors + // across any goroutines that fan out to take the backup. This means that we + // don't need a local error recorder and can put everything through the bh. + // + // This handles the scenario where bh.AddFile() encounters an error asynchronously, + // which ordinarily would be lost in the context of `be.backupFile`, i.e. if an + // error were encountered + // [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142). + // + // All the errors are grouped per file, if one or more files failed, we back them up + // once more concurrently, if any of the retry fail, we fail-fast by canceling the context + // and return an error. There is no reason to continue processing the other retries, if + // one of them failed. + if files := bh.GetFailedFiles(); len(files) > 0 { + newFEs := make([]FileEntry, len(fes)) + for _, file := range files { + fileNb, err := strconv.Atoi(file) + if err != nil { + return vterrors.Wrapf(err, "failed to retry file '%s'", file) + } + oldFes := fes[fileNb] + newFEs[fileNb] = FileEntry{ + Base: oldFes.Base, + Name: oldFes.Name, + ParentPath: oldFes.ParentPath, + RetryCount: 1, + } + bh.ResetErrorForFile(file) + } + err = be.backupFileEntries(ctx, newFEs, bh, params) + if err != nil { + return err + } + } + + // Backup the MANIFEST file and apply retry logic. + var manifestErr error + for currentRetry := 0; currentRetry <= maxRetriesPerFile; currentRetry++ { + manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentRetry) + if manifestErr == nil { + break + } + bh.ResetErrorForFile(backupManifestFileName) + } + if manifestErr != nil { + return manifestErr + } + return nil +} + +// backupFileEntries iterates over a slice of FileEntry, backing them up concurrently up to the defined concurrency limit. +// This function will ignore empty FileEntry, allowing the retry mechanism to send a partially empty slice, to not +// mess up the index of retriable FileEntry. +// This function does not leave any background operation behind itself, all calls to bh.AddFile will be finished or canceled. +func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []FileEntry, bh backupstorage.BackupHandle, params BackupParams) error { ctxCancel, cancel := context.WithCancel(ctx) defer func() { - // We may still have operations in flight that require a valid context, such as adding files to S3. - // Unless we encountered an error, we should not cancel the context, this is taken care of later - // in the process. If we encountered an error however, we can safely cancel the context as we should - // no longer work on anything and exit fast. - if finalErr != nil { - cancel() - } + // If we reached this defer in all cases we can cancel the context. + // The only ways to get here are: a panic, an error when ending the backup, a successful backup. + // For all three options, it is safe to cancel the context, there should be no pending operations + // that 1) haven't completed, 2) we care about anymore. + cancel() }() + g := errgroup.Group{} + g.SetLimit(params.Concurrency) for i := range fes { - wg.Add(1) - go func(i int) { - defer wg.Done() + if fes[i].Name == "" { + continue + } + g.Go(func() error { fe := &fes[i] - // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctxCancel, 1) - if acqErr != nil { - log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error()) - bh.RecordError(acqErr) - cancel() - return - } - defer sema.Release(1) - - // First check if we have any error, if we have, there is no point trying backing up this file. - // We check for errors before checking if the context is canceled on purpose, if there was an - // error, the context would have been canceled already. - if bh.HasErrors() { - params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error()) - return - } + name := fmt.Sprintf("%v", i) // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, @@ -644,83 +696,30 @@ func (be *BuiltinBackupEngine) backupFiles( select { case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q backup", fe.Name) - bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) - return + bh.RecordError(name, vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) + return nil default: } // Backup the individual file. - name := fmt.Sprintf("%v", i) - if err := be.backupFile(ctxCancel, params, bh, fe, name); err != nil { - bh.RecordError(err) - cancel() + var errBackupFile error + if errBackupFile = be.backupFile(ctxCancel, params, bh, fe, name); errBackupFile != nil { + bh.RecordError(name, vterrors.Wrapf(errBackupFile, "failed to backup file '%s'", name)) + if fe.RetryCount >= maxRetriesPerFile { + // this is the last attempt, and we have an error, we can cancel everything and fail fast. + cancel() + } } - }(i) + return nil + }) } + _ = g.Wait() - wg.Wait() - - // BackupHandle supports the ErrorRecorder interface for tracking errors - // across any goroutines that fan out to take the backup. This means that we - // don't need a local error recorder and can put everything through the bh. - // - // This handles the scenario where bh.AddFile() encounters an error asynchronously, - // which ordinarily would be lost in the context of `be.backupFile`, i.e. if an - // error were encountered - // [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142). - if bh.HasErrors() { - return bh.Error() - } - - // open the MANIFEST - wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) + err := bh.EndBackup(ctx) if err != nil { - return vterrors.Wrapf(err, "cannot add %v to backup", backupManifestFileName) - } - defer func() { - closeErr := wc.Close() - if finalErr == nil { - finalErr = closeErr - } - }() - - // JSON-encode and write the MANIFEST - bm := &builtinBackupManifest{ - // Common base fields - BackupManifest: BackupManifest{ - BackupName: bh.Name(), - BackupMethod: builtinBackupEngineName, - Position: backupPosition, - PurgedPosition: purgedPosition, - FromPosition: fromPosition, - FromBackup: fromBackupName, - Incremental: !fromPosition.IsZero(), - ServerUUID: serverUUID, - TabletAlias: params.TabletAlias, - Keyspace: params.Keyspace, - Shard: params.Shard, - BackupTime: params.BackupTime.UTC().Format(time.RFC3339), - FinishedTime: time.Now().UTC().Format(time.RFC3339), - MySQLVersion: mysqlVersion, - UpgradeSafe: params.UpgradeSafe, - IncrementalDetails: incrDetails, - }, - - // Builtin-specific fields - FileEntries: fes, - SkipCompress: !backupStorageCompress, - CompressionEngine: CompressionEngineName, - ExternalDecompressor: ManifestExternalDecompressorCmd, - } - data, err := json.MarshalIndent(bm, "", " ") - if err != nil { - return vterrors.Wrapf(err, "cannot JSON encode %v", backupManifestFileName) - } - if _, err := wc.Write([]byte(data)); err != nil { - return vterrors.Wrapf(err, "cannot write %v", backupManifestFileName) + return err } - - return nil + return bh.Error() } type backupPipe struct { @@ -733,6 +732,7 @@ type backupPipe struct { crc32 hash.Hash32 nn int64 done chan struct{} + failed chan struct{} closed int32 } @@ -743,6 +743,7 @@ func newBackupWriter(filename string, writerBufferSize int, maxSize int64, w io. filename: filename, maxSize: maxSize, done: make(chan struct{}), + failed: make(chan struct{}), } } @@ -752,10 +753,16 @@ func newBackupReader(filename string, maxSize int64, r io.Reader) *backupPipe { r: r, filename: filename, done: make(chan struct{}), + failed: make(chan struct{}), maxSize: maxSize, } } +func retryToString(retry int) string { + // We convert the retry number to an attempt number, increasing retry by one, so it looks more human friendly + return fmt.Sprintf("(attempt %d/%d)", retry+1, maxRetriesPerFile+1) +} + func (bp *backupPipe) Read(p []byte) (int, error) { nn, err := bp.r.Read(p) _, _ = bp.crc32.Write(p[:nn]) @@ -770,9 +777,17 @@ func (bp *backupPipe) Write(p []byte) (int, error) { return nn, err } -func (bp *backupPipe) Close() error { +func (bp *backupPipe) Close(isDone bool) (err error) { if atomic.CompareAndSwapInt32(&bp.closed, 0, 1) { - close(bp.done) + // If we fail to Flush the writer we must report this backup as a failure. + defer func() { + if isDone && err == nil { + close(bp.done) + return + } + close(bp.failed) + }() + if bp.w != nil { if err := bp.w.Flush(); err != nil { return err @@ -786,28 +801,31 @@ func (bp *backupPipe) HashString() string { return hex.EncodeToString(bp.crc32.Sum(nil)) } -func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool) { - messageStr := "restoring " +func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool, retryStr string) { + messageStr := "restoring" if !restore { - messageStr = "backing up " + messageStr = "backing up" } tick := time.NewTicker(period) defer tick.Stop() for { select { case <-ctx.Done(): - logger.Infof("Canceled %s of %q file", messageStr, bp.filename) + logger.Infof("Canceled %s of %q file %s", messageStr, bp.filename, retryStr) return case <-bp.done: - logger.Infof("Completed %s %q", messageStr, bp.filename) + logger.Infof("Completed %s %q %s", messageStr, bp.filename, retryStr) + return + case <-bp.failed: + logger.Infof("Failed %s %q %s", messageStr, bp.filename, retryStr) return case <-tick.C: written := float64(atomic.LoadInt64(&bp.nn)) if bp.maxSize == 0 { - logger.Infof("%s %q: %.02fkb", messageStr, bp.filename, written/1024.0) + logger.Infof("%s %q %s: %.02fkb", messageStr, bp.filename, retryStr, written/1024.0) } else { maxSize := float64(bp.maxSize) - logger.Infof("%s %q: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, 100.0*written/maxSize, written/1024.0, maxSize/1024.0) + logger.Infof("%s %q %s: %.02f%% (%.02f/%.02fkb)", messageStr, bp.filename, retryStr, 100.0*written/maxSize, written/1024.0, maxSize/1024.0) } } } @@ -815,12 +833,15 @@ func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { - ctx, cancel := context.WithCancel(ctx) - defer func() { - if finalErr != nil { - cancel() - } - }() + // We need another context that does not live outside of this function. + // Reporting progress, compressing and writing are operations that will be + // over by the time we exit this function, they can use this cancelable context. + // However, AddFile is something that may continue in the background even after + // this function exits. In this case, we give it the parent context so the caller + // has more control over when to cancel AddFile. + cancelableCtx, cancel := context.WithCancel(ctx) + defer cancel() + // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) @@ -843,11 +864,12 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara return err } + retryStr := retryToString(fe.RetryCount) br := newBackupReader(fe.Name, fi.Size(), timedSource) - go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/) + go br.ReportProgress(cancelableCtx, builtinBackupProgress, params.Logger, false /*restore*/, retryStr) // Open the destination file for writing, and a buffer. - params.Logger.Infof("Backing up file: %v", fe.Name) + params.Logger.Infof("Backing up file: %v %s", fe.Name, retryStr) openDestAt := time.Now() dest, err := bh.AddFile(ctx, name, fi.Size()) if err != nil { @@ -879,19 +901,20 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func() { // Close the backupPipe to finish writing on destination. - if err := bw.Close(); err != nil { + if err := bw.Close(createAndCopyErr == nil); err != nil { createAndCopyErr = errors.Join(createAndCopyErr, vterrors.Wrapf(err, "cannot flush destination: %v", name)) } - if err := br.Close(); err != nil { + if err := br.Close(createAndCopyErr == nil); err != nil { createAndCopyErr = errors.Join(createAndCopyErr, vterrors.Wrap(err, "failed to close the source reader")) } + }() // Create the gzip compression pipe, if necessary. if backupStorageCompress { var compressor io.WriteCloser if ExternalCompressorCmd != "" { - compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) + compressor, err = newExternalCompressor(cancelableCtx, ExternalCompressorCmd, writer, params.Logger) } else { compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) } @@ -902,13 +925,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) - closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) + closer := ioutil.NewTimeoutCloser(cancelableCtx, compressor, closeTimeout) defer func() { // Close gzip to flush it, after that all data is sent to writer. closeCompressorAt := time.Now() - params.Logger.Infof("closing compressor") + params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr) if cerr := closer.Close(); err != nil { - cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) + cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name) params.Logger.Error(cerr) createAndCopyErr = errors.Join(createAndCopyErr, cerr) } @@ -938,6 +961,94 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara return nil } +func (be *BuiltinBackupEngine) backupManifest( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, + backupPosition replication.Position, + purgedPosition replication.Position, + fromPosition replication.Position, + fromBackupName string, + serverUUID string, + mysqlVersion string, + incrDetails *IncrementalBackupDetails, + fes []FileEntry, + currentAttempt int, +) (finalErr error) { + retryStr := retryToString(currentAttempt) + params.Logger.Infof("Backing up file %s %s", backupManifestFileName, retryStr) + defer func() { + state := "Completed" + if finalErr != nil { + state = "Failed" + } + params.Logger.Infof("%s backing up %s %s", state, backupManifestFileName, retryStr) + }() + + // Creating this function allows us to ensure we always close the writer no matter what, + // and in case of success that we close it before calling bh.EndBackup. + addAndWrite := func() (addAndWriteError error) { + // open the MANIFEST + wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) + if err != nil { + return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr) + } + defer func() { + if err := wc.Close(); err != nil { + addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName)) + } + }() + + // JSON-encode and write the MANIFEST + bm := &builtinBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupName: bh.Name(), + BackupMethod: builtinBackupEngineName, + Position: backupPosition, + PurgedPosition: purgedPosition, + FromPosition: fromPosition, + FromBackup: fromBackupName, + Incremental: !fromPosition.IsZero(), + ServerUUID: serverUUID, + TabletAlias: params.TabletAlias, + Keyspace: params.Keyspace, + Shard: params.Shard, + BackupTime: params.BackupTime.UTC().Format(time.RFC3339), + FinishedTime: time.Now().UTC().Format(time.RFC3339), + MySQLVersion: mysqlVersion, + UpgradeSafe: params.UpgradeSafe, + IncrementalDetails: incrDetails, + }, + + // Builtin-specific fields + FileEntries: fes, + SkipCompress: !backupStorageCompress, + CompressionEngine: CompressionEngineName, + ExternalDecompressor: ManifestExternalDecompressorCmd, + } + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, retryStr) + } + if _, err := wc.Write(data); err != nil { + return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, retryStr) + } + return nil + } + + err := addAndWrite() + if err != nil { + return err + } + + err = bh.EndBackup(ctx) + if err != nil { + return err + } + return bh.Error() +} + // executeRestoreFullBackup restores the files from a full backup. The underlying mysql database service is expected to be stopped. func (be *BuiltinBackupEngine) executeRestoreFullBackup(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, bm builtinBackupManifest) error { if err := prepareToRestore(ctx, params.Cnf, params.Mysqld, params.Logger, params.MysqlShutdownTimeout); err != nil { @@ -997,8 +1108,8 @@ func (be *BuiltinBackupEngine) executeRestoreIncrementalBackup(ctx context.Conte // we return the position from which replication should start // otherwise an error is returned func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (*BackupManifest, error) { - var bm builtinBackupManifest - if err := getBackupManifestInto(ctx, bh, &bm); err != nil { + bm, err := be.restoreManifest(ctx, params, bh) + if err != nil { return nil, err } @@ -1007,7 +1118,6 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor return nil, err } - var err error if bm.Incremental { err = be.executeRestoreIncrementalBackup(ctx, params, bh, bm) } else { @@ -1020,6 +1130,26 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor return &bm.BackupManifest, nil } +func (be *BuiltinBackupEngine) restoreManifest(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (bm builtinBackupManifest, finalErr error) { + var retryCount int + defer func() { + state := "Completed" + if finalErr != nil { + state = "Failed" + } + params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, retryToString(retryCount)) + }() + + for ; retryCount <= maxRetriesPerFile; retryCount++ { + params.Logger.Infof("Restoring file %s %s", backupManifestFileName, retryToString(retryCount)) + if finalErr = getBackupManifestInto(ctx, bh, &bm); finalErr == nil { + break + } + params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, retryToString(retryCount)) + } + return +} + // restoreFiles will copy all the files from the BackupStorage to the // right place. func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, bm builtinBackupManifest) (createdDir string, err error) { @@ -1040,75 +1170,79 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP } } fes := bm.FileEntries - sema := semaphore.NewWeighted(int64(params.Concurrency)) - rec := concurrency.AllErrorRecorder{} - wg := sync.WaitGroup{} - - ctxCancel, cancel := context.WithCancel(ctx) - defer func() { - // We may still have operations in flight that require a valid context, such as adding files to S3. - // Unless we encountered an error, we should not cancel the context. This is taken care of later - // in the process. If we encountered an error however, we can safely cancel the context as we should - // no longer work on anything and exit fast. + _ = be.restoreFileEntries(ctx, fes, bh, bm, params, createdDir) + if files := bh.GetFailedFiles(); len(files) > 0 { + newFEs := make([]FileEntry, len(fes)) + for _, file := range files { + fileNb, err := strconv.Atoi(file) + if err != nil { + return "", vterrors.Wrapf(err, "failed to retry file '%s'", file) + } + oldFes := fes[fileNb] + newFEs[fileNb] = FileEntry{ + Base: oldFes.Base, + Name: oldFes.Name, + ParentPath: oldFes.ParentPath, + Hash: oldFes.Hash, + RetryCount: 1, + } + bh.ResetErrorForFile(file) + } + err = be.restoreFileEntries(ctx, newFEs, bh, bm, params, createdDir) if err != nil { - cancel() + return "", err } - }() + } + return createdDir, nil +} + +func (be *BuiltinBackupEngine) restoreFileEntries(ctx context.Context, fes []FileEntry, bh backupstorage.BackupHandle, bm builtinBackupManifest, params RestoreParams, createdDir string) error { + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(params.Concurrency) for i := range fes { - wg.Add(1) - go func(i int) { - defer wg.Done() + if fes[i].Name == "" { + continue + } + g.Go(func() error { fe := &fes[i] - // Wait until we are ready to go, return if we encounter an error - acqErr := sema.Acquire(ctxCancel, 1) - if acqErr != nil { - log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error()) - rec.RecordError(acqErr) - cancel() - return - } - defer sema.Release(1) - - // First check if we have any error, if we have, there is no point trying to restore this file. - // We check for errors before checking if the context is canceled on purpose, if there was an - // error, the context would have been canceled already. - if rec.HasErrors() { - params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error()) - return - } - + name := fmt.Sprintf("%v", i) // Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might // end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66, // which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces // unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check. select { - case <-ctxCancel.Done(): + case <-ctx.Done(): log.Errorf("Context canceled or timed out during %q restore", fe.Name) - rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) - return + bh.RecordError(name, vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) + return nil default: } fe.ParentPath = createdDir + // And restore the file. - name := fmt.Sprintf("%v", i) - params.Logger.Infof("Copying file %v: %v", name, fe.Name) - err := be.restoreFile(ctxCancel, params, bh, fe, bm, name) - if err != nil { - rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name)) - cancel() + params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, retryToString(fe.RetryCount)) + if errRestore := be.restoreFile(ctx, params, bh, fe, bm, name); errRestore != nil { + bh.RecordError(name, vterrors.Wrapf(errRestore, "failed to restore file %v to %v", name, fe.Name)) + if fe.RetryCount >= maxRetriesPerFile { + // this is the last attempt, and we have an error, we can return an error, which will let errgroup + // know it can cancel the context + return errRestore + } } - }(i) + return nil + }) } - wg.Wait() - return createdDir, rec.Error() + _ = g.Wait() + return bh.Error() } // restoreFile restores an individual file. func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) { ctx, cancel := context.WithCancel(ctx) defer cancel() + // Open the source file for reading. openSourceAt := time.Now() source, err := bh.ReadFile(ctx, name) @@ -1126,8 +1260,15 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt)) }() - br := newBackupReader(name, 0, timedSource) - go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true) + // Create the backup/source reader and start reporting progress + retryStr := retryToString(fe.RetryCount) + br := newBackupReader(fe.Name, 0, timedSource) + go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true, retryStr) + defer func() { + if err := br.Close(finalErr == nil); err != nil { + finalErr = vterrors.Wrap(finalErr, "failed to close the source reader") + } + }() var reader io.Reader = br // Open the destination file for writing. @@ -1213,10 +1354,6 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa return vterrors.Wrap(err, "failed to flush destination buffer") } - if err := br.Close(); err != nil { - return vterrors.Wrap(err, "failed to close the source reader") - } - return nil } diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go index f8e33dbe641..62330b869f0 100644 --- a/go/vt/mysqlctl/cephbackupstorage/ceph.go +++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go @@ -32,9 +32,10 @@ import ( minio "github.com/minio/minio-go" "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + + "vitess.io/vitess/go/vt/log" + errorsbackup "vitess.io/vitess/go/vt/mysqlctl/errors" "vitess.io/vitess/go/vt/servenv" ) @@ -69,23 +70,8 @@ type CephBackupHandle struct { dir string name string readOnly bool - errors concurrency.AllErrorRecorder waitGroup sync.WaitGroup -} - -// RecordError is part of the concurrency.ErrorRecorder interface. -func (bh *CephBackupHandle) RecordError(err error) { - bh.errors.RecordError(err) -} - -// HasErrors is part of the concurrency.ErrorRecorder interface. -func (bh *CephBackupHandle) HasErrors() bool { - return bh.errors.HasErrors() -} - -// Error is part of the concurrency.ErrorRecorder interface. -func (bh *CephBackupHandle) Error() error { - return bh.errors.Error() + errorsbackup.PerFileErrorRecorder } // Directory implements BackupHandle. @@ -109,7 +95,7 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi defer bh.waitGroup.Done() // ceph bucket name is where the backups will go - //backup handle dir field contains keyspace/shard value + // backup handle dir field contains keyspace/shard value bucket := alterBucketName(bh.dir) // Give PutObject() the read end of the pipe. @@ -120,7 +106,7 @@ func (bh *CephBackupHandle) AddFile(ctx context.Context, filename string, filesi // Signal the writer that an error occurred, in case it's not done writing yet. reader.CloseWithError(err) // In case the error happened after the writer finished, we need to remember it. - bh.RecordError(err) + bh.RecordError(filename, err) } }() // Give our caller the write end of the pipe. diff --git a/go/vt/mysqlctl/errors/errors.go b/go/vt/mysqlctl/errors/errors.go new file mode 100644 index 00000000000..02485901e50 --- /dev/null +++ b/go/vt/mysqlctl/errors/errors.go @@ -0,0 +1,106 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import ( + "errors" + "strings" + "sync" +) + +type BackupErrorRecorder interface { + RecordError(string, error) + HasErrors() bool + Error() error + GetFailedFiles() []string + ResetErrorForFile(string) +} + +// PerFileErrorRecorder records errors and group them by filename. +// This is particularly useful when processing several files at the same time +// and wanting to know which files failed. +type PerFileErrorRecorder struct { + mu sync.Mutex + errors map[string][]error +} + +// RecordError records a possible error: +// - does nothing if err is nil +func (pfer *PerFileErrorRecorder) RecordError(filename string, err error) { + if err == nil { + return + } + + pfer.mu.Lock() + defer pfer.mu.Unlock() + + if pfer.errors == nil { + pfer.errors = make(map[string][]error, 1) + } + pfer.errors[filename] = append(pfer.errors[filename], err) +} + +// HasErrors returns true if we ever recorded an error +func (pfer *PerFileErrorRecorder) HasErrors() bool { + pfer.mu.Lock() + defer pfer.mu.Unlock() + return len(pfer.errors) > 0 +} + +// Error returns all the errors that were recorded +func (pfer *PerFileErrorRecorder) Error() error { + pfer.mu.Lock() + defer pfer.mu.Unlock() + if pfer.errors == nil { + return nil + } + + var errs []string + for _, fileErrs := range pfer.errors { + for _, err := range fileErrs { + errs = append(errs, err.Error()) + } + } + if len(errs) == 0 { + return nil + } + return errors.New(strings.Join(errs, "; ")) +} + +// GetFailedFiles returns a slice of filenames, each of this file have at least 1 error. +func (pfer *PerFileErrorRecorder) GetFailedFiles() []string { + pfer.mu.Lock() + defer pfer.mu.Unlock() + if pfer.errors == nil { + return nil + } + files := make([]string, 0, len(pfer.errors)) + for filename := range pfer.errors { + files = append(files, filename) + } + return files +} + +// ResetErrorForFile removes all the errors of a given file. +func (pfer *PerFileErrorRecorder) ResetErrorForFile(filename string) { + pfer.mu.Lock() + defer pfer.mu.Unlock() + if pfer.errors == nil { + return + } + delete(pfer.errors, filename) +} diff --git a/go/vt/mysqlctl/fakebackupstorage.go b/go/vt/mysqlctl/fakebackupstorage.go index 75587191157..582b422cf58 100644 --- a/go/vt/mysqlctl/fakebackupstorage.go +++ b/go/vt/mysqlctl/fakebackupstorage.go @@ -21,20 +21,21 @@ import ( "fmt" "io" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/mysqlctl/errors" ) type FakeBackupHandle struct { Dir string NameV string ReadOnly bool - Errors concurrency.AllErrorRecorder + errors.PerFileErrorRecorder AbortBackupCalls []context.Context AbortBackupReturn error AddFileCalls []FakeBackupHandleAddFileCall AddFileReturn FakeBackupHandleAddFileReturn + AddFileReturnF func(filename string) FakeBackupHandleAddFileReturn EndBackupCalls []context.Context EndBackupReturn error ReadFileCalls []FakeBackupHandleReadFileCall @@ -57,18 +58,6 @@ type FakeBackupHandleReadFileCall struct { Filename string } -func (fbh *FakeBackupHandle) RecordError(err error) { - fbh.Errors.RecordError(err) -} - -func (fbh *FakeBackupHandle) HasErrors() bool { - return fbh.Errors.HasErrors() -} - -func (fbh *FakeBackupHandle) Error() error { - return fbh.Errors.Error() -} - func (fbh *FakeBackupHandle) Directory() string { return fbh.Dir } @@ -79,6 +68,11 @@ func (fbh *FakeBackupHandle) Name() string { func (fbh *FakeBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { fbh.AddFileCalls = append(fbh.AddFileCalls, FakeBackupHandleAddFileCall{ctx, filename, filesize}) + + if fbh.AddFileReturnF != nil { + r := fbh.AddFileReturnF(filename) + return r.WriteCloser, r.Err + } return fbh.AddFileReturn.WriteCloser, fbh.AddFileReturn.Err } diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go index 99148d9169b..bd73c55e70c 100644 --- a/go/vt/mysqlctl/filebackupstorage/file.go +++ b/go/vt/mysqlctl/filebackupstorage/file.go @@ -27,8 +27,9 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/vt/mysqlctl/errors" + "vitess.io/vitess/go/ioutil" - "vitess.io/vitess/go/vt/concurrency" stats "vitess.io/vitess/go/vt/mysqlctl/backupstats" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/servenv" @@ -59,7 +60,7 @@ type FileBackupHandle struct { dir string name string readOnly bool - errors concurrency.AllErrorRecorder + errors.PerFileErrorRecorder } func NewBackupHandle( @@ -79,21 +80,6 @@ func NewBackupHandle( } } -// RecordError is part of the concurrency.ErrorRecorder interface. -func (fbh *FileBackupHandle) RecordError(err error) { - fbh.errors.RecordError(err) -} - -// HasErrors is part of the concurrency.ErrorRecorder interface. -func (fbh *FileBackupHandle) HasErrors() bool { - return fbh.errors.HasErrors() -} - -// Error is part of the concurrency.ErrorRecorder interface. -func (fbh *FileBackupHandle) Error() error { - return fbh.errors.Error() -} - // Directory is part of the BackupHandle interface func (fbh *FileBackupHandle) Directory() string { return fbh.dir diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go index 814395a225a..adecbb9bbba 100644 --- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go +++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go @@ -32,8 +32,9 @@ import ( "google.golang.org/api/iterator" "google.golang.org/api/option" + "vitess.io/vitess/go/vt/mysqlctl/errors" + "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/servenv" ) @@ -65,22 +66,7 @@ type GCSBackupHandle struct { dir string name string readOnly bool - errors concurrency.AllErrorRecorder -} - -// RecordError is part of the concurrency.ErrorRecorder interface. -func (bh *GCSBackupHandle) RecordError(err error) { - bh.errors.RecordError(err) -} - -// HasErrors is part of the concurrency.ErrorRecorder interface. -func (bh *GCSBackupHandle) HasErrors() bool { - return bh.errors.HasErrors() -} - -// Error is part of the concurrency.ErrorRecorder interface. -func (bh *GCSBackupHandle) Error() error { - return bh.errors.Error() + errors.PerFileErrorRecorder } // Directory implements BackupHandle. diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go index b3a8117aafa..97861e83729 100644 --- a/go/vt/mysqlctl/s3backupstorage/s3.go +++ b/go/vt/mysqlctl/s3backupstorage/s3.go @@ -40,6 +40,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -48,7 +49,8 @@ import ( "github.com/aws/smithy-go/middleware" "github.com/spf13/pflag" - "vitess.io/vitess/go/vt/concurrency" + errorsbackup "vitess.io/vitess/go/vt/mysqlctl/errors" + "vitess.io/vitess/go/vt/log" stats "vitess.io/vitess/go/vt/mysqlctl/backupstats" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" @@ -144,8 +146,8 @@ type S3BackupHandle struct { dir string name string readOnly bool - errors concurrency.AllErrorRecorder waitGroup sync.WaitGroup + errorsbackup.PerFileErrorRecorder } // Directory is part of the backupstorage.BackupHandle interface. @@ -158,39 +160,23 @@ func (bh *S3BackupHandle) Name() string { return bh.name } -// RecordError is part of the concurrency.ErrorRecorder interface. -func (bh *S3BackupHandle) RecordError(err error) { - bh.errors.RecordError(err) -} - -// HasErrors is part of the concurrency.ErrorRecorder interface. -func (bh *S3BackupHandle) HasErrors() bool { - return bh.errors.HasErrors() -} - -// Error is part of the concurrency.ErrorRecorder interface. -func (bh *S3BackupHandle) Error() error { - return bh.errors.Error() -} - // AddFile is part of the backupstorage.BackupHandle interface. func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { if bh.readOnly { return nil, fmt.Errorf("AddFile cannot be called on read-only backup") } - // Calculate s3 upload part size using the source filesize - partSizeBytes := manager.DefaultUploadPartSize - if filesize > 0 { - minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts) - // Round up to ensure large enough partsize - calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize)) - if calculatedPartSizeBytes > partSizeBytes { - partSizeBytes = calculatedPartSizeBytes - } - } + partSizeBytes := calculateUploadPartSize(filesize) reader, writer := io.Pipe() + bh.handleAddFile(ctx, filename, partSizeBytes, reader, func(err error) { + reader.CloseWithError(err) + }) + + return writer, nil +} + +func (bh *S3BackupHandle) handleAddFile(ctx context.Context, filename string, partSizeBytes int64, reader io.Reader, closer func(error)) { bh.waitGroup.Add(1) go func() { @@ -221,12 +207,24 @@ func (bh *S3BackupHandle) AddFile(ctx context.Context, filename string, filesize }) }) if err != nil { - reader.CloseWithError(err) - bh.RecordError(err) + closer(err) + bh.RecordError(filename, err) } }() +} - return writer, nil +func calculateUploadPartSize(filesize int64) int64 { + // Calculate s3 upload part size using the source filesize + partSizeBytes := manager.DefaultUploadPartSize + if filesize > 0 { + minimumPartSize := float64(filesize) / float64(manager.MaxUploadParts) + // Round up to ensure large enough partsize + calculatedPartSizeBytes := int64(math.Ceil(minimumPartSize)) + if calculatedPartSizeBytes > partSizeBytes { + partSizeBytes = calculatedPartSizeBytes + } + } + return partSizeBytes } // EndBackup is part of the backupstorage.BackupHandle interface. @@ -505,13 +503,24 @@ func (bs *S3BackupStorage) client() (*s3.Client, error) { return nil, err } - bs._client = s3.NewFromConfig(cfg, func(o *s3.Options) { - o.UsePathStyle = forcePath - if retryCount >= 0 { - o.RetryMaxAttempts = retryCount - o.Retryer = &ClosedConnectionRetryer{} - } - }, s3.WithEndpointResolverV2(newEndpointResolver())) + options := []func(options *s3.Options){ + func(o *s3.Options) { + o.UsePathStyle = forcePath + if retryCount >= 0 { + o.RetryMaxAttempts = retryCount + o.Retryer = &ClosedConnectionRetryer{ + awsRetryer: retry.NewStandard(func(options *retry.StandardOptions) { + options.MaxAttempts = retryCount + }), + } + } + }, + } + if endpoint != "" { + options = append(options, s3.WithEndpointResolverV2(newEndpointResolver())) + } + + bs._client = s3.NewFromConfig(cfg, options...) if len(bucket) == 0 { return nil, fmt.Errorf("--s3_backup_storage_bucket required") diff --git a/go/vt/mysqlctl/s3backupstorage/s3_mock.go b/go/vt/mysqlctl/s3backupstorage/s3_mock.go new file mode 100644 index 00000000000..f244c4d63b1 --- /dev/null +++ b/go/vt/mysqlctl/s3backupstorage/s3_mock.go @@ -0,0 +1,223 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package s3backupstorage + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" +) + +type FakeS3BackupHandle struct { + *S3BackupHandle + + AddFileReturnF func(s3 *S3BackupHandle, ctx context.Context, filename string, filesize int64, firstAdd bool) (io.WriteCloser, error) + ReadFileReturnF func(s3 *S3BackupHandle, ctx context.Context, filename string, firstRead bool) (io.ReadCloser, error) + + mu sync.Mutex + addPerFile map[string]int + readPerFile map[string]int +} + +type FakeConfig struct { + Region string + Endpoint string + Bucket string + ForcePath bool +} + +func InitFlag(cfg FakeConfig) { + region = cfg.Region + endpoint = cfg.Endpoint + bucket = cfg.Bucket + forcePath = cfg.ForcePath +} + +func NewFakeS3BackupHandle(ctx context.Context, dir, name string, logger logutil.Logger, stats backupstats.Stats) (*FakeS3BackupHandle, error) { + s := newS3BackupStorage() + bs := s.WithParams(backupstorage.Params{ + Logger: logger, + Stats: stats, + }) + bh, err := bs.StartBackup(ctx, dir, name) + if err != nil { + return nil, err + } + return &FakeS3BackupHandle{ + S3BackupHandle: bh.(*S3BackupHandle), + addPerFile: make(map[string]int), + readPerFile: make(map[string]int), + }, nil +} + +func NewFakeS3RestoreHandle(ctx context.Context, dir string, logger logutil.Logger, stats backupstats.Stats) (*FakeS3BackupHandle, error) { + s := newS3BackupStorage() + bs := s.WithParams(backupstorage.Params{ + Logger: logger, + Stats: stats, + }) + bhs, err := bs.ListBackups(ctx, dir) + if err != nil { + return nil, err + } + return &FakeS3BackupHandle{ + S3BackupHandle: bhs[0].(*S3BackupHandle), + addPerFile: make(map[string]int), + readPerFile: make(map[string]int), + }, nil +} + +func (fbh *FakeS3BackupHandle) Directory() string { + return fbh.S3BackupHandle.Directory() +} + +func (fbh *FakeS3BackupHandle) Name() string { + return fbh.S3BackupHandle.Name() +} + +func (fbh *FakeS3BackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { + fbh.mu.Lock() + defer func() { + fbh.addPerFile[filename] += 1 + fbh.mu.Unlock() + }() + + if fbh.AddFileReturnF != nil { + return fbh.AddFileReturnF(fbh.S3BackupHandle, ctx, filename, filesize, fbh.addPerFile[filename] == 0) + } + return fbh.S3BackupHandle.AddFile(ctx, filename, filesize) +} + +func (fbh *FakeS3BackupHandle) EndBackup(ctx context.Context) error { + return fbh.S3BackupHandle.EndBackup(ctx) +} + +func (fbh *FakeS3BackupHandle) AbortBackup(ctx context.Context) error { + return fbh.S3BackupHandle.AbortBackup(ctx) +} + +func (fbh *FakeS3BackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) { + fbh.mu.Lock() + defer func() { + fbh.readPerFile[filename] += 1 + fbh.mu.Unlock() + }() + + if fbh.ReadFileReturnF != nil { + return fbh.ReadFileReturnF(fbh.S3BackupHandle, ctx, filename, fbh.readPerFile[filename] == 0) + } + return fbh.S3BackupHandle.ReadFile(ctx, filename) +} + +func (fbh *FakeS3BackupHandle) RecordError(s string, err error) { + fbh.S3BackupHandle.RecordError(s, err) +} + +func (fbh *FakeS3BackupHandle) HasErrors() bool { + return fbh.S3BackupHandle.HasErrors() +} + +func (fbh *FakeS3BackupHandle) Error() error { + return fbh.S3BackupHandle.Error() +} + +func (fbh *FakeS3BackupHandle) GetFailedFiles() []string { + return fbh.S3BackupHandle.GetFailedFiles() +} + +func (fbh *FakeS3BackupHandle) ResetErrorForFile(s string) { + fbh.S3BackupHandle.ResetErrorForFile(s) +} + +type failReadPipeReader struct { + *io.PipeReader +} + +func (fwr *failReadPipeReader) Read(p []byte) (n int, err error) { + return 0, errors.New("failing read") +} + +func FailFirstWrite(s3bh *S3BackupHandle, ctx context.Context, filename string, filesize int64, firstAdd bool) (io.WriteCloser, error) { + if s3bh.readOnly { + return nil, fmt.Errorf("AddFile cannot be called on read-only backup") + } + + partSizeBytes := calculateUploadPartSize(filesize) + reader, writer := io.Pipe() + r := io.Reader(reader) + + if firstAdd { + r = &failReadPipeReader{PipeReader: reader} + } + + s3bh.handleAddFile(ctx, filename, partSizeBytes, r, func(err error) { + reader.CloseWithError(err) + }) + return writer, nil +} + +func FailAllWrites(s3bh *S3BackupHandle, ctx context.Context, filename string, filesize int64, _ bool) (io.WriteCloser, error) { + if s3bh.readOnly { + return nil, fmt.Errorf("AddFile cannot be called on read-only backup") + } + + partSizeBytes := calculateUploadPartSize(filesize) + reader, writer := io.Pipe() + r := &failReadPipeReader{PipeReader: reader} + + s3bh.handleAddFile(ctx, filename, partSizeBytes, r, func(err error) { + r.PipeReader.CloseWithError(err) + }) + return writer, nil +} + +type failRead struct{} + +func (fr *failRead) Read(p []byte) (n int, err error) { + return 0, errors.New("failing read") +} + +func (fr *failRead) Close() error { + return nil +} + +func FailFirstRead(s3bh *S3BackupHandle, ctx context.Context, filename string, firstRead bool) (io.ReadCloser, error) { + rc, err := s3bh.ReadFile(ctx, filename) + if err != nil { + return nil, err + } + if firstRead { + return &failRead{}, nil + } + return rc, nil +} + +// FailAllReadExpectManifest is used to fail every attempt at reading a file from S3. +// Only the MANIFEST file is allowed to be read, because otherwise we wouldn't even try to read the normal files. +func FailAllReadExpectManifest(s3bh *S3BackupHandle, ctx context.Context, filename string, _ bool) (io.ReadCloser, error) { + const manifestFileName = "MANIFEST" + if filename == manifestFileName { + return s3bh.ReadFile(ctx, filename) + } + return &failRead{}, nil +} diff --git a/go/vt/wrangler/testlib/backup_test.go b/go/vt/wrangler/testlib/backup_test.go index cb61c4bab99..b540fc9f8f0 100644 --- a/go/vt/wrangler/testlib/backup_test.go +++ b/go/vt/wrangler/testlib/backup_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/mysql/capabilities" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" @@ -36,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" + "vitess.io/vitess/go/vt/mysqlctl/blackbox" "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" "vitess.io/vitess/go/vt/topo/memorytopo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -132,7 +132,7 @@ func testBackupRestore(t *testing.T, cDetails *compressionDetails) error { require.NoError(t, os.MkdirAll(s, os.ModePerm)) } - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := blackbox.NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { newPath := path.Join(sourceInnodbLogDir, mysql.DynamicRedoLogSubdir) @@ -371,7 +371,7 @@ func TestBackupRestoreLagged(t *testing.T) { } require.NoError(t, os.WriteFile(path.Join(sourceInnodbDataDir, "innodb_data_1"), []byte("innodb data 1 contents"), os.ModePerm)) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := blackbox.NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { newPath := path.Join(sourceInnodbLogDir, mysql.DynamicRedoLogSubdir) @@ -591,7 +591,7 @@ func TestRestoreUnreachablePrimary(t *testing.T) { } require.NoError(t, os.WriteFile(path.Join(sourceInnodbDataDir, "innodb_data_1"), []byte("innodb data 1 contents"), os.ModePerm)) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := blackbox.NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { newPath := path.Join(sourceInnodbLogDir, mysql.DynamicRedoLogSubdir) @@ -767,7 +767,7 @@ func TestDisableActiveReparents(t *testing.T) { } require.NoError(t, os.WriteFile(path.Join(sourceInnodbDataDir, "innodb_data_1"), []byte("innodb data 1 contents"), os.ModePerm)) - needIt, err := needInnoDBRedoLogSubdir() + needIt, err := blackbox.NeedInnoDBRedoLogSubdir() require.NoError(t, err) if needIt { newPath := path.Join(sourceInnodbLogDir, mysql.DynamicRedoLogSubdir) @@ -877,25 +877,3 @@ func TestDisableActiveReparents(t *testing.T) { assert.False(t, destTablet.FakeMysqlDaemon.Replicating) assert.True(t, destTablet.FakeMysqlDaemon.Running) } - -// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. -// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the -// (/. by default) called "#innodb_redo". See: -// -// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity -func needInnoDBRedoLogSubdir() (needIt bool, err error) { - mysqldVersionStr, err := mysqlctl.GetVersionString() - if err != nil { - return needIt, err - } - _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) - if err != nil { - return needIt, err - } - versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) - capableOf := mysql.ServerVersionCapableOf(versionStr) - if capableOf == nil { - return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) - } - return capableOf(capabilities.DynamicRedoLogCapacityFlavorCapability) -} diff --git a/test/ci_workflow_gen.go b/test/ci_workflow_gen.go index 7956c491408..9253e16c43f 100644 --- a/test/ci_workflow_gen.go +++ b/test/ci_workflow_gen.go @@ -157,6 +157,9 @@ var ( "vreplication_migrate", "vreplication_vtctldclient_vdiff2_movetables_tz", } + clusterRequiringMinio = []string{ + "21", + } ) type unitTest struct { @@ -174,6 +177,7 @@ type clusterTest struct { EnableBinlogTransactionCompression bool PartialKeyspace bool Cores16 bool + NeedsMinio bool } type vitessTesterTest struct { @@ -286,6 +290,13 @@ func generateClusterWorkflows(list []string, tpl string) { break } } + minioClusters := canonnizeList(clusterRequiringMinio) + for _, minioCluster := range minioClusters { + if minioCluster == cluster { + test.NeedsMinio = true + break + } + } if mysqlVersion == mysql57 { test.Platform = string(mysql57) } diff --git a/test/config.json b/test/config.json index 1e278546c7a..34c6bc1a6be 100644 --- a/test/config.json +++ b/test/config.json @@ -136,6 +136,15 @@ "RetryMax": 1, "Tags": [] }, + "backup_s3": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/backup/s3", "-timeout", "30m"], + "Command": [], + "Manual": false, + "Shard": "21", + "RetryMax": 1, + "Tags": [] + }, "backup_only": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/backup/vtbackup", "-timeout", "20m"], diff --git a/test/templates/cluster_endtoend_test.tpl b/test/templates/cluster_endtoend_test.tpl index 6fe58fae361..8d0a2f650b5 100644 --- a/test/templates/cluster_endtoend_test.tpl +++ b/test/templates/cluster_endtoend_test.tpl @@ -157,6 +157,15 @@ jobs: {{end}} + {{if .NeedsMinio }} + - name: Install Minio + if: steps.skip-workflow.outputs.skip-workflow == 'false' + run: | + wget https://dl.min.io/server/minio/release/linux-amd64/minio + chmod +x minio + mv minio /usr/local/bin + {{end}} + {{if .MakeTools}} - name: Installing zookeeper and consul