Skip to content

Commit

Permalink
decoupled webserver
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 2, 2024
1 parent db297b8 commit 322c80b
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 30 deletions.
21 changes: 20 additions & 1 deletion cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package service
import (
"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect"
"github.com/flashbots/relayscan/services/bidcollect/webserver"
"github.com/flashbots/relayscan/services/bidcollect/website"
"github.com/flashbots/relayscan/vars"
"github.com/lithammer/shortuuid"
Expand All @@ -23,6 +24,7 @@ var (
outputTSV bool // by default: CSV, but can be changed to TSV with this setting
uid string // used in output filenames, to avoid collissions between multiple collector instances

useRedis bool
redisAddr string

runDevServerOnly bool // used to play with file listing website
Expand All @@ -31,6 +33,9 @@ var (
buildWebsite bool
buildWebsiteUpload bool
buildWebsiteOutDir string

runWebserverOnly bool // provides a SSE stream of new bids
WebserverListenAddr string
)

func init() {
Expand All @@ -50,7 +55,12 @@ func init() {
bidCollectCmd.Flags().StringVar(&uid, "uid", "", "unique identifier for output files (to avoid collisions)")

// Redis for pushing bids to
bidCollectCmd.Flags().StringVar(&redisAddr, "redis", "", "Redis address for publishing bids (optional)")
bidCollectCmd.Flags().BoolVar(&useRedis, "redis", false, "Publish bids to Redis")
bidCollectCmd.Flags().StringVar(&redisAddr, "redis-addr", "localhost:6379", "Redis address for publishing bids (optional)")

// Webserver mode
bidCollectCmd.Flags().BoolVar(&runWebserverOnly, "webserver", false, "only run webserver for SSE stream")
bidCollectCmd.Flags().StringVar(&WebserverListenAddr, "webserver-addr", "localhost:8080", "listen address for webserver")

// devserver provides the file listing for playing with file HTML
bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website")
Expand All @@ -66,6 +76,15 @@ var bidCollectCmd = &cobra.Command{
Use: "bidcollect",
Short: "Collect bids",
Run: func(cmd *cobra.Command, args []string) {
if runWebserverOnly {
srv := webserver.New(&webserver.HTTPServerConfig{
ListenAddr: WebserverListenAddr,
RedisAddr: redisAddr,
Log: log,
})
srv.MustStart()
return
}
if runDevServerOnly {
log.Infof("Bidcollect %s devserver starting on %s ...", vars.Version, devServerListenAddr)
fileListingDevServer()
Expand Down
22 changes: 18 additions & 4 deletions docs/2024-06_bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ Different data sources have different limitations:
- Bids are deduplicated based on this key:
- `fmt.Sprintf("%d-%s-%s-%s-%s", bid.Slot, bid.BlockHash, bid.ParentHash, bid.BuilderPubkey, bid.Value)`
- this means only the first bid for a given key is stored, even if - for instance - other relays also deliver the same bid
- Bids can be published to Redis (to be consumed by whatever, i.e. a webserver). The channel is called `bidcollect/bids`.
- Enable publishing to Redis with the `--redis` flag
- You can start a webserver that publishes the data via a SSE stream with `--webserver`

---

Expand All @@ -89,16 +92,27 @@ go run . service bidcollect --data-api --ultrasound-stream --all-relays
go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays
```

To enable the SSE server, first run Redis:
Publish new bids to Redis:

```
```bash
# Start Redis
docker run --name redis -d -p 6379:6379 redis

# Start the collector with the `--redis <addr>` flag:
go run . service bidcollect --data-api --ultrasound-stream --redis

# Subscribe to the `bidcollect/bids` channel
redis-cli SUBSCRIBE bidcollect/bids
```

Then start the collector with the `--redis <addr>` flag:
SSE stream of bids via the built-in webserver:

```bash
go run . service bidcollect --data-api --ultrasound-stream --redis localhost:6379
# Start the webserver in another process to subscribe to Redis and publish bids as SSE stream:
go run . service bidcollect --webserver

# Check if it works by subscribing with curl
curl localhost:8080/v1/sse/bids
```

---
Expand Down
2 changes: 1 addition & 1 deletion services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) {

// Send to Redis
if c.redisClient != nil {
err := c.redisClient.Publish(context.Background(), "bidcollect/bids", bid.ToCSVLine(",")).Err()
err := c.redisClient.Publish(context.Background(), types.RedisChannel, bid.ToCSVLine(",")).Err()
if err != nil {
c.log.WithError(err).Error("failed to publish bid to redis")
}
Expand Down
2 changes: 2 additions & 0 deletions services/bidcollect/types/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (

// channel size for bid collector inputs
BidCollectorInputChannelSize = 1000

RedisChannel = "bidcollect/bids"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion services/bidcollect/webserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (srv *Server) handleSSESubscription(w http.ResponseWriter, r *http.Request)
srv.addSubscriber(&subscriber)

// Send CSV header
helloMsg := strings.Join(types.CommonBidCSVFields, "\t") + "\n"
helloMsg := strings.Join(types.CommonBidCSVFields, ",") + "\n"
fmt.Fprint(w, helloMsg)
w.(http.Flusher).Flush() //nolint:forcetypeassert

Expand Down
62 changes: 39 additions & 23 deletions services/bidcollect/webserver/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,15 @@ import (

"github.com/flashbots/relayscan/services/bidcollect/types"
"github.com/go-chi/chi/v5"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"go.uber.org/atomic"
)

type HTTPServerConfig struct {
ListenAddr string
Log *logrus.Entry

ReadTimeout time.Duration
WriteTimeout time.Duration
RedisAddr string
}

type Server struct {
Expand All @@ -30,6 +29,8 @@ type Server struct {
srv *http.Server
sseConnectionMap map[string]*SSESubscription
sseConnectionLock sync.RWMutex

redisClient *redis.Client
}

func New(cfg *HTTPServerConfig) (srv *Server) {
Expand All @@ -45,30 +46,47 @@ func New(cfg *HTTPServerConfig) (srv *Server) {
router.Get("/v1/sse/bids", srv.handleSSESubscription)

srv.srv = &http.Server{
Addr: cfg.ListenAddr,
Handler: router,
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
Addr: cfg.ListenAddr,
Handler: router,
ReadHeaderTimeout: 1 * time.Second,
}

return srv
}

func (srv *Server) RunInBackground() {
go func() {
srv.log.WithField("listenAddress", srv.cfg.ListenAddr).Info("Starting HTTP server")
if err := srv.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
srv.log.WithField("err", err).Error("HTTP server failed")
}
}()
func (srv *Server) MustStart() {
go srv.MustSubscribeToRedis()

srv.log.WithField("listenAddress", srv.cfg.ListenAddr).Info("Starting HTTP server")
if err := srv.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
srv.log.WithField("err", err).Error("HTTP server failed")
}
}

func (srv *Server) Shutdown() {
// api
if err := srv.srv.Shutdown(context.Background()); err != nil {
srv.log.WithField("err", err).Error("Graceful HTTP server shutdown failed")
} else {
srv.log.Info("HTTP server gracefully stopped")
func (srv *Server) MustSubscribeToRedis() {
if srv.cfg.RedisAddr == "" {
srv.log.Fatal("Redis address is required")
}

srv.log.Info("Subscribing to Redis...")
srv.redisClient = redis.NewClient(&redis.Options{
Addr: srv.cfg.RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Make sure we can connect to redis to connect to redis
if _, err := srv.redisClient.Ping(context.Background()).Result(); err != nil {
srv.log.WithError(err).Fatal("failed to ping redis")
}

pubsub := srv.redisClient.Subscribe(context.Background(), types.RedisChannel)
ch := pubsub.Channel()
srv.log.Info("Subscribed to Redis")

for msg := range ch {
// fmt.Println(msg.Channel, msg.Payload)
srv.SendToSubscribers(msg.Payload)
}
}

Expand All @@ -85,15 +103,13 @@ func (srv *Server) removeSubscriber(sub *SSESubscription) {
srv.log.WithField("subscribers", len(srv.sseConnectionMap)).Info("removed subscriber")
}

func (srv *Server) SendBid(bid *types.CommonBid) {
func (srv *Server) SendToSubscribers(msg string) {
srv.sseConnectionLock.RLock()
defer srv.sseConnectionLock.RUnlock()
if len(srv.sseConnectionMap) == 0 {
return
}

msg := bid.ToCSVLine("\t")

// Send tx to all subscribers (only if channel is not full)
for _, sub := range srv.sseConnectionMap {
select {
Expand Down

0 comments on commit 322c80b

Please sign in to comment.