diff --git a/pkg/storer/internal/pinning/export_test.go b/pkg/storer/internal/pinning/export_test.go index f80bbd3c945..79e5864dfba 100644 --- a/pkg/storer/internal/pinning/export_test.go +++ b/pkg/storer/internal/pinning/export_test.go @@ -23,6 +23,7 @@ var ( ErrInvalidPinCollectionItemSize = errInvalidPinCollectionSize ErrPutterAlreadyClosed = errPutterAlreadyClosed ErrCollectionRootAddressIsZero = errCollectionRootAddressIsZero + ErrDuplicatePinCollection = errDuplicatePinCollection ) var NewUUID = newUUID diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index 03c47667180..8fb25ba92ad 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -40,6 +40,8 @@ var ( // errCollectionRootAddressIsZero is returned if the putter is closed with a zero // swarm.Address. Root reference has to be set. errCollectionRootAddressIsZero = errors.New("pin store: collection root address is zero") + // errDuplicatePinCollection is returned when attempted to pin the same file repeatedly + errDuplicatePinCollection = errors.New("pin store: duplicate pin collection") ) // creates a new UUID and returns it as a byte slice @@ -264,9 +266,21 @@ func (c *collectionPutter) Close(st internal.Storage, writer storage.Writer, roo c.mtx.Lock() defer c.mtx.Unlock() + collection := &pinCollectionItem{Addr: root} + has, err := st.IndexStore().Has(collection) + + if err != nil { + return fmt.Errorf("pin store: check previous root: %w", err) + } + + if has { + // trigger the Cleanup + return errDuplicatePinCollection + } + // Save the root pin reference. c.collection.Addr = root - err := writer.Put(c.collection) + err = writer.Put(c.collection) if err != nil { return fmt.Errorf("pin store: failed updating collection: %w", err) } diff --git a/pkg/storer/internal/pinning/pinning_test.go b/pkg/storer/internal/pinning/pinning_test.go index 096b69b0018..017b135f6f8 100644 --- a/pkg/storer/internal/pinning/pinning_test.go +++ b/pkg/storer/internal/pinning/pinning_test.go @@ -276,6 +276,29 @@ func TestPinStore(t *testing.T) { } }) + t.Run("duplicate collection", func(t *testing.T) { + root := chunktest.GenerateTestRandomChunk() + putter, err := pinstore.NewCollection(st) + if err != nil { + t.Fatal(err) + } + + err = putter.Put(context.Background(), st, st.IndexStore(), root) + if err != nil { + t.Fatal(err) + } + + err = putter.Close(st, st.IndexStore(), root.Address()) + if err != nil { + t.Fatal(err) + } + + err = putter.Close(st, st.IndexStore(), root.Address()) + if err == nil || !errors.Is(err, pinstore.ErrDuplicatePinCollection) { + t.Fatalf("unexpected error during CLose, want: %v, got: %v", pinstore.ErrDuplicatePinCollection, err) + } + }) + t.Run("zero address close", func(t *testing.T) { root := chunktest.GenerateTestRandomChunk() putter, err := pinstore.NewCollection(st) @@ -292,7 +315,6 @@ func TestPinStore(t *testing.T) { if !errors.Is(err, pinstore.ErrCollectionRootAddressIsZero) { t.Fatalf("unexpected error on close, want: %v, got: %v", pinstore.ErrCollectionRootAddressIsZero, err) } - }) } diff --git a/pkg/storer/pinstore_test.go b/pkg/storer/pinstore_test.go index 676a7bd6260..43676f6ce61 100644 --- a/pkg/storer/pinstore_test.go +++ b/pkg/storer/pinstore_test.go @@ -127,6 +127,51 @@ func testPinStore(t *testing.T, newStorer func() (*storer.DB, error)) { verifyPinCollection(t, lstore.Repo(), testCases[0].chunks[0], testCases[0].chunks, true) }) }) + + t.Run("duplicate parallel upload does not leave orphaned chunks", func(t *testing.T) { + chunks := chunktesting.GenerateTestRandomChunks(4) + + session1, err := lstore.NewCollection(context.TODO()) + if err != nil { + t.Fatalf("NewCollection(...): unexpected error: %v", err) + } + + session2, err := lstore.NewCollection(context.TODO()) + if err != nil { + t.Fatalf("NewCollection2(...): unexpected error: %v", err) + } + + for _, ch := range chunks { + err := session2.Put(context.TODO(), ch) + if err != nil { + t.Fatalf("session2.Put(...): unexpected error: %v", err) + t.Fatal(err) + } + + err = session1.Put(context.TODO(), ch) + if err != nil { + t.Fatalf("session1.Put(...): unexpected error: %v", err) + t.Fatal(err) + } + } + + err = session1.Done(chunks[0].Address()) + if err != nil { + t.Fatalf("session1.Done(...): unexpected error: %v", err) + } + + err = session2.Done(chunks[0].Address()) + if err == nil { + t.Fatalf("session2.Done(...): expected error, got nil") + } + + if err := session2.Cleanup(); err != nil { + t.Fatalf("session2.Done(...): unexpected error: %v", err) + } + + verifyPinCollection(t, lstore.Repo(), chunks[0], chunks, true) + verifyChunkRefCount(t, lstore.Repo(), chunks) + }) } func TestPinStore(t *testing.T) { diff --git a/pkg/storer/storer_test.go b/pkg/storer/storer_test.go index ccc491912b6..ce2d7eb72cf 100644 --- a/pkg/storer/storer_test.go +++ b/pkg/storer/storer_test.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/pkg/storage/inmemchunkstore" "github.com/ethersphere/bee/pkg/storage/migration" "github.com/ethersphere/bee/pkg/storer" + cs "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" "github.com/ethersphere/bee/pkg/storer/internal/upload" localmigration "github.com/ethersphere/bee/pkg/storer/migration" @@ -44,7 +45,26 @@ func verifyChunks( t.Fatalf("unexpected chunk has state: want %t have %t", has, hasFound) } } +} +func verifyChunkRefCount( + t *testing.T, + repo storage.Repository, + chunks []swarm.Chunk, +) { + t.Helper() + + for _, ch := range chunks { + _ = repo.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(cs.RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + entry := r.Entry.(*cs.RetrievalIndexItem) + if entry.Address.Equal(ch.Address()) && entry.RefCnt != 1 { + t.Errorf("chunk %s has refCnt=%d", ch.Address(), entry.RefCnt) + } + return false, nil + }) + } } func verifySessionInfo(