Skip to content

Commit

Permalink
palomar: more progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bnewbold committed Aug 14, 2023
1 parent 2e82054 commit 9174fd5
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 62 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ test-coverage.out
/lexgen
/stress
/labelmaker
/palomar

# Don't ignore this file itself, or other specific dotfiles
!.gitignore
Expand Down
1 change: 1 addition & 0 deletions cmd/palomar/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ You can run test queries from the top level of the repository:

go run ./cmd/palomar search-post "hello"
go run ./cmd/palomar search-profile "hello"
go run ./cmd/palomar search-profile -typeahead "h"

## Configuration

Expand Down
108 changes: 72 additions & 36 deletions cmd/palomar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ func run(args []string) error {
Name: "elastic-hosts",
Usage: "elasticsearch hosts (schema/host/port)",
Value: "http://localhost:9200",
EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS"},
EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS", "OPENSEARCH_URL", "ELASTICSEARCH_URL"},
},
&cli.StringFlag{
Name: "es-post-index",
Usage: "ES index for 'post' documents",
Value: "posts",
Value: "palomar_post",
EnvVars: []string{"ES_POST_INDEX"},
},
&cli.StringFlag{
Name: "es-profile-index",
Usage: "ES index for 'profile' documents",
Value: "profiles",
Value: "palomar_profile",
EnvVars: []string{"ES_PROFILE_INDEX"},
},
&cli.StringFlag{
Expand All @@ -93,9 +93,10 @@ func run(args []string) error {
}

app.Commands = []*cli.Command{
elasticCheckCmd,
searchCmd,
runCmd,
elasticCheckCmd,
searchPostCmd,
searchProfileCmd,
}

return app.Run(args)
Expand All @@ -106,9 +107,8 @@ var runCmd = &cli.Command{
Usage: "combined indexing+query server",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "database-url",
// XXX: data/palomar/search.db
Value: "sqlite://data/thecloud.db",
Name: "database-url",
Value: "sqlite://data/palomar/search.db",
EnvVars: []string{"DATABASE_URL"},
},
&cli.BoolFlag{
Expand Down Expand Up @@ -139,6 +139,8 @@ var runCmd = &cli.Command{
cctx.String("atp-plc-host"),
cctx.String("atp-pds-host"),
cctx.String("atp-bgs-host"),
cctx.String("es-profile-index"),
cctx.String("es-post-index"),
)
if err != nil {
return err
Expand Down Expand Up @@ -186,42 +188,76 @@ var elasticCheckCmd = &cli.Command{
},
}

var searchCmd = &cli.Command{
Name: "search",
Usage: "run a simple query against search index",
Action: func(cctx *cli.Context) error {
escli, err := createEsClient(cctx)
if err != nil {
return err
}
func queryIndex(cctx *cli.Context, index string) error {
escli, err := createEsClient(cctx)
if err != nil {
return err
}

var buf bytes.Buffer
query := map[string]interface{}{
var buf bytes.Buffer
var query map[string]interface{}
if cctx.Bool("typeahead") == true {
query = map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
"text": cctx.Args().First(),
"multi_match": map[string]interface{}{
"query": strings.Join(cctx.Args().Slice(), " "),
"type": "bool_prefix",
"fields": []string{
"typeahead",
"typeahead._2gram",
"typeahead._3gram",
},
},
},
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
} else {
query = map[string]interface{}{
"query": map[string]interface{}{
"query_string": map[string]interface{}{
"query": strings.Join(cctx.Args().Slice(), " "),
"default_field": "everything",
},
},
}
}
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s", err)
}

// Perform the search request.
res, err := escli.Search(
escli.Search.WithContext(context.Background()),
escli.Search.WithIndex(cctx.String("es-posts-index")),
escli.Search.WithBody(&buf),
escli.Search.WithTrackTotalHits(true),
escli.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// Perform the search request.
res, err := escli.Search(
escli.Search.WithContext(context.Background()),
escli.Search.WithIndex(index),
escli.Search.WithBody(&buf),
escli.Search.WithTrackTotalHits(true),
escli.Search.WithPretty(),
)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}

fmt.Println(res)
return nil
fmt.Println(res)
return nil
}

// searchPostCmd is the "search-post" CLI subcommand. It looks up the index
// name from the "es-post-index" flag and hands the query off to queryIndex.
var searchPostCmd = &cli.Command{
	Name:  "search-post",
	Usage: "run a simple query against posts index",
	Action: func(cctx *cli.Context) error {
		postIndex := cctx.String("es-post-index")
		return queryIndex(cctx, postIndex)
	},
}

// searchProfileCmd is the "search-profile" CLI subcommand. It looks up the
// index name from the "es-profile-index" flag and hands the query off to
// queryIndex. The optional "typeahead" flag is read by queryIndex to switch
// from the default query_string query to a bool_prefix multi_match query.
var searchProfileCmd = &cli.Command{
	Name: "search-profile",
	// This command targets the profile index, not the post index; the
	// usage text previously duplicated the search-post description.
	Usage: "run a simple query against profiles index",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "typeahead",
			Usage: "run a prefix (typeahead) query instead of a full-text query",
		},
	},
	Action: func(cctx *cli.Context) error {
		return queryIndex(cctx, cctx.String("es-profile-index"))
	},
}

Expand Down Expand Up @@ -261,7 +297,7 @@ func createEsClient(cctx *cli.Context) (*es.Client, error) {
return nil, fmt.Errorf("cannot get escli info: %w", err)
}
defer info.Body.Close()
log.Info(info)
log.Debug(info)

return escli, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ require (
github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/rivo/uniseg v0.1.0
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.25.1
github.com/whyrusleeping/cbor-gen v0.0.0-20230331140348-1f892b517e70
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0=
github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI=
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand Down
35 changes: 24 additions & 11 deletions search/indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"
"time"

bsky "github.com/bluesky-social/indigo/api/bsky"
Expand All @@ -17,7 +19,7 @@ import (
func (s *Server) deletePost(ctx context.Context, u *User, path string) error {
log.Infof("deleting post: %s", path)
req := esapi.DeleteRequest{
Index: "posts",
Index: s.postIndex,
DocumentID: encodeDocumentID(u.ID, path),
Refresh: "true",
}
Expand All @@ -32,10 +34,20 @@ func (s *Server) deletePost(ctx context.Context, u *User, path string) error {
return nil
}

func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error {
func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, path string, pcid cid.Cid) error {

parts := strings.SplitN(path, "/", 3)
var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`)
if len(parts) != 2 || !tidRegex.MatchString(parts[1]) {
log.Warnf("Skipping post record with weird path/TID did=%s path=%s", u.Did, path)
return nil
}
rkey := parts[1]

// TODO: just skip this part?
if err := s.db.Create(&PostRef{
Cid: pcid.String(),
Tid: tid,
Tid: rkey,
Uid: u.ID,
}).Error; err != nil {
return err
Expand All @@ -44,18 +56,18 @@ func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid
// TODO: is this needed? what happens if we try to index w/ invalid timestamp?
_, err := time.Parse(util.ISO8601, rec.CreatedAt)
if err != nil {
return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err)
return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, rkey, rec.CreatedAt, err)
}

doc := TransformPost(rec, u, tid, pcid.String())
doc := TransformPost(rec, u, rkey, pcid.String())
b, err := json.Marshal(doc)
if err != nil {
return err
}

log.Infof("Indexing post")
req := esapi.IndexRequest{
Index: "posts",
Index: s.postIndex,
DocumentID: doc.DocId(),
Body: bytes.NewReader(b),
Refresh: "true",
Expand All @@ -71,10 +83,11 @@ func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid
return nil
}

func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile, rkey string, pcid cid.Cid) error {
func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile, path string, pcid cid.Cid) error {

if rkey != "self" {
log.Warnf("Skipping non-canonical profile record did=%s rkey=%s", u.Did, rkey)
parts := strings.SplitN(path, "/", 3)
if len(parts) != 2 || parts[1] != "self" {
log.Warnf("Skipping non-canonical profile record did=%s path=%s", u.Did, path)
return nil
}

Expand All @@ -90,7 +103,7 @@ func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfi
return err
}
req := esapi.IndexRequest{
Index: "profiles",
Index: s.profileIndex,
DocumentID: fmt.Sprint(u.ID),
Body: bytes.NewReader(b),
Refresh: "true",
Expand Down Expand Up @@ -131,7 +144,7 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error
}

req := esapi.UpdateRequest{
Index: "profiles",
Index: s.profileIndex,
DocumentID: fmt.Sprint(u.ID),
Body: bytes.NewReader(b),
Refresh: "true",
Expand Down
39 changes: 24 additions & 15 deletions search/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ import (
var log = logging.Logger("search")

type Server struct {
escli *es.Client
db *gorm.DB
bgshost string
xrpcc *xrpc.Client
bgsxrpc *xrpc.Client
plc *api.PLCServer
echo *echo.Echo
escli *es.Client
postIndex string
profileIndex string
db *gorm.DB
bgshost string
xrpcc *xrpc.Client
bgsxrpc *xrpc.Client
plc *api.PLCServer
echo *echo.Echo

userCache *lru.Cache
}
Expand All @@ -66,7 +68,7 @@ type LastSeq struct {
Seq int64
}

func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) (*Server, error) {
func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost, profileIndex, postIndex string) (*Server, error) {

log.Info("Migrating database")
db.AutoMigrate(&PostRef{})
Expand Down Expand Up @@ -94,13 +96,15 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string)

ucache, _ := lru.New(100000)
s := &Server{
escli: escli,
db: db,
bgshost: bgsHost,
xrpcc: xc,
bgsxrpc: bgsxrpc,
plc: plc,
userCache: ucache,
escli: escli,
profileIndex: profileIndex,
postIndex: postIndex,
db: db,
bgshost: bgsHost,
xrpcc: xc,
bgsxrpc: bgsxrpc,
plc: plc,
userCache: ucache,
}
return s, nil
}
Expand Down Expand Up @@ -157,6 +161,10 @@ func (s *Server) RunIndexer(ctx context.Context) error {
}

for _, op := range evt.Ops {
// filter out all records other than "post" and "profile"
if !(strings.HasPrefix(op.Path, "app.bsky.feed.post/") || strings.HasPrefix(op.Path, "app.bsky.actor.profile/")) {
continue
}
ek := repomgr.EventKind(op.Action)
switch ek {
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
Expand Down Expand Up @@ -469,6 +477,7 @@ func (s *Server) RunAPI(listen string) error {
e.GET("/_health", s.handleHealthCheck)
e.GET("/search/posts", s.handleSearchRequestPosts)
e.GET("/search/profiles", s.handleSearchRequestProfiles)
// XXX: e.GET("/search/profiles-typeahead", s.handleSearchRequestProfiles)
s.echo = e

log.Infof("starting search API daemon at: %s", listen)
Expand Down

0 comments on commit 9174fd5

Please sign in to comment.