-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: adopt install uninstall chan to resolve deadlock on subscription #81
Conversation
Warning Rate limit exceeded@beer-1 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 5 minutes and 2 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. WalkthroughThe changes introduce enhancements to the Changes
Possibly related PRs
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (4)
jsonrpc/namespaces/eth/filters/subscriptions.go (1)
185-207
: Excellent deadlock prevention mechanism!The implementation effectively prevents deadlocks by:
- Using sync.Once for atomic uninstallation
- Draining pending events while attempting uninstallation
- Waiting for confirmation of cleanup completion
Consider enhancing the comments to explicitly explain the deadlock prevention mechanism:
- // write uninstall request and consume logs/hashes. This prevents - // the eventLoop broadcast method to deadlock when writing to the - // filter event channel while the subscription loop is waiting for - // this method to return (and thus not reading these events). + // Prevent deadlocks by draining any pending events while attempting + // to write the uninstall request. This ensures that: + // 1. The eventLoop can continue broadcasting without blocking + // 2. No events are lost before uninstallation + // 3. The subscription loop can safely exit without deadlockingjsonrpc/namespaces/eth/filters/api.go (3)
74-76
: Add context-based cancellation for goroutines.The
eventLoop
goroutine lacks a cancellation mechanism. Consider adding a context toFilterAPI
for graceful shutdown of all goroutines.type FilterAPI struct { + ctx context.Context + cancel context.CancelFunc ... } func NewFilterAPI(app *app.MinitiaApp, backend *backend.JSONRPCBackend, logger log.Logger) *FilterAPI { + ctx, cancel := context.WithCancel(context.Background()) api := &FilterAPI{ + ctx: ctx, + cancel: cancel, ... } ... - go api.eventLoop() + go api.eventLoop(ctx) return api }Also applies to: 84-84
Line range hint
105-174
: Improve concurrency handling in eventLoop.Several concurrency-related issues need attention:
- Lock granularity: The current implementation holds locks while appending to slices, which could block other operations unnecessarily.
- Subscription management: The installation process lacks error handling and could lead to deadlocks if the subscription is invalid.
- Potential race condition: Events might be processed before the subscription is fully installed.
Consider these improvements:
func (api *FilterAPI) eventLoop() { + defer api.cancel() // Ensure cleanup on exit for { select { + case <-api.ctx.Done(): + return case s := <-api.install: + if err := api.validateSubscription(s); err != nil { + s.err <- err + close(s.err) + close(s.installed) + continue + } api.subscriptions[s.id] = s close(s.installed) case s := <-api.uninstall: + if s == nil { + continue + } delete(api.subscriptions, s.id) close(s.err) case block := <-api.blockChan: api.filtersMut.Lock() + filters := make(map[rpc.ID]*filter, len(api.filters)) + for id, f := range api.filters { + filters[id] = f + } api.filtersMut.Unlock() - for _, f := range api.filters { + for _, f := range filters { if f.ty == ethfilters.BlocksSubscription { + api.filtersMut.Lock() f.hashes = append(f.hashes, block.Hash()) + api.filtersMut.Unlock() } } } } } +func (api *FilterAPI) validateSubscription(s *subscription) error { + if s == nil { + return errors.New("nil subscription") + } + // Add more validation as needed + return nil +}
Based on the gathered information, I can now provide a final response:
Concurrent testing is indeed needed for subscription management
The codebase shows several concurrent patterns that require testing:
- Mutex-protected shared state (
filtersMut sync.Mutex
)- Multiple channels for communication (
install
,uninstall
,headerChan
,logsChan
, etc.)- Goroutines handling subscriptions (seen in subscription handlers)
- No existing test files found in the filters package
Specific areas that need test coverage:
- Race conditions between filter mutations and event processing
- Channel communication patterns in
eventLoop
- Subscription installation and uninstallation concurrency
- Cleanup of resources in error scenarios
🔗 Analysis chain
Line range hint
105-174
: Add concurrent testing for subscription management.The new subscription management logic would benefit from comprehensive concurrent testing to verify the absence of deadlocks and race conditions.
Would you like me to help generate concurrent test cases for the subscription management functionality?
Let me gather more information about the testing coverage for this functionality.
This will help us understand the current test coverage and concurrent testing patterns in the filters package.
Let me check for more details about the existing test coverage and synchronization mechanisms.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check for existing concurrent tests rg -l "testing\.T.*goroutine.*subscription" # Look for potential race conditions rg "go run.*race" # Check for existing benchmark tests rg "testing\.B.*subscription"Length of output: 97
Script:
#!/bin/bash # Check for any test files related to filters fd ".*test.*" jsonrpc/namespaces/eth/filters/ # Look for existing test patterns in filter tests rg -A 5 "func Test.*Filter" jsonrpc/namespaces/eth/filters/ # Check for concurrent operations in filter tests rg -l "sync\." jsonrpc/namespaces/eth/filters/ # Look for existing subscription tests rg -A 5 "func.*subscription" jsonrpc/namespaces/eth/filters/Length of output: 841
Script:
#!/bin/bash # Check the content of api.go for mutex usage rg -A 3 "sync\.Mutex|sync\.RWMutex" jsonrpc/namespaces/eth/filters/api.go # Look for test files specifically fd ".*_test\.go$" jsonrpc/namespaces/eth/filters/ # Check for any existing test setup with channels rg -A 5 "make\(chan" jsonrpc/namespaces/eth/filters/ # Look for any race detection comments or directives rg -i "race|concurrent" jsonrpc/namespaces/eth/filters/Length of output: 3856
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- jsonrpc/namespaces/eth/filters/api.go (9 hunks)
- jsonrpc/namespaces/eth/filters/subscriptions.go (7 hunks)
🔇 Additional comments (5)
jsonrpc/namespaces/eth/filters/subscriptions.go (4)
Line range hint
16-29
: Well-designed subscription struct enhancements!The new fields provide robust synchronization mechanisms:
id
for unique identificationinstalled
channel for installation signalingerr
channel for uninstallation signalingunsubOnce
for atomic uninstallationThis design effectively prevents race conditions in the subscription lifecycle.
45-54
: Robust subscription initialization and cleanup pattern!The changes implement a thread-safe subscription lifecycle:
- Creates subscription with proper initialization
- Signals installation via channel
- Waits for confirmation before proceeding
- Uses subscription-based cleanup
This pattern effectively prevents race conditions during subscription setup and teardown.
Also applies to: 57-57
107-118
: Consistent implementation of subscription pattern!The changes correctly implement the same thread-safe pattern as NewHeads while preserving the existing filtering logic.
Also applies to: 121-121
154-165
: Consistent implementation of subscription pattern!The changes correctly implement the same thread-safe pattern while maintaining the flexibility to handle both full transactions and hashes.
Also applies to: 168-168
jsonrpc/namespaces/eth/filters/api.go (1)
55-58
: 🛠️ Refactor suggestionConsider using buffered channels for subscription management.
The newly added channels
install
anduninstall
are unbuffered, which could potentially block the sender in high-concurrency scenarios. Consider using buffered channels to reduce contention.- install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + const subscriptionBufferSize = 16 // adjust based on expected concurrent subscriptionsLikely invalid or redundant comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
jsonrpc/namespaces/eth/filters/api.go (2)
55-58
: Enhance channel documentation for clarity.While the comments describe the basic purpose, consider adding more detailed documentation about:
- The expected message flow through these channels
- Whether these channels are buffered/unbuffered
- Who is responsible for sending/receiving on these channels
// Channels for subscription managements - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification + // install receives new subscriptions from clients and signals completion + // through the subscription's installed channel + install chan *subscription + + // uninstall receives terminated subscriptions and signals completion + // through the subscription's err channel + uninstall chan *subscription
Line range hint
105-174
: Review lock handling and error propagation in eventLoop.The current implementation has several areas that could be improved:
- Lock granularity could be optimized to reduce contention
- Missing timeout handling for channel operations
- Limited error handling for subscription operations
Consider these improvements:
func (api *FilterAPI) eventLoop() { + // Add context for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { select { case block := <-api.blockChan: - api.filtersMut.Lock() - for _, f := range api.filters { + // Copy filters under lock to minimize lock duration + api.filtersMut.Lock() + filters := make(map[rpc.ID]*filter, len(api.filters)) + for id, f := range api.filters { + filters[id] = f + } + api.filtersMut.Unlock() + + // Process without holding lock + for _, f := range filters { if f.ty == ethfilters.BlocksSubscription { f.hashes = append(f.hashes, block.Hash()) } } - api.filtersMut.Unlock() + + // Update under lock + api.filtersMut.Lock() + for id, f := range filters { + if existing, ok := api.filters[id]; ok { + existing.hashes = f.hashes + } + } + api.filtersMut.Unlock() // Similar pattern for logs and pending transactions... case s := <-api.install: + select { + case <-ctx.Done(): + s.err <- errors.New("shutdown in progress") + close(s.err) + return + default: + } + api.filtersMut.Lock() api.subscriptions[s.id] = s api.filtersMut.Unlock() close(s.installed) case s := <-api.uninstall: + select { + case <-ctx.Done(): + return + default: + } + api.filtersMut.Lock() delete(api.subscriptions, s.id) api.filtersMut.Unlock() close(s.err) + case <-ctx.Done(): + return } } }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- jsonrpc/namespaces/eth/filters/api.go (9 hunks)
🔇 Additional comments (1)
jsonrpc/namespaces/eth/filters/api.go (1)
305-307
: Reference existing optimization suggestion.The current implementation could benefit from the lock optimization suggestions provided in the previous review.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #81 +/- ##
==========================================
- Coverage 28.14% 28.06% -0.08%
==========================================
Files 124 124
Lines 13638 13676 +38
==========================================
Hits 3838 3838
- Misses 9239 9277 +38
Partials 561 561
|
Description
Closes: #XXXX
Author Checklist
All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.
I have...
!
in the type prefix if API or client breaking changeReviewers Checklist
All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.
I have...
Summary by CodeRabbit
New Features
eventLoop
method, allowing dynamic subscription management.Bug Fixes
Refactor
clearSubscription
method for better handling of subscription uninstallation.