diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go index e2887090664..ec94ff7f6d8 100644 --- a/cmd/bee/cmd/db.go +++ b/cmd/bee/cmd/db.go @@ -26,7 +26,12 @@ import ( "github.com/spf13/cobra" ) -const optionNameValidation = "validate" +const ( + optionNameValidation = "validate" + optionNameValidationPin = "validate-pin" + optionNameCollectionPin = "pin" + optionNameOutputLocation = "output" +) func (c *command) initDBCmd() { cmd := &cobra.Command{ @@ -40,6 +45,7 @@ func (c *command) initDBCmd() { dbInfoCmd(cmd) dbCompactCmd(cmd) dbValidateCmd(cmd) + dbValidatePinsCmd(cmd) c.root.AddCommand(cmd) } @@ -166,6 +172,61 @@ func dbCompactCmd(cmd *cobra.Command) { cmd.AddCommand(c) } +func dbValidatePinsCmd(cmd *cobra.Command) { + c := &cobra.Command{ + Use: "validate-pin", + Short: "Validates pin collection chunks with sharky store.", + RunE: func(cmd *cobra.Command, args []string) (err error) { + v, err := cmd.Flags().GetString(optionNameVerbosity) + if err != nil { + return fmt.Errorf("get verbosity: %w", err) + } + v = strings.ToLower(v) + logger, err := newLogger(cmd, v) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + dataDir, err := cmd.Flags().GetString(optionNameDataDir) + if err != nil { + return fmt.Errorf("get data-dir: %w", err) + } + if dataDir == "" { + return errors.New("no data-dir provided") + } + + providedPin, err := cmd.Flags().GetString(optionNameCollectionPin) + if err != nil { + return fmt.Errorf("read pin option: %w", err) + } + + outputLoc, err := cmd.Flags().GetString(optionNameOutputLocation) + if err != nil { + return fmt.Errorf("read location option: %w", err) + } + + localstorePath := path.Join(dataDir, "localstore") + + err = storer.ValidatePinCollectionChunks(context.Background(), localstorePath, providedPin, outputLoc, &storer.Options{ + Logger: logger, + RadiusSetter: noopRadiusSetter{}, + Batchstore: new(postage.NoOpBatchStore), + ReserveCapacity: node.ReserveCapacity, + }) + if err != nil { + return 
fmt.Errorf("localstore: %w", err) + } + + return nil + }, + } + c.Flags().String(optionNameDataDir, "", "data directory") + c.Flags().String(optionNameVerbosity, "info", "verbosity level") + c.Flags().String(optionNameCollectionPin, "", "only validate given pin") + c.Flags().String(optionNameOutputLocation, "", "location and name of the output file") + cmd.AddCommand(c) +} + func dbValidateCmd(cmd *cobra.Command) { c := &cobra.Command{ Use: "validate", diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go index 5eb5cfb5e45..ccad569455d 100644 --- a/pkg/storer/validate.go +++ b/pkg/storer/validate.go @@ -7,16 +7,20 @@ package storer import ( "context" "fmt" + "os" "path" "sync" "time" + "sync/atomic" + "github.com/ethersphere/bee/pkg/cac" "github.com/ethersphere/bee/pkg/log" "github.com/ethersphere/bee/pkg/sharky" "github.com/ethersphere/bee/pkg/soc" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/storer/internal/chunkstore" + pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning" "github.com/ethersphere/bee/pkg/swarm" ) @@ -141,3 +145,163 @@ func validateWork(logger log.Logger, store storage.Store, readFn func(context.Co wg.Wait() } + +// ValidatePinCollectionChunks collects all chunk addresses that are present in a pin collection but +// are either invalid or missing altogether. 
+func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location string, opts *Options) error { + logger := opts.Logger + + store, err := initStore(basePath, opts) + if err != nil { + return fmt.Errorf("failed creating levelDB index store: %w", err) + } + defer func() { + if err := store.Close(); err != nil { + logger.Error(err, "failed closing store") + } + }() + + fs := &dirFS{basedir: path.Join(basePath, sharkyPath)} + sharky, err := sharky.New(fs, sharkyNoOfShards, swarm.SocMaxChunkSize) + if err != nil { + return err + } + defer func() { + if err := sharky.Close(); err != nil { + logger.Error(err, "failed closing sharky") + } + }() + + logger.Info("performing chunk validation") + validatePins(logger, store, pin, location, sharky.Read) + + return nil +} + +func validatePins(logger log.Logger, store storage.Store, pin, location string, readFn func(context.Context, sharky.Location, []byte) error) { + var stats struct { + total, read, invalid atomic.Int32 + } + + n := time.Now() + defer func() { + logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load()) + }() + + validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool { + stats.total.Add(1) + + if err := readFn(context.Background(), item.Location, buf); err != nil { + stats.read.Add(1) + return true + } + + ch := swarm.NewChunk(item.Address, buf) + + if cac.Valid(ch) { + return true + } + + if soc.Valid(ch) { + return true + } + + stats.invalid.Add(1) + + return false + } + + var pins []swarm.Address + + if pin != "" { + addr, err := swarm.ParseHexAddress(pin) + if err != nil { + panic(fmt.Sprintf("parse provided pin: %s", err)) + } + pins = append(pins, addr) + } else { + var err error + pins, err = pinstore.Pins(store) + if err != nil { + logger.Error(err, "get pins") + return + } + } + + logger.Info("got a total number of pins", "size", len(pins)) + + var ( + fileName = "address.csv" + fileLoc = "." 
+ ) + + if location != "" { + if path.Ext(location) != "" { + fileName = path.Base(location) + } + fileLoc = path.Dir(location) + } + + logger.Info("saving stats to", "location", fileLoc, "name", fileName) + + location = path.Join(fileLoc, fileName) + + f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + logger.Error(err, "open output file for writing") + return + } + + if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil { + logger.Error(err, "write title") + return + } + + defer f.Close() + + for _, pin := range pins { + var wg sync.WaitGroup + var ( + total, missing, invalid atomic.Int32 + ) + + iterateItemsC := make(chan *chunkstore.RetrievalIndexItem) + + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + buf := make([]byte, swarm.SocMaxChunkSize) + for item := range iterateItemsC { + if !validChunk(item, buf[:item.Location.Length]) { + invalid.Add(1) + } + } + }() + } + + logger.Info("start iteration", "pin", pin) + + _ = pinstore.IterateCollection(store, pin, func(addr swarm.Address) (bool, error) { + total.Add(1) + rIdx := &chunkstore.RetrievalIndexItem{Address: addr} + if err := store.Get(rIdx); err != nil { + missing.Add(1) + } else { + iterateItemsC <- rIdx + } + return false, nil + }) + + close(iterateItemsC) + + wg.Wait() + + report := fmt.Sprintf("%d\t%d\t%d\t%s\n", invalid.Load(), missing.Load(), total.Load(), pin) + + if _, err := f.WriteString(report); err != nil { + logger.Error(err, "write report line") + return + } + } +}