Skip to content

Commit

Permalink
push bids to redis
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Aug 2, 2024
1 parent fb1e374 commit db297b8
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 35 deletions.
13 changes: 8 additions & 5 deletions cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
outputTSV bool // by default: CSV, but can be changed to TSV with this setting
uid string // used in output filenames, to avoid collissions between multiple collector instances

webserverListenAddr string
redisAddr string

runDevServerOnly bool // used to play with file listing website
devServerListenAddr string
Expand All @@ -49,8 +49,8 @@ func init() {
// utils
bidCollectCmd.Flags().StringVar(&uid, "uid", "", "unique identifier for output files (to avoid collisions)")

// webserver for SSE
bidCollectCmd.Flags().StringVar(&webserverListenAddr, "webserver-addr", "", "listen address for SSE subscription webserver")
// Redis for pushing bids to
bidCollectCmd.Flags().StringVar(&redisAddr, "redis", "", "Redis address for publishing bids (optional)")

// devserver provides the file listing for playing with file HTML
bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website")
Expand Down Expand Up @@ -108,10 +108,13 @@ var bidCollectCmd = &cobra.Command{
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
WebserverAddr: webserverListenAddr,
RedisAddr: redisAddr,
}

bidCollector := bidcollect.NewBidCollector(&opts)
bidCollector, err := bidcollect.NewBidCollector(&opts)
if err != nil {
log.WithError(err).Fatal("failed to create bid collector")
}
bidCollector.MustStart()
},
}
Expand Down
12 changes: 12 additions & 0 deletions docs/2024-06_bidcollect.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ go run . service bidcollect --data-api --ultrasound-stream --all-relays
go run . service bidcollect --get-header --beacon-uri http://localhost:3500 --all-relays
```

To enable the SSE server, first run Redis:

```
docker run --name redis -d -p 6379:6379 redis
```

Then start the collector with the `--redis <addr>` flag:

```bash
go run . service bidcollect --data-api --ultrasound-stream --redis localhost:6379
```

---

## Useful Clickhouse queries
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/gballet/go-verkle v0.1.1-0.20231031103413-a67434b50f46 // indirect
Expand Down Expand Up @@ -85,6 +86,7 @@ require (
github.com/prometheus/procfs v0.9.0 // indirect
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/redis/go-redis/v9 v9.6.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down Expand Up @@ -353,6 +355,8 @@ github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 h1:0tVE4
github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7/go.mod h1:wmuf/mdK4VMD+jA9ThwcUKjg3a2XWM9cVfFYjDyY4j4=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
Expand Down
46 changes: 26 additions & 20 deletions services/bidcollect/bid-processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bidcollect

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -10,7 +11,7 @@ import (

"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect/types"
"github.com/flashbots/relayscan/services/bidcollect/webserver"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)

Expand All @@ -21,11 +22,11 @@ import (
// - One CSV for top bids only

type BidProcessorOpts struct {
Log *logrus.Entry
UID string
OutDir string
OutputTSV bool
WebserverAddr string
Log *logrus.Entry
UID string
OutDir string
OutputTSV bool
RedisAddr string
}

type OutFiles struct {
Expand All @@ -47,10 +48,10 @@ type BidProcessor struct {
csvSeparator string
csvFileEnding string

webserver *webserver.Server
redisClient *redis.Client
}

func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
func NewBidProcessor(opts *BidProcessorOpts) (*BidProcessor, error) {
c := &BidProcessor{
log: opts.Log,
opts: opts,
Expand All @@ -67,20 +68,22 @@ func NewBidProcessor(opts *BidProcessorOpts) *BidProcessor {
c.csvFileEnding = "csv"
}

if opts.WebserverAddr != "" {
c.webserver = webserver.New(&webserver.HTTPServerConfig{
ListenAddr: opts.WebserverAddr,
Log: opts.Log,
if opts.RedisAddr != "" {
c.redisClient = redis.NewClient(&redis.Options{
Addr: opts.RedisAddr,
Password: "", // no password set
DB: 0, // use default DB
})

// Make sure we can connect to redis to connect to redis
if _, err := c.redisClient.Ping(context.Background()).Result(); err != nil {
return nil, err
}
}
return c
return c, nil
}

func (c *BidProcessor) Start() {
if c.webserver != nil {
c.webserver.RunInBackground()
}

for {
time.Sleep(30 * time.Second)
c.housekeeping()
Expand Down Expand Up @@ -117,9 +120,12 @@ func (c *BidProcessor) processBids(bids []*types.CommonBid) {
isNewBid = true
}

// Send to subscribers
if c.webserver != nil {
c.webserver.SendBid(bid)
// Send to Redis
if c.redisClient != nil {
err := c.redisClient.Publish(context.Background(), "bidcollect/bids", bid.ToCSVLine(",")).Err()
if err != nil {
c.log.WithError(err).Error("failed to publish bid to redis")
}
}

// Write to CSV
Expand Down
20 changes: 10 additions & 10 deletions services/bidcollect/bidcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type BidCollectorOpts struct {
OutDir string
OutputTSV bool

WebserverAddr string
RedisAddr string
}

type BidCollector struct {
Expand All @@ -35,8 +35,8 @@ type BidCollector struct {
processor *BidProcessor
}

func NewBidCollector(opts *BidCollectorOpts) *BidCollector {
c := &BidCollector{
func NewBidCollector(opts *BidCollectorOpts) (c *BidCollector, err error) {
c = &BidCollector{
log: opts.Log,
opts: opts,
}
Expand All @@ -51,14 +51,14 @@ func NewBidCollector(opts *BidCollectorOpts) *BidCollector {
c.getHeaderBidC = make(chan GetHeaderPollerBidsMsg, types.BidCollectorInputChannelSize)

// output
c.processor = NewBidProcessor(&BidProcessorOpts{
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
OutputTSV: opts.OutputTSV,
WebserverAddr: opts.WebserverAddr,
c.processor, err = NewBidProcessor(&BidProcessorOpts{
Log: opts.Log,
UID: opts.UID,
OutDir: opts.OutDir,
OutputTSV: opts.OutputTSV,
RedisAddr: opts.RedisAddr,
})
return c
return c, err
}

func (c *BidCollector) MustStart() {
Expand Down

0 comments on commit db297b8

Please sign in to comment.