Skip to content

Commit

Permalink
add comments to data-api-backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
metachris committed Jul 25, 2024
1 parent 652349b commit dc7818f
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions cmd/core/data-api-backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ var (
cliRelay string
minSlot int64
initCursor uint64
// bidsOnly bool
)

func init() {
backfillDataAPICmd.Flags().StringVar(&cliRelay, "relay", "", "specific relay only")
backfillDataAPICmd.Flags().Uint64Var(&initCursor, "cursor", 0, "initial cursor")
backfillDataAPICmd.Flags().Int64Var(&minSlot, "min-slot", 0, "minimum slot (if unset, backfill until the merge, negative number for that number of slots before latest)")
// backfillDataAPICmd.Flags().BoolVar(&bidsOnly, "bids", false, "only bids")
}

var backfillDataAPICmd = &cobra.Command{
Expand Down Expand Up @@ -76,7 +74,6 @@ var backfillDataAPICmd = &cobra.Command{

for _, relay := range relays {
backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot))
// backfiller.backfillDataAPIBids()
err = backfiller.backfillPayloadsDelivered()
if err != nil {
log.WithError(err).WithField("relay", relay).Error("backfill failed")
Expand All @@ -102,18 +99,19 @@ func newBackfiller(db *database.DatabaseService, relay common.RelayEntry, cursor
}

func (bf *backfiller) backfillPayloadsDelivered() error {
log.Infof("backfilling payloads data-api for relay %s ...", bf.relay.Hostname())
_log := log.WithField("relay", bf.relay.Hostname())
_log.Info("backfilling payloads from relay data-api ...")

// 1. get latest entry from DB
latestEntry, err := bf.db.GetDataAPILatestPayloadDelivered(bf.relay.Hostname())
latestSlotInDB := uint64(0)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.WithError(err).Fatal("failed to get latest entry")
_log.WithError(err).Fatal("failed to get latest entry")
return err
} else {
latestSlotInDB = latestEntry.Slot
}
log.Infof("last payload in db at slot: %d", latestSlotInDB)
_log.Infof("last payload in db for slot: %d", latestSlotInDB)

// 2. backfill until latest DB entry is reached
baseURL := bf.relay.GetURI("/relay/v1/data/bidtraces/proposer_payload_delivered")
Expand All @@ -127,65 +125,69 @@ func (bf *backfiller) backfillPayloadsDelivered() error {
if cursorSlot > 0 {
url = fmt.Sprintf("%s?cursor=%d", baseURL, cursorSlot)
}
log.Info("url: ", url)
_log.WithField("url: ", url).Info("fetching payloads...")
var data []relaycommon.BidTraceV2JSON
_, err = common.SendHTTPRequest(context.Background(), *http.DefaultClient, http.MethodGet, url, nil, &data)
if err != nil {
return err
}

log.Infof("got %d entries", len(data))
entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data))
_log.Infof("- response contains %d delivered payloads", len(data))

for index, dataEntry := range data {
log.Debugf("saving entry for slot %d", dataEntry.Slot)
dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), dataEntry)
// build a list of entries for batch DB update
entries := make([]*database.DataAPIPayloadDeliveredEntry, len(data))
for index, payload := range data {
_log.Debugf("saving entry for slot %d", payload.Slot)
dbEntry := database.BidTraceV2JSONToPayloadDeliveredEntry(bf.relay.Hostname(), payload)
entries[index] = &dbEntry

if !slotsReceived[dataEntry.Slot] {
slotsReceived[dataEntry.Slot] = true
// Count number of slots with payloads
if !slotsReceived[payload.Slot] {
slotsReceived[payload.Slot] = true
payloadsNew += 1
}

if cursorSlot == 0 {
log.Infof("latest received payload at slot %d", dataEntry.Slot)
cursorSlot = dataEntry.Slot
} else if cursorSlot > dataEntry.Slot {
cursorSlot = dataEntry.Slot
// Set cursor for next request
if cursorSlot == 0 || cursorSlot > payload.Slot {
cursorSlot = payload.Slot
}

builders[dataEntry.BuilderPubkey] = true
// Remember the builder
builders[payload.BuilderPubkey] = true
}

// Save entries
err := bf.db.SaveDataAPIPayloadDeliveredBatch(entries)
if err != nil {
log.WithError(err).Fatal("failed to save entries")
_log.WithError(err).Fatal("failed to save entries")
return err
}

// save builders
// Save builders
for builderPubkey := range builders {
err = bf.db.SaveBuilder(&database.BlockBuilderEntry{BuilderPubkey: builderPubkey})
if err != nil {
log.WithError(err).Error("failed to save builder")
_log.WithError(err).Error("failed to save builder")
}
}

// Stop as soon as no new payloads are received
if payloadsNew == 0 {
log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot)
_log.Infof("No new payloads, all done. Earliest payload for slot: %d", cursorSlot)
return nil
}

// Stop if at the latest slot in DB
if cursorSlot < latestSlotInDB {
log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB)
_log.Infof("Payloads backfilled until last in DB - at slot %d", latestSlotInDB)
return nil
}

// Stop if at min slot
if cursorSlot < bf.minSlot {
log.Infof("Payloads backfilled until min slot %d", bf.minSlot)
_log.Infof("Payloads backfilled until min slot %d", bf.minSlot)
return nil
}
// time.Sleep(1 * time.Second)
}
}

Expand Down

0 comments on commit dc7818f

Please sign in to comment.