From b9b1aa338fe3e22f3822c33605c3841849de04cd Mon Sep 17 00:00:00 2001
From: notanatol
Date: Mon, 5 Feb 2024 13:42:09 -0600
Subject: [PATCH] feat: integrity check api

---
 pkg/api/api.go            |   7 ++-
 pkg/api/integritycheck.go |  60 ++++++++++++++++++++
 pkg/api/router.go         |   6 ++
 pkg/node/node.go          |   1 +
 pkg/storer/storer.go      |  39 ++++++++-----
 pkg/storer/validate.go    | 112 ++++++++++++++++++++++++--------------
 6 files changed, 169 insertions(+), 56 deletions(-)
 create mode 100644 pkg/api/integritycheck.go

diff --git a/pkg/api/api.go b/pkg/api/api.go
index 7d6ee42e175..59bab0c9b61 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -174,7 +174,9 @@ type Service struct {
 	batchStore   postage.Storer
 	stamperStore storage.Store
 
-	syncStatus func() (bool, error)
+	pinIntegrity *storer.PinIntegrity
+
+	syncStatus func() (bool, error)
 
 	swap        swap.Interface
 	transaction transaction.Service
@@ -242,6 +244,7 @@ type ExtraOptions struct {
 	Steward      steward.Interface
 	SyncStatus   func() (bool, error)
 	NodeStatus   *status.Service
+	PinIntegrity *storer.PinIntegrity
 }
 
 func New(
@@ -348,6 +351,8 @@ func (s *Service) Configure(signer crypto.Signer, auth auth.Authenticator, trace
 			return "", err
 		}
 	}
+
+	s.pinIntegrity = e.PinIntegrity
 }
 
 func (s *Service) SetProbe(probe *Probe) {
diff --git a/pkg/api/integritycheck.go b/pkg/api/integritycheck.go
new file mode 100644
index 00000000000..a3282be704e
--- /dev/null
+++ b/pkg/api/integritycheck.go
@@ -0,0 +1,60 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+package api
+
+import (
+	"encoding/json"
+	"net/http"
+
+	storer "github.com/ethersphere/bee/pkg/storer"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+type PinIntegrityResponse struct {
+	Ref     swarm.Address `json:"ref"`
+	Total   int           `json:"total"`
+	Missing int           `json:"missing"`
+	Invalid int           `json:"invalid"`
+}
+
+func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
+	logger := s.logger.WithName("get_pin_integrity").Build()
+
+	querie := struct {
+		Ref swarm.Address `map:"ref"`
+	}{}
+
+	if response := s.mapStructure(r.URL.Query(), &querie); response != nil {
+		response("invalid query params", logger, w)
+		return
+	}
+
+	out := make(chan storer.PinStat)
+	go s.pinIntegrity.Check(logger, querie.Ref.String(), out)
+
+	flusher, ok := w.(http.Flusher)
+	if !ok {
+		http.NotFound(w, r)
+		return
+	}
+
+	w.Header().Set("Transfer-Encoding", "chunked")
+	w.WriteHeader(http.StatusOK)
+	flusher.Flush()
+
+	enc := json.NewEncoder(w)
+
+	for v := range out {
+		resp := PinIntegrityResponse{
+			Ref:     v.Ref,
+			Total:   v.Total,
+			Missing: v.Missing,
+			Invalid: v.Invalid,
+		}
+		if err := enc.Encode(resp); err != nil {
+			break
+		}
+		flusher.Flush()
+	}
+}
diff --git a/pkg/api/router.go b/pkg/api/router.go
index 7c854543b88..08aaee08fa1 100644
--- a/pkg/api/router.go
+++ b/pkg/api/router.go
@@ -599,4 +599,10 @@ func (s *Service) mountBusinessDebug(restricted bool) {
 			"GET": http.HandlerFunc(s.rchash),
 		}),
 	))
+
+	handle("/check/pin", web.ChainHandlers(
+		web.FinalHandler(jsonhttp.MethodHandler{
+			"GET": http.HandlerFunc(s.pinIntegrityHandler),
+		}),
+	))
 }
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 214cad27d4f..6be2e1ff66a 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -1079,6 +1079,7 @@ func NewBee(
 		Steward:      steward,
 		SyncStatus:   syncStatusFn,
 		NodeStatus:   nodeStatus,
+		PinIntegrity: localStore.PinIntegrity(),
 	}
 
 	if o.APIAddr != "" {
diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go
index 3ebb3a025d7..396fbf7cb58 100644
--- a/pkg/storer/storer.go
+++ b/pkg/storer/storer.go
@@ -278,15 +278,15 @@ func initDiskRepository(
 	basePath string,
 	locker storage.ChunkLocker,
 	opts *Options,
-) (storage.Repository, io.Closer, error) {
+) (storage.Repository, *PinIntegrity, io.Closer, error) {
 	store, err := initStore(basePath, opts)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed creating levelDB index store: %w", err)
 	}
 
 	err = migration.Migrate(store, "core-migration", localmigration.BeforeIinitSteps())
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed core migration: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed core migration: %w", err)
 	}
 
 	if opts.LdbStats.Load() != nil {
@@ -338,13 +338,13 @@ func initDiskRepository(
 	if _, err := os.Stat(sharkyBasePath); os.IsNotExist(err) {
 		err := os.Mkdir(sharkyBasePath, 0777)
 		if err != nil {
-			return nil, nil, err
+			return nil, nil, nil, err
 		}
 	}
 
 	recoveryCloser, err := sharkyRecovery(ctx, sharkyBasePath, store, opts)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed to recover sharky: %w", err)
 	}
 
 	sharky, err := sharky.New(
@@ -353,20 +353,25 @@ func initDiskRepository(
 		swarm.SocMaxChunkSize,
 	)
 	if err != nil {
-		return nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed creating sharky instance: %w", err)
 	}
 
 	txStore := leveldbstore.NewTxStore(store)
 	if err := txStore.Recover(); err != nil {
-		return nil, nil, fmt.Errorf("failed to recover index store: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed to recover index store: %w", err)
 	}
 
 	txChunkStore := chunkstore.NewTxChunkStore(txStore, sharky)
 	if err := txChunkStore.Recover(); err != nil {
-		return nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
+		return nil, nil, nil, fmt.Errorf("failed to recover chunk store: %w", err)
 	}
 
-	return storage.NewRepository(txStore, txChunkStore, locker), closer(store, sharky, recoveryCloser), nil
+	pinIntegrity := &PinIntegrity{
+		Store:  store,
+		Sharky: sharky,
+	}
+
+	return storage.NewRepository(txStore, txChunkStore, locker), pinIntegrity, closer(store, sharky, recoveryCloser), nil
 }
 
 func initCache(ctx context.Context, capacity uint64, repo storage.Repository) (*cache.Cache, error) {
@@ -457,6 +462,8 @@ type DB struct {
 	setSyncerOnce       sync.Once
 	syncer              Syncer
 	opts                workerOpts
+
+	pinIntegrity *PinIntegrity
 }
 
 type workerOpts struct {
@@ -468,9 +475,10 @@ type workerOpts struct {
 // component stores.
 func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 	var (
-		repo     storage.Repository
-		err      error
-		dbCloser io.Closer
+		repo         storage.Repository
+		err          error
+		dbCloser     io.Closer
+		pinIntegrity *PinIntegrity
 	)
 	if opts == nil {
 		opts = defaultOptions()
@@ -497,7 +505,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 			return nil, err
 		}
 	} else {
-		repo, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
+		repo, pinIntegrity, dbCloser, err = initDiskRepository(ctx, dirPath, locker, opts)
 		if err != nil {
 			return nil, err
 		}
@@ -550,6 +558,7 @@ func New(ctx context.Context, dirPath string, opts *Options) (*DB, error) {
 			wakeupDuration: opts.ReserveWakeUpDuration,
 		},
 		directUploadLimiter: make(chan struct{}, pusher.ConcurrentPushes),
+		pinIntegrity:        pinIntegrity,
 	}
 
 	if db.validStamp == nil {
@@ -665,6 +674,10 @@ func (db *DB) ChunkStore() storage.ReadOnlyChunkStore {
 	return db.repo.ChunkStore()
 }
 
+func (db *DB) PinIntegrity() *PinIntegrity {
+	return db.pinIntegrity
+}
+
 // Execute implements the internal.TxExecutor interface.
 func (db *DB) Execute(ctx context.Context, do func(internal.Storage) error) error {
 	tx, commit, rollback := db.repo.NewTx(ctx)
diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go
index ccad569455d..fb1d5707a6d 100644
--- a/pkg/storer/validate.go
+++ b/pkg/storer/validate.go
@@ -173,27 +173,81 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st
 	}()
 
 	logger.Info("performing chunk validation")
-	validatePins(logger, store, pin, location, sharky.Read)
+
+	pv := PinIntegrity{
+		Store:  store,
+		Sharky: sharky,
+	}
+
+	var (
+		fileName = "address.csv"
+		fileLoc  = "."
+	)
+
+	if location != "" {
+		if path.Ext(location) != "" {
+			fileName = path.Base(location)
+		}
+		fileLoc = path.Dir(location)
+	}
+
+	logger.Info("saving stats", "location", fileLoc, "name", fileName)
+
+	location = path.Join(fileLoc, fileName)
+
+	f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644)
+	if err != nil {
+		return fmt.Errorf("open output file for writing: %w", err)
+	}
+
+	if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil {
+		return fmt.Errorf("write title: %w", err)
+	}
+
+	defer f.Close()
+
+	var ch = make(chan PinStat)
+	go pv.Check(logger, pin, ch)
+
+	for st := range ch {
+		report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref)
+
+		if _, err := f.WriteString(report); err != nil {
+			logger.Error(err, "write report line")
+			break
+		}
+	}
 
 	return nil
 }
 
-func validatePins(logger log.Logger, store storage.Store, pin, location string, readFn func(context.Context, sharky.Location, []byte) error) {
+type PinIntegrity struct {
+	Store  storage.Store
+	Sharky *sharky.Store
+}
+
+type PinStat struct {
+	Ref                     swarm.Address
+	Total, Missing, Invalid int
+}
+
+func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {
 	var stats struct {
 		total, read, invalid atomic.Int32
 	}
 
 	n := time.Now()
 	defer func() {
+		close(out)
 		logger.Info("done", "duration", time.Since(n), "read", stats.read.Load(), "invalid", stats.invalid.Load(), "total", stats.total.Load())
 	}()
 
 	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool {
 		stats.total.Add(1)
 
-		if err := readFn(context.Background(), item.Location, buf); err != nil {
+		if err := p.Sharky.Read(context.Background(), item.Location, buf); err != nil {
 			stats.read.Add(1)
-			return true
+			return false
 		}
 
 		ch := swarm.NewChunk(item.Address, buf)
@@ -221,7 +275,7 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
 		pins = append(pins, addr)
 	} else {
 		var err error
-		pins, err = pinstore.Pins(store)
+		pins, err = pinstore.Pins(p.Store)
 		if err != nil {
 			logger.Error(err, "get pins")
 			return
@@ -230,35 +284,6 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
 
 	logger.Info("got a total number of pins", "size", len(pins))
 
-	var (
-		fileName = "address.csv"
-		fileLoc  = "."
-	)
-
-	if location != "" {
-		if path.Ext(location) != "" {
-			fileName = path.Base(location)
-		}
-		fileLoc = path.Dir(location)
-	}
-
-	logger.Info("saving stats to", "location", fileLoc, "name", fileName)
-
-	location = path.Join(fileLoc, fileName)
-
-	f, err := os.OpenFile(location, os.O_CREATE|os.O_WRONLY, 0644)
-	if err != nil {
-		logger.Error(err, "open output file for writing")
-		return
-	}
-
-	if _, err := f.WriteString("invalid\tmissing\ttotal\taddress\n"); err != nil {
-		logger.Error(err, "write title")
-		return
-	}
-
-	defer f.Close()
-
 	for _, pin := range pins {
 		var wg sync.WaitGroup
 		var (
@@ -282,10 +307,10 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
 
 		logger.Info("start iteration", "pin", pin)
 
-		_ = pinstore.IterateCollection(store, pin, func(addr swarm.Address) (bool, error) {
+		err := pinstore.IterateCollection(p.Store, pin, func(addr swarm.Address) (bool, error) {
 			total.Add(1)
 			rIdx := &chunkstore.RetrievalIndexItem{Address: addr}
-			if err := store.Get(rIdx); err != nil {
+			if err := p.Store.Get(rIdx); err != nil {
 				missing.Add(1)
 			} else {
 				iteratateItemsC <- rIdx
@@ -293,15 +318,18 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
 			return false, nil
 		})
 
+		if err != nil {
+			logger.Error(err, "iterate pin", "pin", pin)
+		}
+
 		close(iteratateItemsC)
 
 		wg.Wait()
 
-		report := fmt.Sprintf("%d\t%d\t%d\t%s\n", invalid.Load(), missing.Load(), total.Load(), pin)
-
-		if _, err := f.WriteString(report); err != nil {
-			logger.Error(err, "write report line")
-			return
-		}
+		out <- PinStat{
+			Ref:     pin,
+			Total:   int(total.Load()),
+			Missing: int(missing.Load()),
+			Invalid: int(invalid.Load())}
 	}
 }
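For reference, a minimal client sketch (not part of the patch) showing how the streamed output of the new GET /check/pin debug endpoint could be consumed. The handler above encodes one JSON object per pin and flushes after each, so a client can decode the results incrementally. The debug API address (localhost:1635) and the mirrored response struct are assumptions made for illustration only.

// Hypothetical client sketch for the /check/pin endpoint added by this patch.
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
)

// Mirrors PinIntegrityResponse from pkg/api/integritycheck.go (assumption: field names/tags as in the patch).
type pinIntegrityResponse struct {
	Ref     string `json:"ref"`
	Total   int    `json:"total"`
	Missing int    `json:"missing"`
	Invalid int    `json:"invalid"`
}

func main() {
	// Omitting the "ref" query parameter checks every pin collection; pass
	// ?ref=<swarm address> to restrict the check to a single pin reference.
	resp, err := http.Get("http://localhost:1635/check/pin")
	if err != nil {
		log.Fatalf("request: %v", err)
	}
	defer resp.Body.Close()

	// The server flushes one JSON object per pin, so results can be decoded
	// as they arrive instead of waiting for the whole check to finish.
	dec := json.NewDecoder(resp.Body)
	for {
		var r pinIntegrityResponse
		if err := dec.Decode(&r); errors.Is(err, io.EOF) {
			break
		} else if err != nil {
			log.Fatalf("decode: %v", err)
		}
		fmt.Printf("pin %s: total=%d missing=%d invalid=%d\n", r.Ref, r.Total, r.Missing, r.Invalid)
	}
}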