Skip to content

Commit

Permalink
fix: update migration and test
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill committed Feb 2, 2024
1 parent 4099ecf commit d2b6c8c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 49 deletions.
37 changes: 26 additions & 11 deletions pkg/storer/migration/step_05.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@ func step_05(st storage.BatchedStore) error {
logger := log.NewLogger("migration-step-05", log.WithSink(os.Stdout))
logger.Info("start removing upload items")

var itemsToDelete []storage.Item
err := upload.IterateAll(st, func(u storage.Item) (bool, error) {
itemsToDelete = append(itemsToDelete, u)
return false, nil
})
if err != nil {
return fmt.Errorf("iterate upload items: %w", err)
}
itemC := make(chan storage.Item)
errC := make(chan error)
go func() {
for item := range itemC {
err := st.Delete(item)
if err != nil {
errC <- fmt.Errorf("delete upload item: %w", err)
return
}
}
close(errC)
}()

for _, item := range itemsToDelete {
err := st.Delete(item)
go func() {
defer close(itemC)
err := upload.IterateAll(st, func(u storage.Item) (bool, error) {
itemC <- u
return false, nil
})
if err != nil {
return fmt.Errorf("delete upload item: %w", err)
errC <- fmt.Errorf("iterate upload items: %w", err)
return
}
}()

err := <-errC
if err != nil {
return err
}

logger.Info("finished removing upload items")
return nil
}
96 changes: 58 additions & 38 deletions pkg/storer/migration/step_05_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,59 +6,42 @@ package migration_test

import (
"context"
"github.com/ethersphere/bee/pkg/swarm"
"testing"

"github.com/ethersphere/bee/pkg/node"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
chunktest "github.com/ethersphere/bee/pkg/storage/testing"
"github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/upload"
localmigration "github.com/ethersphere/bee/pkg/storer/migration"
"github.com/ethersphere/bee/pkg/swarm"
kademlia "github.com/ethersphere/bee/pkg/topology/mock"
"github.com/ethersphere/bee/pkg/util/testutil"
)

func Test_Step_05(t *testing.T) {
t.Parallel()

st, closer := internal.NewInmemStorage()
t.Cleanup(func() {
err := closer()
if err != nil {
t.Errorf("closing storage: %v", err)
}
db, err := storer.New(context.Background(), "", &storer.Options{
Logger: testutil.NewLogger(t),
RadiusSetter: kademlia.NewTopologyDriver(),
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
})

tag, err := upload.NextTag(st.IndexStore())
if err != nil {
t.Fatalf("create tag: %v", err)
}

putter, err := upload.NewPutter(st, tag.TagID)
if err != nil {
t.Fatalf("create putter: %v", err)
}
ctx := context.Background()
chunks := chunktest.GenerateTestRandomChunks(10)
b, err := st.IndexStore().Batch(ctx)
if err != nil {
t.Fatalf("create batch: %v", err)
t.Fatalf("New(...): unexpected error: %v", err)
}

for _, ch := range chunks {
err := putter.Put(ctx, st, b, ch)
t.Cleanup(func() {
err := db.Close()
if err != nil {
t.Fatalf("put chunk: %v", err)
t.Fatalf("Close(): unexpected closing storer: %v", err)
}
}
err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t))
if err != nil {
t.Fatalf("close putter: %v", err)
}
err = b.Commit()
if err != nil {
t.Fatalf("commit batch: %v", err)
}
})

wantCount := func(t *testing.T, want int) {
wantCount := func(t *testing.T, st internal.Storage, want int) {
t.Helper()
count := 0
err = upload.IterateAll(st.IndexStore(), func(_ storage.Item) (bool, error) {
Expand All @@ -73,10 +56,47 @@ func Test_Step_05(t *testing.T) {
}
}

wantCount(t, 10)
err = localmigration.Step_05(st.IndexStore())
err = db.Execute(context.Background(), func(st internal.Storage) error {
tag, err := upload.NextTag(st.IndexStore())
if err != nil {
t.Fatalf("create tag: %v", err)
}

putter, err := upload.NewPutter(st, tag.TagID)
if err != nil {
t.Fatalf("create putter: %v", err)
}
ctx := context.Background()
chunks := chunktest.GenerateTestRandomChunks(10)
b, err := st.IndexStore().Batch(ctx)
if err != nil {
t.Fatalf("create batch: %v", err)
}

for _, ch := range chunks {
err := putter.Put(ctx, st, b, ch)
if err != nil {
t.Fatalf("put chunk: %v", err)
}
}
err = putter.Close(st, st.IndexStore(), swarm.RandAddress(t))
if err != nil {
t.Fatalf("close putter: %v", err)
}
err = b.Commit()
if err != nil {
t.Fatalf("commit batch: %v", err)
}

wantCount(t, st, 10)
err = localmigration.Step_05(st.IndexStore())
if err != nil {
t.Fatalf("step 05: %v", err)
}
wantCount(t, st, 0)
return nil
})
if err != nil {
t.Fatalf("step 05: %v", err)
t.Fatalf("execute: %v", err)
}
wantCount(t, 0)
}

0 comments on commit d2b6c8c

Please sign in to comment.