diff --git a/cmd/service/bidcollect.go b/cmd/service/bidcollect.go index c5897a8..41d7b1c 100644 --- a/cmd/service/bidcollect.go +++ b/cmd/service/bidcollect.go @@ -7,6 +7,7 @@ package service import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/services/bidcollect" + "github.com/flashbots/relayscan/services/bidcollect/webserver" "github.com/flashbots/relayscan/services/bidcollect/website" "github.com/flashbots/relayscan/vars" "github.com/lithammer/shortuuid" @@ -23,6 +24,7 @@ var ( outputTSV bool // by default: CSV, but can be changed to TSV with this setting uid string // used in output filenames, to avoid collissions between multiple collector instances + useRedis bool redisAddr string runDevServerOnly bool // used to play with file listing website @@ -31,6 +33,9 @@ var ( buildWebsite bool buildWebsiteUpload bool buildWebsiteOutDir string + + runWebserverOnly bool // provides a SSE stream of new bids + WebserverListenAddr string ) func init() { @@ -50,7 +55,12 @@ func init() { bidCollectCmd.Flags().StringVar(&uid, "uid", "", "unique identifier for output files (to avoid collisions)") // Redis for pushing bids to - bidCollectCmd.Flags().StringVar(&redisAddr, "redis", "", "Redis address for publishing bids (optional)") + bidCollectCmd.Flags().BoolVar(&useRedis, "redis", false, "Publish bids to Redis") + bidCollectCmd.Flags().StringVar(&redisAddr, "redis-addr", "localhost:6379", "Redis address for publishing bids (optional)") + + // Webserver mode + bidCollectCmd.Flags().BoolVar(&runWebserverOnly, "webserver", false, "only run webserver for SSE stream") + bidCollectCmd.Flags().StringVar(&WebserverListenAddr, "webserver-addr", "localhost:8080", "listen address for webserver") // devserver provides the file listing for playing with file HTML bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website") @@ -66,6 +76,15 @@ var bidCollectCmd = &cobra.Command{ Use: "bidcollect", Short: "Collect bids", Run: func(cmd *cobra.Command, args []string) { + if runWebserverOnly { + srv := webserver.New(&webserver.HTTPServerConfig{ + ListenAddr: WebserverListenAddr, + RedisAddr: redisAddr, + Log: log, + }) + srv.MustStart() + return + } if runDevServerOnly { log.Infof("Bidcollect %s devserver starting on %s ...", vars.Version, devServerListenAddr) fileListingDevServer() diff --git a/docs/2024-06_bidcollect.md b/docs/2024-06_bidcollect.md index f76597e..bd43462 100644 --- a/docs/2024-06_bidcollect.md +++ b/docs/2024-06_bidcollect.md @@ -74,6 +74,9 @@ Different data sources have different limitations: - Bids are deduplicated based on this key: - `fmt.Sprintf("%d-%s-%s-%s-%s", bid.Slot, bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, bid.Value)` - this means only the first bid for a given key is stored, even if - for instance - other relays also deliver the same bid +- Bids can be published to Redis (to be consumed by whatever, i.e. a webserver). The channel is called `bidcollect/bids`. + - Enable publishing to Redis with the `--redis` flag + - You can start a webserver that publishes the data via a SSE stream with `--webserver` --- @@ -89,16 +92,27 @@ go run . service bidcollect --data-api --ultrasound-stream --all-relays go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays ``` -To enable the SSE server, first run Redis: +Publish new bids to Redis: -``` +```bash +# Start Redis docker run --name redis -d -p 6379:6379 redis + +# Start the collector with the `--redis ` flag: +go run . service bidcollect --data-api --ultrasound-stream --redis + +# Subscribe to the `bidcollect/bids` channel +redis-cli SUBSCRIBE bidcollect/bids ``` -Then start the collector with the `--redis ` flag: +SSE stream of bids via the built-in webserver: ```bash -go run . service bidcollect --data-api --ultrasound-stream --redis localhost:6379 +# Start the webserver in another process to subscribe to Redis and publish bids as SSE stream: +go run . service bidcollect --webserver + +# Check if it works by subscribing with curl +curl localhost:8080/v1/sse/bids ``` --- diff --git a/services/bidcollect/bid-processor.go b/services/bidcollect/bid-processor.go index 00d3337..9093d37 100644 --- a/services/bidcollect/bid-processor.go +++ b/services/bidcollect/bid-processor.go @@ -122,7 +122,7 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) { // Send to Redis if c.redisClient != nil { - err := c.redisClient.Publish(context.Background(), "bidcollect/bids", bid.ToCSVLine(",")).Err() + err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err() if err != nil { c.log.WithError(err).Error("failed to publish bid to redis") } diff --git a/services/bidcollect/types/consts.go b/services/bidcollect/types/consts.go index d2aa63b..8fb3989 100644 --- a/services/bidcollect/types/consts.go +++ b/services/bidcollect/types/consts.go @@ -14,6 +14,8 @@ const ( // channel size for bid collector inputs BidCollectorInputChannelSize = 1000 + + RedisChannel = "bidcollect/bids" ) var ( diff --git a/services/bidcollect/webserver/handler.go b/services/bidcollect/webserver/handler.go index 34aef46..b921a44 100644 --- a/services/bidcollect/webserver/handler.go +++ b/services/bidcollect/webserver/handler.go @@ -33,7 +33,7 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request) srv.addSubscriber(&subscriber) // Send CSV header - helloMsg := strings.Join(types.CommonBidCSVFields, "\t") + "\n" + helloMsg := strings.Join(types.CommonBidCSVFields, ",") + "\n" fmt.Fprint(w, helloMsg) w.(http.Flusher).Flush() //nolint:forcetypeassert diff --git a/services/bidcollect/webserver/webserver.go b/services/bidcollect/webserver/webserver.go index 50de53c..c9f6473 100644 --- a/services/bidcollect/webserver/webserver.go +++ b/services/bidcollect/webserver/webserver.go @@ -10,6 +10,7 @@ import ( "github.com/flashbots/relayscan/services/bidcollect/types" "github.com/go-chi/chi/v5" + "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" "go.uber.org/atomic" ) @@ -17,9 +18,7 @@ import ( type HTTPServerConfig struct { ListenAddr string Log *logrus.Entry - - ReadTimeout time.Duration - WriteTimeout time.Duration + RedisAddr string } type Server struct { @@ -30,6 +29,8 @@ type Server struct { srv *http.Server sseConnectionMap map[string]*SSESubscription sseConnectionLock sync.RWMutex + + redisClient *redis.Client } func New(cfg *HTTPServerConfig) (srv *Server) { @@ -45,30 +46,47 @@ func New(cfg *HTTPServerConfig) (srv *Server) { router.Get("/v1/sse/bids", srv.handleSSESubscription) srv.srv = &http.Server{ - Addr: cfg.ListenAddr, - Handler: router, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, + Addr: cfg.ListenAddr, + Handler: router, + ReadHeaderTimeout: 1 * time.Second, } return srv } -func (srv *Server) RunInBackground() { - go func() { - srv.log.WithField("listenAddress", srv.cfg.ListenAddr).Info("Starting HTTP server") - if err := srv.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - srv.log.WithField("err", err).Error("HTTP server failed") - } - }() +func (srv *Server) MustStart() { + go srv.MustSubscribeToRedis() + + srv.log.WithField("listenAddress", srv.cfg.ListenAddr).Info("Starting HTTP server") + if err := srv.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + srv.log.WithField("err", err).Error("HTTP server failed") + } } -func (srv *Server) Shutdown() { - // api - if err := srv.srv.Shutdown(context.Background()); err != nil { - srv.log.WithField("err", err).Error("Graceful HTTP server shutdown failed") - } else { - srv.log.Info("HTTP server gracefully stopped") +func (srv *Server) MustSubscribeToRedis() { + if srv.cfg.RedisAddr == "" { + srv.log.Fatal("Redis address is required") + } + + srv.log.Info("Subscribing to Redis...") + srv.redisClient = redis.NewClient(&redis.Options{ + Addr: srv.cfg.RedisAddr, + Password: "", // no password set + DB: 0, // use default DB + }) + + // Make sure we can connect to redis to connect to redis + if _, err := srv.redisClient.Ping(context.Background()).Result(); err != nil { + srv.log.WithError(err).Fatal("failed to ping redis") + } + + pubsub := srv.redisClient.Subscribe(context.Background(), types.RedisChannel) + ch := pubsub.Channel() + srv.log.Info("Subscribed to Redis") + + for msg := range ch { + // fmt.Println(msg.Channel, msg.Payload) + srv.SendToSubscribers(msg.Payload) } } @@ -85,15 +103,13 @@ func (srv *Server) removeSubscriber(sub *SSESubscription) { srv.log.WithField("subscribers", len(srv.sseConnectionMap)).Info("removed subscriber") } -func (srv *Server) SendBid(bid *types.CommonBid) { +func (srv *Server) SendToSubscribers(msg string) { srv.sseConnectionLock.RLock() defer srv.sseConnectionLock.RUnlock() if len(srv.sseConnectionMap) == 0 { return } - msg := bid.ToCSVLine("\t") - // Send tx to all subscribers (only if channel is not full) for _, sub := range srv.sseConnectionMap { select {