Skip to content

Commit

Permalink
feat(reserve): new worker and metric for reserve size within radius (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
istae authored Sep 28, 2023
1 parent 15363da commit 60438eb
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 13 deletions.
35 changes: 22 additions & 13 deletions pkg/storer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@ import (

// metrics groups storer related prometheus counters.
type metrics struct {
MethodCalls prometheus.CounterVec
MethodCallsDuration prometheus.HistogramVec
ReserveSize prometheus.Gauge
ReserveCleanup prometheus.Counter
StorageRadius prometheus.Gauge
CacheSize prometheus.Gauge
EvictedChunkCount prometheus.Counter
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter
MethodCalls prometheus.CounterVec
MethodCallsDuration prometheus.HistogramVec
ReserveSize prometheus.Gauge
ReserveSizeWithinRadius prometheus.Gauge
ReserveCleanup prometheus.Counter
StorageRadius prometheus.Gauge
CacheSize prometheus.Gauge
EvictedChunkCount prometheus.Counter
ExpiredChunkCount prometheus.Counter
OverCapTriggerCount prometheus.Counter
ExpiredBatchCount prometheus.Counter
LevelDBStats prometheus.HistogramVec
ExpiryTriggersCount prometheus.Counter
ExpiryRunsCount prometheus.Counter
}

// newMetrics is a convenient constructor for creating new metrics.
Expand Down Expand Up @@ -63,6 +64,14 @@ func newMetrics() metrics {
Help: "Number of chunks in reserve.",
},
),
ReserveSizeWithinRadius: prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "reserve_size_within_radius",
Help: "Number of chunks in reserve with proximity >= storage radius.",
},
),
ReserveCleanup: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: m.Namespace,
Expand Down
32 changes: 32 additions & 0 deletions pkg/storer/reserve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storage/storageutil"
"github.com/ethersphere/bee/pkg/storer/internal"
"github.com/ethersphere/bee/pkg/storer/internal/reserve"
"github.com/ethersphere/bee/pkg/swarm"
"golang.org/x/exp/slices"
)
Expand All @@ -26,6 +27,8 @@ const (
reserveUpdateLockKey = "reserveUpdateLockKey"
batchExpiry = "batchExpiry"
batchExpiryDone = "batchExpiryDone"

reserveSizeWithinRadiusWakeup = time.Hour
)

func reserveUpdateBatchLockKey(batchID []byte) string {
Expand Down Expand Up @@ -85,6 +88,9 @@ func (db *DB) startReserveWorkers(

db.inFlight.Add(1)
go db.radiusWorker(ctx, wakeUpDur)

db.inFlight.Add(1)
go db.reserveSizeWithinRadiusWorker(ctx)
}

func (db *DB) radiusWorker(ctx context.Context, wakeUpDur time.Duration) {
Expand Down Expand Up @@ -112,6 +118,32 @@ func (db *DB) radiusWorker(ctx context.Context, wakeUpDur time.Duration) {
}
}

func (db *DB) reserveSizeWithinRadiusWorker(ctx context.Context) {
defer db.inFlight.Done()

ticker := time.NewTicker(reserveSizeWithinRadiusWakeup)
defer ticker.Stop()

for {
count := 0
err := db.reserve.IterateChunksItems(db.repo, db.StorageRadius(), func(ci reserve.ChunkItem) (bool, error) {
count++
return false, nil
})
if err != nil {
db.logger.Error(err, "reserve count within radius")
}

db.metrics.ReserveSizeWithinRadius.Set(float64(count))

select {
case <-ctx.Done():
return
case <-ticker.C:
}
}
}

func (db *DB) getExpiredBatches() ([][]byte, error) {
var batchesToEvict [][]byte
err := db.repo.IndexStore().Iterate(storage.Query{
Expand Down

0 comments on commit 60438eb

Please sign in to comment.