Skip to content

Commit

Permalink
Incremental backup: fix race condition in reading 'mysqlbinlog' output (
Browse files Browse the repository at this point in the history
#14330)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
vitess-bot[bot] committed Oct 23, 2023
1 parent 5f0df9a commit fb9e39f
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 63 deletions.
1 change: 1 addition & 0 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par
if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" {
return false, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp)
}
log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp)
incrDetails := &IncrementalBackupDetails{
FirstTimestamp: FormatRFC3339(protoutil.TimeFromProto(resp.FirstTimestamp).UTC()),
FirstTimestampBinlog: filepath.Base(resp.FirstTimestampBinlog),
Expand Down
140 changes: 85 additions & 55 deletions go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,85 +1294,86 @@ func (mysqld *Mysqld) ApplyBinlogFile(ctx context.Context, req *mysqlctlpb.Apply
}

// parseBinlogEntryTimestamp attempts to extract a timestamp from a binlog entry.
func parseBinlogEntryTimestamp(logEntry string) (found bool, t time.Time, err error) {
func parseBinlogEntryTimestamp(logEntry string) (t time.Time, err error) {
if len(logEntry) == 0 {
return false, t, nil
return t, nil
}
if logEntry[0] != '#' {
return false, t, nil
return t, nil
}
if submatch := binlogEntryCommittedTimestampRegex.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 8.0
binlogEntryCommittedTimestamp := submatch[1]
unixMicros, err := strconv.ParseInt(binlogEntryCommittedTimestamp, 10, 64)
if err != nil {
return false, t, err
return t, err
}
return true, time.UnixMicro(unixMicros), nil
return time.UnixMicro(unixMicros), nil
}
if submatch := binlogEntryTimestampGTIDRegexp.FindStringSubmatch(logEntry); submatch != nil {
// MySQL 5.7
t, err = ParseBinlogTimestamp(submatch[1])
if err != nil {
return false, t, err
return t, err
}
return true, t, nil
return t, nil
}
return false, t, nil
return t, nil
}

// scanBinlogTimestamp invokes a `mysqlbinlog` binary to look for a timestamp in the given binary. The function
// either looks for the first such timestamp or the last.
func (mysqld *Mysqld) scanBinlogTimestamp(mysqlbinlogDir string, mysqlbinlogEnv []string, mysqlbinlogName string, binlogFile string, stopAtFirst bool) (matchedTime time.Time, matchFound bool, err error) {
// looks for the first and last timestamps.
func (mysqld *Mysqld) scanBinlogTimestamp(
mysqlbinlogDir string,
mysqlbinlogEnv []string,
mysqlbinlogName string,
binlogFile string,
stopAtFirst bool, // unused at this moment, to be used as an optimization hint
) (
firstMatchedTime time.Time,
lastMatchedTime time.Time,
err error,
) {
args := []string{binlogFile}
mysqlbinlogCmd := exec.Command(mysqlbinlogName, args...)
mysqlbinlogCmd.Dir = mysqlbinlogDir
mysqlbinlogCmd.Env = mysqlbinlogEnv
log.Infof("ApplyBinlogFile: running mysqlbinlog command: %#v", mysqlbinlogCmd)
pipe, err := mysqlbinlogCmd.StdoutPipe() // to be piped into mysql
if err != nil {
return matchedTime, false, err
}
scanComplete := make(chan error)
intentionalKill := false
scan := func() {
defer close(scanComplete)
defer func() {
intentionalKill = true
mysqlbinlogCmd.Process.Kill() // ensures the binlog file is released
}()
return firstMatchedTime, lastMatchedTime, err
}
scan := func() error {
// Read line by line and process it
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
logEntry := scanner.Text()

found, t, err := parseBinlogEntryTimestamp(logEntry)
t, err := parseBinlogEntryTimestamp(logEntry)
if err != nil {
scanComplete <- err
return
return err
}
if found {
matchedTime = t
matchFound = true
if t.IsZero() {
continue
}
if found && stopAtFirst {
// Found the first timestamp and it's all we need. We won't scan any further and so we should also
// kill mysqlbinlog (otherwise it keeps waiting until we've read the entire pipe).
return
if firstMatchedTime.IsZero() {
firstMatchedTime = t
}
lastMatchedTime = t
}
return nil
}
if err := mysqlbinlogCmd.Start(); err != nil {
return matchedTime, false, err
if err := mysqlbinlogCmd.Start(); err != nil { // Start() is nonblockig
return firstMatchedTime, lastMatchedTime, err
}
go scan()
if err := mysqlbinlogCmd.Wait(); err != nil && !intentionalKill {
return matchedTime, false, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
defer mysqlbinlogCmd.Process.Kill()
if err := scan(); err != nil { // We must first exhaust reading the command's output, before calling cmd.Wait()
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps")
}
if err := <-scanComplete; err != nil {
return matchedTime, false, vterrors.Wrapf(err, "scanning mysqlbinlog output in ReadBinlogFilesTimestamps ")
if err := mysqlbinlogCmd.Wait(); err != nil {
return firstMatchedTime, lastMatchedTime, vterrors.Wrapf(err, "waiting on mysqlbinlog command in ReadBinlogFilesTimestamps")
}
return matchedTime, matchFound, nil
return firstMatchedTime, lastMatchedTime, nil
}

// ReadBinlogFilesTimestamps reads all given binlog files via `mysqlbinlog` command and returns the first and last found transaction timestamps
Expand Down Expand Up @@ -1402,31 +1403,60 @@ func (mysqld *Mysqld) ReadBinlogFilesTimestamps(ctx context.Context, req *mysqlc
return nil, err
}

lastMatchedTimeMap := map[string]time.Time{} // a simple cache to avoid rescanning same files. Key=binlog file name

resp := &mysqlctlpb.ReadBinlogFilesTimestampsResponse{}
// Find first timestamp
for _, binlogFile := range req.BinlogFileNames {
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return nil, err
}
if found {
resp.FirstTimestamp = protoutil.TimeToProto(t)
err = func() error {
for _, binlogFile := range req.BinlogFileNames {
firstMatchedTime, lastMatchedTime, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, true)
if err != nil {
return vterrors.Wrapf(err, "while scanning for first binlog timestamp in %v", binlogFile)
}
if !lastMatchedTime.IsZero() {
// cache result
lastMatchedTimeMap[binlogFile] = lastMatchedTime
}
if firstMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.FirstTimestamp = protoutil.TimeToProto(firstMatchedTime)
resp.FirstTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
if err != nil {
return resp, err
}
// Find last timestamp
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]
t, found, err := mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return nil, err
}
if found {
resp.LastTimestamp = protoutil.TimeToProto(t)
err = func() error {
for i := len(req.BinlogFileNames) - 1; i >= 0; i-- {
binlogFile := req.BinlogFileNames[i]

// See if we have a cached value for this file. This is certainly be the situation if there's a single binary log file in req.BinlogFileNames,
// which means the first file and last file are the same, and so we have already parsed the file while searching for the first timestamp.
lastMatchedTime, ok := lastMatchedTimeMap[binlogFile]
if !ok {
var err error
_, lastMatchedTime, err = mysqld.scanBinlogTimestamp(dir, env, mysqlbinlogName, binlogFile, false)
if err != nil {
return vterrors.Wrapf(err, "while scanning for last binlog timestamp in %v", binlogFile)
}
}
if lastMatchedTime.IsZero() {
// Timestamp not found in this file.
continue
}
resp.LastTimestamp = protoutil.TimeToProto(lastMatchedTime)
resp.LastTimestampBinlog = binlogFile
break
return nil // early break
}
return nil
}()
if err != nil {
return resp, err
}
return resp, nil
}
Expand Down
10 changes: 2 additions & 8 deletions go/vt/mysqlctl/mysqld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
tcases := []struct {
name string
entry string
found bool
tm time.Time
}{
{
Expand All @@ -157,24 +156,19 @@ func TestParseBinlogEntryTimestamp(t *testing.T) {
{
name: "mysql80",
entry: "#230605 16:06:34 server id 22233 end_log_pos 1037 CRC32 0xa4707c5b GTID last_committed=4 sequence_number=5 rbr_only=no original_committed_timestamp=1685970394031366 immediate_commit_timestamp=1685970394032458 transaction_length=186",
found: true,
tm: time.UnixMicro(1685970394031366),
},
{
name: "mysql57",
entry: "#230608 13:14:31 server id 484362839 end_log_pos 259 CRC32 0xc07510d0 GTID last_committed=0 sequence_number=1 rbr_only=yes",
found: true,
tm: time.Date(2023, time.June, 8, 13, 14, 31, 0, time.UTC),
},
}
for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
found, tm, err := parseBinlogEntryTimestamp(tcase.entry)
tm, err := parseBinlogEntryTimestamp(tcase.entry)
assert.NoError(t, err)
assert.Equal(t, tcase.found, found)
if tcase.found {
assert.Equal(t, tcase.tm, tm)
}
assert.Equal(t, tcase.tm, tm)
})
}
}

0 comments on commit fb9e39f

Please sign in to comment.