diff --git a/.ci_support/environment-docs.yml b/.ci_support/environment-docs.yml
index 93639ca38..ceaa6cc99 100644
--- a/.ci_support/environment-docs.yml
+++ b/.ci_support/environment-docs.yml
@@ -17,6 +17,7 @@ dependencies:
 - pint =0.23
 - psutil =5.9.8
 - pyfileindex =0.0.22
+- pympipool =0.7.13
 - pysqa =0.1.15
 - pytables =3.9.2
 - sqlalchemy =2.0.26
diff --git a/.ci_support/environment-old.yml b/.ci_support/environment-old.yml
index 0b6e3d544..45e284011 100644
--- a/.ci_support/environment-old.yml
+++ b/.ci_support/environment-old.yml
@@ -12,6 +12,7 @@ dependencies:
 - pint =0.18
 - psutil =5.8.0
 - pyfileindex =0.0.16
+- pympipool =0.7.11
 - pysqa =0.1.12
 - pytables =3.6.1
 - sqlalchemy =2.0.22
diff --git a/.ci_support/environment.yml b/.ci_support/environment.yml
index e0b839e49..96fc4474a 100644
--- a/.ci_support/environment.yml
+++ b/.ci_support/environment.yml
@@ -16,6 +16,7 @@ dependencies:
 - pint =0.23
 - psutil =5.9.8
 - pyfileindex =0.0.22
+- pympipool =0.7.13
 - pysqa =0.1.15
 - pytables =3.9.2
 - sqlalchemy =2.0.26
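Note on the new dependency: all three environment files (and `pyproject.toml` below) pin `pympipool`, whose executors implement the standard `concurrent.futures.Executor` interface that the rest of this PR builds on. A minimal smoke test, assuming `PyMPIExecutor` behaves as a standard `Executor` (the module path is the one used in `tests/table/test_datamining.py` below):

```python
# Minimal smoke test of the pinned dependency; assumes PyMPIExecutor follows
# the concurrent.futures.Executor interface, which the executor_type setter
# added in generic.py explicitly enforces.
from pympipool.mpi.executor import PyMPIExecutor

with PyMPIExecutor(max_workers=2) as exe:
    print(list(exe.map(abs, [-1, -2, -3])))  # expected: [1, 2, 3]
```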
""" # if there's new keys, apply the *new* functions to the old jobs and name the resulting table `df_new_keys` @@ -272,8 +274,9 @@ def create_table(self, file, job_status_list, enforce_update=False): if funct.__name__ in new_system_functions ] df_new_keys = self._iterate_over_job_lst( - job_lst=map(self._project.inspect, self._get_job_ids()), + job_id_lst=self._get_job_ids(), function_lst=function_lst, + executor=executor, ) if len(df_new_keys) > 0: self._df = pandas.concat([self._df, df_new_keys], axis="columns") @@ -284,7 +287,9 @@ def create_table(self, file, job_status_list, enforce_update=False): ) if len(new_jobs) > 0: df_new_ids = self._iterate_over_job_lst( - job_lst=new_jobs, function_lst=self.add._function_lst + job_id_lst=new_jobs, + function_lst=self.add._function_lst, + executor=executor, ) if len(df_new_ids) > 0: self._df = pandas.concat([self._df, df_new_ids], ignore_index=True) @@ -340,44 +345,47 @@ def _get_filtered_job_ids_from_project(self, recursive=True): filter_funct = self.db_filter_function return project_table[filter_funct(project_table)]["id"].tolist() - @staticmethod - def _apply_function_on_job(funct, job): - try: - return funct(job) - except (ValueError, TypeError): - return {} - - def _apply_list_of_functions_on_job(self, job, function_lst): - diff_dict = {} - for funct in function_lst: - funct_dict = self._apply_function_on_job(funct, job) - for key, value in funct_dict.items(): - diff_dict[key] = value - return diff_dict - - def _iterate_over_job_lst(self, job_lst: List, function_lst: List) -> List[dict]: + def _iterate_over_job_lst( + self, + job_id_lst: List, + function_lst: List, + executor: concurrent.futures.Executor = None, + ) -> List[dict]: """ Apply functions to job. Any functions that raise an error are set to `None` in the final list. Args: - job_lst (list of JobPath): all jobs to analyze - function_lst (list of functions): all functions to apply on jobs. Must return a dictionary. + job_id_lst (list of int): all job ids to analyze + function_lst (list of functions): all functions to apply on jobs. Must return a dictionary. 
diff --git a/pyiron_base/jobs/flex/pythonfunctioncontainer.py b/pyiron_base/jobs/flex/pythonfunctioncontainer.py
index 0876e79b0..43cb5c33b 100644
--- a/pyiron_base/jobs/flex/pythonfunctioncontainer.py
+++ b/pyiron_base/jobs/flex/pythonfunctioncontainer.py
@@ -43,6 +43,7 @@ class PythonFunctionContainerJob(PythonTemplateJob):
     def __init__(self, project, job_name):
         super().__init__(project, job_name)
         self._function = None
+        self._executor_type = None
 
     @property
     def python_function(self):
@@ -82,7 +83,17 @@ def save(self):
         super().save()
 
     def run_static(self):
-        output = self._function(**self.input.to_builtin())
+        if (
+            self._executor_type is not None
+            and "executor" in inspect.signature(self._function).parameters.keys()
+        ):
+            input_dict = self.input.to_builtin()
+            del input_dict["executor"]
+            output = self._function(
+                **input_dict, executor=self._get_executor(max_workers=self.server.cores)
+            )
+        else:
+            output = self._function(**self.input.to_builtin())
         self.output.update({"result": output})
         self.to_hdf()
         self.status.finished = True
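Usage sketch for the executor injection above: `run_static()` only injects an executor when the wrapped function declares an `executor` parameter and `executor_type` is set. The payload functions and project name here are hypothetical; `wrap_python_function` and `executor_type` are the APIs touched by this PR:

```python
# Hedged usage sketch: a wrapped function that declares an `executor` argument
# gets a live executor injected at run time. `add`/`add_lists` and the project
# name "executor_demo" are hypothetical.
from pyiron_base import Project


def add(a, b):
    return a + b


def add_lists(a_lst, b_lst, executor):
    future_lst = [executor.submit(add, a=a, b=b) for a, b in zip(a_lst, b_lst)]
    return [future.result() for future in future_lst]


if __name__ == "__main__":
    pr = Project("executor_demo")
    job = pr.wrap_python_function(add_lists)
    job.input["a_lst"] = [1, 2]
    job.input["b_lst"] = [3, 4]
    job.server.cores = 2
    job.executor_type = "concurrent.futures.ProcessPoolExecutor"
    job.run()  # run_static() builds the executor and passes it as `executor=`
    print(job.output["result"])  # [4, 6]
```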
diff --git a/pyiron_base/jobs/job/generic.py b/pyiron_base/jobs/job/generic.py
index e26cf1de5..8242593f5 100644
--- a/pyiron_base/jobs/job/generic.py
+++ b/pyiron_base/jobs/job/generic.py
@@ -5,24 +5,24 @@
 Generic Job class extends the JobCore class with all the functionality to run the job object.
 """
 
-from concurrent.futures import Future
+from concurrent.futures import Future, Executor
 from datetime import datetime
+from inspect import isclass
 import os
 import posixpath
-import signal
 import warnings
 
 from h5io_browser.base import _read_hdf, _write_hdf
 
 from pyiron_base.state import state
 from pyiron_base.state.signal import catch_signals
-from pyiron_base.jobs.job.extension.executable import Executable
-from pyiron_base.jobs.job.extension.jobstatus import JobStatus
 from pyiron_base.jobs.job.core import (
     JobCore,
     _doc_str_job_core_args,
     _doc_str_job_core_attr,
 )
+from pyiron_base.jobs.job.extension.executable import Executable
+from pyiron_base.jobs.job.extension.jobstatus import JobStatus
 from pyiron_base.jobs.job.runfunction import (
     run_job_with_parameter_repair,
     run_job_with_status_initialized,
@@ -44,7 +44,7 @@
     _job_store_before_copy,
     _job_reload_after_copy,
 )
-from pyiron_base.utils.instance import static_isinstance
+from pyiron_base.utils.instance import static_isinstance, import_class
 from pyiron_base.utils.deprecate import deprecate
 from pyiron_base.jobs.job.extension.server.generic import Server
 from pyiron_base.database.filetable import FileTable
@@ -157,6 +157,7 @@ def __init__(self, project, job_name):
         self._restart_file_dict = dict()
         self._exclude_nodes_hdf = list()
         self._exclude_groups_hdf = list()
+        self._executor_type = None
         self._process = None
         self._compress_by_default = False
         self._python_only_job = False
@@ -369,6 +370,39 @@ def working_directory(self):
             self._create_working_directory()
         return self.project_hdf5.working_directory
 
+    @property
+    def executor_type(self):
+        return self._executor_type
+
+    @executor_type.setter
+    def executor_type(self, exe):
+        if exe is None:
+            self._executor_type = exe
+        elif isinstance(exe, str):
+            try:
+                exe_class = import_class(exe)  # Make sure it's available
+                if not (
+                    isclass(exe_class) and issubclass(exe_class, Executor)
+                ):  # And what we want
+                    raise TypeError(
+                        f"{exe} imported OK, but {exe_class} is not a subclass of {Executor}"
+                    )
+            except Exception as e:
+                raise ImportError(f"Something went wrong trying to import {exe}") from e
+            else:
+                self._executor_type = exe
+        elif isclass(exe) and issubclass(exe, Executor):
+            self._executor_type = f"{exe.__module__}.{exe.__name__}"
+        elif isinstance(exe, Executor):
+            raise NotImplementedError(
+                "We don't want to let you pass an entire executor, because you might think its state comes "
+                "with it. Try passing `.__class__` on this object instead."
+            )
+        else:
+            raise TypeError(
+                f"Expected an executor class or string representing one, but got {exe}"
+            )
+
     def collect_logfiles(self):
         """
         Collect the log files of the external executable and store the information in the HDF5 file. This method has
@@ -1019,6 +1053,8 @@ def to_dict(self):
             data_dict["server"] = self._server.to_dict()
         if self._import_directory is not None:
             data_dict["import_directory"] = self._import_directory
+        if self._executor_type is not None:
+            data_dict["executor_type"] = self._executor_type
         return data_dict
 
     def from_dict(self, job_dict):
@@ -1042,6 +1078,8 @@ def from_dict(self, job_dict):
             self._exclude_nodes_hdf = input_dict["exclude_nodes_hdf"]
         if "exclude_groups_hdf" in input_dict.keys():
             self._exclude_groups_hdf = input_dict["exclude_groups_hdf"]
+        if "executor_type" in input_dict.keys():
+            self._executor_type = input_dict["executor_type"]
 
     def to_hdf(self, hdf=None, group_name=None):
         """
@@ -1493,6 +1531,16 @@ def _reload_update_master(self, project, master_id):
             self._logger.info("busy master: {} {}".format(master_id, self.get_job_id()))
             del self
 
+    def _get_executor(self, max_workers=None):
+        if self._executor_type is None:
+            raise ValueError(
+                "No executor type defined - Please set self.executor_type."
+            )
+        elif isinstance(self._executor_type, str):
+            return import_class(self._executor_type)(max_workers=max_workers)
+        else:
+            raise TypeError("The self.executor_type has to be a string.")
+
 
 class GenericError(object):
     def __init__(self, job):
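For reference, the `executor_type` setter normalizes every accepted input to a dotted import string, which is what `to_dict()` can persist to HDF5 and `_get_executor()` can re-import. A short demonstration against a real job (project and job names are hypothetical):

```python
# What the executor_type setter accepts, demonstrated on a real job; the
# project name "executor_demo" and job name "setter_demo" are hypothetical.
from concurrent.futures import ProcessPoolExecutor
from pyiron_base import Project

pr = Project("executor_demo")
job = pr.create.table("setter_demo")

job.executor_type = "concurrent.futures.ProcessPoolExecutor"  # stored verbatim
job.executor_type = ProcessPoolExecutor  # a class is stored as its dotted path
print(job.executor_type)  # "concurrent.futures.process.ProcessPoolExecutor"

try:
    job.executor_type = ProcessPoolExecutor(max_workers=2)  # a live instance
except NotImplementedError:
    pass  # rejected: a stored string cannot carry an instance's state

with job._get_executor(max_workers=2) as exe:  # re-imports and instantiates
    print(type(exe))  # <class 'concurrent.futures.process.ProcessPoolExecutor'>
```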
diff --git a/pyiron_base/utils/instance.py b/pyiron_base/utils/instance.py
index 4e5cd7928..8c29baea1 100644
--- a/pyiron_base/utils/instance.py
+++ b/pyiron_base/utils/instance.py
@@ -19,6 +19,9 @@
 __date__ = "Sep 1, 2017"
 
 
+import importlib
+
+
 def static_isinstance(obj, obj_type):
     """
     A static implementation of isinstance() - instead of comparing an object and a class, the object is compared to a
@@ -42,3 +45,11 @@ def static_isinstance(obj, obj_type):
         return obj_type in obj_class_lst
     else:
         raise TypeError()
+
+
+def import_class(class_type):
+    module_path, class_name = class_type.rsplit(".", maxsplit=1)
+    return getattr(
+        importlib.import_module(module_path),
+        class_name,
+    )
diff --git a/pyproject.toml b/pyproject.toml
index 340f915d3..79a4c00b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
     "pint==0.23",
     "psutil==5.9.8",
     "pyfileindex==0.0.22",
+    "pympipool==0.7.13",
     "pysqa==0.1.15",
     "sqlalchemy==2.0.26",
     "tables==3.9.2",
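The new `import_class` helper simply resolves a dotted path via `importlib`; note how a bare class name fails, which is the `ImportError` case exercised in the tests below:

```python
# Quick check of the new import_class helper: resolve a dotted path to a class.
from concurrent.futures import ThreadPoolExecutor
from pyiron_base.utils.instance import import_class

cls = import_class("concurrent.futures.ThreadPoolExecutor")
assert cls is ThreadPoolExecutor

# A bare name has no module part, so rsplit(".") yields a single element and
# the tuple unpacking raises ValueError - which the executor_type setter in
# generic.py catches and re-raises as ImportError.
try:
    import_class("ThreadPoolExecutor")
except ValueError:
    pass
```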
diff --git a/tests/flex/test_pythonfunctioncontainer.py b/tests/flex/test_pythonfunctioncontainer.py
index e41a7029c..fa261853c 100644
--- a/tests/flex/test_pythonfunctioncontainer.py
+++ b/tests/flex/test_pythonfunctioncontainer.py
@@ -14,6 +14,11 @@ def my_sleep_funct(a, b=8):
     return a+b
 
 
+def my_function_exe(a_lst, b_lst, executor):
+    future_lst = [executor.submit(my_function, a=a, b=b) for a, b in zip(a_lst, b_lst)]
+    return [future.result() for future in future_lst]
+
+
 class TestPythonFunctionContainer(TestWithProject):
     def test_as_job(self):
         job = self.project.wrap_python_function(my_function)
@@ -61,3 +66,21 @@ def test_with_executor_wait(self):
         self.assertFalse(job.server.future.done())
         self.project.wait_for_job(job=job, interval_in_s=0.01, max_iterations=1000)
         self.assertTrue(job.server.future.done())
+
+    def test_with_internal_executor(self):
+        job = self.project.wrap_python_function(my_function_exe)
+        job.input["a_lst"] = [1, 2, 3, 4]
+        job.input["b_lst"] = [5, 6, 7, 8]
+        job.server.cores = 2
+        with self.assertRaises(ImportError):
+            job.executor_type = "Executor"
+        job.executor_type = ProcessPoolExecutor
+        self.assertTrue(isinstance(job._get_executor(max_workers=2), ProcessPoolExecutor))
+        job.executor_type = None
+        with self.assertRaises(ValueError):
+            job._get_executor(max_workers=2)
+        job.executor_type = "concurrent.futures.ProcessPoolExecutor"
+        self.assertTrue(isinstance(job._get_executor(max_workers=2), ProcessPoolExecutor))
+        job.run()
+        self.assertEqual(job.output["result"], [6, 8, 10, 12])
+        self.assertTrue(job.status.finished)
diff --git a/tests/table/test_datamining.py b/tests/table/test_datamining.py
index 76e33bf0f..19e0b4afa 100644
--- a/tests/table/test_datamining.py
+++ b/tests/table/test_datamining.py
@@ -9,6 +9,13 @@
 from pyiron_base._tests import TestWithProject, ToyJob
 
 
+try:
+    import pympipool
+    skip_parallel_test = False
+except ImportError:
+    skip_parallel_test = True
+
+
 class TestProjectData(TestWithProject):
 
     @classmethod
@@ -60,5 +67,31 @@ def test_numpy_reload(self):
                          "Numpy values not read correctly.")
 
 
+@unittest.skipIf(skip_parallel_test, "pympipool is not installed, so the pympipool-based tests are skipped.")
+class TestProjectDataParallel(TestWithProject):
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+        for i, c in enumerate("abcd"):
+            j = cls.project.create_job(ToyJob, f"test_{c}")
+            j.input['input_energy'] = i
+            j.run()
+
+    def test_parallel(self):
+        """Tables should update in parallel while filter functions still restrict the included jobs."""
+        table: pyiron_base.TableJob = self.project.create.table('test_table')
+        table.filter_function = lambda j: j.name in ["test_a", "test_b"]
+        table.add['name'] = lambda j: j.name
+        table.add['array'] = lambda j: np.arange(8)
+        table.server.cores = 2
+        table.executor_type = "pympipool.mpi.executor.PyMPIExecutor"
+        table.run()
+        df = table.get_dataframe()
+        self.assertEqual(2, len(df), "Table not correctly filtered.")
+        self.assertEqual(["test_a", "test_b"], df.name.to_list(), "Table not correctly filtered.")
+        self.assertTrue(table.status.finished)
+        self.project.remove_job(table.name)
+
+
 if __name__ == '__main__':
     unittest.main()
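End-to-end, the feature added by this PR looks like the sketch below, mirroring `TestProjectDataParallel`; the project name is hypothetical and the table assumes finished jobs already exist in the project:

```python
# Hedged end-to-end sketch mirroring TestProjectDataParallel; assumes the
# project "parallel_demo" (hypothetical name) already contains finished jobs.
from pyiron_base import Project

pr = Project("parallel_demo")
table = pr.create.table("parallel_table")
table.add["name"] = lambda job: job.name
table.server.cores = 2
# Fallback: with executor_type unset and cores > 1, update_table() picks
# concurrent.futures.ProcessPoolExecutor; with pympipool installed, the
# MPI-aware executor can be requested explicitly instead:
table.executor_type = "pympipool.mpi.executor.PyMPIExecutor"
table.run()
print(table.get_dataframe())
```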