From b0836ff17189997e6123609362cbf82915f8e10e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 30 Oct 2023 16:15:38 +0300 Subject: [PATCH] fix: uploadstore (#4434) --- pkg/storage/chunkstore.go | 5 - pkg/storer/internal/chunkstamp/chunkstamp.go | 9 +- .../internal/chunkstamp/chunkstamp_test.go | 2 +- pkg/storer/internal/upload/uploadstore.go | 47 ++---- .../internal/upload/uploadstore_test.go | 10 +- pkg/storer/reporter.go | 33 ---- pkg/storer/reporter_test.go | 145 ------------------ pkg/storer/subscribe_push.go | 17 +- pkg/storer/uploadstore.go | 27 +++- pkg/storer/uploadstore_test.go | 127 +++++++++++++++ 10 files changed, 186 insertions(+), 236 deletions(-) delete mode 100644 pkg/storer/reporter.go delete mode 100644 pkg/storer/reporter_test.go diff --git a/pkg/storage/chunkstore.go b/pkg/storage/chunkstore.go index 3c7fbcbe50a..aec30e6afc9 100644 --- a/pkg/storage/chunkstore.go +++ b/pkg/storage/chunkstore.go @@ -12,11 +12,6 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) -var ( - // ErrNoStampsForChunk is returned when chunk was found but there were no associated stamps. - ErrNoStampsForChunk = fmt.Errorf("no stamp for existing chunk: %w", ErrNotFound) -) - // Getter is the interface that wraps the basic Get method. type Getter interface { // Get a chunk by its swarm.Address. Returns the chunk associated with diff --git a/pkg/storer/internal/chunkstamp/chunkstamp.go b/pkg/storer/internal/chunkstamp/chunkstamp.go index 7c6d231f27c..4e8e8c91f22 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp.go @@ -149,7 +149,6 @@ func Load(s storage.Reader, namespace string, addr swarm.Address) (swarm.Stamp, func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, batchID []byte) (swarm.Stamp, error) { var stamp swarm.Stamp - cnt := 0 found := false err := s.Iterate( storage.Query{ @@ -161,7 +160,6 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat }, }, func(res storage.Result) (bool, error) { - cnt++ item := res.Entry.(*item) if batchID == nil || bytes.Equal(batchID, item.stamp.BatchID()) { stamp = item.stamp @@ -174,9 +172,6 @@ func LoadWithBatchID(s storage.Reader, namespace string, addr swarm.Address, bat if err != nil { return nil, err } - if cnt == 0 { - return nil, storage.ErrNoStampsForChunk - } if !found { return nil, fmt.Errorf("stamp not found for batchID %x: %w", batchID, storage.ErrNotFound) } @@ -234,7 +229,7 @@ func DeleteAll(s storage.Store, namespace string, addr swarm.Address) error { } // Delete removes a stamp associated with an chunk and batchID. -func Delete(s storage.Store, namespace string, addr swarm.Address, batchId []byte) error { +func Delete(s storage.Store, batch storage.Writer, namespace string, addr swarm.Address, batchId []byte) error { stamp, err := LoadWithBatchID(s, namespace, addr, batchId) if err != nil { if errors.Is(err, storage.ErrNotFound) { @@ -242,7 +237,7 @@ func Delete(s storage.Store, namespace string, addr swarm.Address, batchId []byt } return err } - return s.Delete(&item{ + return batch.Delete(&item{ namespace: []byte(namespace), address: addr, stamp: stamp, diff --git a/pkg/storer/internal/chunkstamp/chunkstamp_test.go b/pkg/storer/internal/chunkstamp/chunkstamp_test.go index fb54cfac6f0..a09b1c111ac 100644 --- a/pkg/storer/internal/chunkstamp/chunkstamp_test.go +++ b/pkg/storer/internal/chunkstamp/chunkstamp_test.go @@ -202,7 +202,7 @@ func TestStoreLoadDelete(t *testing.T) { t.Run("delete stored stamp", func(t *testing.T) { if i%2 == 0 { - if err := chunkstamp.Delete(ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()); err != nil { + if err := chunkstamp.Delete(ts.IndexStore(), ts.IndexStore(), ns, chunk.Address(), chunk.Stamp().BatchID()); err != nil { t.Fatalf("Delete(...): unexpected error: %v", err) } } else { diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index b9784aa78c2..b3faa96fccf 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -10,7 +10,6 @@ import ( "errors" "fmt" "strconv" - "sync" "time" "github.com/ethersphere/bee/pkg/encryption" @@ -372,7 +371,6 @@ var ( type uploadPutter struct { tagID uint64 - mtx sync.Mutex split uint64 seen uint64 closed bool @@ -404,9 +402,6 @@ func NewPutter(s internal.Storage, tagID uint64) (internal.PutterCloserWithRefer // - pushItem entry to make it available for PushSubscriber // - add chunk to the chunkstore till it is synced func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer storage.Writer, chunk swarm.Chunk) error { - u.mtx.Lock() - defer u.mtx.Unlock() - if u.closed { return errPutterAlreadyClosed } @@ -482,9 +477,6 @@ func (u *uploadPutter) Put(ctx context.Context, s internal.Storage, writer stora // the tags. It will update the tag. This will be filled with the Split and Seen count // by the Putter. func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swarm.Address) error { - u.mtx.Lock() - defer u.mtx.Unlock() - if u.closed { return nil } @@ -518,9 +510,6 @@ func (u *uploadPutter) Close(s internal.Storage, writer storage.Writer, addr swa } func (u *uploadPutter) Cleanup(tx internal.TxExecutor) error { - u.mtx.Lock() - defer u.mtx.Unlock() - if u.closed { return nil } @@ -645,19 +634,10 @@ func CleanupDirty(tx internal.TxExecutor) error { return nil } -type pushReporter struct { - s internal.Storage -} - -// NewPushReporter returns a new storage.PushReporter which can be used by the -// pusher component to report chunk state information. -func NewPushReporter(s internal.Storage) storage.PushReporter { - return &pushReporter{s: s} -} - // Report is the implementation of the PushReporter interface. -func (p *pushReporter) Report( +func Report( ctx context.Context, + s internal.Storage, chunk swarm.Chunk, state storage.ChunkState, ) error { @@ -666,7 +646,7 @@ func (p *pushReporter) Report( BatchID: chunk.Stamp().BatchID(), } - err := p.s.IndexStore().Get(ui) + err := s.IndexStore().Get(ui) if err != nil { return fmt.Errorf("failed to read uploadItem %s: %w", ui, err) } @@ -675,7 +655,7 @@ func (p *pushReporter) Report( TagID: ui.TagID, } - err = p.s.IndexStore().Get(ti) + err = s.IndexStore().Get(ti) if err != nil { return fmt.Errorf("failed getting tag: %w", err) } @@ -693,13 +673,18 @@ func (p *pushReporter) Report( break } - err = p.s.IndexStore().Put(ti) + batch, err := s.IndexStore().Batch(ctx) + if err != nil { + return err + } + + err = batch.Put(ti) if err != nil { return fmt.Errorf("failed updating tag: %w", err) } if state == storage.ChunkSent { - return nil + return batch.Commit() } // Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as @@ -711,28 +696,28 @@ func (p *pushReporter) Report( BatchID: chunk.Stamp().BatchID(), } - err = p.s.IndexStore().Delete(pi) + err = batch.Delete(pi) if err != nil { return fmt.Errorf("failed deleting pushItem %s: %w", pi, err) } - err = chunkstamp.Delete(p.s.IndexStore(), chunkStampNamespace, pi.Address, pi.BatchID) + err = chunkstamp.Delete(s.IndexStore(), batch, chunkStampNamespace, pi.Address, pi.BatchID) if err != nil { return fmt.Errorf("failed deleting chunk stamp %x: %w", pi.BatchID, err) } - err = p.s.ChunkStore().Delete(ctx, chunk.Address()) + err = s.ChunkStore().Delete(ctx, chunk.Address()) if err != nil { return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err) } ui.Synced = now().UnixNano() - err = p.s.IndexStore().Put(ui) + err = batch.Put(ui) if err != nil { return fmt.Errorf("failed updating uploadItem %s: %w", ui, err) } - return nil + return batch.Commit() } var ( diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go index c7f3ff4874b..569b8c7999e 100644 --- a/pkg/storer/internal/upload/uploadstore_test.go +++ b/pkg/storer/internal/upload/uploadstore_test.go @@ -636,8 +636,6 @@ func TestChunkReporter(t *testing.T) { t.Fatalf("failed creating putter: %v", err) } - reporter := upload.NewPushReporter(ts) - for idx, chunk := range chunktest.GenerateTestRandomChunks(10) { t.Run(fmt.Sprintf("chunk %s", chunk.Address()), func(t *testing.T) { err := putter.Put(context.Background(), ts, ts.IndexStore(), chunk) @@ -646,7 +644,7 @@ func TestChunkReporter(t *testing.T) { } t.Run("mark sent", func(t *testing.T) { - err := reporter.Report(context.Background(), chunk, storage.ChunkSent) + err := upload.Report(context.Background(), ts, chunk, storage.ChunkSent) if err != nil { t.Fatalf("Report(...): unexpected error: %v", err) } @@ -654,7 +652,7 @@ func TestChunkReporter(t *testing.T) { if idx < 4 { t.Run("mark stored", func(t *testing.T) { - err := reporter.Report(context.Background(), chunk, storage.ChunkStored) + err := upload.Report(context.Background(), ts, chunk, storage.ChunkStored) if err != nil { t.Fatalf("Report(...): unexpected error: %v", err) } @@ -663,7 +661,7 @@ func TestChunkReporter(t *testing.T) { if idx >= 4 && idx < 8 { t.Run("mark synced", func(t *testing.T) { - err := reporter.Report(context.Background(), chunk, storage.ChunkSynced) + err := upload.Report(context.Background(), ts, chunk, storage.ChunkSynced) if err != nil { t.Fatalf("Report(...): unexpected error: %v", err) } @@ -672,7 +670,7 @@ func TestChunkReporter(t *testing.T) { if idx >= 8 { t.Run("mark could not sync", func(t *testing.T) { - err := reporter.Report(context.Background(), chunk, storage.ChunkCouldNotSync) + err := upload.Report(context.Background(), ts, chunk, storage.ChunkCouldNotSync) if err != nil { t.Fatalf("Report(...): unexpected error: %v", err) } diff --git a/pkg/storer/reporter.go b/pkg/storer/reporter.go deleted file mode 100644 index a7a8984682d..00000000000 --- a/pkg/storer/reporter.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storer - -import ( - "context" - "errors" - "fmt" - - storage "github.com/ethersphere/bee/pkg/storage" - "github.com/ethersphere/bee/pkg/storer/internal/upload" - "github.com/ethersphere/bee/pkg/swarm" -) - -// Report implements the storage.PushReporter by wrapping the internal reporter -// with a transaction. -func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.ChunkState) error { - // only allow one report per tag - db.lock.Lock(fmt.Sprintf("%d", chunk.TagID())) - defer db.lock.Unlock(fmt.Sprintf("%d", chunk.TagID())) - - txnRepo, commit, rollback := db.repo.NewTx(ctx) - reporter := upload.NewPushReporter(txnRepo) - - err := reporter.Report(ctx, chunk, state) - if err != nil { - return fmt.Errorf("reporter.Report: %w", errors.Join(err, rollback())) - } - - return commit() -} diff --git a/pkg/storer/reporter_test.go b/pkg/storer/reporter_test.go deleted file mode 100644 index aa37ccf287e..00000000000 --- a/pkg/storer/reporter_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2023 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package storer_test - -import ( - "context" - "errors" - "testing" - "time" - - storage "github.com/ethersphere/bee/pkg/storage" - chunktesting "github.com/ethersphere/bee/pkg/storage/testing" - storer "github.com/ethersphere/bee/pkg/storer" - "github.com/ethersphere/bee/pkg/swarm" - "github.com/google/go-cmp/cmp" -) - -func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) { - t.Helper() - - chunks := chunktesting.GenerateTestRandomChunks(3) - - lstore, err := newStorer() - if err != nil { - t.Fatal(err) - } - - session, err := lstore.NewSession() - if err != nil { - t.Fatal(err) - } - - putter, err := lstore.Upload(context.Background(), false, session.TagID) - if err != nil { - t.Fatal(err) - } - - for _, ch := range chunks { - err = putter.Put(context.Background(), ch) - if err != nil { - t.Fatal(err) - } - } - - t.Run("report", func(t *testing.T) { - t.Run("commit", func(t *testing.T) { - err := lstore.Report(context.Background(), chunks[0], storage.ChunkSynced) - if err != nil { - t.Fatalf("Report(...): unexpected error %v", err) - } - - wantTI := storer.SessionInfo{ - TagID: session.TagID, - Split: 0, - Seen: 0, - Sent: 0, - Synced: 1, - Stored: 0, - StartedAt: session.StartedAt, - } - - gotTI, err := lstore.Session(session.TagID) - if err != nil { - t.Fatalf("Session(...): unexpected error: %v", err) - } - - if diff := cmp.Diff(wantTI, gotTI); diff != "" { - t.Fatalf("unexpected tag item (-want +have):\n%s", diff) - } - - has, err := lstore.Repo().ChunkStore().Has(context.Background(), chunks[0].Address()) - if err != nil { - t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) - } - if has { - t.Fatalf("expected chunk %s to not be found", chunks[0].Address()) - } - }) - - t.Run("rollback", func(t *testing.T) { - want := errors.New("dummy error") - lstore.SetRepoStorePutHook(func(item storage.Item) error { - if item.Namespace() == "tagItem" { - return want - } - return nil - }) - have := lstore.Report(context.Background(), chunks[1], storage.ChunkSynced) - if !errors.Is(have, want) { - t.Fatalf("unexpected error on Report: want %v have %v", want, have) - } - - wantTI := storer.SessionInfo{ - TagID: session.TagID, - Split: 0, - Seen: 0, - Sent: 0, - Synced: 1, - Stored: 0, - StartedAt: session.StartedAt, - } - - gotTI, err := lstore.Session(session.TagID) - if err != nil { - t.Fatalf("Session(...): unexpected error: %v", err) - } - - if diff := cmp.Diff(wantTI, gotTI); diff != "" { - t.Fatalf("unexpected tag item (-want +have):\n%s", diff) - } - - has, err := lstore.Repo().ChunkStore().Has(context.Background(), chunks[1].Address()) - if err != nil { - t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) - } - if !has { - t.Fatalf("expected chunk %s to be found", chunks[1].Address()) - } - }) - }) -} - -func TestReporter(t *testing.T) { - t.Parallel() - - t.Run("inmem", func(t *testing.T) { - t.Parallel() - - testReporter(t, func() (*storer.DB, error) { - - opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) - - return storer.New(context.Background(), "", opts) - }) - }) - t.Run("disk", func(t *testing.T) { - t.Parallel() - - opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) - - testReporter(t, diskStorer(t, opts)) - }) -} diff --git a/pkg/storer/subscribe_push.go b/pkg/storer/subscribe_push.go index 6de641bbaa3..d9c37abce28 100644 --- a/pkg/storer/subscribe_push.go +++ b/pkg/storer/subscribe_push.go @@ -6,10 +6,9 @@ package storer import ( "context" - "errors" "sync" + "time" - storage "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal/upload" "github.com/ethersphere/bee/pkg/swarm" ) @@ -55,15 +54,21 @@ func (db *DB) SubscribePush(ctx context.Context) (<-chan swarm.Chunk, func()) { }) if err != nil { - if !errors.Is(err, storage.ErrNotFound) { - db.logger.Error(err, "subscribe push: iterate error") - return - } // if we get storage.ErrNotFound, it could happen that the previous // iteration happened on a snapshot that was not fully updated yet. // in this case, we wait for the next event to trigger the iteration // again. This trigger ensures that we perform the iteration on the // latest snapshot. + db.logger.Error(err, "subscribe push: iterate error") + select { + case <-db.quit: + return + case <-ctx.Done(): + return + case <-stopChan: + return + case <-time.After(time.Second): + } db.events.Trigger(subscribePushEventKey) } diff --git a/pkg/storer/uploadstore.go b/pkg/storer/uploadstore.go index 9697c53bedf..9b5aeca4bbc 100644 --- a/pkg/storer/uploadstore.go +++ b/pkg/storer/uploadstore.go @@ -17,6 +17,25 @@ import ( "github.com/ethersphere/bee/pkg/swarm" ) +const uploadStoreKey = "uploadstore" + +// Report implements the storage.PushReporter by wrapping the internal reporter +// with a transaction. +func (db *DB) Report(ctx context.Context, chunk swarm.Chunk, state storage.ChunkState) error { + + db.lock.Lock(uploadStoreKey) + defer db.lock.Unlock(uploadStoreKey) + + err := db.Execute(ctx, func(s internal.Storage) error { + return upload.Report(ctx, s, chunk, state) + }) + if err != nil { + return fmt.Errorf("reporter.Report: %w", err) + } + + return nil +} + // Upload is the implementation of UploadStore.Upload method. func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession, error) { if tagID == 0 { @@ -51,6 +70,8 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession return &putterSession{ Putter: putterWithMetrics{ storage.PutterFunc(func(ctx context.Context, chunk swarm.Chunk) error { + db.lock.Lock(uploadStoreKey) + defer db.lock.Unlock(uploadStoreKey) return db.Execute(ctx, func(s internal.Storage) error { b, err := s.IndexStore().Batch(ctx) @@ -77,7 +98,8 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession }, done: func(address swarm.Address) error { defer db.events.Trigger(subscribePushEventKey) - + db.lock.Lock(uploadStoreKey) + defer db.lock.Unlock(uploadStoreKey) return db.Execute(ctx, func(s internal.Storage) error { b, err := s.IndexStore().Batch(ctx) @@ -102,7 +124,8 @@ func (db *DB) Upload(ctx context.Context, pin bool, tagID uint64) (PutterSession }, cleanup: func() error { defer db.events.Trigger(subscribePushEventKey) - + db.lock.Lock(uploadStoreKey) + defer db.lock.Unlock(uploadStoreKey) return errors.Join( uploadPutter.Cleanup(db), func() error { diff --git a/pkg/storer/uploadstore_test.go b/pkg/storer/uploadstore_test.go index aeeba3efc23..fa267728e6e 100644 --- a/pkg/storer/uploadstore_test.go +++ b/pkg/storer/uploadstore_test.go @@ -342,3 +342,130 @@ func TestUploadStore(t *testing.T) { testUploadStore(t, diskStorer(t, dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second))) }) } + +func testReporter(t *testing.T, newStorer func() (*storer.DB, error)) { + t.Helper() + + chunks := chunktesting.GenerateTestRandomChunks(3) + + lstore, err := newStorer() + if err != nil { + t.Fatal(err) + } + + session, err := lstore.NewSession() + if err != nil { + t.Fatal(err) + } + + putter, err := lstore.Upload(context.Background(), false, session.TagID) + if err != nil { + t.Fatal(err) + } + + for _, ch := range chunks { + err = putter.Put(context.Background(), ch) + if err != nil { + t.Fatal(err) + } + } + + t.Run("report", func(t *testing.T) { + t.Run("commit", func(t *testing.T) { + err := lstore.Report(context.Background(), chunks[0], storage.ChunkSynced) + if err != nil { + t.Fatalf("Report(...): unexpected error %v", err) + } + + wantTI := storer.SessionInfo{ + TagID: session.TagID, + Split: 0, + Seen: 0, + Sent: 0, + Synced: 1, + Stored: 0, + StartedAt: session.StartedAt, + } + + gotTI, err := lstore.Session(session.TagID) + if err != nil { + t.Fatalf("Session(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(wantTI, gotTI); diff != "" { + t.Fatalf("unexpected tag item (-want +have):\n%s", diff) + } + + has, err := lstore.Repo().ChunkStore().Has(context.Background(), chunks[0].Address()) + if err != nil { + t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) + } + if has { + t.Fatalf("expected chunk %s to not be found", chunks[0].Address()) + } + }) + + t.Run("rollback", func(t *testing.T) { + want := errors.New("dummy error") + lstore.SetRepoStorePutHook(func(item storage.Item) error { + if item.Namespace() == "tagItem" { + return want + } + return nil + }) + have := lstore.Report(context.Background(), chunks[1], storage.ChunkSynced) + if !errors.Is(have, want) { + t.Fatalf("unexpected error on Report: want %v have %v", want, have) + } + + wantTI := storer.SessionInfo{ + TagID: session.TagID, + Split: 0, + Seen: 0, + Sent: 0, + Synced: 1, + Stored: 0, + StartedAt: session.StartedAt, + } + + gotTI, err := lstore.Session(session.TagID) + if err != nil { + t.Fatalf("Session(...): unexpected error: %v", err) + } + + if diff := cmp.Diff(wantTI, gotTI); diff != "" { + t.Fatalf("unexpected tag item (-want +have):\n%s", diff) + } + + has, err := lstore.Repo().ChunkStore().Has(context.Background(), chunks[1].Address()) + if err != nil { + t.Fatalf("ChunkStore.Has(...): unexpected error: %v", err) + } + if !has { + t.Fatalf("expected chunk %s to be found", chunks[1].Address()) + } + }) + }) +} + +func TestReporter(t *testing.T) { + t.Parallel() + + t.Run("inmem", func(t *testing.T) { + t.Parallel() + + testReporter(t, func() (*storer.DB, error) { + + opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) + + return storer.New(context.Background(), "", opts) + }) + }) + t.Run("disk", func(t *testing.T) { + t.Parallel() + + opts := dbTestOps(swarm.RandAddress(t), 0, nil, nil, time.Second) + + testReporter(t, diskStorer(t, opts)) + }) +}