Use the same executor for pyiron table and function container #1334

Merged: 54 commits, Feb 18, 2024
Commits
2660dd2
Parallel pyiron table
jan-janssen Feb 27, 2023
ff4007d
Format black
pyiron-runner Feb 27, 2023
e5aa777
Update datamining.py
jan-janssen Jul 17, 2023
c00b37d
Update environment.yml
jan-janssen Jul 17, 2023
ea71463
Update setup.py
jan-janssen Jul 17, 2023
c571e04
Merge pull request #1169 from pyiron/main
jan-janssen Jul 17, 2023
d3a1648
Update datamining.py
jan-janssen Jul 24, 2023
6240624
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Aug 18, 2023
42ef18a
update pympipool
jan-janssen Aug 18, 2023
62360a2
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Dec 17, 2023
5f2ea3b
Update to latest version of pympipool
jan-janssen Dec 17, 2023
b9ff835
Update environment-old.yml
jan-janssen Dec 17, 2023
2ac61cb
Merge branch 'main' into parallel_datamining
jan-janssen Dec 18, 2023
396fdfe
Use external executor
jan-janssen Jan 10, 2024
d3c9393
Merge branch 'main' into parallel_datamining
jan-janssen Jan 10, 2024
e75983d
Update environment-old.yml
jan-janssen Jan 10, 2024
b5d1e1c
Update environment.yml
jan-janssen Jan 10, 2024
6a2d537
Enable parallelism inside the PythonFunctionContainerJob
jan-janssen Jan 24, 2024
bd7a6b8
Bug fix
jan-janssen Jan 24, 2024
b5b0f51
fix input
jan-janssen Jan 24, 2024
3ea6e38
Fix executable specified as string
jan-janssen Jan 24, 2024
dd98e79
fix future timeout
jan-janssen Jan 24, 2024
4bf799c
black formatting
jan-janssen Jan 24, 2024
370d684
Set _get_executor() to be a private method
jan-janssen Jan 25, 2024
9196b88
Format black
pyiron-runner Jan 25, 2024
fdad5e5
Merge remote-tracking branch 'origin/main' into parallel_datamining
jan-janssen Jan 31, 2024
912c3fa
Choose ProcessPoolExecutor if no Executor is set and cores are more t…
jan-janssen Jan 31, 2024
42f2fd2
Format black
pyiron-runner Jan 31, 2024
59fde64
Merge remote-tracking branch 'origin/main' into container_with_executor
jan-janssen Jan 31, 2024
587256d
Merge remote-tracking branch 'origin/container_with_executor' into pa…
jan-janssen Feb 14, 2024
f945f78
Merge remote-tracking branch 'origin/parallel_datamining' into parall…
jan-janssen Feb 14, 2024
de73ff8
Use the same executor for pyiron table and function container
jan-janssen Feb 14, 2024
7da27ee
Format black
pyiron-runner Feb 14, 2024
dbbdf41
Fix initialization
jan-janssen Feb 14, 2024
cbf92ce
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 14, 2024
0a80cef
Merge remote-tracking branch 'origin/main' into parallel_executor
jan-janssen Feb 15, 2024
4902cc0
Add example parallel pyiron tables
jan-janssen Feb 15, 2024
ea50156
Add parallel test for function container
jan-janssen Feb 15, 2024
65d2fb3
Move executor to GenericJob
jan-janssen Feb 16, 2024
673192b
Merge remote-tracking branch 'origin/main' into parallel_executor
jan-janssen Feb 17, 2024
8abcc60
define executor types
jan-janssen Feb 17, 2024
cf503db
Format black
pyiron-runner Feb 17, 2024
59a5872
define central executor class
jan-janssen Feb 17, 2024
c1419bc
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 17, 2024
b0ad6e2
fixes
jan-janssen Feb 17, 2024
b26c094
fix test
jan-janssen Feb 17, 2024
1a964b6
Format black
pyiron-runner Feb 17, 2024
8b7cf13
fix docstrings
jan-janssen Feb 17, 2024
2118e21
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 17, 2024
fd321d7
Add test for wrong assignment of executors
jan-janssen Feb 17, 2024
2a5e62c
switch to executor classes
jan-janssen Feb 18, 2024
5dda980
Format black
pyiron-runner Feb 18, 2024
c48f77a
validate executor types
jan-janssen Feb 18, 2024
d8dcc09
Merge remote-tracking branch 'origin/parallel_executor' into parallel…
jan-janssen Feb 18, 2024
1 change: 1 addition & 0 deletions .ci_support/environment-docs.yml
@@ -17,6 +17,7 @@ dependencies:
- pint =0.23
- psutil =5.9.8
- pyfileindex =0.0.22
- pympipool =0.7.13
- pysqa =0.1.15
- pytables =3.9.2
- sqlalchemy =2.0.26
1 change: 1 addition & 0 deletions .ci_support/environment-old.yml
@@ -12,6 +12,7 @@ dependencies:
- pint =0.18
- psutil =5.8.0
- pyfileindex =0.0.16
- pympipool =0.7.11
- pysqa =0.1.12
- pytables =3.6.1
- sqlalchemy =2.0.22
1 change: 1 addition & 0 deletions .ci_support/environment.yml
@@ -16,6 +16,7 @@ dependencies:
- pint =0.23
- psutil =5.9.8
- pyfileindex =0.0.22
- pympipool =0.7.13
- pysqa =0.1.15
- pytables =3.9.2
- sqlalchemy =2.0.26
114 changes: 78 additions & 36 deletions pyiron_base/jobs/datamining.py
@@ -3,6 +3,7 @@
# Distributed under the terms of "New BSD License", see the LICENSE file.

import codecs
import concurrent.futures
from datetime import datetime
import cloudpickle
import json
@@ -240,7 +241,7 @@ def _get_new_functions(self, file: FileHDFio) -> Tuple[List, List]:
new_system_functions = []
return new_user_functions, new_system_functions

-    def create_table(self, file, job_status_list, enforce_update=False):
+    def create_table(self, file, job_status_list, executor=None, enforce_update=False):
"""
Create or update the table.

@@ -254,6 +255,7 @@ def create_table(self, file, job_status_list, enforce_update=False):
Args:
file (FileHDFio): HDF were the previous state of the table is stored
job_status_list (list of str): only consider jobs with these statuses
executor (concurrent.futures.Executor): executor for parallel execution
enforce_update (bool): if True always regenerate the table completely.
"""
# if there's new keys, apply the *new* functions to the old jobs and name the resulting table `df_new_keys`
@@ -272,8 +274,9 @@ def create_table(self, file, job_status_list, enforce_update=False):
if funct.__name__ in new_system_functions
]
         df_new_keys = self._iterate_over_job_lst(
-            job_lst=map(self._project.inspect, self._get_job_ids()),
+            job_id_lst=self._get_job_ids(),
             function_lst=function_lst,
+            executor=executor,
         )
if len(df_new_keys) > 0:
self._df = pandas.concat([self._df, df_new_keys], axis="columns")
@@ -284,7 +287,9 @@ def create_table(self, file, job_status_list, enforce_update=False):
)
if len(new_jobs) > 0:
             df_new_ids = self._iterate_over_job_lst(
-                job_lst=new_jobs, function_lst=self.add._function_lst
+                job_id_lst=new_jobs,
+                function_lst=self.add._function_lst,
+                executor=executor,
             )
if len(df_new_ids) > 0:
self._df = pandas.concat([self._df, df_new_ids], ignore_index=True)
@@ -340,44 +345,47 @@ def _get_filtered_job_ids_from_project(self, recursive=True):
filter_funct = self.db_filter_function
return project_table[filter_funct(project_table)]["id"].tolist()

-    @staticmethod
-    def _apply_function_on_job(funct, job):
-        try:
-            return funct(job)
-        except (ValueError, TypeError):
-            return {}
-
-    def _apply_list_of_functions_on_job(self, job, function_lst):
-        diff_dict = {}
-        for funct in function_lst:
-            funct_dict = self._apply_function_on_job(funct, job)
-            for key, value in funct_dict.items():
-                diff_dict[key] = value
-        return diff_dict
-
-    def _iterate_over_job_lst(self, job_lst: List, function_lst: List) -> List[dict]:
+    def _iterate_over_job_lst(
+        self,
+        job_id_lst: List,
+        function_lst: List,
+        executor: concurrent.futures.Executor = None,
+    ) -> List[dict]:
         """
         Apply functions to job.

         Any functions that raise an error are set to `None` in the final list.

         Args:
-            job_lst (list of JobPath): all jobs to analyze
-            function_lst (list of functions): all functions to apply on jobs. Must return a dictionary.
+            job_id_lst (list of int): all job ids to analyze
+            function_lst (list of functions): all functions to apply on jobs. Must return a dictionary.
+            executor (concurrent.futures.Executor): executor for parallel execution

         Returns:
             list of dict: a list of the merged dicts from all functions for each job
         """
-        diff_dict_lst = []
-        for job_inspect in tqdm(job_lst, desc="Processing jobs"):
-            if self.convert_to_object:
-                job = job_inspect.to_object()
-            else:
-                job = job_inspect
-            diff_dict = self._apply_list_of_functions_on_job(
-                job=job, function_lst=function_lst
-            )
-            diff_dict_lst.append(diff_dict)
+        job_to_analyse_lst = [
+            [
+                self._project.db.get_item_by_id(job_id),
+                function_lst,
+                self.convert_to_object,
+            ]
+            for job_id in job_id_lst
+        ]
+        if executor is not None:
+            diff_dict_lst = list(
+                tqdm(
+                    executor.map(_apply_list_of_functions_on_job, job_to_analyse_lst),
+                    total=len(job_to_analyse_lst),
+                )
+            )
+        else:
+            diff_dict_lst = list(
+                tqdm(
+                    map(_apply_list_of_functions_on_job, job_to_analyse_lst),
+                    total=len(job_to_analyse_lst),
+                )
+            )
         self.refill_dict(diff_dict_lst)
         return pandas.DataFrame(diff_dict_lst)

@@ -439,7 +447,7 @@ def _collect_job_update_lst(self, job_status_list, job_stored_ids=None):
and job.status in job_status_list
and self.filter_function(job)
):
-                job_update_lst.append(job)
+                job_update_lst.append(job_id)
return job_update_lst

def _repr_html_(self):
@@ -775,11 +783,22 @@ def update_table(self, job_status_list=None):
if self.job_id is not None:
self.project.db.item_update({"timestart": datetime.now()}, self.job_id)
         with self.project_hdf5.open("input") as hdf5_input:
-            self._pyiron_table.create_table(
-                file=hdf5_input,
-                job_status_list=job_status_list,
-                enforce_update=self._enforce_update,
-            )
+            if self._executor_type is None and self.server.cores > 1:
+                self._executor_type = "concurrent.futures.ProcessPoolExecutor"
+            if self._executor_type is not None:
+                self._pyiron_table.create_table(
+                    file=hdf5_input,
+                    job_status_list=job_status_list,
+                    enforce_update=self._enforce_update,
+                    executor=self._get_executor(max_workers=self.server.cores),
+                )
+            else:
+                self._pyiron_table.create_table(
+                    file=hdf5_input,
+                    job_status_list=job_status_list,
+                    enforce_update=self._enforce_update,
+                    executor=None,
+                )
self.to_hdf()
self._pyiron_table._df.to_csv(
os.path.join(self.working_directory, "pyirontable.csv"), index=False
@@ -821,3 +840,26 @@ def always_true(_):

"""
return True


def _apply_function_on_job(funct, job):
try:
return funct(job)
except (ValueError, TypeError):
return {}


def _apply_list_of_functions_on_job(input_parameters):
from pyiron_base.jobs.job.path import JobPath

db_entry, function_lst, convert_to_object = input_parameters
job = JobPath.from_db_entry(db_entry)
if convert_to_object:
job = job.to_object()
job.set_input_to_read_only()
diff_dict = {}
for funct in function_lst:
funct_dict = _apply_function_on_job(funct, job)
for key, value in funct_dict.items():
diff_dict[key] = value
return diff_dict
13 changes: 12 additions & 1 deletion pyiron_base/jobs/flex/pythonfunctioncontainer.py
@@ -43,6 +43,7 @@ class PythonFunctionContainerJob(PythonTemplateJob):
def __init__(self, project, job_name):
super().__init__(project, job_name)
self._function = None
self._executor_type = None

@property
def python_function(self):
@@ -82,7 +83,17 @@ def save(self):
super().save()

     def run_static(self):
-        output = self._function(**self.input.to_builtin())
+        if (
+            self._executor_type is not None
+            and "executor" in inspect.signature(self._function).parameters.keys()
+        ):
+            input_dict = self.input.to_builtin()
+            del input_dict["executor"]
+            output = self._function(
+                **input_dict, executor=self._get_executor(max_workers=self.server.cores)
+            )
+        else:
+            output = self._function(**self.input.to_builtin())
self.output.update({"result": output})
self.to_hdf()
self.status.finished = True
58 changes: 53 additions & 5 deletions pyiron_base/jobs/job/generic.py
@@ -5,24 +5,24 @@
Generic Job class extends the JobCore class with all the functionality to run the job object.
"""

-from concurrent.futures import Future
+from concurrent.futures import Future, Executor
from datetime import datetime
from inspect import isclass
import os
import posixpath
import signal
import warnings

from h5io_browser.base import _read_hdf, _write_hdf

from pyiron_base.state import state
from pyiron_base.state.signal import catch_signals
-from pyiron_base.jobs.job.extension.executable import Executable
-from pyiron_base.jobs.job.extension.jobstatus import JobStatus
 from pyiron_base.jobs.job.core import (
     JobCore,
     _doc_str_job_core_args,
     _doc_str_job_core_attr,
 )
+from pyiron_base.jobs.job.extension.executable import Executable
+from pyiron_base.jobs.job.extension.jobstatus import JobStatus
from pyiron_base.jobs.job.runfunction import (
run_job_with_parameter_repair,
run_job_with_status_initialized,
@@ -44,7 +44,7 @@
_job_store_before_copy,
_job_reload_after_copy,
)
-from pyiron_base.utils.instance import static_isinstance
+from pyiron_base.utils.instance import static_isinstance, import_class
from pyiron_base.utils.deprecate import deprecate
from pyiron_base.jobs.job.extension.server.generic import Server
from pyiron_base.database.filetable import FileTable
@@ -157,6 +157,7 @@ def __init__(self, project, job_name):
self._restart_file_dict = dict()
self._exclude_nodes_hdf = list()
self._exclude_groups_hdf = list()
self._executor_type = None
self._process = None
self._compress_by_default = False
self._python_only_job = False
@@ -369,6 +370,39 @@ def working_directory(self):
self._create_working_directory()
return self.project_hdf5.working_directory

@property
def executor_type(self):
return self._executor_type

@executor_type.setter
def executor_type(self, exe):
if exe is None:
self._executor_type = exe
elif isinstance(exe, str):
try:
exe_class = import_class(exe) # Make sure it's available
if not (
isclass(exe_class) and issubclass(exe_class, Executor)
): # And what we want
raise TypeError(
f"{exe} imported OK, but {exe_class} is not a subclass of {Executor}"
)
except Exception as e:
raise ImportError(f"Something went wrong trying to import {exe}") from e
else:
self._executor_type = exe
elif isclass(exe) and issubclass(exe, Executor):
self._executor_type = f"{exe.__module__}.{exe.__name__}"
elif isinstance(exe, Executor):
raise NotImplementedError(
"We don't want to let you pass an entire executor, because you might think its state comes "
"with it. Try passing `.__class__` on this object instead."
)
else:
raise TypeError(
f"Expected an executor class or string representing one, but got {exe}"
)

def collect_logfiles(self):
"""
Collect the log files of the external executable and store the information in the HDF5 file. This method has
@@ -1019,6 +1053,8 @@ def to_dict(self):
data_dict["server"] = self._server.to_dict()
if self._import_directory is not None:
data_dict["import_directory"] = self._import_directory
if self._executor_type is not None:
data_dict["executor_type"] = self._executor_type
return data_dict

def from_dict(self, job_dict):
@@ -1042,6 +1078,8 @@ def from_dict(self, job_dict):
self._exclude_nodes_hdf = input_dict["exclude_nodes_hdf"]
if "exclude_groups_hdf" in input_dict.keys():
self._exclude_groups_hdf = input_dict["exclude_groups_hdf"]
if "executor_type" in input_dict.keys():
self._executor_type = input_dict["executor_type"]

def to_hdf(self, hdf=None, group_name=None):
"""
@@ -1493,6 +1531,16 @@ def _reload_update_master(self, project, master_id):
self._logger.info("busy master: {} {}".format(master_id, self.get_job_id()))
del self

def _get_executor(self, max_workers=None):
if self._executor_type is None:
raise ValueError(
"No executor type defined - Please set self.executor_type."
)
elif isinstance(self._executor_type, str):
return import_class(self._executor_type)(max_workers=max_workers)
else:
raise TypeError("The self.executor_type has to be a string.")


class GenericError(object):
def __init__(self, job):
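The new `executor_type` setter normalizes all accepted inputs to a dotted string, which keeps the job serializable via `to_dict`/`from_dict` while the live executor is only constructed on demand by `_get_executor()`. A simplified standalone sketch of that normalization (omitting the import check the real setter performs on strings):

```python
from concurrent.futures import Executor, ProcessPoolExecutor
from inspect import isclass


def normalize_executor_type(exe):
    """Simplified sketch of the executor_type setter: accept None,
    a dotted import string, or an Executor subclass; reject instances."""
    if exe is None:
        return None
    if isinstance(exe, str):
        # The real setter also verifies the string imports to an Executor subclass.
        return exe
    if isclass(exe) and issubclass(exe, Executor):
        # Store the class as its dotted path so it survives HDF5 round trips.
        return f"{exe.__module__}.{exe.__name__}"
    if isinstance(exe, Executor):
        raise NotImplementedError("pass the class, e.g. exe.__class__, not a live instance")
    raise TypeError(f"Expected an executor class or a string naming one, got {exe}")
```

Rejecting live instances is deliberate: an executor carries process or thread state that cannot be stored with the job, so only the class reference is persisted.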
11 changes: 11 additions & 0 deletions pyiron_base/utils/instance.py
@@ -19,6 +19,9 @@
__date__ = "Sep 1, 2017"


import importlib


def static_isinstance(obj, obj_type):
"""
A static implementation of isinstance() - instead of comparing an object and a class, the object is compared to a
@@ -42,3 +45,11 @@ def static_isinstance(obj, obj_type):
return obj_type in obj_class_lst
else:
raise TypeError()


def import_class(class_type):
module_path, class_name = class_type.rsplit(".", maxsplit=1)
return getattr(
importlib.import_module(module_path),
class_name,
)
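`import_class` is the inverse of the string stored by `executor_type`: `_get_executor()` uses it to turn the dotted path back into a class before instantiating it with `max_workers`. The round trip, using only the standard library:

```python
import importlib
from concurrent.futures import ThreadPoolExecutor


def import_class(class_type):
    # Split "pkg.module.ClassName" into the module path and the class name,
    # then fetch the attribute off the imported module.
    module_path, class_name = class_type.rsplit(".", maxsplit=1)
    return getattr(importlib.import_module(module_path), class_name)


# String -> class -> live executor, as _get_executor(max_workers=...) does.
cls = import_class("concurrent.futures.ThreadPoolExecutor")
with cls(max_workers=2) as exe:
    result = exe.submit(pow, 2, 10).result()  # → 1024
```

Note that `rsplit` with `maxsplit=1` splits on the last dot only, so nested module paths such as `concurrent.futures.process.ProcessPoolExecutor` also resolve correctly.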
1 change: 1 addition & 0 deletions pyproject.toml
@@ -35,6 +35,7 @@ dependencies = [
"pint==0.23",
"psutil==5.9.8",
"pyfileindex==0.0.22",
"pympipool==0.7.13",
"pysqa==0.1.15",
"sqlalchemy==2.0.26",
"tables==3.9.2",