From dc7818f751acf09c2eefbc9cca9225cc6de1db64 Mon Sep 17 00:00:00 2001 From: Chris Hager Date: Fri, 26 Jul 2024 00:39:57 +0200 Subject: [PATCH] add comments to data-api-backfill --- cmd/core/data-api-backfill.go | 56 ++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 27 deletions(-) diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 7145df0..9460ecf 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -18,14 +18,12 @@ var ( cliRelay string minSlot int64 initCursor uint64 - // bidsOnly bool ) func init() { backfillDataAPICmd.Flags().StringVar(&cliRelay, "relay", "", "specific relay only") backfillDataAPICmd.Flags().Uint64Var(&initCursor, "cursor", 0, "initial cursor") backfillDataAPICmd.Flags().Int64Var(&minSlot, "min-slot", 0, "minimum slot (if unset, backfill until the merge, negative number for that number of slots before latest)") - // backfillDataAPICmd.Flags().BoolVar(&bidsOnly, "bids", false, "only bids") } var backfillDataAPICmd = &cobra.Command{ @@ -76,7 +74,6 @@ var backfillDataAPICmd = &cobra.Command{ for _, relay := range relays { backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) - // backfiller.backfillDataAPIBids() err = backfiller.backfillPayloadsDelivered() if err != nil { log.WithError(err).WithField("relay", relay).Error("backfill failed") @@ -102,18 +99,19 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor } func (bf *backfiller) backfillPayloadsDelivered() error { - log.Infof("backfilling payloads data-api for relay %s ...", bf.relay.Hostname()) + _log := log.WithField("relay", bf.relay.Hostname()) + _log.Info("backfilling payloads from relay data-api ...") // 1. get latest entry from DB latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname()) latestSlotInDB := uint64(0) if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.WithError(err).Fatal("failed to get latest entry") + _log.WithError(err).Fatal("failed to get latest entry") return err } else { latestSlotInDB = latestEntry.Slot } - log.Infof("last payload in db at slot: %d", latestSlotInDB) + _log.Infof("last payload in db for slot: %d", latestSlotInDB) // 2. backfill until latest DB entry is reached baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered") @@ -127,65 +125,69 @@ func (bf *backfiller) backfillPayloadsDelivered() error { if cursorSlot > 0 { url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot) } - log.Info("url: ", url) + _log.WithField("url: ", url).Info("fetching payloads...") var data []relaycommon.BidTraceV2JSON _, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data) if err != nil { return err } - log.Infof("got %d entries", len(data)) - entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + _log.Infof("- response contains %d delivered payloads", len(data)) - for index, dataEntry := range data { - log.Debugf("saving entry for slot %d", dataEntry.Slot) - dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), dataEntry) + // build a list of entries for batch DB update + entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data)) + for index, payload := range data { + _log.Debugf("saving entry for slot %d", payload.Slot) + dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload) entries[index] = &dbEntry - if !slotsReceived[dataEntry.Slot] { - slotsReceived[dataEntry.Slot] = true + // Count number of slots with payloads + if !slotsReceived[payload.Slot] { + slotsReceived[payload.Slot] = true payloadsNew += 1 } - if cursorSlot == 0 { - log.Infof("latest received payload at slot %d", dataEntry.Slot) - cursorSlot = dataEntry.Slot - } else if cursorSlot > dataEntry.Slot { - cursorSlot = dataEntry.Slot + // Set cursor for next request + if cursorSlot == 0 || cursorSlot > payload.Slot { + cursorSlot = payload.Slot } - builders[dataEntry.BuilderPubkey] = true + // Remember the builder + builders[payload.BuilderPubkey] = true } + // Save entries err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries) if err != nil { - log.WithError(err).Fatal("failed to save entries") + _log.WithError(err).Fatal("failed to save entries") return err } - // save builders + // Save builders for builderPubkey := range builders { err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey}) if err != nil { - log.WithError(err).Error("failed to save builder") + _log.WithError(err).Error("failed to save builder") } } + // Stop as soon as no new payloads are received if payloadsNew == 0 { - log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot) + _log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot) return nil } + // Stop if at the latest slot in DB if cursorSlot < latestSlotInDB { - log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) + _log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB) return nil } + // Stop if at min slot if cursorSlot < bf.minSlot { - log.Infof("Payloads backfilled until min slot %d", bf.minSlot) + _log.Infof("Payloads backfilled until min slot %d", bf.minSlot) return nil } - // time.Sleep(1 * time.Second) } }