From c34b9a506ce070c4e88d3cf38c433aab055ad7e5 Mon Sep 17 00:00:00 2001 From: notanatol Date: Sat, 27 Jan 2024 16:10:03 -0600 Subject: [PATCH] fix: add test --- pkg/storer/internal/pinning/pinning.go | 3 +- pkg/storer/pinstore_test.go | 45 ++++++++++++++++++++++++++ pkg/storer/storer_test.go | 20 ++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go index fbb72833b25..2a4c4e856a7 100644 --- a/pkg/storer/internal/pinning/pinning.go +++ b/pkg/storer/internal/pinning/pinning.go @@ -272,7 +272,8 @@ func (c *collectionPutter) Close(st internal.Storage, writer storage.Writer, roo } if has { - return fmt.Errorf("pin store: duplicate collection: %w", err) // will trigger the Cleanup + // trigger the Cleanup + return fmt.Errorf("pin store: duplicate collection") } // Save the root pin reference. diff --git a/pkg/storer/pinstore_test.go b/pkg/storer/pinstore_test.go index 676a7bd6260..43676f6ce61 100644 --- a/pkg/storer/pinstore_test.go +++ b/pkg/storer/pinstore_test.go @@ -127,6 +127,51 @@ func testPinStore(t *testing.T, newStorer func() (*storer.DB, error)) { verifyPinCollection(t, lstore.Repo(), testCases[0].chunks[0], testCases[0].chunks, true) }) }) + + t.Run("duplicate parallel upload does not leave orphaned chunks", func(t *testing.T) { + chunks := chunktesting.GenerateTestRandomChunks(4) + + session1, err := lstore.NewCollection(context.TODO()) + if err != nil { + t.Fatalf("NewCollection(...): unexpected error: %v", err) + } + + session2, err := lstore.NewCollection(context.TODO()) + if err != nil { + t.Fatalf("NewCollection2(...): unexpected error: %v", err) + } + + for _, ch := range chunks { + err := session2.Put(context.TODO(), ch) + if err != nil { + t.Fatalf("session2.Put(...): unexpected error: %v", err) + t.Fatal(err) + } + + err = session1.Put(context.TODO(), ch) + if err != nil { + t.Fatalf("session1.Put(...): unexpected error: %v", err) + t.Fatal(err) + } + } + + err = session1.Done(chunks[0].Address()) + if err != nil { + t.Fatalf("session1.Done(...): unexpected error: %v", err) + } + + err = session2.Done(chunks[0].Address()) + if err == nil { + t.Fatalf("session2.Done(...): expected error, got nil") + } + + if err := session2.Cleanup(); err != nil { + t.Fatalf("session2.Done(...): unexpected error: %v", err) + } + + verifyPinCollection(t, lstore.Repo(), chunks[0], chunks, true) + verifyChunkRefCount(t, lstore.Repo(), chunks) + }) } func TestPinStore(t *testing.T) { diff --git a/pkg/storer/storer_test.go b/pkg/storer/storer_test.go index ccc491912b6..c894b3bba34 100644 --- a/pkg/storer/storer_test.go +++ b/pkg/storer/storer_test.go @@ -18,6 +18,7 @@ import ( "github.com/ethersphere/bee/pkg/storage/inmemchunkstore" "github.com/ethersphere/bee/pkg/storage/migration" "github.com/ethersphere/bee/pkg/storer" + cs "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" "github.com/ethersphere/bee/pkg/storer/internal/upload" localmigration "github.com/ethersphere/bee/pkg/storer/migration" @@ -44,7 +45,26 @@ func verifyChunks( t.Fatalf("unexpected chunk has state: want %t have %t", has, hasFound) } } +} +func verifyChunkRefCount( + t *testing.T, + repo storage.Repository, + chunks []swarm.Chunk, +) { + t.Helper() + + for _, ch := range chunks { + repo.IndexStore().Iterate(storage.Query{ + Factory: func() storage.Item { return new(cs.RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + entry := r.Entry.(*cs.RetrievalIndexItem) + if entry.Address.Equal(ch.Address()) && entry.RefCnt != 1 { + t.Errorf("chunk %s has refCnt=%d", ch.Address(), entry.RefCnt) + } + return false, nil + }) + } } func verifySessionInfo(