From 42d29e3710b187f9ba8e1cf0b5a938d03dbe0587 Mon Sep 17 00:00:00 2001 From: Stefan VanBuren Date: Thu, 31 Oct 2024 12:28:28 -0400 Subject: [PATCH 1/3] Drop go.uber.org/atomic in favor of sync/atomic The primitives in sync/atomic are not as good as those in uber/atomic; happy to drop this. --- go.mod | 1 - go.sum | 2 -- private/buf/bufcurl/verbose_transport.go | 2 +- private/pkg/storage/limit.go | 8 +++--- private/pkg/storage/storageos/bucket.go | 25 ++++++++++++++++--- .../storage/storagetesting/storagetesting.go | 2 +- private/pkg/thread/thread_test.go | 6 ++--- 7 files changed, 30 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index be7ea20de3..bee7a1ab02 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,6 @@ require ( github.com/tetratelabs/wazero v1.8.1 go.lsp.dev/jsonrpc2 v0.10.0 go.lsp.dev/protocol v0.12.0 - go.uber.org/atomic v1.11.0 go.uber.org/zap v1.27.0 go.uber.org/zap/exp v0.3.0 golang.org/x/crypto v0.28.0 diff --git a/go.sum b/go.sum index c250b660c2..b63176a178 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,6 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= diff --git a/private/buf/bufcurl/verbose_transport.go b/private/buf/bufcurl/verbose_transport.go index ecfbe0e427..72e60a765b 100644 --- a/private/buf/bufcurl/verbose_transport.go +++ b/private/buf/bufcurl/verbose_transport.go @@ -24,10 +24,10 @@ import ( "runtime" "sort" "strings" + "sync/atomic" "connectrpc.com/connect" "github.com/bufbuild/buf/private/pkg/verbose" - "go.uber.org/atomic" ) type skipUploadFinishedMessageKey struct{} diff --git a/private/pkg/storage/limit.go b/private/pkg/storage/limit.go index f34669421c..fb53ec5540 100644 --- a/private/pkg/storage/limit.go +++ b/private/pkg/storage/limit.go @@ -16,8 +16,7 @@ package storage import ( "context" - - "go.uber.org/atomic" + "sync/atomic" ) // LimitWriteBucket returns a [WriteBucket] that writes to [writeBucket] @@ -42,7 +41,6 @@ type limitedWriteBucket struct { func newLimitedWriteBucket(bucket WriteBucket, limit int64) *limitedWriteBucket { return &limitedWriteBucket{ WriteBucket: bucket, - currentSize: atomic.NewInt64(0), limit: limit, } } @@ -78,7 +76,7 @@ func (o *limitedWriteObjectCloser) Write(p []byte) (int, error) { writeSize := int64(len(p)) newBucketSize := o.bucketSize.Add(writeSize) if newBucketSize > o.limit { - o.bucketSize.Sub(writeSize) + o.bucketSize.Add(-writeSize) return 0, &errWriteLimitReached{ Limit: o.limit, ExceedingBy: newBucketSize - o.limit, @@ -86,7 +84,7 @@ func (o *limitedWriteObjectCloser) Write(p []byte) (int, error) { } writtenSize, err := o.WriteObjectCloser.Write(p) if int64(writtenSize) < writeSize { - o.bucketSize.Sub(writeSize - int64(writtenSize)) + o.bucketSize.Add(-(writeSize - int64(writtenSize))) } return writtenSize, err } diff --git a/private/pkg/storage/storageos/bucket.go b/private/pkg/storage/storageos/bucket.go index 259f99fba8..d5a26f970b 100644 --- a/private/pkg/storage/storageos/bucket.go +++ b/private/pkg/storage/storageos/bucket.go @@ -20,12 +20,12 @@ import ( "io/fs" "os" "path/filepath" + "sync/atomic" "github.com/bufbuild/buf/private/pkg/filepathext" "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storageutil" - "go.uber.org/atomic" ) // errNotDir is the error returned if a path is not a directory. @@ -359,7 +359,7 @@ type writeObjectCloser struct { path string // writeErr contains the first non-nil error caught by a call to Write. // This is returned in Close for atomic writes to prevent writing an incomplete file. - writeErr atomic.Error + writeErr onceError } func newWriteObjectCloser( @@ -375,7 +375,7 @@ func newWriteObjectCloser( func (w *writeObjectCloser) Write(p []byte) (int, error) { n, err := w.file.Write(p) if err != nil { - w.writeErr.CompareAndSwap(nil, err) + w.writeErr.Store(err) } return n, toStorageError(err) } @@ -404,6 +404,25 @@ func (w *writeObjectCloser) Close() error { return err } +// onceError is an object that will only store an error once. +type onceError struct { + err atomic.Value +} + +// Store stores the err. +func (e *onceError) Store(err error) { + e.err.CompareAndSwap(nil, err) +} + +// Load loads the stored error. +func (e *onceError) Load() error { + err, ok := e.err.Load().(error) + if !ok { + return nil + } + return err +} + // newErrNotDir returns a new Error for a path not being a directory. func newErrNotDir(path string) *normalpath.Error { return normalpath.NewError(path, errNotDir) diff --git a/private/pkg/storage/storagetesting/storagetesting.go b/private/pkg/storage/storagetesting/storagetesting.go index 15d4d291d5..67438815cd 100644 --- a/private/pkg/storage/storagetesting/storagetesting.go +++ b/private/pkg/storage/storagetesting/storagetesting.go @@ -28,6 +28,7 @@ import ( "sort" "strconv" "sync" + "sync/atomic" "testing" "github.com/bufbuild/buf/private/pkg/slicesext" @@ -38,7 +39,6 @@ import ( "github.com/bufbuild/buf/private/pkg/tmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/atomic" ) const ( diff --git a/private/pkg/thread/thread_test.go b/private/pkg/thread/thread_test.go index 1d35f23f9d..4ef4dedc9e 100644 --- a/private/pkg/thread/thread_test.go +++ b/private/pkg/thread/thread_test.go @@ -16,10 +16,10 @@ package thread import ( "context" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" - "go.uber.org/atomic" ) func TestParallelizeWithImmediateCancellation(t *testing.T) { @@ -35,7 +35,7 @@ func TestParallelizeWithImmediateCancellation(t *testing.T) { ) for i := 0; i < jobsToExecute; i++ { jobs = append(jobs, func(_ context.Context) error { - executed.Inc() + executed.Add(1) return nil }) } @@ -49,7 +49,7 @@ func TestParallelizeWithImmediateCancellation(t *testing.T) { var jobs []func(context.Context) error for i := 0; i < 10; i++ { jobs = append(jobs, func(_ context.Context) error { - executed.Inc() + executed.Add(1) return nil }) } From 2515c4261a2883f8d3f1843cc05778c81f4c9213 Mon Sep 17 00:00:00 2001 From: Stefan VanBuren Date: Fri, 1 Nov 2024 15:12:21 -0400 Subject: [PATCH 2/3] Add back pointer Oops! --- private/pkg/storage/limit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/private/pkg/storage/limit.go b/private/pkg/storage/limit.go index fb53ec5540..badd79caef 100644 --- a/private/pkg/storage/limit.go +++ b/private/pkg/storage/limit.go @@ -41,6 +41,7 @@ type limitedWriteBucket struct { func newLimitedWriteBucket(bucket WriteBucket, limit int64) *limitedWriteBucket { return &limitedWriteBucket{ WriteBucket: bucket, + currentSize: &atomic.Int64{}, limit: limit, } } From 1a41b945067818b0536a484cef27290843a9fe2d Mon Sep 17 00:00:00 2001 From: Stefan VanBuren Date: Fri, 1 Nov 2024 15:44:12 -0400 Subject: [PATCH 3/3] Be more defensive above checking the atomicValue type --- private/pkg/storage/storageos/bucket.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/private/pkg/storage/storageos/bucket.go b/private/pkg/storage/storageos/bucket.go index d5a26f970b..64746a4848 100644 --- a/private/pkg/storage/storageos/bucket.go +++ b/private/pkg/storage/storageos/bucket.go @@ -26,6 +26,7 @@ import ( "github.com/bufbuild/buf/private/pkg/normalpath" "github.com/bufbuild/buf/private/pkg/storage" "github.com/bufbuild/buf/private/pkg/storage/storageutil" + "github.com/bufbuild/buf/private/pkg/syserror" ) // errNotDir is the error returned if a path is not a directory. @@ -416,10 +417,14 @@ func (e *onceError) Store(err error) { // Load loads the stored error. func (e *onceError) Load() error { - err, ok := e.err.Load().(error) - if !ok { + atomicValue := e.err.Load() + if atomicValue == nil { return nil } + err, ok := atomicValue.(error) + if !ok { + return syserror.Newf("expected error but got %T", atomicValue) + } return err }