diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index 496239d98d7..3e6e910105f 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -39,6 +39,7 @@ func (c *command) initDBCmd() {
 	dbNukeCmd(cmd)
 	dbInfoCmd(cmd)
 	dbCompactCmd(cmd)
+	dbValidateCmd(cmd)
 
 	c.root.AddCommand(cmd)
 }
@@ -165,6 +166,54 @@ func dbCompactCmd(cmd *cobra.Command) {
 	cmd.AddCommand(c)
 }
 
+func dbValidateCmd(cmd *cobra.Command) {
+	c := &cobra.Command{
+		Use:   "validate",
+		Short: "Validates the localstore sharky store.",
+		RunE: func(cmd *cobra.Command, args []string) (err error) {
+			v, err := cmd.Flags().GetString(optionNameVerbosity)
+			if err != nil {
+				return fmt.Errorf("get verbosity: %w", err)
+			}
+			v = strings.ToLower(v)
+			logger, err := newLogger(cmd, v)
+			if err != nil {
+				return fmt.Errorf("new logger: %w", err)
+			}
+
+			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
+			if err != nil {
+				return fmt.Errorf("get data-dir: %w", err)
+			}
+			if dataDir == "" {
+				return errors.New("no data-dir provided")
+			}
+
+			logger.Warning("Validation ensures that sharky returns a chunk that hashes to the expected reference.")
+			logger.Warning(" Invalid chunks logged at Warning level.")
+			logger.Warning(" Progress logged at Info level.")
+			logger.Warning(" SOC chunks logged at Debug level.")
+
+			localstorePath := path.Join(dataDir, "localstore")
+
+			err = storer.Validate(context.Background(), localstorePath, &storer.Options{
+				Logger:          logger,
+				RadiusSetter:    noopRadiusSetter{},
+				Batchstore:      new(postage.NoOpBatchStore),
+				ReserveCapacity: node.ReserveCapacity,
+			})
+			if err != nil {
+				return fmt.Errorf("localstore: %w", err)
+			}
+
+			return nil
+		},
+	}
+	c.Flags().String(optionNameDataDir, "", "data directory")
+	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
+	cmd.AddCommand(c)
+}
+
 func dbExportCmd(cmd *cobra.Command) {
 	c := &cobra.Command{
 		Use:   "export",
diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go
index 642285d0239..5e23c73b8eb 100644
--- a/pkg/storer/compact.go
+++ b/pkg/storer/compact.go
@@ -10,14 +10,9 @@ import (
 	"fmt"
 	"path"
 	"sort"
-	"sync"
 	"time"
 
-	"github.com/ethersphere/bee/pkg/cac"
-	"github.com/ethersphere/bee/pkg/log"
 	"github.com/ethersphere/bee/pkg/sharky"
-	"github.com/ethersphere/bee/pkg/soc"
-	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
 	"github.com/ethersphere/bee/pkg/swarm"
 )
@@ -50,7 +45,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
 
 	if validate {
 		logger.Info("performing chunk validation before compaction")
-		validationWork(logger, store, sharkyRecover)
+		validateWork(logger, store, sharkyRecover.Read)
 	}
 
 	logger.Info("starting compaction")
@@ -148,81 +143,8 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)
 
 	if validate {
 		logger.Info("performing chunk validation after compaction")
-		validationWork(logger, store, sharkyRecover)
+		validateWork(logger, store, sharkyRecover.Read)
 	}
 
 	return nil
 }
-
-func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {
-
-	n := time.Now()
-	defer func() {
-		logger.Info("validation finished", "duration", time.Since(n))
-	}()
-
-	iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)
-
-	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
-		err := sharky.Read(context.Background(), item.Location, buf)
-		if err != nil {
-			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
-			return
-		}
-
-		ch := swarm.NewChunk(item.Address, buf)
-		if !cac.Valid(ch) && !soc.Valid(ch) {
-
-			logger.Info("invalid cac/soc chunk ", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
-
-			h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
-			if err != nil {
-				logger.Error(err, "cac hash")
-				return
-			}
-
-			computedAddr := swarm.NewAddress(h)
-
-			if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
-				logger.Info("computed chunk is also an invalid cac")
-				return
-			}
-
-			shardedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
-			err = store.Get(&shardedEntry)
-			if err != nil {
-				logger.Info("no shared entry found")
-				return
-			}
-
-			logger.Info("retrieved chunk with shared slot", "shared_address", shardedEntry.Address, "shared_timestamp", time.Unix(int64(shardedEntry.Timestamp), 0))
-		}
-	}
-
-	var wg sync.WaitGroup
-
-	for i := 0; i < 8; i++ {
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			buf := make([]byte, swarm.SocMaxChunkSize)
-			for item := range iteratateItemsC {
-				validChunk(item, buf[:item.Location.Length])
-			}
-		}()
-	}
-
-	count := 0
-	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
-		iteratateItemsC <- item
-		count++
-		if count%100_000 == 0 {
-			logger.Info("..still validating chunks", "count", count)
-		}
-		return nil
-	})
-
-	close(iteratateItemsC)
-
-	wg.Wait()
-}
diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go
new file mode 100644
index 00000000000..5eb5cfb5e45
--- /dev/null
+++ b/pkg/storer/validate.go
@@ -0,0 +1,148 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/cac"
+	"github.com/ethersphere/bee/pkg/log"
+	"github.com/ethersphere/bee/pkg/sharky"
+	"github.com/ethersphere/bee/pkg/soc"
+	"github.com/ethersphere/bee/pkg/storage"
+	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// Validate ensures that all retrievalIndex chunks are correctly stored in sharky.
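+// It iterates the retrieval index, reads each referenced chunk from sharky, and
+// re-validates it as a CAC or SOC chunk, logging every mismatch it finds.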
+func Validate(ctx context.Context, basePath string, opts *Options) error {
+
+	logger := opts.Logger
+
+	store, err := initStore(basePath, opts)
+	if err != nil {
+		return fmt.Errorf("failed creating levelDB index store: %w", err)
+	}
+	defer func() {
+		if err := store.Close(); err != nil {
+			logger.Error(err, "failed closing store")
+		}
+	}()
+
+	sharky, err := sharky.New(&dirFS{basedir: path.Join(basePath, sharkyPath)},
+		sharkyNoOfShards, swarm.SocMaxChunkSize)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := sharky.Close(); err != nil {
+			logger.Error(err, "failed closing sharky")
+		}
+	}()
+
+	logger.Info("performing chunk validation")
+	validateWork(logger, store, sharky.Read)
+
+	return nil
+}
+
+func validateWork(logger log.Logger, store storage.Store, readFn func(context.Context, sharky.Location, []byte) error) {
+
+	total := 0
+	// The SOC and invalid counters are written by the worker goroutines, so they are atomic.
+	var socCount, invalidCount atomic.Int64
+
+	n := time.Now()
+	defer func() {
+		logger.Info("validation finished", "duration", time.Since(n), "invalid", invalidCount.Load(), "soc", socCount.Load(), "total", total)
+	}()
+
+	iterateItemsC := make(chan *chunkstore.RetrievalIndexItem)
+
+	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
+		err := readFn(context.Background(), item.Location, buf)
+		if err != nil {
+			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
+			return
+		}
+
+		ch := swarm.NewChunk(item.Address, buf)
+		if !cac.Valid(ch) {
+			if soc.Valid(ch) {
+				socCount.Add(1)
+				logger.Debug("found soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
+			} else {
+				invalidCount.Add(1)
+				logger.Warning("invalid cac/soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
+
+				h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
+				if err != nil {
+					logger.Error(err, "cac hash")
+					return
+				}
+
+				computedAddr := swarm.NewAddress(h)
+
+				if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
+					logger.Warning("computed chunk is also an invalid cac")
+					return
+				}
+
+				sharedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
+				err = store.Get(&sharedEntry)
+				if err != nil {
+					logger.Warning("no shared entry found")
+					return
+				}
+
+				logger.Warning("retrieved chunk with shared slot", "shared_address", sharedEntry.Address, "shared_timestamp", time.Unix(int64(sharedEntry.Timestamp), 0))
+			}
+		}
+	}
+
+	s := time.Now()
+
+	// First pass: count the items so that progress can be reported as a percentage.
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		total++
+		return nil
+	})
+	logger.Info("validation count finished", "duration", time.Since(s), "total", total)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 8; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buf := make([]byte, swarm.SocMaxChunkSize)
+			for item := range iterateItemsC {
+				validChunk(item, buf[:item.Location.Length])
+			}
+		}()
+	}
+
+	// Second pass: feed the items to the validation workers.
+	count := 0
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		iterateItemsC <- item
+		count++
+		if count%100_000 == 0 {
+			logger.Info("..still validating chunks", "count", count, "invalid", invalidCount.Load(), "soc", socCount.Load(), "total", total, "percent", fmt.Sprintf("%.2f", (float64(count)*100.0)/float64(total)))
+		}
+		return nil
+	})
+
+	close(iterateItemsC)
+
+	wg.Wait()
+}
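
For reference, the new command would be invoked along these lines (the data
directory shown is illustrative, not a default):

    bee db validate --data-dir /var/lib/bee --verbosity info

Both flags are registered by dbValidateCmd itself; --verbosity defaults to
"info". Because the command opens the localstore LevelDB index directly, it
should be run against the data directory of a stopped node.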