Make ingestion of EVM events idempotent #760
Walkthrough
This pull request modifies the snapshot retrieval logic in the …
Nice!
looks good, but let's wait for the merged commit in flow-go before merging
Actionable comments posted: 0
🧹 Nitpick comments (6)
tests/integration_test.go (3)
507-544: Add comments to explain the test scenario.
Consider adding comments to explain:
- Why we're submitting this specific number of transactions
- What we're testing with the controlled block mining
- The expected behavior of transaction receipts in this scenario
  totalTxs := keyCount*5 + 3
+ // Submit more transactions than available keys to verify key rotation
+ // and transaction processing under load
  hashes := make([]common.Hash, totalTxs)
  nonce := uint64(0)
545-565: Add validation for the ingestion process.
The test should verify that the ingestion process is working correctly after restart by checking:
- The current block height
- The state of the ingested events
  <-ready2
  time.Sleep(3 * time.Second) // some time to startup
+
+ // Verify that the ingestion process started from the correct block
+ currentBlock, err := ethClient.BlockNumber(ctx)
+ require.NoError(t, err)
+ assert.Greater(t, currentBlock, uint64(0), "Block height should be greater than 0")
566-576: Improve the assertion message for clarity.
The current error message "all transactions were not executed" is ambiguous. Consider making it more specific to the idempotency test case.
- }, time.Second*15, time.Second*1, "all transactions were not executed")
+ }, time.Second*15, time.Second*1, "transactions should remain valid after service restart with earlier start height")
bootstrap/bootstrap.go (2)
671-698: Handle partial initialization failures.
If an error occurs after successfully starting one or more components (e.g. the API server), consider stopping any previously started components before returning to avoid leaving orphan processes. You might call `b.Stop()` or define a partial-rollback mechanism upon encountering an error in the startup chain.
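The rollback idea can be sketched in isolation. This is a minimal, standard-library-only illustration, not the gateway's code: the `component` type, its `failOn` flag, and `startAll` are hypothetical stand-ins for the real API server, metrics server, and ingestion engine.

```go
package main

import "fmt"

// component is a hypothetical stand-in for a gateway subsystem
// (API server, metrics server, event ingestion, ...).
type component struct {
	name    string
	failOn  bool // simulate a startup failure
	running bool
}

func (c *component) Start() error {
	if c.failOn {
		return fmt.Errorf("starting %s: simulated failure", c.name)
	}
	c.running = true
	return nil
}

func (c *component) Stop() { c.running = false }

// startAll starts components in order. If one fails, it stops the ones
// that were already started, in reverse order, before returning the error.
func startAll(components []*component) error {
	started := make([]*component, 0, len(components))
	for _, c := range components {
		if err := c.Start(); err != nil {
			for i := len(started) - 1; i >= 0; i-- {
				started[i].Stop()
			}
			return err
		}
		started = append(started, c)
	}
	return nil
}

func main() {
	api := &component{name: "api"}
	metrics := &component{name: "metrics"}
	ingestion := &component{name: "ingestion", failOn: true}

	err := startAll([]*component{api, metrics, ingestion})
	fmt.Println("error:", err)
	fmt.Println("api running:", api.running, "metrics running:", metrics.running)
}
```

Stopping in reverse start order is a common convention, on the assumption that later components may depend on earlier ones; whether that holds for the bootstrap sequence here would need checking against the actual code.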
710-726: Consider graceful cleanup on Run errors.
If `boot.Run()` fails, the function immediately returns without calling `boot.Stop()`, potentially leaving allocated resources unclosed. To prevent partial-leak scenarios, call `boot.Stop()` in a `defer` or in an error-handling block to ensure graceful cleanup.
metrics/collector.go (1)
147-149: Skip only the conflicting metric registration instead of all.
Returning on the first `AlreadyRegisteredError` halts registration of subsequent metrics. If you'd like to continue registering remaining metrics, consider individually ignoring the conflict rather than returning.
Here is a possible modification:
  if err := prometheus.Register(m); err != nil {
-     if errors.As(err, &prometheus.AlreadyRegisteredError{}) {
-         return nil
-     }
+     var are *prometheus.AlreadyRegisteredError
+     if errors.As(err, &are) {
+         logger.Warn().Msgf("metric %s already registered, skipping", are.ExistingCollector)
+         continue
+     }
      logger.Err(err).Msg("failed to register metric")
      return err
  }
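One detail worth checking with any skip-and-continue variant: `errors.As` matches on the exact target type, so the target has to be a value or a pointer depending on how the library returns the error. The sketch below uses only the standard library. `alreadyRegisteredError`, `register`, and `registerAll` are hypothetical stand-ins, and the assumption that the error is returned by value should be verified against the Prometheus client before adapting this.

```go
package main

import (
	"errors"
	"fmt"
)

// alreadyRegisteredError is a hypothetical stand-in for an error type whose
// Error method has a value receiver and which is returned by value.
type alreadyRegisteredError struct {
	name string
}

func (e alreadyRegisteredError) Error() string {
	return "duplicate registration: " + e.name
}

// register simulates a registry that rejects duplicate names.
func register(seen map[string]bool, name string) error {
	if seen[name] {
		return alreadyRegisteredError{name: name}
	}
	seen[name] = true
	return nil
}

// registerAll skips duplicates and keeps registering the remaining names.
// It returns the names that were skipped.
func registerAll(seen map[string]bool, names ...string) ([]string, error) {
	var skipped []string
	for _, n := range names {
		if err := register(seen, n); err != nil {
			// Value-typed target, matching what register returns.
			var are alreadyRegisteredError
			if errors.As(err, &are) {
				skipped = append(skipped, are.name)
				continue
			}
			return skipped, err
		}
	}
	return skipped, nil
}

func main() {
	seen := map[string]bool{"api_errors": true}
	skipped, err := registerAll(seen, "api_errors", "block_height")
	fmt.Println("skipped:", skipped, "err:", err, "block_height registered:", seen["block_height"])
}
```

With a pointer-typed target (`var are *alreadyRegisteredError`) the same `errors.As` call would compile but would not match a value-typed error, so the duplicate would fall through to the generic error path.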
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
bootstrap/bootstrap.go (3 hunks)
metrics/collector.go (2 hunks)
storage/pebble/register_storage.go (1 hunks)
tests/integration_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- storage/pebble/register_storage.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Test
🔇 Additional comments (5)
tests/integration_test.go (1)
441-506: LGTM! Test setup follows established patterns.
The test setup is well-structured and consistent with other integration tests in the file.
bootstrap/bootstrap.go (3)
210-210: Good clarity in the log message.
Changing the message to reference the API server accurately aligns the log with the actual operation.
409-409: Confirm intentional omission of metrics server completion.
Commenting out `<-b.metrics.Done()` prevents waiting on the metrics server to exit gracefully, which might lead to unpredictable termination. Consider confirming that this is desired, as it can introduce concurrency risks if the metrics server isn’t fully stopped.
700-708: Validate stopping sequence.
Stopping the event ingestion first is valid to prevent further writes, but you may consider keeping the metrics server active until all other shutdown tasks complete, so it can record final teardown metrics. Ensure the order here meets the intended architectural constraints.
metrics/collector.go (1)
4-4: Import is valid.
Importing the `errors` package is necessary to handle the newly introduced error check.
8fc6281
to
1d16241
Compare
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
go.sum is excluded by !**/*.sum
tests/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
api/debug.go (1 hunks)
bootstrap/bootstrap.go (2 hunks)
go.mod (1 hunks)
metrics/collector.go (2 hunks)
storage/pebble/register_storage.go (1 hunks)
tests/go.mod (1 hunks)
tests/integration_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api/debug.go
- metrics/collector.go
- tests/go.mod
- storage/pebble/register_storage.go
- go.mod
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Test
Could you highlight the change that makes the ingestion idempotent? I didn't quite get how it works.
storage/pebble/register_storage.go
Outdated
  func (r *RegisterStorage) GetSnapshotAt(evmBlockHeight uint64) (types.BackendStorageSnapshot, error) {
- 	return NewStorageSnapshot(r.Get, evmBlockHeight), nil
+ 	// `evmBlockHeight-1` to get the end state of the previous block.
+ 	return NewStorageSnapshot(r.Get, evmBlockHeight-1), nil
Can we let the caller do the `-1` and deal with the potential underflow? That way `GetSnapshotAt` always returns the end state. I think the end state is easier to reason about.
the interface defines it as the start of block state
+1 for handling the underflow. it would only happen on block 0, but that happens when syncing from the start or when starting the emulator.
I see. I find it can easily lead to confusion.
If we can't change the interface name to `GetStartStateSnapshotAt`, we should at least put it into the argument name, e.g. `evmBlockHeightOfStartStateToQuery`. I don't mind being verbose here, but I think it's important to draw the caller's attention to passing in the correct value.
Added in ea6e833
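For readers following the underflow discussion: subtracting 1 from a `uint64` of 0 wraps around to the maximum value rather than failing. A minimal sketch of a guard is below. The function name `startStateHeight` is hypothetical, and returning 0 for block 0 is an assumption made for illustration; the fix that actually landed in ea6e833 may handle it differently (for example by returning an error).

```go
package main

import "fmt"

// startStateHeight returns the height whose end state is the start state of
// evmBlockHeight. The name is hypothetical. Block 0 has no previous block,
// so it returns 0 instead of letting the uint64 subtraction wrap around.
func startStateHeight(evmBlockHeight uint64) uint64 {
	if evmBlockHeight == 0 {
		return 0
	}
	return evmBlockHeight - 1
}

func main() {
	for _, h := range []uint64{0, 1, 42} {
		fmt.Printf("block %d -> start state read at height %d\n", h, startStateHeight(h))
	}
}
```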
go.mod
Outdated
@@ -8,7 +8,7 @@ require (
  	github.com/hashicorp/go-multierror v1.1.1
  	github.com/onflow/atree v0.9.0
  	github.com/onflow/cadence v1.3.1
- 	github.com/onflow/flow-go v0.38.1-0.20250213171922-77f4db56bb54
+ 	github.com/onflow/flow-go v0.38.1-0.20250218133611-bca6e7286932
let's update this with the commit from master
Updated in c0a8880.
Note that I used the merge commit from master, not the commit hash itself.
I also updated the `flow-emulator` dependency to the merge commit on master.
metrics/collector.go
Outdated
@@ -143,6 +144,9 @@ func NewCollector(logger zerolog.Logger) Collector {
  func registerMetrics(logger zerolog.Logger, metrics ...prometheus.Collector) error {
  	for _, m := range metrics {
  		if err := prometheus.Register(m); err != nil {
+ 			if errors.As(err, &prometheus.AlreadyRegisteredError{}) {
this seems fine, but also points to a bug where we're registering the same metrics more than once. any idea why you needed to add this?
I have added a fix for this in 71996c2
The EVM GW can be bootstrapped again & again during E2E tests, so the metrics were registered more than once.
Actionable comments posted: 0
♻️ Duplicate comments (1)
tests/integration_test.go (1)
551-571: 🛠️ Refactor suggestion: Enhance idempotency verification after restart.
The test could be improved by verifying additional aspects of idempotency:
- Event logs to ensure no duplicate events
- Transaction counts to ensure no duplicate transactions
- State consistency checks
This is a duplicate of a previous review comment. The suggested improvements remain valid and should be implemented to ensure comprehensive idempotency testing.
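As a generic illustration of what a "no duplicate events" check could look like, the sketch below replays an overlapping range into a keyed store and compares counts. It is not the mechanism this PR uses; `eventStore`, `eventKey`, `event`, and `replay` are hypothetical names, and the real gateway stores events in Pebble rather than a map.

```go
package main

import "fmt"

// eventKey identifies an event by its position. Both fields are hypothetical.
type eventKey struct {
	height  uint64
	txIndex int
}

// event is a simplified stand-in for an ingested EVM event.
type event struct {
	height  uint64
	txIndex int
	txHash  string
}

// eventStore keeps one entry per key, so re-ingesting the same event
// overwrites the existing entry instead of adding a second one.
type eventStore struct {
	events map[eventKey]string
}

func newEventStore() *eventStore {
	return &eventStore{events: make(map[eventKey]string)}
}

func (s *eventStore) Ingest(e event) {
	s.events[eventKey{height: e.height, txIndex: e.txIndex}] = e.txHash
}

func (s *eventStore) Count() int { return len(s.events) }

// replay ingests every event at or above startHeight, simulating a restart
// with an earlier forced start height.
func replay(s *eventStore, events []event, startHeight uint64) {
	for _, e := range events {
		if e.height >= startHeight {
			s.Ingest(e)
		}
	}
}

func main() {
	events := []event{{1, 0, "0xaa"}, {2, 0, "0xbb"}, {2, 1, "0xcc"}}
	s := newEventStore()

	replay(s, events, 0)
	before := s.Count()

	replay(s, events, 1) // restart from an earlier height
	after := s.Count()

	fmt.Println("events before:", before, "after replay:", after)
}
```

In an integration test, the equivalent check would compare counts (and ideally contents) of logs and transactions before and after the restart, rather than only asserting that receipts are still retrievable.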
🧹 Nitpick comments (6)
tests/integration_test.go (3)
478-492: Consider adding test cases for different validation modes.
The test only verifies idempotency with `LocalIndexValidation`. Consider parameterizing the test to verify idempotency with both `LocalIndexValidation` and `TxSealValidation` modes.
  func Test_ForceStartHeightIdempotency(t *testing.T) {
+ 	testCases := []struct {
+ 		name       string
+ 		validation config.TxValidation
+ 	}{
+ 		{"LocalIndexValidation", config.LocalIndexValidation},
+ 		{"TxSealValidation", config.TxSealValidation},
+ 	}
+
+ 	for _, tc := range testCases {
+ 		t.Run(tc.name, func(t *testing.T) {
+ 			// Move existing test code here
+ 			cfg.TxStateValidation = tc.validation
+ 			// ... rest of the test
+ 		})
+ 	}
  }
572-583: Add timeout configuration for assertions.
The hardcoded timeout values could be made configurable to accommodate different test environments.
+ const (
+ 	defaultAssertTimeout  = 15 * time.Second
+ 	defaultAssertInterval = 1 * time.Second
+ )

  assert.Eventually(t, func() bool {
  	// ... existing code ...
- }, time.Second*15, time.Second*1, "all transactions were not executed")
+ }, defaultAssertTimeout, defaultAssertInterval, "all transactions were not executed")
498-550: Consider adding error scenarios.
The test only covers the happy path. Consider adding test cases for error scenarios such as:
- Network interruptions during restart
- Invalid ForceStartHeight values
- Database corruption scenarios
Example error scenario:
+ func Test_ForceStartHeightIdempotencyErrors(t *testing.T) {
+ 	// ... setup code ...
+
+ 	// Test invalid ForceStartHeight
+ 	cfg.ForceStartCadenceHeight = 999999 // Set to a height that doesn't exist
+ 	boot, err = bootstrap.New(cfg)
+ 	require.NoError(t, err)
+
+ 	err = boot.Run(ctx, cfg, func() {})
+ 	require.Error(t, err)
+ 	require.Contains(t, err.Error(), "invalid start height")
+ }
metrics/collector.go (3)
47-50: Consider limiting cardinality of address labels.
The `address` label in `evmAccountCallCounters` could lead to high cardinality as there could be millions of unique EVM addresses. Consider implementing one or more of these strategies:
- Add a limit to the number of unique addresses tracked
- Use address ranges or categories instead of raw addresses
- Implement label value aggregation for less frequent addresses
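The first strategy, capping the number of distinct addresses, can be sketched without any metrics library. `labelLimiter` and the `"other"` bucket are hypothetical names for illustration, and this version is not safe for concurrent use; a real collector would need a mutex around the map.

```go
package main

import "fmt"

// labelLimiter maps raw addresses to label values. It keeps at most max
// distinct addresses and folds every later address into "other".
// Not safe for concurrent use.
type labelLimiter struct {
	max  int
	seen map[string]struct{}
}

func newLabelLimiter(max int) *labelLimiter {
	return &labelLimiter{max: max, seen: make(map[string]struct{})}
}

// Label returns the value to use for the metric's address label.
func (l *labelLimiter) Label(address string) string {
	if _, ok := l.seen[address]; ok {
		return address
	}
	if len(l.seen) < l.max {
		l.seen[address] = struct{}{}
		return address
	}
	return "other"
}

func main() {
	limiter := newLabelLimiter(2)
	for _, addr := range []string{"0x01", "0x02", "0x03", "0x01"} {
		fmt.Printf("%s -> %s\n", addr, limiter.Label(addr))
	}
}
```

A first-come cap like this favors whichever addresses appear first; tracking the most active addresses instead would need frequency counting, which is a separate design choice.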
52-57: Consider implementing the status code label.
The TODO comment suggests adding a 'status_code' label to request durations. This would be valuable for:
- Distinguishing between successful and failed requests
- Identifying specific error patterns
- Setting up meaningful alerting rules
Would you like me to help implement the status code label for the request duration metric?
146-149: Document the idempotent registration behavior.
The unregister-before-register pattern makes the metrics registration idempotent, which is good. However, this behavior should be documented to prevent confusion about potential metric gaps during re-registration.
Add a comment explaining the behavior:
  // During E2E tests, the EVM GW might be bootstrapped again
  // and again, so we make sure to register the metrics on a
  // clean state.
+ // Note: This makes metric registration idempotent but may
+ // cause brief gaps in metric collection during re-registration.
  prometheus.Unregister(m)
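The unregister-before-register pattern can be shown with a toy registry. The `registry` type, `errDuplicate`, and `registerFresh` below are hypothetical and only mimic the general shape of a registry that rejects duplicates; they are not the Prometheus API.

```go
package main

import (
	"errors"
	"fmt"
)

var errDuplicate = errors.New("collector already registered")

// registry is a toy stand-in for a metrics registry that rejects duplicates.
type registry struct {
	names map[string]bool
}

func newRegistry() *registry { return &registry{names: make(map[string]bool)} }

func (r *registry) Register(name string) error {
	if r.names[name] {
		return errDuplicate
	}
	r.names[name] = true
	return nil
}

// Unregister removes name and reports whether it was present.
func (r *registry) Unregister(name string) bool {
	existed := r.names[name]
	delete(r.names, name)
	return existed
}

// registerFresh unregisters each name before registering it, so calling it
// repeatedly (as happens when a service is bootstrapped more than once in
// the same process) never hits the duplicate error.
func registerFresh(r *registry, names ...string) error {
	for _, n := range names {
		r.Unregister(n) // no-op on the first bootstrap
		if err := r.Register(n); err != nil {
			return fmt.Errorf("registering %s: %w", n, err)
		}
	}
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println("first bootstrap:", registerFresh(r, "api_errors", "block_height"))
	fmt.Println("second bootstrap:", registerFresh(r, "api_errors", "block_height"))
	fmt.Println("plain re-register:", r.Register("api_errors"))
}
```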
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
metrics/collector.go
(2 hunks)tests/integration_test.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Test
🔇 Additional comments (2)
tests/integration_test.go (1)
441-447: LGTM! Well-documented test purpose.
The test documentation clearly outlines the key aspects of idempotency being verified.
metrics/collector.go (1)
75-88: LGTM!
The metrics slice is well-organized and includes all declared metrics.
Co-authored-by: Janez Podhostnik <[email protected]>
Co-authored-by: Janez Podhostnik <[email protected]>
Co-authored-by: Peter Argue <[email protected]>
Co-authored-by: Leo Zhang <[email protected]>
b92120d
to
951734b
Compare
<-ready

time.Sleep(3 * time.Second) // some time to startup
not sure this is needed. It should be ready at this point.
Actionable comments posted: 0
♻️ Duplicate comments (1)
tests/integration_test.go (1)
572-582: 🛠️ Refactor suggestion: Enhance idempotency verification.
The current test only verifies transaction receipts. Consider adding additional checks to ensure complete idempotency:
- Event verification
- State verification
- Transaction count verification
The previous review already suggested these improvements. Please refer to the earlier comment for the implementation details.
🧹 Nitpick comments (3)
metrics/collector.go (1)
52-58: Consider implementing the TODO suggestion.
Adding a 'status_code' label to the request duration metric would provide valuable insights into API performance across different response types.
Would you like me to help implement this enhancement? Here's what we could track:
- Success responses (2xx)
- Client errors (4xx)
- Server errors (5xx)
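A label along these lines would typically carry the status class rather than the raw code, which keeps the number of label values small. A minimal sketch, assuming HTTP-style status codes are available at the point where the duration is recorded; `statusClass` is a hypothetical helper, and a JSON-RPC API may need to classify by RPC error code instead.

```go
package main

import "fmt"

// statusClass maps a status code to a coarse class suitable for use as a
// metric label value. Codes outside the listed ranges map to "other".
func statusClass(code int) string {
	switch {
	case code >= 200 && code < 300:
		return "2xx"
	case code >= 400 && code < 500:
		return "4xx"
	case code >= 500 && code < 600:
		return "5xx"
	default:
		return "other"
	}
}

func main() {
	for _, code := range []int{200, 404, 503, 302} {
		fmt.Printf("%d -> %s\n", code, statusClass(code))
	}
}
```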
tests/integration_test.go (2)
478-492: Consider adding test coverage for different validation modes.
The test only covers `LocalIndexValidation` mode. Since idempotency is critical for data consistency, consider parameterizing the test to cover both `TxSealValidation` and `LocalIndexValidation` modes.
  func Test_ForceStartHeightIdempotency(t *testing.T) {
+ 	testCases := []struct {
+ 		name           string
+ 		validationMode config.TxValidationMode
+ 	}{
+ 		{"LocalIndexValidation", config.LocalIndexValidation},
+ 		{"TxSealValidation", config.TxSealValidation},
+ 	}
+
+ 	for _, tc := range testCases {
+ 		t.Run(tc.name, func(t *testing.T) {
+ 			// Move existing test code here
+ 			cfg.TxStateValidation = tc.validationMode
+ 			// ...
+ 		})
+ 	}
  }
551-555: Consider testing multiple restart scenarios.
The test only verifies restarting from block height 1. Consider testing additional scenarios:
- Restart from the middle of the block range
- Restart from the latest block
+ 	testHeights := []uint64{
+ 		1,            // Start
+ 		totalTxs / 2, // Middle
+ 		totalTxs,     // End
+ 	}
+ 	for _, height := range testHeights {
- 	cfg.ForceStartCadenceHeight = 1
+ 	cfg.ForceStartCadenceHeight = height
+ 	t.Logf("Testing restart from height %d", height)
  	// ... rest of the test
+ 	}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
go.sum is excluded by !**/*.sum
tests/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
api/debug.go (1 hunks)
bootstrap/bootstrap.go (2 hunks)
go.mod (2 hunks)
metrics/collector.go (2 hunks)
storage/pebble/register_storage.go (1 hunks)
tests/go.mod (2 hunks)
tests/integration_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- api/debug.go
- storage/pebble/register_storage.go
- bootstrap/bootstrap.go
- go.mod
- tests/go.mod
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Test
🔇 Additional comments (4)
metrics/collector.go (3)
12-74: Well-structured metric definitions!
The metric declarations follow Prometheus best practices with:
- Clear naming conventions with consistent prefixing
- Descriptive help text
- Appropriate metric types for different use cases
75-88: LGTM!
The metrics slice is complete and well-organized, containing all declared metrics.
146-149: Good solution for handling metric re-registration!
This change addresses the issue discussed in previous reviews where metrics were being registered multiple times during E2E tests. The unregistration step ensures idempotent metric registration.
tests/integration_test.go (1)
441-447: LGTM! Well-documented test function.
The test documentation clearly outlines the purpose and verification criteria for idempotency.
Depends on: onflow/flow-go#7050
Work towards: onflow/flow-go#6962
Description
Summary by CodeRabbit
Bug Fixes
Chores
New Features
Tests
ForceStartHeight
configuration.