From e30e101f2e2d6e8b787b89e2f6c06b9dd80d4dcb Mon Sep 17 00:00:00 2001
From: notanatol
Date: Tue, 20 Feb 2024 17:09:41 -0600
Subject: [PATCH] feat: ref cnt and sharky

---
 cmd/bee/cmd/db.go                         | 134 ++++++++++
 pkg/storer/fix_refcnt.go                  | 283 ++++++++++++++++++++++
 pkg/storer/fix_sharky.go                  |  75 ++++++
 pkg/storer/internal/cache/cache.go        |  10 +
 pkg/storer/internal/pinning/pinning.go    |  28 +++
 pkg/storer/internal/reserve/reserve.go    |  16 ++
 pkg/storer/internal/upload/uploadstore.go |  12 +
 pkg/storer/validate.go                    | 109 +++++++++
 8 files changed, 667 insertions(+)
 create mode 100644 pkg/storer/fix_refcnt.go
 create mode 100644 pkg/storer/fix_sharky.go

diff --git a/cmd/bee/cmd/db.go b/cmd/bee/cmd/db.go
index ec94ff7f6d8..42ee8878280 100644
--- a/cmd/bee/cmd/db.go
+++ b/cmd/bee/cmd/db.go
@@ -31,6 +31,7 @@ const (
 	optionNameValidationPin  = "validate-pin"
 	optionNameCollectionPin  = "pin"
 	optionNameOutputLocation = "output"
+	optionNameRepair         = "repair"
 )
 
 func (c *command) initDBCmd() {
@@ -172,6 +173,139 @@ func dbCompactCmd(cmd *cobra.Command) {
 	cmd.AddCommand(c)
 }
 
+func dbFixRefCntCmd(cmd *cobra.Command) {
+	c := &cobra.Command{
+		Use:   "fixrefcnt",
+		Short: "Recalculates chunk reference counters.",
+		RunE: func(cmd *cobra.Command, args []string) (err error) {
+			v, err := cmd.Flags().GetString(optionNameVerbosity)
+			if err != nil {
+				return fmt.Errorf("get verbosity: %w", err)
+			}
+			v = strings.ToLower(v)
+			logger, err := newLogger(cmd, v)
+			if err != nil {
+				return fmt.Errorf("new logger: %w", err)
+			}
+
+			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
+			if err != nil {
+				return fmt.Errorf("get data-dir: %w", err)
+			}
+			if dataDir == "" {
+				return errors.New("no data-dir provided")
+			}
+
+			repair, err := cmd.Flags().GetBool(optionNameRepair)
+			if err != nil {
+				return fmt.Errorf("get repair: %w", err)
+			}
+
+			logger.Warning("fixrefcnt recalculates the chunk reference counter for all known chunks.")
+			if !repair {
+				logger.Warning("By default, it will only report discrepancies unless --" + optionNameRepair + " is specified.")
+			}
+			logger.Warning("Also, fixrefcnt will not reduce the RefCnt below 1. This may change in the future.")
+			logger.Warning(" Discrepancies logged at Warning level.")
+			logger.Warning(" Progress logged at Info level.")
+			logger.Warning(" Details logged at Debug level.")
+
+			if repair {
+				logger.Warning("starting to repair the DB with data-dir", "path", dataDir)
+				logger.Warning("this is VERY experimental and may completely corrupt your localstore!")
+				logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
+				time.Sleep(10 * time.Second)
+				logger.Warning("proceeding with database repair...")
+			}
+
+			localstorePath := path.Join(dataDir, "localstore")
+
+			err = storer.FixRefCnt(context.Background(), localstorePath, &storer.Options{
+				Logger:          logger,
+				RadiusSetter:    noopRadiusSetter{},
+				Batchstore:      new(postage.NoOpBatchStore),
+				ReserveCapacity: node.ReserveCapacity,
+			}, repair)
+			if err != nil {
+				return fmt.Errorf("localstore: %w", err)
+			}
+
+			return nil
+		},
+	}
+	c.Flags().String(optionNameDataDir, "", "data directory")
+	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
+	c.Flags().Bool(optionNameRepair, false, "actually (attempt to) FIX any discovered discrepancies")
+	cmd.AddCommand(c)
+}
+
+func dbFixSharkyCmd(cmd *cobra.Command) {
+	c := &cobra.Command{
+		Use:   "fixsharky",
+		Short: "Removes invalid references to the sharky store.",
+		RunE: func(cmd *cobra.Command, args []string) (err error) {
+			v, err := cmd.Flags().GetString(optionNameVerbosity)
+			if err != nil {
+				return fmt.Errorf("get verbosity: %w", err)
+			}
+			v = strings.ToLower(v)
+			logger, err := newLogger(cmd, v)
+			if err != nil {
+				return fmt.Errorf("new logger: %w", err)
+			}
+
+			dataDir, err := cmd.Flags().GetString(optionNameDataDir)
+			if err != nil {
+				return fmt.Errorf("get data-dir: %w", err)
+			}
+			if dataDir == "" {
+				return errors.New("no data-dir provided")
+			}
+
+			repair, err := cmd.Flags().GetBool(optionNameRepair)
+			if err != nil {
+				return fmt.Errorf("get repair: %w", err)
+			}
+
+			logger.Warning("fixsharky ensures that all references to sharky return a chunk that hashes to the expected reference.")
+			if !repair {
+				logger.Warning("By default, it will only report issues (like validate) unless --" + optionNameRepair + " is specified.")
+			}
+			logger.Warning("Note: This WILL leave (hopefully less impactful) inconsistencies in your localstore.")
+			logger.Warning(" Discrepancies logged at Warning level.")
+			logger.Warning(" Progress logged at Info level.")
+			logger.Warning(" SOC chunks logged at Debug level.")
+
+			if repair {
+				logger.Warning("starting to repair the DB with data-dir", "path", dataDir)
+				logger.Warning("this is VERY experimental and may completely corrupt your localstore!")
+				logger.Warning("you have another 10 seconds to change your mind and kill this process with CTRL-C...")
+				time.Sleep(10 * time.Second)
+				logger.Warning("proceeding with database repair...")
+			}
+
+			localstorePath := path.Join(dataDir, "localstore")
+
+			err = storer.FixSharky(context.Background(), localstorePath, &storer.Options{
+				Logger:          logger,
+				RadiusSetter:    noopRadiusSetter{},
+				Batchstore:      new(postage.NoOpBatchStore),
+				ReserveCapacity: node.ReserveCapacity,
+			}, repair)
+			if err != nil {
+				return fmt.Errorf("localstore: %w", err)
+			}
+
+			return nil
+		},
+	}
+	c.Flags().String(optionNameDataDir, "", "data directory")
+	c.Flags().String(optionNameVerbosity, "info", "verbosity level")
+	c.Flags().Bool(optionNameRepair, false, "actually (attempt to) FIX any discovered inconsistencies")
+	c.Flags().Bool(optionNameValidation, false, "run chunk validation checks again after any repairs")
+	cmd.AddCommand(c)
+}
+
 func dbValidatePinsCmd(cmd *cobra.Command) {
 	c := &cobra.Command{
 		Use:   "validate-pin",
diff --git a/pkg/storer/fix_refcnt.go b/pkg/storer/fix_refcnt.go
new file mode 100644
index 00000000000..0bc9f46152f
--- /dev/null
+++ b/pkg/storer/fix_refcnt.go
@@ -0,0 +1,283 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sort"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/storer/internal/cache"
+	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
+	pinstore "github.com/ethersphere/bee/pkg/storer/internal/pinning"
+	"github.com/ethersphere/bee/pkg/storer/internal/reserve"
+	"github.com/ethersphere/bee/pkg/storer/internal/upload"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// FixRefCnt attempts to correct the RefCnt in all RetrievalIndexItems by scanning the actual chunk referrers.
+func FixRefCnt(ctx context.Context, basePath string, opts *Options, repair bool) error {
+	logger := opts.Logger
+
+	store, err := initStore(basePath, opts)
+	if err != nil {
+		return fmt.Errorf("failed creating levelDB index store: %w", err)
+	}
+	defer func() {
+		if err := store.Close(); err != nil {
+			logger.Error(err, "failed closing store")
+		}
+	}()
+
+	statusInterval := 15 * time.Second
+	ticked := false
+	ticker := time.NewTicker(statusInterval)
+	go func() {
+		for range ticker.C {
+			ticked = true
+		}
+	}()
+
+	startChunkCount := time.Now()
+	total := 0
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		total++
+		if ticked {
+			ticked = false
+			logger.Info("..still counting chunks", "total", total, "elapsed", time.Since(startChunkCount).Round(time.Second))
+		}
+		return nil
+	})
+	ticker.Stop()
+	logger.Info("chunk count finished", "total", total, "elapsed", time.Since(startChunkCount).Round(time.Second))
+
+	pinTotal := -1
+	if false {
+		ticked = false
+		ticker.Reset(statusInterval)
+		startPinCount := time.Now()
+		_ = pinstore.IteratePinnedChunks(store, func(addr swarm.Address) (bool, error) {
+			pinTotal++
+			if ticked {
+				ticked = false
+				logger.Info("..still counting pins", "total", pinTotal, "elapsed", time.Since(startPinCount).Round(time.Second))
+			}
+			return false, nil
+		})
+		logger.Info("pin count finished", "total", pinTotal, "elapsed", time.Since(startPinCount).Round(time.Second))
+	}
+
+	type ItemInfo struct {
+		item       *chunkstore.RetrievalIndexItem
+		pinCnt     uint32
+		reserveCnt uint32
+		cacheCnt   uint32
+		uploadCnt  uint32
+		batches    [][]byte
+	}
+
+	start := time.Now()
+	repaired := 0
+	processed := 0
+	discovered := 0
+	shouldBeZero := 0
+	chunksPerPass := 50_000_000
+	for {
+		n := time.Now()
+		skip := processed
+		ticked = false
+		ticker.Reset(statusInterval)
+		logger.Info("searching chunks", "skip", skip)
+
+		items := make(map[string]*ItemInfo)
+		// we deliberately choose to iterate the whole store again for each pass
+		// so that we do not store all the items in memory (for operators with huge localstores)
+		_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+			if ticked {
+				ticked = false
+				logger.Info("..still searching chunks", "skip", skip, "found", len(items), "total", total, "elapsed", time.Since(n).Round(time.Second))
+			}
+			if skip > 0 {
+				skip--
+				return nil
+			}
+			items[item.Address.String()] = &ItemInfo{item: item}
+			if len(items) >= chunksPerPass {
+				return errors.New("found enough chunks")
+			}
+			return nil
+		})
+		ticker.Stop()
+
+		logger.Info("found chunks", "found", len(items), "processed", processed, "total", total, "elapsed", time.Since(n).Round(time.Second))
+
+		logger.Info("Scanning Cache")
+		cacheCnt := 0
+		cacheSeen := 0
+		startCache := time.Now()
+		ticked = false
+		ticker.Reset(statusInterval)
+		_ = cache.IterateCachedChunks(store, func(address swarm.Address) error {
+			cacheSeen++
+			_, exists := items[address.String()]
+			if exists {
+				items[address.String()].cacheCnt++
+				cacheCnt++
+			}
+			if ticked {
+				ticked = false
+				logger.Info("..still scanning cache", "scanned", cacheSeen, "cached", cacheCnt, "elapsed", time.Since(startCache).Round(time.Second))
+			}
+			return nil
+		})
+		ticker.Stop()
+		logger.Debug("scanned cache", "cached", cacheCnt, "total", cacheSeen, "elapsed", time.Since(startCache).Round(time.Second))
+		logger.Info("Scanning Reserve")
+		reserveCnt := 0
+		var reserveMax uint32
+		reserveSeen := 0
+		startReserve := time.Now()
+		ticked = false
+		ticker.Reset(statusInterval)
+		_ = reserve.IterateReserve(store, func(bri *reserve.BatchRadiusItem) (bool, error) {
+			reserveSeen++
+			_, exists := items[bri.Address.String()]
+			if exists {
+				items[bri.Address.String()].reserveCnt++
+				if items[bri.Address.String()].reserveCnt > reserveMax {
+					reserveMax = items[bri.Address.String()].reserveCnt
+				}
+				items[bri.Address.String()].batches = append(items[bri.Address.String()].batches, bri.BatchID)
+				reserveCnt++
+			}
+			if ticked {
+				ticked = false
+				logger.Info("..still scanning reserve", "scanned", reserveSeen, "reserved", reserveCnt, "maxDupes", reserveMax, "elapsed", time.Since(startReserve).Round(time.Second))
+			}
+			return false, nil
+		})
+		ticker.Stop()
+		logger.Info("scanned reserve", "reserved", reserveCnt, "total", reserveSeen, "elapsed", time.Since(startReserve).Round(time.Second))
+
+		logger.Info("Scanning Uploads")
+		uploadCnt := 0
+		uploadSeen := 0
+		startUpload := time.Now()
+		ticked = false
+		ticker.Reset(statusInterval)
+		_ = upload.IterateAllPushItems(store, func(address swarm.Address) (bool, error) {
+			uploadSeen++
+			_, exists := items[address.String()]
+			if exists {
+				items[address.String()].uploadCnt++
+				uploadCnt++
+			}
+			if ticked {
+				ticked = false
+				logger.Info("..still scanning uploads", "scanned", uploadSeen, "uploads", uploadCnt, "elapsed", time.Since(startUpload).Round(time.Second))
+			}
+			return false, nil
+		})
+		ticker.Stop()
+		logger.Debug("scanned uploads", "uploads", uploadCnt, "total", uploadSeen, "elapsed", time.Since(startUpload).Round(time.Second))
+
+		logger.Info("Scanning Pins")
+		pinScanCount := 0
+		pinCnt := 0
+		var pinMax uint32
+		startPins := time.Now()
+		ticked = false
+		ticker.Reset(statusInterval)
+		_ = pinstore.IteratePinnedChunks(store, func(addr swarm.Address) (bool, error) {
+			pinScanCount++
+			_, exists := items[addr.String()]
+			if exists {
+				items[addr.String()].pinCnt++
+				if items[addr.String()].pinCnt > pinMax {
+					pinMax = items[addr.String()].pinCnt
+				}
+				pinCnt++
+			} else if len(items) == total { // Don't complain about missing pins if we can't handle all chunks at once
+				logger.Debug("missing pinned chunk", "address", addr)
+			}
+			if ticked {
+				ticked = false
+				logger.Info("..still scanning pins", "scanned", pinScanCount, "pinned", pinCnt, "total", pinTotal, "maxDupes", pinMax, "elapsed", time.Since(startPins).Round(time.Second))
+			}
+			return false, nil
+		})
+		ticker.Stop()
+		if pinTotal == -1 {
+			pinTotal = pinScanCount
+		}
+		logger.Info("scanned pins", "pinned", pinCnt, "total", pinTotal, "elapsed", time.Since(startPins).Round(time.Second))
+
+		batch, err := store.Batch(ctx)
+		if err != nil {
+			logger.Error(err, "Failed to create batch")
+		}
+		batchHits := 0
+
+		logger.Info("Summarizing refCnts")
+		refs := make(map[uint32]uint32)
+		for _, item := range items {
+			refs[item.item.RefCnt]++
+			newRefCnt := item.pinCnt*100 + item.reserveCnt + item.cacheCnt + item.uploadCnt
+			if newRefCnt != item.item.RefCnt {
+				discovered++
+				if item.item.RefCnt == 1 && newRefCnt == 0 {
+					shouldBeZero++
+				} else {
+					logger.Warning("Mismatched RefCnt", "original", item.item.RefCnt, "new", newRefCnt, "chunk", item.item.Address, "pins", item.pinCnt, "reserve", item.reserveCnt, "cache", item.cacheCnt, "upload", item.uploadCnt, "batchHits", batchHits)
+					if repair {
+						orgRefCnt := item.item.RefCnt
+						item.item.RefCnt = max(1, newRefCnt)
+						if err := batch.Put(item.item); err != nil {
+							logger.Error(err, "Failed to update RefCnt", "chunk", item.item.Address, "from", orgRefCnt, "to", newRefCnt)
+						}
+						batchHits = batchHits + 1
+						if batchHits >= 20000 {
+							if err := batch.Commit(); err != nil {
+								logger.Error(err, "batch.Commit failed")
+							}
+							batch, err = store.Batch(ctx)
+							if err != nil {
+								logger.Error(err, "Failed to create batch")
+							}
+							batchHits = 0
+						}
+						repaired++
+					} else if newRefCnt == 0 {
+						shouldBeZero++
+					}
+				}
+			}
+		}
+		if err := batch.Commit(); err != nil {
+			logger.Error(err, "batch.Commit failed")
+		}
+
+		keys := make([]int, 0, len(refs))
+		for k := range refs {
+			keys = append(keys, int(k))
+		}
+		sort.Ints(keys)
+
+		for _, k := range keys {
+			logger.Debug("refCnt distribution", "refCnt", k, "count", refs[uint32(k)])
+		}
+
+		processed += len(items)
+		logger.Info("scanned chunks", "discovered", discovered, "repaired", repaired, "shouldBeZero", shouldBeZero, "processed", processed, "total", total, "elapsed", time.Since(start).Round(time.Second))
+
+		if len(items) < chunksPerPass { // Incomplete means that we hit the end!
+			break
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/storer/fix_sharky.go b/pkg/storer/fix_sharky.go
new file mode 100644
index 00000000000..0097499c95e
--- /dev/null
+++ b/pkg/storer/fix_sharky.go
@@ -0,0 +1,75 @@
+// Copyright 2023 The Swarm Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package storer
+
+import (
+	"context"
+	"fmt"
+	"path"
+	"time"
+
+	"github.com/ethersphere/bee/pkg/sharky"
+	"github.com/ethersphere/bee/pkg/storer/internal/chunkstore"
+	"github.com/ethersphere/bee/pkg/swarm"
+)
+
+// FixSharky removes all invalid RetrievalIndexItems which cannot be properly read from sharky.
+func FixSharky(ctx context.Context, basePath string, opts *Options, repair bool) error {
+	logger := opts.Logger
+
+	store, err := initStore(basePath, opts)
+	if err != nil {
+		return fmt.Errorf("failed creating levelDB index store: %w", err)
+	}
+
+	defer func() {
+		if err := store.Close(); err != nil {
+			logger.Error(err, "failed closing store")
+		}
+	}()
+
+	sharkyRecover, err := sharky.NewRecovery(path.Join(basePath, sharkyPath), sharkyNoOfShards, swarm.SocMaxChunkSize)
+	if err != nil {
+		return err
+	}
+
+	defer func() {
+		if err := sharkyRecover.Close(); err != nil {
+			logger.Error(err, "failed closing sharky recovery")
+		}
+	}()
+
+	logger.Info("locating invalid chunks")
+	invalids := getInvalidChunks(logger, store, sharkyRecover.Read)
+
+	if len(invalids) == 0 || !repair {
+		return nil
+	}
+
+	logger.Info("removing invalid chunks", "count", len(invalids))
+
+	n := time.Now()
+
+	batch, err := store.Batch(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, invalid := range invalids {
+		item := &chunkstore.RetrievalIndexItem{Address: *invalid}
+		if err := batch.Delete(item); err != nil {
+			return fmt.Errorf("store delete: %w", err)
+		}
+		logger.Info("removing", "reference", *invalid)
+	}
+
+	if err := batch.Commit(); err != nil {
+		return err
+	}
+
+	logger.Info("removal finished", "duration", time.Since(n))
+
+	return nil
+}
diff --git a/pkg/storer/internal/cache/cache.go b/pkg/storer/internal/cache/cache.go
index 887f2a27ae9..cebab29008e 100644
--- a/pkg/storer/internal/cache/cache.go
+++ b/pkg/storer/internal/cache/cache.go
@@ -260,6 +260,16 @@ func (c *Cache) ShallowCopy(
 	return nil
 }
 
+// IterateCachedChunks iterates over the entire cache, calling back with each cached chunk address.
+func IterateCachedChunks(st storage.Store, callBackFunc func(address swarm.Address) error) error {
+	return st.Iterate(storage.Query{
+		Factory: func() storage.Item { return new(cacheEntry) },
+	}, func(r storage.Result) (bool, error) {
+		entry := r.Entry.(*cacheEntry)
+		return false, callBackFunc(entry.Address)
+	})
+}
+
 // RemoveOldest removes the oldest cache entries from the store. The count
 // specifies the number of entries to remove.
 func (c *Cache) RemoveOldest(ctx context.Context, store internal.Storage, chStore storage.ChunkStore, count uint64) error {
diff --git a/pkg/storer/internal/pinning/pinning.go b/pkg/storer/internal/pinning/pinning.go
index 8fb25ba92ad..d0870d8b60c 100644
--- a/pkg/storer/internal/pinning/pinning.go
+++ b/pkg/storer/internal/pinning/pinning.go
@@ -477,3 +477,31 @@ func IterateCollectionStats(st storage.Store, iterateFn func(st CollectionStat)
 		},
 	)
 }
+
+func IteratePinnedChunks(st storage.Store, fn func(addr swarm.Address) (bool, error)) error {
+	var stop bool
+	err := st.Iterate(storage.Query{
+		Factory: func() storage.Item { return new(pinCollectionItem) },
+	}, func(r storage.Result) (bool, error) {
+		UUID := r.Entry.(*pinCollectionItem).UUID
+		err := st.Iterate(storage.Query{
+			Factory:      func() storage.Item { return &pinChunkItem{UUID: UUID} },
+			ItemProperty: storage.QueryItemID,
+		}, func(r storage.Result) (bool, error) {
+			var err error
+			stop, err = fn(swarm.NewAddress([]byte(r.ID)))
+			if err != nil {
+				return true, err
+			}
+			return stop, nil
+		})
+		if err != nil {
+			return true, err
+		}
+		return stop, nil
+	})
+	if err != nil {
+		return fmt.Errorf("pin store: failed iterating root refs: %w", err)
+	}
+	return nil
+}
diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go
index 47c843e48b1..19eb38339af 100644
--- a/pkg/storer/internal/reserve/reserve.go
+++ b/pkg/storer/internal/reserve/reserve.go
@@ -510,3 +510,19 @@ func (r *Reserve) IncBinID(store storage.Store, bin uint8) (uint64, error) {
 
 	return item.BinID, store.Put(item)
 }
+
+func IterateReserve(store storage.Store, cb func(*BatchRadiusItem) (bool, error)) error {
+	err := store.Iterate(storage.Query{
+		Factory: func() storage.Item { return &BatchRadiusItem{} },
+	}, func(res storage.Result) (bool, error) {
+		item := res.Entry.(*BatchRadiusItem)
+
+		stop, err := cb(item)
+		if stop || err != nil {
+			return true, err
+		}
+		return false, nil
+	})
+
+	return err
+}
diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go
index 81fed2a8bcb..b69198df021 100644
--- a/pkg/storer/internal/upload/uploadstore.go
+++ b/pkg/storer/internal/upload/uploadstore.go
@@ -856,6 +856,18 @@ func IterateAllTagItems(st storage.Store, cb func(ti *TagItem) (bool, error)) er
 	)
 }
 
+func IterateAllPushItems(st storage.Store, iterateFn func(addr swarm.Address) (bool, error)) error {
+	return st.Iterate(
+		storage.Query{
+			Factory: func() storage.Item { return new(pushItem) },
+		},
+		func(r storage.Result) (bool, error) {
+			address := r.Entry.(*pushItem).Address
+			return iterateFn(address)
+		},
+	)
+}
+
 // BatchIDForChunk returns the first known batchID for the given chunk address.
 func BatchIDForChunk(st storage.Store, addr swarm.Address) ([]byte, error) {
 	var batchID []byte
diff --git a/pkg/storer/validate.go b/pkg/storer/validate.go
index ccad569455d..47f4d781fd6 100644
--- a/pkg/storer/validate.go
+++ b/pkg/storer/validate.go
@@ -305,3 +305,112 @@ func validatePins(logger log.Logger, store storage.Store, pin, location string,
 		}
 	}
 }
+
+func getInvalidChunks(logger log.Logger, store storage.Store, readFn func(context.Context, sharky.Location, []byte) error) []*swarm.Address {
+
+	total := 0
+	socCount := 0
+	invalidCount := 0
+	var (
+		muChunks = &sync.Mutex{}
+		chunks   = make([]*swarm.Address, 0)
+	)
+
+	n := time.Now()
+	defer func() {
+		logger.Info("validation finished", "duration", time.Since(n), "invalid", invalidCount, "soc", socCount, "total", total)
+	}()
+
+	iterateItemsC := make(chan *chunkstore.RetrievalIndexItem)
+
+	validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) (bool, error) {
+		err := readFn(context.Background(), item.Location, buf)
+		if err != nil {
+			logger.Warning("invalid chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "location", item.Location, "error", err)
+			return false, err
+		}
+
+		ch := swarm.NewChunk(item.Address, buf)
+		if !cac.Valid(ch) {
+			if soc.Valid(ch) {
+				socCount++
+				logger.Debug("found soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0))
+				return true, nil
+			} else {
+				invalidCount++
+				logger.Warning("invalid cac/soc chunk", "address", item.Address, "timestamp", time.Unix(int64(item.Timestamp), 0), "err", err)
+
+				h, err := cac.DoHash(buf[swarm.SpanSize:], buf[:swarm.SpanSize])
+				if err != nil {
+					logger.Error(err, "cac hash")
+					return false, err
+				}
+
+				computedAddr := swarm.NewAddress(h)
+
+				if !cac.Valid(swarm.NewChunk(computedAddr, buf)) {
+					logger.Warning("computed chunk is also an invalid cac", "err", err)
+					return false, err
+				}
+
+				sharedEntry := chunkstore.RetrievalIndexItem{Address: computedAddr}
+				err = store.Get(&sharedEntry)
+				if err != nil {
+					logger.Warning("no shared entry found")
+					return false, nil
+				}
+
+				logger.Warning("retrieved chunk with shared slot", "shared_address", sharedEntry.Address, "shared_timestamp", time.Unix(int64(sharedEntry.Timestamp), 0))
+				return false, nil
+			}
+		} else {
+			return true, nil
+		}
+	}
+
+	s := time.Now()
+
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		total++
+		if total%10_000_000 == 0 {
+			logger.Info("..still counting chunks", "total", total)
+		}
+		return nil
+	})
+	logger.Info("validation count finished", "duration", time.Since(s), "total", total)
+
+	var wg sync.WaitGroup
+
+	for i := 0; i < 8; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			buf := make([]byte, swarm.SocMaxChunkSize)
+			for item := range iterateItemsC {
+				valid, err := validChunk(item, buf[:item.Location.Length])
+				if !valid && err == nil {
+					muChunks.Lock()
+					chunks = append(chunks, &item.Address)
+					muChunks.Unlock()
+				}
+			}
+		}()
+	}
+
+	count := 0
+	_ = chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {
+		iterateItemsC <- item
+		count++
+		if count%100_000 == 0 {
+			logger.Info("..still validating chunks", "count", count, "invalid", invalidCount, "soc", socCount, "total", total, "percent", fmt.Sprintf("%.2f", (float64(count)*100.0)/float64(total)))
+		}
+
+		return nil
+	})
+
+	close(iterateItemsC)
+
+	wg.Wait()
+
+	return chunks
+}