Skip to content

Commit

Permalink
palomar: feedback from review
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Sep 15, 2023
1 parent 8b8ab88 commit 8a7e7fd
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
24 changes: 19 additions & 5 deletions cmd/palomar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,18 @@ var runCmd = &cli.Command{
Value: ":3999",
EnvVars: []string{"PALOMAR_BIND"},
},
&cli.IntFlag{
Name: "bgs-sync-rate-limit",
Usage: "max repo sync (checkout) requests per second to upstream (BGS)",
Value: 8,
EnvVars: []string{"PALOMAR_BGS_SYNC_RATE_LIMIT"},
},
&cli.IntFlag{
Name: "index-max-concurrency",
Usage: "max number of concurrent index requests (HTTP POST) to search index",
Value: 20,
EnvVars: []string{"PALOMAR_INDEX_MAX_CONCURRENCY"},
},
},
Action: func(cctx *cli.Context) error {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Expand All @@ -146,17 +158,19 @@ var runCmd = &cli.Command{
TryAuthoritativeDNS: true,
SkipDNSDomainSuffixes: []string{".bsky.social"},
}
dir := identity.NewCacheDirectory(&base, 50000, time.Hour*24, time.Minute*2)
dir := identity.NewCacheDirectory(&base, 200000, time.Hour*24, time.Minute*2)

srv, err := search.NewServer(
db,
escli,
&dir,
search.Config{
BGSHost: cctx.String("atp-bgs-host"),
ProfileIndex: cctx.String("es-profile-index"),
PostIndex: cctx.String("es-post-index"),
Logger: logger,
BGSHost: cctx.String("atp-bgs-host"),
ProfileIndex: cctx.String("es-profile-index"),
PostIndex: cctx.String("es-post-index"),
Logger: logger,
BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"),
IndexMaxConcurrency: cctx.Int("index-max-concurrency"),
},
)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion search/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (s *Server) SearchPosts(ctx context.Context, q string, offset, size int) (*
if len(posts) == size && (offset+size) < 10000 {
out.Cursor = fmt.Sprintf("%d", offset+size)
}
fmt.Println(resp.Hits.Total)
if resp.Hits.Total.Relation == "eq" {
out.HitsTotal = &resp.Hits.Total.Value
}
Expand Down
18 changes: 5 additions & 13 deletions search/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,17 @@ func DoSearchGeneric(ctx context.Context, escli *es.Client, index, q string) (*E
}

func doSearch(ctx context.Context, escli *es.Client, index string, query interface{}) (*EsSearchResponse, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return nil, fmt.Errorf("error encoding query: %s", err)
}

// re-marshal query as string for logging
if true {
bod, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to serialize query: %w", err)
}
slog.Warn("sending query", "index", index, "query", string(bod))
b, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to serialize query: %w", err)
}
slog.Warn("sending query", "index", index, "query", string(b))

// Perform the search request.
res, err := escli.Search(
escli.Search.WithContext(ctx),
escli.Search.WithIndex(index),
escli.Search.WithBody(&buf),
escli.Search.WithBody(bytes.NewBuffer(b)),
)
if err != nil {
return nil, fmt.Errorf("search query error: %w", err)
Expand Down
22 changes: 16 additions & 6 deletions search/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ type LastSeq struct {
}

type Config struct {
BGSHost string
ProfileIndex string
PostIndex string
Logger *slog.Logger
BGSHost string
ProfileIndex string
PostIndex string
Logger *slog.Logger
BGSSyncRateLimit int
IndexMaxConcurrency int
}

func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Config) (*Server, error) {
Expand Down Expand Up @@ -85,8 +87,16 @@ func NewServer(db *gorm.DB, escli *es.Client, dir identity.Directory, config Con

bfstore := backfill.NewGormstore(db)
opts := backfill.DefaultBackfillOptions()
opts.ParallelRecordCreates = 20
opts.SyncRequestsPerSecond = 8
if config.BGSSyncRateLimit > 0 {
opts.SyncRequestsPerSecond = config.BGSSyncRateLimit
} else {
opts.SyncRequestsPerSecond = 8
}
if config.IndexMaxConcurrency > 0 {
opts.ParallelRecordCreates = config.IndexMaxConcurrency
} else {
opts.ParallelRecordCreates = 20
}
opts.NSIDFilter = "app.bsky."
bf := backfill.NewBackfiller(
"search",
Expand Down

0 comments on commit 8a7e7fd

Please sign in to comment.