Skip to content

Commit

Permalink
fix: validation
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Sep 21, 2023
1 parent 2ef1a71 commit b9f308f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 7 deletions.
11 changes: 10 additions & 1 deletion cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func dbInfoCmd(cmd *cobra.Command) {
}

func dbCompactCmd(cmd *cobra.Command) {

optionNameValidation := "validate"

c := &cobra.Command{
Use: "compact",
Short: "Compacts the localstore sharky store.",
Expand All @@ -123,14 +126,19 @@ func dbCompactCmd(cmd *cobra.Command) {
return errors.New("no data-dir provided")
}

validation, err := cmd.Flags().GetBool(optionNameValidation)
if err != nil {
return fmt.Errorf("get validation: %w", err)
}

localstorePath := path.Join(dataDir, "localstore")

err = storer.Compact(context.Background(), localstorePath, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
})
}, validation)
if err != nil {
return fmt.Errorf("localstore: %w", err)
}
Expand All @@ -140,6 +148,7 @@ func dbCompactCmd(cmd *cobra.Command) {
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().Bool(optionNameValidation, true, "enable chunk validation")
cmd.AddCommand(c)
}

Expand Down
61 changes: 56 additions & 5 deletions pkg/storer/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ package storer

import (
"context"
"errors"
"fmt"
"path"
"sort"
"time"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/sync/errgroup"
)

// Compact minimizes sharky disk usage by, using the current sharky locations from the storer,
// relocating chunks starting from the end of the used slots to the first available slots.
func Compact(ctx context.Context, basePath string, opts *Options) error {
func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error {

logger := opts.Logger

n := time.Now()
defer func() {
logger.Info("compaction finished", "duration", time.Since(n))
}()

logger.Info("starting compaction")

Expand All @@ -49,6 +52,11 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
}
}()

if validate {
logger.Info("performing chunk validation before compaction")
validationWork(ctx, logger, store, sharkyRecover)
}

iteratateItemsC := make(chan chunkstore.IterateResult)
chunkstore.Iterate(ctx, store, iteratateItemsC)

Expand All @@ -62,7 +70,7 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
if c.Err != nil {
return fmt.Errorf("location read: %w", err)
}
count++

shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item)
}

Expand Down Expand Up @@ -124,5 +132,48 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
}
}

logger.Info("compaction finished", "duration", time.Since(n))

if validate {
logger.Info("performing chunk validation after compaction")
validationWork(ctx, logger, store, sharkyRecover)
}

return sharkyRecover.Save()
}

func validationWork(ctx context.Context, logger log.Logger, store storage.Store, sharky *sharky.Recovery) {

iteratateItemsC := make(chan chunkstore.IterateResult)
chunkstore.Iterate(ctx, store, iteratateItemsC)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
err := sharky.Read(ctx, item.Location, buf)
if err != nil {
return err
}

ch := swarm.NewChunk(item.Address, buf[:item.Location.Length])
if !cac.Valid(ch) && !soc.Valid(ch) {
return errors.New("invalid chunk")
}

return nil
}

eg, ctx := errgroup.WithContext(ctx)

for i := 0; i < 4; i++ {
eg.Go(func() error {
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if err := validChunk(item.Item, buf); err != nil {
logger.Info("invalid chunk", "address", item.Item.Address, "error", err)
}
}
return nil
})
}

_ = eg.Wait()
}
2 changes: 1 addition & 1 deletion pkg/storer/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestCompact(t *testing.T) {
t.Fatal(err)
}

err = storer.Compact(ctx, basePath, opts)
err = storer.Compact(ctx, basePath, opts, true)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit b9f308f

Please sign in to comment.