Skip to content

Commit

Permalink
fix: valid fix
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Sep 21, 2023
1 parent 7a07112 commit 87500b0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 59 deletions.
56 changes: 27 additions & 29 deletions pkg/storer/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,36 +57,23 @@ func Compact(ctx context.Context, basePath string, opts *Options, validate bool)

n := time.Now()

iteratateItemsC := make(chan chunkstore.IterateResult)
chunkstore.Iterate(ctx, store, iteratateItemsC)

var shards [][]*chunkstore.RetrievalIndexItem
for i := 0; i < sharkyNoOfShards; i++ {
shards = append(shards, []*chunkstore.RetrievalIndexItem{})
}

count := 0
for c := range iteratateItemsC {
if c.Err != nil {
return fmt.Errorf("location read: %w", err)
}

shards[c.Item.Location.Shard] = append(shards[c.Item.Location.Shard], c.Item)
}

logger.Info("total items", "count", count)

for shard := 0; shard < sharkyNoOfShards; shard++ {

locs := shards[shard]
items := make([]*chunkstore.RetrievalIndexItem, 0, 1_000_000)
chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {

Check failure on line 63 in pkg/storer/compact.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `chunkstore.Iterate` is not checked (errcheck)
if item.Location.Shard == uint8(shard) {
items = append(items, item)
}
return nil
})

sort.Slice(locs, func(i, j int) bool {
return locs[i].Location.Slot < locs[j].Location.Slot
sort.Slice(items, func(i, j int) bool {
return items[i].Location.Slot < items[j].Location.Slot
})

lastUsedSlot := locs[len(locs)-1].Location.Slot
lastUsedSlot := items[len(items)-1].Location.Slot
slots := make([]*chunkstore.RetrievalIndexItem, lastUsedSlot+1) // marks free and used slots
for _, l := range locs {
for _, l := range items {
slots[l.Location.Slot] = l
}

Expand Down Expand Up @@ -151,16 +138,15 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store,
logger.Info("validation finished", "duration", time.Since(n))
}()

iteratateItemsC := make(chan chunkstore.IterateResult)
chunkstore.Iterate(ctx, store, iteratateItemsC)
iteratateItemsC := make(chan *chunkstore.RetrievalIndexItem)

validChunk := func(item *chunkstore.RetrievalIndexItem, buf []byte) error {
err := sharky.Read(ctx, item.Location, buf)
if err != nil {
return err
}

ch := swarm.NewChunk(item.Address, buf[:item.Location.Length])
ch := swarm.NewChunk(item.Address, buf)
if !cac.Valid(ch) && !soc.Valid(ch) {
return errors.New("invalid chunk")
}
Expand All @@ -174,13 +160,25 @@ func validationWork(ctx context.Context, logger log.Logger, store storage.Store,
eg.Go(func() error {
buf := make([]byte, swarm.SocMaxChunkSize)
for item := range iteratateItemsC {
if err := validChunk(item.Item, buf); err != nil {
logger.Info("invalid chunk", "address", item.Item.Address, "error", err)
if err := validChunk(item, buf[:item.Location.Length]); err != nil {
logger.Info("invalid chunk", "address", item.Address, "error", err)
}
}
return nil
})
}

count := 0
chunkstore.Iterate(store, func(item *chunkstore.RetrievalIndexItem) error {

Check failure on line 172 in pkg/storer/compact.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `chunkstore.Iterate` is not checked (errcheck)
iteratateItemsC <- item
count++
if count%100_000 == 0 {
logger.Info("..still validating chunks", "count", count)
}
return nil
})

close(iteratateItemsC)

_ = eg.Wait()
}
37 changes: 7 additions & 30 deletions pkg/storer/internal/chunkstore/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,34 +57,11 @@ func IterateLocations(
}

// IterateLocations iterates over entire retrieval index and plucks only sharky location.
func Iterate(
ctx context.Context,
st storage.Store,
locationResultC chan<- IterateResult,
) {
go func() {
defer close(locationResultC)

err := st.Iterate(storage.Query{
Factory: func() storage.Item { return new(RetrievalIndexItem) },
}, func(r storage.Result) (bool, error) {
entry := r.Entry.(*RetrievalIndexItem)

select {
case <-ctx.Done():
return true, ctx.Err()
case locationResultC <- IterateResult{Item: entry}:
}

return false, nil
})
if err != nil {
result := IterateResult{Err: fmt.Errorf("iterate retrieval index error: %w", err)}

select {
case <-ctx.Done():
case locationResultC <- result:
}
}
}()
func Iterate(st storage.Store, cb func(*RetrievalIndexItem) error) error {
return st.Iterate(storage.Query{
Factory: func() storage.Item { return new(RetrievalIndexItem) },
}, func(r storage.Result) (bool, error) {
entry := r.Entry.(*RetrievalIndexItem)
return false, cb(entry)
})
}

0 comments on commit 87500b0

Please sign in to comment.