Skip to content

Commit

Permalink
fix: add duration
Browse files Browse the repository at this point in the history
  • Loading branch information
notanatol committed Feb 7, 2024
1 parent e6825e8 commit 22292b3
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/api/integritycheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func (s *Service) pinIntegrityHandler(w http.ResponseWriter, r *http.Request) {
}

out := make(chan storer.PinStat)
go s.pinIntegrity.Check(logger, querie.Ref.String(), out)

go s.pinIntegrity.Check(r.Context(), logger, querie.Ref.String(), out)

flusher, ok := w.(http.Flusher)
if !ok {
Expand Down
50 changes: 42 additions & 8 deletions pkg/storer/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func ValidatePinCollectionChunks(ctx context.Context, basePath, pin, location st
defer f.Close()

var ch = make(chan PinStat)
go pv.Check(logger, pin, ch)
go pv.Check(ctx, logger, pin, ch)

for st := range ch {
report := fmt.Sprintf("%d\t%d\t%d\t%s\n", st.Invalid, st.Missing, st.Total, st.Ref)
Expand All @@ -231,7 +231,7 @@ type PinStat struct {
Total, Missing, Invalid int
}

func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {
func (p *PinIntegrity) Check(ctx context.Context, logger log.Logger, pin string, out chan PinStat) {
var stats struct {
total, read, invalid atomic.Int32
}
Expand All @@ -245,7 +245,7 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {
validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) bool {
stats.total.Add(1)

if err := p.Sharky.Read(context.Background(), item.Location, buf); err != nil {
if err := p.Sharky.Read(ctx, item.Location, buf); err != nil {
stats.read.Add(1)
return false
}
Expand Down Expand Up @@ -284,6 +284,12 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {

logger.Info("got a total number of pins", "size", len(pins))

var tcount, tmicrs int64
defer func() {
dur := float64(tmicrs) / float64(tcount)
logger.Info("done iterating pins", "duration", dur)
}()

for _, pin := range pins {
var wg sync.WaitGroup
var (
Expand All @@ -298,38 +304,66 @@ func (p *PinIntegrity) Check(logger log.Logger, pin string, out chan PinStat) {
defer wg.Done()
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if ctx.Err() != nil {
break
}
if !validChunk(item, buf[:item.Location.Length]) {
invalid.Add(1)
}
}
}()
}

logger.Info("start iteration", "pin", pin)
var count, micrs int64

err := pinstore.IterateCollection(p.Store, pin, func(addr swarm.Address) (bool, error) {
n := time.Now()

defer func() {
count++
micrs += time.Since(n).Microseconds()
}()

total.Add(1)

rIdx := &chunkstore.RetrievalIndexItem{Address: addr}
if err := p.Store.Get(rIdx); err != nil {
missing.Add(1)
} else {
iteratateItemsC <- rIdx
select {
case <-ctx.Done():
return true, nil
case iteratateItemsC <- rIdx:
}
}

return false, nil
})

dur := float64(micrs) / float64(count)

if err != nil {
logger.Error(err, "iterate pin", "pin", pin)
logger.Error(err, "new iteration", "pin", pin, "duration", dur)
} else {
logger.Info("new iteration", "pin", pin, "duration", dur)
}

tcount++
tmicrs += int64(dur)

close(iteratateItemsC)

wg.Wait()

out <- PinStat{
select {
case <-ctx.Done():
logger.Info("context done")
return
case out <- PinStat{
Ref: pin,
Total: int(total.Load()),
Missing: int(missing.Load()),
Invalid: int(invalid.Load())}
Invalid: int(invalid.Load())}:
}
}
}

0 comments on commit 22292b3

Please sign in to comment.