Skip to content

Commit

Permalink
chore: added extra logging in reserve worker for invalid batches (#4384)
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Oct 10, 2023
1 parent cfc472a commit 401e24e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
2 changes: 2 additions & 0 deletions pkg/storer/internal/reserve/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ type ChunkItem struct {
BatchID []byte
Type swarm.ChunkType
BinID uint64
Bin uint8
}

func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb func(ChunkItem) (bool, error)) error {
Expand All @@ -295,6 +296,7 @@ func (r *Reserve) IterateChunksItems(store internal.Storage, startBin uint8, cb
BatchID: item.BatchID,
Type: item.ChunkType,
BinID: item.BinID,
Bin: item.Bin,
}

stop, err := cb(chItem)
Expand Down
10 changes: 10 additions & 0 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type metrics struct {
LevelDBStats prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter

ReserveMissingBatch prometheus.Gauge
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -64,6 +66,14 @@ func newMetrics() metrics {
Help: "Number of chunks in reserve.",
},
),
ReserveMissingBatch: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_missing_batch",
Help: "Number of chunks in reserve with missing batches.",
},
),
ReserveSizeWithinRadius: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Expand Down
55 changes: 32 additions & 23 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,24 @@ func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) {

for {
count := 0
err := db.reserve.IterateChunksItems(db.repo, db.StorageRadius(), func(ci reserve.ChunkItem) (bool, error) {
count++
missing := 0
radius := db.StorageRadius()
err := db.reserve.IterateChunksItems(db.repo, 0, func(ci reserve.ChunkItem) (bool, error) {
if ci.Bin >= radius {
count++
}
if exists, _ := db.batchstore.Exists(ci.BatchID); !exists {
missing++
db.logger.Debug("reserve size worker, item with invalid batch id", "batch_id", hex.EncodeToString(ci.BatchID), "chunk_address", ci.ChunkAddress)
}
return false, nil
})
if err != nil {
db.logger.Error(err, "reserve count within radius")
}

db.metrics.ReserveSizeWithinRadius.Set(float64(count))
db.metrics.ReserveMissingBatch.Set(float64(missing))

select {
case <-ctx.Done():
Expand Down Expand Up @@ -237,27 +246,6 @@ func (db *DB) ReserveGet(ctx context.Context, addr swarm.Address, batchID []byte
return db.reserve.Get(ctx, db.repo, addr, batchID)
}

func (db *DB) StorageRadius() uint8 {
if db.reserve == nil {
return 0
}
return db.reserve.Radius()
}

func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0
}
return db.reserve.Size()
}

func (db *DB) IsWithinStorageRadius(addr swarm.Address) bool {
if db.reserve == nil {
return false
}
return swarm.Proximity(addr.Bytes(), db.baseAddr.Bytes()) >= db.reserve.Radius()
}

func (db *DB) ReserveHas(addr swarm.Address, batchID []byte) (has bool, err error) {
dur := captureDuration(time.Now())
defer func() {
Expand Down Expand Up @@ -445,6 +433,27 @@ func (db *DB) ReserveIterateChunks(cb func(swarm.Chunk) (bool, error)) error {
return db.reserve.IterateChunks(db.repo, 0, cb)
}

func (db *DB) StorageRadius() uint8 {
if db.reserve == nil {
return 0
}
return db.reserve.Radius()
}

func (db *DB) ReserveSize() int {
if db.reserve == nil {
return 0
}
return db.reserve.Size()
}

func (db *DB) IsWithinStorageRadius(addr swarm.Address) bool {
if db.reserve == nil {
return false
}
return swarm.Proximity(addr.Bytes(), db.baseAddr.Bytes()) >= db.reserve.Radius()
}

// BinC is the result returned from the SubscribeBin channel that contains the chunk address and the binID
type BinC struct {
Address swarm.Address
Expand Down

0 comments on commit 401e24e

Please sign in to comment.