fix: async filter & subscription system #82
Conversation
Walkthrough
The pull request introduces significant changes to enhance log handling and event emission.
Actionable comments posted: 6
🧹 Outside diff range and nitpick comments (9)
jsonrpc/namespaces/eth/filters/subscriptions.go (2)
Line range hint 131-135: Consider batching log notifications for better performance. Processing and notifying logs individually could be inefficient for large log sets. Consider batching notifications:
```diff
-for _, log := range logs {
-	log := log
-	_ = notifier.Notify(rpcSub.ID, &log)
+if len(logs) > 0 {
+	_ = notifier.Notify(rpcSub.ID, logs)
 }
```
Line range hint 200-223: Consider adding a timeout to uninstallation. While the uninstallation logic prevents deadlocks, it could potentially hang if the event loop is not responsive. Consider adding a timeout:
```diff
 func (api *FilterAPI) uninstallSubscription(s *subscription) {
 	s.unsubOnce.Do(func() {
+		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
 	uninstallLoop:
 		for {
 			select {
 			case api.uninstall <- s:
 				break uninstallLoop
+			case <-ctx.Done():
+				return
 			case <-s.logsChan:
 				// ... other cases ...
 			}
 		}
-		<-s.err
+		select {
+		case <-s.err:
+		case <-ctx.Done():
+		}
 	})
 }
```
indexer/indexer.go (2)
142-146: Enhance struct documentation. While the struct is well-designed, consider adding more detailed documentation to explain:
- The purpose and lifecycle of this struct
- The relationship between header and logs
- Whether logs can be nil/empty
```diff
-// blockEvents is a struct to emit block events.
+// blockEvents bundles a block header with its associated logs for event emission.
+// The header represents the block metadata while logs contain an array of log arrays,
+// where each inner array typically represents logs from a single transaction.
 type blockEvents struct {
 	header *coretypes.Header
 	logs   [][]*coretypes.Log
 }
```
141-141: Improve subscription management. The current subscription system could benefit from better lifecycle management:
- Add mutex protection for channel slice modifications
- Implement unsubscribe functionality
- Add channel cleanup mechanism
Consider implementing a subscription manager:
```go
type subscription struct {
	id          int64
	blockChan   chan *coretypes.Header
	logsChan    chan []*coretypes.Log
	pendingChan chan *rpctypes.RPCTransaction
	done        chan struct{}
}

func (e *EVMIndexerImpl) Subscribe() (*subscription, error) {
	e.mu.Lock()
	defer e.mu.Unlock()

	sub := &subscription{
		id:          e.nextSubID(),
		blockChan:   make(chan *coretypes.Header),
		logsChan:    make(chan []*coretypes.Log),
		pendingChan: make(chan *rpctypes.RPCTransaction),
		done:        make(chan struct{}),
	}
	e.subscriptions[sub.id] = sub
	return sub, nil
}

func (s *subscription) Unsubscribe() {
	close(s.done)
	close(s.blockChan)
	close(s.logsChan)
	close(s.pendingChan)
}
```
indexer/abci.go (1)
181-182: LGTM: Improved log handling with batch collection. The change from immediate emission to batch collection is a good architectural improvement that can reduce channel contention and improve performance, especially under high load.
Consider adding metrics to track:
- Average number of logs per block
- Time spent in log collection vs emission
This would help in future optimizations of the batch size and emission strategy.
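To make the metrics suggestion concrete, here is a minimal sketch of where such measurements could hook in; collectAndEmitWithMetrics, emitFn, record, and the metric names are all hypothetical and not tied to the repository's actual telemetry stack.

```go
package indexer

import (
	"time"

	coretypes "github.com/ethereum/go-ethereum/core/types"
)

// collectAndEmitWithMetrics wraps emission with two hypothetical measurements:
// the number of logs collected for the block and the time spent emitting them.
func collectAndEmitWithMetrics(
	logsByTx [][]*coretypes.Log,
	emitFn func([][]*coretypes.Log), // stand-in for the real event emitter
	record func(metric string, value float64), // hypothetical metrics sink
) {
	total := 0
	for _, txLogs := range logsByTx {
		total += len(txLogs)
	}
	record("indexer_logs_per_block", float64(total))

	start := time.Now()
	emitFn(logsByTx)
	record("indexer_block_emit_seconds", time.Since(start).Seconds())
}
```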
jsonrpc/namespaces/eth/filters/filter.go (1)
Line range hint 134-171: LGTM! Consider enhancing documentation for async operations. The async log retrieval implementation is robust with proper error handling, context cancellation, and efficient channel management. However, the async behavior could be better documented.
Consider adding documentation comments explaining:
- Channel buffer sizes and their implications
- Cleanup guarantees for goroutines
- Expected behavior on context cancellation
Example documentation:
```diff
 // rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
 // it creates and returns two channels: one for delivering log data, and one for reporting errors.
+// The returned channels are unbuffered and will be closed when:
+// - All logs have been processed
+// - An error occurs during processing
+// - The provided context is cancelled
+// The caller must consume from both channels to prevent goroutine leaks.
 func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *coretypes.Log, chan error) {
```
jsonrpc/backend/tx.go (2)
Line range hint 183-187: Ensure consistent error handling across all return paths. The method has multiple error return paths with different behaviors:
- Some paths return (nil, nil) for not-found cases
- Others return (nil, err) for errors

Consider standardizing the error handling to make the behavior more predictable for callers.
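One possible shape for that standardization, sketched with hypothetical names (ErrTxNotFound, lookupTx); the geth-compatible choice may still be to keep (nil, nil) for not-found, so treat this purely as an illustration.

```go
package backend

import (
	"errors"
	"fmt"
)

// ErrTxNotFound is a hypothetical sentinel; the actual backend may prefer
// the geth convention of returning (nil, nil) for "not found".
var ErrTxNotFound = errors.New("transaction not found")

// lookupTx sketches one way to make the return paths uniform: not-found is
// always the sentinel, and every other failure is wrapped with context.
func lookupTx(fetch func() (any, bool, error)) (any, error) {
	tx, found, err := fetch()
	if err != nil {
		return nil, fmt.Errorf("query transaction: %w", err)
	}
	if !found {
		return nil, ErrTxNotFound
	}
	return tx, nil
}
```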
Line range hint 119-187: Consider architectural implications for the subscription system. The changes to block height handling could have implications for the async filter & subscription system:
- Silent fallback to height 0 might affect event filtering accuracy
- Subscription systems need to be aware of this behavior
- Consider documenting this behavior in the subscription system documentation
jsonrpc/namespaces/eth/filters/api.go (1)
40-42: Consider renaming field 's' to 'subscription' for clarity. Using a more descriptive field name than s can improve code readability. Suggest renaming s to subscription.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- indexer/abci.go (3 hunks)
- indexer/indexer.go (1 hunks)
- jsonrpc/backend/filters.go (0 hunks)
- jsonrpc/backend/tx.go (2 hunks)
- jsonrpc/config/config.go (0 hunks)
- jsonrpc/namespaces/eth/filters/api.go (8 hunks)
- jsonrpc/namespaces/eth/filters/filter.go (1 hunks)
- jsonrpc/namespaces/eth/filters/subscriptions.go (5 hunks)
💤 Files with no reviewable changes (2)
- jsonrpc/backend/filters.go
- jsonrpc/config/config.go
🔇 Additional comments (9)
jsonrpc/namespaces/eth/filters/subscriptions.go (2)
21-27: Well-structured channel organization! The separation of channels into "listening" and "lifecycle" groups with clear documentation improves code readability and maintainability.
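For readers without the diff open, the kind of grouping being praised looks roughly like this; the field names below are illustrative rather than copied from subscriptions.go.

```go
package filters

import (
	"sync"

	coretypes "github.com/ethereum/go-ethereum/core/types"
)

// subscription groups its members by role: "listening" channels carry data to
// the subscriber, while "lifecycle" members coordinate setup and teardown.
type subscription struct {
	// listening: data pushed to the subscriber
	headerChan chan *coretypes.Header
	logsChan   chan []*coretypes.Log
	hashChan   chan string // simplified; a real hash channel would carry common.Hash

	// lifecycle: coordination and teardown
	err       chan error
	unsubOnce sync.Once
}
```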
Line range hint 15-28: Verify concurrent access safety. The subscription struct contains multiple channels that are accessed from different goroutines. Let's verify there are no race conditions in the codebase:
✅ Verification successful
Let me gather more context about how these subscriptions are handled by looking at the implementation details.
This will help us understand:
- How subscriptions are installed and managed
- How the channels are used across goroutines
- The lifecycle management of subscriptions
Let me gather one final piece of information about the subscription installation and mutex usage to complete the analysis.
This will help us understand:
- How the mutex protects subscription operations
- The complete subscription installation process
Based on the gathered information, I can now provide a final response about the concurrent access safety of the subscription struct.
Concurrent access is properly handled - no safety concerns
The subscription struct's concurrent access is safely managed through several mechanisms:
- The filtersMut sync.Mutex protects all filter and subscription operations in the FilterAPI
- Channel operations are naturally thread-safe
- The unsubOnce sync.Once ensures single execution of unsubscribe operations
- Subscription lifecycle is well-managed:
  - Installation is protected by mutex
  - Each subscription runs in its own goroutine
  - Uninstallation is coordinated through channels and mutex
The design follows Go's concurrency patterns correctly by:
- Using channels for communication
- Protecting shared state with mutex
- Ensuring clean teardown with sync.Once
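A condensed sketch of that combination (mutex-guarded install, once-only teardown); the names here are invented for illustration and do not mirror the package's actual types.

```go
package filters

import "sync"

type sub struct {
	logsChan  chan any
	err       chan error
	unsubOnce sync.Once
}

type api struct {
	mu   sync.Mutex
	subs map[string]*sub
}

// install registers a subscription under the lock.
func (a *api) install(id string, s *sub) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.subs[id] = s
}

// uninstall tears a subscription down exactly once, even if called
// concurrently from the API and the event loop.
func (a *api) uninstall(id string, s *sub) {
	s.unsubOnce.Do(func() {
		a.mu.Lock()
		delete(a.subs, id)
		a.mu.Unlock()
		close(s.err)
	})
}
```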
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for other goroutines accessing subscription fields
rg -A 3 "go func" | grep -B 2 "s\."

# Search for mutex usage in subscription-related code
rg "sync\.Mutex|sync\.RWMutex" --type go
```
Length of output: 1090
Script:
```shell
#!/bin/bash
# Check subscription installation and uninstallation
ast-grep --pattern 'func $_ installSubscription($_) { $$$ }'

# Check how the channels are used
rg -B 2 -A 5 "headerChan|logsChan|txChan|hashChan" --type go

# Look for subscription creation and cleanup
rg -B 2 -A 5 "NewSubscription|uninstallSubscription" --type go
```
Length of output: 41448
Script:
```shell
#!/bin/bash
# Check how subscriptions are installed and managed with mutex
rg -B 3 -A 5 "filtersMut" --type go

# Look for subscription installation
ast-grep --pattern 'func $_ install($_) { $$$ }'
```
Length of output: 9158
indexer/abci.go (2)
149-149: LGTM: Efficient log collection initialization. The pre-allocation of capacity for blockLogs is an optimization that prevents slice reallocation during append operations.
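For context, the endorsed pattern is simply pre-sizing the outer slice; collectBlockLogs and extract are stand-ins invented for this sketch.

```go
package indexer

import (
	coretypes "github.com/ethereum/go-ethereum/core/types"
)

// collectBlockLogs shows the pre-allocation pattern: the outer slice gets its
// capacity up front, so appends inside the loop never trigger a reallocation.
// extract is a stand-in for however logs are pulled from each tx result.
func collectBlockLogs(txCount int, extract func(i int) []*coretypes.Log) [][]*coretypes.Log {
	blockLogs := make([][]*coretypes.Log, 0, txCount)
	for i := 0; i < txCount; i++ {
		blockLogs = append(blockLogs, extract(i))
	}
	return blockLogs
}
```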
195-196: ⚠️ Potential issue: Consider improving goroutine management and error handling.
The asynchronous event emission could benefit from several improvements:
- Context propagation for cancellation
- Error handling for the asynchronous operation
- Channel backpressure handling
- Race condition prevention
Consider applying these improvements:
```diff
-	// emit block event in a goroutine
-	go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs})
+	// emit block event in a goroutine with proper context and error handling
+	eventsChan := make(chan error, 1)
+	go func() {
+		defer close(eventsChan)
+		select {
+		case eventsChan <- e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: append([][]*coretypes.Log{}, blockLogs...)}):
+		case <-ctx.Done():
+			eventsChan <- ctx.Err()
+		}
+	}()
+
+	// Optional: handle errors from the goroutine
+	go func() {
+		if err := <-eventsChan; err != nil {
+			e.logger.Error("block events emission failed", "err", err)
+		}
+	}()
```
This implementation:
- Propagates context for proper cancellation
- Provides error handling
- Creates a defensive copy of blockLogs to prevent race conditions
- Uses a buffered error channel to prevent goroutine leaks
Let's verify the event emission implementation:
jsonrpc/namespaces/eth/filters/filter.go (1)
73-73: Consider adding safeguards for unbounded range queries. The removal of the blockLimit parameter could potentially lead to unbounded resource usage when querying large block ranges. While this simplifies the API, it might expose the system to resource exhaustion. Let's check if there are any other rate-limiting or protection mechanisms in place:
Consider implementing one of these safeguards:
- Add rate limiting at the API layer
- Implement pagination for large result sets
- Add configurable system-wide limits for maximum block range
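As one possible shape for the third safeguard (a configurable system-wide limit), a small sketch; maxBlockRange, checkBlockRange, and the error message are made up for the example.

```go
package filters

import "fmt"

// maxBlockRange is a hypothetical configurable ceiling on log-query ranges.
const maxBlockRange = 10_000

// checkBlockRange rejects queries whose span exceeds the configured limit
// before any expensive log scanning starts.
func checkBlockRange(begin, end int64) error {
	if end >= begin && end-begin+1 > maxBlockRange {
		return fmt.Errorf("block range too large: %d blocks requested, limit is %d", end-begin+1, maxBlockRange)
	}
	return nil
}
```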
jsonrpc/backend/tx.go (1)
119-127: Consider explicit error handling for invalid block heights. The current implementation silently falls back to height 0 when the requested height exceeds the latest block. This could mask issues where callers expect data from a specific height. Consider:
- Explicitly returning an error for heights that exceed the latest block
- Adding a comment explaining the fallback behavior if it's intentional
- Adding protection against potential race conditions between height check and context creation
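A sketch of the first bullet, returning an explicit error instead of silently falling back; resolveQueryHeight and its conventions are assumptions, not the backend's actual API.

```go
package backend

import "fmt"

// resolveQueryHeight returns the height to query at, erroring out instead of
// silently falling back to 0 when the caller asks for a future block.
func resolveQueryHeight(requested, latest int64) (int64, error) {
	if requested > latest {
		return 0, fmt.Errorf("requested height %d exceeds latest height %d", requested, latest)
	}
	if requested <= 0 {
		return latest, nil // zero or negative means "latest" in this sketch
	}
	return requested, nil
}
```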
Let's verify if this fallback behavior is consistent with other height-related methods:
jsonrpc/namespaces/eth/filters/api.go (3)
94-115: Efficient filter cleanup with ticker and delayed unsubscription. The introduction of a ticker for periodic checks and processing unsubscriptions outside the lock improves performance and avoids potential deadlocks.
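Roughly, the loop being praised has this shape (a sketch with invented names; the real implementation lives in api.go and may differ in detail).

```go
package filters

import (
	"sync"
	"time"
)

type filterEntry struct {
	lastAccessed time.Time
}

// cleanupLoop periodically drops filters that have not been polled within ttl.
// Expired entries are collected under the lock but unsubscribed outside it,
// which is the deadlock-avoidance point the review highlights.
func cleanupLoop(mu *sync.Mutex, filters map[string]*filterEntry, ttl time.Duration, unsubscribe func(id string), stop <-chan struct{}) {
	ticker := time.NewTicker(ttl)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			var expired []string
			mu.Lock()
			for id, f := range filters {
				if time.Since(f.lastAccessed) > ttl {
					delete(filters, id)
					expired = append(expired, id)
				}
			}
			mu.Unlock()

			// unsubscribe outside the lock
			for _, id := range expired {
				unsubscribe(id)
			}
		case <-stop:
			return
		}
	}
}
```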
135-136: Improved concurrency by deferring log filtering to the subscription goroutine. Moving log filtering into the subscription goroutine reduces contention and improves performance.
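Conceptually, the filtering work moves from the emitter into each subscription's own goroutine, along these lines; consumeLogs, matchAddress, and the simplified address-only criteria are invented for the sketch.

```go
package filters

import (
	"github.com/ethereum/go-ethereum/common"
	coretypes "github.com/ethereum/go-ethereum/core/types"
)

// matchAddress is a simplified address filter used only for this sketch.
func matchAddress(log *coretypes.Log, addrs []common.Address) bool {
	if len(addrs) == 0 {
		return true
	}
	for _, a := range addrs {
		if a == log.Address {
			return true
		}
	}
	return false
}

// consumeLogs runs inside the subscription's own goroutine: the emitter just
// fans out raw batches, and each subscriber applies its criteria here.
func consumeLogs(in <-chan []*coretypes.Log, addrs []common.Address, deliver func([]*coretypes.Log)) {
	for batch := range in {
		var matched []*coretypes.Log
		for _, l := range batch {
			if matchAddress(l, addrs) {
				matched = append(matched, l)
			}
		}
		if len(matched) > 0 {
			deliver(matched)
		}
	}
}
```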
389-392: Properly uninstalling associated subscriptions. Ensuring the associated subscription is uninstalled when removing a filter prevents potential resource leaks.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff             @@
##             main      #82      +/-   ##
==========================================
- Coverage   28.06%   27.85%   -0.21%
==========================================
  Files         124      124
  Lines       13676    13777     +101
==========================================
  Hits         3838     3838
- Misses       9277     9378     +101
  Partials      561      561
```
Description
Closes: #XXXX
Author Checklist
All items are required. Please add a note to the item if the item is not applicable and please add links to any relevant follow-up issues.
I have...
- added ! in the type prefix if API or client breaking change

Reviewers Checklist
All items are required. Please add a note if the item is not applicable and please add your handle next to the items reviewed if you only reviewed selected items.
I have...
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Refactor
Style