Skip to content

Commit

Permalink
chore: more repair cmd logs (#4702)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Jun 11, 2024
1 parent a9a9bab commit 4b04c08
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
4 changes: 2 additions & 2 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
if err != nil {
p.metrics.SyncWorkerErrCounter.Inc()
if errors.Is(err, p2p.ErrPeerNotFound) {
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
p.logger.Debug("syncWorker interval failed, quitting", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
return
}
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", address, "start", start, "topmost", top)
loggerV2.Debug("syncWorker interval failed", "error", err, "peer_address", address, "bin", bin, "cursor", cursor, "start", start, "topmost", top)
}

if isHistorical {
Expand Down
20 changes: 16 additions & 4 deletions pkg/storer/migration/reserveRepair.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ func ReserveRepairer(
if binIds[item.Bin][item.BinID] > 1 {
return false, fmt.Errorf("binID %d in bin %d already used", item.BinID, item.Bin)
}

err := st.IndexStore().Get(&reserve.ChunkBinItem{Bin: item.Bin, BinID: item.BinID})
if err != nil {
return false, fmt.Errorf("check failed: chunkBinItem, bin %d, binID %d: %w", item.Bin, item.BinID, err)
}

return false, nil
},
)
}

err := checkBinIDs()
if err != nil {
logger.Error(err, "check failed")
logger.Info("pre-repair check failed", "error", err)
}

// STEP 0
Expand Down Expand Up @@ -162,11 +168,16 @@ func ReserveRepairer(
}

var eg errgroup.Group
eg.SetLimit(runtime.NumCPU())

p := runtime.NumCPU()
eg.SetLimit(p)

logger.Info("parallel workers", "count", p)

for _, item := range batchRadiusItems {
func(item *reserve.BatchRadiusItem) {
eg.Go(func() error {

return st.Run(context.Background(), func(s transaction.Store) error {

chunk, err := s.ChunkStore().Get(context.Background(), item.Address)
Expand Down Expand Up @@ -236,11 +247,12 @@ func ReserveRepairer(
return err
}

logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load())

if batchRadiusCnt != chunkBinCnt {
return errors.New("index counts do not match")
return fmt.Errorf("index counts do not match, %d vs %d", batchRadiusCnt, chunkBinCnt)
}

logger.Info("migrated all chunk entries", "new_size", batchRadiusCnt, "missing_chunks", missingChunks.Load(), "invalid_sharky_chunks", invalidSharkyChunks.Load())
return nil
}
}

0 comments on commit 4b04c08

Please sign in to comment.