diff --git a/README.md b/README.md index 5c9889f..48da90b 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ source .env.local make dev-postgres-start # Query only a single relay, and for the shortest time possible -go run . core data-api-backfill --relay fb --min-slot -1 +go run . core data-api-backfill --relay fb --min-slot -1000 # Now the DB has data, check it (and update in DB) go run . core check-payload-value diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 9460ecf..aeb7720 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -11,6 +11,7 @@ import ( "github.com/flashbots/relayscan/common" "github.com/flashbots/relayscan/database" "github.com/flashbots/relayscan/vars" + "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -54,7 +55,7 @@ var backfillDataAPICmd = &cobra.Command{ log.Infof("Relayscan %s", vars.Version) log.Infof("Using %d relays", len(relays)) for index, relay := range relays { - log.Infof("relay #%d: %s", index+1, relay.Hostname()) + log.Infof("- relay #%d: %s", index+1, relay.Hostname()) } // Connect to Postgres @@ -62,9 +63,9 @@ var backfillDataAPICmd = &cobra.Command{ // If needed, get latest slot (i.e. if min-slot is negative) if minSlot < 0 { - log.Infof("Getting latest slot from beacon chain for offset %d", minSlot) + log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot) latestSlotOnBeaconChain := common.MustGetLatestSlot() - log.Infof("- Latest slot from beacon chain: %d", latestSlotOnBeaconChain) + log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain) minSlot = int64(latestSlotOnBeaconChain) + minSlot } @@ -100,7 +101,7 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor func (bf *backfiller) backfillPayloadsDelivered() error { _log := log.WithField("relay", bf.relay.Hostname()) - _log.Info("backfilling payloads from relay data-api ...") + // _log.Info("backfilling payloads from relay data-api ...") // 1. get latest entry from DB latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname()) @@ -111,7 +112,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error { } else { latestSlotInDB = latestEntry.Slot } - _log.Infof("last payload in db for slot: %d", latestSlotInDB) + _log.Infof("Latest payload in DB for slot: %d", latestSlotInDB) // 2. backfill until latest DB entry is reached baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered") @@ -125,22 +126,32 @@ func (bf *backfiller) backfillPayloadsDelivered() error { if cursorSlot > 0 { url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot) } - _log.WithField("url: ", url).Info("fetching payloads...") + _log.WithField("url: ", url).Info("Fetching payloads...") var data []relaycommon.BidTraceV2JSON _, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) if err != nil { return err } - _log.Infof("- response contains %d delivered payloads", len(data)) + _log.Infof("Response contains %d delivered payloads", len(data)) // build a list of entries for batch DB update entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + slotFirst := uint64(0) + slotLast := uint64(0) for index, payload := range data { _log.Debugf("saving entry for slot %d", payload.Slot) dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload) entries[index] = &dbEntry + // Set first and last slot + if slotFirst == 0 || payload.Slot < slotFirst { + slotFirst = payload.Slot + } + if slotLast == 0 || payload.Slot > slotLast { + slotLast = payload.Slot + } + // Count number of slots with payloads if !slotsReceived[payload.Slot] { slotsReceived[payload.Slot] = true @@ -157,12 +168,18 @@ func (bf *backfiller) backfillPayloadsDelivered() error { } // Save entries - err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) + rowsAffected, err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) if err != nil { _log.WithError(err).Fatal("failed to save entries") return err } + _log.WithFields(logrus.Fields{ + "rowsAffected": rowsAffected, + "slotFirst": slotFirst, + "slotLast": slotLast, + }).Info("Batch of payloads saved to database") + // Save builders for builderPubkey := range builders { err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey}) @@ -179,7 +196,7 @@ func (bf *backfiller) backfillPayloadsDelivered() error { // Stop if at the latest slot in DB if cursorSlot < latestSlotInDB { - _log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) + _log.Infof("Payloads backfilled until last in DB at slot %d", latestSlotInDB) return nil } diff --git a/database/database.go b/database/database.go index 1948df9..50b35d2 100644 --- a/database/database.go +++ b/database/database.go @@ -67,9 +67,9 @@ func (s *DatabaseService) SaveDataAPIPayloadDelivered(entry *DataAPIPayloadDeliv return err } -func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) error { +func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPayloadDeliveredEntry) (rowsAffected int64, err error) { if len(entries) == 0 { - return nil + return 0, nil } query := `INSERT INTO ` + vars.TableDataAPIPayloadDelivered + ` @@ -84,12 +84,19 @@ func (s *DatabaseService) SaveDataAPIPayloadDeliveredBatch(entries []*DataAPIPay end = len(entries) } - _, err := s.DB.NamedExec(query, entries[i:end]) + r, err := s.DB.NamedExec(query, entries[i:end]) if err != nil { - return err + return 0, err } + + _rowsAffected, err := r.RowsAffected() + if err != nil { + return 0, err + } + + rowsAffected += _rowsAffected } - return nil + return rowsAffected, nil } func (s *DatabaseService) GetDataAPILatestPayloadDelivered(relay string) (*DataAPIPayloadDeliveredEntry, error) {