Skip to content

Commit

Permalink
fix a lock issue for windows
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Jul 1, 2024
1 parent b1f905d commit 385b70f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 35 deletions.
9 changes: 6 additions & 3 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,9 +376,12 @@ func saveCacheMapTo(w io.Writer, m map[string]cacheValue) error {
return nil
}

func lockCachePersistentFile(lockFilePath string, ex bool, handleError func(error)) (context.CancelFunc, error) {
lockFile := flock.New(lockFilePath)
var err error
func lockCachePersistentFile(cacheFilePath string, ex bool, handleError func(error)) (context.CancelFunc, error) {
var (
lockFilePath = cacheFilePath + ".lock"
lockFile = flock.New(lockFilePath)
err error
)
if ex {
err = lockFile.Lock()
} else {
Expand Down
85 changes: 53 additions & 32 deletions storagev2/internal/uplog/uplog_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func getUplogFileBufferPath(current bool) string {
uplogFileBufferPath = filepath.Join(uplogFileBufferDirPath, UPLOG_FILE_BUFFER_NAME)
}
if !current {
uplogFileBufferPath = uplogFileBufferPath + "." + time.Now().UTC().Format(time.RFC3339Nano)
uplogFileBufferPath = uplogFileBufferPath + "." + time.Now().UTC().Format("20060102150405.999999999")
}
return uplogFileBufferPath
}
Expand All @@ -129,13 +129,13 @@ func getUplogFileDirectoryLock() *flock.Flock {
}

func FlushBuffer() error {
return withUploadFileBuffer(func(io.Writer) (bool, error) {
return true, nil
return withUploadFileBuffer(func(w io.WriteCloser) error {
return w.Close()
})
}

func writeMemoryBufferToFileBuffer(data []byte) (n int, err error) {
if err = withUploadFileBuffer(func(w io.Writer) (shouldClose bool, e error) {
if err = withUploadFileBuffer(func(w io.WriteCloser) (e error) {
for len(data) > 0 {
n, e = w.Write(data)
if e != nil {
Expand Down Expand Up @@ -176,16 +176,14 @@ func tryToArchiveFileBuffer(force bool) {
}
defer locker.Close()

if err = withUploadFileBuffer(func(io.Writer) (shouldClose bool, renameErr error) {
if err = withUploadFileBuffer(func(w io.WriteCloser) error {
currentFilePath := getUplogFileBufferPath(true)
if fileInfo, fileInfoErr := os.Stat(currentFilePath); fileInfoErr == nil && fileInfo.Size() == 0 {
return
return nil
}
archivedFilePath := getUplogFileBufferPath(false)
if renameErr = os.Rename(currentFilePath, archivedFilePath); renameErr == nil {
shouldClose = true
}
return
w.Close()
return os.Rename(currentFilePath, archivedFilePath)
}); err != nil {
return
}
Expand All @@ -194,9 +192,17 @@ func tryToArchiveFileBuffer(force bool) {
go uploadAllClosedFileBuffers()
}

func withUploadFileBuffer(fn func(io.Writer) (bool, error)) (err error) {
var shouldClose bool
type uplogFileBufferWrapper struct{}

func (uplogFileBufferWrapper) Write(p []byte) (int, error) {
return uplogFileBuffer.Write(p)
}

func (uplogFileBufferWrapper) Close() error {
return closeUplogFileBufferWithoutLock()
}

func withUploadFileBuffer(fn func(io.WriteCloser) error) (err error) {
uplogFileBufferLock.Lock()
defer uplogFileBufferLock.Unlock()

Expand All @@ -216,25 +222,34 @@ func withUploadFileBuffer(fn func(io.Writer) (bool, error)) (err error) {
} else if uplogFileBuffer, err = os.OpenFile(uplogFileBufferPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644); err != nil {
return
}
uplogFileBufferFileLocker = flock.New(uplogFileBufferPath)
uplogFileBufferLockPath := uplogFileBufferPath + ".lock"
uplogFileBufferFileLocker = flock.New(uplogFileBufferLockPath)
}

if err = uplogFileBufferFileLocker.Lock(); err != nil {
return
}
shouldClose, err = fn(uplogFileBuffer)
_ = uplogFileBufferFileLocker.Unlock()
if shouldClose {
closeUplogFileBufferWithoutLock()
err = fn(uplogFileBufferWrapper{})
if uplogFileBufferFileLocker != nil && uplogFileBufferFileLocker.Locked() {
_ = uplogFileBufferFileLocker.Unlock()
}
return
}

func closeUplogFileBufferWithoutLock() {
uplogFileBuffer.Close()
uplogFileBuffer = nil
uplogFileBufferFileLocker.Close()
uplogFileBufferFileLocker = nil
func closeUplogFileBufferWithoutLock() error {
var err1, err2 error
if uplogFileBuffer != nil {
err1 = uplogFileBuffer.Close()
uplogFileBuffer = nil
}
if uplogFileBufferFileLocker != nil {
err2 = uplogFileBufferFileLocker.Close()
uplogFileBufferFileLocker = nil
}
if err1 != nil {
return err1
}
return err2
}

func SetWriteFileBufferInterval(d time.Duration) {
Expand Down Expand Up @@ -295,18 +310,25 @@ func (r *multipleFileReader) readAllAsync() {
defer r.w.CloseWithError(io.EOF)
defer r.compressor.Close()
for _, path := range r.paths {
file, err := os.Open(path)
if err != nil {
r.setError(err)
return
}
if _, err = io.Copy(r.compressor, file); err != nil && err != io.EOF {
if err := r.readAllForPathAsync(path); err != nil {
r.setError(err)
return
}
}
}

func (r *multipleFileReader) readAllForPathAsync(path string) error {
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
if _, err = io.Copy(r.compressor, file); err != nil && err != io.EOF {
return err
}
return nil
}

func (r *multipleFileReader) getError() error {
r.errLock.Lock()
defer r.errLock.Unlock()
Expand Down Expand Up @@ -340,10 +362,9 @@ func getArchivedUplogFileBufferPaths(dirPath string) ([]string, error) {

archivedPaths := make([]string, 0, len(dirEntries))
for _, dirEntry := range dirEntries {
if !dirEntry.Mode().IsRegular() {
continue
}
if !strings.HasPrefix(dirEntry.Name(), UPLOG_FILE_BUFFER_NAME+".") {
if !dirEntry.Mode().IsRegular() ||
!strings.HasPrefix(dirEntry.Name(), UPLOG_FILE_BUFFER_NAME+".") ||
strings.HasSuffix(dirEntry.Name(), ".lock") {
continue
}
archivedPaths = append(archivedPaths, filepath.Join(dirPath, dirEntry.Name()))
Expand Down

0 comments on commit 385b70f

Please sign in to comment.