diff --git a/indexer/crawler.go b/indexer/crawler.go index ee3e53429..0f63aab86 100644 --- a/indexer/crawler.go +++ b/indexer/crawler.go @@ -281,11 +281,11 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) { if has { if !pdsJobs.alreadyEnheaped { heap.Push(c, crawlJob) + catchupPending.Set(float64(len(c.repoSyncHeap))) c.repoSyncCond.Signal() pdsJobs.alreadyEnheaped = true } if pdsJobs == crawlJob { - // this _should_ be true if nextInPds was already there when we called .nextJob() return } for pdsJobs.nextInPds != nil { @@ -302,6 +302,7 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) { } if !crawlJob.alreadyEnheaped { heap.Push(c, crawlJob) + catchupPending.Set(float64(len(c.repoSyncHeap))) c.repoSyncCond.Signal() crawlJob.alreadyEnheaped = true } @@ -318,23 +319,19 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) { func (c *CrawlDispatcher) nextJob() *crawlWork { c.repoSyncLock.Lock() defer c.repoSyncLock.Unlock() - //retry: for len(c.repoSyncHeap) == 0 { c.repoSyncCond.Wait() } x := heap.Pop(c) + catchupPending.Set(float64(len(c.repoSyncHeap))) crawlJob := x.(*crawlWork) if crawlJob.nextInPds != nil { prev := c.repoSyncPds[crawlJob.act.PDS] if prev != crawlJob { - log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap, dropping all PDS work", crawlJob.act.PDS) - //delete(c.repoSyncPds, crawlJob.act.PDS) - //goto retry + log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap", crawlJob.act.PDS) } - //c.repoSyncPds[crawlJob.act.PDS] = crawlJob.nextInPds - } // else { + } delete(c.repoSyncPds, crawlJob.act.PDS) - //} crawlJob.alreadyEnheaped = false return crawlJob } diff --git a/indexer/metrics.go b/indexer/metrics.go index bd00be514..78a41a29b 100644 --- a/indexer/metrics.go +++ b/indexer/metrics.go @@ -39,3 +39,8 @@ var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "indexer_catchup_events_failed", Help: "Number of catchup events processed", }, []string{"err"}) + +var catchupPending = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "indexer_catchup_pending", + Help: "Number of catchup pending", +})