diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index 124a16898a3..6292462f438 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -101,6 +101,9 @@ func dbInfoCmd(cmd *cobra.Command) {
 }
 
 func dbCompactCmd(cmd *cobra.Command) {
+
+	optionNameValidation := "validate"
+
 	c := &cobra.Command{
 		Use:   "compact",
 		Short: "Compacts the localstore sharky store.",
@@ -123,6 +126,11 @@ func dbCompactCmd(cmd *cobra.Command) {
 				return errors.New("no data-dir provided")
 			}
 
+			validation, err := cmd.Flags().GetBool(optionNameValidation)
+			if err != nil {
+				return fmt.Errorf("get validation: %w", err)
+			}
+
 			localstorePath := path.Join(dataDir, "localstore")
 
 			err = storer.Compact(context.Background(), localstorePath, &storer.Options{
@@ -130,7 +138,7 @@ func dbCompactCmd(cmd *cobra.Command) {
 				RadiusSetter:    noopRadiusSetter{},
 				Batchstore:      new(postage.NoOpBatchStore),
 				ReserveCapacity: node.ReserveCapacity,
-			})
+			}, validation)
 			if err != nil {
 				return fmt.Errorf("localstore: %w", err)
 			}
@@ -140,6 +148,7 @@ func dbCompactCmd(cmd *cobra.Command) {
 	}
 	c.Flags().String(optionNameDataDir, "", "data directory")
 	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
+	c.Flags().Bool(optionNameValidation, true, "enable chunk validation")
 	cmd.AddCommand(c)
 }
 
diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go
index 9416454b7ca..f5d0db3e147 100644
--- a/pkg/storer/compact.go
+++ b/pkg/storer/compact.go
@@ -6,26 +6,31 @@ package storer
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"path"
 	"sort"
 	"time"
 
+	"github.com/ethersphere/bee/pkg/cac"
+	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/sharky"
+	"github.com/ethersphere/bee/pkg/soc"
+	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
 	"github.com/ethersphere/bee/pkg/swarm"
+	"golang.org/x/sync/errgroup"
 )
 
 // Compact minimizes sharky disk usage by, using the current sharky locations from the storer,
 // relocating chunks starting from the end of the used slots to the first available slots.
+// When validate is true, all chunks are additionally read back and checked before and
+// after the relocation; chunks that fail the check are only logged.
-func Compact(ctx context.Context, basePath string, opts *Options) error {
+func Compact(ctx context.Context, basePath string, opts *Options, validate bool) error {
 
 	logger := opts.Logger
 
 	n := time.Now()
-	defer func() {
-		logger.Info("compaction finished", "duration", time.Since(n))
-	}()
 
 	logger.Info("starting compaction")
 
@@ -49,6 +54,11 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
 		}
 	}()
 
+	if validate {
+		logger.Info("performing chunk validation before compaction")
+		validationWork(ctx, logger, store, sharkyRecover)
+	}
+
 	iteratateItemsC := make(chan chunkstore.IterateResult)
 	chunkstore.Iterate(ctx, store, iteratateItemsC)
 
@@ -62,7 +72,7 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
 		if c.Err != nil {
-			return fmt.Errorf("location read: %w", err)
+			return fmt.Errorf("location read: %w", c.Err)
 		}
-		count++
+
 		shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item)
 	}
 
@@ -124,5 +134,66 @@ func Compact(ctx context.Context, basePath string, opts *Options) error {
 		}
 	}
 
+	logger.Info("compaction finished", "duration", time.Since(n))
+
+	if validate {
+		logger.Info("performing chunk validation after compaction")
+		validationWork(ctx, logger, store, sharkyRecover)
+	}
+
 	return sharkyRecover.Save()
 }
+
+// validationWork reads every item produced by chunkstore.Iterate back through
+// recovery and checks that the resulting chunk passes cac.Valid or soc.Valid.
+// Failures are only logged; nothing is returned to the caller, so an invalid
+// chunk does not abort the compaction.
+func validationWork(ctx context.Context, logger log.Logger, store storage.Store, recovery *sharky.Recovery) {
+	const workers = 4
+
+	itemsC := make(chan chunkstore.IterateResult)
+	chunkstore.Iterate(ctx, store, itemsC)
+
+	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
+		// A corrupted index entry could claim more bytes than buf holds; reject it
+		// here instead of panicking on the slice expression below.
+		if int(item.Location.Length) > len(buf) {
+			return errors.New("chunk length exceeds buffer size")
+		}
+
+		if err := recovery.Read(ctx, item.Location, buf); err != nil {
+			return err
+		}
+
+		ch := swarm.NewChunk(item.Address, buf[:item.Location.Length])
+		if !cac.Valid(ch) && !soc.Valid(ch) {
+			return errors.New("invalid chunk")
+		}
+
+		return nil
+	}
+
+	// The workers never return an error, so a zero Group is enough: it is only
+	// used to wait until the iteration channel has been drained.
+	var eg errgroup.Group
+	for i := 0; i < workers; i++ {
+		eg.Go(func() error {
+			// Each worker reuses a single read buffer.
+			buf := make([]byte, swarm.SocMaxChunkSize)
+			for item := range itemsC {
+				// Item is presumably unset when the iterator reports an error, so
+				// log the error instead of dereferencing it.
+				if item.Err != nil {
+					logger.Info("chunk iteration failed", "error", item.Err)
+					continue
+				}
+				if err := validChunk(item.Item, buf); err != nil {
+					logger.Info("invalid chunk", "address", item.Item.Address, "error", err)
+				}
+			}
+			return nil
+		})
+	}
+
+	_ = eg.Wait()
+}
diff --git a/pkg/storer/compact_test.go b/pkg/storer/compact_test.go
index dde002975c2..64b29ea2dcd 100644
--- a/pkg/storer/compact_test.go
+++ b/pkg/storer/compact_test.go
@@ -73,7 +73,7 @@ func TestCompact(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	err = storer.Compact(ctx, basePath, opts)
+	err = storer.Compact(ctx, basePath, opts, true)
 	if err != nil {
 		t.Fatal(err)
 	}