From 87500b065d9d9a03b07ee7d2ba6b88879a7ae197 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 21 Sep 2023 22:55:01 +0300 Subject: [PATCH] fix: valid fix --- pkg/storer/compact.go | 56 +++++++++++------------ pkg/storer/internal/chunkstore/helpers.go | 37 +++------------ 2 files changed, 34 insertions(+), 59 deletions(-) diff --git a/pkg/storer/compact.go b/pkg/storer/compact.go index c7acc8186ba..c65976cb356 100644 --- a/pkg/storer/compact.go +++ b/pkg/storer/compact.go @@ -57,36 +57,23 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool) n := time.Now() - iteratateItemsC := make(chan chunkstore.IterateResult) - chunkstore.Iterate(ctx, store, iteratateItemsC) - - var shards [][]*chunkstore.RetrievalIndexItem - for i := 0; i < sharkyNoOfShards; i++ { - shards = append(shards, []*chunkstore.RetrievalIndexItem{}) - } - - count := 0 - for c := range iteratateItemsC { - if c.Err != nil { - return fmt.Errorf("location read: %w", err) - } - - shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item) - } - - logger.Info("total items", "count", count) - for shard := 0; shard < sharkyNoOfShards; shard++ { - locs := shards[shard] + items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000) + chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + if item.Location.Shard == uint8(shard) { + items = append(items, item) + } + return nil + }) - sort.Slice(locs, func(i, j int) bool { - return locs[i].Location.Slot < locs[j].Location.Slot + sort.Slice(items, func(i, j int) bool { + return items[i].Location.Slot < items[j].Location.Slot }) - lastUsedSlot := locs[len(locs)-1].Location.Slot + lastUsedSlot := items[len(items)-1].Location.Slot slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots - for _, l := range locs { + for _, l := range items { slots[l.Location.Slot] = l } @@ -151,8 +138,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, logger.Info("validation finished", "duration", time.Since(n)) }() - iteratateItemsC := make(chan chunkstore.IterateResult) - chunkstore.Iterate(ctx, store, iteratateItemsC) + iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem) validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error { err := sharky.Read(ctx, item.Location, buf) @@ -160,7 +146,7 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, return err } - ch := swarm.NewChunk(item.Address, buf[:item.Location.Length]) + ch := swarm.NewChunk(item.Address, buf) if !cac.Valid(ch) && !soc.Valid(ch) { return errors.New("invalid chunk") } @@ -174,13 +160,25 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store, eg.Go(func() error { buf := make([]byte, swarm.SocMaxChunkSize) for item := range iteratateItemsC { - if err := validChunk(item.Item, buf); err != nil { - logger.Info("invalid chunk", "address", item.Item.Address, "error", err) + if err := validChunk(item, buf[:item.Location.Length]); err != nil { + logger.Info("invalid chunk", "address", item.Address, "error", err) } } return nil }) } + count := 0 + chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error { + iteratateItemsC <- item + count++ + if count%100_000 == 0 { + logger.Info("..still validating chunks", "count", count) + } + return nil + }) + + close(iteratateItemsC) + _ = eg.Wait() } diff --git a/pkg/storer/internal/chunkstore/helpers.go b/pkg/storer/internal/chunkstore/helpers.go index 5880467cb48..40f29140e68 100644 --- a/pkg/storer/internal/chunkstore/helpers.go +++ b/pkg/storer/internal/chunkstore/helpers.go @@ -57,34 +57,11 @@ func IterateLocations( } // IterateLocations iterates over entire retrieval index and plucks only sharky location. -func Iterate( - ctx context.Context, - st storage.Store, - locationResultC chan<- IterateResult, -) { - go func() { - defer close(locationResultC) - - err := st.Iterate(storage.Query{ - Factory: func() storage.Item { return new(RetrievalIndexItem) }, - }, func(r storage.Result) (bool, error) { - entry := r.Entry.(*RetrievalIndexItem) - - select { - case <-ctx.Done(): - return true, ctx.Err() - case locationResultC <- IterateResult{Item: entry}: - } - - return false, nil - }) - if err != nil { - result := IterateResult{Err: fmt.Errorf("iterate retrieval index error: %w", err)} - - select { - case <-ctx.Done(): - case locationResultC <- result: - } - } - }() +func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error { + return st.Iterate(storage.Query{ + Factory: func() storage.Item { return new(RetrievalIndexItem) }, + }, func(r storage.Result) (bool, error) { + entry := r.Entry.(*RetrievalIndexItem) + return false, cb(entry) + }) }