Skip to content

Commit

Permalink
stream resync to indexer crawler
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Nov 20, 2024
1 parent 9198b79 commit 85b52dd
Showing 1 changed file with 12 additions and 29 deletions.
41 changes: 12 additions & 29 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,15 +1646,13 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
resync = bgs.SetResyncStatus(pds.ID, "checking revs")

// Create a buffered channel for collecting results
results := make(chan revCheckResult, len(repos))
// run loop over repos with some concurrency
sem := semaphore.NewWeighted(40)

// Check repo revs against our local copy and enqueue crawls for any that are out of date
for _, r := range repos {
for i, r := range repos {
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorw("failed to acquire semaphore", "error", err)
results <- revCheckResult{err: err}
continue
}
go func(r comatproto.SyncListRepos_Repo) {
Expand All @@ -1664,56 +1662,41 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
if err != nil {
log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
results <- revCheckResult{err: err}
return
}

rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
if err != nil {
log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
results <- revCheckResult{ai: ai}
err := bgs.Index.Crawler.Crawl(ctx, ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
}
return
}

if rev == "" || rev < r.Rev {
log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
results <- revCheckResult{ai: ai}
err := bgs.Index.Crawler.Crawl(ctx, ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
}
return
}

results <- revCheckResult{}
}(r)
}

var numReposToResync int
for i := 0; i < len(repos); i++ {
res := <-results
if res.err != nil {
log.Errorw("failed to process repo during resync", "error", res.err)

}
if res.ai != nil {
numReposToResync++
err := bgs.Index.Crawler.Crawl(ctx, res.ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did)
}
}
if i%100 == 0 {
if i%10_000 == 0 {
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt))
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt))
}
resync.NumReposChecked = i
resync.NumReposToResync = numReposToResync
bgs.UpdateResync(resync)
}
}

resync.NumReposChecked = len(repos)
resync.NumReposToResync = numReposToResync
bgs.UpdateResync(resync)

log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", numReposToResync)
log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1)

return nil
}

0 comments on commit 85b52dd

Please sign in to comment.