Skip to content

Commit

Permalink
fix: puller
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Feb 6, 2024
1 parent 64d8ea6 commit 135c1f2
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 18 deletions.
11 changes: 2 additions & 9 deletions pkg/puller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (

type metrics struct {
SyncWorkerIterCounter prometheus.Counter // counts the number of syncing iterations
SyncWorkerCounter prometheus.Counter // count number of syncing jobs
SyncWorkerCounter prometheus.Gauge // count number of syncing jobs
SyncedCounter prometheus.CounterVec // number of synced chunks
SyncWorkerDoneCounter prometheus.Counter // count number of finished syncing jobs
SyncWorkerErrCounter prometheus.Counter // count number of errors
MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost
}
Expand All @@ -28,7 +27,7 @@ func newMetrics() metrics {
Name: "worker_iterations",
Help: "Total worker iterations.",
}),
SyncWorkerCounter: prometheus.NewCounter(prometheus.CounterOpts{
SyncWorkerCounter: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker",
Expand All @@ -40,12 +39,6 @@ func newMetrics() metrics {
Name: "synced_chunks",
Help: "Total synced chunks.",
}, []string{"type"}),
SyncWorkerDoneCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "worker_done",
Help: "Total worker jobs done.",
}),
SyncWorkerErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand Down
15 changes: 6 additions & 9 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,17 +294,14 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
ctx, cancel := context.WithCancel(parentCtx)
peer.setBinCancel(cancel, bin)

sync := func(isHistorical bool, address swarm.Address, start uint64, bin uint8, done func()) {
sync := func(isHistorical bool, address swarm.Address, start uint64) {
p.metrics.SyncWorkerCounter.Inc()

defer p.wg.Done()
defer p.metrics.SyncWorkerDoneCounter.Inc()
defer done()
defer peer.wg.Done()
defer p.metrics.SyncWorkerCounter.Dec()

var (
cursor = start
err error
)
var err error

for {
if isHistorical { // overide start with the next interval if historical syncing
Expand Down Expand Up @@ -373,12 +370,12 @@ func (p *Puller) syncPeerBin(parentCtx context.Context, peer *syncPeer, bin uint
if cursor > 0 {
peer.wg.Add(1)
p.wg.Add(1)
go sync(true, peer.address, cursor, bin, peer.wg.Done)
go sync(true, peer.address, cursor)
}

peer.wg.Add(1)
p.wg.Add(1)
go sync(false, peer.address, cursor+1, bin, peer.wg.Done)
go sync(false, peer.address, cursor+1)
}

func (p *Puller) Close() error {
Expand Down

0 comments on commit 135c1f2

Please sign in to comment.