-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Incremental backup: fix race condition in reading 'mysqlbinlog' output #14330
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1294,85 +1294,86 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply | |
} | ||
|
||
// parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry. | ||
func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err error) { | ||
func parseBinlogEntryTimestamp(logEntry string) (t time.Time, err error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
if len(logEntry) == 0 { | ||
return false, t, nil | ||
return t, nil | ||
} | ||
if logEntry[0] != '#' { | ||
return false, t, nil | ||
return t, nil | ||
} | ||
if submatch := binlogEntryCommittedTimestampRegex.FindStringSubmatch(logEntry); submatch != nil { | ||
// MySQL 8.0 | ||
binlogEntryCommittedTimestamp := submatch[1] | ||
unixMicros, err := strconv.ParseInt(binlogEntryCommittedTimestamp, 10, 64) | ||
if err != nil { | ||
return false, t, err | ||
return t, err | ||
} | ||
return true, time.UnixMicro(unixMicros), nil | ||
return time.UnixMicro(unixMicros), nil | ||
} | ||
if submatch := binlogEntryTimestampGTIDRegexp.FindStringSubmatch(logEntry); submatch != nil { | ||
// MySQL 5.7 | ||
t, err = ParseBinlogTimestamp(submatch[1]) | ||
if err != nil { | ||
return false, t, err | ||
return t, err | ||
} | ||
return true, t, nil | ||
return t, nil | ||
} | ||
return false, t, nil | ||
return t, nil | ||
} | ||
|
||
// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function | ||
// either looks for the first such timestamp or the last. | ||
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) { | ||
// looks for the first and last timestamps. | ||
func (mysqld *Mysqld) scanBinlogTimestamp( | ||
mysqlbinlogDir string, | ||
mysqlbinlogEnv []string, | ||
mysqlbinlogName string, | ||
binlogFile string, | ||
stopAtFirst bool, // unused at this moment, to be used as an optimization hint | ||
) ( | ||
firstMatchedTime time.Time, | ||
lastMatchedTime time.Time, | ||
err error, | ||
) { | ||
args := []string{binlogFile} | ||
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...) | ||
mysqlbinlogCmd.Dir = mysqlbinlogDir | ||
mysqlbinlogCmd.Env = mysqlbinlogEnv | ||
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd) | ||
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql | ||
if err != nil { | ||
return matchedTime, false, err | ||
} | ||
scanComplete := make(chan error) | ||
intentionalKill := false | ||
scan := func() { | ||
defer close(scanComplete) | ||
defer func() { | ||
intentionalKill = true | ||
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released | ||
}() | ||
return firstMatchedTime, lastMatchedTime, err | ||
} | ||
scan := func() error { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you could also set a different writer for stdout instead of using a pipe:
Then I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's also an option. But anyway, the given solution works as expected. For now let's keep it. I might iterate with a replacement output if/when I work on that optimization again. I don't want to push it too close to the GA release. |
||
// Read line by line and process it | ||
scanner := bufio.NewScanner(pipe) | ||
for scanner.Scan() { | ||
logEntry := scanner.Text() | ||
|
||
found, t, err := parseBinlogEntryTimestamp(logEntry) | ||
t, err := parseBinlogEntryTimestamp(logEntry) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
if err != nil { | ||
scanComplete <- err | ||
return | ||
return err | ||
} | ||
if found { | ||
matchedTime = t | ||
matchFound = true | ||
if t.IsZero() { | ||
continue | ||
} | ||
if found && stopAtFirst { | ||
// Found the first timestamp and it's all we need. We won't scan any further and so we should also | ||
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe). | ||
return | ||
if firstMatchedTime.IsZero() { | ||
firstMatchedTime = t | ||
} | ||
lastMatchedTime = t | ||
} | ||
return nil | ||
} | ||
if err := mysqlbinlogCmd.Start(); err != nil { | ||
return matchedTime, false, err | ||
if err := mysqlbinlogCmd.Start(); err != nil { // Start() is non-blocking | ||
return firstMatchedTime, lastMatchedTime, err | ||
} | ||
go scan() | ||
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill { | ||
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") | ||
defer mysqlbinlogCmd.Process.Kill() | ||
if err := scan(); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait() | ||
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps") | ||
} | ||
if err := <-scanComplete; err != nil { | ||
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ") | ||
if err := mysqlbinlogCmd.Wait(); err != nil { | ||
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above few lines are the main change in the PR. First |
||
} | ||
return matchedTime, matchFound, nil | ||
return firstMatchedTime, lastMatchedTime, nil | ||
} | ||
|
||
// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps | ||
|
@@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc | |
return nil, err | ||
} | ||
|
||
lastMatchedTimeMap := map[string]time.Time{} // a simple cache to avoid rescanning same files. Key=binlog file name | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. An optimization to avoid scanning the same file(s) twice |
||
|
||
resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{} | ||
// Find first timestamp | ||
for _, binlogFile := range req.BinlogFileNames { | ||
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if found { | ||
resp.FirstTimestamp = protoutil.TimeToProto(t) | ||
err = func() error { | ||
for _, binlogFile := range req.BinlogFileNames { | ||
firstMatchedTime, lastMatchedTime, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true) | ||
if err != nil { | ||
return vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile) | ||
} | ||
if !lastMatchedTime.IsZero() { | ||
// cache result | ||
lastMatchedTimeMap[binlogFile] = lastMatchedTime | ||
} | ||
if firstMatchedTime.IsZero() { | ||
// Timestamp not found in this file. | ||
continue | ||
} | ||
resp.FirstTimestamp = protoutil.TimeToProto(firstMatchedTime) | ||
resp.FirstTimestampBinlog = binlogFile | ||
break | ||
return nil // early break | ||
} | ||
return nil | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above refactor: I chose to embed in a local function, this was not strictly needed, but it feels safe what with local variables and early breaks/returns. The main important change is that we cache |
||
if err != nil { | ||
return resp, err | ||
} | ||
// Find last timestamp | ||
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- { | ||
binlogFile := req.BinlogFileNames[i] | ||
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if found { | ||
resp.LastTimestamp = protoutil.TimeToProto(t) | ||
err = func() error { | ||
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- { | ||
binlogFile := req.BinlogFileNames[i] | ||
|
||
// See if we have a cached value for this file. This will certainly be the situation if there's a single binary log file in req.BinlogFileNames, | ||
// which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp. | ||
lastMatchedTime, ok := lastMatchedTimeMap[binlogFile] | ||
if !ok { | ||
var err error | ||
_, lastMatchedTime, err = mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false) | ||
if err != nil { | ||
return vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile) | ||
} | ||
} | ||
if lastMatchedTime.IsZero() { | ||
// Timestamp not found in this file. | ||
continue | ||
} | ||
resp.LastTimestamp = protoutil.TimeToProto(lastMatchedTime) | ||
resp.LastTimestampBinlog = binlogFile | ||
break | ||
return nil // early break | ||
} | ||
return nil | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The above refactor: I chose to embed in a local function, this was not strictly needed, but it feels safe what with local variables and early breaks/returns. The main important change is that we check |
||
if err != nil { | ||
return resp, err | ||
} | ||
return resp, nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Useful logging and does not add substantial volume to the output.