Skip to content

Commit

Permalink
fix(worker): evicts batch only once (#4421)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 24, 2023
1 parent 7501877 commit b6ea8b3
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) {
count := 0
missing := 0
radius := db.StorageRadius()

evictBatches := make(map[string]bool)

err := db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
if ci.Bin >= radius {
count++
Expand All @@ -163,19 +166,23 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) {
return false, nil
}

if exists, _ := db.batchstore.Exists(ci.BatchID); !exists {
if exists, err := db.batchstore.Exists(ci.BatchID); err == nil && !exists {
missing++
db.logger.Debug("reserve size worker, item with invalid batch id", "batch_id", hex.EncodeToString(ci.BatchID), "chunk_address", ci.ChunkAddress)
if err := db.EvictBatch(ctx, ci.BatchID); err != nil {
db.logger.Warning("reserve size worker, batch eviction", "batch_id", hex.EncodeToString(ci.BatchID), "chunk_address", ci.ChunkAddress, "error", err)
}
evictBatches[string(ci.BatchID)] = true
}
return false, nil
})
if err != nil {
db.logger.Error(err, "reserve count within radius")
}

for batch := range evictBatches {
db.logger.Debug("reserve size worker, invalid batch id", "batch_id", hex.EncodeToString([]byte(batch)))
if err := db.EvictBatch(ctx, []byte(batch)); err != nil {
db.logger.Warning("reserve size worker, batch eviction", "batch_id", hex.EncodeToString([]byte(batch)), "error", err)
}
}

db.metrics.ReserveSizeWithinRadius.Set(float64(count))
if !skipInvalidCheck {
db.metrics.ReserveMissingBatch.Set(float64(missing))
Expand Down

0 comments on commit b6ea8b3

Please sign in to comment.