Skip to content

Commit

Permalink
maybe fix crawler scheduler, more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 19, 2024
1 parent 336816a commit f18240d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
26 changes: 14 additions & 12 deletions indexer/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,26 @@ func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
}

func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
catchupEventsEnqueued.Inc()
c.maplk.Lock()
defer c.maplk.Unlock()

// If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
job, ok := c.todo[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("todo").Inc()
job.catchup = append(job.catchup, catchup)
return nil
}

// If the actor crawl is in progress, we can append to the nextr queue which gets emptied after the crawl
job, ok = c.inProgress[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("prog").Inc()
job.next = append(job.next, catchup)
return nil
}

catchupEventsEnqueued.WithLabelValues("new").Inc()
// Otherwise, we need to create a new crawl job for this actor and enqueue it
cw := &crawlWork{
act: catchup.user,
Expand Down Expand Up @@ -270,6 +272,9 @@ func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bo

// priority-queue for crawlJob based on eligibleTime
func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
if crawlJob.alreadyEnheaped {
log.Errorf("CrawlDispatcher pds %d uid %d trying to enheap alreadyEnheaped", crawlJob.alreadyEnheaped, crawlJob.act.Uid)
}
c.repoSyncLock.Lock()
defer c.repoSyncLock.Unlock()
pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
Expand All @@ -290,10 +295,6 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
return
}
}
if crawlJob.nextInPds != nil {
log.Errorf("CrawlDispatcher internal: attempting to enheap crawlWork which somehow got nextInPds work?, pds %d, dropping?", crawlJob.act.PDS)
return
}
pdsJobs.nextInPds = crawlJob
return
} else {
Expand All @@ -317,7 +318,7 @@ func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
func (c *CrawlDispatcher) nextJob() *crawlWork {
c.repoSyncLock.Lock()
defer c.repoSyncLock.Unlock()
retry:
//retry:
for len(c.repoSyncHeap) == 0 {
c.repoSyncCond.Wait()
}
Expand All @@ -327,13 +328,14 @@ retry:
prev := c.repoSyncPds[crawlJob.act.PDS]
if prev != crawlJob {
log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap, dropping all PDS work", crawlJob.act.PDS)
delete(c.repoSyncPds, crawlJob.act.PDS)
goto retry
//delete(c.repoSyncPds, crawlJob.act.PDS)
//goto retry
}
c.repoSyncPds[crawlJob.act.PDS] = crawlJob.nextInPds
} else {
delete(c.repoSyncPds, crawlJob.act.PDS)
}
//c.repoSyncPds[crawlJob.act.PDS] = crawlJob.nextInPds
} // else {
delete(c.repoSyncPds, crawlJob.act.PDS)
//}
crawlJob.alreadyEnheaped = false
return crawlJob
}

Expand Down
4 changes: 2 additions & 2 deletions indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ var reposFetched = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of repos fetched",
}, []string{"status"})

var catchupEventsEnqueued = promauto.NewCounter(prometheus.CounterOpts{
var catchupEventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_enqueued",
Help: "Number of catchup events enqueued",
})
}, []string{"how"})

var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "indexer_catchup_events_processed",
Expand Down

0 comments on commit f18240d

Please sign in to comment.