Skip to content

Commit

Permalink
Copy WAL frames through temp file to shadow
Browse files Browse the repository at this point in the history
This implementation replicates what the new behavior in the legacy
branch did using a temporary file instead of memory to buffer WAL
frames between commits.

Fixes excess memory use if the WAL includes massive commits due to
big inserts or migrations.
  • Loading branch information
hifi committed May 1, 2023
1 parent fdafec9 commit 9e2d067
Showing 1 changed file with 41 additions and 10 deletions.
51 changes: 41 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,16 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
return 0, 0, fmt.Errorf("last checksum: %w", err)
}

// Write to a temporary shadow file.
tempFilename := filename + ".tmp"
defer os.Remove(tempFilename)

f, err := internal.CreateFile(tempFilename, db.fileInfo)
if err != nil {
return 0, 0, fmt.Errorf("create temp file: %w", err)
}
defer f.Close()

// Seek to correct position on real wal.
if _, err := r.Seek(origSize, io.SeekStart); err != nil {
return 0, 0, fmt.Errorf("real wal seek: %w", err)
Expand All @@ -1043,7 +1053,6 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
// Read through WAL from last position to find the page of the last
// committed transaction.
frame := make([]byte, db.pageSize+WALFrameHeaderSize)
var buf bytes.Buffer
offset := origSize
lastCommitSize := origSize
for {
Expand Down Expand Up @@ -1073,32 +1082,54 @@ func (db *DB) copyToShadowWAL(filename string) (origWalSize int64, newSize int64
break
}

// Add page to the new size of the shadow WAL.
buf.Write(frame)
// Write page to temporary WAL file.
if _, err := f.Write(frame); err != nil {
return 0, 0, fmt.Errorf("write temp shadow wal: %w", err)
}

Tracef("%s: copy-shadow: ok %s offset=%d salt=%x %x", db.path, filename, offset, salt0, salt1)
offset += int64(len(frame))

// Flush to shadow WAL if commit record.
// Update new size if written frame was a commit record.
newDBSize := binary.BigEndian.Uint32(frame[4:])
if newDBSize != 0 {
if _, err := buf.WriteTo(w); err != nil {
return 0, 0, fmt.Errorf("write shadow wal: %w", err)
}
buf.Reset()
lastCommitSize = offset
}
}

// Sync & close.
// If no WAL writes found, exit.
if origSize == lastCommitSize {
return origWalSize, origSize, nil
}

walByteN := lastCommitSize - origSize

// Move to beginning of temporary file.
if _, err := f.Seek(0, io.SeekStart); err != nil {
return 0, 0, fmt.Errorf("temp file seek: %w", err)
}

// Copy from temporary file to shadow WAL.
if _, err := io.Copy(w, &io.LimitedReader{R: f, N: walByteN}); err != nil {
return 0, 0, fmt.Errorf("write shadow file: %w", err)
}

// Close & remove temporary file.
if err := f.Close(); err != nil {
return 0, 0, err
} else if err := os.Remove(tempFilename); err != nil {
return 0, 0, err
}

// Sync & close shadow WAL.
if err := w.Sync(); err != nil {
return 0, 0, err
} else if err := w.Close(); err != nil {
return 0, 0, err
}

// Track total number of bytes written to WAL.
db.totalWALBytesCounter.Add(float64(lastCommitSize - origSize))
db.totalWALBytesCounter.Add(float64(walByteN))

return origWalSize, lastCommitSize, nil
}
Expand Down

0 comments on commit 9e2d067

Please sign in to comment.