
Update validate_number_of_cores() #488

Merged (6 commits into main, Nov 10, 2024)

Conversation

jan-janssen (Member) commented Nov 9, 2024

Summary by CodeRabbit

  • New Features

    • Enhanced flexibility for resource allocation with optional parameters for max_cores and max_workers in the InteractiveStepExecutor.
    • Improved validation logic for core allocation, ensuring better error handling and resource management.
  • Bug Fixes

    • Added error handling for cases when resource parameters are not defined, preventing potential runtime issues.
  • Tests

    • Expanded test coverage for the validate_number_of_cores function, ensuring proper exception handling and return types.

coderabbitai bot (Contributor) commented Nov 9, 2024

Caution

Review failed

The pull request is closed.

Walkthrough

The changes in this pull request primarily enhance the logic for determining the max_workers parameter across various executor classes in the executorlib. The create_executor function now utilizes a new validation function, validate_number_of_cores, which incorporates additional parameters for improved resource management. The InteractiveStepExecutor constructor has been updated to accept max_workers as an optional parameter, and several related functions have been modified to ensure compatibility. Error handling and validation remain intact, but the overall worker allocation logic is now more robust and flexible.
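
For orientation, a hedged usage sketch of the updated validator, based on the signature and branches shown later in this review (the concrete numbers are illustrative, and it assumes the function returns the computed max_workers, as the type assertions in the new tests suggest):

from executorlib.standalone.inputcheck import validate_number_of_cores

# A core budget of 8 with 2 cores per worker yields 4 workers.
print(validate_number_of_cores(max_cores=8, max_workers=None, cores_per_worker=2))

# Local backend path: with nothing specified, fall back to the machine's CPU count.
print(
    validate_number_of_cores(
        max_cores=None, max_workers=None, cores_per_worker=None, set_local_cores=True
    )
)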

Changes

  • executorlib/interactive/executor.py: Updated create_executor to use validate_number_of_cores for calculating max_workers. Added max_workers as a parameter in the InteractiveStepExecutor instantiation for the flux and slurm backends.
  • executorlib/interactive/shared.py: Changed max_cores in InteractiveStepExecutor to optional. Added max_workers as an optional parameter across several functions (__init__, execute_separate_tasks, _wait_for_free_slots, etc.) for improved flexibility.
  • executorlib/standalone/inputcheck.py: Updated validate_number_of_cores to include cores_per_worker and set_local_cores. Enhanced logic for error handling and resource allocation based on the new parameters.
  • tests/test_shared_input_check.py: Added test cases for validate_number_of_cores to check for exceptions and validate return types with the new parameters. Updated assertions to reflect changes in function signatures.
  • tests/test_executor_backend_mpi.py: Changed the max_workers parameter to max_cores in test_meta_executor_parallel_cache for consistency across tests.

Possibly related PRs

  • Update validate_number_of_cores() #488: The changes in this PR enhance the logic for determining the max_workers parameter in the create_executor function, which is directly related to the modifications made in the main PR regarding the create_executor function in executorlib/interactive/executor.py.
  • Raise ValueError when requesting more cores than available #489: This PR introduces a new parameter max_cores in the ExecutorBase class and adds validation logic in the submit method to raise a ValueError when the requested cores exceed the maximum allowed, which aligns with the changes in the main PR that also focus on worker allocation and validation logic.

🐇 In the land of code, where changes bloom,
New parameters sprout, making space in the room.
With max_workers now optional, oh what a delight,
Flexibility dances, in the day and the night.
Errors caught swiftly, like rabbits in play,
In the garden of logic, we frolic and sway! 🌼


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (5)
tests/test_shared_input_check.py (1)

109-117: Consider adding more test cases for comprehensive coverage.

While the current test cases cover basic scenarios, consider adding tests for:

  1. All parameters being non-None (e.g., max_cores=2, max_workers=2, cores_per_worker=1)
  2. Edge cases with cores_per_worker (e.g., cores_per_worker > max_cores)

Example additions:

def test_validate_number_of_cores(self):
    # Existing tests...
    
    # Test all parameters specified
    self.assertIsInstance(
        validate_number_of_cores(max_cores=2, max_workers=2, cores_per_worker=1),
        int
    )
    
    # Test invalid cores_per_worker
    with self.assertRaises(ValueError):
        validate_number_of_cores(max_cores=2, max_workers=2, cores_per_worker=3)
executorlib/interactive/executor.py (1)

Line range hint 220-284: Well-structured centralization of worker validation logic.

The changes successfully centralize the worker count validation across all backends while maintaining backend-specific requirements. This architectural improvement:

  • Reduces code duplication
  • Ensures consistent validation
  • Maintains backend-specific configurations

Consider documenting the backend-specific validation differences (especially the set_local_cores parameter) in the function docstring to help future maintainers understand the design decisions.
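
One way such a docstring could read (illustrative wording only, not part of the PR; it simply restates the backend behaviour described in this review):

def validate_number_of_cores(
    max_cores=None, max_workers=None, cores_per_worker=None, set_local_cores=False
) -> int:
    """
    Validate the requested resources and return the number of workers.

    set_local_cores should only be True for the local backend, where falling
    back to multiprocessing.cpu_count() is acceptable when neither max_cores
    nor max_workers is given. The flux and slurm backends leave it False, so
    a missing resource specification raises a ValueError instead of silently
    using the cores of the submitting machine.
    """
    ...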

executorlib/interactive/shared.py (1)

418-427: Reduce code duplication in resource management logic.

The current implementation duplicates the dictionary comprehension for filtering completed tasks.

Consider this more DRY approach:

    if max_cores is not None:
-       while sum(active_task_dict.values()) + cores_requested > max_cores:
-           active_task_dict = {
-               k: v for k, v in active_task_dict.items() if not k.done()
-           }
-   elif max_workers is not None and max_cores is None:
-       while len(active_task_dict.values()) + 1 > max_workers:
-           active_task_dict = {
-               k: v for k, v in active_task_dict.items() if not k.done()
-           }
+       limit = max_cores
+       current_usage = lambda d: sum(d.values()) + cores_requested
+   elif max_workers is not None:
+       limit = max_workers
+       current_usage = lambda d: len(d.values()) + 1
+   else:
+       return active_task_dict
+   
+   while current_usage(active_task_dict) > limit:
+       active_task_dict = {
+           k: v for k, v in active_task_dict.items() if not k.done()
+       }
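
A self-contained version of that refactor, runnable with plain concurrent.futures objects (the function and variable names here are illustrative, not executorlib's internal API):

from concurrent.futures import Future


def wait_for_free_slots(active_task_dict, cores_requested, max_cores=None, max_workers=None):
    if max_cores is not None:
        limit, current_usage = max_cores, lambda d: sum(d.values()) + cores_requested
    elif max_workers is not None:
        limit, current_usage = max_workers, lambda d: len(d) + 1
    else:
        return active_task_dict
    # Drop finished tasks until the pending request fits under the limit.
    while current_usage(active_task_dict) > limit:
        active_task_dict = {k: v for k, v in active_task_dict.items() if not k.done()}
    return active_task_dict


finished, running = Future(), Future()
finished.set_result(None)
active = {finished: 2, running: 2}  # future -> cores occupied
print(wait_for_free_slots(active, cores_requested=2, max_cores=4))  # only the running task remains
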
executorlib/standalone/inputcheck.py (2)

173-190: Consider renaming the function for clarity

The function validate_number_of_cores now calculates and returns max_workers, which might cause confusion since the name suggests it validates cores. Renaming the function to better reflect its purpose could improve code readability, such as calculate_max_workers.


189-189: Use floor division for integer calculation

In the line max_workers = int(max_cores / cores_per_worker), consider using floor division // to directly obtain an integer result without type casting.

Apply this minor change:

max_workers = int(max_cores / cores_per_worker)
# Change to:
max_workers = max_cores // cores_per_worker
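
As a quick illustration of the difference (standard Python arithmetic, not project-specific behaviour); for the positive, modest values used here the results agree, so the gain is mainly that // returns an int without a float round-trip:

print(10 // 4, int(10 / 4))      # 2 2 - same here, but // never leaves integer arithmetic
print(-10 // 4, int(-10 / 4))    # -3 -2 - floor division rounds down, int() truncates toward zero
print((10**17 + 1) // 1)         # 100000000000000001
print(int((10**17 + 1) / 1))     # 100000000000000000 - float division loses precision for large ints
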
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b314e72 and a345de5.

📒 Files selected for processing (4)
  • executorlib/interactive/executor.py (3 hunks)
  • executorlib/interactive/shared.py (9 hunks)
  • executorlib/standalone/inputcheck.py (1 hunks)
  • tests/test_shared_input_check.py (1 hunks)
🧰 Additional context used
🪛 Ruff
executorlib/interactive/shared.py

184-184: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

🔇 Additional comments (8)
tests/test_shared_input_check.py (2)

101-104: LGTM: Well-structured test for invalid input combination.

The test case properly validates that the function raises ValueError when all parameters are None, which is a critical edge case to check.


105-108: Verify if TypeError is the expected exception for this input combination.

The test expects TypeError when max_cores=1 and other parameters are None. This seems counterintuitive as having a single core specified could be a valid configuration. Could you verify if this is the intended behavior?

✅ Verification successful

Let me gather more context about the implementation to understand the validation logic better.


Now I can provide the final response based on the implementation details:

TypeError is correctly expected when cores_per_worker is None

The implementation shows that when max_cores is provided but max_workers is None, the function attempts to calculate max_workers = int(max_cores / cores_per_worker). Since cores_per_worker is None in the test case, this division operation would raise a TypeError, making this test case valid and intentional.

Looking at the test cases:

  • First case tests ValueError when both max_cores and max_workers are None
  • Second case tests TypeError when division by None is attempted
  • Third case (visible in output) tests successful execution when both max_cores and cores_per_worker are 1
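
The mechanism is ordinary Python behaviour and easy to reproduce in isolation:

try:
    2 / None  # what int(max_cores / cores_per_worker) reduces to when cores_per_worker is None
except TypeError as err:
    print(err)  # unsupported operand type(s) for /: 'int' and 'NoneType'
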
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check the implementation to understand why TypeError is raised for this case

# Look for the validate_number_of_cores implementation
rg -A 10 "def validate_number_of_cores" 

Length of output: 808


Script:

#!/bin/bash
# Get the full implementation of validate_number_of_cores
rg -A 30 "def validate_number_of_cores"

# Also search for any related test cases
rg -B 2 -A 5 "TypeError.*validate_number_of_cores" tests/

Length of output: 1542


Script:

#!/bin/bash
# Get more context around the test case
rg -B 10 -A 10 "def test_validate_number_of_cores" tests/

# Also check for any division operations in the implementation
rg -B 2 -A 2 "cores_per_worker" executorlib/

Length of output: 4633

executorlib/interactive/executor.py (3)

242-247: LGTM! Consistent implementation with flux backend.

The changes maintain consistency with the flux backend implementation, ensuring uniform validation across different execution environments.

Also applies to: 254-254


272-277: Verify the local cores setting impact.

The implementation correctly sets set_local_cores=True for the local backend, which differs from other backends. This suggests special handling for local execution.

Let's verify the local cores handling:

#!/bin/bash
# Description: Verify local cores handling in validate_number_of_cores
# Look for any special handling of local cores in the validation function
rg -A 5 "set_local_cores.*=.*True"

# Check for any system-specific core counting logic
ast-grep --pattern 'validate_number_of_cores($$$set_local_cores = True$$$)'

Also applies to: 284-284


220-225: LGTM! Verify max_workers validation across all code paths.

The changes correctly implement the new validation logic for the flux backend, maintaining consistency between block and non-block allocation paths.

Let's verify the validation implementation:

Also applies to: 232-232

✅ Verification successful

Validation of max_workers is consistent and correct across all code paths

The verification shows that:

  • All three usages of validate_number_of_cores() in executor.py correctly pass the required parameters
  • The implementation in inputcheck.py properly handles all validation scenarios:
    • When both max_cores and max_workers are None
    • When max_cores is provided but max_workers is None
    • The set_local_cores parameter correctly determines whether to use local CPU count
  • No other functions are calculating max_workers independently
  • Test coverage exists for the validation function
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify validate_number_of_cores usage consistency
# Look for any direct max_workers calculations that might bypass validation
rg -g '!*.pyc' -g '!*test*' "max_workers.*=.*max_cores.*cores_per_worker" 

# Look for all validate_number_of_cores usages to ensure consistent parameters
ast-grep --pattern 'validate_number_of_cores($$$)'

Length of output: 506


Script:

#!/bin/bash
# Let's search for all validate_number_of_cores usages with better patterns
ast-grep --pattern 'validate_number_of_cores($$$)'

# Also search for raw text occurrences to catch any string references
rg -A 3 "validate_number_of_cores"

# Look for the actual function definition
ast-grep --pattern 'def validate_number_of_cores($$$)'

Length of output: 5500


Script:

#!/bin/bash
# Let's look at the actual implementation of validate_number_of_cores
cat executorlib/standalone/inputcheck.py

# And check if there are any other functions that might be calculating max_workers
rg -g '!*.pyc' -g '!*test*' "def.*max_workers"

Length of output: 6680

executorlib/interactive/shared.py (2)

Line range hint 261-275: LGTM! Clear documentation of parameter relationship.

The changes maintain backwards compatibility while providing clear guidance on preferring max_cores over max_workers.


Line range hint 514-530: LGTM! Consistent implementation of resource management.

The changes maintain consistency with other functions and provide clear documentation of the parameter relationship.

Also applies to: 553-553

executorlib/standalone/inputcheck.py (1)

181-187: Verify the use of local CPU count in resource allocation

When both max_cores and max_workers are None and set_local_cores is True, max_workers is set to multiprocessing.cpu_count(). In distributed environments, using the local CPU count may not align with the desired resource allocation strategy. Ensure this behavior is intended and consistent across different execution environments.
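
To illustrate the concern: on a cluster node the Python-visible core count and the scheduler's allocation can disagree. The environment variable below is SLURM-specific and is used here only as an example; other schedulers expose different variables.

import multiprocessing
import os

print("multiprocessing.cpu_count():", multiprocessing.cpu_count())             # all cores on the node
print("SLURM_CPUS_ON_NODE:", os.environ.get("SLURM_CPUS_ON_NODE", "not set"))  # cores granted by the scheduler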

Comment on executorlib/interactive/shared.py, lines 182 to 191
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
        executor_kwargs: dict = {},
        spawner: BaseSpawner = MpiExecSpawner,
    ):
        super().__init__()
        executor_kwargs["future_queue"] = self._future_queue
        executor_kwargs["spawner"] = spawner
        executor_kwargs["max_cores"] = max_cores
        executor_kwargs["max_workers"] = max_workers

⚠️ Potential issue

Fix mutable default argument.

The executor_kwargs parameter uses a mutable default argument which can lead to unexpected behavior if the same dictionary is shared across instances.

Apply this fix:

    def __init__(
        self,
        max_cores: Optional[int] = None,
        max_workers: Optional[int] = None,
-       executor_kwargs: dict = {},
+       executor_kwargs: Optional[dict] = None,
    ):
        super().__init__()
+       if executor_kwargs is None:
+           executor_kwargs = {}
        executor_kwargs["future_queue"] = self._future_queue

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff

184-184: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)
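
For background, the pitfall behind Ruff's B006 warning can be demonstrated with a few lines of plain Python (a standard illustration, not code from this repository):

def append_to(item, bucket=[]):  # the default list is created once, at function definition time
    bucket.append(item)
    return bucket


print(append_to(1))  # [1]
print(append_to(2))  # [1, 2] - state leaks between unrelated calls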

Comment on executorlib/standalone/inputcheck.py, lines +175 to +189
+    cores_per_worker: Optional[int] = None,
+    set_local_cores: bool = False,
 ) -> int:
     """
     Validate the number of cores and return the appropriate value.
     """
-    if max_workers is None and max_cores is None:
-        return multiprocessing.cpu_count()
-    elif max_workers is not None and max_cores is None:
-        return max_workers
-    else:
-        return max_cores
+    if max_cores is None and max_workers is None:
+        if not set_local_cores:
+            raise ValueError(
+                "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
+            )
+        else:
+            max_workers = multiprocessing.cpu_count()
+    elif max_cores is not None and max_workers is None:
+        max_workers = int(max_cores / cores_per_worker)

⚠️ Potential issue

Prevent potential division by zero in validate_number_of_cores

In the calculation max_workers = int(max_cores / cores_per_worker), there is a risk of division by zero or a TypeError if cores_per_worker is None or zero. Ensure that cores_per_worker is validated to be a non-zero integer before performing the division.

Consider adding a check to validate cores_per_worker:

elif max_cores is not None and max_workers is None:
+    if cores_per_worker is None or cores_per_worker == 0:
+        raise ValueError("cores_per_worker must be a non-zero integer.")
     max_workers = int(max_cores / cores_per_worker)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change (proposed replacement for the commented range):

    cores_per_worker: Optional[int] = None,
    set_local_cores: bool = False,
) -> int:
    """
    Validate the number of cores and return the appropriate value.
    """
    if max_cores is None and max_workers is None:
        if not set_local_cores:
            raise ValueError(
                "Block allocation requires a fixed set of computational resources. Neither max_cores nor max_workers are defined."
            )
        else:
            max_workers = multiprocessing.cpu_count()
    elif max_cores is not None and max_workers is None:
        if cores_per_worker is None or cores_per_worker == 0:
            raise ValueError("cores_per_worker must be a non-zero integer.")
        max_workers = int(max_cores / cores_per_worker)
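
A unit-test sketch for the proposed guard; this is hypothetical and only passes if the suggestion above is actually applied, since the unguarded division would raise a ZeroDivisionError for these inputs instead:

import unittest

from executorlib.standalone.inputcheck import validate_number_of_cores


class TestCoresPerWorkerGuard(unittest.TestCase):
    def test_zero_cores_per_worker_raises(self):
        with self.assertRaises(ValueError):
            validate_number_of_cores(max_cores=2, max_workers=None, cores_per_worker=0)


if __name__ == "__main__":
    unittest.main()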

@jan-janssen jan-janssen closed this Nov 9, 2024
@jan-janssen jan-janssen reopened this Nov 9, 2024
jan-janssen (Member, Author) commented Nov 9, 2024

With these changes the tests start to fail sporadically. So there is still something wrong.

======================================================================
FAIL: test_meta_executor_parallel_cache (test_executor_backend_mpi.TestExecutorBackendCache.test_meta_executor_parallel_cache)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/executorlib/executorlib/tests/test_executor_backend_mpi.py", line 109, in test_meta_executor_parallel_cache
    self.assertEqual(fs_2.result(), [(1, 2, 0), (1, 2, 1)])
    ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: None != [(1, 2, 0), (1, 2, 1)]

----------------------------------------------------------------------

@jan-janssen jan-janssen marked this pull request as draft November 9, 2024 16:04
@jan-janssen jan-janssen marked this pull request as ready for review November 10, 2024 08:55
@jan-janssen jan-janssen merged commit 78062fd into main Nov 10, 2024
24 of 26 checks passed
@jan-janssen jan-janssen deleted the validate_number_of_cores branch November 10, 2024 08:55
This was referenced Nov 11, 2024
Development

Successfully merging this pull request may close these issues.

[bug] For MPI parallel calculation it is required to set max_cores and max_workers
1 participant