From 22292b36ebd3aec1897fd66967147644127f0d96 Mon Sep 17 00:00:00 2001 From: notanatol Date: Mon, 5 Feb 2024 15:45:00 -0600 Subject: [PATCH] fix: add duration --- pkg/api/integritycheck.go | 3 ++- pkg/storer/validate.go | 50 ++++++++++++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/pkg/api/integritycheck.go b/pkg/api/integritycheck.go index a3282be704e..75b90b5222d 100644 --- a/pkg/api/integritycheck.go +++ b/pkg/api/integritycheck.go @@ -31,7 +31,8 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) { } out := make(chan storer.PinStat) - go s.pinIntegrity.Check(logger, querie.Ref.String(), out) + + go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out) flusher, ok := w.(http.Flusher) if !ok { diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go index fb1d5707a6d..140d1b4d26a 100644 --- a/pkg/storer/validate.go +++ b/pkg/storer/validate.go @@ -207,7 +207,7 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st defer f.Close() var ch = make(chan PinStat) - go pv.Check(logger, pin, ch) + go pv.Check(ctx, logger, pin, ch) for st := range ch { report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref) @@ -231,7 +231,7 @@ type PinStat struct { Total, Missing, Invalid int } -func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) { +func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat) { var stats struct { total, read, invalid atomic.Int32 } @@ -245,7 +245,7 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) { validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool { stats.total.Add(1) - if err := p.Sharky.Read(context.Background(), item.Location, buf); err != nil { + if err := p.Sharky.Read(ctx, item.Location, buf); err != nil { stats.read.Add(1) return false } @@ -284,6 +284,12 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) { logger.Info("got a total number of pins", "size", len(pins)) + var tcount, tmicrs int64 + defer func() { + dur := float64(tmicrs) / float64(tcount) + logger.Info("done iterating pins", "duration", dur) + }() + for _, pin := range pins { var wg sync.WaitGroup var ( @@ -298,6 +304,9 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) { defer wg.Done() buf := make([]byte, swarm.SocMaxChunkSize) for item := range iteratateItemsC { + if ctx.Err() != nil { + break + } if !validChunk(item, buf[:item.Location.Length]) { invalid.Add(1) } @@ -305,31 +314,56 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) { }() } - logger.Info("start iteration", "pin", pin) + var count, micrs int64 err := pinstore.IterateCollection(p.Store, pin, func(addr swarm.Address) (bool, error) { + n := time.Now() + + defer func() { + count++ + micrs += time.Since(n).Microseconds() + }() + total.Add(1) + rIdx := &chunkstore.RetrievalIndexItem{Address: addr} if err := p.Store.Get(rIdx); err != nil { missing.Add(1) } else { - iteratateItemsC <- rIdx + select { + case <-ctx.Done(): + return true, nil + case iteratateItemsC <- rIdx: + } } + return false, nil }) + dur := float64(micrs) / float64(count) + if err != nil { - logger.Error(err, "iterate pin", "pin", pin) + logger.Error(err, "new iteration", "pin", pin, "duration", dur) + } else { + logger.Info("new iteration", "pin", pin, "duration", dur) } + tcount++ + tmicrs += int64(dur) + close(iteratateItemsC) wg.Wait() - out <- PinStat{ + select { + case <-ctx.Done(): + logger.Info("context done") + return + case out <- PinStat{ Ref: pin, Total: int(total.Load()), Missing: int(missing.Load()), - Invalid: int(invalid.Load())} + Invalid: int(invalid.Load())}: + } } }