Skip to content

Commit

Permalink
feat: add migration
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Feb 1, 2024
1 parent 1becf68 commit c593365
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
22 changes: 22 additions & 0 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,17 @@ type uploadItem struct {
TagID uint64
Uploaded int64
Synced int64

// idFunc overrides the ID method.
// This used to get the ID from the item where the address and batchID were not marshalled.
idFunc func() string
}

// ID implements the storage.Item interface.
func (i uploadItem) ID() string {
if i.idFunc != nil {
return i.idFunc()
}
return storageutil.JoinFields(i.Address.ByteString(), string(i.BatchID))
}

Expand Down Expand Up @@ -820,6 +827,21 @@ func DeleteTag(st storage.Store, tagID uint64) error {
return nil
}

func IterateAll(st storage.Store, iterateFn func(item storage.Item) (bool, error)) error {
return st.Iterate(
storage.Query{
Factory: func() storage.Item { return new(uploadItem) },
},
func(r storage.Result) (bool, error) {
ui := r.Entry.(*uploadItem)
ui.idFunc = func() string {
return r.ID
}
return iterateFn(ui)
},
)
}

func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) error {
return st.Iterate(
storage.Query{
Expand Down
1 change: 1 addition & 0 deletions pkg/storer/migration/all_steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func AfterInitSteps(
2: step_02,
3: step_03(chunkStore, reserve.ChunkType),
4: step_04(sharkyPath, sharkyNoOfShards),
5: step_05,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/storer/migration/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ var (
Step_02 = step_02
Step_03 = step_03
Step_04 = step_04
Step_05 = step_05
)
34 changes: 34 additions & 0 deletions pkg/storer/migration/step_05.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package migration

Check failure on line 1 in pkg/storer/migration/step_05.go

View workflow job for this annotation

GitHub Actions / Lint

Missed header for check (goheader)

import (
"fmt"
"os"

"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/upload"
)

// step_05 is a migration step that removes all upload items from the store.
func step_05(st storage.BatchedStore) error {
logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))
logger.Info("start removing upload items")

var itemsToDelete []storage.Item
err := upload.IterateAll(st, func(u storage.Item) (bool, error) {
itemsToDelete = append(itemsToDelete, u)
return false, nil
})
if err != nil {
return fmt.Errorf("iterate upload items: %w", err)
}

for _, item := range itemsToDelete {
err := st.Delete(item)
if err != nil {
return fmt.Errorf("delete upload item: %w", err)
}
}
logger.Info("finished removing upload items")
return nil
}
77 changes: 77 additions & 0 deletions pkg/storer/migration/step_05_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package migration_test

Check failure on line 1 in pkg/storer/migration/step_05_test.go

View workflow job for this annotation

GitHub Actions / Lint

Missed header for check (goheader)

import (
"context"
"github.com/ethersphere/bee/pkg/swarm"
"testing"

"github.com/ethersphere/bee/pkg/storage"
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/upload"
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
)

func Test_Step_05(t *testing.T) {
t.Parallel()

st, closer := internal.NewInmemStorage()
t.Cleanup(func() {
err := closer()
if err != nil {
t.Errorf("closing storage: %v", err)
}
})

tag, err := upload.NextTag(st.IndexStore())
if err != nil {
t.Fatalf("create tag: %v", err)
}

putter, err := upload.NewPutter(st, tag.TagID)
if err != nil {
t.Fatalf("create putter: %v", err)
}
ctx := context.Background()
chunks := chunktest.GenerateTestRandomChunks(10)
b, err := st.IndexStore().Batch(ctx)
if err != nil {
t.Fatalf("create batch: %v", err)
}

for _, ch := range chunks {
err := putter.Put(ctx, st, b, ch)
if err != nil {
t.Fatalf("put chunk: %v", err)
}
}
err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t))
if err != nil {
t.Fatalf("close putter: %v", err)
}
err = b.Commit()
if err != nil {
t.Fatalf("commit batch: %v", err)
}

wantCount := func(t *testing.T, want int) {

Check failure on line 57 in pkg/storer/migration/step_05_test.go

View workflow job for this annotation

GitHub Actions / Lint

test helper function should start from t.Helper() (thelper)
count := 0
err = upload.IterateAll(st.IndexStore(), func(_ storage.Item) (bool, error) {
count++
return false, nil
})
if err != nil {
t.Fatalf("iterate upload items: %v", err)
}
if count != want {
t.Fatalf("expected %d upload items, got %d", want, count)
}
}

wantCount(t, 10)
err = localmigration.Step_05(st.IndexStore())
if err != nil {
t.Fatalf("step 05: %v", err)
}
wantCount(t, 0)
}

0 comments on commit c593365

Please sign in to comment.