diff --git a/pkg/storer/internal/reserve/reserve.go b/pkg/storer/internal/reserve/reserve.go index f69d7981095..6931d51f4dd 100644 --- a/pkg/storer/internal/reserve/reserve.go +++ b/pkg/storer/internal/reserve/reserve.go @@ -280,6 +280,7 @@ type ChunkItem struct { BatchID []byte Type swarm.ChunkType BinID uint64 + Bin uint8 } func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func(ChunkItem) (bool, error)) error { @@ -295,6 +296,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb BatchID: item.BatchID, Type: item.ChunkType, BinID: item.BinID, + Bin: item.Bin, } stop, err := cb(chItem) diff --git a/pkg/storer/metrics.go b/pkg/storer/metrics.go index 4543fc5435e..e132c4e931e 100644 --- a/pkg/storer/metrics.go +++ b/pkg/storer/metrics.go @@ -31,6 +31,8 @@ type metrics struct { LevelDBStats prometheus.HistogramVec ExpiryTriggersCount prometheus.Counter ExpiryRunsCount prometheus.Counter + + ReserveMissingBatch prometheus.Gauge } // newMetrics is a convenient constructor for creating new metrics. @@ -64,6 +66,14 @@ func newMetrics() metrics { Help: "Number of chunks in reserve.", }, ), + ReserveMissingBatch: prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "reserve_missing_batch", + Help: "Number of chunks in reserve with missing batches.", + }, + ), ReserveSizeWithinRadius: prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: m.Namespace, diff --git a/pkg/storer/reserve.go b/pkg/storer/reserve.go index 2a4e4b752f1..8535bbe1dd8 100644 --- a/pkg/storer/reserve.go +++ b/pkg/storer/reserve.go @@ -126,8 +126,16 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) { for { count := 0 - err := db.reserve.IterateChunksItems(db.repo, db.StorageRadius(), func(ci reserve.ChunkItem) (bool, error) { - count++ + missing := 0 + radius := db.StorageRadius() + err := db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) { + if ci.Bin >= radius { + count++ + } + if exists, _ := db.batchstore.Exists(ci.BatchID); !exists { + missing++ + db.logger.Debug("reserve size worker, item with invalid batch id", "batch_id", hex.EncodeToString(ci.BatchID), "chunk_address", ci.ChunkAddress) + } return false, nil }) if err != nil { @@ -135,6 +143,7 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) { } db.metrics.ReserveSizeWithinRadius.Set(float64(count)) + db.metrics.ReserveMissingBatch.Set(float64(missing)) select { case <-ctx.Done(): @@ -237,27 +246,6 @@ func (db *DB) ReserveGet(ctx context.Context, addr swarm.Address, batchID []byte return db.reserve.Get(ctx, db.repo, addr, batchID) } -func (db *DB) StorageRadius() uint8 { - if db.reserve == nil { - return 0 - } - return db.reserve.Radius() -} - -func (db *DB) ReserveSize() int { - if db.reserve == nil { - return 0 - } - return db.reserve.Size() -} - -func (db *DB) IsWithinStorageRadius(addr swarm.Address) bool { - if db.reserve == nil { - return false - } - return swarm.Proximity(addr.Bytes(), db.baseAddr.Bytes()) >= db.reserve.Radius() -} - func (db *DB) ReserveHas(addr swarm.Address, batchID []byte) (has bool, err error) { dur := captureDuration(time.Now()) defer func() { @@ -445,6 +433,27 @@ func (db *DB) ReserveIterateChunks(cb func(swarm.Chunk) (bool, error)) error { return db.reserve.IterateChunks(db.repo, 0, cb) } +func (db *DB) StorageRadius() uint8 { + if db.reserve == nil { + return 0 + } + return db.reserve.Radius() +} + +func (db *DB) ReserveSize() int { + if db.reserve == nil { + return 0 + } + return db.reserve.Size() +} + +func (db *DB) IsWithinStorageRadius(addr swarm.Address) bool { + if db.reserve == nil { + return false + } + return swarm.Proximity(addr.Bytes(), db.baseAddr.Bytes()) >= db.reserve.Radius() +} + // BinC is the result returned from the SubscribeBin channel that contains the chunk address and the binID type BinC struct { Address swarm.Address