Skip to content

Commit

Permalink
batch get events when backfilling
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Nov 12, 2024
1 parent da24a0d commit 6c193ad
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 11 deletions.
9 changes: 9 additions & 0 deletions cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"os"
"os/signal"
"runtime/pprof"
"strings"
"syscall"
"time"
Expand All @@ -29,6 +30,14 @@ var Cmd = &cobra.Command{
Use: "run",
Short: "Runs the EVM Gateway Node",
Run: func(*cobra.Command, []string) {

f, err := os.Create("cpu.pprof")
if err != nil {
log.Fatal().Err(err).Msg("could not create cpu profile")
}
pprof.StartCPUProfile(f)

Check failure on line 38 in cmd/run/cmd.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `pprof.StartCPUProfile` is not checked (errcheck)
defer pprof.StopCPUProfile()

// create multi-key account
if _, exists := os.LookupEnv("MULTIKEY_MODE"); exists {
bootstrap.RunCreateMultiKeyAccount()
Expand Down
83 changes: 72 additions & 11 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"sort"
"time"

"github.com/onflow/cadence/common"
"github.com/onflow/flow-go/fvm/evm/events"
Expand Down Expand Up @@ -62,7 +64,8 @@ func NewRPCEventSubscriber(
//
// If error is encountered during backfill the subscription will end and the response chanel will be closed.
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents {
eventsChan := make(chan models.BlockEvents)
// buffered channel so that the decoding of the events can happen in parallel to other operations
eventsChan := make(chan models.BlockEvents, 1000)

go func() {
defer func() {
Expand Down Expand Up @@ -196,6 +199,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything,
// otherwise return.
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents {
// TODO(JanezP): if we are backfilling, its more efficient to request events in a batch
eventsChan := make(chan models.BlockEvents)

go func() {
Expand Down Expand Up @@ -224,25 +228,82 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan
Uint64("last-spork-height", latestHeight).
Msg("backfilling spork")

for ev := range r.subscribe(ctx, height) {
eventsChan <- ev
ticker := time.NewTicker(time.Millisecond * 10)

if ev.Err != nil {
maxRange := uint64(249)
for height < latestHeight {

// TODO: do rate limiting better
<-ticker.C

startHeight := height
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", startHeight, latestHeight))
endHeight := height + maxRange
if endHeight > latestHeight {
endHeight = latestHeight
}

evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address)
blockExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeBlockExecuted),
).ID()

transactionExecutedEvent := common.NewAddressLocation(
nil,
evmAddress,
string(events.EventTypeTransactionExecuted),
).ID()

//
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get block events")
eventsChan <- models.NewBlockEventsError(err)
return
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Height < blocks[j].Height
})

transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight)
if err != nil {
r.logger.Error().Err(err).Msg("failed to get block events")
eventsChan <- models.NewBlockEventsError(err)
return
}

r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight))
sort.Slice(transactions, func(i, j int) bool {
return transactions[i].Height < transactions[j].Height
})

if len(transactions) != len(blocks) {
r.logger.Error().Msg("transactions and blocks have different length")
eventsChan <- models.NewBlockEventsError(err)
return
}

if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight {
height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork
for i := range transactions {
if transactions[i].Height != blocks[i].Height {
r.logger.Error().Msg("transactions and blocks have different height")
eventsChan <- models.NewBlockEventsError(err)
return
}
// append the transaction events to the block events
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...)

r.logger.Info().
Uint64("next-height", height).
Msg("reached the end of spork, checking next spork")
evmEvents := models.NewBlockEvents(blocks[i])
height = evmEvents.Events.CadenceHeight() + 1

break
}

}
ticker.Stop()

r.logger.Info().
Uint64("next-height", height).
Msg("reached the end of spork, checking next spork")
}
}()

Expand Down
11 changes: 11 additions & 0 deletions services/requester/cross-spork_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,14 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight(
}
return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...)
}

func (c *CrossSporkClient) GetEventsForHeightRange(
ctx context.Context, eventType string, startHeight uint64, endHeight uint64,
) ([]flow.BlockEvents, error) {
// TODO: also make sure the endHeight is not too high
client, err := c.getClientForHeight(startHeight)
if err != nil {
return nil, err
}
return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight)
}

0 comments on commit 6c193ad

Please sign in to comment.