Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bidcollect #37

Merged
merged 44 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
60c98bf
start ultrasound bid stream
metachris May 31, 2024
4546fc8
ultrasound stream start
metachris May 31, 2024
e4f92c5
cleanup
metachris May 31, 2024
75349a2
cleanup + start DataApiPoller
metachris May 31, 2024
b991f08
polling delay
metachris May 31, 2024
87d4c90
cleanup
metachris May 31, 2024
0b5adac
use relays
metachris May 31, 2024
f80c6da
actally request data api
metachris May 31, 2024
2827d2f
commonbid
metachris May 31, 2024
63094f1
cleanup
metachris May 31, 2024
345a892
BidProcessor
metachris Jun 1, 2024
4e2d6b6
cleanups
metachris Jun 1, 2024
40ee72e
cleanup
metachris Jun 1, 2024
c788bc7
more cleanup
metachris Jun 1, 2024
6094d84
outdir
metachris Jun 1, 2024
ac090a6
--all-relays
metachris Jun 1, 2024
33db293
cleanup
metachris Jun 1, 2024
550612e
comment
metachris Jun 1, 2024
a6b8575
outdir with date
metachris Jun 1, 2024
93defa8
cleanup
metachris Jun 1, 2024
c676559
simplify a bit
metachris Jun 2, 2024
3a3dcb2
data-api poller: better offset timing
metachris Jun 2, 2024
e515732
bids-combine.sh
metachris Jun 2, 2024
9223b41
tsv file ending
metachris Jun 2, 2024
144aded
cleanup
metachris Jun 2, 2024
d2c33e8
getheader polling
metachris Jun 2, 2024
7882108
normalize csv timestamp_ms, remove csv timestamp because redundant
metachris Jun 2, 2024
01c6ec0
cleanup
metachris Jun 2, 2024
24873c0
getHeader: only call once
metachris Jun 2, 2024
6670a50
notes
metachris Jun 2, 2024
95f806f
getHeader silence common errors
metachris Jun 2, 2024
dc8ed1e
csv/tsv
metachris Jun 2, 2024
7af600e
docs
metachris Jun 3, 2024
4642620
upload script
metachris Jun 5, 2024
bd733c1
docs update
metachris Jun 7, 2024
083b3e8
start website
metachris Jun 7, 2024
79c563b
website foundation
metachris Jun 7, 2024
996e224
more doc cleanup
metachris Jun 7, 2024
469042c
devserver cleanup
metachris Jun 7, 2024
3aca0bf
minor notes
metachris Jun 11, 2024
a27889b
bids-script: delete
metachris Jun 11, 2024
d508488
fix script perms
metachris Jun 11, 2024
d9ec5db
website
metachris Jun 12, 2024
3dcf2e6
cleanup
metachris Jun 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@
/static_dev/
/relayscan
/deploy*
/test.csv
/csv/
/build/
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,14 @@ cover-html:

docker-image:
DOCKER_BUILDKIT=1 docker build --platform linux/amd64 --build-arg VERSION=${VERSION} . -t relayscan


generate-ssz:
rm -f common/ultrasoundbid_encoding.go
sszgen --path common --objs UltrasoundStreamBid

bids-website:
go run . service bidcollect --build-website --build-website-upload

bids-website-dev:
go run . service bidcollect --devserver
114 changes: 114 additions & 0 deletions cmd/service/bidcollect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package service

/**
* https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md
*/

import (
"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/services/bidcollect"
"github.com/flashbots/relayscan/services/bidcollect/website"
"github.com/flashbots/relayscan/vars"
"github.com/spf13/cobra"
)

var (
collectUltrasoundStream bool
collectGetHeader bool
collectDataAPI bool
useAllRelays bool

outDir string
outputTSV bool // by default: CSV, but can be changed to TSV with this setting

runDevServerOnly bool // used to play with file listing website
devServerListenAddr = ":8095"

buildWebsite bool
buildWebsiteUpload bool
buildWebsiteOutDir string
)

func init() {
bidCollectCmd.Flags().BoolVar(&collectUltrasoundStream, "ultrasound-stream", false, "use ultrasound top-bid stream")
bidCollectCmd.Flags().BoolVar(&collectGetHeader, "get-header", false, "use getHeader API")
bidCollectCmd.Flags().BoolVar(&collectDataAPI, "data-api", false, "use data API")
bidCollectCmd.Flags().BoolVar(&useAllRelays, "all-relays", false, "use all relays")

// for getHeader
bidCollectCmd.Flags().StringVar(&beaconNodeURI, "beacon-uri", vars.DefaultBeaconURI, "beacon endpoint")

// for saving to file
bidCollectCmd.Flags().StringVar(&outDir, "out", "csv", "output directory for CSV/TSV")
bidCollectCmd.Flags().BoolVar(&outputTSV, "out-tsv", false, "output as TSV (instead of CSV)")

// for dev purposes
bidCollectCmd.Flags().BoolVar(&runDevServerOnly, "devserver", false, "only run devserver to play with file listing website")

// building the S3 website
bidCollectCmd.Flags().BoolVar(&buildWebsite, "build-website", false, "build file listing website")
bidCollectCmd.Flags().BoolVar(&buildWebsiteUpload, "build-website-upload", false, "upload after building")
bidCollectCmd.Flags().StringVar(&buildWebsiteOutDir, "build-website-out", "build", "output directory for website")
}

var bidCollectCmd = &cobra.Command{
Use: "bidcollect",
Short: "Collect bids",
Run: func(cmd *cobra.Command, args []string) {
if runDevServerOnly {
log.Infof("Bidcollect (%s) devserver starting on %s ...", vars.Version, devServerListenAddr)
fileListingDevServer()
return
}

if buildWebsite {
log.Infof("Bidcollect %s building website (output: %s) ...", vars.Version, buildWebsiteOutDir)
website.BuildProdWebsite(log, buildWebsiteOutDir, buildWebsiteUpload)
return
}

log.Infof("Bidcollect starting (%s) ...", vars.Version)

// Prepare relays
relays := []common.RelayEntry{
common.MustNewRelayEntry(vars.RelayFlashbots, false),
common.MustNewRelayEntry(vars.RelayUltrasound, false),
}
if useAllRelays {
relays = common.MustGetRelays()
}

log.Infof("Using %d relays", len(relays))
for index, relay := range relays {
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

opts := bidcollect.BidCollectorOpts{
Log: log,
Relays: relays,
CollectUltrasoundStream: collectUltrasoundStream,
CollectGetHeader: collectGetHeader,
CollectDataAPI: collectDataAPI,
BeaconNodeURI: beaconNodeURI,
OutDir: outDir,
OutputTSV: outputTSV,
}

bidCollector := bidcollect.NewBidCollector(&opts)
bidCollector.MustStart()
},
}

func fileListingDevServer() {
webserver, err := website.NewDevWebserver(&website.DevWebserverOpts{ //nolint:exhaustruct
ListenAddress: devServerListenAddr,
Log: log,
})
if err != nil {
log.Fatal(err)
}
err = webserver.StartServer()
if err != nil {
log.Fatal(err)
}
}
2 changes: 1 addition & 1 deletion cmd/service/collect-live-bids.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {

var liveBidsCmd = &cobra.Command{
Use: "collect-live-bids",
Short: "On every slot, ask for live bids",
Short: "On every slot, ask for live bids (using getHeader)",
Run: func(cmd *cobra.Command, args []string) {
// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)
Expand Down
1 change: 1 addition & 0 deletions cmd/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ var ServiceCmd = &cobra.Command{
func init() {
ServiceCmd.AddCommand(websiteCmd)
ServiceCmd.AddCommand(liveBidsCmd)
ServiceCmd.AddCommand(bidCollectCmd)
}
21 changes: 21 additions & 0 deletions common/relayentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ func NewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry, err err
return entry, err
}

func MustNewRelayEntry(relayURL string, requireUser bool) (entry RelayEntry) {
entry, err := NewRelayEntry(relayURL, requireUser)
Check(err)
return entry
}

// RelayEntriesToStrings returns the string representation of a list of relay entries
func RelayEntriesToStrings(relays []RelayEntry) []string {
ret := make([]string, len(relays))
Expand All @@ -61,6 +67,15 @@ func RelayEntriesToStrings(relays []RelayEntry) []string {
return ret
}

// RelayEntriesToHostnameStrings returns the hostnames of a list of relay entries
func RelayEntriesToHostnameStrings(relays []RelayEntry) []string {
ret := make([]string, len(relays))
for i, entry := range relays {
ret[i] = entry.Hostname()
}
return ret
}

func GetRelays() ([]RelayEntry, error) {
var err error
relays := make([]RelayEntry, len(vars.RelayURLs))
Expand All @@ -72,3 +87,9 @@ func GetRelays() ([]RelayEntry, error) {
}
return relays, nil
}

func MustGetRelays() []RelayEntry {
relays, err := GetRelays()
Check(err)
return relays
}
8 changes: 7 additions & 1 deletion common/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ func SendHTTPRequest(ctx context.Context, client http.Client, method, url string
return resp.StatusCode, fmt.Errorf("%w: %d / %s", errHTTPErrorResponse, resp.StatusCode, string(bodyBytes))
}

if dst != nil {
if dst == nil {
// still read the body to reuse http connection (see also https://stackoverflow.com/a/17953506)
_, err = io.Copy(io.Discard, resp.Body)
if err != nil {
return resp.StatusCode, fmt.Errorf("could not read response body: %w", err)
}
} else {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return resp.StatusCode, fmt.Errorf("could not read response body: %w", err)
Expand Down
28 changes: 28 additions & 0 deletions common/ultrasoundbid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package common

import "math/big"

// https://github.com/ultrasoundmoney/docs/blob/main/top-bid-websocket.md

type (
U64 [8]byte
Hash [32]byte
PublicKey [48]byte
Address [20]byte
U256 [32]byte
)

func (n *U256) String() string {
return new(big.Int).SetBytes(ReverseBytes(n[:])).String()
}

type UltrasoundStreamBid struct {
Timestamp uint64 `json:"timestamp"`
Slot uint64 `json:"slot"`
BlockNumber uint64 `json:"block_number"`
BlockHash Hash `json:"block_hash" ssz-size:"32"`
ParentHash Hash `json:"parent_hash" ssz-size:"32"`
BuilderPubkey PublicKey `json:"builder_pubkey" ssz-size:"48"`
FeeRecipient Address `json:"fee_recipient" ssz-size:"20"`
Value U256 `json:"value" ssz-size:"32"`
}
124 changes: 124 additions & 0 deletions common/ultrasoundbid_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package common

import (
ssz "github.com/ferranbt/fastssz"
)

// MarshalSSZ ssz marshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) MarshalSSZ() ([]byte, error) {
return ssz.MarshalSSZ(u)
}

// MarshalSSZTo ssz marshals the UltrasoundStreamBid object to a target array
func (u *UltrasoundStreamBid) MarshalSSZTo(buf []byte) (dst []byte, err error) {
dst = buf

// Field (0) 'Timestamp'
dst = ssz.MarshalUint64(dst, u.Timestamp)

// Field (1) 'Slot'
dst = ssz.MarshalUint64(dst, u.Slot)

// Field (2) 'BlockNumber'
dst = ssz.MarshalUint64(dst, u.BlockNumber)

// Field (3) 'BlockHash'
dst = append(dst, u.BlockHash[:]...)

// Field (4) 'ParentHash'
dst = append(dst, u.ParentHash[:]...)

// Field (5) 'BuilderPubkey'
dst = append(dst, u.BuilderPubkey[:]...)

// Field (6) 'FeeRecipient'
dst = append(dst, u.FeeRecipient[:]...)

// Field (7) 'Value'
dst = append(dst, u.Value[:]...)

return
}

// UnmarshalSSZ ssz unmarshals the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) UnmarshalSSZ(buf []byte) error {
var err error
size := uint64(len(buf))
if size != 188 {
return ssz.ErrSize
}

// Field (0) 'Timestamp'
u.Timestamp = ssz.UnmarshallUint64(buf[0:8])

// Field (1) 'Slot'
u.Slot = ssz.UnmarshallUint64(buf[8:16])

// Field (2) 'BlockNumber'
u.BlockNumber = ssz.UnmarshallUint64(buf[16:24])

// Field (3) 'BlockHash'
copy(u.BlockHash[:], buf[24:56])

// Field (4) 'ParentHash'
copy(u.ParentHash[:], buf[56:88])

// Field (5) 'BuilderPubkey'
copy(u.BuilderPubkey[:], buf[88:136])

// Field (6) 'FeeRecipient'
copy(u.FeeRecipient[:], buf[136:156])

// Field (7) 'Value'
copy(u.Value[:], buf[156:188])

return err
}

// SizeSSZ returns the ssz encoded size in bytes for the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) SizeSSZ() (size int) {
size = 188
return
}

// HashTreeRoot ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) HashTreeRoot() ([32]byte, error) {
return ssz.HashWithDefaultHasher(u)
}

// HashTreeRootWith ssz hashes the UltrasoundStreamBid object with a hasher
func (u *UltrasoundStreamBid) HashTreeRootWith(hh ssz.HashWalker) (err error) {
indx := hh.Index()

// Field (0) 'Timestamp'
hh.PutUint64(u.Timestamp)

// Field (1) 'Slot'
hh.PutUint64(u.Slot)

// Field (2) 'BlockNumber'
hh.PutUint64(u.BlockNumber)

// Field (3) 'BlockHash'
hh.PutBytes(u.BlockHash[:])

// Field (4) 'ParentHash'
hh.PutBytes(u.ParentHash[:])

// Field (5) 'BuilderPubkey'
hh.PutBytes(u.BuilderPubkey[:])

// Field (6) 'FeeRecipient'
hh.PutBytes(u.FeeRecipient[:])

// Field (7) 'Value'
hh.PutBytes(u.Value[:])

hh.Merkleize(indx)
return
}

// GetTree ssz hashes the UltrasoundStreamBid object
func (u *UltrasoundStreamBid) GetTree() (*ssz.Node, error) {
return ssz.ProofTree(u)
}
30 changes: 30 additions & 0 deletions common/ultrasoundbid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package common

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/require"
)

func TestValueDecoding(t *testing.T) {
expected := "55539751698389157"
hex := "0xa558e5221c51c500000000000000000000000000000000000000000000000000"
hexBytes := hexutil.MustDecode(hex)
value := new(big.Int).SetBytes(ReverseBytes(hexBytes[:])).String()
require.Equal(t, expected, value)
}

func TestUltrasoundBidSSZDecoding(t *testing.T) {
hex := "0x704b87ce8f010000a94b8c0000000000b6043101000000002c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac074d0668c9e0912134628c32a54854b952234ebb6c1fdd6b053566ac2d2a09498da03b00ddb78b2c111450a5417a8c368c40f1f140cdf97d95b7fa9565467e0bbbe27877d08e01c69b4e5b02b144e6a265df99a0839818b3f120ebac9b73f82b617dc6a5556c71794b1a9c5400000000000000000000000000000000000000000000000000"
bytes := hexutil.MustDecode(hex)
bid := new(UltrasoundStreamBid)
err := bid.UnmarshalSSZ(bytes)
require.NoError(t, err)

require.Equal(t, uint64(1717156924272), bid.Timestamp)
require.Equal(t, uint64(9194409), bid.Slot)
require.Equal(t, uint64(19989686), bid.BlockNumber)
require.Equal(t, "0x2c02b28fd8fdb45fd6ac43dd04adad1449a35b64247b1ed23a723a1fcf6cac07", hexutil.Encode(bid.BlockHash[:]))
}
Loading
Loading