diff --git a/cmd/palomar/main.go b/cmd/palomar/main.go index b75e9a480..1b8641c79 100644 --- a/cmd/palomar/main.go +++ b/cmd/palomar/main.go @@ -120,6 +120,18 @@ var runCmd = &cli.Command{ Value: ":3999", EnvVars: []string{"PALOMAR_BIND"}, }, + &cli.IntFlag{ + Name: "bgs-sync-rate-limit", + Usage: "max repo sync (checkout) requests per second to upstream (BGS)", + Value: 8, + EnvVars: []string{"PALOMAR_BGS_SYNC_RATE_LIMIT"}, + }, + &cli.IntFlag{ + Name: "index-max-concurrency", + Usage: "max number of concurrent index requests (HTTP POST) to search index", + Value: 20, + EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"}, + }, }, Action: func(cctx *cli.Context) error { logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ @@ -146,17 +158,19 @@ var runCmd = &cli.Command{ TryAuthoritativeDNS: true, SkipDNSDomainSuffixes: []string{".bsky.social"}, } - dir := identity.NewCacheDirectory(&base, 50000, time.Hour*24, time.Minute*2) + dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2) srv, err := search.NewServer( db, escli, &dir, search.Config{ - BGSHost: cctx.String("atp-bgs-host"), - ProfileIndex: cctx.String("es-profile-index"), - PostIndex: cctx.String("es-post-index"), - Logger: logger, + BGSHost: cctx.String("atp-bgs-host"), + ProfileIndex: cctx.String("es-profile-index"), + PostIndex: cctx.String("es-post-index"), + Logger: logger, + BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"), + IndexMaxConcurrency: cctx.Int("index-max-concurrency"), }, ) if err != nil { diff --git a/search/handlers.go b/search/handlers.go index 5693ee3c0..6232da073 100644 --- a/search/handlers.go +++ b/search/handlers.go @@ -148,7 +148,6 @@ func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (* if len(posts) == size && (offset+size) < 10000 { out.Cursor = fmt.Sprintf("%d", offset+size) } - fmt.Println(resp.Hits.Total) if resp.Hits.Total.Relation == "eq" { out.HitsTotal = &resp.Hits.Total.Value } diff --git a/search/query.go b/search/query.go index a51c820cf..0e346be6a 100644 --- a/search/query.go +++ b/search/query.go @@ -163,25 +163,17 @@ func DoSearchGeneric(ctx context.Context, escli *es.Client, index, q string) (*E } func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(query); err != nil { - return nil, fmt.Errorf("error encoding query: %s", err) - } - - // re-marshal query as string for logging - if true { - bod, err := json.Marshal(query) - if err != nil { - return nil, fmt.Errorf("failed to serialize query: %w", err) - } - slog.Warn("sending query", "index", index, "query", string(bod)) + b, err := json.Marshal(query) + if err != nil { + return nil, fmt.Errorf("failed to serialize query: %w", err) } + slog.Warn("sending query", "index", index, "query", string(b)) // Perform the search request. res, err := escli.Search( escli.Search.WithContext(ctx), escli.Search.WithIndex(index), - escli.Search.WithBody(&buf), + escli.Search.WithBody(bytes.NewBuffer(b)), ) if err != nil { return nil, fmt.Errorf("search query error: %w", err) diff --git a/search/server.go b/search/server.go index 84001197f..8f7f60160 100644 --- a/search/server.go +++ b/search/server.go @@ -43,10 +43,12 @@ type LastSeq struct { } type Config struct { - BGSHost string - ProfileIndex string - PostIndex string - Logger *slog.Logger + BGSHost string + ProfileIndex string + PostIndex string + Logger *slog.Logger + BGSSyncRateLimit int + IndexMaxConcurrency int } func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Config) (*Server, error) { @@ -85,8 +87,16 @@ func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Con bfstore := backfill.NewGormstore(db) opts := backfill.DefaultBackfillOptions() - opts.ParallelRecordCreates = 20 - opts.SyncRequestsPerSecond = 8 + if config.BGSSyncRateLimit > 0 { + opts.SyncRequestsPerSecond = config.BGSSyncRateLimit + } else { + opts.SyncRequestsPerSecond = 8 + } + if config.IndexMaxConcurrency > 0 { + opts.ParallelRecordCreates = config.IndexMaxConcurrency + } else { + opts.ParallelRecordCreates = 20 + } opts.NSIDFilter = "app.bsky." bf := backfill.NewBackfiller( "search",