-
Notifications
You must be signed in to change notification settings - Fork 329
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
perf(flow): Map&Reduce Operator use batch to reduce alloc #4567
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe changes enhance the dataflow processing capabilities in several Rust modules by introducing batch processing methods, improving error handling, and introducing new structures for better data manipulation. New public methods have been added to support batch operations across various components, streamlining data handling and improving performance. The overall design is more robust, allowing for efficient management of collections and operations within the data processing context. Changes
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
47c36a6
to
5de2ace
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4567 +/- ##
==========================================
- Coverage 84.92% 84.66% -0.26%
==========================================
Files 1096 1103 +7
Lines 196807 198600 +1793
==========================================
+ Hits 167135 168148 +1013
- Misses 29672 30452 +780 |
@coderabbitai review |
Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Outside diff range, codebase verification and nitpick comments (1)
src/flow/src/compute/render.rs (1)
90-117
: Handle unimplemented plan types gracefully inrender_plan_batch
.The function currently returns a
NotImplementedSnafu
error for certain plan types. Ensure that these cases are handled appropriately, and consider providing more informative error messages or logging for future debugging.// Consider adding more context to the error messages for unimplemented plans NotImplementedSnafu { reason: "Get is still WIP in batchmode", }
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (12)
- src/flow/src/compute/render.rs (3 hunks)
- src/flow/src/compute/render/map.rs (1 hunks)
- src/flow/src/compute/render/reduce.rs (5 hunks)
- src/flow/src/compute/render/src_sink.rs (2 hunks)
- src/flow/src/compute/types.rs (2 hunks)
- src/flow/src/expr.rs (3 hunks)
- src/flow/src/expr/linear.rs (7 hunks)
- src/flow/src/expr/relation/func.rs (2 hunks)
- src/flow/src/expr/scalar.rs (4 hunks)
- src/flow/src/lib.rs (1 hunks)
- src/flow/src/repr.rs (1 hunks)
- src/flow/src/utils.rs (1 hunks)
Files skipped from review due to trivial changes (1)
- src/flow/src/lib.rs
Additional comments not posted (22)
src/flow/src/compute/types.rs (3)
109-110
: Enhanced flexibility with generic type parameter inCollectionBundle
.The introduction of a generic type parameter
T
with a default ofDiffRow
significantly enhances the flexibility of theCollectionBundle
struct, allowing it to handle various data types while maintaining backward compatibility.
126-133
: Alignedfrom_collection
method with generic struct definition.The
from_collection
method now accepts aCollection<T>
, aligning with the generic type parameterT
in theCollectionBundle
struct. This change ensures type consistency across the codebase.
Line range hint
135-144
: Improvedclone
method forCollectionBundle<T>
.The
clone
method now supportsCollectionBundle<T>
whereT: Clone
, allowing instances to be cloned with any clonable typeT
. This enhances the usability of the struct.src/flow/src/repr.rs (1)
180-184
: EnhancedRow
usability withFrom<Vec<Value>>
implementation.The new
From
trait implementation allows for straightforward conversion fromVec<Value>
toRow
, enhancing the usability and flexibility of theRow
struct.src/flow/src/expr.rs (5)
44-50
: EnhancedBatch
struct withdiffs
field.The addition of the
diffs
field in theBatch
struct allows for tracking row insertions and deletions, providing more detailed information about the state of rows.
52-71
: UpdatedPartialEq
implementation forBatch
.The
PartialEq
implementation now includes thediffs
field in equality checks, ensuring that all relevant fields are considered for consistency and correctness.
126-139
: Robusttry_new
method forBatch
.The
try_new
method ensures all columns have the same length and initializesdiffs
toNone
, providing robust error handling and consistency in batch creation.
186-192
: Improved error handling inslice
method forBatch
.The
slice
method now returns aResult<Batch, EvalError>
, allowing for graceful error handling when slicing batches.
Line range hint
197-225
: Enhancedappend_batch
method forBatch
.The
append_batch
method now allows appending batches even when one is empty, enhancing the robustness and flexibility of batch operations.src/flow/src/compute/render/src_sink.rs (2)
35-89
: Enhanced data flow management withrender_source_batch
.The
render_source_batch
method processes incoming batches and handles various reception scenarios with error logging, enhancing data flow management through real-time batch processing and robust error handling.
173-197
: Expanded functionality withrender_unbounded_sink_batch
.The
render_unbounded_sink_batch
method facilitates the creation of an unbounded sink for batch collections, ensuring flexible and resilient handling of batch outputs with graceful handling of sender closure.src/flow/src/expr/relation/func.rs (3)
164-183
: Validate vector and diff length consistency.The
try_new
method inVectorDiff
ensures that the vector and diff have the same length. This is crucial for maintaining data integrity. Ensure that this validation is consistently applied whereverVectorDiff
is used.
186-197
: Implement efficient iteration withVectorDiffIter
.The
IntoIterator
implementation forVectorDiff
provides a streamlined way to iterate over vector-diff pairs. Ensure that this iterator is used efficiently in the codebase to avoid unnecessary performance overhead.
138-161
: Ensure proper handling of accumulator initialization.The
eval_batch
function initializes the accumulator based on whether it's empty or not. Ensure that the logic for creating a new accumulator or using an existing one is correct and efficient. Consider potential edge cases where the accumulator might not be correctly initialized.Verification successful
Accumulator Initialization Logic Verified
The
eval_batch
function properly handles accumulator initialization by usingAccum::new_accum
when the iterator is empty andAccum::try_from_iter
when restoring from an existing state. Both functions are implemented to handle their respective scenarios correctly. No issues found.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the initialization logic of the accumulator in `eval_batch`. # Test: Search for the `eval_batch` function and ensure proper initialization logic. rg --type rust -A 10 'fn eval_batch'Length of output: 6285
Script:
#!/bin/bash # Search for the definition of `Accum::new_accum` to understand its implementation. rg --type rust 'fn new_accum' -A 10 # Search for the definition of `Accum::try_from_iter` to understand its implementation. rg --type rust 'fn try_from_iter' -A 10Length of output: 5696
src/flow/src/compute/render/map.rs (1)
33-76
: Ensure efficient batch processing inrender_mfp_batch
.The
render_mfp_batch
function processes batches of data usingMfpPlan
. Ensure that the batch processing logic is efficient and correctly handles different data scenarios, including error handling and resource management.src/flow/src/expr/scalar.rs (1)
Line range hint
879-906
: Ensure robust error handling inBatch::try_new
.The
try_new
method now includes error handling for batch creation. Ensure that this method is thoroughly tested and handles all potential errors gracefully, especially in edge cases.src/flow/src/utils.rs (1)
516-516
: Addition ofClone
trait toArrangeHandler
.The inclusion of the
Clone
trait forArrangeHandler
is appropriate for enabling multiple references to the same data in concurrent contexts. Ensure that the usage of cloned instances does not lead to data inconsistencies.src/flow/src/expr/linear.rs (2)
484-519
: Implementation ofeval_batch_into
.This method efficiently processes a batch of data by applying predicates and filtering rows based on conditions. The error handling for column count mismatches is robust, providing clear feedback if the batch does not meet expected input arity. Ensure that the method is well-tested for various batch sizes and edge cases.
522-558
: Implementation ofeval_batch_inner
.The method enhances batch processing by evaluating predicates across a batch, utilizing a boolean vector to track rows that pass the predicates. The handling of type mismatches during downcasting to
BooleanArray
is well-implemented, ensuring robustness. Consider adding more test cases to cover complex predicate scenarios.src/flow/src/compute/render/reduce.rs (3)
42-256
: Introduction ofrender_reduce_batch
.This method significantly enhances batch processing capabilities for reduce operations. The design is robust, handling key and value separation, state updates, and scheduling efficiently. Ensure that the lack of support for distinct aggregation in batch mode is clearly documented and consider adding tests for edge cases.
378-388
: Addition offrom_accum_values_to_live_accums
.This utility function converts accumulated values into live accumulators, supporting batch processing. The implementation is clear and efficient. Ensure that the function is tested with various accumulator configurations to verify its correctness.
399-428
: Modification ofbatch_split_by_key_val
.The function now processes batches, separating key and value columns efficiently. The handling of empty key or value batches is well-considered. Ensure that the error handling logic is thoroughly tested to prevent runtime issues.
9f53a07
to
3aafc1c
Compare
3aafc1c
to
21c3fff
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General LGTM
…am#4567) * feat: partial impl mfp * feat: eval batch inner * chore: fmt * feat: mfp eval_batch * WIP * feat: Collection generic over row&Batch * feat: render source batch * chore: chore * feat: render mfp batch * feat: render reduce batch(WIP) * feat(WIP): render reduce * feat: reduce batch * feat: render sink batch * feat: render constant batch * chore: error handling& mfp batch test * test: mfp batch * chore: rm import * test: render reduce batch * chore: add TODO * chore: per bot review * refactor: per review * chore: cmt * chore: rename * docs: update no panic
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
render_*_batch
functions for flow's render(Seerender/map/reduce/src_sink.rs
andrender.rs
files)render_reduce_batch
which simpilfy impl and should improve performance(Seesrc/flow/src/compute/render/reduce.rs
)evaluate_inner/into_batch
for SafeMfpPlan to help with eval mfp with batch(Seesrc/flow/src/expr/linear.rs
file)The PR is already too large too add functionality to actually use it, will do that in next PR
Checklist
Summary by CodeRabbit
New Features
Batch
struct to track row modifications and implemented new methods for improved error handling.Bug Fixes
Documentation
Tests
Chores