Skip to content

Commit

Permalink
Merge pull request #760 from onflow/mpeter/make-ingestion-idempotent
Browse files Browse the repository at this point in the history
Make ingestion of EVM events idempotent
  • Loading branch information
peterargue authored Feb 21, 2025
2 parents 7274b59 + 55713b8 commit 5b8d549
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 144 deletions.
11 changes: 2 additions & 9 deletions api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,11 @@ func (d *DebugAPI) traceBlockByNumber(
}

func (d *DebugAPI) executorAtBlock(block *models.Block) (*evm.BlockExecutor, error) {
previousBlock, err := d.blocks.GetByHeight(block.Height - 1)
if err != nil {
return nil, err
}

// We need to re-execute all the transactions from the given block,
// on top of the previous block state, to generate the correct traces.
snapshot, err := d.registerStore.GetSnapshotAt(previousBlock.Height)
snapshot, err := d.registerStore.GetSnapshotAt(block.Height)
if err != nil {
return nil, fmt.Errorf(
"failed to get register snapshot at block height %d: %w",
previousBlock.Height,
block.Height,
err,
)
}
Expand Down
58 changes: 37 additions & 21 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (b *Bootstrap) StopEventIngestion() {
}

func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.logger.Info().Msg("bootstrap starting metrics server")
b.logger.Info().Msg("bootstrap starting API server")

b.server = api.NewServer(b.logger, b.collector, b.config)

Expand Down Expand Up @@ -372,7 +372,6 @@ func (b *Bootstrap) StopAPIServer() {
}

func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {

b.metrics = newMetricsWrapper(b.logger, b.config.MetricsPort)
return b.metrics.Start(ctx)
}
Expand Down Expand Up @@ -639,45 +638,62 @@ func setupStorage(
}, nil
}

// Run will run complete bootstrap of the EVM gateway with all the engines.
// Run is a blocking call, but it does signal readiness of the service
// through a channel provided as an argument.
func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) error {
boot, err := New(cfg)
if err != nil {
return err
}

func (b *Bootstrap) Run(
ctx context.Context,
cfg config.Config,
ready component.ReadyFunc,
) error {
// Start the API Server first, to avoid any races with incoming
// EVM events, that might affect the starting state.
if err := boot.StartAPIServer(ctx); err != nil {
if err := b.StartAPIServer(ctx); err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}

if err := boot.StartEventIngestion(ctx); err != nil {
if err := b.StartEventIngestion(ctx); err != nil {
return fmt.Errorf("failed to start event ingestion engine: %w", err)
}

if err := boot.StartMetricsServer(ctx); err != nil {
if err := b.StartMetricsServer(ctx); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}

if err := boot.StartProfilerServer(ctx); err != nil {
if err := b.StartProfilerServer(ctx); err != nil {
return fmt.Errorf("failed to start profiler server: %w", err)
}

// mark ready
ready()

return nil
}

func (b *Bootstrap) Stop() {
b.logger.Info().Msg("bootstrap received context cancellation, stopping services")

b.StopEventIngestion()
b.StopMetricsServer()
b.StopAPIServer()
b.StopClient()
b.StopDB()
}

// Run will run complete bootstrap of the EVM gateway with all the engines.
// Run is a blocking call, but it does signal readiness of the service
// through a channel provided as an argument.
func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) error {
boot, err := New(cfg)
if err != nil {
return err
}

if err := boot.Run(ctx, cfg, ready); err != nil {
return err
}

// if context is canceled start shutdown
<-ctx.Done()
boot.logger.Warn().Msg("bootstrap received context cancellation, stopping services")

boot.StopEventIngestion()
boot.StopMetricsServer()
boot.StopAPIServer()
boot.StopClient()
boot.StopDB()
boot.Stop()

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/onflow/atree v0.9.0
github.com/onflow/cadence v1.3.1
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d
github.com/onflow/flow-go-sdk v1.3.1
github.com/onflow/go-ethereum v1.14.7
github.com/prometheus/client_golang v1.20.5
Expand Down Expand Up @@ -135,13 +135,13 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onflow/crypto v0.25.2 // indirect
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0 // indirect
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0 // indirect
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview // indirect
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview // indirect
github.com/onflow/flow-ft/lib/go/contracts v1.0.1 // indirect
github.com/onflow/flow-ft/lib/go/templates v1.0.1 // indirect
github.com/onflow/flow-nft/lib/go/contracts v1.2.2 // indirect
github.com/onflow/flow-nft/lib/go/contracts v1.2.3 // indirect
github.com/onflow/flow-nft/lib/go/templates v1.2.1 // indirect
github.com/onflow/flow/protobuf/go/flow v0.4.7 // indirect
github.com/onflow/flow/protobuf/go/flow v0.4.9 // indirect
github.com/onflow/sdks v0.6.0-preview.1 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.18.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -520,24 +520,24 @@ github.com/onflow/cadence v1.3.1 h1:bs9TFHQy8HHbwTtCtg5cLdyndWhmwq55RSwID1cb220=
github.com/onflow/cadence v1.3.1/go.mod h1:6/47FljVAdl3/31tShI8JOJW0sXYZHK1PwXkE+yk0qA=
github.com/onflow/crypto v0.25.2 h1:GjHunqVt+vPcdqhxxhAXiMIF3YiLX7gTuTR5O+VG2ns=
github.com/onflow/crypto v0.25.2/go.mod h1:fY7eLqUdMKV8EGOw301unP8h7PvLVy8/6gVR++/g0BY=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0 h1:R86HaOuk6vpuECZnriEUE7bw9inC2AtdSn8lL/iwQLQ=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0/go.mod h1:9asTBnB6Tw2UlVVtQKyS/egYv3xr4zVlJnJ75z1dfac=
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0 h1:u2DAG8pk0xFH7TwS70t1gSZ/FtIIZWMSNyiu4SeXBYg=
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0/go.mod h1:pN768Al/wLRlf3bwugv9TyxniqJxMu4sxnX9eQJam64=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview h1:W+QkNQcIbhtR+zXVROKq0bdDEnvzUfUrQrCmegmwzvc=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview/go.mod h1:LyCICUK6sK1jtEyb+3GuRw5tYfHT1uxACLwLTLxw/0I=
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview h1:C0PraQFfwpav4nJAf/RPE9BJyYD6lUMvt+cJyiMDeis=
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview/go.mod h1:pN768Al/wLRlf3bwugv9TyxniqJxMu4sxnX9eQJam64=
github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3SsEftzXG2JlmSe24=
github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A=
github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM=
github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE=
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54 h1:JizTOEQ6ME++LOEqOmKUxtM0Q8tpHUvVJBhqBrgDeE0=
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54/go.mod h1:p88l1DhHObIUr+8b0zqkwH0bklZmQksb6IFH8ZaL1Bg=
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d h1:XRefc4rcBjGDEqsj3OB6XjSjSeYwMtysl7jmaLYpg+s=
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d/go.mod h1:VS7MlNHZeDrGm9/jkuMCSvAQTLFXpzQD0BIMB8/QYB8=
github.com/onflow/flow-go-sdk v1.3.1 h1:2YdTL/R1/DjMYYmyKgArTeQ93GKvLlfCeCpMVH7b8q4=
github.com/onflow/flow-go-sdk v1.3.1/go.mod h1:0rMuCLShdX9F4pLBCPhlMGCFu8gu9SfiXT/Lc9qAi24=
github.com/onflow/flow-nft/lib/go/contracts v1.2.2 h1:XFERNVUDGbZ4ViZjt7P1cGD80mO1PzUJYPfdhXFsGbQ=
github.com/onflow/flow-nft/lib/go/contracts v1.2.2/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY=
github.com/onflow/flow-nft/lib/go/contracts v1.2.3 h1:4ju20g1xgDKWBT63rOj5f/Sa4Lc+naCSWT4p31x9yQk=
github.com/onflow/flow-nft/lib/go/contracts v1.2.3/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY=
github.com/onflow/flow-nft/lib/go/templates v1.2.1 h1:SAALMZPDw9Eb9p5kSLnmnFxjyig1MLiT4JUlLp0/bSE=
github.com/onflow/flow-nft/lib/go/templates v1.2.1/go.mod h1:W6hOWU0xltPqNpv9gQX8Pj8Jtf0OmRxc1XX2V0kzJaI=
github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc=
github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/flow/protobuf/go/flow v0.4.9 h1:UfsWWqj6VQbEHvaw8kSGvIawCpEfz3gOGZfcdugNxVE=
github.com/onflow/flow/protobuf/go/flow v0.4.9/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc=
github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I=
github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc=
Expand Down
159 changes: 82 additions & 77 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,84 @@ import (
"github.com/rs/zerolog"
)

var apiErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("api_errors_total"),
Help: "Total number of API errors",
})

var serverPanicsCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("api_server_panics_total"),
Help: "Total number of panics in the API server",
}, []string{"reason"})

var operatorBalance = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("operator_balance"),
Help: "Flow balance of the EVM gateway operator wallet",
})

var cadenceBlockHeight = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("cadence_block_height"),
Help: "Current Cadence block height",
})

var evmBlockHeight = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("evm_block_height"),
Help: "Current EVM block height",
})

var evmBlockIndexedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("blocks_indexed_total"),
Help: "Total number of blocks indexed",
})

var evmTxIndexedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("txs_indexed_total"),
Help: "Total number transactions indexed",
})

var evmAccountCallCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("evm_account_interactions_total"),
Help: "Total number of account interactions",
}, []string{"address"})

// TODO: Think of adding 'status_code'
var requestDurations = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prefixedName("api_request_duration_seconds"),
Help: "Duration of the request made a specific API endpoint",
Buckets: prometheus.DefBuckets,
}, []string{"method"})

var availableSigningKeys = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("available_signing_keys"),
Help: "Number of keys available for transaction signing",
})

var gasEstimationIterations = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("gas_estimation_iterations"),
Help: "Number of iterations taken to estimate the gas of a EVM call/tx",
})

var blockIngestionTime = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prefixedName("block_ingestion_time_seconds"),
Help: "Time taken to fully ingest an EVM block in the local state index",
Buckets: prometheus.DefBuckets,
})

var metrics = []prometheus.Collector{
apiErrors,
serverPanicsCounters,
cadenceBlockHeight,
evmBlockHeight,
evmBlockIndexedCounter,
evmTxIndexedCounter,
operatorBalance,
evmAccountCallCounters,
requestDurations,
availableSigningKeys,
gasEstimationIterations,
blockIngestionTime,
}

type Collector interface {
ApiErrorOccurred()
ServerPanicked(reason string)
Expand Down Expand Up @@ -42,83 +120,6 @@ type DefaultCollector struct {
}

func NewCollector(logger zerolog.Logger) Collector {
apiErrors := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("api_errors_total"),
Help: "Total number of API errors",
})

serverPanicsCounters := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("api_server_panics_total"),
Help: "Total number of panics in the API server",
}, []string{"reason"})

operatorBalance := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("operator_balance"),
Help: "Flow balance of the EVM gateway operator wallet",
})

cadenceBlockHeight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("cadence_block_height"),
Help: "Current Cadence block height",
})

evmBlockHeight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("evm_block_height"),
Help: "Current EVM block height",
})

evmBlockIndexedCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("blocks_indexed_total"),
Help: "Total number of blocks indexed",
})

evmTxIndexedCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("txs_indexed_total"),
Help: "Total number transactions indexed",
})

evmAccountCallCounters := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("evm_account_interactions_total"),
Help: "Total number of account interactions",
}, []string{"address"})

// TODO: Think of adding 'status_code'
requestDurations := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prefixedName("api_request_duration_seconds"),
Help: "Duration of the request made a specific API endpoint",
Buckets: prometheus.DefBuckets,
}, []string{"method"})

availableSigningKeys := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("available_signing_keys"),
Help: "Number of keys available for transaction signing",
})

gasEstimationIterations := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("gas_estimation_iterations"),
Help: "Number of iterations taken to estimate the gas of a EVM call/tx",
})

blockIngestionTime := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prefixedName("block_ingestion_time_seconds"),
Help: "Time taken to fully ingest an EVM block in the local state index",
Buckets: prometheus.DefBuckets,
})

metrics := []prometheus.Collector{
apiErrors,
serverPanicsCounters,
cadenceBlockHeight,
evmBlockHeight,
evmBlockIndexedCounter,
evmTxIndexedCounter,
operatorBalance,
evmAccountCallCounters,
requestDurations,
availableSigningKeys,
gasEstimationIterations,
blockIngestionTime,
}
if err := registerMetrics(logger, metrics...); err != nil {
logger.Info().Msg("using noop collector as metric register failed")
return NopCollector
Expand All @@ -142,6 +143,10 @@ func NewCollector(logger zerolog.Logger) Collector {

func registerMetrics(logger zerolog.Logger, metrics ...prometheus.Collector) error {
for _, m := range metrics {
// During E2E tests, the EVM GW might be bootstrapped again
// and again, so we make sure to register the metrics on a
// clean state.
prometheus.Unregister(m)
if err := prometheus.Register(m); err != nil {
logger.Err(err).Msg("failed to register metric")
return err
Expand Down
21 changes: 17 additions & 4 deletions storage/pebble/register_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,23 @@ func newLookupKey(height uint64, key []byte) *lookupKey {
return &lookupKey
}

// GetSnapshotAt returns a snapshot of the register index at the given block height.
// the snapshot has a cache. Nil values are cached.
func (r *RegisterStorage) GetSnapshotAt(evmBlockHeight uint64) (types.BackendStorageSnapshot, error) {
return NewStorageSnapshot(r.Get, evmBlockHeight), nil
// GetSnapshotAt returns a snapshot of the register index at the start of the
// given block height (which is the end of the previous block).
// The snapshot has a cache. Nil values are cached.
func (r *RegisterStorage) GetSnapshotAt(
evmBlockHeightOfStartStateToQuery uint64,
) (types.BackendStorageSnapshot, error) {
var snapshotHeightOfEndState uint64
if evmBlockHeightOfStartStateToQuery > 0 {
// `evmBlockHeightOfStartStateToQuery-1` to get the end state of the previous block.
snapshotHeightOfEndState = evmBlockHeightOfStartStateToQuery - 1
} else {
// Avoid a possible underflow
snapshotHeightOfEndState = uint64(0)
}

// NewStorageSnapshot return the end state of a given height.
return NewStorageSnapshot(r.Get, snapshotHeightOfEndState), nil
}

func registerOwnerMismatch(expected flow.Address, owner flow.Address) error {
Expand Down
Loading

0 comments on commit 5b8d549

Please sign in to comment.