From 9174fd5a0b5c5000a2627dc2eed01c9f81a48bdc Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Sun, 13 Aug 2023 23:30:20 -0700 Subject: [PATCH] palomar: more progress --- .gitignore | 1 + cmd/palomar/README.md | 1 + cmd/palomar/main.go | 108 ++++++++++++++++++++++++++++-------------- go.mod | 1 + go.sum | 1 + search/indexing.go | 35 +++++++++----- search/server.go | 39 +++++++++------ 7 files changed, 124 insertions(+), 62 deletions(-) diff --git a/.gitignore b/.gitignore index d3e24cb4b..75d28d8d8 100644 --- a/.gitignore +++ b/.gitignore @@ -29,6 +29,7 @@ test-coverage.out /lexgen /stress /labelmaker +/palomar # Don't ignore this file itself, or other specific dotfiles !.gitignore diff --git a/cmd/palomar/README.md b/cmd/palomar/README.md index 75f61845c..12a7d0af3 100644 --- a/cmd/palomar/README.md +++ b/cmd/palomar/README.md @@ -44,6 +44,7 @@ You can run test queries from the top level of the repository: go run ./cmd/palomar search-post "hello" go run ./cmd/palomar search-profile "hello" + go run ./cmd/palomar search-profile -typeahead "h" ## Configuration diff --git a/cmd/palomar/main.go b/cmd/palomar/main.go index 19de91835..54636196f 100644 --- a/cmd/palomar/main.go +++ b/cmd/palomar/main.go @@ -57,18 +57,18 @@ func run(args []string) error { Name: "elastic-hosts", Usage: "elasticsearch hosts (schema/host/port)", Value: "http://localhost:9200", - EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS"}, + EnvVars: []string{"ES_HOSTS", "ELASTIC_HOSTS", "OPENSEARCH_URL", "ELASTICSEARCH_URL"}, }, &cli.StringFlag{ Name: "es-post-index", Usage: "ES index for 'post' documents", - Value: "posts", + Value: "palomar_post", EnvVars: []string{"ES_POST_INDEX"}, }, &cli.StringFlag{ Name: "es-profile-index", Usage: "ES index for 'profile' documents", - Value: "profiles", + Value: "palomar_profile", EnvVars: []string{"ES_PROFILE_INDEX"}, }, &cli.StringFlag{ @@ -93,9 +93,10 @@ func run(args []string) error { } app.Commands = []*cli.Command{ - elasticCheckCmd, - searchCmd, runCmd, + elasticCheckCmd, + searchPostCmd, + searchProfileCmd, } return app.Run(args) @@ -106,9 +107,8 @@ var runCmd = &cli.Command{ Usage: "combined indexing+query server", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "database-url", - // XXX: data/palomar/search.db - Value: "sqlite://data/thecloud.db", + Name: "database-url", + Value: "sqlite://data/palomar/search.db", EnvVars: []string{"DATABASE_URL"}, }, &cli.BoolFlag{ @@ -139,6 +139,8 @@ var runCmd = &cli.Command{ cctx.String("atp-plc-host"), cctx.String("atp-pds-host"), cctx.String("atp-bgs-host"), + cctx.String("es-profile-index"), + cctx.String("es-post-index"), ) if err != nil { return err @@ -186,42 +188,76 @@ var elasticCheckCmd = &cli.Command{ }, } -var searchCmd = &cli.Command{ - Name: "search", - Usage: "run a simple query against search index", - Action: func(cctx *cli.Context) error { - escli, err := createEsClient(cctx) - if err != nil { - return err - } +func queryIndex(cctx *cli.Context, index string) error { + escli, err := createEsClient(cctx) + if err != nil { + return err + } - var buf bytes.Buffer - query := map[string]interface{}{ + var buf bytes.Buffer + var query map[string]interface{} + if cctx.Bool("typeahead") == true { + query = map[string]interface{}{ "query": map[string]interface{}{ - "match": map[string]interface{}{ - "text": cctx.Args().First(), + "multi_match": map[string]interface{}{ + "query": strings.Join(cctx.Args().Slice(), " "), + "type": "bool_prefix", + "fields": []string{ + "typeahead", + "typeahead._2gram", + "typeahead._3gram", + }, }, }, } - if err := json.NewEncoder(&buf).Encode(query); err != nil { - log.Fatalf("Error encoding query: %s", err) + } else { + query = map[string]interface{}{ + "query": map[string]interface{}{ + "query_string": map[string]interface{}{ + "query": strings.Join(cctx.Args().Slice(), " "), + "default_field": "everything", + }, + }, } + } + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Fatalf("Error encoding query: %s", err) + } - // Perform the search request. - res, err := escli.Search( - escli.Search.WithContext(context.Background()), - escli.Search.WithIndex(cctx.String("es-posts-index")), - escli.Search.WithBody(&buf), - escli.Search.WithTrackTotalHits(true), - escli.Search.WithPretty(), - ) - if err != nil { - log.Fatalf("Error getting response: %s", err) - } + // Perform the search request. + res, err := escli.Search( + escli.Search.WithContext(context.Background()), + escli.Search.WithIndex(index), + escli.Search.WithBody(&buf), + escli.Search.WithTrackTotalHits(true), + escli.Search.WithPretty(), + ) + if err != nil { + log.Fatalf("Error getting response: %s", err) + } - fmt.Println(res) - return nil + fmt.Println(res) + return nil +} +var searchPostCmd = &cli.Command{ + Name: "search-post", + Usage: "run a simple query against posts index", + Action: func(cctx *cli.Context) error { + return queryIndex(cctx, cctx.String("es-post-index")) + }, +} + +var searchProfileCmd = &cli.Command{ + Name: "search-profile", + Usage: "run a simple query against posts index", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "typeahead", + }, + }, + Action: func(cctx *cli.Context) error { + return queryIndex(cctx, cctx.String("es-profile-index")) }, } @@ -261,7 +297,7 @@ func createEsClient(cctx *cli.Context) (*es.Client, error) { return nil, fmt.Errorf("cannot get escli info: %w", err) } defer info.Body.Close() - log.Info(info) + log.Debug(info) return escli, nil } diff --git a/go.mod b/go.mod index d80b032ad..25a5ddc1f 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 + github.com/rivo/uniseg v0.1.0 github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.25.1 github.com/whyrusleeping/cbor-gen v0.0.0-20230331140348-1f892b517e70 diff --git a/go.sum b/go.sum index 8b369b7ea..25aa86025 100644 --- a/go.sum +++ b/go.sum @@ -501,6 +501,7 @@ github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJf github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/prometheus/statsd_exporter v0.22.7/go.mod h1:N/TevpjkIh9ccs6nuzY3jQn9dFqnUakOjnEuMPJJJnI= +github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/search/indexing.go b/search/indexing.go index bb89de3cf..ede0de815 100644 --- a/search/indexing.go +++ b/search/indexing.go @@ -5,6 +5,8 @@ import ( "context" "encoding/json" "fmt" + "regexp" + "strings" "time" bsky "github.com/bluesky-social/indigo/api/bsky" @@ -17,7 +19,7 @@ import ( func (s *Server) deletePost(ctx context.Context, u *User, path string) error { log.Infof("deleting post: %s", path) req := esapi.DeleteRequest{ - Index: "posts", + Index: s.postIndex, DocumentID: encodeDocumentID(u.ID, path), Refresh: "true", } @@ -32,10 +34,20 @@ func (s *Server) deletePost(ctx context.Context, u *User, path string) error { return nil } -func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid string, pcid cid.Cid) error { +func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, path string, pcid cid.Cid) error { + + parts := strings.SplitN(path, "/", 3) + var tidRegex = regexp.MustCompile(`^[234567abcdefghijklmnopqrstuvwxyz]{13}$`) + if len(parts) != 2 || !tidRegex.MatchString(parts[1]) { + log.Warnf("Skipping post record with weird path/TID did=%s path=%s", u.Did, path) + return nil + } + rkey := parts[1] + + // TODO: just skip this part? if err := s.db.Create(&PostRef{ Cid: pcid.String(), - Tid: tid, + Tid: rkey, Uid: u.ID, }).Error; err != nil { return err @@ -44,10 +56,10 @@ func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid // TODO: is this needed? what happens if we try to index w/ invalid timestamp? _, err := time.Parse(util.ISO8601, rec.CreatedAt) if err != nil { - return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, tid, rec.CreatedAt, err) + return fmt.Errorf("post (%d, %s) had invalid timestamp (%q): %w", u.ID, rkey, rec.CreatedAt, err) } - doc := TransformPost(rec, u, tid, pcid.String()) + doc := TransformPost(rec, u, rkey, pcid.String()) b, err := json.Marshal(doc) if err != nil { return err @@ -55,7 +67,7 @@ func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid log.Infof("Indexing post") req := esapi.IndexRequest{ - Index: "posts", + Index: s.postIndex, DocumentID: doc.DocId(), Body: bytes.NewReader(b), Refresh: "true", @@ -71,10 +83,11 @@ func (s *Server) indexPost(ctx context.Context, u *User, rec *bsky.FeedPost, tid return nil } -func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile, rkey string, pcid cid.Cid) error { +func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfile, path string, pcid cid.Cid) error { - if rkey != "self" { - log.Warnf("Skipping non-canonical profile record did=%s rkey=%s", u.Did, rkey) + parts := strings.SplitN(path, "/", 3) + if len(parts) != 2 || parts[1] != "self" { + log.Warnf("Skipping non-canonical profile record did=%s path=%s", u.Did, path) return nil } @@ -90,7 +103,7 @@ func (s *Server) indexProfile(ctx context.Context, u *User, rec *bsky.ActorProfi return err } req := esapi.IndexRequest{ - Index: "profiles", + Index: s.profileIndex, DocumentID: fmt.Sprint(u.ID), Body: bytes.NewReader(b), Refresh: "true", @@ -131,7 +144,7 @@ func (s *Server) updateUserHandle(ctx context.Context, did, handle string) error } req := esapi.UpdateRequest{ - Index: "profiles", + Index: s.profileIndex, DocumentID: fmt.Sprint(u.ID), Body: bytes.NewReader(b), Refresh: "true", diff --git a/search/server.go b/search/server.go index bdcbcd126..38853bc6a 100644 --- a/search/server.go +++ b/search/server.go @@ -36,13 +36,15 @@ import ( var log = logging.Logger("search") type Server struct { - escli *es.Client - db *gorm.DB - bgshost string - xrpcc *xrpc.Client - bgsxrpc *xrpc.Client - plc *api.PLCServer - echo *echo.Echo + escli *es.Client + postIndex string + profileIndex string + db *gorm.DB + bgshost string + xrpcc *xrpc.Client + bgsxrpc *xrpc.Client + plc *api.PLCServer + echo *echo.Echo userCache *lru.Cache } @@ -66,7 +68,7 @@ type LastSeq struct { Seq int64 } -func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) (*Server, error) { +func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost, profileIndex, postIndex string) (*Server, error) { log.Info("Migrating database") db.AutoMigrate(&PostRef{}) @@ -94,13 +96,15 @@ func NewServer(db *gorm.DB, escli *es.Client, plcHost, pdsHost, bgsHost string) ucache, _ := lru.New(100000) s := &Server{ - escli: escli, - db: db, - bgshost: bgsHost, - xrpcc: xc, - bgsxrpc: bgsxrpc, - plc: plc, - userCache: ucache, + escli: escli, + profileIndex: profileIndex, + postIndex: postIndex, + db: db, + bgshost: bgsHost, + xrpcc: xc, + bgsxrpc: bgsxrpc, + plc: plc, + userCache: ucache, } return s, nil } @@ -157,6 +161,10 @@ func (s *Server) RunIndexer(ctx context.Context) error { } for _, op := range evt.Ops { + // filter out all records other than "post" and "profile" + if !(strings.HasPrefix(op.Path, "app.bsky.feed.post/") || strings.HasPrefix(op.Path, "app.bsky.actor.profile/")) { + continue + } ek := repomgr.EventKind(op.Action) switch ek { case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: @@ -469,6 +477,7 @@ func (s *Server) RunAPI(listen string) error { e.GET("/_health", s.handleHealthCheck) e.GET("/search/posts", s.handleSearchRequestPosts) e.GET("/search/profiles", s.handleSearchRequestProfiles) + // XXX: e.GET("/search/profiles-typeahead", s.handleSearchRequestProfiles) s.echo = e log.Infof("starting search API daemon at: %s", listen)