Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix flaky test on mysqlshell backup engine #17037

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions go/vt/mysqlctl/mysqlshellbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ var (
// disable redo logging and double write buffer
mysqlShellSpeedUpRestore = false

mysqlShellBackupBinaryName = "mysqlsh"

// use when checking if we need to create the directory on the local filesystem or not.
knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"}

Expand Down Expand Up @@ -87,7 +85,9 @@ type MySQLShellBackupManifest struct {
}

func init() {
BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{}
BackupRestoreEngineMap[mysqlShellBackupEngineName] = &MySQLShellBackupEngine{
binaryName: "mysqlsh",
}

for _, cmd := range []string{"vtcombo", "vttablet", "vtbackup", "vttestserver", "vtctldclient"} {
servenv.OnParseFor(cmd, registerMysqlShellBackupEngineFlags)
Expand All @@ -106,6 +106,7 @@ func registerMysqlShellBackupEngineFlags(fs *pflag.FlagSet) {
// MySQLShellBackupEngine encapsulates the logic to implement the restoration
// of a mysql-shell based backup.
type MySQLShellBackupEngine struct {
binaryName string
}

const (
Expand Down Expand Up @@ -164,41 +165,37 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back
return BackupUnusable, vterrors.Wrap(err, "failed to fetch position")
}

cmd := exec.CommandContext(ctx, mysqlShellBackupBinaryName, args...)
cmd := exec.CommandContext(ctx, be.binaryName, args...)

params.Logger.Infof("running %s", cmd.String())

cmdOut, err := cmd.StdoutPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stdout pipe")
}
cmdOriginalErr, err := cmd.StderrPipe()
if err != nil {
return BackupUnusable, vterrors.Wrap(err, "cannot create stderr pipe")
}
if err := cmd.Start(); err != nil {
return BackupUnusable, vterrors.Wrap(err, "can't start mysqlshell")
}
stdoutReader, stdoutWriter := io.Pipe()
stderrReader, stderrWriter := io.Pipe()
lockWaiterReader, lockWaiterWriter := io.Pipe()

pipeReader, pipeWriter := io.Pipe()
cmdErr := io.TeeReader(cmdOriginalErr, pipeWriter)
cmd.Stdout = stdoutWriter
cmd.Stderr = stderrWriter
combinedErr := io.TeeReader(stderrReader, lockWaiterWriter)

cmdWg := &sync.WaitGroup{}
cmdWg.Add(3)
go releaseReadLock(ctx, pipeReader, params, cmdWg, lockAcquired)
go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", cmdOut, params.Logger, cmdWg.Done)
go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", cmdErr, params.Logger, cmdWg.Done)
go releaseReadLock(ctx, lockWaiterReader, params, cmdWg, lockAcquired)
go scanLinesToLogger(mysqlShellBackupEngineName+" stdout", stdoutReader, params.Logger, cmdWg.Done)
go scanLinesToLogger(mysqlShellBackupEngineName+" stderr", combinedErr, params.Logger, cmdWg.Done)

// Get exit status.
if err := cmd.Wait(); err != nil {
pipeWriter.Close() // make sure we close the writer so the goroutines above will complete.
return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}
// we run the command, wait for it to complete and close all pipes so the goroutines can complete on their own.
// after that we can process if an error has happened or not.
err = cmd.Run()

// close the pipeWriter and wait for the goroutines to have read all the logs
pipeWriter.Close()
stdoutWriter.Close()
stderrWriter.Close()
lockWaiterWriter.Close()
cmdWg.Wait()

if err != nil {
return BackupUnusable, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}

// open the MANIFEST
params.Logger.Infof("Writing backup MANIFEST")
mwc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown)
Expand Down Expand Up @@ -371,7 +368,7 @@ func (be *MySQLShellBackupEngine) ExecuteRestore(ctx context.Context, params Res
if err := cmd.Wait(); err != nil {
return nil, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}
params.Logger.Infof("%s completed successfully", mysqlShellBackupBinaryName)
params.Logger.Infof("%s completed successfully", be.binaryName)

// disable local_infile now that the restore is done.
err = params.Mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL LOCAL_INFILE=0")
Expand Down
18 changes: 9 additions & 9 deletions go/vt/mysqlctl/mysqlshellbackupengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,10 @@ func generateTestFile(t *testing.T, name, contents string) {
// during ExecuteBackup(), even if the backup didn't succeed.
func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
originalLocation := mysqlShellBackupLocation
originalBinary := mysqlShellBackupBinaryName
mysqlShellBackupLocation = "logical"
mysqlShellBackupBinaryName = path.Join(t.TempDir(), "test.sh")

defer func() { // restore the original values.
defer func() { // restore the original value.
mysqlShellBackupLocation = originalLocation
mysqlShellBackupBinaryName = originalBinary
}()

logger := logutil.NewMemoryLogger()
Expand All @@ -340,7 +337,6 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
mysql := NewFakeMysqlDaemon(fakedb)
defer mysql.Close()

be := &MySQLShellBackupEngine{}
params := BackupParams{
TabletAlias: "test",
Logger: logger,
Expand All @@ -351,6 +347,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

t.Run("lock released if we see the mysqlsh lock being acquired", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
bs.StartBackupReturn.BackupHandle = &FakeBackupHandle{
Expand All @@ -359,7 +356,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates mysql shell completing without any issues.
generateTestFile(t, mysqlShellBackupBinaryName, fmt.Sprintf("#!/bin/bash\n>&2 echo %s", mysqlShellLockMessage))
generateTestFile(t, be.binaryName, fmt.Sprintf(
"#!/bin/bash\n>&2 echo %s; echo \"backup completed\"; sleep 0.01", mysqlShellLockMessage))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rvrangel This feels still like it works around the race? Is there a way to more fundamentally fix the race? There's afaik no guarantee that here that the actual implementation will take some time before it exits after the last line outputted and we'd lose that last line in logging etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not that I know, but let me know if you have other ideas we could try. the main problem seems to happen when the pipe/file is closed during the Scan() is taking place in the other goroutine and I haven't found a way of avoiding it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rvrangel Fwiw, I've tried running it without the sleep 0.01 in the test program and it doesn't seem to fail? So is that really needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you run the test as I did here (so it runs thousands of times with -race -count=1000)? If I remove the sleep I do see the errors pop up every now and then when I run it. If you don't, maybe try it with 10k instead.


bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand All @@ -380,7 +378,8 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
"failed to release the global lock after mysqlsh")
})

t.Run("lock released if when we don't see mysqlsh released it", func(t *testing.T) {
t.Run("lock released if we don't see mysqlsh release it", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
mysql.GlobalReadLock = false // clear lock status.
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
Expand All @@ -390,7 +389,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates mysqlshell completing, but we don't see the message that is released its lock.
generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 0")
generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 0")

bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand All @@ -407,6 +406,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
})

t.Run("lock released when backup fails", func(t *testing.T) {
be := &MySQLShellBackupEngine{binaryName: path.Join(t.TempDir(), "mysqlsh.sh")}
mysql.GlobalReadLock = false // clear lock status.
logger.Clear()
manifestBuffer := ioutil.NewBytesBufferWriter()
Expand All @@ -416,7 +416,7 @@ func TestMySQLShellBackupEngine_ExecuteBackup_ReleaseLock(t *testing.T) {
}

// this simulates the backup process failing.
generateTestFile(t, mysqlShellBackupBinaryName, "#!/bin/bash\nexit 1")
generateTestFile(t, be.binaryName, "#!/bin/bash\nexit 1")

bh, err := bs.StartBackup(context.Background(), t.TempDir(), t.Name())
require.NoError(t, err)
Expand Down
Loading