Skip to content

Commit

Permalink
feat: support db validate command (#4435)
Browse files Browse the repository at this point in the history
  • Loading branch information
ldeffenb authored Oct 30, 2023
1 parent b0836ff commit f780811
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 80 deletions.
49 changes: 49 additions & 0 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func (c *command) initDBCmd() {
dbNukeCmd(cmd)
dbInfoCmd(cmd)
dbCompactCmd(cmd)
dbValidateCmd(cmd)

c.root.AddCommand(cmd)
}
Expand Down Expand Up @@ -165,6 +166,54 @@ func dbCompactCmd(cmd *cobra.Command) {
cmd.AddCommand(c)
}

// dbValidateCmd registers the "validate" subcommand on cmd. The subcommand
// checks that every chunk held in the localstore's sharky store reads back
// and hashes to its expected reference.
func dbValidateCmd(cmd *cobra.Command) {
	c := &cobra.Command{
		Use:   "validate",
		Short: "Validates the localstore sharky store.",
		RunE: func(cmd *cobra.Command, args []string) (err error) {
			verbosity, err := cmd.Flags().GetString(optionNameVerbosity)
			if err != nil {
				return fmt.Errorf("get verbosity: %w", err)
			}

			logger, err := newLogger(cmd, strings.ToLower(verbosity))
			if err != nil {
				return fmt.Errorf("new logger: %w", err)
			}

			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
			if err != nil {
				return fmt.Errorf("get data-dir: %w", err)
			}
			if dataDir == "" {
				return errors.New("no data-dir provided")
			}

			// Tell the operator up front at which log levels the various
			// findings will surface.
			for _, msg := range []string{
				"Validation ensures that sharky returns a chunk that hashes to the expected reference.",
				" Invalid chunks logged at Warning level.",
				" Progress logged at Info level.",
				" SOC chunks logged at Debug level.",
			} {
				logger.Warning(msg)
			}

			opts := &storer.Options{
				Logger:          logger,
				RadiusSetter:    noopRadiusSetter{},
				Batchstore:      new(postage.NoOpBatchStore),
				ReserveCapacity: node.ReserveCapacity,
			}
			if err := storer.Validate(context.Background(), path.Join(dataDir, "localstore"), opts); err != nil {
				return fmt.Errorf("localstore: %w", err)
			}

			return nil
		},
	}
	c.Flags().String(optionNameDataDir, "", "data directory")
	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
	cmd.AddCommand(c)
}

func dbExportCmd(cmd *cobra.Command) {
c := &cobra.Command{
Use: "export",
Expand Down
82 changes: 2 additions & 80 deletions pkg/storer/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,9 @@ import (
"fmt"
"path"
"sort"
"sync"
"time"

"github.com/ethersphere/bee/pkg/cac"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/sharky"
"github.com/ethersphere/bee/pkg/soc"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
"github.com/ethersphere/bee/pkg/swarm"
)
Expand Down Expand Up @@ -50,7 +45,7 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

if validate {
logger.Info("performing chunk validation before compaction")
validationWork(logger, store, sharkyRecover)
validateWork(logger, store, sharkyRecover.Read)
}

logger.Info("starting compaction")
Expand Down Expand Up @@ -148,81 +143,8 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

if validate {
logger.Info("performing chunk validation after compaction")
validationWork(logger, store, sharkyRecover)
validateWork(logger, store, sharkyRecover.Read)
}

return nil
}

// validationWork verifies, with a fixed pool of worker goroutines, that every
// chunk referenced by the retrieval index can be read back from sharky and
// validates as a CAC or SOC chunk. Findings are logged; nothing is mutated.
func validationWork(logger log.Logger, store storage.Store, sharky *sharky.Recovery) {

	start := time.Now()
	defer func() {
		logger.Info("validation finished", "duration", time.Since(start))
	}()

	itemsC := make(chan *chunkstore.RetrievalIndexItem)

	// checkItem reads the chunk at item.Location into buf and validates it,
	// logging anything suspicious.
	checkItem := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
		if err := sharky.Read(context.Background(), item.Location, buf); err != nil {
			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
			return
		}

		ch := swarm.NewChunk(item.Address, buf)
		if cac.Valid(ch) || soc.Valid(ch) {
			return
		}

		logger.Info("invalid cac/soc chunk ", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))

		// Re-hash the payload: the data may be intact but indexed under the
		// wrong address (e.g. a shared sharky slot).
		h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
		if err != nil {
			logger.Error(err, "cac hash")
			return
		}

		computedAddr := swarm.NewAddress(h)

		if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
			logger.Info("computed chunk is also an invalid cac")
			return
		}

		entry := chunkstore.RetrievalIndexItem{Address: computedAddr}
		if err := store.Get(&entry); err != nil {
			logger.Info("no shared entry found")
			return
		}

		logger.Info("retrieved chunk with shared slot", "shared_address", entry.Address, "shared_timestamp", time.Unix(int64(entry.Timestamp), 0))
	}

	var wg sync.WaitGroup

	const workers = 8
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// One scratch buffer per worker, sliced to each item's length.
			buf := make([]byte, swarm.SocMaxChunkSize)
			for item := range itemsC {
				checkItem(item, buf[:item.Location.Length])
			}
		}()
	}

	count := 0
	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
		itemsC <- item
		count++
		if count%100_000 == 0 {
			logger.Info("..still validating chunks", "count", count)
		}
		return nil
	})

	close(itemsC)

	wg.Wait()
}
143 changes: 143 additions & 0 deletions pkg/storer/validate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright 2023 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package storer

import (
	"context"
	"fmt"
	"path"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethersphere/bee/pkg/cac"
	"github.com/ethersphere/bee/pkg/log"
	"github.com/ethersphere/bee/pkg/sharky"
	"github.com/ethersphere/bee/pkg/soc"
	"github.com/ethersphere/bee/pkg/storage"
	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
	"github.com/ethersphere/bee/pkg/swarm"
)

// Validate ensures that all retrievalIndex chunks are correctly stored in sharky.
// Validate ensures that all retrievalIndex chunks are correctly stored in sharky.
// It opens the levelDB index store and the sharky store under basePath, runs the
// validation pass, and closes both on return.
//
// NOTE(review): ctx is accepted but not plumbed into the validation work, so
// cancellation is not observed mid-run — TODO confirm whether that is intended.
func Validate(ctx context.Context, basePath string, opts *Options) error {

	logger := opts.Logger

	store, err := initStore(basePath, opts)
	if err != nil {
		return fmt.Errorf("failed creating levelDB index store: %w", err)
	}
	defer func() {
		if err := store.Close(); err != nil {
			logger.Error(err, "failed closing store")
		}
	}()

	// Named sharkyStore (not sharky) so the sharky package identifier is not
	// shadowed for the rest of the function.
	sharkyStore, err := sharky.New(&dirFS{basedir: path.Join(basePath, sharkyPath)},
		sharkyNoOfShards, swarm.SocMaxChunkSize)
	if err != nil {
		return err
	}
	defer func() {
		if err := sharkyStore.Close(); err != nil {
			logger.Error(err, "failed closing sharky")
		}
	}()

	logger.Info("performing chunk validation")
	validateWork(logger, store, sharkyStore.Read)

	return nil
}

// validateWork verifies, with a pool of 8 worker goroutines, that every chunk
// referenced by the retrieval index can be read back via readFn and validates
// as a CAC or SOC chunk. Findings are logged: invalid chunks at Warning, SOC
// chunks at Debug, progress at Info. Nothing is mutated.
//
// socCount and invalidCount are updated by the workers and read concurrently
// by the producer's progress log, so they are atomics (the plain-int version
// was a data race under -race).
func validateWork(logger log.Logger, store storage.Store, readFn func(context.Context, sharky.Location, []byte) error) {

	total := 0
	var socCount, invalidCount atomic.Int64

	n := time.Now()
	defer func() {
		logger.Info("validation finished", "duration", time.Since(n), "invalid", invalidCount.Load(), "soc", socCount.Load(), "total", total)
	}()

	iterateItemsC := make(chan *chunkstore.RetrievalIndexItem)

	// validChunk reads the chunk at item.Location into buf and validates it,
	// logging anything suspicious.
	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) {
		err := readFn(context.Background(), item.Location, buf)
		if err != nil {
			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
			return
		}

		ch := swarm.NewChunk(item.Address, buf)
		if !cac.Valid(ch) {
			if soc.Valid(ch) {
				socCount.Add(1)
				logger.Debug("found soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
			} else {
				invalidCount.Add(1)
				logger.Warning("invalid cac/soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))

				// Re-hash the payload: the data may be intact but indexed
				// under the wrong address (e.g. a shared sharky slot).
				h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
				if err != nil {
					logger.Error(err, "cac hash")
					return
				}

				computedAddr := swarm.NewAddress(h)

				if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
					// err is necessarily nil here, so it is not logged
					// (the previous version logged an always-nil "err").
					logger.Warning("computed chunk is also an invalid cac")
					return
				}

				sharedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
				err = store.Get(&sharedEntry)
				if err != nil {
					logger.Warning("no shared entry found")
					return
				}

				logger.Warning("retrieved chunk with shared slot", "shared_address", sharedEntry.Address, "shared_timestamp", time.Unix(int64(sharedEntry.Timestamp), 0))
			}
		}
	}

	// First pass: count items so progress can be reported as a percentage.
	s := time.Now()
	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
		total++
		return nil
	})
	logger.Info("validation count finished", "duration", time.Since(s), "total", total)

	var wg sync.WaitGroup

	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// One scratch buffer per worker, sliced to each item's length.
			buf := make([]byte, swarm.SocMaxChunkSize)
			for item := range iterateItemsC {
				validChunk(item, buf[:item.Location.Length])
			}
		}()
	}

	count := 0
	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
		iterateItemsC <- item
		count++
		if count%100_000 == 0 {
			logger.Info("..still validating chunks", "count", count, "invalid", invalidCount.Load(), "soc", socCount.Load(), "total", total, "percent", fmt.Sprintf("%.2f", (float64(count)*100.0)/float64(total)))
		}
		return nil
	})

	close(iterateItemsC)

	wg.Wait()
}

0 comments on commit f780811

Please sign in to comment.