Add linting (#555)
* Add linting

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* some fixes

* more fixes

* more and more fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more fixes

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* more fixes

* more strict

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jan-janssen and pre-commit-ci[bot] authored Feb 3, 2025
1 parent 5ec2a20 commit 11d44cc
Showing 21 changed files with 176 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -4,7 +4,7 @@ repos:
   hooks:
   - id: ruff
     name: ruff lint
-    args: ["--select", "I", "--fix"]
+    args: ["--fix"]
     files: ^executorlib/
   - id: ruff-format
    name: ruff format
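Dropping "--select I" means the ruff hook is no longer restricted to import-sorting (I) rules and instead applies whatever rule set the repository configures. Judging from the rewrites in the diffs below, rules along the lines of flake8-simplify (SIM), pyupgrade (UP), and flake8-bugbear (B) appear to be in effect; that is an inference from this commit, not a statement about the actual ruff configuration. A minimal sketch of the kind of code such a rule set rewrites automatically; the function and variable names are purely illustrative:

from typing import Optional


def count_flagged(entries: Optional[dict] = None) -> int:
    """Count keys marked as flagged; names are illustrative."""
    if entries is None:  # None default instead of a shared mutable default (B006)
        entries = {}
    # membership test on the dict itself instead of on .keys() (SIM118)
    return sum(1 for key in entries if "flagged" in key)


print(count_flagged({"a-flagged": 1, "b": 2}))  # prints 1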
6 changes: 1 addition & 5 deletions executorlib/backend/cache_parallel.py
@@ -1,7 +1,6 @@
 import pickle
 import sys
 import time
-from typing import Any

 import cloudpickle

@@ -40,10 +39,7 @@ def main() -> None:
     apply_dict = backend_load_file(file_name=file_name)
     apply_dict = MPI.COMM_WORLD.bcast(apply_dict, root=0)
     output = apply_dict["fn"].__call__(*apply_dict["args"], **apply_dict["kwargs"])
-    if mpi_size_larger_one:
-        result = MPI.COMM_WORLD.gather(output, root=0)
-    else:
-        result = output
+    result = MPI.COMM_WORLD.gather(output, root=0) if mpi_size_larger_one else output
     if mpi_rank_zero:
         backend_write_file(
             file_name=file_name,
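The four-line if/else above only chooses between two values, so it collapses into a single conditional expression (the rewrite suggested by ruff's flake8-simplify rule SIM108) without changing behaviour. A self-contained sketch of the same transformation outside of MPI, with made-up names:

def pick_result(output: int, use_gather: bool) -> int:
    # before: an if/else whose only job is to assign one of two values
    if use_gather:
        result = output * 2  # stand-in for MPI.COMM_WORLD.gather(...)
    else:
        result = output
    # after: the equivalent conditional expression
    result = output * 2 if use_gather else output
    return result


assert pick_result(3, True) == 6
assert pick_result(3, False) == 3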
16 changes: 8 additions & 8 deletions executorlib/backend/interactive_parallel.py
@@ -57,17 +57,17 @@ def main() -> None:
         input_dict = MPI.COMM_WORLD.bcast(input_dict, root=0)

         # Parse input
-        if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
+        if "shutdown" in input_dict and input_dict["shutdown"]:
             if mpi_rank_zero:
                 interface_send(socket=socket, result_dict={"result": True})
                 interface_shutdown(socket=socket, context=context)
             MPI.COMM_WORLD.Barrier()
             break
         elif (
-            "fn" in input_dict.keys()
-            and "init" not in input_dict.keys()
-            and "args" in input_dict.keys()
-            and "kwargs" in input_dict.keys()
+            "fn" in input_dict
+            and "init" not in input_dict
+            and "args" in input_dict
+            and "kwargs" in input_dict
         ):
             # Execute function
             try:
@@ -87,10 +87,10 @@ def main() -> None:
             if mpi_rank_zero:
                 interface_send(socket=socket, result_dict={"result": output_reply})
         elif (
-            "init" in input_dict.keys()
+            "init" in input_dict
             and input_dict["init"]
-            and "args" in input_dict.keys()
-            and "kwargs" in input_dict.keys()
+            and "args" in input_dict
+            and "kwargs" in input_dict
         ):
             memory = call_funct(input_dict=input_dict, funct=None)

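Replacing "key" in some_dict.keys() with "key" in some_dict is the rewrite ruff's SIM118 rule performs: membership on a dict already tests its keys, so the .keys() view adds nothing. A tiny sketch with illustrative data:

input_dict = {"fn": print, "args": (), "kwargs": {}}

# membership on the dict tests its keys directly
assert "fn" in input_dict
assert "init" not in input_dict

# equivalent but noisier form that the linter rewrites
assert "fn" in input_dict.keys()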
20 changes: 10 additions & 10 deletions executorlib/backend/interactive_serial.py
@@ -1,6 +1,6 @@
 import sys
 from os.path import abspath
-from typing import List, Optional
+from typing import Optional

 from executorlib.standalone.interactive.backend import call_funct, parse_arguments
 from executorlib.standalone.interactive.communication import (
@@ -11,7 +11,7 @@
 )


-def main(argument_lst: Optional[List[str]] = None):
+def main(argument_lst: Optional[list[str]] = None):
     """
     The main function of the program.
@@ -40,15 +40,15 @@ def main(argument_lst: Optional[List[str]] = None):
         input_dict = interface_receive(socket=socket)

         # Parse input
-        if "shutdown" in input_dict.keys() and input_dict["shutdown"]:
+        if "shutdown" in input_dict and input_dict["shutdown"]:
             interface_send(socket=socket, result_dict={"result": True})
             interface_shutdown(socket=socket, context=context)
             break
         elif (
-            "fn" in input_dict.keys()
-            and "init" not in input_dict.keys()
-            and "args" in input_dict.keys()
-            and "kwargs" in input_dict.keys()
+            "fn" in input_dict
+            and "init" not in input_dict
+            and "args" in input_dict
+            and "kwargs" in input_dict
         ):
             # Execute function
             try:
@@ -62,10 +62,10 @@ def main(argument_lst: Optional[List[str]] = None):
             # Send output
             interface_send(socket=socket, result_dict={"result": output})
         elif (
-            "init" in input_dict.keys()
+            "init" in input_dict
             and input_dict["init"]
-            and "args" in input_dict.keys()
-            and "kwargs" in input_dict.keys()
+            and "args" in input_dict
+            and "kwargs" in input_dict
         ):
             memory = call_funct(input_dict=input_dict, funct=None)

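The typing.List import goes away because, from Python 3.9 on, the builtin container types are usable directly in annotations (PEP 585); ruff's pyupgrade rules (UP006/UP035) make exactly this rewrite. A short sketch; the function body is illustrative, only the annotation style mirrors the diff:

from typing import Optional


def main(argument_lst: Optional[list[str]] = None) -> list[str]:
    """Builtin list replaces typing.List in the annotations (PEP 585)."""
    if argument_lst is None:
        argument_lst = []
    return [argument.upper() for argument in argument_lst]


assert main(["a", "b"]) == ["A", "B"]
assert main() == []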
25 changes: 16 additions & 9 deletions executorlib/base/executor.py
@@ -1,11 +1,12 @@
+import contextlib
 import queue
 from concurrent.futures import (
     Executor as FutureExecutor,
 )
 from concurrent.futures import (
     Future,
 )
-from typing import Callable, List, Optional, Union
+from typing import Callable, Optional, Union

 from executorlib.standalone.inputcheck import check_resource_dict
 from executorlib.standalone.queue import cancel_items_in_queue
@@ -28,7 +29,7 @@ def __init__(self, max_cores: Optional[int] = None):
         cloudpickle_register(ind=3)
         self._max_cores = max_cores
         self._future_queue: Optional[queue.Queue] = queue.Queue()
-        self._process: Optional[Union[RaisingThread, List[RaisingThread]]] = None
+        self._process: Optional[Union[RaisingThread, list[RaisingThread]]] = None

     @property
     def info(self) -> Optional[dict]:
@@ -40,13 +41,13 @@ def info(self) -> Optional[dict]:
         """
         if self._process is not None and isinstance(self._process, list):
             meta_data_dict = self._process[0].get_kwargs().copy()
-            if "future_queue" in meta_data_dict.keys():
+            if "future_queue" in meta_data_dict:
                 del meta_data_dict["future_queue"]
             meta_data_dict["max_workers"] = len(self._process)
             return meta_data_dict
         elif self._process is not None:
             meta_data_dict = self._process.get_kwargs().copy()
-            if "future_queue" in meta_data_dict.keys():
+            if "future_queue" in meta_data_dict:
                 del meta_data_dict["future_queue"]
             return meta_data_dict
         else:
@@ -62,7 +63,13 @@ def future_queue(self) -> Optional[queue.Queue]:
         """
         return self._future_queue

-    def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Future: # type: ignore
+    def submit( # type: ignore
+        self,
+        fn: Callable,
+        *args,
+        resource_dict: Optional[dict] = None,
+        **kwargs,
+    ) -> Future:
         """
         Submits a callable to be executed with the given arguments.
@@ -87,7 +94,9 @@ def submit(self, fn: Callable, *args, resource_dict: dict = {}, **kwargs) -> Fut
         Returns:
             Future: A Future representing the given call.
         """
-        cores = resource_dict.get("cores", None)
+        if resource_dict is None:
+            resource_dict = {}
+        cores = resource_dict.get("cores")
         if (
             cores is not None
             and self._max_cores is not None
@@ -161,7 +170,5 @@ def __del__(self):
         """
         Clean-up the resources associated with the Executor.
         """
-        try:
+        with contextlib.suppress(AttributeError, RuntimeError):
             self.shutdown(wait=False)
-        except (AttributeError, RuntimeError):
-            pass
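Two separate lint findings drive the submit() change: a dict literal as a default argument is created once at definition time and shared across calls (flake8-bugbear B006), so it becomes Optional[dict] = None with a guard inside the function, and resource_dict.get("cores", None) loses the redundant None default. A minimal demonstration of the shared-default pitfall and the fix, with illustrative names:

from typing import Optional


def submit_bad(task: str, resource_dict: dict = {}) -> dict:
    # the same dict object is reused for every call that omits the argument
    resource_dict.setdefault("tasks", []).append(task)
    return resource_dict


def submit_good(task: str, resource_dict: Optional[dict] = None) -> dict:
    if resource_dict is None:  # a fresh dict per call
        resource_dict = {}
    resource_dict.setdefault("tasks", []).append(task)
    return resource_dict


assert submit_bad("a")["tasks"] == ["a"]
assert submit_bad("b")["tasks"] == ["a", "b"]  # state leaked between calls
assert submit_good("a")["tasks"] == ["a"]
assert submit_good("b")["tasks"] == ["b"]  # no leakage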
8 changes: 5 additions & 3 deletions executorlib/cache/queue_spawner.py
@@ -1,6 +1,6 @@
 import os
 import subprocess
-from typing import List, Optional, Tuple, Union
+from typing import Optional, Union

 from pysqa import QueueAdapter

@@ -10,7 +10,7 @@

 def execute_with_pysqa(
     command: list,
-    task_dependent_lst: list[int] = [],
+    task_dependent_lst: Optional[list[int]] = None,
     file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
@@ -35,6 +35,8 @@ def execute_with_pysqa(
     Returns:
         int: queuing system ID
     """
+    if task_dependent_lst is None:
+        task_dependent_lst = []
     check_file_exists(file_name=file_name)
     queue_id = get_queue_id(file_name=file_name)
     qa = QueueAdapter(
@@ -79,7 +81,7 @@ def _pysqa_execute_command(
     split_output: bool = True,
     shell: bool = False,
     error_filename: str = "pysqa.err",
-) -> Union[str, List[str]]:
+) -> Union[str, list[str]]:
     """
     A wrapper around the subprocess.check_output function. Modified from pysqa to raise an exception if the subprocess
     fails to submit the job to the queue.
17 changes: 6 additions & 11 deletions executorlib/cache/shared.py
@@ -1,9 +1,10 @@
+import contextlib
 import importlib.util
 import os
 import queue
 import sys
 from concurrent.futures import Future
-from typing import Any, Callable, Optional, Tuple
+from typing import Any, Callable, Optional

 from executorlib.standalone.command import get_command_path
 from executorlib.standalone.hdf import dump, get_output
@@ -79,15 +80,9 @@ def execute_tasks_h5(
     file_name_dict: dict = {}
     while True:
         task_dict = None
-        try:
+        with contextlib.suppress(queue.Empty):
             task_dict = future_queue.get_nowait()
-        except queue.Empty:
-            pass
-        if (
-            task_dict is not None
-            and "shutdown" in task_dict.keys()
-            and task_dict["shutdown"]
-        ):
+        if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
             if terminate_function is not None:
                 for task in process_dict.values():
                     terminate_function(task=task)
@@ -110,7 +105,7 @@ def execute_tasks_h5(
                 fn_kwargs=task_kwargs,
                 resource_dict=task_resource_dict,
             )
-            if task_key not in memory_dict.keys():
+            if task_key not in memory_dict:
                 if task_key + ".h5out" not in os.listdir(cache_directory):
                     file_name = os.path.join(cache_directory, task_key + ".h5in")
                     dump(file_name=file_name, data_dict=data_dict)
@@ -204,7 +199,7 @@ def _check_task_output(

 def _convert_args_and_kwargs(
     task_dict: dict, memory_dict: dict, file_name_dict: dict
-) -> Tuple[list, dict, list]:
+) -> tuple[list, dict, list]:
     """
     Convert the arguments and keyword arguments in a task dictionary to the appropriate types.
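A try/except/pass around a single statement is what ruff's SIM105 flags; contextlib.suppress expresses the same intent in one line and one less indentation level, with identical behaviour for the listed exception types. A small runnable sketch using a plain queue.Queue in place of the executor's future queue:

import contextlib
import queue

future_queue: queue.Queue = queue.Queue()

task_dict = None
# before:
#     try:
#         task_dict = future_queue.get_nowait()
#     except queue.Empty:
#         pass
# after: equivalent, and flat enough to merge the follow-up checks into one line
with contextlib.suppress(queue.Empty):
    task_dict = future_queue.get_nowait()

assert task_dict is None  # the queue was empty, so the exception was suppressed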
9 changes: 4 additions & 5 deletions executorlib/cache/subprocess_spawner.py
@@ -7,7 +7,7 @@

 def execute_in_subprocess(
     command: list,
-    task_dependent_lst: list = [],
+    task_dependent_lst: Optional[list] = None,
     file_name: Optional[str] = None,
     resource_dict: Optional[dict] = None,
     config_directory: Optional[str] = None,
@@ -33,6 +33,8 @@ def execute_in_subprocess(
         subprocess.Popen: The subprocess object.
     """
+    if task_dependent_lst is None:
+        task_dependent_lst = []
     check_file_exists(file_name=file_name)
     while len(task_dependent_lst) > 0:
         task_dependent_lst = [
@@ -46,10 +48,7 @@ def execute_in_subprocess(
         raise ValueError("backend parameter is not supported for subprocess spawner.")
     if resource_dict is None:
         resource_dict = {}
-    if "cwd" in resource_dict:
-        cwd = resource_dict["cwd"]
-    else:
-        cwd = cache_directory
+    cwd = resource_dict.get("cwd", cache_directory)
     return subprocess.Popen(command, universal_newlines=True, cwd=cwd)

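The cwd lookup is another if/else that only picks a value, replaced here with dict.get(key, default), which returns the stored value when the key is present and the fallback otherwise (the pattern ruff's SIM401 rule suggests). A sketch with illustrative paths:

cache_directory = "/tmp/cache"  # illustrative fallback value

resource_dict = {"cores": 2}
assert resource_dict.get("cwd", cache_directory) == "/tmp/cache"  # key missing, fallback wins

resource_dict = {"cwd": "/scratch/job"}
assert resource_dict.get("cwd", cache_directory) == "/scratch/job"  # key present, its value wins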
26 changes: 17 additions & 9 deletions executorlib/interactive/create.py
@@ -35,7 +35,7 @@ def create_executor(
     backend: str = "local",
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
-    resource_dict: dict = {},
+    resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
@@ -83,6 +83,8 @@
                           of the individual function.
         init_function (None): optional function to preset arguments for functions which are submitted later
     """
+    if resource_dict is None:
+        resource_dict = {}
     if flux_executor is not None and backend != "flux_allocation":
         backend = "flux_allocation"
     if backend == "flux_allocation":
@@ -149,7 +151,7 @@ def create_flux_allocation_executor(
     max_workers: Optional[int] = None,
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
-    resource_dict: dict = {},
+    resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
@@ -160,16 +162,18 @@
 ) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
     check_init_function(block_allocation=block_allocation, init_function=init_function)
     check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
+    if resource_dict is None:
+        resource_dict = {}
     cores_per_worker = resource_dict.get("cores", 1)
     resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
     check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))
     check_command_line_argument_lst(
         command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
     )
-    if "openmpi_oversubscribe" in resource_dict.keys():
+    if "openmpi_oversubscribe" in resource_dict:
         del resource_dict["openmpi_oversubscribe"]
-    if "slurm_cmd_args" in resource_dict.keys():
+    if "slurm_cmd_args" in resource_dict:
         del resource_dict["slurm_cmd_args"]
     resource_dict["flux_executor"] = flux_executor
     resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
@@ -206,12 +210,14 @@ def create_slurm_allocation_executor(
     max_workers: Optional[int] = None,
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
-    resource_dict: dict = {},
+    resource_dict: Optional[dict] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
     init_function: Optional[Callable] = None,
 ) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
     check_init_function(block_allocation=block_allocation, init_function=init_function)
+    if resource_dict is None:
+        resource_dict = {}
     cores_per_worker = resource_dict.get("cores", 1)
     resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
@@ -246,12 +252,14 @@ def create_local_executor(
     max_workers: Optional[int] = None,
     max_cores: Optional[int] = None,
     cache_directory: Optional[str] = None,
-    resource_dict: dict = {},
+    resource_dict: Optional[dict] = None,
     hostname_localhost: Optional[bool] = None,
     block_allocation: bool = False,
     init_function: Optional[Callable] = None,
 ) -> Union[InteractiveStepExecutor, InteractiveExecutor]:
     check_init_function(block_allocation=block_allocation, init_function=init_function)
+    if resource_dict is None:
+        resource_dict = {}
     cores_per_worker = resource_dict.get("cores", 1)
     resource_dict["cache_directory"] = cache_directory
     resource_dict["hostname_localhost"] = hostname_localhost
@@ -260,11 +268,11 @@
     check_command_line_argument_lst(
         command_line_argument_lst=resource_dict.get("slurm_cmd_args", [])
     )
-    if "threads_per_core" in resource_dict.keys():
+    if "threads_per_core" in resource_dict:
         del resource_dict["threads_per_core"]
-    if "gpus_per_core" in resource_dict.keys():
+    if "gpus_per_core" in resource_dict:
         del resource_dict["gpus_per_core"]
-    if "slurm_cmd_args" in resource_dict.keys():
+    if "slurm_cmd_args" in resource_dict:
         del resource_dict["slurm_cmd_args"]
     if block_allocation:
         resource_dict["init_function"] = init_function
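Each create_*_executor factory now guards its resource_dict default the same way and strips keys the downstream executor does not accept with the two-line "if key in resource_dict: del resource_dict[key]" pattern. As an aside not taken in this commit, dict.pop(key, None) expresses the same delete-if-present in one line; a sketch with illustrative keys:

resource_dict = {"cores": 2, "threads_per_core": 1, "slurm_cmd_args": []}

# pattern used in the diff above
if "threads_per_core" in resource_dict:
    del resource_dict["threads_per_core"]

# equivalent one-liner, shown only for comparison
resource_dict.pop("slurm_cmd_args", None)

assert resource_dict == {"cores": 2}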