Skip to content

Commit

Permalink
backfill: logs cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Jul 25, 2024
1 parent dc7818f commit a851144
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ source .env.local
make dev-postgres-start

# Query only a single relay, and for the shortest time possible
go run . core data-api-backfill --relay fb --min-slot -1
go run . core data-api-backfill --relay fb --min-slot -1000

# Now the DB has data, check it (and update in DB)
go run . core check-payload-value
Expand Down
35 changes: 26 additions & 9 deletions cmd/core/data-api-backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flashbots/relayscan/common"
"github.com/flashbots/relayscan/database"
"github.com/flashbots/relayscan/vars"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -54,17 +55,17 @@ var backfillDataAPICmd = &cobra.Command{
log.Infof("Relayscan %s", vars.Version)
log.Infof("Using %d relays", len(relays))
for index, relay := range relays {
log.Infof("relay #%d: %s", index+1, relay.Hostname())
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)

// If needed, get latest slot (i.e. if min-slot is negative)
if minSlot < 0 {
log.Infof("Getting latest slot from beacon chain for offset %d", minSlot)
log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot)
latestSlotOnBeaconChain := common.MustGetLatestSlot()
log.Infof("- Latest slot from beacon chain: %d", latestSlotOnBeaconChain)
log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain)
minSlot = int64(latestSlotOnBeaconChain) + minSlot
}

Expand Down Expand Up @@ -100,7 +101,7 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor

func (bf *backfiller) backfillPayloadsDelivered() error {
_log := log.WithField("relay", bf.relay.Hostname())
_log.Info("backfilling payloads from relay data-api ...")
// _log.Info("backfilling payloads from relay data-api ...")

// 1. get latest entry from DB
latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname())
Expand All @@ -111,7 +112,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error {
} else {
latestSlotInDB = latestEntry.Slot
}
_log.Infof("last payload in db for slot: %d", latestSlotInDB)
_log.Infof("Latest payload in DB for slot: %d", latestSlotInDB)

// 2. backfill until latest DB entry is reached
baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered")
Expand All @@ -125,22 +126,32 @@ func (bf *backfiller) backfillPayloadsDelivered() error {
if cursorSlot > 0 {
url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot)
}
_log.WithField("url: ", url).Info("fetching payloads...")
_log.WithField("url: ", url).Info("Fetching payloads...")
var data []relaycommon.BidTraceV2JSON
_, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data)
if err != nil {
return err
}

_log.Infof("- response contains %d delivered payloads", len(data))
_log.Infof("Response contains %d delivered payloads", len(data))

// build a list of entries for batch DB update
entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data))
slotFirst := uint64(0)
slotLast := uint64(0)
for index, payload := range data {
_log.Debugf("saving entry for slot %d", payload.Slot)
dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload)
entries[index] = &dbEntry

// Set first and last slot
if slotFirst == 0 || payload.Slot < slotFirst {
slotFirst = payload.Slot
}
if slotLast == 0 || payload.Slot > slotLast {
slotLast = payload.Slot
}

// Count number of slots with payloads
if !slotsReceived[payload.Slot] {
slotsReceived[payload.Slot] = true
Expand All @@ -157,12 +168,18 @@ func (bf *backfiller) backfillPayloadsDelivered() error {
}

// Save entries
err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries)
rowsAffected, err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries)
if err != nil {
_log.WithError(err).Fatal("failed to save entries")
return err
}

_log.WithFields(logrus.Fields{
"rowsAffected": rowsAffected,
"slotFirst": slotFirst,
"slotLast": slotLast,
}).Info("Batch of payloads saved to database")

// Save builders
for builderPubkey := range builders {
err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey})
Expand All @@ -179,7 +196,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error {

// Stop if at the latest slot in DB
if cursorSlot < latestSlotInDB {
_log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB)
_log.Infof("Payloads backfilled until last in DB at slot %d", latestSlotInDB)
return nil
}

Expand Down
17 changes: 12 additions & 5 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (s *DatabaseService) SaveDataAPIPayloadDelivered(entry *DataAPIPayloadDeliv
return err
}

func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) error {
func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) (rowsAffected int64, err error) {
if len(entries) == 0 {
return nil
return 0, nil
}

query := `INSERT INTO ` + vars.TableDataAPIPayloadDelivered + `
Expand All @@ -84,12 +84,19 @@ func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPay
end = len(entries)
}

_, err := s.DB.NamedExec(query, entries[i:end])
r, err := s.DB.NamedExec(query, entries[i:end])
if err != nil {
return err
return 0, err
}

_rowsAffected, err := r.RowsAffected()
if err != nil {
return 0, err
}

rowsAffected += _rowsAffected
}
return nil
return rowsAffected, nil
}

func (s *DatabaseService) GetDataAPILatestPayloadDelivered(relay string) (*DataAPIPayloadDeliveredEntry, error) {
Expand Down

0 comments on commit a851144

Please sign in to comment.