perf(flow): Map&Reduce Operator use batch to reduce alloc #4567

Merged
merged 24 commits into from
Aug 29, 2024

Conversation

discord9
Contributor

@discord9 discord9 commented Aug 15, 2024

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

  • Add render_*_batch functions for flow's render (see the render/map.rs, render/reduce.rs, render/src_sink.rs and render.rs files)
  • A major change is the implementation of render_reduce_batch, which simplifies the implementation and should improve performance (see src/flow/src/compute/render/reduce.rs)
  • Add evaluate_inner/into_batch for SafeMfpPlan to help evaluate an MFP over a batch (see src/flow/src/expr/linear.rs)
  • Other changes are minor and can be ignored

The PR is already too large to add the functionality that actually uses these functions; I will do that in the next PR.

Checklist

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.

Summary by CodeRabbit

  • New Features

    • Introduced batch processing capabilities for dataflow plans, enhancing performance and efficiency.
    • Added methods for handling batch operations in various contexts, including rendering plans, processing source data, and managing outputs.
    • Enhanced the Batch struct to track row modifications and implemented new methods for improved error handling.
  • Bug Fixes

    • Improved error handling within batch processing methods to ensure robust operation.
  • Documentation

    • Updated method documentation for clarity regarding functionality and usage.
  • Tests

    • New test cases introduced to validate batch processing functionality across various components.
  • Chores

    • Added comments for future code quality improvements regarding out-of-bounds access in the codebase.

Contributor

coderabbitai bot commented Aug 15, 2024

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes enhance the dataflow processing capabilities in several Rust modules by introducing batch processing methods, improving error handling, and introducing new structures for better data manipulation. New public methods have been added to support batch operations across various components, streamlining data handling and improving performance. The overall design is more robust, allowing for efficient management of collections and operations within the data processing context.

Changes

Files | Change Summary
src/flow/src/compute/render.rs, src/flow/src/compute/render/map.rs, src/flow/src/compute/render/reduce.rs, src/flow/src/compute/render/src_sink.rs | Introduced new batch processing methods (render_plan_batch, render_mfp_batch, render_reduce_batch, render_source_batch, render_unbounded_sink_batch) to enhance data handling efficiency and robustness. Updated existing methods for better error handling.
src/flow/src/compute/types.rs | Modified CollectionBundle to be generic, allowing it to handle various data types. Added new methods for cloning and creating collections of different types.
src/flow/src/expr.rs, src/flow/src/expr/linear.rs, src/flow/src/expr/relation/func.rs, src/flow/src/expr/scalar.rs | Enhanced batch processing capabilities in Batch and SafeMfpPlan, introducing methods for batch evaluation and handling differences, improving overall functionality and error management.
src/flow/src/lib.rs | Added a comment regarding enabling Clippy lint for out-of-bounds access, without affecting functionality.
src/flow/src/repr.rs | Implemented From<Vec<Value>> for Row, allowing for easier conversion from vectors to rows.
src/flow/src/utils.rs | Expanded ArrangeHandler to derive Clone, improving usability in concurrent contexts.

Poem

In the fields of code I hop,
With batch methods my spirits pop!
Data flows in a rhythmic dance,
Errors handled, all enhanced!
A sprinkle of joy in every byte,
Hopping along, everything feels right! 🐇✨



@github-actions github-actions bot added the docs-not-required label ("This change does not impact docs.") Aug 15, 2024
@discord9 discord9 changed the title from "perf: flow batch mode&perf report" to "perf(flow): Map&Reduce Operator use batch to reduce alloc" Aug 19, 2024
@discord9 discord9 force-pushed the flow_mfp_reduce_batch branch from 47c36a6 to 5de2ace Compare August 20, 2024 03:35
@discord9 discord9 marked this pull request as ready for review August 20, 2024 03:44

codecov bot commented Aug 20, 2024

Codecov Report

Attention: Patch coverage is 74.74093% with 195 lines in your changes missing coverage. Please review.

Project coverage is 84.66%. Comparing base (da337a9) to head (bf56523).
Report is 23 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4567      +/-   ##
==========================================
- Coverage   84.92%   84.66%   -0.26%     
==========================================
  Files        1096     1103       +7     
  Lines      196807   198600    +1793     
==========================================
+ Hits       167135   168148    +1013     
- Misses      29672    30452     +780     

@discord9
Contributor Author

@coderabbitai review

Contributor

coderabbitai bot commented Aug 20, 2024

Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Outside diff range, codebase verification and nitpick comments (1)
src/flow/src/compute/render.rs (1)

90-117: Handle unimplemented plan types gracefully in render_plan_batch.

The function currently returns a NotImplementedSnafu error for certain plan types. Ensure that these cases are handled appropriately, and consider providing more informative error messages or logging for future debugging.

// Consider adding more context to the error messages for unimplemented plans
NotImplementedSnafu {
    reason: "Get is still WIP in batchmode",
}
Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6fccff4 and 5de2ace.

Files selected for processing (12)
  • src/flow/src/compute/render.rs (3 hunks)
  • src/flow/src/compute/render/map.rs (1 hunks)
  • src/flow/src/compute/render/reduce.rs (5 hunks)
  • src/flow/src/compute/render/src_sink.rs (2 hunks)
  • src/flow/src/compute/types.rs (2 hunks)
  • src/flow/src/expr.rs (3 hunks)
  • src/flow/src/expr/linear.rs (7 hunks)
  • src/flow/src/expr/relation/func.rs (2 hunks)
  • src/flow/src/expr/scalar.rs (4 hunks)
  • src/flow/src/lib.rs (1 hunks)
  • src/flow/src/repr.rs (1 hunks)
  • src/flow/src/utils.rs (1 hunks)
Files skipped from review due to trivial changes (1)
  • src/flow/src/lib.rs
Additional comments not posted (22)
src/flow/src/compute/types.rs (3)

109-110: Enhanced flexibility with generic type parameter in CollectionBundle.

The introduction of a generic type parameter T with a default of DiffRow significantly enhances the flexibility of the CollectionBundle struct, allowing it to handle various data types while maintaining backward compatibility.


126-133: Aligned from_collection method with generic struct definition.

The from_collection method now accepts a Collection<T>, aligning with the generic type parameter T in the CollectionBundle struct. This change ensures type consistency across the codebase.


Line range hint 135-144: Improved clone method for CollectionBundle<T>.

The clone method now supports CollectionBundle<T> where T: Clone, allowing instances to be cloned with any clonable type T. This enhances the usability of the struct.
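The three comments above describe one pattern: a struct made generic with a default type parameter, so existing call sites keep compiling. A minimal sketch of that pattern follows. `Bundle`, the simplified `DiffRow` alias, the toy `Batch`, and the helper functions are hypothetical stand-ins, not the real definitions in src/flow/src/compute/types.rs.

```rust
// Hypothetical, simplified stand-ins for the real flow types.
type DiffRow = (Vec<i64>, i64, i64); // (row, timestamp, diff)

#[derive(Debug, Clone, PartialEq)]
struct Batch {
    rows: usize,
}

// `T = DiffRow` lets code that writes plain `Bundle` keep compiling,
// while new code can opt into `Bundle<Batch>`.
struct Bundle<T = DiffRow> {
    items: Vec<T>,
}

impl<T> Bundle<T> {
    // Accepts a collection of whatever element type the bundle carries.
    fn from_collection(items: Vec<T>) -> Self {
        Bundle { items }
    }
}

// Cloning is only offered when the element type itself is `Clone`.
impl<T: Clone> Clone for Bundle<T> {
    fn clone(&self) -> Self {
        Bundle {
            items: self.items.clone(),
        }
    }
}

// A "legacy" signature: no type argument, so it means `Bundle<DiffRow>`.
fn legacy_len(bundle: &Bundle) -> usize {
    bundle.items.len()
}

// A batch-mode signature that names the element type explicitly.
fn batch_rows(bundle: &Bundle<Batch>) -> usize {
    bundle.items.iter().map(|b| b.rows).sum()
}
```

The default only applies in type positions such as `&Bundle`; in expressions like `Bundle::from_collection(...)` the element type is still inferred from the argument.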

src/flow/src/repr.rs (1)

180-184: Enhanced Row usability with From<Vec<Value>> implementation.

The new From trait implementation allows for straightforward conversion from Vec<Value> to Row, enhancing the usability and flexibility of the Row struct.
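As an illustration of what such a conversion buys callers, here is a small sketch. The `Value` enum, the `Row` layout, and `row_len` are assumptions made for the example and are much simpler than the real types in src/flow/src/repr.rs.

```rust
// Hypothetical, cut-down value and row types.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Str(String),
    Null,
}

#[derive(Debug, Clone, PartialEq, Default)]
struct Row {
    inner: Vec<Value>,
}

// The conversion described in the review: wrap the vector without copying it.
impl From<Vec<Value>> for Row {
    fn from(inner: Vec<Value>) -> Self {
        Row { inner }
    }
}

// Any API written against `impl Into<Row>` now accepts a plain vector too.
fn row_len(row: impl Into<Row>) -> usize {
    row.into().inner.len()
}
```

Implementing `From` also provides `Into` for free, which is why `vec![...].into()` works wherever a `Row` is expected.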

src/flow/src/expr.rs (5)

44-50: Enhanced Batch struct with diffs field.

The addition of the diffs field in the Batch struct allows for tracking row insertions and deletions, providing more detailed information about the state of rows.


52-71: Updated PartialEq implementation for Batch.

The PartialEq implementation now includes the diffs field in equality checks, ensuring that all relevant fields are considered for consistency and correctness.


126-139: Robust try_new method for Batch.

The try_new method ensures all columns have the same length and initializes diffs to None, providing robust error handling and consistency in batch creation.


186-192: Improved error handling in slice method for Batch.

The slice method now returns a Result<Batch, EvalError>, allowing for graceful error handling when slicing batches.


Line range hint 197-225: Enhanced append_batch method for Batch.

The append_batch method now allows appending batches even when one is empty, enhancing the robustness and flexibility of batch operations.
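The behaviours described for `Batch` (length validation in `try_new`, an optional `diffs` column, and appending when one side is empty) can be sketched roughly as follows. This is a simplified model using `Vec<i64>` columns; the error variants and the choice to treat missing diffs as `+1` when merging are assumptions of this sketch, not something the PR states.

```rust
#[derive(Debug, PartialEq)]
enum EvalError {
    ColumnLengthMismatch { expected: usize, actual: usize },
    ColumnCountMismatch { expected: usize, actual: usize },
}

#[derive(Debug, Clone, PartialEq)]
struct Batch {
    columns: Vec<Vec<i64>>,
    row_count: usize,
    // `None` means "no per-row diff recorded"; `Some` holds one diff per row.
    diffs: Option<Vec<i64>>,
}

impl Batch {
    // Rejects batches whose columns disagree with the declared row count.
    fn try_new(columns: Vec<Vec<i64>>, row_count: usize) -> Result<Self, EvalError> {
        for col in &columns {
            if col.len() != row_count {
                return Err(EvalError::ColumnLengthMismatch {
                    expected: row_count,
                    actual: col.len(),
                });
            }
        }
        Ok(Batch { columns, row_count, diffs: None })
    }

    // Appends `other`; an empty batch on either side is not an error.
    fn append_batch(&mut self, other: Batch) -> Result<(), EvalError> {
        if other.row_count == 0 {
            return Ok(());
        }
        if self.row_count == 0 {
            *self = other;
            return Ok(());
        }
        if self.columns.len() != other.columns.len() {
            return Err(EvalError::ColumnCountMismatch {
                expected: self.columns.len(),
                actual: other.columns.len(),
            });
        }
        let (self_rows, other_rows) = (self.row_count, other.row_count);
        // Assumption of this sketch: a side without diffs counts as all +1.
        let merged = match (self.diffs.take(), other.diffs) {
            (None, None) => None,
            (lhs, rhs) => {
                let mut diffs = lhs.unwrap_or_else(|| vec![1; self_rows]);
                diffs.extend(rhs.unwrap_or_else(|| vec![1; other_rows]));
                Some(diffs)
            }
        };
        for (dst, src) in self.columns.iter_mut().zip(other.columns) {
            dst.extend(src);
        }
        self.row_count = self_rows + other_rows;
        self.diffs = merged;
        Ok(())
    }
}
```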

src/flow/src/compute/render/src_sink.rs (2)

35-89: Enhanced data flow management with render_source_batch.

The render_source_batch method processes incoming batches and handles various reception scenarios with error logging, enhancing data flow management through real-time batch processing and robust error handling.


173-197: Expanded functionality with render_unbounded_sink_batch.

The render_unbounded_sink_batch method facilitates the creation of an unbounded sink for batch collections, ensuring flexible and resilient handling of batch outputs with graceful handling of sender closure.
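To make the "reception scenarios" and "sender closure" wording concrete, here is a rough sketch of the two behaviours using the standard library's `std::sync::mpsc` channel as a stand-in. The real operators may well use a different channel type, and `drain_source` and `sink_batches` are hypothetical names for this example only.

```rust
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

// Drains every batch that is currently available without blocking.
// Returns the batches plus a flag saying whether the sending side is gone.
fn drain_source(rx: &Receiver<Vec<i64>>) -> (Vec<Vec<i64>>, bool) {
    let mut batches = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(batch) => batches.push(batch),
            // Nothing more for now; the source is still alive.
            Err(TryRecvError::Empty) => return (batches, false),
            // All senders were dropped; the caller can stop scheduling.
            Err(TryRecvError::Disconnected) => return (batches, true),
        }
    }
}

// Forwards batches to the sink and stops quietly once the receiver is closed.
// Returns how many batches were actually delivered.
fn sink_batches(tx: &Sender<Vec<i64>>, batches: Vec<Vec<i64>>) -> usize {
    let mut sent = 0;
    for batch in batches {
        if tx.send(batch).is_err() {
            // Receiver dropped: treat as shutdown rather than a hard error.
            break;
        }
        sent += 1;
    }
    sent
}
```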

src/flow/src/expr/relation/func.rs (3)

164-183: Validate vector and diff length consistency.

The try_new method in VectorDiff ensures that the vector and diff have the same length. This is crucial for maintaining data integrity. Ensure that this validation is consistently applied wherever VectorDiff is used.


186-197: Implement efficient iteration with VectorDiffIter.

The IntoIterator implementation for VectorDiff provides a streamlined way to iterate over vector-diff pairs. Ensure that this iterator is used efficiently in the codebase to avoid unnecessary performance overhead.
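A hedged sketch of the two points above, length validation followed by pairwise iteration, is shown here. The field layout, the `LengthMismatch` error, and the `(value, Option<diff>)` item type are assumptions for illustration; the real `VectorDiff` works on vector types rather than `Vec<i64>`.

```rust
#[derive(Debug, PartialEq)]
struct LengthMismatch {
    values: usize,
    diffs: usize,
}

struct VectorDiff {
    values: Vec<i64>,
    diffs: Option<Vec<i64>>,
}

impl VectorDiff {
    // If diffs are present, they must line up one-to-one with the values.
    fn try_new(values: Vec<i64>, diffs: Option<Vec<i64>>) -> Result<Self, LengthMismatch> {
        if let Some(d) = &diffs {
            if d.len() != values.len() {
                return Err(LengthMismatch {
                    values: values.len(),
                    diffs: d.len(),
                });
            }
        }
        Ok(VectorDiff { values, diffs })
    }
}

struct VectorDiffIter {
    inner: VectorDiff,
    idx: usize,
}

impl Iterator for VectorDiffIter {
    type Item = (i64, Option<i64>);

    fn next(&mut self) -> Option<Self::Item> {
        // `get` ends the iteration cleanly once `idx` runs past the values.
        let value = *self.inner.values.get(self.idx)?;
        // Indexing is safe: `try_new` guaranteed equal lengths.
        let diff = self.inner.diffs.as_ref().map(|d| d[self.idx]);
        self.idx += 1;
        Some((value, diff))
    }
}

impl IntoIterator for VectorDiff {
    type Item = (i64, Option<i64>);
    type IntoIter = VectorDiffIter;

    fn into_iter(self) -> Self::IntoIter {
        VectorDiffIter { inner: self, idx: 0 }
    }
}
```

Validating once in the constructor is what lets the iterator index the diff vector without a second bounds check in its own logic.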


138-161: Ensure proper handling of accumulator initialization.

The eval_batch function initializes the accumulator based on whether it's empty or not. Ensure that the logic for creating a new accumulator or using an existing one is correct and efficient. Consider potential edge cases where the accumulator might not be correctly initialized.

Verification successful

Accumulator Initialization Logic Verified

The eval_batch function properly handles accumulator initialization by using Accum::new_accum when the iterator is empty and Accum::try_from_iter when restoring from an existing state. Both functions are implemented to handle their respective scenarios correctly. No issues found.
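The "new accumulator when empty, restore otherwise" logic can be sketched as below. `SumAccum`, its two-field state layout, and this `eval_batch` signature are hypothetical simplifications; only the method names `new_accum` and `try_from_iter` come from the review text.

```rust
// A toy accumulator tracking a running sum and a row count.
#[derive(Debug, PartialEq)]
struct SumAccum {
    sum: i64,
    count: i64,
}

impl SumAccum {
    // Fresh state, used when nothing has been stored for this key yet.
    fn new_accum() -> Self {
        SumAccum { sum: 0, count: 0 }
    }

    // Restores state from previously stored values; fails if any are missing.
    fn try_from_iter(state: &mut impl Iterator<Item = i64>) -> Result<Self, String> {
        let sum = state.next().ok_or("missing sum in stored state")?;
        let count = state.next().ok_or("missing count in stored state")?;
        Ok(SumAccum { sum, count })
    }

    // A negative diff retracts a previously inserted row.
    fn update(&mut self, value: i64, diff: i64) {
        self.sum += value * diff;
        self.count += diff;
    }
}

// Applies one batch of (value, diff) pairs and returns the new stored state.
fn eval_batch(state: Vec<i64>, values: &[i64], diffs: &[i64]) -> Result<Vec<i64>, String> {
    let mut accum = if state.is_empty() {
        SumAccum::new_accum()
    } else {
        SumAccum::try_from_iter(&mut state.into_iter())?
    };
    for (value, diff) in values.iter().zip(diffs) {
        accum.update(*value, *diff);
    }
    Ok(vec![accum.sum, accum.count])
}
```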

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the initialization logic of the accumulator in `eval_batch`.

# Test: Search for the `eval_batch` function and ensure proper initialization logic.
rg --type rust -A 10 'fn eval_batch'

Length of output: 6285


Script:

#!/bin/bash
# Search for the definition of `Accum::new_accum` to understand its implementation.
rg --type rust 'fn new_accum' -A 10

# Search for the definition of `Accum::try_from_iter` to understand its implementation.
rg --type rust 'fn try_from_iter' -A 10

Length of output: 5696

src/flow/src/compute/render/map.rs (1)

33-76: Ensure efficient batch processing in render_mfp_batch.

The render_mfp_batch function processes batches of data using MfpPlan. Ensure that the batch processing logic is efficient and correctly handles different data scenarios, including error handling and resource management.

src/flow/src/expr/scalar.rs (1)

Line range hint 879-906: Ensure robust error handling in Batch::try_new.

The try_new method now includes error handling for batch creation. Ensure that this method is thoroughly tested and handles all potential errors gracefully, especially in edge cases.

src/flow/src/utils.rs (1)

516-516: Addition of Clone trait to ArrangeHandler.

The inclusion of the Clone trait for ArrangeHandler is appropriate for enabling multiple references to the same data in concurrent contexts. Ensure that the usage of cloned instances does not lead to data inconsistencies.

src/flow/src/expr/linear.rs (2)

484-519: Implementation of eval_batch_into.

This method efficiently processes a batch of data by applying predicates and filtering rows based on conditions. The error handling for column count mismatches is robust, providing clear feedback if the batch does not meet expected input arity. Ensure that the method is well-tested for various batch sizes and edge cases.


522-558: Implementation of eval_batch_inner.

The method enhances batch processing by evaluating predicates across a batch, utilizing a boolean vector to track rows that pass the predicates. The handling of type mismatches during downcasting to BooleanArray is well-implemented, ensuring robustness. Consider adding more test cases to cover complex predicate scenarios.
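The two comments above describe an arity check followed by predicate evaluation into a boolean mask that filters rows. A minimal sketch of that idea over plain `Vec<i64>` columns is given here. The `Predicate` type and the two example predicates are hypothetical, and the real code evaluates expressions into a `BooleanArray` rather than a `Vec<bool>`.

```rust
type Column = Vec<i64>;
// Hypothetical predicate shape: one boolean per row of the batch.
type Predicate = fn(&[Column]) -> Vec<bool>;

// Example predicates, assumed for illustration only.
fn col0_positive(cols: &[Column]) -> Vec<bool> {
    cols[0].iter().map(|v| *v > 0).collect()
}

fn col1_even(cols: &[Column]) -> Vec<bool> {
    cols[1].iter().map(|v| *v % 2 == 0).collect()
}

// AND together the masks of all predicates; a row survives only if every
// predicate accepts it.
fn eval_batch_inner(columns: &[Column], row_count: usize, predicates: &[Predicate]) -> Vec<bool> {
    let mut keep = vec![true; row_count];
    for pred in predicates {
        let mask = pred(columns);
        for (k, m) in keep.iter_mut().zip(mask) {
            *k = *k && m;
        }
    }
    keep
}

// Checks the input arity, builds the mask, then filters every column by it.
fn eval_batch_into(
    columns: &[Column],
    input_arity: usize,
    predicates: &[Predicate],
) -> Result<Vec<Column>, String> {
    if columns.len() != input_arity {
        return Err(format!(
            "expected {} columns, got {}",
            input_arity,
            columns.len()
        ));
    }
    let row_count = columns.first().map_or(0, |c| c.len());
    let keep = eval_batch_inner(columns, row_count, predicates);
    Ok(columns
        .iter()
        .map(|col| {
            col.iter()
                .zip(&keep)
                .filter(|(_, k)| **k)
                .map(|(v, _)| *v)
                .collect()
        })
        .collect())
}
```

Computing one mask for the whole batch means each output column is allocated once, instead of building one row at a time.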

src/flow/src/compute/render/reduce.rs (3)

42-256: Introduction of render_reduce_batch.

This method significantly enhances batch processing capabilities for reduce operations. The design is robust, handling key and value separation, state updates, and scheduling efficiently. Ensure that the lack of support for distinct aggregation in batch mode is clearly documented and consider adding tests for edge cases.


378-388: Addition of from_accum_values_to_live_accums.

This utility function converts accumulated values into live accumulators, supporting batch processing. The implementation is clear and efficient. Ensure that the function is tested with various accumulator configurations to verify its correctness.


399-428: Modification of batch_split_by_key_val.

The function now processes batches, separating key and value columns efficiently. The handling of empty key or value batches is well-considered. Ensure that the error handling logic is thoroughly tested to prevent runtime issues.
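The reduce path described in these comments (split a batch into key and value columns, then update per-key state) might look roughly like the sketch below. It models a single SUM aggregate over `Vec<i64>` columns with a `BTreeMap` as state; `pick`, `reduce_sum_batch`, and the state layout are assumptions of this example, and distinct aggregation is not covered.

```rust
use std::collections::BTreeMap;

type Column = Vec<i64>;

// Selects columns by index, returning an error instead of panicking when an
// index is out of bounds.
fn pick(columns: &[Column], idx: &[usize]) -> Result<Vec<Column>, String> {
    idx.iter()
        .map(|&i| {
            columns
                .get(i)
                .cloned()
                .ok_or_else(|| format!("column {} out of bounds", i))
        })
        .collect()
}

// Splits one batch into its key columns and its value columns.
fn batch_split_by_key_val(
    columns: &[Column],
    key_idx: &[usize],
    val_idx: &[usize],
) -> Result<(Vec<Column>, Vec<Column>), String> {
    Ok((pick(columns, key_idx)?, pick(columns, val_idx)?))
}

// Folds one value column into per-key running sums. A diff of -1 retracts a
// row that was added earlier.
fn reduce_sum_batch(
    keys: &[Column],
    vals: &[i64],
    diffs: &[i64],
    state: &mut BTreeMap<Vec<i64>, i64>,
) -> Result<(), String> {
    if diffs.len() != vals.len() || keys.iter().any(|c| c.len() != vals.len()) {
        return Err("key, value and diff lengths differ".to_string());
    }
    for row in 0..vals.len() {
        // Build the composite key for this row from all key columns.
        let key: Vec<i64> = keys.iter().map(|c| c[row]).collect();
        *state.entry(key).or_insert(0) += vals[row] * diffs[row];
    }
    Ok(())
}
```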

src/flow/src/expr/relation/func.rs (outdated, resolved)
src/flow/src/compute/render.rs (outdated, resolved)
src/flow/src/expr.rs (outdated, resolved)
src/flow/src/compute/render.rs (outdated, resolved)
src/flow/src/compute/render/map.rs (outdated, resolved)
src/flow/src/compute/render/map.rs (outdated, resolved)
src/flow/src/compute/render/reduce.rs (resolved)
@discord9 discord9 force-pushed the flow_mfp_reduce_batch branch 2 times, most recently from 9f53a07 to 3aafc1c Compare August 23, 2024 06:06
@discord9 discord9 force-pushed the flow_mfp_reduce_batch branch from 3aafc1c to 21c3fff Compare August 26, 2024 06:52
Member

@waynexia waynexia left a comment


General LGTM

src/flow/src/expr.rs (outdated, resolved)
@discord9 discord9 enabled auto-merge August 29, 2024 07:26
@discord9 discord9 added this pull request to the merge queue Aug 29, 2024
Merged via the queue into GreptimeTeam:main with commit 8c8499c Aug 29, 2024
32 checks passed
@discord9 discord9 deleted the flow_mfp_reduce_batch branch August 29, 2024 07:43
CookiePieWw pushed a commit to CookiePieWw/greptimedb that referenced this pull request Sep 17, 2024
…am#4567)

* feat: partial impl mfp

* feat: eval batch inner

* chore: fmt

* feat: mfp eval_batch

* WIP

* feat: Collection generic over row&Batch

* feat: render source batch

* chore: chore

* feat: render mfp batch

* feat: render reduce batch(WIP)

* feat(WIP): render reduce

* feat: reduce batch

* feat: render sink batch

* feat: render constant batch

* chore: error handling& mfp batch test

* test: mfp batch

* chore: rm import

* test: render reduce batch

* chore: add TODO

* chore: per bot review

* refactor: per review

* chore: cmt

* chore: rename

* docs: update no panic
Labels
docs-not-required This change does not impact docs.

3 participants