Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel pyiron table #1050

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci_support/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ dependencies:
- sqlalchemy =2.0.19
- tqdm =4.65.0
- traitlets =5.9.0
- pympipool =0.5.4
82 changes: 52 additions & 30 deletions pyiron_base/jobs/datamining.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tqdm.auto import tqdm
import types
from typing import List, Tuple
from pympipool import Pool

from pyiron_base.utils.deprecate import deprecate
from pyiron_base.jobs.job.generic import GenericJob
Expand Down Expand Up @@ -233,7 +234,7 @@ def _get_new_functions(self, file: FileHDFio) -> Tuple[List, List]:
new_system_functions = []
return new_user_functions, new_system_functions

def create_table(self, file, job_status_list, enforce_update=False):
def create_table(self, file, job_status_list, processes=1, enforce_update=False):
jan-janssen marked this conversation as resolved.
Show resolved Hide resolved
"""
Create or update the table.

Expand All @@ -247,6 +248,7 @@ def create_table(self, file, job_status_list, enforce_update=False):
Args:
file (FileHDFio): HDF were the previous state of the table is stored
job_status_list (list of str): only consider jobs with these statuses
processes (int): number of parallel tasks
enforce_update (bool): if True always regenerate the table completely.
"""
# if there's new keys, apply the *new* functions to the old jobs and name the resulting table `df_new_keys`
Expand All @@ -265,8 +267,9 @@ def create_table(self, file, job_status_list, enforce_update=False):
if funct.__name__ in new_system_functions
]
df_new_keys = self._iterate_over_job_lst(
job_lst=map(self._project.inspect, self._get_job_ids()),
job_id_lst=self._get_job_ids(),
function_lst=function_lst,
processes=processes,
)
if len(df_new_keys) > 0:
self._df = pandas.concat([self._df, df_new_keys], axis="columns")
Expand All @@ -277,7 +280,9 @@ def create_table(self, file, job_status_list, enforce_update=False):
)
if len(new_jobs) > 0:
df_new_ids = self._iterate_over_job_lst(
job_lst=new_jobs, function_lst=self.add._function_lst
job_id_lst=new_jobs,
function_lst=self.add._function_lst,
processes=processes,
)
if len(df_new_ids) > 0:
self._df = pandas.concat([self._df, df_new_ids], ignore_index=True)
Expand Down Expand Up @@ -333,44 +338,37 @@ def _get_filtered_job_ids_from_project(self, recursive=True):
filter_funct = self.db_filter_function
return project_table[filter_funct(project_table)]["id"].tolist()

@staticmethod
def _apply_function_on_job(funct, job):
try:
return funct(job)
except (ValueError, TypeError):
return {}

def _apply_list_of_functions_on_job(self, job, function_lst):
diff_dict = {}
for funct in function_lst:
funct_dict = self._apply_function_on_job(funct, job)
for key, value in funct_dict.items():
diff_dict[key] = value
return diff_dict

def _iterate_over_job_lst(self, job_lst: List, function_lst: List) -> List[dict]:
def _iterate_over_job_lst(
self, job_id_lst: List, function_lst: List, processes: int
) -> List[dict]:
"""
Apply functions to job.

Any functions that raise an error are set to `None` in the final list.

Args:
job_lst (list of JobPath): all jobs to analyze
job_id_lst (list of JobPath): all jobs to analyze
function_lst (list of functions): all functions to apply on jobs. Must return a dictionary.
processes (int): number of parallel tasks

Returns:
list of dict: a list of the merged dicts from all functions for each job
"""
diff_dict_lst = []
for job_inspect in tqdm(job_lst, desc="Processing jobs"):
if self.convert_to_object:
job = job_inspect.to_object()
else:
job = job_inspect
diff_dict = self._apply_list_of_functions_on_job(
job=job, function_lst=function_lst
job_to_analyse_lst = [
[
self._project.db.get_item_by_id(job_id),
function_lst,
self.convert_to_object,
]
for job_id in job_id_lst
]
with Pool(processes) as p:
diff_dict_lst = list(
tqdm(
p.map(_apply_list_of_functions_on_job, job_to_analyse_lst),
total=len(job_to_analyse_lst),
)
)
diff_dict_lst.append(diff_dict)
self.refill_dict(diff_dict_lst)
return pandas.DataFrame(diff_dict_lst)

Expand Down Expand Up @@ -432,7 +430,7 @@ def _collect_job_update_lst(self, job_status_list, job_stored_ids=None):
and job.status in job_status_list
and self.filter_function(job)
):
job_update_lst.append(job)
job_update_lst.append(job_id)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the tricky part. To apply the filter_function the job is already loaded in inspect mode, but as the job cannot be communicated to the subprocess, it has to be loaded again inside the subprocess.

return job_update_lst

def _repr_html_(self):
Expand Down Expand Up @@ -772,6 +770,7 @@ def update_table(self, job_status_list=None):
file=hdf5_input,
job_status_list=job_status_list,
enforce_update=self._enforce_update,
processes=self.server.cores,
)
self.to_hdf()
self._pyiron_table._df.to_csv(
Expand Down Expand Up @@ -814,3 +813,26 @@ def always_true(_):

"""
return True


def _apply_function_on_job(funct, job):
try:
return funct(job)
except (ValueError, TypeError):
return {}


def _apply_list_of_functions_on_job(input_parameters):
    """Worker entry point executed inside a parallel subprocess.

    The job object itself cannot be sent to the subprocess, so it is
    reloaded here from its database entry before the analysis functions
    are applied.

    Args:
        input_parameters (list): ``[db_entry, function_lst, convert_to_object]``
            where ``db_entry`` is the job's database record,
            ``function_lst`` the analysis functions to apply (each
            returning a dict) and ``convert_to_object`` whether to load
            the full job object instead of inspect mode.

    Returns:
        dict: merged key/value pairs collected from every function.
    """
    from pyiron_base.jobs.job.path import JobPath

    db_entry, function_lst, convert_to_object = input_parameters
    job = JobPath.from_db_entry(db_entry)
    if convert_to_object:
        job = job.to_object()
    job.set_input_to_read_only()
    merged = {}
    for funct in function_lst:
        # Later functions overwrite earlier ones on key collisions,
        # matching plain dict.update semantics.
        merged.update(_apply_function_on_job(funct, job))
    return merged
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
'tables==3.8.0',
'tqdm==4.65.0',
'traitlets==5.9.0',
'pympipool==0.5.4',
],
cmdclass=versioneer.get_cmdclass(),

Expand Down