Skip to content

Commit

Permalink
fix(pinning): check before writing root address (#4558)
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol authored Jan 30, 2024
1 parent 36855af commit cffd1cc
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/storer/internal/pinning/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
ErrInvalidPinCollectionItemSize = errInvalidPinCollectionSize
ErrPutterAlreadyClosed = errPutterAlreadyClosed
ErrCollectionRootAddressIsZero = errCollectionRootAddressIsZero
ErrDuplicatePinCollection = errDuplicatePinCollection
)

var NewUUID = newUUID
Expand Down
16 changes: 15 additions & 1 deletion pkg/storer/internal/pinning/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var (
// errCollectionRootAddressIsZero is returned if the putter is closed with a zero
// swarm.Address. Root reference has to be set.
errCollectionRootAddressIsZero = errors.New("pin store: collection root address is zero")
// errDuplicatePinCollection is returned when attempted to pin the same file repeatedly
errDuplicatePinCollection = errors.New("pin store: duplicate pin collection")
)

// creates a new UUID and returns it as a byte slice
Expand Down Expand Up @@ -264,9 +266,21 @@ func (c *collectionPutter) Close(st internal.Storage, writer storage.Writer, roo
c.mtx.Lock()
defer c.mtx.Unlock()

collection := &pinCollectionItem{Addr: root}
has, err := st.IndexStore().Has(collection)

if err != nil {
return fmt.Errorf("pin store: check previous root: %w", err)
}

if has {
// trigger the Cleanup
return errDuplicatePinCollection
}

// Save the root pin reference.
c.collection.Addr = root
err := writer.Put(c.collection)
err = writer.Put(c.collection)
if err != nil {
return fmt.Errorf("pin store: failed updating collection: %w", err)
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/storer/internal/pinning/pinning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,29 @@ func TestPinStore(t *testing.T) {
}
})

t.Run("duplicate collection", func(t *testing.T) {
root := chunktest.GenerateTestRandomChunk()
putter, err := pinstore.NewCollection(st)
if err != nil {
t.Fatal(err)
}

err = putter.Put(context.Background(), st, st.IndexStore(), root)
if err != nil {
t.Fatal(err)
}

err = putter.Close(st, st.IndexStore(), root.Address())
if err != nil {
t.Fatal(err)
}

err = putter.Close(st, st.IndexStore(), root.Address())
if err == nil || !errors.Is(err, pinstore.ErrDuplicatePinCollection) {
t.Fatalf("unexpected error during CLose, want: %v, got: %v", pinstore.ErrDuplicatePinCollection, err)
}
})

t.Run("zero address close", func(t *testing.T) {
root := chunktest.GenerateTestRandomChunk()
putter, err := pinstore.NewCollection(st)
Expand All @@ -292,7 +315,6 @@ func TestPinStore(t *testing.T) {
if !errors.Is(err, pinstore.ErrCollectionRootAddressIsZero) {
t.Fatalf("unexpected error on close, want: %v, got: %v", pinstore.ErrCollectionRootAddressIsZero, err)
}

})
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/storer/pinstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,51 @@ func testPinStore(t *testing.T, newStorer func() (*storer.DB, error)) {
verifyPinCollection(t, lstore.Repo(), testCases[0].chunks[0], testCases[0].chunks, true)
})
})

t.Run("duplicate parallel upload does not leave orphaned chunks", func(t *testing.T) {
chunks := chunktesting.GenerateTestRandomChunks(4)

session1, err := lstore.NewCollection(context.TODO())
if err != nil {
t.Fatalf("NewCollection(...): unexpected error: %v", err)
}

session2, err := lstore.NewCollection(context.TODO())
if err != nil {
t.Fatalf("NewCollection2(...): unexpected error: %v", err)
}

for _, ch := range chunks {
err := session2.Put(context.TODO(), ch)
if err != nil {
t.Fatalf("session2.Put(...): unexpected error: %v", err)
t.Fatal(err)
}

err = session1.Put(context.TODO(), ch)
if err != nil {
t.Fatalf("session1.Put(...): unexpected error: %v", err)
t.Fatal(err)
}
}

err = session1.Done(chunks[0].Address())
if err != nil {
t.Fatalf("session1.Done(...): unexpected error: %v", err)
}

err = session2.Done(chunks[0].Address())
if err == nil {
t.Fatalf("session2.Done(...): expected error, got nil")
}

if err := session2.Cleanup(); err != nil {
t.Fatalf("session2.Done(...): unexpected error: %v", err)
}

verifyPinCollection(t, lstore.Repo(), chunks[0], chunks, true)
verifyChunkRefCount(t, lstore.Repo(), chunks)
})
}

func TestPinStore(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/storer/storer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
"github.com/ethersphere/bee/pkg/storage/migration"
"github.com/ethersphere/bee/pkg/storer"
cs "github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/pkg/storer/internal/upload"
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
Expand All @@ -44,7 +45,26 @@ func verifyChunks(
t.Fatalf("unexpected chunk has state: want %t have %t", has, hasFound)
}
}
}

func verifyChunkRefCount(
t *testing.T,
repo storage.Repository,
chunks []swarm.Chunk,
) {
t.Helper()

for _, ch := range chunks {
_ = repo.IndexStore().Iterate(storage.Query{
Factory: func() storage.Item { return new(cs.RetrievalIndexItem) },
}, func(r storage.Result) (bool, error) {
entry := r.Entry.(*cs.RetrievalIndexItem)
if entry.Address.Equal(ch.Address()) && entry.RefCnt != 1 {
t.Errorf("chunk %s has refCnt=%d", ch.Address(), entry.RefCnt)
}
return false, nil
})
}
}

func verifySessionInfo(
Expand Down

0 comments on commit cffd1cc

Please sign in to comment.