From c593365aa7fc522f97c488e6d47a4047b12a6d23 Mon Sep 17 00:00:00 2001 From: Acha Bill Date: Thu, 1 Feb 2024 13:05:06 +0100 Subject: [PATCH] feat: add migration --- pkg/storer/internal/upload/uploadstore.go | 22 +++++++ pkg/storer/migration/all_steps.go | 1 + pkg/storer/migration/export_test.go | 1 + pkg/storer/migration/step_05.go | 34 ++++++++++ pkg/storer/migration/step_05_test.go | 77 +++++++++++++++++++++++ 5 files changed, 135 insertions(+) create mode 100644 pkg/storer/migration/step_05.go create mode 100644 pkg/storer/migration/step_05_test.go diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go index 5c59ca6347b..64910bcdb4c 100644 --- a/pkg/storer/internal/upload/uploadstore.go +++ b/pkg/storer/internal/upload/uploadstore.go @@ -230,10 +230,17 @@ type uploadItem struct { TagID uint64 Uploaded int64 Synced int64 + + // idFunc overrides the ID method. + // This used to get the ID from the item where the address and batchID were not marshalled. + idFunc func() string } // ID implements the storage.Item interface. func (i uploadItem) ID() string { + if i.idFunc != nil { + return i.idFunc() + } return storageutil.JoinFields(i.Address.ByteString(), string(i.BatchID)) } @@ -820,6 +827,21 @@ func DeleteTag(st storage.Store, tagID uint64) error { return nil } +func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error { + return st.Iterate( + storage.Query{ + Factory: func() storage.Item { return new(uploadItem) }, + }, + func(r storage.Result) (bool, error) { + ui := r.Entry.(*uploadItem) + ui.idFunc = func() string { + return r.ID + } + return iterateFn(ui) + }, + ) +} + func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error { return st.Iterate( storage.Query{ diff --git a/pkg/storer/migration/all_steps.go b/pkg/storer/migration/all_steps.go index ef028659f6d..69aa419747f 100644 --- a/pkg/storer/migration/all_steps.go +++ b/pkg/storer/migration/all_steps.go @@ -21,6 +21,7 @@ func AfterInitSteps( 2: step_02, 3: step_03(chunkStore, reserve.ChunkType), 4: step_04(sharkyPath, sharkyNoOfShards), + 5: step_05, } } diff --git a/pkg/storer/migration/export_test.go b/pkg/storer/migration/export_test.go index 210d6973607..bfebec5cb42 100644 --- a/pkg/storer/migration/export_test.go +++ b/pkg/storer/migration/export_test.go @@ -9,4 +9,5 @@ var ( Step_02 = step_02 Step_03 = step_03 Step_04 = step_04 + Step_05 = step_05 ) diff --git a/pkg/storer/migration/step_05.go b/pkg/storer/migration/step_05.go new file mode 100644 index 00000000000..ed6ee701247 --- /dev/null +++ b/pkg/storer/migration/step_05.go @@ -0,0 +1,34 @@ +package migration + +import ( + "fmt" + "os" + + "github.com/ethersphere/bee/pkg/log" + "github.com/ethersphere/bee/pkg/storage" + "github.com/ethersphere/bee/pkg/storer/internal/upload" +) + +// step_05 is a migration step that removes all upload items from the store. +func step_05(st storage.BatchedStore) error { + logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout)) + logger.Info("start removing upload items") + + var itemsToDelete []storage.Item + err := upload.IterateAll(st, func(u storage.Item) (bool, error) { + itemsToDelete = append(itemsToDelete, u) + return false, nil + }) + if err != nil { + return fmt.Errorf("iterate upload items: %w", err) + } + + for _, item := range itemsToDelete { + err := st.Delete(item) + if err != nil { + return fmt.Errorf("delete upload item: %w", err) + } + } + logger.Info("finished removing upload items") + return nil +} diff --git a/pkg/storer/migration/step_05_test.go b/pkg/storer/migration/step_05_test.go new file mode 100644 index 00000000000..73333d83d8e --- /dev/null +++ b/pkg/storer/migration/step_05_test.go @@ -0,0 +1,77 @@ +package migration_test + +import ( + "context" + "github.com/ethersphere/bee/pkg/swarm" + "testing" + + "github.com/ethersphere/bee/pkg/storage" + chunktest "github.com/ethersphere/bee/pkg/storage/testing" + "github.com/ethersphere/bee/pkg/storer/internal" + "github.com/ethersphere/bee/pkg/storer/internal/upload" + localmigration "github.com/ethersphere/bee/pkg/storer/migration" +) + +func Test_Step_05(t *testing.T) { + t.Parallel() + + st, closer := internal.NewInmemStorage() + t.Cleanup(func() { + err := closer() + if err != nil { + t.Errorf("closing storage: %v", err) + } + }) + + tag, err := upload.NextTag(st.IndexStore()) + if err != nil { + t.Fatalf("create tag: %v", err) + } + + putter, err := upload.NewPutter(st, tag.TagID) + if err != nil { + t.Fatalf("create putter: %v", err) + } + ctx := context.Background() + chunks := chunktest.GenerateTestRandomChunks(10) + b, err := st.IndexStore().Batch(ctx) + if err != nil { + t.Fatalf("create batch: %v", err) + } + + for _, ch := range chunks { + err := putter.Put(ctx, st, b, ch) + if err != nil { + t.Fatalf("put chunk: %v", err) + } + } + err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t)) + if err != nil { + t.Fatalf("close putter: %v", err) + } + err = b.Commit() + if err != nil { + t.Fatalf("commit batch: %v", err) + } + + wantCount := func(t *testing.T, want int) { + count := 0 + err = upload.IterateAll(st.IndexStore(), func(_ storage.Item) (bool, error) { + count++ + return false, nil + }) + if err != nil { + t.Fatalf("iterate upload items: %v", err) + } + if count != want { + t.Fatalf("expected %d upload items, got %d", want, count) + } + } + + wantCount(t, 10) + err = localmigration.Step_05(st.IndexStore()) + if err != nil { + t.Fatalf("step 05: %v", err) + } + wantCount(t, 0) +}