From 85b52ddd8dc5627bd99786088767b7a544cc6555 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 19 Nov 2024 15:53:03 -0500 Subject: [PATCH] stream resync to indexer crawler --- bgs/bgs.go | 41 ++++++++++++----------------------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index b64df715e..18eab263f 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -1646,15 +1646,13 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start)) resync = bgs.SetResyncStatus(pds.ID, "checking revs") - // Create a buffered channel for collecting results - results := make(chan revCheckResult, len(repos)) + // run loop over repos with some concurrency sem := semaphore.NewWeighted(40) // Check repo revs against our local copy and enqueue crawls for any that are out of date - for _, r := range repos { + for i, r := range repos { if err := sem.Acquire(ctx, 1); err != nil { log.Errorw("failed to acquire semaphore", "error", err) - results <- revCheckResult{err: err} continue } go func(r comatproto.SyncListRepos_Repo) { @@ -1664,56 +1662,41 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error { ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did) if err != nil { log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err) - results <- revCheckResult{err: err} return } rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid) if err != nil { log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid) - results <- revCheckResult{ai: ai} + err := bgs.Index.Crawler.Crawl(ctx, ai) + if err != nil { + log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) + } return } if rev == "" || rev < r.Rev { log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev) - results <- revCheckResult{ai: ai} + err := bgs.Index.Crawler.Crawl(ctx, ai) + if err != nil { + log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did) + } return } - - results <- revCheckResult{} }(r) - } - - var numReposToResync int - for i := 0; i < len(repos); i++ { - res := <-results - if res.err != nil { - log.Errorw("failed to process repo during resync", "error", res.err) - - } - if res.ai != nil { - numReposToResync++ - err := bgs.Index.Crawler.Crawl(ctx, res.ai) - if err != nil { - log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did) - } - } if i%100 == 0 { if i%10_000 == 0 { - log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt)) + log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt)) } resync.NumReposChecked = i - resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) } } resync.NumReposChecked = len(repos) - resync.NumReposToResync = numReposToResync bgs.UpdateResync(resync) - log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", numReposToResync) + log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1) return nil }