Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-18.0] Incremental backup: fix race condition in reading 'mysqlbinlog' output (#14330) #14335

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
}
log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp)
incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(protoutil.TimeFromProto(resp.FirstTimestamp).UTC()),
FirstTimestampBinlog: filepath.Base(resp.FirstTimestampBinlog),
Expand Down
140 changes: 85 additions & 55 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,85 +1294,86 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply
}

// parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry.
func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err error) {
func parseBinlogEntryTimestamp(logEntry string) (t time.Time, err error) {
if len(logEntry) == 0 {
return false, t, nil
return t, nil
}
if logEntry[0] != '#' {
return false, t, nil
return t, nil
}
if submatch := binlogEntryCommittedTimestampRegex.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 8.0
binlogEntryCommittedTimestamp := submatch[1]
unixMicros, err := strconv.ParseInt(binlogEntryCommittedTimestamp, 10, 64)
if err != nil {
return false, t, err
return t, err
}
return true, time.UnixMicro(unixMicros), nil
return time.UnixMicro(unixMicros), nil
}
if submatch := binlogEntryTimestampGTIDRegexp.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 5.7
t, err = ParseBinlogTimestamp(submatch[1])
if err != nil {
return false, t, err
return t, err
}
return true, t, nil
return t, nil
}
return false, t, nil
return t, nil
}

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
// looks for the first and last timestamps.
func (mysqld *Mysqld) scanBinlogTimestamp(
mysqlbinlogDir string,
mysqlbinlogEnv []string,
mysqlbinlogName string,
binlogFile string,
stopAtFirst bool, // unused at this moment, to be used as an optimization hint
) (
firstMatchedTime time.Time,
lastMatchedTime time.Time,
err error,
) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
mysqlbinlogCmd.Env = mysqlbinlogEnv
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
}()
return firstMatchedTime, lastMatchedTime, err
}
scan := func() error {
// Read line by line and process it
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
return err
}
if found {
matchedTime = t
matchFound = true
if t.IsZero() {
continue
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
return
if firstMatchedTime.IsZero() {
firstMatchedTime = t
}
lastMatchedTime = t
}
return nil
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
if err := mysqlbinlogCmd.Start(); err != nil { // Start() is nonblockig
return firstMatchedTime, lastMatchedTime, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
defer mysqlbinlogCmd.Process.Kill()
if err := scan(); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait()
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
if err := mysqlbinlogCmd.Wait(); err != nil {
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
return matchedTime, matchFound, nil
return firstMatchedTime, lastMatchedTime, nil
}

// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
Expand Down Expand Up @@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
return nil, err
}

lastMatchedTimeMap := map[string]time.Time{} // a simple cache to avoid rescanning same files. Key=binlog file name

resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{}
// Find first timestamp
for _, binlogFile := range req.BinlogFileNames {
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
}
if found {
resp.FirstTimestamp = protoutil.TimeToProto(t)
err = func() error {
for _, binlogFile := range req.BinlogFileNames {
firstMatchedTime, lastMatchedTime, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile)
}
if !lastMatchedTime.IsZero() {
// cache result
lastMatchedTimeMap[binlogFile] = lastMatchedTime
}
if firstMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.FirstTimestamp = protoutil.TimeToProto(firstMatchedTime)
resp.FirstTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
if err != nil {
return resp, err
}
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
}
if found {
resp.LastTimestamp = protoutil.TimeToProto(t)
err = func() error {
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]

// See if we have a cached value for this file. This is certainly be the situation if there's a single binary log file in req.BinlogFileNames,
// which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp.
lastMatchedTime, ok := lastMatchedTimeMap[binlogFile]
if !ok {
var err error
_, lastMatchedTime, err = mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile)
}
}
if lastMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.LastTimestamp = protoutil.TimeToProto(lastMatchedTime)
resp.LastTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
if err != nil {
return resp, err
}
return resp, nil
}
Expand Down
10 changes: 2 additions & 8 deletions go/vt/mysqlctl/mysqld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
tcases := []struct {
name string
entry string
found bool
tm time.Time
}{
{
Expand All @@ -157,24 +156,19 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
{
name: "mysql80",
entry: "#230605 16:06:34 server id 22233 end_log_pos 1037 CRC32 0xa4707c5b GTID last_committed=4 sequence_number=5 rbr_only=no original_committed_timestamp=1685970394031366 immediate_commit_timestamp=1685970394032458 transaction_length=186",
found: true,
tm: time.UnixMicro(1685970394031366),
},
{
name: "mysql57",
entry: "#230608 13:14:31 server id 484362839 end_log_pos 259 CRC32 0xc07510d0 GTID last_committed=0 sequence_number=1 rbr_only=yes",
found: true,
tm: time.Date(2023, time.June, 8, 13, 14, 31, 0, time.UTC),
},
}
for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
found, tm, err := parseBinlogEntryTimestamp(tcase.entry)
tm, err := parseBinlogEntryTimestamp(tcase.entry)
assert.NoError(t, err)
assert.Equal(t, tcase.found, found)
if tcase.found {
assert.Equal(t, tcase.tm, tm)
}
assert.Equal(t, tcase.tm, tm)
})
}
}
Loading