Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: async filter & subscription system #82

Merged
merged 6 commits into from
Oct 24, 2024
Merged

fix: async filter & subscription system #82

merged 6 commits into from
Oct 24, 2024

Conversation

beer-1
Copy link
Collaborator

@beer-1 beer-1 commented Oct 24, 2024

Description

Closes: #XXXX


Author Checklist

All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.

I have...

  • included the correct type prefix in the PR title, you can find examples of the prefixes below:
  • confirmed ! in the type prefix if API or client breaking change
  • targeted the correct branch
  • provided a link to the relevant issue or specification
  • reviewed "Files changed" and left comments if necessary
  • included the necessary unit and integration tests
  • updated the relevant documentation or specification, including comments for documenting Go code
  • confirmed all CI checks have passed

Reviewers Checklist

All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.

I have...

  • confirmed the correct type prefix in the PR title
  • confirmed all author checklist items have been addressed
  • reviewed state machine logic, API design and naming, documentation is accurate, tests and test coverage

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced event emission capabilities for block events, improving notification efficiency.
    • Introduced lifecycle management for subscriptions, refining filter handling and organization.
  • Bug Fixes

    • Updated logic to ensure valid block height context in transaction count retrieval.
  • Refactor

    • Removed outdated filter configuration parameters to simplify the API.
    • Streamlined subscription management and filter lifecycle processes for better clarity and performance.
  • Style

    • Improved code readability and maintainability through updated comments and method names.

@beer-1 beer-1 self-assigned this Oct 24, 2024
@beer-1 beer-1 requested a review from a team as a code owner October 24, 2024 08:56
Copy link

coderabbitai bot commented Oct 24, 2024

Warning

Rate limit exceeded

@beer-1 has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 18 minutes and 20 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Files that changed from the base of the PR and between 7ebe297 and 4d7e744.

Walkthrough

The pull request introduces significant changes to enhance log handling and event emission within the EVMIndexerImpl class and the subscription management in the filters package. Key modifications include the aggregation of logs into a slice before emission, the introduction of a new blockEvents struct, and the creation of a dedicated method for emitting block events. Additionally, methods related to filter management have been refactored for improved efficiency, including the removal of certain configuration parameters and the simplification of the Logs method interface.

Changes

File Change Summary
indexer/abci.go Modified ListenFinalizeBlock to aggregate logs into blockLogs and emit them in a new goroutine.
indexer/indexer.go Added blockEvents struct and blockEventsEmitter method for enhanced block event emission.
jsonrpc/backend/filters.go Removed RPCFilterCap and RPCBlockRangeCap methods from JSONRPCBackend.
jsonrpc/backend/tx.go Updated getQueryCtxWithHeight to reset height if it exceeds the latest block height; updated GetTransactionCount to use this method.
jsonrpc/config/config.go Removed FilterCap and BlockRangeCap from JSONRPCConfig and related constants and flags.
jsonrpc/namespaces/eth/filters/api.go Enhanced subscription management and filter handling; added subscription channels to filter struct.
jsonrpc/namespaces/eth/filters/filter.go Simplified Logs method by removing blockLimit parameter.
jsonrpc/namespaces/eth/filters/subscriptions.go Updated subscription methods to utilize new lifecycle management channels and improved clarity.

Possibly related PRs

🐇 In the code's garden, changes bloom bright,
Logs now gather, ready for flight.
Emitting events with a new grace,
Subscriptions dance in their rightful place.
With each little tweak, the system does sing,
Hopping along, oh what joy these changes bring! 🌼


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (9)
jsonrpc/namespaces/eth/filters/subscriptions.go (2)

Line range hint 131-135: Consider batching log notifications for better performance.

Processing and notifying logs individually could be inefficient for large log sets. Consider batching notifications:

-for _, log := range logs {
-    log := log
-    _ = notifier.Notify(rpcSub.ID, &log)
+if len(logs) > 0 {
+    _ = notifier.Notify(rpcSub.ID, logs)
 }

Line range hint 200-223: Consider adding timeout to uninstallation.

While the uninstallation logic prevents deadlocks, it could potentially hang if the event loop is not responsive. Consider adding a timeout:

 func (api *FilterAPI) uninstallSubscription(s *subscription) {
     s.unsubOnce.Do(func() {
+        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        defer cancel()
     uninstallLoop:
         for {
             select {
             case api.uninstall <- s:
                 break uninstallLoop
+            case <-ctx.Done():
+                return
             case <-s.logsChan:
             // ... other cases ...
             }
         }
         
-        <-s.err
+        select {
+        case <-s.err:
+        case <-ctx.Done():
+        }
     })
 }
indexer/indexer.go (2)

142-146: Enhance struct documentation.

While the struct is well-designed, consider adding more detailed documentation to explain:

  • The purpose and lifecycle of this struct
  • The relationship between header and logs
  • Whether logs can be nil/empty
-// blockEvents is a struct to emit block events.
+// blockEvents bundles a block header with its associated logs for event emission.
+// The header represents the block metadata while logs contain an array of log arrays,
+// where each inner array typically represents logs from a single transaction.
 type blockEvents struct {
     header *coretypes.Header
     logs   [][]*coretypes.Log
 }

141-141: Improve subscription management.

The current subscription system could benefit from better lifecycle management:

  1. Add mutex protection for channel slice modifications
  2. Implement unsubscribe functionality
  3. Add channel cleanup mechanism

Consider implementing a subscription manager:

type subscription struct {
    id          int64
    blockChan   chan *coretypes.Header
    logsChan    chan []*coretypes.Log
    pendingChan chan *rpctypes.RPCTransaction
    done        chan struct{}
}

func (e *EVMIndexerImpl) Subscribe() (*subscription, error) {
    e.mu.Lock()
    defer e.mu.Unlock()
    
    sub := &subscription{
        id:          e.nextSubID(),
        blockChan:   make(chan *coretypes.Header),
        logsChan:    make(chan []*coretypes.Log),
        pendingChan: make(chan *rpctypes.RPCTransaction),
        done:        make(chan struct{}),
    }
    
    e.subscriptions[sub.id] = sub
    return sub, nil
}

func (s *subscription) Unsubscribe() {
    close(s.done)
    close(s.blockChan)
    close(s.logsChan)
    close(s.pendingChan)
}
indexer/abci.go (1)

181-182: LGTM: Improved log handling with batch collection.

The change from immediate emission to batch collection is a good architectural improvement that can reduce channel contention and improve performance, especially under high load.

Consider adding metrics to track:

  • Average number of logs per block
  • Time spent in log collection vs emission
    This would help in future optimizations of the batch size and emission strategy.
jsonrpc/namespaces/eth/filters/filter.go (1)

Line range hint 134-171: LGTM! Consider enhancing documentation for async operations.

The async log retrieval implementation is robust with proper error handling, context cancellation, and efficient channel management. However, the async behavior could be better documented.

Consider adding documentation comments explaining:

  • Channel buffer sizes and their implications
  • Cleanup guarantees for goroutines
  • Expected behavior on context cancellation

Example documentation:

 // rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
 // it creates and returns two channels: one for delivering log data, and one for reporting errors.
+// The returned channels are unbuffered and will be closed when:
+// - All logs have been processed
+// - An error occurs during processing
+// - The provided context is cancelled
+// The caller must consume from both channels to prevent goroutine leaks.
 func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *coretypes.Log, chan error) {
jsonrpc/backend/tx.go (2)

Line range hint 183-187: Ensure consistent error handling across all return paths

The method has multiple error return paths with different behaviors:

  • Some paths return (nil, nil) for not found cases
  • Others return (nil, err) for errors

Consider standardizing the error handling to make the behavior more predictable for callers.


Line range hint 119-187: Consider architectural implications for subscription system

The changes to block height handling could have implications for the async filter & subscription system:

  1. Silent fallback to height 0 might affect event filtering accuracy
  2. Subscription systems need to be aware of this behavior
  3. Consider documenting this behavior in the subscription system documentation
jsonrpc/namespaces/eth/filters/api.go (1)

40-42: Consider renaming field 's' to 'subscription' for clarity

Using a more descriptive field name than s can improve code readability. Suggest renaming s to subscription.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 875632b and 7ebe297.

📒 Files selected for processing (8)
  • indexer/abci.go (3 hunks)
  • indexer/indexer.go (1 hunks)
  • jsonrpc/backend/filters.go (0 hunks)
  • jsonrpc/backend/tx.go (2 hunks)
  • jsonrpc/config/config.go (0 hunks)
  • jsonrpc/namespaces/eth/filters/api.go (8 hunks)
  • jsonrpc/namespaces/eth/filters/filter.go (1 hunks)
  • jsonrpc/namespaces/eth/filters/subscriptions.go (5 hunks)
💤 Files with no reviewable changes (2)
  • jsonrpc/backend/filters.go
  • jsonrpc/config/config.go
🔇 Additional comments (9)
jsonrpc/namespaces/eth/filters/subscriptions.go (2)

21-27: Well-structured channel organization!

The separation of channels into "listening" and "lifecycle" groups with clear documentation improves code readability and maintainability.


Line range hint 15-28: Verify concurrent access safety.

The subscription struct contains multiple channels that are accessed from different goroutines. Let's verify there are no race conditions in the codebase:

✅ Verification successful

Let me gather more context about how these subscriptions are handled by looking at the implementation details.

This will help us understand:

  1. How subscriptions are installed and managed
  2. How the channels are used across goroutines
  3. The lifecycle management of subscriptions

Let me gather one final piece of information about the subscription installation and mutex usage to complete the analysis.

This will help us understand:

  1. How the mutex protects subscription operations
  2. The complete subscription installation process

Based on the gathered information, I can now provide a final response about the concurrent access safety of the subscription struct.

Concurrent access is properly handled - no safety concerns

The subscription struct's concurrent access is safely managed through several mechanisms:

  1. The filtersMut sync.Mutex protects all filter and subscription operations in the FilterAPI
  2. Channel operations are naturally thread-safe
  3. The unsubOnce sync.Once ensures single execution of unsubscribe operations
  4. Subscription lifecycle is well-managed:
    • Installation is protected by mutex
    • Each subscription runs in its own goroutine
    • Uninstallation is coordinated through channels and mutex

The design follows Go's concurrency patterns correctly by:

  • Using channels for communication
  • Protecting shared state with mutex
  • Ensuring clean teardown with sync.Once
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other goroutines accessing subscription fields
rg -A 3 "go func" | grep -B 2 "s\."

# Search for mutex usage in subscription-related code
rg "sync\.Mutex|sync\.RWMutex" --type go

Length of output: 1090


Script:

#!/bin/bash
# Check subscription installation and uninstallation
ast-grep --pattern 'func $_ installSubscription($_) {
  $$$
}'

# Check how the channels are used
rg -B 2 -A 5 "headerChan|logsChan|txChan|hashChan" --type go

# Look for subscription creation and cleanup
rg -B 2 -A 5 "NewSubscription|uninstallSubscription" --type go

Length of output: 41448


Script:

#!/bin/bash
# Check how subscriptions are installed and managed with mutex
rg -B 3 -A 5 "filtersMut" --type go

# Look for subscription installation
ast-grep --pattern 'func $_ install($_) {
  $$$
}'

Length of output: 9158

indexer/abci.go (2)

149-149: LGTM: Efficient log collection initialization.

The pre-allocation of capacity for blockLogs is an optimization that prevents slice reallocation during append operations.


195-196: ⚠️ Potential issue

Consider improving goroutine management and error handling.

The asynchronous event emission could benefit from several improvements:

  1. Context propagation for cancellation
  2. Error handling for the asynchronous operation
  3. Channel backpressure handling
  4. Race condition prevention

Consider applying these improvements:

-	// emit block event in a goroutine
-	go e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: blockLogs})
+	// emit block event in a goroutine with proper context and error handling
+	eventsChan := make(chan error, 1)
+	go func() {
+		defer close(eventsChan)
+		select {
+		case eventsChan <- e.blockEventsEmitter(&blockEvents{header: &blockHeader, logs: append([][]*coretypes.Log{}, blockLogs...)}):
+		case <-ctx.Done():
+			eventsChan <- ctx.Err()
+		}
+	}()
+	
+	// Optional: handle errors from the goroutine
+	go func() {
+		if err := <-eventsChan; err != nil {
+			e.logger.Error("block events emission failed", "err", err)
+		}
+	}()

This implementation:

  • Propagates context for proper cancellation
  • Provides error handling
  • Creates a defensive copy of blockLogs to prevent race conditions
  • Uses a buffered error channel to prevent goroutine leaks

Let's verify the event emission implementation:

jsonrpc/namespaces/eth/filters/filter.go (1)

73-73: Consider adding safeguards for unbounded range queries.

The removal of blockLimit parameter could potentially lead to unbounded resource usage when querying large block ranges. While this simplifies the API, it might expose the system to resource exhaustion.

Let's check if there are any other rate limiting or protection mechanisms in place:

Consider implementing one of these safeguards:

  1. Add rate limiting at the API layer
  2. Implement pagination for large result sets
  3. Add configurable system-wide limits for maximum block range
jsonrpc/backend/tx.go (1)

119-127: Consider explicit error handling for invalid block heights

The current implementation silently falls back to height 0 when the requested height exceeds the latest block. This could mask issues where callers expect data from a specific height. Consider:

  1. Explicitly returning an error for heights that exceed the latest block
  2. Adding a comment explaining the fallback behavior if it's intentional
  3. Adding protection against potential race conditions between height check and context creation

Let's verify if this fallback behavior is consistent with other height-related methods:

jsonrpc/namespaces/eth/filters/api.go (3)

94-115: Efficient filter cleanup with ticker and delayed unsubscription

The introduction of a ticker for periodic checks and processing unsubscriptions outside the lock improves performance and avoids potential deadlocks.


135-136: Improved concurrency by deferring log filtering to subscription goroutine

Moving log filtering into the subscription goroutine reduces contention and improves performance.


389-392: Properly uninstalling associated subscriptions

Ensuring the associated subscription is uninstalled when removing a filter prevents potential resource leaks.

jsonrpc/namespaces/eth/filters/subscriptions.go Outdated Show resolved Hide resolved
indexer/indexer.go Outdated Show resolved Hide resolved
jsonrpc/namespaces/eth/filters/api.go Show resolved Hide resolved
jsonrpc/namespaces/eth/filters/api.go Show resolved Hide resolved
jsonrpc/namespaces/eth/filters/api.go Show resolved Hide resolved
Copy link

codecov bot commented Oct 24, 2024

Codecov Report

Attention: Patch coverage is 0% with 193 lines in your changes missing coverage. Please review.

Project coverage is 27.85%. Comparing base (875632b) to head (4d7e744).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
jsonrpc/namespaces/eth/filters/api.go 0.00% 126 Missing ⚠️
jsonrpc/namespaces/eth/filters/subscriptions.go 0.00% 27 Missing ⚠️
indexer/indexer.go 0.00% 15 Missing ⚠️
indexer/abci.go 0.00% 9 Missing ⚠️
jsonrpc/backend/tx.go 0.00% 9 Missing ⚠️
jsonrpc/config/config.go 0.00% 4 Missing ⚠️
jsonrpc/backend/backend.go 0.00% 2 Missing ⚠️
jsonrpc/namespaces/eth/filters/filter.go 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main      #82      +/-   ##
==========================================
- Coverage   28.06%   27.85%   -0.21%     
==========================================
  Files         124      124              
  Lines       13676    13777     +101     
==========================================
  Hits         3838     3838              
- Misses       9277     9378     +101     
  Partials      561      561              
Files with missing lines Coverage Δ
jsonrpc/backend/filters.go 0.00% <ø> (ø)
jsonrpc/namespaces/eth/filters/filter.go 0.00% <0.00%> (ø)
jsonrpc/backend/backend.go 0.00% <0.00%> (ø)
jsonrpc/config/config.go 0.00% <0.00%> (ø)
indexer/abci.go 0.00% <0.00%> (ø)
jsonrpc/backend/tx.go 0.00% <0.00%> (ø)
indexer/indexer.go 0.00% <0.00%> (ø)
jsonrpc/namespaces/eth/filters/subscriptions.go 0.00% <0.00%> (ø)
jsonrpc/namespaces/eth/filters/api.go 0.00% <0.00%> (ø)

@beer-1 beer-1 merged commit 63688ba into main Oct 24, 2024
5 checks passed
@beer-1 beer-1 deleted the fix/async-filter branch October 24, 2024 09:35
@coderabbitai coderabbitai bot mentioned this pull request Nov 22, 2024
11 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant