From 6ff67c1426c6d16461a49ee85934ba52bff5d8c2 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Mon, 5 Feb 2024 22:58:06 +0300
Subject: [PATCH] fix: rebase

---
 pkg/storer/debug.go                       |   2 +-
 pkg/storer/internal/cache/cache.go        |  36 ++----
 pkg/storer/internal/cache/cache_test.go   |  15 ++-
 pkg/storer/internal/upload/uploadstore.go |   6 +-
 .../internal/upload/uploadstore_test.go    |  10 +-
 pkg/storer/migration/step_04_test.go      |   4 +-
 pkg/storer/migration/step_05.go           |  30 ++---
 pkg/storer/migration/step_05_test.go      | 103 +++++++++---------
 8 files changed, 99 insertions(+), 107 deletions(-)

diff --git a/pkg/storer/debug.go b/pkg/storer/debug.go
index c0b33150e9f..292a65a7d36 100644
--- a/pkg/storer/debug.go
+++ b/pkg/storer/debug.go
@@ -84,7 +84,7 @@ func (db *DB) DebugInfo(ctx context.Context) (Info, error) {
 		synced uint64
 	)
 	eg.Go(func() error {
-		return upload.IterateAllTagItems(db.repo.IndexStore(), func(ti *upload.TagItem) (bool, error) {
+		return upload.IterateAllTagItems(db.storage.ReadOnly().IndexStore(), func(ti *upload.TagItem) (bool, error) {
 			select {
 			case <-ctx.Done():
 				return true, ctx.Err()
diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go
index 501b40c0a35..304eccbed54 100644
--- a/pkg/storer/internal/cache/cache.go
+++ b/pkg/storer/internal/cache/cache.go
@@ -196,23 +196,24 @@ func (c *Cache) ShallowCopy(
 
 	entries := make([]*cacheEntry, 0, len(addrs))
 
-	entriesToAdd := make([]*cacheEntry, 0, len(addrs))
-
 	defer func() {
 		if err != nil {
-			for _, entry := range entries {
-				err = errors.Join(store.ChunkStore().Delete(context.Background(), entry.Address))
-			}
+			_ = store.Run(context.Background(), func(s transaction.Store) error {
+				for _, entry := range entries {
+					err = errors.Join(err, s.ChunkStore().Delete(context.Background(), entry.Address))
+				}
+				return nil
+			})
 		}
 	}()
 
 	for _, addr := range addrs {
 		entry := &cacheEntry{Address: addr, AccessTimestamp: now().UnixNano()}
-		if has, err := store.IndexStore().Has(entry); err == nil && has {
+		if has, err := store.ReadOnly().IndexStore().Has(entry); err == nil && has {
 			// Since the caller has previously referenced the chunk (+1 refCnt), and if the chunk is already referenced
 			// by the cache store (+1 refCnt), then we must decrement the refCnt by one ( -1 refCnt to bring the total to +1).
 			// See https://github.com/ethersphere/bee/issues/4530.
-			_ = store.ChunkStore().Delete(ctx, addr)
+			_ = store.Run(ctx, func(s transaction.Store) error { return s.ChunkStore().Delete(ctx, addr) })
 			continue
 		}
 		entries = append(entries, entry)
@@ -225,23 +226,13 @@ func (c *Cache) ShallowCopy(
 	//consider only the amount that can fit, the rest should be deleted from the chunkstore.
 	if len(entries) > c.capacity {
 		for _, addr := range entries[:len(entries)-c.capacity] {
-			_ = store.ChunkStore().Delete(ctx, addr.Address)
+			_ = store.Run(ctx, func(s transaction.Store) error { return s.ChunkStore().Delete(ctx, addr.Address) })
 		}
 		entries = entries[len(entries)-c.capacity:]
 	}
-	batch, err := store.IndexStore().Batch(ctx)
-	if err != nil {
-		return fmt.Errorf("failed creating batch: %w", err)
-	}
-
-	for _, entry := range entries {
-		err = batch.Put(entry)
-		if err != nil {
-			return fmt.Errorf("failed adding entry %s: %w", entry, err)
-		}
-
-		for _, entry := range entriesToAdd {
+	err = store.Run(ctx, func(s transaction.Store) error {
+		for _, entry := range entries {
 			err = s.IndexStore().Put(entry)
 			if err != nil {
 				return fmt.Errorf("failed adding entry %s: %w", entry, err)
 			}
@@ -254,15 +245,12 @@ func (c *Cache) ShallowCopy(
 				return fmt.Errorf("failed adding cache order index: %w", err)
 			}
 		}
-
 		return nil
 	})
 
 	if err == nil {
-		c.size.Add(int64(len(entriesToAdd)))
+		c.size.Add(int64(len(entries)))
 	}
 
-	c.size.Add(int64(len(entries)))
-
 	return nil
 }
diff --git a/pkg/storer/internal/cache/cache_test.go b/pkg/storer/internal/cache/cache_test.go
index 55c449408ec..baf460735ec 100644
--- a/pkg/storer/internal/cache/cache_test.go
+++ b/pkg/storer/internal/cache/cache_test.go
@@ -416,7 +416,7 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 	t.Parallel()
 
 	st := newTestStorage(t)
-	c, err := cache.New(context.Background(), st, 1000)
+	c, err := cache.New(context.Background(), st.ReadOnly().IndexStore(), 1000)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -426,7 +426,10 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 
 	for _, ch := range chunks {
 		// add the chunks to chunkstore. This simulates the reserve already populating the chunkstore with chunks.
-		err := st.ChunkStore().Put(context.Background(), ch)
+
+		err := st.Run(context.Background(), func(s transaction.Store) error {
+			return s.ChunkStore().Put(context.Background(), ch)
+		})
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -444,14 +447,14 @@ func TestShallowCopyAlreadyCached(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	verifyChunksExist(t, st.ChunkStore(), chunks...)
+	verifyChunksExist(t, st.ReadOnly().ChunkStore(), chunks...)
 
-	err = c.RemoveOldest(context.Background(), st, st.ChunkStore(), 10)
+	err = c.RemoveOldest(context.Background(), st, 10)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	verifyChunksDeleted(t, st.ChunkStore(), chunks...)
+	verifyChunksDeleted(t, st.ReadOnly().ChunkStore(), chunks...)
 }
 
 func verifyCacheState(
@@ -525,7 +528,7 @@ func verifyChunksDeleted(
 
 func verifyChunksExist(
 	t *testing.T,
-	chStore storage.ChunkStore,
+	chStore storage.ReadOnlyChunkStore,
 	chs ...swarm.Chunk,
 ) {
 	t.Helper()
diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go
index 278fbce57b5..33bb2289b25 100644
--- a/pkg/storer/internal/upload/uploadstore.go
+++ b/pkg/storer/internal/upload/uploadstore.go
@@ -679,7 +679,7 @@ func Report(
 		return fmt.Errorf("failed deleting chunk %s: %w", chunk.Address(), err)
 	}
 
-	err = batch.Delete(ui)
+	err = indexStore.Delete(ui)
 	if err != nil {
 		return fmt.Errorf("failed deleting uploadItem %s: %w", ui, err)
 	}
@@ -815,7 +815,7 @@ func DeleteTag(st storage.Writer, tagID uint64) error {
 	return nil
 }
 
-func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error {
+func IterateAll(st storage.Reader, iterateFn func(item storage.Item) (bool, error)) error {
 	return st.Iterate(
 		storage.Query{
 			Factory: func() storage.Item { return new(uploadItem) },
@@ -830,7 +830,7 @@
 	)
 }
 
-func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error {
+func IterateAllTagItems(st storage.Reader, cb func(ti *TagItem) (bool, error)) error {
 	return st.Iterate(
 		storage.Query{
 			Factory: func() storage.Item {
diff --git a/pkg/storer/internal/upload/uploadstore_test.go b/pkg/storer/internal/upload/uploadstore_test.go
index df7d39359a6..c3226bc2de2 100644
--- a/pkg/storer/internal/upload/uploadstore_test.go
+++ b/pkg/storer/internal/upload/uploadstore_test.go
@@ -527,7 +527,7 @@ func TestChunkPutter(t *testing.T) {
 
 	t.Run("iterate all", func(t *testing.T) {
 		count := 0
-		err := ts.IndexStore().Iterate(
+		err := ts.ReadOnly().IndexStore().Iterate(
 			storage.Query{
 				Factory: func() storage.Item { return new(upload.UploadItem) },
 			},
@@ -538,7 +538,7 @@ func TestChunkPutter(t *testing.T) {
 				if synced {
 					t.Fatal("expected synced to be false")
 				}
-				has, err := ts.ChunkStore().Has(context.Background(), address)
+				has, err := ts.ReadOnly().ChunkStore().Has(context.Background(), address)
 				if err != nil {
 					t.Fatalf("unexpected error in Has(...): %v", err)
 				}
@@ -589,7 +589,7 @@ func TestChunkPutter(t *testing.T) {
 
 	t.Run("iterate all tag items", func(t *testing.T) {
 		var tagItemsCount, uploaded, synced uint64
-		err := upload.IterateAllTagItems(ts.IndexStore(), func(ti *upload.TagItem) (bool, error) {
+		err := upload.IterateAllTagItems(ts.ReadOnly().IndexStore(), func(ti *upload.TagItem) (bool, error) {
 			uploaded += ti.Split
 			synced += ti.Synced
 			tagItemsCount++
@@ -764,7 +764,7 @@ func TestChunkReporter(t *testing.T) {
 			Address: chunk.Address(),
 			BatchID: chunk.Stamp().BatchID(),
 		}
-		has, err := ts.IndexStore().Has(ui)
+		has, err := ts.ReadOnly().IndexStore().Has(ui)
 		if err != nil {
 			t.Fatalf("unexpected error: %v", err)
 		}
@@ -777,7 +777,7 @@ func TestChunkReporter(t *testing.T) {
 			Address: chunk.Address(),
 			BatchID: chunk.Stamp().BatchID(),
 		}
-		has, err = ts.IndexStore().Has(pi)
+		has, err = ts.ReadOnly().IndexStore().Has(pi)
 		if err != nil {
 			t.Fatalf("Has(...): unexpected error: %v", err)
 		}
diff --git a/pkg/storer/migration/step_04_test.go b/pkg/storer/migration/step_04_test.go
index 66159585956..2931d7f9193 100644
--- a/pkg/storer/migration/step_04_test.go
+++ b/pkg/storer/migration/step_04_test.go
@@ -35,9 +35,7 @@ func Test_Step_04(t *testing.T) {
 	sharkyDir := t.TempDir()
 	sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize)
 	assert.NoError(t, err)
-
 	store := inmemstore.New()
-
 	storage := transaction.NewStorage(sharkyStore, store)
 
 	stepFn := localmigration.Step_04(sharkyDir, 1, storage)
@@ -58,7 +56,7 @@ func Test_Step_04(t *testing.T) {
 		assert.NoError(t, err)
 	}
 
-	err = sharkyStore.Close()
+	err = storage.Close()
 	assert.NoError(t, err)
 
 	assert.NoError(t, stepFn())
diff --git a/pkg/storer/migration/step_05.go b/pkg/storer/migration/step_05.go
index f9aa75b2b00..0b9f5a446c8 100644
--- a/pkg/storer/migration/step_05.go
+++ b/pkg/storer/migration/step_05.go
@@ -5,16 +5,18 @@
 package migration
 
 import (
+	"context"
 	"fmt"
 	"os"
 
 	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
 	"github.com/ethersphere/bee/pkg/storer/internal/upload"
 )
 
 // step_05 is a migration step that removes all upload items from the store.
-func step_05(st storage.BatchedStore) error {
+func step_05(st transaction.Storage) error {
 	logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))
 	logger.Info("start removing upload items")
 
@@ -22,7 +24,9 @@ func step_05(st transaction.Storage) error {
 	errC := make(chan error)
 	go func() {
 		for item := range itemC {
-			err := st.Delete(item)
+			err := st.Run(context.Background(), func(s transaction.Store) error {
+				return s.IndexStore().Delete(item)
+			})
 			if err != nil {
 				errC <- fmt.Errorf("delete upload item: %w", err)
 				return
@@ -31,23 +35,19 @@ func step_05(st transaction.Storage) error {
 		close(errC)
 	}()
 
-	go func() {
-		defer close(itemC)
-		err := upload.IterateAll(st, func(u storage.Item) (bool, error) {
-			itemC <- u
-			return false, nil
-		})
-		if err != nil {
-			errC <- fmt.Errorf("iterate upload items: %w", err)
-			return
+	err := upload.IterateAll(st.ReadOnly().IndexStore(), func(u storage.Item) (bool, error) {
+		select {
+		case itemC <- u:
+		case err := <-errC:
+			return true, err
 		}
-	}()
-
-	err := <-errC
+		return false, nil
+	})
+	close(itemC)
 	if err != nil {
 		return err
 	}
 
 	logger.Info("finished removing upload items")
-	return nil
+	return <-errC
 }
diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go
index cc7a88214a8..32eedaaf514 100644
--- a/pkg/storer/migration/step_05_test.go
+++ b/pkg/storer/migration/step_05_test.go
@@ -8,43 +8,42 @@ import (
 	"context"
 	"testing"
 
-	"github.com/ethersphere/bee/pkg/node"
-	"github.com/ethersphere/bee/pkg/postage"
+	"github.com/ethersphere/bee/pkg/sharky"
 	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storage/leveldbstore"
 	chunktest "github.com/ethersphere/bee/pkg/storage/testing"
-	"github.com/ethersphere/bee/pkg/storer"
 	"github.com/ethersphere/bee/pkg/storer/internal"
+	"github.com/ethersphere/bee/pkg/storer/internal/transaction"
 	"github.com/ethersphere/bee/pkg/storer/internal/upload"
 	localmigration "github.com/ethersphere/bee/pkg/storer/migration"
 	"github.com/ethersphere/bee/pkg/swarm"
-	kademlia "github.com/ethersphere/bee/pkg/topology/mock"
-	"github.com/ethersphere/bee/pkg/util/testutil"
+	"github.com/stretchr/testify/assert"
 )
 
 func Test_Step_05(t *testing.T) {
 	t.Parallel()
 
-	db, err := storer.New(context.Background(), "", &storer.Options{
-		Logger:          testutil.NewLogger(t),
-		RadiusSetter:    kademlia.NewTopologyDriver(),
-		Batchstore:      new(postage.NoOpBatchStore),
-		ReserveCapacity: node.ReserveCapacity,
-	})
-	if err != nil {
-		t.Fatalf("New(...): unexpected error: %v", err)
-	}
+	sharkyDir := t.TempDir()
+	sharkyStore, err := sharky.New(&dirFS{basedir: sharkyDir}, 1, swarm.SocMaxChunkSize)
+	assert.NoError(t, err)
+	lstore, err := leveldbstore.New("", nil)
+	assert.NoError(t, err)
+
+	store := transaction.NewStorage(sharkyStore, lstore)
 	t.Cleanup(func() {
-		err := db.Close()
+		err := store.Close()
 		if err != nil {
 			t.Fatalf("Close(): unexpected closing storer: %v", err)
 		}
 	})
 
-	wantCount := func(t *testing.T, st internal.Storage, want int) {
+	ctx := context.Background()
+
+	wantCount := func(t *testing.T, st storage.Reader, want int) {
 		t.Helper()
 
 		count := 0
-		err = upload.IterateAll(st.IndexStore(), func(_ storage.Item) (bool, error) {
+		err := upload.IterateAll(st, func(_ storage.Item) (bool, error) {
 			count++
 			return false, nil
 		})
@@ -56,47 +55,51 @@ func Test_Step_05(t *testing.T) {
 		}
 	}
 
-	err = db.Execute(context.Background(), func(st internal.Storage) error {
-		tag, err := upload.NextTag(st.IndexStore())
-		if err != nil {
-			t.Fatalf("create tag: %v", err)
-		}
+	var tag upload.TagItem
+	err = store.Run(context.Background(), func(s transaction.Store) error {
+		tag, err = upload.NextTag(s.IndexStore())
+		return err
+	})
+	if err != nil {
+		t.Fatalf("create tag: %v", err)
+	}
 
-		putter, err := upload.NewPutter(st, tag.TagID)
-		if err != nil {
-			t.Fatalf("create putter: %v", err)
-		}
-		ctx := context.Background()
-		chunks := chunktest.GenerateTestRandomChunks(10)
-		b, err := st.IndexStore().Batch(ctx)
-		if err != nil {
-			t.Fatalf("create batch: %v", err)
-		}
+	var putter internal.PutterCloserWithReference
+	err = store.Run(context.Background(), func(s transaction.Store) error {
+		putter, err = upload.NewPutter(s.IndexStore(), tag.TagID)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("create putter: %v", err)
+	}
+	chunks := chunktest.GenerateTestRandomChunks(10)
+
+	err = store.Run(context.Background(), func(s transaction.Store) error {
 		for _, ch := range chunks {
-			err := putter.Put(ctx, st, b, ch)
+			err := putter.Put(ctx, s, ch)
 			if err != nil {
-				t.Fatalf("put chunk: %v", err)
+				return err
 			}
 		}
-		err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t))
-		if err != nil {
-			t.Fatalf("close putter: %v", err)
-		}
-		err = b.Commit()
-		if err != nil {
-			t.Fatalf("commit batch: %v", err)
-		}
-
-		wantCount(t, st, 10)
-		err = localmigration.Step_05(st.IndexStore())
-		if err != nil {
-			t.Fatalf("step 05: %v", err)
-		}
-		wantCount(t, st, 0)
 		return nil
 	})
 	if err != nil {
-		t.Fatalf("execute: %v", err)
+		t.Fatalf("put chunk: %v", err)
+	}
+
+	err = store.Run(ctx, func(s transaction.Store) error {
+		return putter.Close(s.IndexStore(), swarm.RandAddress(t))
+	})
+	if err != nil {
+		t.Fatalf("close putter: %v", err)
+	}
+
+	wantCount(t, store.ReadOnly().IndexStore(), 10)
+
+	err = localmigration.Step_05(store)
+	if err != nil {
+		t.Fatalf("step 05: %v", err)
 	}
+	wantCount(t, store.ReadOnly().IndexStore(), 0)
 }
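
Note: every hunk above migrates a call site from direct store access (IndexStore()/ChunkStore()/Batch) to the transaction API: reads go through st.ReadOnly(), and every mutation is wrapped in st.Run(...), which is intended to apply the closure's writes as one unit. A minimal sketch of that pattern, using only the transaction.Storage methods that appear in this patch (the helper removeChunks below is illustrative and not part of the change):

	package example // illustrative sketch, not part of this patch

	import (
		"context"

		"github.com/ethersphere/bee/pkg/storer/internal/transaction"
		"github.com/ethersphere/bee/pkg/swarm"
	)

	// removeChunks shows the read/write split: reads use the ReadOnly()
	// view, and each delete runs inside a Run closure so the write goes
	// through the transactional store.
	func removeChunks(ctx context.Context, st transaction.Storage, addrs []swarm.Address) error {
		for _, addr := range addrs {
			// Read side: no transaction needed.
			has, err := st.ReadOnly().ChunkStore().Has(ctx, addr)
			if err != nil {
				return err
			}
			if !has {
				continue
			}
			// Write side: the mutation happens inside the transaction.
			if err := st.Run(ctx, func(s transaction.Store) error {
				return s.ChunkStore().Delete(ctx, addr)
			}); err != nil {
				return err
			}
		}
		return nil
	}

This is also why cache.ShallowCopy above collects all IndexStore puts into a single store.Run call: one transaction per logical operation rather than one per item.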