Skip to content

Commit

Permalink
Handle requestCrawl fanout in Rainbow
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Nov 22, 2024
1 parent 848d9bd commit f8621de
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 39 deletions.
22 changes: 13 additions & 9 deletions cmd/rainbow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,25 @@ package main

import (
"context"
"github.com/bluesky-social/indigo/events"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"time"

"github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/splitter"
"github.com/carlmjohnson/versioninfo"
_ "go.uber.org/automaxprocs"

_ "net/http/pprof"

_ "github.com/joho/godotenv/autoload"

logging "github.com/ipfs/go-log"
_ "github.com/joho/godotenv/autoload"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
_ "go.uber.org/automaxprocs"
)

var log = logging.Logger("splitter")
Expand Down Expand Up @@ -84,6 +81,11 @@ func run(args []string) {
Usage: "max bytes target for event cache, 0 to disable size target trimming",
EnvVars: []string{"SPLITTER_PERSIST_BYTES"},
},
&cli.StringSliceFlag{
Name: "next-crawler",
Usage: "forward POST requestCrawl to this url, should be machine root url and not xrpc/requestCrawl, comma separated list",
EnvVars: []string{"RELAY_NEXT_CRAWLER"},
},
}

app.Action = Splitter
Expand Down Expand Up @@ -135,6 +137,8 @@ func Splitter(cctx *cli.Context) error {

persistPath := cctx.String("persist-db")
upstreamHost := cctx.String("splitter-host")
nextCrawlers := cctx.StringSlice("next-crawler")

var spl *splitter.Splitter
var err error
if persistPath != "" {
Expand All @@ -150,14 +154,14 @@ func Splitter(cctx *cli.Context) error {
CursorFile: cctx.String("cursor-file"),
PebbleOptions: &ppopts,
}
spl, err = splitter.NewSplitter(conf)
spl, err = splitter.NewSplitter(conf, nextCrawlers)
} else {
log.Info("building in-memory splitter")
conf := splitter.SplitterConfig{
UpstreamHost: upstreamHost,
CursorFile: cctx.String("cursor-file"),
}
spl, err = splitter.NewSplitter(conf)
spl, err = splitter.NewSplitter(conf, nextCrawlers)
}
if err != nil {
log.Fatalw("failed to create splitter", "path", persistPath, "error", err)
Expand Down
5 changes: 5 additions & 0 deletions indexer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ var catchupEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "indexer_catchup_events_processed",
Help: "Number of catchup events processed",
})

var usersAddedToCatchupQueue = promauto.NewCounter(prometheus.CounterOpts{
Name: "indexer_users_added_to_catchup_queue",
Help: "Number of users added to catchup queue",
})
148 changes: 118 additions & 30 deletions splitter/splitter.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
package splitter

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/bluesky-social/indigo/api/atproto"
comatproto "github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/bgs"
events "github.com/bluesky-social/indigo/events"
"github.com/bluesky-social/indigo/events/schedulers/sequential"
"github.com/bluesky-social/indigo/util"
"github.com/bluesky-social/indigo/xrpc"
"github.com/gorilla/websocket"
logging "github.com/ipfs/go-log"
"github.com/labstack/echo/v4"
Expand All @@ -39,6 +46,9 @@ type Splitter struct {
consumers map[uint64]*SocketConsumer

conf SplitterConfig

httpC *http.Client
nextCrawlers []*url.URL
}

type SplitterConfig struct {
Expand All @@ -47,49 +57,41 @@ type SplitterConfig struct {
PebbleOptions *events.PebblePersistOptions
}

func NewMemSplitter(host string) *Splitter {
conf := SplitterConfig{
UpstreamHost: host,
CursorFile: "cursor-file",
func NewSplitter(conf SplitterConfig, nextCrawlers []string) (*Splitter, error) {
var nextCrawlerURLs []*url.URL
if len(nextCrawlers) > 0 {
nextCrawlerURLs = make([]*url.URL, len(nextCrawlers))
for i, tu := range nextCrawlers {
var err error
nextCrawlerURLs[i], err = url.Parse(tu)
if err != nil {
return nil, fmt.Errorf("failed to parse next-crawler url: %w", err)
}
log.Infow("configuring relay for requestCrawl", "host", nextCrawlerURLs[i])
}
}

erb := NewEventRingBuffer(20_000, 10_000)

em := events.NewEventManager(erb)
return &Splitter{
conf: conf,
erb: erb,
events: em,
consumers: make(map[uint64]*SocketConsumer),
s := &Splitter{
conf: conf,
consumers: make(map[uint64]*SocketConsumer),
httpC: util.RobustHTTPClient(),
nextCrawlers: nextCrawlerURLs,
}
}
func NewSplitter(conf SplitterConfig) (*Splitter, error) {

if conf.PebbleOptions == nil {
// mem splitter
erb := NewEventRingBuffer(20_000, 10_000)

em := events.NewEventManager(erb)
return &Splitter{
conf: conf,
erb: erb,
events: em,
consumers: make(map[uint64]*SocketConsumer),
}, nil
s.events = events.NewEventManager(erb)
} else {
pp, err := events.NewPebblePersistance(conf.PebbleOptions)
if err != nil {
return nil, err
}

go pp.GCThread(context.Background())
em := events.NewEventManager(pp)
return &Splitter{
conf: conf,
pp: pp,
events: em,
consumers: make(map[uint64]*SocketConsumer),
}, nil
s.events = events.NewEventManager(pp)
}

return s, nil
}
func NewDiskSplitter(host, path string, persistHours float64, maxBytes int64) (*Splitter, error) {
ppopts := events.PebblePersistOptions{
Expand Down Expand Up @@ -198,6 +200,7 @@ func (s *Splitter) StartWithListener(listen net.Listener) error {

// TODO: this API is temporary until we formalize what we want here

e.GET("/xrpc/com.atproto.sync.requestCrawl", s.RequestCrawlHandler)
e.GET("/xrpc/com.atproto.sync.subscribeRepos", s.EventsHandler)
e.GET("/xrpc/_health", s.HandleHealthCheck)

Expand All @@ -218,6 +221,91 @@ func (s *Splitter) HandleHealthCheck(c echo.Context) error {
return c.JSON(200, HealthStatus{Status: "ok"})
}

type XRPCError struct {
Message string `json:"message"`
}

func (s *Splitter) RequestCrawlHandler(c echo.Context) error {
ctx := c.Request().Context()
var body comatproto.SyncRequestCrawl_Input
if err := c.Bind(&body); err != nil {
return c.JSON(http.StatusBadRequest, XRPCError{Message: fmt.Sprintf("invalid body: %s", err)})
}

host := body.Hostname
if host == "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname")
}

if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
host = "https://" + host
}

u, err := url.Parse(host)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "failed to parse hostname")
}

if u.Scheme == "http" {
return echo.NewHTTPError(http.StatusBadRequest, "this server requires https")
}
if u.Path != "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without path")
}

if u.Query().Encode() != "" {
return echo.NewHTTPError(http.StatusBadRequest, "must pass hostname without query")
}

host = u.Host // potentially hostname:port

clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)

xrpcC := &xrpc.Client{
Host: clientHost,
Client: http.DefaultClient, // not using the client that auto-retries
}

desc, err := atproto.ServerDescribeServer(ctx, xrpcC)
if err != nil {
errMsg := fmt.Sprintf("requested host (%s) failed to respond to describe request", clientHost)
return echo.NewHTTPError(http.StatusBadRequest, errMsg)
}

// Maybe we could do something with this response later
_ = desc

if len(s.nextCrawlers) != 0 {
blob, err := json.Marshal(body)
if err != nil {
log.Warnw("could not forward requestCrawl, json err", "err", err)
} else {
go func(bodyBlob []byte) {
for _, remote := range s.nextCrawlers {
if remote == nil {
continue
}

pu := remote.JoinPath("/xrpc/com.atproto.sync.requestCrawl")
response, err := s.httpC.Post(pu.String(), "application/json", bytes.NewReader(bodyBlob))
if response != nil && response.Body != nil {
response.Body.Close()
}
if err != nil || response == nil {
log.Warnw("requestCrawl forward failed", "host", remote, "err", err)
} else if response.StatusCode != http.StatusOK {
log.Warnw("requestCrawl forward failed", "host", remote, "status", response.Status)
} else {
log.Infow("requestCrawl forward successful", "host", remote)
}
}
}(blob)
}
}

return c.JSON(200, HealthStatus{Status: "ok"})
}

func (s *Splitter) EventsHandler(c echo.Context) error {
var since *int64
if sinceVal := c.QueryParam("cursor"); sinceVal != "" {
Expand Down

0 comments on commit f8621de

Please sign in to comment.