Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pinned content integrity cmd #4565

Merged
merged 3 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import (
"github.com/spf13/cobra"
)

const optionNameValidation = "validate"
const (
optionNameValidation = "validate"
optionNameValidationPin = "validate-pin"
optionNameCollectionPin = "pin"
optionNameOutputLocation = "output"
)

func (c *command) initDBCmd() {
cmd := &cobra.Command{
Expand All @@ -40,6 +45,7 @@ func (c *command) initDBCmd() {
dbInfoCmd(cmd)
dbCompactCmd(cmd)
dbValidateCmd(cmd)
dbValidatePinsCmd(cmd)

c.root.AddCommand(cmd)
}
Expand Down Expand Up @@ -166,6 +172,61 @@ func dbCompactCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

func dbValidatePinsCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "validate-pin",
Short: "Validates pin collection chunks with sharky store.",
RunE: func(cmd *cobra.Command, args []string) (err error) {
v, err := cmd.Flags().GetString(optionNameVerbosity)
if err != nil {
return fmt.Errorf("get verbosity: %w", err)
}
v = strings.ToLower(v)
logger, err := newLogger(cmd, v)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

dataDir, err := cmd.Flags().GetString(optionNameDataDir)
if err != nil {
return fmt.Errorf("get data-dir: %w", err)
}
if dataDir == "" {
return errors.New("no data-dir provided")
}

providedPin, err := cmd.Flags().GetString(optionNameCollectionPin)
if err != nil {
return fmt.Errorf("read pin option: %w", err)
}

outputLoc, err := cmd.Flags().GetString(optionNameOutputLocation)
if err != nil {
return fmt.Errorf("read location option: %w", err)
}

localstorePath := path.Join(dataDir, "localstore")

err = storer.ValidatePinCollectionChunks(context.Background(), localstorePath, providedPin, outputLoc, &storer.Options{
Logger: logger,
RadiusSetter: noopRadiusSetter{},
Batchstore: new(postage.NoOpBatchStore),
ReserveCapacity: node.ReserveCapacity,
})
if err != nil {
return fmt.Errorf("localstore: %w", err)
}

return nil
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "info", "verbosity level")
c.Flags().String(optionNameCollectionPin, "", "only validate given pin")
c.Flags().String(optionNameOutputLocation, "", "location and name of the output file")
cmd.AddCommand(c)
}

func dbValidateCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "validate",
Expand Down
164 changes: 164 additions & 0 deletions pkg/storer/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ package storer
import (
"context"
"fmt"
"os"
"path"
"sync"
"time"

"sync/atomic"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
"github.com/ethersphere/bee/pkg/swarm"
)

Expand Down Expand Up @@ -141,3 +145,163 @@ func validateWork(logger log.Logger, store storage.Store, readFn func(context.Co

wg.Wait()
}

// ValidatePinCollectionChunks collects all chunk addresses that are present in a pin collection but
// are either invalid or missing altogether.
func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location string, opts *Options) error {
logger := opts.Logger

store, err := initStore(basePath, opts)
if err != nil {
return fmt.Errorf("failed creating levelDB index store: %w", err)
}
defer func() {
if err := store.Close(); err != nil {
logger.Error(err, "failed closing store")
}
}()

fs := &dirFS{basedir: path.Join(basePath, sharkyPath)}
sharky, err := sharky.New(fs, sharkyNoOfShards, swarm.SocMaxChunkSize)
if err != nil {
return err
}
defer func() {
if err := sharky.Close(); err != nil {
logger.Error(err, "failed closing sharky")
}
}()

logger.Info("performing chunk validation")
validatePins(logger, store, pin, location, sharky.Read)

return nil
}

func validatePins(logger log.Logger, store storage.Store, pin, location string, readFn func(context.Context, sharky.Location, []byte) error) {
var stats struct {
total, read, invalid atomic.Int32
}

n := time.Now()
defer func() {
logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load())
}()

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool {
stats.total.Add(1)

if err := readFn(context.Background(), item.Location, buf); err != nil {
stats.read.Add(1)
return true
}

ch := swarm.NewChunk(item.Address, buf)

if cac.Valid(ch) {
return true
}

if soc.Valid(ch) {
return true
}

stats.invalid.Add(1)

return false
}

var pins []swarm.Address

if pin != "" {
addr, err := swarm.ParseHexAddress(pin)
if err != nil {
panic(fmt.Sprintf("parse provided pin: %s", err))
}
pins = append(pins, addr)
} else {
var err error
pins, err = pinstore.Pins(store)
if err != nil {
logger.Error(err, "get pins")
return
}
}

logger.Info("got a total number of pins", "size", len(pins))

var (
fileName = "address.csv"
fileLoc = "."
)

if location != "" {
if path.Ext(location) != "" {
fileName = path.Base(location)
}
fileLoc = path.Dir(location)
}

logger.Info("saving stats to", "location", fileLoc, "name", fileName)

location = path.Join(fileLoc, fileName)

f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logger.Error(err, "open output file for writing")
return
}

if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil {
logger.Error(err, "write title")
return
}

defer f.Close()

for _, pin := range pins {
var wg sync.WaitGroup
var (
total, missing, invalid atomic.Int32
)

iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
defer wg.Done()
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if !validChunk(item, buf[:item.Location.Length]) {
invalid.Add(1)
}
}
}()
}

logger.Info("start iteration", "pin", pin)

_ = pinstore.IterateCollection(store, pin, func(addr swarm.Address) (bool, error) {
total.Add(1)
rIdx := &chunkstore.RetrievalIndexItem{Address: addr}
if err := store.Get(rIdx); err != nil {
missing.Add(1)
} else {
iteratateItemsC <- rIdx
}
return false, nil
})

close(iteratateItemsC)

wg.Wait()

report := fmt.Sprintf("%d\t%d\t%d\t%s\n", invalid.Load(), missing.Load(), total.Load(), pin)

if _, err := f.WriteString(report); err != nil {
logger.Error(err, "write report line")
return
}
}
}
Loading