Skip to content

Commit

Permalink
feat: integrity check api
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol committed Feb 9, 2024
1 parent 5b335ea commit b9b1aa3
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 56 deletions.
7 changes: 6 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ type Service struct {

batchStore postage.Storer
stamperStore storage.Store
syncStatus func() (bool, error)
pinIntegrity *storer.PinIntegrity

syncStatus func() (bool, error)

swap swap.Interface
transaction transaction.Service
Expand Down Expand Up @@ -242,6 +244,7 @@ type ExtraOptions struct {
Steward steward.Interface
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity *storer.PinIntegrity
}

func New(
Expand Down Expand Up @@ -348,6 +351,8 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace
return "", err
}
}

s.pinIntegrity = e.PinIntegrity
}

func (s *Service) SetProbe(probe *Probe) {
Expand Down
60 changes: 60 additions & 0 deletions pkg/api/integritycheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2024 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"encoding/json"
"net/http"

storer "github.com/ethersphere/bee/pkg/storer"
"github.com/ethersphere/bee/pkg/swarm"
)

// PinIntegrityResponse is a single entry of the streamed pin integrity
// report, carrying the per-pin chunk counts produced by
// storer.PinIntegrity.Check. One entry is encoded per checked pin.
type PinIntegrityResponse struct {
	Ref     swarm.Address `json:"ref"`     // root reference of the checked pin collection
	Total   int           `json:"total"`   // number of chunks inspected for this pin
	Missing int           `json:"missing"` // chunks whose retrieval index entry was not found
	Invalid int           `json:"invalid"` // chunks that did not pass validation
}

// pinIntegrityHandler runs an integrity check over pinned collections and
// streams the per-pin results to the client as newline-delimited JSON.
// An optional "ref" query parameter restricts the check to a single pin;
// when absent, Check receives the zero address string and inspects all pins.
func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
	logger := s.logger.WithName("get_pin_integrity").Build()

	query := struct {
		Ref swarm.Address `map:"ref"`
	}{}

	if response := s.mapStructure(r.URL.Query(), &query); response != nil {
		response("invalid query params", logger, w)
		return
	}

	// The response is streamed incrementally, so the writer must support
	// flushing. Verify this BEFORE starting the checker goroutine: if the
	// goroutine were started first and we returned here, nothing would ever
	// read from out and the goroutine would block forever on its first send.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.NotFound(w, r)
		return
	}

	out := make(chan storer.PinStat)
	go s.pinIntegrity.Check(logger, query.Ref.String(), out)

	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)
	flusher.Flush()

	enc := json.NewEncoder(w)

	for v := range out {
		resp := PinIntegrityResponse{
			Ref:     v.Ref,
			Total:   v.Total,
			Missing: v.Missing,
			Invalid: v.Invalid,
		}
		if err := enc.Encode(resp); err != nil {
			// The client most likely disconnected. Drain the channel so the
			// checker goroutine can send its remaining results and terminate
			// instead of blocking forever on an unread channel.
			for range out { // nolint:revive // intentional drain
			}
			return
		}
		flusher.Flush()
	}
}
6 changes: 6 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,4 +599,10 @@ func (s *Service) mountBusinessDebug(restricted bool) {
"GET": http.HandlerFunc(s.rchash),
}),
))

handle("/check/pin", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pinIntegrityHandler),
}),
))
}
1 change: 1 addition & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ func NewBee(
Steward: steward,
SyncStatus: syncStatusFn,
NodeStatus: nodeStatus,
PinIntegrity: localStore.PinIntegrity(),
}

if o.APIAddr != "" {
Expand Down
39 changes: 26 additions & 13 deletions pkg/storer/storer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,15 +278,15 @@ func initDiskRepository(
basePath string,
locker storage.ChunkLocker,
opts *Options,
) (storage.Repository, io.Closer, error) {
) (storage.Repository, *PinIntegrity, io.Closer, error) {
store, err := initStore(basePath, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
}

err = migration.Migrate(store, "core-migration", localmigration.BeforeIinitSteps())
if err != nil {
return nil, nil, fmt.Errorf("failed core migration: %w", err)
return nil, nil, nil, fmt.Errorf("failed core migration: %w", err)
}

if opts.LdbStats.Load() != nil {
Expand Down Expand Up @@ -338,13 +338,13 @@ func initDiskRepository(
if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
err := os.Mkdir(sharkyBasePath, 0777)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, store, opts)
if err != nil {
return nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
}

sharky, err := sharky.New(
Expand All @@ -353,20 +353,25 @@ func initDiskRepository(
swarm.SocMaxChunkSize,
)
if err != nil {
return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
return nil, nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
}

txStore := leveldbstore.NewTxStore(store)
if err := txStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover index store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover index store: %w", err)
}

txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
if err := txChunkStore.Recover(); err != nil {
return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
return nil, nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
}

return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
pinIntegrity := &PinIntegrity{
Store: store,
Sharky: sharky,
}

return storage.NewRepository(txStore, txChunkStore, locker), pinIntegrity, closer(store, sharky, recoveryCloser), nil
}

func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
Expand Down Expand Up @@ -457,6 +462,8 @@ type DB struct {
setSyncerOnce sync.Once
syncer Syncer
opts workerOpts

pinIntegrity *PinIntegrity
}

type workerOpts struct {
Expand All @@ -468,9 +475,10 @@ type workerOpts struct {
// component stores.
func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
var (
repo storage.Repository
err error
dbCloser io.Closer
repo storage.Repository
err error
dbCloser io.Closer
pinIntegrity *PinIntegrity
)
if opts == nil {
opts = defaultOptions()
Expand All @@ -497,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
return nil, err
}
} else {
repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
repo, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -550,6 +558,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
wakeupDuration: opts.ReserveWakeUpDuration,
},
directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
pinIntegrity: pinIntegrity,
}

if db.validStamp == nil {
Expand Down Expand Up @@ -665,6 +674,10 @@ func (db *DB) ChunkStore() storage.ReadOnlyChunkStore {
return db.repo.ChunkStore()
}

// PinIntegrity returns the pin integrity checker that was constructed
// alongside the on-disk repository. NOTE(review): it appears to be nil when
// the DB was opened with the in-memory repository path — callers should
// confirm before dereferencing.
func (db *DB) PinIntegrity() *PinIntegrity {
	return db.pinIntegrity
}

// Execute implements the internal.TxExecutor interface.
func (db *DB) Execute(ctx context.Context, do func(internal.Storage) error) error {
tx, commit, rollback := db.repo.NewTx(ctx)
Expand Down
112 changes: 70 additions & 42 deletions pkg/storer/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,27 +173,81 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st
}()

logger.Info("performing chunk validation")
validatePins(logger, store, pin, location, sharky.Read)

pv := PinIntegrity{
Store: store,
Sharky: sharky,
}

var (
fileName = "address.csv"
fileLoc = "."
)

if location != "" {
if path.Ext(location) != "" {
fileName = path.Base(location)
}
fileLoc = path.Dir(location)
}

logger.Info("saving stats", "location", fileLoc, "name", fileName)

location = path.Join(fileLoc, fileName)

f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("open output file for writing: %w", err)
}

if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil {
return fmt.Errorf("write title: %w", err)
}

defer f.Close()

var ch = make(chan PinStat)
go pv.Check(logger, pin, ch)

for st := range ch {
report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref)

if _, err := f.WriteString(report); err != nil {
logger.Error(err, "write report line")
break
}
}

return nil
}

func validatePins(logger log.Logger, store storage.Store, pin, location string, readFn func(context.Context, sharky.Location, []byte) error) {
// PinIntegrity verifies pinned collections: for each pin it walks the
// collection, looks up every chunk's retrieval index entry in Store and
// reads the chunk payload from Sharky to validate it.
type PinIntegrity struct {
	Store  storage.Store // index store used for pin listings and retrieval index lookups
	Sharky *sharky.Store // chunk payload store the located entries are read from
}

// PinStat summarizes the integrity-check result for one pinned reference;
// one value is emitted per pin on the Check output channel.
type PinStat struct {
	Ref                     swarm.Address // root reference of the checked pin
	Total, Missing, Invalid int           // chunks seen, not found in the index, and failing validation
}

func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {
var stats struct {
total, read, invalid atomic.Int32
}

n := time.Now()
defer func() {
close(out)
logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load())
}()

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool {
stats.total.Add(1)

if err := readFn(context.Background(), item.Location, buf); err != nil {
if err := p.Sharky.Read(context.Background(), item.Location, buf); err != nil {
stats.read.Add(1)
return true
return false
}

ch := swarm.NewChunk(item.Address, buf)
Expand Down Expand Up @@ -221,7 +275,7 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
pins = append(pins, addr)
} else {
var err error
pins, err = pinstore.Pins(store)
pins, err = pinstore.Pins(p.Store)
if err != nil {
logger.Error(err, "get pins")
return
Expand All @@ -230,35 +284,6 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,

logger.Info("got a total number of pins", "size", len(pins))

var (
fileName = "address.csv"
fileLoc = "."
)

if location != "" {
if path.Ext(location) != "" {
fileName = path.Base(location)
}
fileLoc = path.Dir(location)
}

logger.Info("saving stats to", "location", fileLoc, "name", fileName)

location = path.Join(fileLoc, fileName)

f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
logger.Error(err, "open output file for writing")
return
}

if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil {
logger.Error(err, "write title")
return
}

defer f.Close()

for _, pin := range pins {
var wg sync.WaitGroup
var (
Expand All @@ -282,26 +307,29 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,

logger.Info("start iteration", "pin", pin)

_ = pinstore.IterateCollection(store, pin, func(addr swarm.Address) (bool, error) {
err := pinstore.IterateCollection(p.Store, pin, func(addr swarm.Address) (bool, error) {
total.Add(1)
rIdx := &chunkstore.RetrievalIndexItem{Address: addr}
if err := store.Get(rIdx); err != nil {
if err := p.Store.Get(rIdx); err != nil {
missing.Add(1)
} else {
iteratateItemsC <- rIdx
}
return false, nil
})

if err != nil {
logger.Error(err, "iterate pin", "pin", pin)
}

close(iteratateItemsC)

wg.Wait()

report := fmt.Sprintf("%d\t%d\t%d\t%s\n", invalid.Load(), missing.Load(), total.Load(), pin)

if _, err := f.WriteString(report); err != nil {
logger.Error(err, "write report line")
return
}
out <- PinStat{
Ref: pin,
Total: int(total.Load()),
Missing: int(missing.Load()),
Invalid: int(invalid.Load())}
}
}

0 comments on commit b9b1aa3

Please sign in to comment.