Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ingestion of EVM events idempotent #760

Merged
merged 13 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,18 +419,11 @@ func (d *DebugAPI) traceBlockByNumber(
}

func (d *DebugAPI) executorAtBlock(block *models.Block) (*evm.BlockExecutor, error) {
previousBlock, err := d.blocks.GetByHeight(block.Height - 1)
if err != nil {
return nil, err
}

// We need to re-execute all the transactions from the given block,
// on top of the previous block state, to generate the correct traces.
snapshot, err := d.registerStore.GetSnapshotAt(previousBlock.Height)
snapshot, err := d.registerStore.GetSnapshotAt(block.Height)
if err != nil {
return nil, fmt.Errorf(
"failed to get register snapshot at block height %d: %w",
previousBlock.Height,
block.Height,
err,
)
}
Expand Down
58 changes: 37 additions & 21 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (b *Bootstrap) StopEventIngestion() {
}

func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.logger.Info().Msg("bootstrap starting metrics server")
b.logger.Info().Msg("bootstrap starting API server")

b.server = api.NewServer(b.logger, b.collector, b.config)

Expand Down Expand Up @@ -372,7 +372,6 @@ func (b *Bootstrap) StopAPIServer() {
}

func (b *Bootstrap) StartMetricsServer(ctx context.Context) error {

b.metrics = newMetricsWrapper(b.logger, b.config.MetricsPort)
return b.metrics.Start(ctx)
}
Expand Down Expand Up @@ -639,45 +638,62 @@ func setupStorage(
}, nil
}

// Run will run complete bootstrap of the EVM gateway with all the engines.
// Run is a blocking call, but it does signal readiness of the service
// through a channel provided as an argument.
func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) error {
boot, err := New(cfg)
if err != nil {
return err
}

func (b *Bootstrap) Run(
ctx context.Context,
cfg config.Config,
ready component.ReadyFunc,
) error {
// Start the API Server first, to avoid any races with incoming
// EVM events, that might affect the starting state.
if err := boot.StartAPIServer(ctx); err != nil {
if err := b.StartAPIServer(ctx); err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}

if err := boot.StartEventIngestion(ctx); err != nil {
if err := b.StartEventIngestion(ctx); err != nil {
return fmt.Errorf("failed to start event ingestion engine: %w", err)
}

if err := boot.StartMetricsServer(ctx); err != nil {
if err := b.StartMetricsServer(ctx); err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}

if err := boot.StartProfilerServer(ctx); err != nil {
if err := b.StartProfilerServer(ctx); err != nil {
return fmt.Errorf("failed to start profiler server: %w", err)
}

// mark ready
ready()

return nil
}

func (b *Bootstrap) Stop() {
b.logger.Info().Msg("bootstrap received context cancellation, stopping services")

b.StopEventIngestion()
b.StopMetricsServer()
b.StopAPIServer()
b.StopClient()
b.StopDB()
}

// Run will run complete bootstrap of the EVM gateway with all the engines.
// Run is a blocking call, but it does signal readiness of the service
// through a channel provided as an argument.
func Run(ctx context.Context, cfg config.Config, ready component.ReadyFunc) error {
boot, err := New(cfg)
if err != nil {
return err
}

if err := boot.Run(ctx, cfg, ready); err != nil {
return err
}

// if context is canceled start shutdown
<-ctx.Done()
boot.logger.Warn().Msg("bootstrap received context cancellation, stopping services")

boot.StopEventIngestion()
boot.StopMetricsServer()
boot.StopAPIServer()
boot.StopClient()
boot.StopDB()
boot.Stop()

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/onflow/atree v0.9.0
github.com/onflow/cadence v1.3.1
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d
github.com/onflow/flow-go-sdk v1.3.1
github.com/onflow/go-ethereum v1.14.7
github.com/prometheus/client_golang v1.20.5
Expand Down Expand Up @@ -135,13 +135,13 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onflow/crypto v0.25.2 // indirect
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0 // indirect
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0 // indirect
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview // indirect
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview // indirect
github.com/onflow/flow-ft/lib/go/contracts v1.0.1 // indirect
github.com/onflow/flow-ft/lib/go/templates v1.0.1 // indirect
github.com/onflow/flow-nft/lib/go/contracts v1.2.2 // indirect
github.com/onflow/flow-nft/lib/go/contracts v1.2.3 // indirect
github.com/onflow/flow-nft/lib/go/templates v1.2.1 // indirect
github.com/onflow/flow/protobuf/go/flow v0.4.7 // indirect
github.com/onflow/flow/protobuf/go/flow v0.4.9 // indirect
github.com/onflow/sdks v0.6.0-preview.1 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/gomega v1.18.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -520,24 +520,24 @@ github.com/onflow/cadence v1.3.1 h1:bs9TFHQy8HHbwTtCtg5cLdyndWhmwq55RSwID1cb220=
github.com/onflow/cadence v1.3.1/go.mod h1:6/47FljVAdl3/31tShI8JOJW0sXYZHK1PwXkE+yk0qA=
github.com/onflow/crypto v0.25.2 h1:GjHunqVt+vPcdqhxxhAXiMIF3YiLX7gTuTR5O+VG2ns=
github.com/onflow/crypto v0.25.2/go.mod h1:fY7eLqUdMKV8EGOw301unP8h7PvLVy8/6gVR++/g0BY=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0 h1:R86HaOuk6vpuECZnriEUE7bw9inC2AtdSn8lL/iwQLQ=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.4.0/go.mod h1:9asTBnB6Tw2UlVVtQKyS/egYv3xr4zVlJnJ75z1dfac=
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0 h1:u2DAG8pk0xFH7TwS70t1gSZ/FtIIZWMSNyiu4SeXBYg=
github.com/onflow/flow-core-contracts/lib/go/templates v1.4.0/go.mod h1:pN768Al/wLRlf3bwugv9TyxniqJxMu4sxnX9eQJam64=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview h1:W+QkNQcIbhtR+zXVROKq0bdDEnvzUfUrQrCmegmwzvc=
github.com/onflow/flow-core-contracts/lib/go/contracts v1.5.1-preview/go.mod h1:LyCICUK6sK1jtEyb+3GuRw5tYfHT1uxACLwLTLxw/0I=
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview h1:C0PraQFfwpav4nJAf/RPE9BJyYD6lUMvt+cJyiMDeis=
github.com/onflow/flow-core-contracts/lib/go/templates v1.5.1-preview/go.mod h1:pN768Al/wLRlf3bwugv9TyxniqJxMu4sxnX9eQJam64=
github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3SsEftzXG2JlmSe24=
github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A=
github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM=
github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE=
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54 h1:JizTOEQ6ME++LOEqOmKUxtM0Q8tpHUvVJBhqBrgDeE0=
github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54/go.mod h1:p88l1DhHObIUr+8b0zqkwH0bklZmQksb6IFH8ZaL1Bg=
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d h1:XRefc4rcBjGDEqsj3OB6XjSjSeYwMtysl7jmaLYpg+s=
github.com/onflow/flow-go v0.38.1-0.20250218174738-2181389f9f7d/go.mod h1:VS7MlNHZeDrGm9/jkuMCSvAQTLFXpzQD0BIMB8/QYB8=
github.com/onflow/flow-go-sdk v1.3.1 h1:2YdTL/R1/DjMYYmyKgArTeQ93GKvLlfCeCpMVH7b8q4=
github.com/onflow/flow-go-sdk v1.3.1/go.mod h1:0rMuCLShdX9F4pLBCPhlMGCFu8gu9SfiXT/Lc9qAi24=
github.com/onflow/flow-nft/lib/go/contracts v1.2.2 h1:XFERNVUDGbZ4ViZjt7P1cGD80mO1PzUJYPfdhXFsGbQ=
github.com/onflow/flow-nft/lib/go/contracts v1.2.2/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY=
github.com/onflow/flow-nft/lib/go/contracts v1.2.3 h1:4ju20g1xgDKWBT63rOj5f/Sa4Lc+naCSWT4p31x9yQk=
github.com/onflow/flow-nft/lib/go/contracts v1.2.3/go.mod h1:eZ9VMMNfCq0ho6kV25xJn1kXeCfxnkhj3MwF3ed08gY=
github.com/onflow/flow-nft/lib/go/templates v1.2.1 h1:SAALMZPDw9Eb9p5kSLnmnFxjyig1MLiT4JUlLp0/bSE=
github.com/onflow/flow-nft/lib/go/templates v1.2.1/go.mod h1:W6hOWU0xltPqNpv9gQX8Pj8Jtf0OmRxc1XX2V0kzJaI=
github.com/onflow/flow/protobuf/go/flow v0.4.7 h1:iP6DFx4wZ3ETORsyeqzHu7neFT3d1CXF6wdK+AOOjmc=
github.com/onflow/flow/protobuf/go/flow v0.4.7/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/flow/protobuf/go/flow v0.4.9 h1:UfsWWqj6VQbEHvaw8kSGvIawCpEfz3gOGZfcdugNxVE=
github.com/onflow/flow/protobuf/go/flow v0.4.9/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk=
github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc=
github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I=
github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc=
Expand Down
159 changes: 82 additions & 77 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,84 @@ import (
"github.com/rs/zerolog"
)

var apiErrors = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("api_errors_total"),
Help: "Total number of API errors",
})

var serverPanicsCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("api_server_panics_total"),
Help: "Total number of panics in the API server",
}, []string{"reason"})

var operatorBalance = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("operator_balance"),
Help: "Flow balance of the EVM gateway operator wallet",
})

var cadenceBlockHeight = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("cadence_block_height"),
Help: "Current Cadence block height",
})

var evmBlockHeight = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("evm_block_height"),
Help: "Current EVM block height",
})

var evmBlockIndexedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("blocks_indexed_total"),
Help: "Total number of blocks indexed",
})

var evmTxIndexedCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("txs_indexed_total"),
Help: "Total number transactions indexed",
})

var evmAccountCallCounters = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("evm_account_interactions_total"),
Help: "Total number of account interactions",
}, []string{"address"})

// TODO: Think of adding 'status_code'
var requestDurations = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prefixedName("api_request_duration_seconds"),
Help: "Duration of the request made a specific API endpoint",
Buckets: prometheus.DefBuckets,
}, []string{"method"})

var availableSigningKeys = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("available_signing_keys"),
Help: "Number of keys available for transaction signing",
})

var gasEstimationIterations = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("gas_estimation_iterations"),
Help: "Number of iterations taken to estimate the gas of a EVM call/tx",
})

var blockIngestionTime = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prefixedName("block_ingestion_time_seconds"),
Help: "Time taken to fully ingest an EVM block in the local state index",
Buckets: prometheus.DefBuckets,
})

var metrics = []prometheus.Collector{
apiErrors,
serverPanicsCounters,
cadenceBlockHeight,
evmBlockHeight,
evmBlockIndexedCounter,
evmTxIndexedCounter,
operatorBalance,
evmAccountCallCounters,
requestDurations,
availableSigningKeys,
gasEstimationIterations,
blockIngestionTime,
}

type Collector interface {
ApiErrorOccurred()
ServerPanicked(reason string)
Expand Down Expand Up @@ -42,83 +120,6 @@ type DefaultCollector struct {
}

func NewCollector(logger zerolog.Logger) Collector {
apiErrors := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("api_errors_total"),
Help: "Total number of API errors",
})

serverPanicsCounters := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("api_server_panics_total"),
Help: "Total number of panics in the API server",
}, []string{"reason"})

operatorBalance := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("operator_balance"),
Help: "Flow balance of the EVM gateway operator wallet",
})

cadenceBlockHeight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("cadence_block_height"),
Help: "Current Cadence block height",
})

evmBlockHeight := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("evm_block_height"),
Help: "Current EVM block height",
})

evmBlockIndexedCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("blocks_indexed_total"),
Help: "Total number of blocks indexed",
})

evmTxIndexedCounter := prometheus.NewCounter(prometheus.CounterOpts{
Name: prefixedName("txs_indexed_total"),
Help: "Total number transactions indexed",
})

evmAccountCallCounters := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: prefixedName("evm_account_interactions_total"),
Help: "Total number of account interactions",
}, []string{"address"})

// TODO: Think of adding 'status_code'
requestDurations := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: prefixedName("api_request_duration_seconds"),
Help: "Duration of the request made a specific API endpoint",
Buckets: prometheus.DefBuckets,
}, []string{"method"})

availableSigningKeys := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("available_signing_keys"),
Help: "Number of keys available for transaction signing",
})

gasEstimationIterations := prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("gas_estimation_iterations"),
Help: "Number of iterations taken to estimate the gas of a EVM call/tx",
})

blockIngestionTime := prometheus.NewHistogram(prometheus.HistogramOpts{
Name: prefixedName("block_ingestion_time_seconds"),
Help: "Time taken to fully ingest an EVM block in the local state index",
Buckets: prometheus.DefBuckets,
})

metrics := []prometheus.Collector{
apiErrors,
serverPanicsCounters,
cadenceBlockHeight,
evmBlockHeight,
evmBlockIndexedCounter,
evmTxIndexedCounter,
operatorBalance,
evmAccountCallCounters,
requestDurations,
availableSigningKeys,
gasEstimationIterations,
blockIngestionTime,
}
if err := registerMetrics(logger, metrics...); err != nil {
logger.Info().Msg("using noop collector as metric register failed")
return NopCollector
Expand All @@ -142,6 +143,10 @@ func NewCollector(logger zerolog.Logger) Collector {

func registerMetrics(logger zerolog.Logger, metrics ...prometheus.Collector) error {
for _, m := range metrics {
// During E2E tests, the EVM GW might be bootstrapped again
// and again, so we make sure to register the metrics on a
// clean state.
prometheus.Unregister(m)
if err := prometheus.Register(m); err != nil {
logger.Err(err).Msg("failed to register metric")
return err
Expand Down
21 changes: 17 additions & 4 deletions storage/pebble/register_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,23 @@ func newLookupKey(height uint64, key []byte) *lookupKey {
return &lookupKey
}

// GetSnapshotAt returns a snapshot of the register index at the given block height.
// the snapshot has a cache. Nil values are cached.
func (r *RegisterStorage) GetSnapshotAt(evmBlockHeight uint64) (types.BackendStorageSnapshot, error) {
return NewStorageSnapshot(r.Get, evmBlockHeight), nil
// GetSnapshotAt returns a snapshot of the register index at the start of the
// given block height (which is the end of the previous block).
// The snapshot has a cache. Nil values are cached.
func (r *RegisterStorage) GetSnapshotAt(
evmBlockHeightOfStartStateToQuery uint64,
) (types.BackendStorageSnapshot, error) {
var snapshotHeightOfEndState uint64
if evmBlockHeightOfStartStateToQuery > 0 {
// `evmBlockHeightOfStartStateToQuery-1` to get the end state of the previous block.
snapshotHeightOfEndState = evmBlockHeightOfStartStateToQuery - 1
} else {
// Avoid a possible underflow
snapshotHeightOfEndState = uint64(0)
}

// NewStorageSnapshot return the end state of a given height.
return NewStorageSnapshot(r.Get, snapshotHeightOfEndState), nil
}

func registerOwnerMismatch(expected flow.Address, owner flow.Address) error {
Expand Down
Loading
Loading