Use the same executor for pyiron table and function container #1334

Merged (54 commits) on Feb 18, 2024
2660dd2
Parallel pyiron table
jan-janssen Feb 27, 2023
ff4007d
Format black
pyiron-runner Feb 27, 2023
e5aa777
Update datamining.py
jan-janssen Jul 17, 2023
c00b37d
Update environment.yml
jan-janssen Jul 17, 2023
ea71463
Update setup.py
jan-janssen Jul 17, 2023
c571e04
Merge pull request #1169 from pyiron/main
jan-janssen Jul 17, 2023
d3a1648
Update datamining.py
jan-janssen Jul 24, 2023
6240624
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Aug 18, 2023
42ef18a
update pympipool
jan-janssen Aug 18, 2023
62360a2
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Dec 17, 2023
5f2ea3b
Update to latest version of pympipool
jan-janssen Dec 17, 2023
b9ff835
Update environment-old.yml
jan-janssen Dec 17, 2023
2ac61cb
Merge branch 'main' into parallel_datamining
jan-janssen Dec 18, 2023
396fdfe
Use external executor
jan-janssen Jan 10, 2024
d3c9393
Merge branch 'main' into parallel_datamining
jan-janssen Jan 10, 2024
e75983d
Update environment-old.yml
jan-janssen Jan 10, 2024
b5d1e1c
Update environment.yml
jan-janssen Jan 10, 2024
6a2d537
Enable parallelism inside the PythonFunctionContainerJob
jan-janssen Jan 24, 2024
bd7a6b8
Bug fix
jan-janssen Jan 24, 2024
b5b0f51
fix input
jan-janssen Jan 24, 2024
3ea6e38
Fix executable specified as string
jan-janssen Jan 24, 2024
dd98e79
fix future timeout
jan-janssen Jan 24, 2024
4bf799c
black formatting
jan-janssen Jan 24, 2024
370d684
Set _get_executor() to be a private method
jan-janssen Jan 25, 2024
9196b88
Format black
pyiron-runner Jan 25, 2024
fdad5e5
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Jan 31, 2024
912c3fa
Choose ProcessPoolExecutor if no Executor is set and cores are more t…
jan-janssen Jan 31, 2024
42f2fd2
Format black
pyiron-runner Jan 31, 2024
59fde64
Merge remote-tracking branch 'origin/main' into container_with_executor
jan-janssen Jan 31, 2024
587256d
Merge remote-tracking branch 'origin/container_with_executor' into pa…
jan-janssen Feb 14, 2024
f945f78
Merge remote-tracking branch 'origin/parallel_datamining' into parall…
jan-janssen Feb 14, 2024
de73ff8
Use the same executor for pyiron table and function container
jan-janssen Feb 14, 2024
7da27ee
Format black
pyiron-runner Feb 14, 2024
dbbdf41
Fix initialization
jan-janssen Feb 14, 2024
cbf92ce
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 14, 2024
0a80cef
Merge remote-tracking branch 'origin/main' into parallel_executor
jan-janssen Feb 15, 2024
4902cc0
Add example parallel pyiron tables
jan-janssen Feb 15, 2024
ea50156
Add parallel test for function container
jan-janssen Feb 15, 2024
65d2fb3
Move executor to GenericJob
jan-janssen Feb 16, 2024
673192b
Merge remote-tracking branch 'origin/main' into parallel_executor
jan-janssen Feb 17, 2024
8abcc60
define executor types
jan-janssen Feb 17, 2024
cf503db
Format black
pyiron-runner Feb 17, 2024
59a5872
define central executor class
jan-janssen Feb 17, 2024
c1419bc
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 17, 2024
b0ad6e2
fixes
jan-janssen Feb 17, 2024
b26c094
fix test
jan-janssen Feb 17, 2024
1a964b6
Format black
pyiron-runner Feb 17, 2024
8b7cf13
fix docstrings
jan-janssen Feb 17, 2024
2118e21
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 17, 2024
fd321d7
Add test for wrong assignment of executors
jan-janssen Feb 17, 2024
2a5e62c
switch to executor classes
jan-janssen Feb 18, 2024
5dda980
Format black
pyiron-runner Feb 18, 2024
c48f77a
validate executor types
jan-janssen Feb 18, 2024
d8dcc09
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 18, 2024
1 change: 1 addition & 0 deletions .ci_support/environment-docs.yml
@@ -17,6 +17,7 @@ dependencies:
- pint =0.23
- psutil =5.9.8
- pyfileindex =0.0.22
- pympipool =0.7.13
- pysqa =0.1.15
- pytables =3.9.2
- sqlalchemy =2.0.26
1 change: 1 addition & 0 deletions .ci_support/environment-old.yml
@@ -12,6 +12,7 @@ dependencies:
- pint =0.18
- psutil =5.8.0
- pyfileindex =0.0.16
- pympipool =0.7.11
- pysqa =0.1.12
- pytables =3.6.1
- sqlalchemy =2.0.22
1 change: 1 addition & 0 deletions .ci_support/environment.yml
@@ -16,6 +16,7 @@ dependencies:
- pint =0.23
- psutil =5.9.8
- pyfileindex =0.0.22
- pympipool =0.7.13
- pysqa =0.1.15
- pytables =3.9.2
- sqlalchemy =2.0.26
114 changes: 78 additions & 36 deletions pyiron_base/jobs/datamining.py
@@ -3,6 +3,7 @@
# Distributed under the terms of "New BSD License", see the LICENSE file.

import codecs
import concurrent.futures
from datetime import datetime
import cloudpickle
import json
@@ -240,7 +241,7 @@ def _get_new_functions(self, file: FileHDFio) -> Tuple[List, List]:
new_system_functions = []
return new_user_functions, new_system_functions

def create_table(self, file, job_status_list, enforce_update=False):
def create_table(self, file, job_status_list, executor=None, enforce_update=False):
"""
Create or update the table.

@@ -254,6 +255,7 @@ def create_table(self, file, job_status_list, enforce_update=False):
Args:
file (FileHDFio): HDF where the previous state of the table is stored
job_status_list (list of str): only consider jobs with these statuses
executor (concurrent.futures.Executor): executor for parallel execution
enforce_update (bool): if True always regenerate the table completely.
"""
# if there's new keys, apply the *new* functions to the old jobs and name the resulting table `df_new_keys`
@@ -272,8 +274,9 @@
if funct.__name__ in new_system_functions
]
df_new_keys = self._iterate_over_job_lst(
job_lst=map(self._project.inspect, self._get_job_ids()),
job_id_lst=self._get_job_ids(),
function_lst=function_lst,
executor=executor,
)
if len(df_new_keys) > 0:
self._df = pandas.concat([self._df, df_new_keys], axis="columns")
@@ -284,7 +287,9 @@
)
if len(new_jobs) > 0:
df_new_ids = self._iterate_over_job_lst(
job_lst=new_jobs, function_lst=self.add._function_lst
job_id_lst=new_jobs,
function_lst=self.add._function_lst,
executor=executor,
)
if len(df_new_ids) > 0:
self._df = pandas.concat([self._df, df_new_ids], ignore_index=True)
@@ -340,44 +345,47 @@ def _get_filtered_job_ids_from_project(self, recursive=True):
filter_funct = self.db_filter_function
return project_table[filter_funct(project_table)]["id"].tolist()

@staticmethod
def _apply_function_on_job(funct, job):
try:
return funct(job)
except (ValueError, TypeError):
return {}

def _apply_list_of_functions_on_job(self, job, function_lst):
diff_dict = {}
for funct in function_lst:
funct_dict = self._apply_function_on_job(funct, job)
for key, value in funct_dict.items():
diff_dict[key] = value
return diff_dict

def _iterate_over_job_lst(self, job_lst: List, function_lst: List) -> List[dict]:
def _iterate_over_job_lst(
self,
job_id_lst: List,
function_lst: List,
executor: concurrent.futures.Executor = None,
) -> List[dict]:
"""
Apply functions to job.

Any functions that raise an error are set to `None` in the final list.

Args:
job_lst (list of JobPath): all jobs to analyze
function_lst (list of functions): all functions to apply on jobs. Must return a dictionary.
job_id_lst (list of int): all job ids to analyze
function_lst (list of functions): all functions to apply on jobs. Must return a dictionary.
executor (concurrent.futures.Executor): executor for parallel execution

Returns:
list of dict: a list of the merged dicts from all functions for each job
"""
diff_dict_lst = []
for job_inspect in tqdm(job_lst, desc="Processing jobs"):
if self.convert_to_object:
job = job_inspect.to_object()
else:
job = job_inspect
diff_dict = self._apply_list_of_functions_on_job(
job=job, function_lst=function_lst
job_to_analyse_lst = [
[
self._project.db.get_item_by_id(job_id),
function_lst,
self.convert_to_object,
]
for job_id in job_id_lst
]
if executor is not None:
diff_dict_lst = list(
tqdm(
executor.map(_apply_list_of_functions_on_job, job_to_analyse_lst),
total=len(job_to_analyse_lst),
)
)
else:
diff_dict_lst = list(
tqdm(
map(_apply_list_of_functions_on_job, job_to_analyse_lst),
total=len(job_to_analyse_lst),
)
)
diff_dict_lst.append(diff_dict)
self.refill_dict(diff_dict_lst)
return pandas.DataFrame(diff_dict_lst)

@@ -439,7 +447,7 @@ def _collect_job_update_lst(self, job_status_list, job_stored_ids=None):
and job.status in job_status_list
and self.filter_function(job)
):
job_update_lst.append(job)
job_update_lst.append(job_id)
return job_update_lst

def _repr_html_(self):
@@ -775,11 +783,22 @@ def update_table(self, job_status_list=None):
if self.job_id is not None:
self.project.db.item_update({"timestart": datetime.now()}, self.job_id)
with self.project_hdf5.open("input") as hdf5_input:
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
)
if self._executor_type is None and self.server.cores > 1:
self._executor_type = "ProcessPoolExecutor"
if self._executor_type is not None:
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
executor=self._get_executor(max_workers=self.server.cores),
)
else:
self._pyiron_table.create_table(
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
executor=None,
)
self.to_hdf()
self._pyiron_table._df.to_csv(
os.path.join(self.working_directory, "pyirontable.csv"), index=False
@@ -821,3 +840,26 @@ def always_true(_):

"""
return True


def _apply_function_on_job(funct, job):
try:
return funct(job)
except (ValueError, TypeError):
return {}


def _apply_list_of_functions_on_job(input_parameters):
from pyiron_base.jobs.job.path import JobPath

db_entry, function_lst, convert_to_object = input_parameters
job = JobPath.from_db_entry(db_entry)
if convert_to_object:
job = job.to_object()
job.set_input_to_read_only()
diff_dict = {}
for funct in function_lst:
funct_dict = _apply_function_on_job(funct, job)
for key, value in funct_dict.items():
diff_dict[key] = value
return diff_dict
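The key refactor in this file is moving the per-job helpers to module level so that a `ProcessPoolExecutor` can pickle them, with the serial and parallel branches sharing the same `map` call shape. A minimal standalone sketch of that pattern under illustrative names (`square`, `iterate`, and the task layout are this sketch's assumptions, not pyiron's API):

```python
# Minimal sketch of the map-with-optional-executor pattern used in
# _iterate_over_job_lst. The worker is a module-level function so that
# ProcessPoolExecutor can pickle it; names here are illustrative only.
from concurrent.futures import ProcessPoolExecutor


def square(x):
    return {"square": x * x}


def _apply_functions(task):
    # task is a [payload, function_lst] pair, mirroring job_to_analyse_lst
    payload, function_lst = task
    merged = {}
    for funct in function_lst:
        try:
            merged.update(funct(payload))
        except (ValueError, TypeError):
            pass  # failing functions contribute nothing, as in the PR
    return merged


def iterate(task_lst, executor=None):
    # identical call shape for both branches: executor.map vs built-in map
    if executor is not None:
        return list(executor.map(_apply_functions, task_lst))
    return list(map(_apply_functions, task_lst))


if __name__ == "__main__":
    tasks = [[n, [square]] for n in range(4)]
    print(iterate(tasks))  # serial path
    with ProcessPoolExecutor(max_workers=2) as exe:
        print(iterate(tasks, executor=exe))  # parallel path, same results
```

Because worker processes import the module to unpickle the callable, lambdas or closures in `function_lst` would fail here; the PR's move of `_apply_list_of_functions_on_job` out of the class serves exactly this constraint.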
13 changes: 12 additions & 1 deletion pyiron_base/jobs/flex/pythonfunctioncontainer.py
@@ -43,6 +43,7 @@ class PythonFunctionContainerJob(PythonTemplateJob):
def __init__(self, project, job_name):
super().__init__(project, job_name)
self._function = None
self._executor_type = None

@property
def python_function(self):
@@ -82,7 +83,17 @@ def save(self):
super().save()

def run_static(self):
output = self._function(**self.input.to_builtin())
if (
self._executor_type is not None
and "executor" in inspect.signature(self._function).parameters.keys()
):
input_dict = self.input.to_builtin()
del input_dict["executor"]
output = self._function(
**input_dict, executor=self._get_executor(max_workers=self.server.cores)
)
else:
output = self._function(**self.input.to_builtin())
self.output.update({"result": output})
self.to_hdf()
self.status.finished = True
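This branch lets a wrapped function opt into parallelism simply by declaring an `executor` parameter in its signature. A standalone sketch of the same signature-inspection dispatch, under assumed names (`run_wrapped`, `total`, and `increment` are not pyiron API):

```python
# Sketch of the dispatch added to run_static: if the wrapped function
# declares an `executor` parameter and an executor type is configured,
# an executor sized to the job's core count is passed in.
import inspect
from concurrent.futures import ThreadPoolExecutor


def increment(a):
    return a + 1


def total(a_lst, executor):
    # fan out one future per element, then gather results in order
    future_lst = [executor.submit(increment, a) for a in a_lst]
    return [future.result() for future in future_lst]


def run_wrapped(funct, input_dict, executor_type=None, cores=1):
    if (
        executor_type is not None
        and "executor" in inspect.signature(funct).parameters
    ):
        with ThreadPoolExecutor(max_workers=cores) as exe:
            return funct(**input_dict, executor=exe)
    return funct(**input_dict)


print(run_wrapped(total, {"a_lst": [1, 2, 3]},
                  executor_type="ThreadPoolExecutor", cores=2))  # [2, 3, 4]
```

The PR additionally deletes any stored `"executor"` key from the input dictionary before the call, so the live executor object is never written to HDF5.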
37 changes: 37 additions & 0 deletions pyiron_base/jobs/job/extension/executor.py
@@ -0,0 +1,37 @@
def get_thread_executor(max_workers):
from concurrent.futures import ThreadPoolExecutor

return ThreadPoolExecutor(max_workers=max_workers)


def get_process_executor(max_workers):
from concurrent.futures import ProcessPoolExecutor

return ProcessPoolExecutor(max_workers=max_workers)


def get_pympipool_mpi_executor(max_workers):
from pympipool.mpi.executor import PyMPIExecutor

return PyMPIExecutor(max_workers=max_workers)


def get_pympipool_slurm_executor(max_workers):
from pympipool.slurm.executor import PySlurmExecutor

return PySlurmExecutor(max_workers=max_workers)


def get_pympipool_flux_executor(max_workers):
from pympipool.flux.executor import PyFluxExecutor

return PyFluxExecutor(max_workers=max_workers)


EXECUTORDICT = {
"ThreadPoolExecutor": get_thread_executor,
"ProcessPoolExecutor": get_process_executor,
"PyMPIPoolExecutor": get_pympipool_mpi_executor,
"PyMPIPoolSlurmExecutor": get_pympipool_slurm_executor,
"PyMPIPoolFluxExecutor": get_pympipool_flux_executor,
}
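The new module is a registry of factory functions: each executor backend is created lazily from a validated string key, so optional dependencies such as pympipool are only imported when that backend is actually requested. A self-contained sketch of the same lookup pattern restricted to stdlib executors (`EXECUTOR_FACTORIES` and `get_executor` are illustrative names):

```python
# Registry-of-factories sketch mirroring EXECUTORDICT: a string key
# selects a factory, unknown keys fail fast with a TypeError.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

EXECUTOR_FACTORIES = {
    "ThreadPoolExecutor": lambda max_workers: ThreadPoolExecutor(max_workers=max_workers),
    "ProcessPoolExecutor": lambda max_workers: ProcessPoolExecutor(max_workers=max_workers),
}


def get_executor(executor_type, max_workers=None):
    if executor_type not in EXECUTOR_FACTORIES:
        raise TypeError(
            "Unknown Executor Type: please select one of {}.".format(
                list(EXECUTOR_FACTORIES)
            )
        )
    return EXECUTOR_FACTORIES[executor_type](max_workers)


with get_executor("ThreadPoolExecutor", max_workers=2) as exe:
    results = list(exe.map(abs, [-1, -2, 3]))  # [1, 2, 3]
```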
38 changes: 36 additions & 2 deletions pyiron_base/jobs/job/generic.py
@@ -16,13 +16,14 @@

from pyiron_base.state import state
from pyiron_base.state.signal import catch_signals
from pyiron_base.jobs.job.extension.executable import Executable
from pyiron_base.jobs.job.extension.jobstatus import JobStatus
from pyiron_base.jobs.job.core import (
JobCore,
_doc_str_job_core_args,
_doc_str_job_core_attr,
)
from pyiron_base.jobs.job.extension.executable import Executable
from pyiron_base.jobs.job.extension.executor import EXECUTORDICT
from pyiron_base.jobs.job.extension.jobstatus import JobStatus
from pyiron_base.jobs.job.runfunction import (
run_job_with_parameter_repair,
run_job_with_status_initialized,
@@ -157,6 +158,7 @@ def __init__(self, project, job_name):
self._restart_file_dict = dict()
self._exclude_nodes_hdf = list()
self._exclude_groups_hdf = list()
self._executor_type = None
self._process = None
self._compress_by_default = False
self._python_only_job = False
@@ -369,6 +371,21 @@ def working_directory(self):
self._create_working_directory()
return self.project_hdf5.working_directory

@property
def executor_type(self):
return self._executor_type

@executor_type.setter
def executor_type(self, exe):
if isinstance(exe, str) and exe in EXECUTORDICT.keys():
self._executor_type = exe
else:
raise TypeError(
"Unknown Executor Type: Please select one of the following: {}.".format(
list(EXECUTORDICT.keys())
)
)

def collect_logfiles(self):
"""
Collect the log files of the external executable and store the information in the HDF5 file. This method has
@@ -1019,6 +1036,8 @@ def to_dict(self):
data_dict["server"] = self._server.to_dict()
if self._import_directory is not None:
data_dict["import_directory"] = self._import_directory
if self._executor_type is not None:
data_dict["executor_type"] = self._executor_type
return data_dict

def from_dict(self, job_dict):
@@ -1042,6 +1061,8 @@
self._exclude_nodes_hdf = input_dict["exclude_nodes_hdf"]
if "exclude_groups_hdf" in input_dict.keys():
self._exclude_groups_hdf = input_dict["exclude_groups_hdf"]
if "executor_type" in input_dict.keys():
self._executor_type = input_dict["executor_type"]

def to_hdf(self, hdf=None, group_name=None):
"""
@@ -1493,6 +1514,19 @@ def _reload_update_master(self, project, master_id):
self._logger.info("busy master: {} {}".format(master_id, self.get_job_id()))
del self

def _get_executor(self, max_workers=None):
if self._executor_type is None:
raise ValueError(
"No executor type defined - Please set self.executor_type."
)
elif (
isinstance(self._executor_type, str)
and self._executor_type in EXECUTORDICT.keys()
):
return EXECUTORDICT[self._executor_type](max_workers=max_workers)
else:
raise TypeError("The self.executor_type has to be a string.")


class GenericError(object):
def __init__(self, job):
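The `executor_type` setter above validates its value eagerly, so a typo or a class object fails at assignment time rather than at run time. A minimal sketch of that validated-property pattern, with a reduced key set standing in for `EXECUTORDICT`:

```python
# Sketch of the validated-property pattern used for executor_type:
# only known string keys are accepted, mirroring the setter in GenericJob.
_KNOWN = ("ThreadPoolExecutor", "ProcessPoolExecutor")


class Job:
    def __init__(self):
        self._executor_type = None

    @property
    def executor_type(self):
        return self._executor_type

    @executor_type.setter
    def executor_type(self, exe):
        if isinstance(exe, str) and exe in _KNOWN:
            self._executor_type = exe
        else:
            raise TypeError(
                "Unknown Executor Type: please select one of {}.".format(list(_KNOWN))
            )


job = Job()
job.executor_type = "ProcessPoolExecutor"  # accepted; "Executor" would raise
```

Storing only the string (not a live executor) is what allows `to_dict`/`from_dict` to round-trip the setting through HDF5.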
1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
"pint==0.23",
"psutil==5.9.8",
"pyfileindex==0.0.22",
"pympipool==0.7.13",
"pysqa==0.1.15",
"sqlalchemy==2.0.26",
"tables==3.9.2",
21 changes: 21 additions & 0 deletions tests/flex/test_pythonfunctioncontainer.py
@@ -14,6 +14,11 @@ def my_sleep_funct(a, b=8):
return a+b


def my_function_exe(a_lst, b_lst, executor):
future_lst = [executor.submit(my_function, a=a, b=b) for a, b in zip(a_lst, b_lst)]
return [future.result() for future in future_lst]


class TestPythonFunctionContainer(TestWithProject):
def test_as_job(self):
job = self.project.wrap_python_function(my_function)
@@ -61,3 +66,19 @@ def test_with_executor_wait(self):
self.assertFalse(job.server.future.done())
self.project.wait_for_job(job=job, interval_in_s=0.01, max_iterations=1000)
self.assertTrue(job.server.future.done())

def test_with_internal_executor(self):
job = self.project.wrap_python_function(my_function_exe)
job.input["a_lst"] = [1, 2, 3, 4]
job.input["b_lst"] = [5, 6, 7, 8]
job.server.cores = 2
with self.assertRaises(TypeError):
job.executor_type = "Executor"
with self.assertRaises(TypeError):
job.executor_type = ProcessPoolExecutor
with self.assertRaises(TypeError):
job.executor_type = None
job.executor_type = "ProcessPoolExecutor"
job.run()
self.assertEqual(job.output["result"], [6, 8, 10, 12])
self.assertTrue(job.status.finished)