Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental backup: fix race condition in reading 'mysqlbinlog' output #14330

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
}
log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful logging and does not add substantial volume to the output.

incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(protoutil.TimeFromProto(resp.FirstTimestamp).UTC()),
FirstTimestampBinlog: filepath.Base(resp.FirstTimestampBinlog),
Expand Down
140 changes: 85 additions & 55 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,85 +1294,86 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply
}

// parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry.
func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err error) {
func parseBinlogEntryTimestamp(logEntry string) (t time.Time, err error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found bool was a superfluous variable. t.IsZero() is an indicator that the timestamp was not found.

if len(logEntry) == 0 {
return false, t, nil
return t, nil
}
if logEntry[0] != '#' {
return false, t, nil
return t, nil
}
if submatch := binlogEntryCommittedTimestampRegex.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 8.0
binlogEntryCommittedTimestamp := submatch[1]
unixMicros, err := strconv.ParseInt(binlogEntryCommittedTimestamp, 10, 64)
if err != nil {
return false, t, err
return t, err
}
return true, time.UnixMicro(unixMicros), nil
return time.UnixMicro(unixMicros), nil
}
if submatch := binlogEntryTimestampGTIDRegexp.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 5.7
t, err = ParseBinlogTimestamp(submatch[1])
if err != nil {
return false, t, err
return t, err
}
return true, t, nil
return t, nil
}
return false, t, nil
return t, nil
}

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
// looks for the first and last timestamps.
func (mysqld *Mysqld) scanBinlogTimestamp(
mysqlbinlogDir string,
mysqlbinlogEnv []string,
mysqlbinlogName string,
binlogFile string,
stopAtFirst bool, // unused at this moment, to be used as an optimization hint
) (
firstMatchedTime time.Time,
lastMatchedTime time.Time,
err error,
) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
mysqlbinlogCmd.Env = mysqlbinlogEnv
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
}()
return firstMatchedTime, lastMatchedTime, err
}
scan := func() error {
Copy link
Contributor Author

@shlomi-noach shlomi-noach Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • scanComplete not needed since we now run scan() sequentially.
  • intentionalKill not needed right now. Previously we did optimize for an early break out of the process. For purposes of this bugfix we remove that optimization. In the future we might re-introduce said optimization.

Copy link
Contributor

@mattlord mattlord Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could also set a different writer for stdout instead of using a pipe:

	stdoutbuf := &bytes.Buffer{}
	mysqlbinlogCmd.Stdout = stdoutbuf
	scanComplete := make(chan error)
	intentionalKill := false
	scan := func() {
		defer close(scanComplete)
		defer func() {
			intentionalKill = true
			mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
		}()
		// Read line by line and process it
		scanner := bufio.NewScanner(stdoutbuf)

Then I think cmd.Wait() would work as expected as it's supposed to wait for stdout writes to complete AFAICT.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's also an option. But anyway, the given solution works as expected. For now let's keep it. I might iterate with a replacement output if/when I work on that optimization again. I don't want to push it too close to the GA release.

// Read line by line and process it
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
t, err := parseBinlogEntryTimestamp(logEntry)
Copy link
Contributor Author

@shlomi-noach shlomi-noach Oct 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

found was a superfluous variable. t.IsZero() is an indicator to timestamp being found or not.

if err != nil {
scanComplete <- err
return
return err
}
if found {
matchedTime = t
matchFound = true
if t.IsZero() {
continue
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
return
if firstMatchedTime.IsZero() {
firstMatchedTime = t
}
lastMatchedTime = t
}
return nil
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
if err := mysqlbinlogCmd.Start(); err != nil { // Start() is nonblockig
return firstMatchedTime, lastMatchedTime, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
defer mysqlbinlogCmd.Process.Kill()
if err := scan(); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait()
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
if err := mysqlbinlogCmd.Wait(); err != nil {
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above few lines are the main change in the PR. First scan() the output, and only then mysqlbinlogCmd.Wait(). This is critical because cmd.Wait() terminates and closes all piped output, even if it's premature to do so.

}
return matchedTime, matchFound, nil
return firstMatchedTime, lastMatchedTime, nil
}

// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
Expand Down Expand Up @@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
return nil, err
}

lastMatchedTimeMap := map[string]time.Time{} // a simple cache to avoid rescanning same files. Key=binlog file name
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An optimization to avoid scanning the same file(2) twice


resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{}
// Find first timestamp
for _, binlogFile := range req.BinlogFileNames {
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
}
if found {
resp.FirstTimestamp = protoutil.TimeToProto(t)
err = func() error {
for _, binlogFile := range req.BinlogFileNames {
firstMatchedTime, lastMatchedTime, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile)
}
if !lastMatchedTime.IsZero() {
// cache result
lastMatchedTimeMap[binlogFile] = lastMatchedTime
}
if firstMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.FirstTimestamp = protoutil.TimeToProto(firstMatchedTime)
resp.FirstTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above refactor: I chose to embed in a local function, this was not strictly needed, but it feels safe what with local variables and early breaks/returns. The main important change is that we cache lastMatchedTime.

if err != nil {
return resp, err
}
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
}
if found {
resp.LastTimestamp = protoutil.TimeToProto(t)
err = func() error {
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]

// See if we have a cached value for this file. This is certainly be the situation if there's a single binary log file in req.BinlogFileNames,
// which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp.
lastMatchedTime, ok := lastMatchedTimeMap[binlogFile]
if !ok {
var err error
_, lastMatchedTime, err = mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile)
}
}
if lastMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.LastTimestamp = protoutil.TimeToProto(lastMatchedTime)
resp.LastTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above refactor: I chose to embed in a local function, this was not strictly needed, but it feels safe what with local variables and early breaks/returns. The main important change is that we check lastMatchedTime in cache.

if err != nil {
return resp, err
}
return resp, nil
}
Expand Down
10 changes: 2 additions & 8 deletions go/vt/mysqlctl/mysqld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
tcases := []struct {
name string
entry string
found bool
tm time.Time
}{
{
Expand All @@ -157,24 +156,19 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
{
name: "mysql80",
entry: "#230605 16:06:34 server id 22233 end_log_pos 1037 CRC32 0xa4707c5b GTID last_committed=4 sequence_number=5 rbr_only=no original_committed_timestamp=1685970394031366 immediate_commit_timestamp=1685970394032458 transaction_length=186",
found: true,
tm: time.UnixMicro(1685970394031366),
},
{
name: "mysql57",
entry: "#230608 13:14:31 server id 484362839 end_log_pos 259 CRC32 0xc07510d0 GTID last_committed=0 sequence_number=1 rbr_only=yes",
found: true,
tm: time.Date(2023, time.June, 8, 13, 14, 31, 0, time.UTC),
},
}
for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
found, tm, err := parseBinlogEntryTimestamp(tcase.entry)
tm, err := parseBinlogEntryTimestamp(tcase.entry)
assert.NoError(t, err)
assert.Equal(t, tcase.found, found)
if tcase.found {
assert.Equal(t, tcase.tm, tm)
}
assert.Equal(t, tcase.tm, tm)
})
}
}
Loading