From 135c1f2f5164d0aa65878d632c0914a195f8a628 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 6 Feb 2024 13:21:03 +0300 Subject: [PATCH] fix: puller --- pkg/puller/metrics.go | 11 ++--------- pkg/puller/puller.go | 15 ++++++--------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/pkg/puller/metrics.go b/pkg/puller/metrics.go index 715a285be63..861e06f69c5 100644 --- a/pkg/puller/metrics.go +++ b/pkg/puller/metrics.go @@ -11,9 +11,8 @@ import ( type metrics struct { SyncWorkerIterCounter prometheus.Counter // counts the number of syncing iterations - SyncWorkerCounter prometheus.Counter // count number of syncing jobs + SyncWorkerCounter prometheus.Gauge // count number of syncing jobs SyncedCounter prometheus.CounterVec // number of synced chunks - SyncWorkerDoneCounter prometheus.Counter // count number of finished syncing jobs SyncWorkerErrCounter prometheus.Counter // count number of errors MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost } @@ -28,7 +27,7 @@ func newMetrics() metrics { Name: "worker_iterations", Help: "Total worker iterations.", }), - SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{ + SyncWorkerCounter: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: m.Namespace, Subsystem: subsystem, Name: "worker", @@ -40,12 +39,6 @@ func newMetrics() metrics { Name: "synced_chunks", Help: "Total synced chunks.", }, []string{"type"}), - SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "worker_done", - Help: "Total worker jobs done.", - }), SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index f2cfd15eee0..791a2a6473b 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -294,17 +294,14 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint ctx, cancel := context.WithCancel(parentCtx) peer.setBinCancel(cancel, bin) - sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) { + sync := func(isHistorical bool, address swarm.Address, start uint64) { p.metrics.SyncWorkerCounter.Inc() defer p.wg.Done() - defer p.metrics.SyncWorkerDoneCounter.Inc() - defer done() + defer peer.wg.Done() + defer p.metrics.SyncWorkerCounter.Dec() - var ( - cursor = start - err error - ) + var err error for { if isHistorical { // overide start with the next interval if historical syncing @@ -373,12 +370,12 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint if cursor > 0 { peer.wg.Add(1) p.wg.Add(1) - go sync(true, peer.address, cursor, bin, peer.wg.Done) + go sync(true, peer.address, cursor) } peer.wg.Add(1) p.wg.Add(1) - go sync(false, peer.address, cursor+1, bin, peer.wg.Done) + go sync(false, peer.address, cursor+1) } func (p *Puller) Close() error {