-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingestion Performance improvements #653
Changes from 5 commits
53d6843
4fe9e79
cece1e8
cb45e91
2e65322
b4fee16
b2cb7ba
ca019da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,9 +163,17 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { | |
Int("cadence-event-length", events.Length()). | ||
Msg("received new cadence evm events") | ||
|
||
batch := e.store.NewBatch() | ||
defer func(batch *pebbleDB.Batch) { | ||
err := batch.Close() | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("failed to close batch") | ||
} | ||
}(batch) | ||
|
||
// if heartbeat interval with no data still update the cadence height | ||
if events.Empty() { | ||
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), nil); err != nil { | ||
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight(), batch); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was a bug There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice 👌 |
||
return fmt.Errorf( | ||
"failed to update to latest cadence height: %d, during events ingestion: %w", | ||
events.CadenceHeight(), | ||
|
@@ -176,14 +184,6 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { | |
return nil // nothing else to do this was heartbeat event with not event payloads | ||
} | ||
|
||
batch := e.store.NewBatch() | ||
defer func(batch *pebbleDB.Batch) { | ||
err := batch.Close() | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("failed to close batch") | ||
} | ||
}(batch) | ||
|
||
// Step 1: Re-execute all transactions on the latest EVM block | ||
|
||
// Step 1.1: Notify the `BlocksProvider` of the newly received EVM block | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||||||||
"sort" | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
"github.com/onflow/cadence/common" | ||||||||||||||||||||||||||||||||||||||||||||||
"github.com/onflow/flow-go/fvm/evm/events" | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -62,7 +63,8 @@ func NewRPCEventSubscriber( | |||||||||||||||||||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||||||||||||||||||||
// If error is encountered during backfill the subscription will end and the response chanel will be closed. | ||||||||||||||||||||||||||||||||||||||||||||||
func (r *RPCEventSubscriber) Subscribe(ctx context.Context) <-chan models.BlockEvents { | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan := make(chan models.BlockEvents) | ||||||||||||||||||||||||||||||||||||||||||||||
// buffered channel so that the decoding of the events can happen in parallel to other operations | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan := make(chan models.BlockEvents, 1000) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -195,7 +197,7 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha | |||||||||||||||||||||||||||||||||||||||||||||
// and check for each event it receives whether we reached the end, if we reach the end it will increase | ||||||||||||||||||||||||||||||||||||||||||||||
// the height by one (next height), and check if we are still in previous sporks, if so repeat everything, | ||||||||||||||||||||||||||||||||||||||||||||||
// otherwise return. | ||||||||||||||||||||||||||||||||||||||||||||||
func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan models.BlockEvents { | ||||||||||||||||||||||||||||||||||||||||||||||
func (r *RPCEventSubscriber) backfill(ctx context.Context, currentHeight uint64) <-chan models.BlockEvents { | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
eventsChan := make(chan models.BlockEvents) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -204,49 +206,109 @@ func (r *RPCEventSubscriber) backfill(ctx context.Context, height uint64) <-chan | |||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||||||||||||
// check if the current height is still in past sporks, and if not return since we are done with backfilling | ||||||||||||||||||||||||||||||||||||||||||||||
if !r.client.IsPastSpork(height) { | ||||||||||||||||||||||||||||||||||||||||||||||
// check if the current currentHeight is still in past sporks, and if not return since we are done with backfilling | ||||||||||||||||||||||||||||||||||||||||||||||
if !r.client.IsPastSpork(currentHeight) { | ||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Info(). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("height", height). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("height", currentHeight). | ||||||||||||||||||||||||||||||||||||||||||||||
Msg("completed backfilling") | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) | ||||||||||||||||||||||||||||||||||||||||||||||
var err error | ||||||||||||||||||||||||||||||||||||||||||||||
currentHeight, err = r.backfillSpork(ctx, currentHeight, eventsChan) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Error().Err(err).Msg("error backfilling spork") | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan <- models.NewBlockEventsError(err) | ||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Info(). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("start-height", height). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("last-spork-height", latestHeight). | ||||||||||||||||||||||||||||||||||||||||||||||
Msg("backfilling spork") | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("next-height", currentHeight). | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
Msg("reached the end of spork, checking next spork") | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
for ev := range r.subscribe(ctx, height) { | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan <- ev | ||||||||||||||||||||||||||||||||||||||||||||||
return eventsChan | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if ev.Err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
// maxRangeForGetEvents is the maximum range of blocks that can be fetched using the GetEventsForHeightRange method. | ||||||||||||||||||||||||||||||||||||||||||||||
const maxRangeForGetEvents = uint64(249) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d]...", ev.Events.CadenceHeight(), latestHeight)) | ||||||||||||||||||||||||||||||||||||||||||||||
func (r *RPCEventSubscriber) backfillSpork(ctx context.Context, fromHeight uint64, eventsChan chan<- models.BlockEvents) (uint64, error) { | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you add comment to this function to explain the Is the returned height the first height for the next spork of the fromHeight? |
||||||||||||||||||||||||||||||||||||||||||||||
evmAddress := common.Address(systemcontracts.SystemContractsForChain(r.chain).EVMContract.Address) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if ev.Events != nil && ev.Events.CadenceHeight() == latestHeight { | ||||||||||||||||||||||||||||||||||||||||||||||
height = ev.Events.CadenceHeight() + 1 // go to next height in the next spork | ||||||||||||||||||||||||||||||||||||||||||||||
lastHeight, err := r.client.GetLatestHeightForSpork(ctx, fromHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan <- models.NewBlockEventsError(err) | ||||||||||||||||||||||||||||||||||||||||||||||
return 0, err | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Info(). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("next-height", height). | ||||||||||||||||||||||||||||||||||||||||||||||
Msg("reached the end of spork, checking next spork") | ||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Info(). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("start-height", fromHeight). | ||||||||||||||||||||||||||||||||||||||||||||||
Uint64("last-spork-height", lastHeight). | ||||||||||||||||||||||||||||||||||||||||||||||
Msg("backfilling spork") | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
for fromHeight < lastHeight { | ||||||||||||||||||||||||||||||||||||||||||||||
r.logger.Debug().Msg(fmt.Sprintf("backfilling [%d / %d] ...", fromHeight, lastHeight)) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
startHeight := fromHeight | ||||||||||||||||||||||||||||||||||||||||||||||
endHeight := fromHeight + maxRangeForGetEvents | ||||||||||||||||||||||||||||||||||||||||||||||
if endHeight > lastHeight { | ||||||||||||||||||||||||||||||||||||||||||||||
endHeight = lastHeight | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
blockExecutedEvent := common.NewAddressLocation( | ||||||||||||||||||||||||||||||||||||||||||||||
nil, | ||||||||||||||||||||||||||||||||||||||||||||||
evmAddress, | ||||||||||||||||||||||||||||||||||||||||||||||
string(events.EventTypeBlockExecuted), | ||||||||||||||||||||||||||||||||||||||||||||||
).ID() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
transactionExecutedEvent := common.NewAddressLocation( | ||||||||||||||||||||||||||||||||||||||||||||||
nil, | ||||||||||||||||||||||||||||||||||||||||||||||
evmAddress, | ||||||||||||||||||||||||||||||||||||||||||||||
string(events.EventTypeTransactionExecuted), | ||||||||||||||||||||||||||||||||||||||||||||||
).ID() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
return 0, fmt.Errorf("failed to get block events: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) | ||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||
return 0, fmt.Errorf("failed to get block events: %w", err) | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// sort both, just in case | ||||||||||||||||||||||||||||||||||||||||||||||
sort.Slice(blocks, func(i, j int) bool { | ||||||||||||||||||||||||||||||||||||||||||||||
return blocks[i].Height < blocks[j].Height | ||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||
sort.Slice(transactions, func(i, j int) bool { | ||||||||||||||||||||||||||||||||||||||||||||||
return transactions[i].Height < transactions[j].Height | ||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
if len(transactions) != len(blocks) { | ||||||||||||||||||||||||||||||||||||||||||||||
return 0, fmt.Errorf("transactions and blocks have different length") | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can perform the check first
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
for i := range transactions { | ||||||||||||||||||||||||||||||||||||||||||||||
if transactions[i].Height != blocks[i].Height { | ||||||||||||||||||||||||||||||||||||||||||||||
return 0, fmt.Errorf("transactions and blocks have different height") | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// append the transaction events to the block events | ||||||||||||||||||||||||||||||||||||||||||||||
blocks[i].Events = append(blocks[i].Events, transactions[i].Events...) | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
evmEvents := models.NewBlockEvents(blocks[i]) | ||||||||||||||||||||||||||||||||||||||||||||||
eventsChan <- evmEvents | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// advance the height | ||||||||||||||||||||||||||||||||||||||||||||||
fromHeight = evmEvents.Events.CadenceHeight() + 1 | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
return eventsChan | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
return fromHeight, nil | ||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||
// fetchMissingData is used as a backup mechanism for fetching EVM-related | ||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes in this files are not needed for performance, but they made it slightly easier to test things, so I left them in.