Skip to content

Commit

Permalink
cleanup, add gauge indexer_catchup_pending
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 19, 2024
1 parent f18240d commit 07f6097
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
13 changes: 5 additions & 8 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,11 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
if has {
if !pdsJobs.alreadyEnheaped {
heap.Push(c, crawlJob)
catchupPending.Set(float64(len(c.repoSyncHeap)))
c.repoSyncCond.Signal()
pdsJobs.alreadyEnheaped = true
}
if pdsJobs == crawlJob {
// this _should_ be true if nextInPds was already there when we called .nextJob()
return
}
for pdsJobs.nextInPds != nil {
Expand All @@ -302,6 +302,7 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
}
if !crawlJob.alreadyEnheaped {
heap.Push(c, crawlJob)
catchupPending.Set(float64(len(c.repoSyncHeap)))
c.repoSyncCond.Signal()
crawlJob.alreadyEnheaped = true
}
Expand All @@ -318,23 +319,19 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
func (c *CrawlDispatcher) nextJob() *crawlWork {
c.repoSyncLock.Lock()
defer c.repoSyncLock.Unlock()
//retry:
for len(c.repoSyncHeap) == 0 {
c.repoSyncCond.Wait()
}
x := heap.Pop(c)
catchupPending.Set(float64(len(c.repoSyncHeap)))
crawlJob := x.(*crawlWork)
if crawlJob.nextInPds != nil {
prev := c.repoSyncPds[crawlJob.act.PDS]
if prev != crawlJob {
log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap, dropping all PDS work", crawlJob.act.PDS)
//delete(c.repoSyncPds, crawlJob.act.PDS)
//goto retry
log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap", crawlJob.act.PDS)
}
//c.repoSyncPds[crawlJob.act.PDS] = crawlJob.nextInPds
} // else {
}
delete(c.repoSyncPds, crawlJob.act.PDS)
//}
crawlJob.alreadyEnheaped = false
return crawlJob
}
Expand Down
5 changes: 5 additions & 0 deletions indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_failed",
Help: "Number of catchup events processed",
}, []string{"err"})

var catchupPending = promauto.NewGauge(prometheus.GaugeOpts{
Name: "indexer_catchup_pending",
Help: "Number of catchup pending",
})

0 comments on commit 07f6097

Please sign in to comment.