indexer crawler in order by next rate-limit eligibility #819

Open: wants to merge 7 commits into base: main
1 change: 1 addition & 0 deletions bgs/bgs.go
@@ -860,6 +860,7 @@ func stringLink(lnk *lexutil.LexLink) string {
return lnk.String()
}

// called from fedmgr.go Slurper.handleConnection() through .cb
func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *events.XRPCStreamEvent) error {
ctx, span := tracer.Start(ctx, "handleFedEvent")
defer span.End()
213 changes: 157 additions & 56 deletions indexer/crawler.go
@@ -1,9 +1,12 @@
package indexer

import (
"container/heap"
"context"
"fmt"
"golang.org/x/time/rate"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/models"
@@ -14,8 +17,6 @@ import (
type CrawlDispatcher struct {
ingest chan *models.ActorInfo

repoSync chan *crawlWork

catchup chan *crawlWork

complete chan models.Uid
@@ -24,26 +25,40 @@ type CrawlDispatcher struct {
todo map[models.Uid]*crawlWork
inProgress map[models.Uid]*crawlWork

doRepoCrawl func(context.Context, *crawlWork) error
repoFetcher CrawlRepoFetcher

concurrency int

repoSyncHeap []*crawlWork
// repoSyncPds maps a pdsID to the head of that PDS's linked list of pending jobs, chained via .nextInPds
repoSyncPds map[uint]*crawlWork
repoSyncLock sync.Mutex
repoSyncCond sync.Cond
}

// CrawlRepoFetcher is the subset of RepoFetcher that the dispatcher needs, made an interface so it can be passed in without a hard dependency
type CrawlRepoFetcher interface {
FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter
}
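
For orientation, a minimal sketch of something satisfying CrawlRepoFetcher. This assumes it lives alongside crawlWork in the indexer package; the limiter defaults and the sketchFetcher name are illustrative, not taken from this PR, and the real implementation is the concrete RepoFetcher elsewhere in the indexer package.

// sketchFetcher is an illustrative implementation of CrawlRepoFetcher.
type sketchFetcher struct {
	lk       sync.Mutex
	limiters map[uint]*rate.Limiter
}

// GetOrCreateLimiterLazy returns the per-PDS limiter, creating it on first use.
func (f *sketchFetcher) GetOrCreateLimiterLazy(pdsID uint) *rate.Limiter {
	f.lk.Lock()
	defer f.lk.Unlock()
	lim, ok := f.limiters[pdsID]
	if !ok {
		lim = rate.NewLimiter(rate.Limit(10), 10) // assumed default; real per-PDS limits come from config
		f.limiters[pdsID] = lim
	}
	return lim
}

// FetchAndIndexRepo waits on the PDS limiter, then fetches and indexes the repo (elided).
func (f *sketchFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
	if err := f.GetOrCreateLimiterLazy(job.act.PDS).Wait(ctx); err != nil {
		return err
	}
	// ... getRepo and indexing elided ...
	return nil
}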

func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int) (*CrawlDispatcher, error) {
if concurrency < 1 {
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
}

return &CrawlDispatcher{
out := &CrawlDispatcher{
ingest: make(chan *models.ActorInfo),
repoSync: make(chan *crawlWork),
complete: make(chan models.Uid),
catchup: make(chan *crawlWork),
doRepoCrawl: repoFn,
repoFetcher: repoFetcher,
concurrency: concurrency,
todo: make(map[models.Uid]*crawlWork),
inProgress: make(map[models.Uid]*crawlWork),
}, nil
repoSyncPds: make(map[uint]*crawlWork),
}
out.repoSyncCond.L = &out.repoSyncLock
return out, nil
}

func (c *CrawlDispatcher) Run() {
@@ -71,49 +86,22 @@ type crawlWork struct {
// for events that come in while this actor is being processed
// next items are processed after the crawl
next []*catchupJob

eligibleTime time.Time
nextInPds *crawlWork
alreadyEnheaped bool
}

func (c *CrawlDispatcher) mainLoop() {
var nextDispatchedJob *crawlWork
var jobsAwaitingDispatch []*crawlWork

// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
var dispatchQueue chan *crawlWork

for {
var crawlJob *crawlWork = nil
select {
case actorToCrawl := <-c.ingest:
// TODO: max buffer size
crawlJob := c.enqueueJobForActor(actorToCrawl)
if crawlJob == nil {
break
}

if nextDispatchedJob == nil {
nextDispatchedJob = crawlJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
}
case dispatchQueue <- nextDispatchedJob:
c.dequeueJob(nextDispatchedJob)

if len(jobsAwaitingDispatch) > 0 {
nextDispatchedJob = jobsAwaitingDispatch[0]
jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
} else {
nextDispatchedJob = nil
dispatchQueue = nil
}
case catchupJob := <-c.catchup:
crawlJob = c.enqueueJobForActor(actorToCrawl)
case crawlJob = <-c.catchup:
// CatchupJobs are for processing events that come in while a crawl is in progress
// They are lower priority than new crawls so we only add them to the queue if there isn't already a job in progress
if nextDispatchedJob == nil {
nextDispatchedJob = catchupJob
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
}
case uid := <-c.complete:
c.maplk.Lock()

@@ -130,15 +118,21 @@ func (c *CrawlDispatcher) mainLoop() {
job.initScrape = false
job.catchup = job.next
job.next = nil
if nextDispatchedJob == nil {
nextDispatchedJob = job
dispatchQueue = c.repoSync
} else {
jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
}
crawlJob = job
}
c.maplk.Unlock()
}
if crawlJob != nil {
pdsID := crawlJob.act.PDS
limiter := c.repoFetcher.GetOrCreateLimiterLazy(pdsID)
now := time.Now()
wouldDelay := limiter.ReserveN(now, 1).DelayFrom(now)
crawlJob.eligibleTime = now.Add(wouldDelay)
// put crawl job on heap sorted by eligible time
c.enheapJob(crawlJob)
c.dequeueJob(crawlJob)
crawlJob = nil
}
}
}
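
The eligibleTime computation above leans on golang.org/x/time/rate reservation semantics: ReserveN records a token reservation and DelayFrom reports how long the caller would have to wait for it, so mainLoop sets eligibleTime = now.Add(wouldDelay) and the heap orders jobs by when their PDS limiter would let them through. A standalone illustration of that math, with made-up numbers:

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	lim := rate.NewLimiter(rate.Every(time.Second), 1) // 1 request/second, burst of 1

	now := time.Now()
	first := lim.ReserveN(now, 1).DelayFrom(now)  // 0s: the burst token is free immediately
	second := lim.ReserveN(now, 1).DelayFrom(now) // ~1s: the next token is a second away

	fmt.Println(first, second)
}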

@@ -173,24 +167,26 @@ func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
}

func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
catchupEventsEnqueued.Inc()
c.maplk.Lock()
defer c.maplk.Unlock()

// If the actor crawl is enqueued, we can append to the catchup queue which gets emptied during the crawl
job, ok := c.todo[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("todo").Inc()
job.catchup = append(job.catchup, catchup)
return nil
}

// If the actor crawl is in progress, we can append to the next queue which gets emptied after the crawl
job, ok = c.inProgress[catchup.user.Uid]
if ok {
catchupEventsEnqueued.WithLabelValues("prog").Inc()
job.next = append(job.next, catchup)
return nil
}

catchupEventsEnqueued.WithLabelValues("new").Inc()
// Otherwise, we need to create a new crawl job for this actor and enqueue it
cw := &crawlWork{
act: catchup.user,
@@ -202,14 +198,18 @@ func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {

func (c *CrawlDispatcher) fetchWorker() {
for {
select {
case job := <-c.repoSync:
if err := c.doRepoCrawl(context.TODO(), job); err != nil {
log.Errorf("failed to perform repo crawl of %q: %s", job.act.Did, err)
}
job := c.nextJob()
nextInPds := job.nextInPds
job.nextInPds = nil
if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
log.Errorf("failed to perform repo crawl of %q: %s", job.act.Did, err)
}

// TODO: do we still just do this if it errors?
c.complete <- job.act.Uid
// TODO: do we still just do this if it errors?
c.complete <- job.act.Uid

if nextInPds != nil {
c.enheapJob(nextInPds)
}
}
}
@@ -269,3 +269,104 @@ func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool {

return false
}

// priority-queue for crawlJob based on eligibleTime
func (c *CrawlDispatcher) enheapJob(crawlJob *crawlWork) {
if crawlJob.alreadyEnheaped {
log.Errorf("CrawlDispatcher pds %d uid %d trying to enheap alreadyEnheaped", crawlJob.alreadyEnheaped, crawlJob.act.Uid)
}
c.repoSyncLock.Lock()
defer c.repoSyncLock.Unlock()
pdsJobs, has := c.repoSyncPds[crawlJob.act.PDS]
if has {
if !pdsJobs.alreadyEnheaped {
heap.Push(c, crawlJob)
catchupPending.Set(float64(len(c.repoSyncHeap)))
c.repoSyncCond.Signal()
pdsJobs.alreadyEnheaped = true
}
if pdsJobs == crawlJob {
return
}
for pdsJobs.nextInPds != nil {
pdsJobs = pdsJobs.nextInPds
if pdsJobs == crawlJob {
// we re-enheap something later? weird but okay?
return
}
}
pdsJobs.nextInPds = crawlJob
return
} else {
c.repoSyncPds[crawlJob.act.PDS] = crawlJob
}
if !crawlJob.alreadyEnheaped {
heap.Push(c, crawlJob)
catchupPending.Set(float64(len(c.repoSyncHeap)))
c.repoSyncCond.Signal()
crawlJob.alreadyEnheaped = true
}
}
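
The net effect of the repoSyncPds bookkeeping is that at most one job per PDS sits in the heap at a time; later jobs for the same PDS hang off nextInPds and are fed back in one at a time by the worker that finished the previous job. A hypothetical trace for a single PDS (job names invented), matching the code above:

//   enheapJob(A)  ->  heap: [A]   repoSyncPds[pds] = A
//   enheapJob(B)  ->  heap: [A]   A.nextInPds = B   (B waits off-heap)
//   enheapJob(C)  ->  heap: [A]   B.nextInPds = C
//   nextJob() = A ->  heap: []    worker fetches A, then enheapJob(B)
//   enheapJob(B)  ->  heap: [B]   repoSyncPds[pds] = B, C still chained behind B
//
// The heap never holds two jobs for one PDS, so a slow or heavily rate-limited
// PDS cannot occupy more than one of the `concurrency` workers at a time.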

// nextJob returns the next available crawlWork ordered by eligibleTime, blocking until one is available.
// After executing the returned job, the caller should .enheapJob(job.nextInPds) if it is non-nil.
//
// There's a tiny race: .nextJob() could return the only pending work for a PDS,
// an outside event could .enheapJob() the next job for that PDS,
// and that job would count as eligible immediately because the rate limiter hasn't yet ticked for the work just handed out by .nextJob();
// the worker picking up that next job would then dequeue it right away but Sleep() to wait for the rate limiter.
// We will call this 'not too bad', 'good enough for now'. -- bolson 2024-11
func (c *CrawlDispatcher) nextJob() *crawlWork {
c.repoSyncLock.Lock()
defer c.repoSyncLock.Unlock()
for len(c.repoSyncHeap) == 0 {
c.repoSyncCond.Wait()
}
x := heap.Pop(c)
catchupPending.Set(float64(len(c.repoSyncHeap)))
crawlJob := x.(*crawlWork)
if crawlJob.nextInPds != nil {
prev := c.repoSyncPds[crawlJob.act.PDS]
if prev != crawlJob {
log.Errorf("CrawlDispatcher internal: pds %d next is not next in eligible heap", crawlJob.act.PDS)
}
}
delete(c.repoSyncPds, crawlJob.act.PDS)
crawlJob.alreadyEnheaped = false
return crawlJob
}
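
enheapJob/nextJob are a standard condition-variable producer/consumer pairing: push under the lock and Signal, while consumers Wait in a loop until there is something to hand out. Reduced to its skeleton, with invented names and an int payload standing in for *crawlWork (illustrative, not from the PR):

package main

import (
	"fmt"
	"sync"
)

// queue mirrors the locking pattern used by enheapJob/nextJob.
type queue struct {
	lk    sync.Mutex
	cond  sync.Cond
	items []int
}

func newQueue() *queue {
	q := &queue{}
	q.cond.L = &q.lk // same trick as NewCrawlDispatcher: the Cond shares the struct's mutex
	return q
}

func (q *queue) push(v int) {
	q.lk.Lock()
	defer q.lk.Unlock()
	q.items = append(q.items, v)
	q.cond.Signal() // wake one waiting worker, as enheapJob does
}

func (q *queue) pop() int {
	q.lk.Lock()
	defer q.lk.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // releases the lock while sleeping, reacquires before returning
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := newQueue()
	go q.push(42)
	fmt.Println(q.pop()) // blocks until the push lands, then prints 42
}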

// part of container/heap.Interface and sort.Interface
// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
func (c *CrawlDispatcher) Len() int {
return len(c.repoSyncHeap)
}

// part of container/heap.Interface and sort.Interface
// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
func (c *CrawlDispatcher) Less(i, j int) bool {
return c.repoSyncHeap[i].eligibleTime.Before(c.repoSyncHeap[j].eligibleTime)
}

// part of container/heap.Interface and sort.Interface
// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
func (c *CrawlDispatcher) Swap(i, j int) {
t := c.repoSyncHeap[i]
c.repoSyncHeap[i] = c.repoSyncHeap[j]
c.repoSyncHeap[j] = t
}

// part of container/heap.Interface
// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
func (c *CrawlDispatcher) Push(x any) {
c.repoSyncHeap = append(c.repoSyncHeap, x.(*crawlWork))
}

// part of container/heap.Interface
// c.repoSyncLock MUST ALREADY BE HELD BEFORE HEAP OPERATIONS
func (c *CrawlDispatcher) Pop() any {
heaplen := len(c.repoSyncHeap)
out := c.repoSyncHeap[heaplen-1]
c.repoSyncHeap = c.repoSyncHeap[:heaplen-1]
return out
}
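
Because CrawlDispatcher itself implements heap.Interface over repoSyncHeap, heap.Push(c, job) and heap.Pop(c) keep the slice ordered so index 0 is always the job with the earliest eligibleTime. A minimal standalone program showing the same pattern on a heap of timestamps (invented for this note, not part of the PR):

package main

import (
	"container/heap"
	"fmt"
	"time"
)

// timeHeap orders times earliest-first, mirroring how repoSyncHeap orders crawlWork by eligibleTime.
type timeHeap []time.Time

func (h timeHeap) Len() int           { return len(h) }
func (h timeHeap) Less(i, j int) bool { return h[i].Before(h[j]) }
func (h timeHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *timeHeap) Push(x any)        { *h = append(*h, x.(time.Time)) }
func (h *timeHeap) Pop() any {
	old := *h
	n := len(old)
	out := old[n-1]
	*h = old[:n-1]
	return out
}

func main() {
	now := time.Now()
	var h timeHeap
	heap.Push(&h, now.Add(2*time.Second))
	heap.Push(&h, now)
	heap.Push(&h, now.Add(time.Second))
	for h.Len() > 0 {
		fmt.Println(heap.Pop(&h).(time.Time).Sub(now)) // prints 0s, 1s, 2s
	}
}
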
2 changes: 1 addition & 1 deletion indexer/indexer.go
@@ -68,7 +68,7 @@ func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events
}

if crawl {
c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
c, err := NewCrawlDispatcher(fetcher, fetcher.MaxConcurrency)
if err != nil {
return nil, err
}
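
Passing the fetcher itself works because the concrete fetcher now satisfies CrawlRepoFetcher. A hypothetical compile-time assertion one could add to make that explicit, assuming the concrete type is the indexer package's RepoFetcher (its GetOrCreateLimiterLazy method is not shown in this diff):

var _ CrawlRepoFetcher = (*RepoFetcher)(nil)
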
14 changes: 12 additions & 2 deletions indexer/metrics.go
@@ -25,12 +25,22 @@ var reposFetched = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "Number of repos fetched",
}, []string{"status"})

var catchupEventsEnqueued = promauto.NewCounter(prometheus.CounterOpts{
var catchupEventsEnqueued = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_enqueued",
Help: "Number of catchup events enqueued",
})
}, []string{"how"})

var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "indexer_catchup_events_processed",
Help: "Number of catchup events processed",
})

var catchupEventsFailed = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "indexer_catchup_events_failed",
Help: "Number of catchup events processed",
}, []string{"err"})

var catchupPending = promauto.NewGauge(prometheus.GaugeOpts{
Name: "indexer_catchup_pending",
Help: "Number of catchup pending",
})