From aa3d40b643fea29ee6735eb028e4e5e4f66d3759 Mon Sep 17 00:00:00 2001 From: Aliaksandr Yakutovich Date: Tue, 7 Jul 2020 21:27:24 +0200 Subject: [PATCH 1/2] Base RaspaBaseWorkChain on BaseRestartWorkChain from aiida-core. (#56) * Migrate all the examples to the new API * Remove redundant files (old base restart and related things) * Inject all process_handlers in the base restart work chain. --- Dockerfile | 4 +- aiida_raspa/utils/__init__.py | 5 +- aiida_raspa/utils/inspection_tools.py | 147 +------- aiida_raspa/utils/other_utilities.py | 154 -------- aiida_raspa/workchains/aiida_base_restart.py | 356 ------------------ aiida_raspa/workchains/base.py | 178 ++++++++- .../example_base_restart_timeout.py | 9 +- .../example_base_workchain_gcmc_2comp.py | 11 +- .../example_base_workchain_widom_1comp.py | 12 +- .../example_base_workchain_widom_2comp.py | 11 +- ... => too_long_base_workchain_gemc_1comp.py} | 15 +- 11 files changed, 194 insertions(+), 708 deletions(-) delete mode 100644 aiida_raspa/utils/other_utilities.py delete mode 100644 aiida_raspa/workchains/aiida_base_restart.py rename examples/workchains/{example_base_workchain_gemc_1comp.py => too_long_base_workchain_gemc_1comp.py} (88%) diff --git a/Dockerfile b/Dockerfile index 2868314..bed9507 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,8 +17,8 @@ RUN pip install coveralls # Install necessary codes to build RASPA2. RUN apt-get clean && rm -rf /var/lib/apt/lists/* && apt-get update && apt-get install -y --no-install-recommends \ automake \ - libtool - + libtool \ + mpich # Download, compile and install RASPA into ~/code folder. 
RUN git clone https://github.com/iRASPA/RASPA2.git RASPA2 diff --git a/aiida_raspa/utils/__init__.py b/aiida_raspa/utils/__init__.py index 9a578ce..7aac229 100644 --- a/aiida_raspa/utils/__init__.py +++ b/aiida_raspa/utils/__init__.py @@ -2,7 +2,4 @@ """Raspa utils.""" from .base_parser import parse_base_output from .base_input_generator import RaspaInput -from .inspection_tools import check_widom_convergence, check_gcmc_convergence, check_gemc_convergence -from .inspection_tools import check_gemc_box, add_write_binary_restart -from .other_utilities import UnexpectedCalculationFailure, ErrorHandlerReport -from .other_utilities import prepare_process_inputs, register_error_handler +from .inspection_tools import add_write_binary_restart, modify_number_of_cycles, increase_box_lenght diff --git a/aiida_raspa/utils/inspection_tools.py b/aiida_raspa/utils/inspection_tools.py index ab84948..5511b80 100644 --- a/aiida_raspa/utils/inspection_tools.py +++ b/aiida_raspa/utils/inspection_tools.py @@ -2,9 +2,7 @@ """RASPA inspection tools""" from aiida.engine import calcfunction -from aiida.orm import Dict, Int, Str, Float - -from .other_utilities import ErrorHandlerReport +from aiida.orm import Dict @calcfunction @@ -14,7 +12,6 @@ def add_write_binary_restart(input_dict, write_every): return input_dict if input_dict.get_dict() == final_dict else Dict(dict=final_dict) -@calcfunction def modify_number_of_cycles(input_dict, additional_init_cycle, additional_prod_cycle): """Modify number of cycles to improve the convergence.""" final_dict = input_dict.get_dict() @@ -52,145 +49,3 @@ def increase_box_lenght(input_dict, box_name, box_length_current): final_dict["System"][box_name.value]["BoxLengths"] = "{} {} {}".format(*box_one_length_new) return Dict(dict=final_dict) - - -def check_widom_convergence(workchain, calc, conv_threshold=0.1, additional_cycle=0): - """ - Checks whether a Widom particle insertion is converged. - Checking is based on the error bar on Henry coefficient. 
- """ - output_widom = calc.outputs.output_parameters.get_dict() - structure_label = list(calc.get_incoming().nested()['framework'].keys())[0] - conv_stat = [] - - for comp in calc.inputs.parameters['Component']: - kh_average_comp = output_widom[structure_label]["components"][comp]["henry_coefficient_average"] - kh_dev_comp = output_widom[structure_label]["components"][comp]["henry_coefficient_dev"] - - error = round((kh_dev_comp / kh_average_comp), 2) - if error <= conv_threshold: - conv_stat.append(True) - else: - conv_stat.append(False) - - if not all(conv_stat): - workchain.report("Widom particle insertion calculation is NOT converged: repeating with more trials...") - workchain.ctx.inputs.retrieved_parent_folder = calc.outputs['retrieved'] - workchain.ctx.inputs.parameters = modify_number_of_cycles(workchain.ctx.inputs.parameters, - additional_init_cycle=Int(0), - additional_prod_cycle=Int(additional_cycle)) - return ErrorHandlerReport(True, False) - - return None - - -def check_gcmc_convergence(workchain, calc, conv_threshold=0.1, additional_init_cycle=0, additional_prod_cycle=0): - """ - Checks whether a GCMC calc is converged. - Checking is based on the error bar on average loading. - """ - output_gcmc = calc.outputs.output_parameters.get_dict() - structure_label = list(calc.get_incoming().nested()['framework'].keys())[0] - conv_stat = [] - - for comp in calc.inputs.parameters['Component']: - - loading_average_comp = output_gcmc[structure_label]["components"][comp]["loading_absolute_average"] - loading_dev_comp = output_gcmc[structure_label]["components"][comp]["loading_absolute_dev"] - - # It can happen for weekly adsorbed species. - # we need to think about a better way to handle it. - # Currently, if it happens for five iterations, workchain will not continue. 
- if loading_average_comp == 0: - conv_stat.append(False) - else: - error = round((loading_dev_comp / loading_average_comp), 2) - if error <= conv_threshold: - conv_stat.append(True) - else: - conv_stat.append(False) - - if not all(conv_stat): - workchain.report("GCMC calculation is NOT converged: continuing from restart...") - workchain.ctx.inputs.retrieved_parent_folder = calc.outputs['retrieved'] - workchain.ctx.inputs.parameters = modify_number_of_cycles(workchain.ctx.inputs.parameters, - additional_init_cycle=Int(additional_init_cycle), - additional_prod_cycle=Int(additional_prod_cycle)) - return ErrorHandlerReport(True, False) - - return None - - -def check_gemc_convergence(workchain, calc, conv_threshold=0.1, additional_init_cycle=0, additional_prod_cycle=0): - """ - Checks whether a GCMC calc is converged. - Checking is based on the error bar on average loading which is - average number of molecules in each simulation box. - """ - output_gemc = calc.outputs.output_parameters.get_dict() - conv_stat = [] - - for comp in calc.inputs.parameters['Component']: - molec_per_box1_comp_average = output_gemc['box_one']["components"][comp]["loading_absolute_average"] - molec_per_box2_comp_average = output_gemc['box_two']["components"][comp]["loading_absolute_average"] - molec_per_box1_comp_dev = output_gemc['box_one']["components"][comp]["loading_absolute_dev"] - molec_per_box2_comp_dev = output_gemc['box_two']["components"][comp]["loading_absolute_dev"] - - error_box1 = round((molec_per_box1_comp_dev / molec_per_box1_comp_average), 2) - error_box2 = round((molec_per_box2_comp_dev / molec_per_box2_comp_average), 2) - - if (error_box1 <= conv_threshold) and (error_box2 <= conv_threshold): - conv_stat.append(True) - else: - conv_stat.append(False) - - if not all(conv_stat): - workchain.report("GEMC calculation is NOT converged: continuing from restart...") - workchain.ctx.inputs.retrieved_parent_folder = calc.outputs['retrieved'] - workchain.ctx.inputs.parameters = 
modify_number_of_cycles(workchain.ctx.inputs.parameters, - additional_init_cycle=Int(additional_init_cycle), - additional_prod_cycle=Int(additional_prod_cycle)) - return ErrorHandlerReport(True, False) - - return None - - -def check_gemc_box(workchain, calc): - """ - Checks whether each simulation box still satisfies minimum image convention. - """ - output_gemc = calc.outputs.output_parameters.get_dict() - cutoff = calc.inputs.parameters['GeneralSettings']['CutOff'] - box_one_stat = [] - box_two_stat = [] - - box_one_length_current = [] - box_two_length_current = [] - - for box_len_ave in ["box_ax_average", "box_by_average", "box_cz_average"]: - if output_gemc["box_one"]["general"][box_len_ave] > 2 * cutoff: - box_one_stat.append(True) - else: - box_one_stat.append(False) - box_one_length_current.append(output_gemc["box_one"]["general"][box_len_ave]) - - if output_gemc["box_two"]["general"][box_len_ave] > 2 * cutoff: - box_two_stat.append(True) - else: - box_two_stat.append(False) - box_two_length_current.append(output_gemc["box_two"]["general"][box_len_ave]) - - if not all(box_one_stat and box_two_stat): - workchain.report("GEMC box is NOT converged: repeating with increase box...") - # Fixing the issue. 
- if not all(box_one_stat): - workchain.ctx.inputs.parameters = increase_box_lenght(workchain.ctx.inputs.parameters, Str("box_one"), - Float(box_one_length_current[0])) - - if not all(box_two_stat): - workchain.ctx.inputs.parameters = increase_box_lenght(workchain.ctx.inputs.parameters, Str("box_two"), - Float(box_two_length_current[0])) - - return ErrorHandlerReport(True, False) - - return None diff --git a/aiida_raspa/utils/other_utilities.py b/aiida_raspa/utils/other_utilities.py deleted file mode 100644 index dfe527c..0000000 --- a/aiida_raspa/utils/other_utilities.py +++ /dev/null @@ -1,154 +0,0 @@ -# -*- coding: utf-8 -*- -"""Other utilities.""" -from collections import namedtuple -from functools import wraps - -from aiida.common import AiidaException, AttributeDict -from aiida.engine import ExitCode -from aiida.orm import Dict - - -class UnexpectedCalculationFailure(AiidaException): - """Raised when a calculation job has failed for an unexpected or unrecognized reason.""" - - -ErrorHandler = namedtuple('ErrorHandler', 'priority method') -"""A namedtuple to define an error handler for a :class:`~aiida.engine.processes.workchains.workchain.WorkChain`. -The priority determines in which order the error handling methods are executed, with -the higher priority being executed first. The method defines an unbound WorkChain method -that takes an instance of a :class:`~aiida.orm.CalcJobNode` -as its sole argument. If the condition of the error handler is met, it should return an :class:`.ErrorHandlerReport`. -:param priority: integer denoting the error handlers priority -:param method: the workchain class method -""" - -ErrorHandlerReport = namedtuple('ErrorHandlerReport', 'is_handled do_break exit_code') -ErrorHandlerReport.__new__.__defaults__ = (False, False, ExitCode()) -""" -A namedtuple to define an error handler report for a :class:`~aiida.engine.processes.workchains.workchain.WorkChain`. 
-This namedtuple should be returned by an error handling method of a workchain instance if -the condition of the error handling was met by the failure mode of the calculation. -If the error was appriopriately handled, the 'is_handled' field should be set to `True`, -and `False` otherwise. If no further error handling should be performed after this method -the 'do_break' field should be set to `True` -:param is_handled: boolean, set to `True` when an error was handled, default is `False` -:param do_break: boolean, set to `True` if no further error handling should be performed, default is `False` -:param exit_code: an instance of the :class:`~aiida.engine.processes.exit_code.ExitCode` tuple -""" - - -def prepare_process_inputs(process, inputs): - """Prepare the inputs for submission for the given process, according to its spec. - - That is to say that when an input is found in the inputs that corresponds to an input port in the spec of the - process that expects a `Dict`, yet the value in the inputs is a plain dictionary, the value will be wrapped in by - the `Dict` class to create a valid input. - - :param process: sub class of `Process` for which to prepare the inputs dictionary - :param inputs: a dictionary of inputs intended for submission of the process - :return: a dictionary with all bare dictionaries wrapped in `Dict` if dictated by the process spec - """ - prepared_inputs = wrap_bare_dict_inputs(process.spec().inputs, inputs) - return AttributeDict(prepared_inputs) - - -def wrap_bare_dict_inputs(port_namespace, inputs): - """Wrap bare dictionaries in `inputs` in a `Dict` node if dictated by the corresponding port in given namespace. 
- - :param port_namespace: a `PortNamespace` - :param inputs: a dictionary of inputs intended for submission of the process - :return: a dictionary with all bare dictionaries wrapped in `Dict` if dictated by the port namespace - """ - from aiida.engine.processes import PortNamespace - - wrapped = {} - - for key, value in inputs.items(): - - if key not in port_namespace: - wrapped[key] = value - continue - - port = port_namespace[key] - - if isinstance(port, PortNamespace): - wrapped[key] = wrap_bare_dict_inputs(port, value) - elif port.valid_type == Dict and isinstance(value, dict): - wrapped[key] = Dict(dict=value) - else: - wrapped[key] = value - - return wrapped - - -def register_error_handler(cls, priority=None): - """Decorate any function in an error handler :class:`.BaseRestartWorkChain` sub classes. - - The function expects two arguments, a workchain class and a priortity. The decorator will add the function as a - class method to the workchain class and add an :class:`.ErrorHandler` tuple to the - ``_error_handlers`` attribute of the workchain. During failed calculation handling the - :meth:`.inspect_calculation` outline method will call the `_handle_calculation_failure` which will loop over all - error handler in the ``_error_handlers``, sorted with respect to the priority in reverse. - If the workchain class defines a :attr:`.BaseRestartWorkChain._verbose` attribute and is set to `True`, a report - message will be fired when the error handler is executed. - - Requirements on the function signature of error handling functions. - - The function to which the decorator is applied needs to take two arguments: - - * `self`: This is the instance of the workchain itself - * `calculation`: This is the calculation that failed and needs to be investigated - - The function body should usually consist of a single conditional that checks the calculation if - the error that it is designed to handle is applicable. 
Although not required, it is advised that - the function return an :class:`.ErrorHandlerReport` tuple when its conditional was met. If an error was handled - it should set `is_handled` to `True`. If no other error handlers should be considered set `do_break` to `True`. - - :param cls: the workchain class to register the error handler with - - :param priority: optional integer that defines the order in which registered handlers will be called - during the handling of a failed calculation. Higher priorities will be handled first. If the priority is `None` - the handler will not be automatically called during calculation failure handling. This is useful to define - handlers that one only wants to call manually, for example in the `_handle_sanity_checks` and still profit - from the other features of this decorator. - - """ - - def error_handler_decorator(handler): - """Decorate a function to dynamically register an error handler to a `WorkChain` class.""" - - @wraps(handler) - def error_handler(self, calculation): - """Wrap error handler to add a log to the report if the handler is called and verbosity is turned on.""" - if hasattr(cls, '_verbose') and cls._verbose: # pylint: disable=protected-access - if priority: - self.report('({}){}'.format(priority, handler.__name__)) - else: - self.report('{}'.format(handler.__name__)) - - result = handler(self, calculation) - - # If a handler report is returned, attach the handler's name to node's attributes - if isinstance(result, ErrorHandlerReport): - try: - errors_handled = self.node.get_extra('errors_handled', []) - current_calculation = errors_handled[-1] - except IndexError: - # The extra was never initialized, so we skip this functionality - pass - else: - # Append the name of the handler to the last list in `errors_handled` and save it - current_calculation.append(handler.__name__) - self.node.set_extra('errors_handled', errors_handled) - - return result - - setattr(cls, handler.__name__, error_handler) - - if not 
hasattr(cls, '_error_handlers'): - cls._error_handlers = [] # pylint: disable=protected-access - cls._error_handlers.append(ErrorHandler(priority, error_handler)) # pylint: disable=protected-access - - return error_handler - - return error_handler_decorator diff --git a/aiida_raspa/workchains/aiida_base_restart.py b/aiida_raspa/workchains/aiida_base_restart.py deleted file mode 100644 index ddc4a9e..0000000 --- a/aiida_raspa/workchains/aiida_base_restart.py +++ /dev/null @@ -1,356 +0,0 @@ -# -*- coding: utf-8 -*- -# pylint: disable=inconsistent-return-statements,no-member -"""Base implementation of `WorkChain` class that implements a simple automated restart mechanism for calculations.""" - -from aiida import orm -from aiida.common import exceptions -from aiida.common.lang import override -from aiida.engine import ExitCode -from aiida.engine import CalcJob, WorkChain, ToContext, append_ -from aiida.plugins.entry_point import get_entry_point_names, load_entry_point -from aiida_raspa.utils import ErrorHandlerReport, UnexpectedCalculationFailure, prepare_process_inputs - - -class BaseRestartWorkChain(WorkChain): - """Base restart work chain. - - This work chain serves as the starting point for more complex work chains that will be designed to run a calculation - that might need multiple restarts to come to a successful end. These restarts may be necessary because a single - calculation run is not sufficient to achieve a fully converged result, or certain errors maybe encountered which - are recoverable. - - This work chain implements the most basic functionality to achieve this goal. It will launch calculations, - restarting until it is completed successfully or the maximum number of iterations is reached. It can recover from - errors through error handlers that can be attached dynamically through the `register_error_handler` decorator. 
- - The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few - outline methods. The minimally required outline would look something like the following:: - - cls.setup - while_(cls.should_run_calculation)( - cls.run_calculation, - cls.inspect_calculation, - ) - - Each of these methods can of course be overriden but they should be general enough to fit most calculation cycles. - The `run_calculation` method will take the inputs for the calculation process from the context under the key - `inputs`. The user should therefore make sure that before the `run_calculation` method is called, that the to be - used inputs are stored under `self.ctx.inputs`. One can update the inputs based on the results from a prior - calculation by calling an outline method just before the `run_calculation` step, for example:: - - cls.setup - while_(cls.should_run_calculation)( - cls.prepare_calculation, - cls.run_calculation, - cls.inspect_calculation, - ) - - Where in the `prepare_calculation` method, the inputs dictionary at `self.ctx.inputs` is updated before the next - calculation will be run with those inputs. - - The `_calculation_class` attribute should be set to the `CalcJob` class that should be run in the loop. - """ - - _verbose = False - _calculation_class = None - _error_handler_entry_point = None - - def __init__(self, *args, **kwargs): - """Construct the instance.""" - super().__init__(*args, **kwargs) - - if self._calculation_class is None or not issubclass(self._calculation_class, CalcJob): - raise ValueError('no valid CalcJob class defined for `_calculation_class` attribute') - - self._load_error_handlers() - - @override - def load_instance_state(self, saved_state, load_context): - """Load the process instance from a saved state. 
- - :param saved_state: saved state of existing process instance - :param load_context: context for loading instance state - """ - super().load_instance_state(saved_state, load_context) - self._load_error_handlers() - - def _load_error_handlers(self): - """Load the error handlers defined through entry points, if any.""" - if self._error_handler_entry_point is not None: - for entry_point_name in get_entry_point_names(self._error_handler_entry_point): - try: - load_entry_point(self._error_handler_entry_point, entry_point_name) - self.logger.info("loaded the '%s' entry point for the '%s' error handlers category", - entry_point_name, self._error_handler_entry_point) - except exceptions.EntryPointError as exception: - self.logger.warning("failed to load the '%s' entry point for the '%s' error handlers: %s", - entry_point_name, self._error_handler_entry_point, exception) - - @classmethod - def define(cls, spec): - """Define the process specification.""" - # yapf: disable - super().define(spec) - spec.input('max_iterations', valid_type=orm.Int, default=lambda: orm.Int(5), - help='Maximum number of iterations the work chain will restart the calculation to finish successfully.') - spec.input('clean_workdir', valid_type=orm.Bool, default=lambda: orm.Bool(False), - help='If `True`, work directories of all called calculation will be cleaned at the end of execution.') - spec.input_namespace('fixers', valid_type=tuple, required=False, - help="Fixers you want to apply to the outputs of every calculation.", dynamic=True) - spec.exit_code(101, 'ERROR_MAXIMUM_ITERATIONS_EXCEEDED', - message='The maximum number of iterations was exceeded.') - spec.exit_code(102, 'ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE', - message='The calculation failed for an unknown reason, twice in a row.') - - def setup(self): - """Initialize context variables that are used during the logical flow of the `BaseRestartWorkChain`.""" - self.ctx.calc_name = self._calculation_class.__name__ - 
self.ctx.unexpected_failure = False - self.ctx.restart_calc = None - self.ctx.is_finished = False - self.ctx.iteration = 0 - - def should_run_calculation(self): - """Return whether a new calculation should be run. - - This is the case as long as the last calculation has not finished successfully and the maximum number of - restarts has not yet been exceeded. - """ - return not self.ctx.is_finished and self.ctx.iteration < self.inputs.max_iterations.value - - def run_calculation(self): - """Run the next calculation, taking the input dictionary from the context at `self.ctx.inputs`.""" - self.ctx.iteration += 1 - - try: - unwrapped_inputs = self.ctx.inputs - except AttributeError: - raise AttributeError('no calculation input dictionary was defined in `self.ctx.inputs`') - - # Set the `CALL` link label - unwrapped_inputs['metadata']['call_link_label'] = 'iteration_{:02d}'.format(self.ctx.iteration) - - inputs = prepare_process_inputs(self._calculation_class, unwrapped_inputs) - calculation = self.submit(self._calculation_class, **inputs) - - # Add a new empty list to the `errors_handled` extra. If any errors handled registered through the - # `register_error_handler` decorator return an `ErrorHandlerReport`, their name will be appended to that list. - errors_handled = self.node.get_extra('errors_handled', []) - errors_handled.append([]) - self.node.set_extra('errors_handled', errors_handled) - - self.report('launching {}<{}> iteration #{}'.format(self.ctx.calc_name, calculation.pk, self.ctx.iteration)) - - return ToContext(calculations=append_(calculation)) - - def inspect_calculation(self): - """Analyse the results of the previous calculation and call the error handlers when necessary.""" - calculation = self.ctx.calculations[self.ctx.iteration - 1] - - # Done: successful completion of last calculation - if calculation.is_finished_ok: - - # Perform an optional sanity check. 
If it returns an `ExitCode` this means an unrecoverable situation was - # detected and the work chain should be aborted. If it returns `False`, the sanity check detected a problem - # but has handled the problem and we should restart the cycle. - handler = self._handle_calculation_sanity_checks(calculation) # pylint: disable=assignment-from-no-return - - if (isinstance(handler, ErrorHandlerReport) and - handler.exit_code is not None and handler.exit_code.status != 0): - # Sanity check returned a handler with an exit code that is non-zero, so we abort - self.report('{}<{}> finished successfully, but sanity check detected unrecoverable problem'.format( - self.ctx.calc_name, calculation.pk)) - self.ctx.is_finished = True - return handler.exit_code - - if isinstance(handler, ErrorHandlerReport): - # Reset the `unexpected_failure` since we are restarting the calculation loop - self.ctx.unexpected_failure = False - self.report('{}<{}> finished successfully, but sanity check failed, restarting'.format( - self.ctx.calc_name, calculation.pk)) - return - - self.report('{}<{}> completed successfully'.format(self.ctx.calc_name, calculation.pk)) - self.ctx.restart_calc = calculation - self.ctx.is_finished = True - return - - # Unexpected: calculation was killed or an exception occurred, trigger unexpected failure handling - if calculation.is_excepted or calculation.is_killed: - return self._handle_unexpected_failure(calculation) - - # Failed: here the calculation is `Finished` but has a non-zero exit status, initiate the error handling - try: - exit_code = self._handle_calculation_failure(calculation) - except UnexpectedCalculationFailure as exception: - exit_code = self._handle_unexpected_failure(calculation, exception) - - # If the exit code returned actually has status `0` that means we consider the calculation as successful - if isinstance(exit_code, ExitCode): - if exit_code.status == 0: - self.ctx.is_finished = True - return exit_code - - def results(self): - """Attach 
the outputs specified in the output specification from the last completed calculation.""" - calculation = self.ctx.calculations[self.ctx.iteration - 1] - - # We check the `is_finished` attribute of the work chain and not the successfulness of the last calculation - # because the error handlers in the last iteration can have qualified a "failed" calculation as satisfactory - # for the outcome of the work chain and so have marked it as `is_finished=True`. - if not self.ctx.is_finished and self.ctx.iteration >= self.inputs.max_iterations.value: - # Abort: exceeded maximum number of retries - self.report('reached the maximum number of iterations {}: last ran {}<{}>'.format( - self.inputs.max_iterations.value, self.ctx.calc_name, calculation.pk)) - return self.exit_codes.ERROR_MAXIMUM_ITERATIONS_EXCEEDED - - self.report('work chain completed after {} iterations'.format(self.ctx.iteration)) - - for name, port in self.spec().outputs.items(): - - try: - node = calculation.get_outgoing(link_label_filter=name).one().node - except ValueError: - if port.required: - self.report("required output '{}' was not an output of {}<{}>".format( - name, self.ctx.calc_name, calculation.pk)) - else: - self.out(name, node) - if self._verbose: - self.report("attaching the node {}<{}> as '{}'".format(node.__class__.__name__, node.pk, name)) - - def on_terminated(self): - """Clean the working directories of all child calculations if `clean_workdir=True` in the inputs.""" - super().on_terminated() - - if self.inputs.clean_workdir.value is False: - self.report('remote folders will not be cleaned') - return - - cleaned_calcs = [] - - for called_descendant in self.node.called_descendants: - if isinstance(called_descendant, orm.CalcJobNode): - try: - called_descendant.outputs.remote_folder._clean() # pylint: disable=protected-access - cleaned_calcs.append(str(called_descendant.pk)) - except (IOError, OSError, KeyError): - pass - - if cleaned_calcs: - self.report('cleaned remote folders of 
calculations: {}'.format(' '.join(cleaned_calcs))) - - def _handle_calculation_sanity_checks(self, calculation): - """Perform a sanity check of a calculation that finished ok. - - Calculations that were marked as successful by the parser may still have produced outputs that do not make sense - but were not detected by the code and so were not highlighted as warnings or errors. The consistency of the - outputs can be checked here. If an unrecoverable problem is found, the function should return the appropriate - exit code to abort the work chain. If the probem can be fixed with a restart calculation, this function should - adapt the inputs as an error handler would and return `False`. This will signal to the work chain that a new - calculation should be started. If `None` is returned, the work chain assumes that the outputs produced by the - calculation are good and nothing will be done. - - :param calculation: the calculation whose outputs should be checked for consistency - :return: `ErrorHandlerReport` if a new calculation should be launched or abort if it includes an exit code - """ - import importlib - is_handled = False # Becomes True if at least one error handling operation was performed and was sucessfull - not_handled = False # Becomes True if at least one error handling operation was performed and was NOT sucessfull - - for fxtr in sorted(self.inputs.fixers.keys()): - module_path, fixer_name, *args = self.inputs.fixers[fxtr] - module = importlib.import_module(module_path) - fixer = getattr(module, fixer_name) - handler_report = fixer(self, calculation, *args) - - # if the handler detected a problem and tried to fix it - if isinstance(handler_report, ErrorHandlerReport): - - # if the problem was succesfully resolved - if handler_report.is_handled: - is_handled = True - - # Break error checks if required - if handler_report.do_break: - return ErrorHandlerReport(True) - - # If the problem wasn't resolved - else: - not_handled = True - # Specify latest not 
handled report for return - latest_not_handled_report = handler_report - - # If at least one error is handled, we consider that the calculation should be restarted - if is_handled: - return ErrorHandlerReport(True, True) - - # If none of the handlers were sucesfull in fixing problems, we return the latest not handled report. - if not_handled: - return latest_not_handled_report - - def _handle_calculation_failure(self, calculation): - """Call the attached error handlers if any to attempt to correct the cause of the calculation failure. - - The registered error handlers will be called in order based on their priority until a handler returns a report - that instructs to break. If the last executed error handler defines an exit code, that will be returned to - instruct the work chain to abort. Otherwise the work chain will continue the cycle. - - :param calculation: the calculation that finished with a non-zero exit status - :return: `ExitCode` if the work chain is to be aborted - :raises `UnexpectedCalculationFailure`: if no error handlers were registered or no errors were handled. - """ - is_handled = False - handler_report = None - - if not hasattr(self, '_error_handlers') or not self._error_handlers: - raise UnexpectedCalculationFailure('no calculation error handlers were registered') - - # Sort the handlers with a priority defined, based on their priority in reverse order - handlers = [handler for handler in self._error_handlers if handler.priority] - handlers = sorted(handlers, key=lambda x: x.priority, reverse=True) - - for handler in handlers: - - handler_report = handler.method(self, calculation) - - # If at least one error is handled, we consider the calculation failure handled. 
- if handler_report and handler_report.is_handled: - self.ctx.unexpected_failure = False - is_handled = True - - # After certain error handlers, we may want to skip all other error handling - if handler_report and handler_report.do_break: - break - - # If none of the executed error handlers reported that they handled an error, the failure reason is unknown - if not is_handled: - raise UnexpectedCalculationFailure('calculation failure was not handled') - - # The last called error handler may not necessarily have returned a handler report - if handler_report: - return handler_report.exit_code - - return - - def _handle_unexpected_failure(self, calculation, exception=None): - """Handle an unexpected failure. - - This occurs when a calculation excepted, was killed or finished with a non-zero exit status but no errors were - handled. If this is the second consecutive unexpected failure the work chain is aborted. - - :param calculation: the calculation that failed in an unexpected way - :param exception: optional exception or error message to log to the report - :return: `ExitCode` if this is the second consecutive unexpected failure - """ - if exception: - self.report('{}'.format(exception)) - - if self.ctx.unexpected_failure: - self.report('failure of {}<{}> could not be handled for the second consecutive time'.format( - self.ctx.calc_name, calculation.pk)) - return self.exit_codes.ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE - - self.ctx.unexpected_failure = True - self.report('failure of {}<{}> could not be handled, restarting once more'.format( - self.ctx.calc_name, calculation.pk)) diff --git a/aiida_raspa/workchains/base.py b/aiida_raspa/workchains/base.py index 9a8a37c..c679411 100644 --- a/aiida_raspa/workchains/base.py +++ b/aiida_raspa/workchains/base.py @@ -1,13 +1,12 @@ # -*- coding: utf-8 -*- -"""Base workchain to run a RASPA calculation""" +"""Base work chain to run a RASPA calculation""" from aiida.common import AttributeDict -from aiida.engine import 
while_ -from aiida.orm import Int +from aiida.engine import BaseRestartWorkChain, ProcessHandlerReport, process_handler, while_ +from aiida.orm import Int, Str, Float from aiida.plugins import CalculationFactory -from aiida_raspa.utils import ErrorHandlerReport, register_error_handler, add_write_binary_restart -from aiida_raspa.workchains.aiida_base_restart import BaseRestartWorkChain +from aiida_raspa.utils import add_write_binary_restart, modify_number_of_cycles, increase_box_lenght RaspaCalculation = CalculationFactory('raspa') # pylint: disable=invalid-name @@ -15,7 +14,7 @@ class RaspaBaseWorkChain(BaseRestartWorkChain): """Workchain to run a RASPA calculation with automated error handling and restarts.""" - _calculation_class = RaspaCalculation + _process_class = RaspaCalculation @classmethod def define(cls, spec): @@ -23,9 +22,9 @@ def define(cls, spec): spec.expose_inputs(RaspaCalculation, namespace='raspa') spec.outline( cls.setup, - while_(cls.should_run_calculation)( - cls.run_calculation, - cls.inspect_calculation, + while_(cls.should_run_process)( + cls.run_process, + cls.inspect_process, ), cls.results, ) @@ -37,6 +36,7 @@ def setup(self): internal loop.""" super().setup() + self.ctx.inputs = AttributeDict(self.exposed_inputs(RaspaCalculation, 'raspa')) if "WriteBinaryRestartFileEvery" not in self.ctx.inputs.parameters["GeneralSettings"]: self.ctx.inputs.parameters = add_write_binary_restart(self.ctx.inputs.parameters, Int(1000)) @@ -45,18 +45,162 @@ def report_error_handled(self, calculation, action): """Report an action taken for a calculation that has failed. This should be called in a registered error handler if its condition is met and an action was taken. 
:param calculation: the failed calculation node - :param action: a string message with the action taken - """ + :param action: a string message with the action taken""" + arguments = [calculation.process_label, calculation.pk, calculation.exit_status, calculation.exit_message] self.report('{}<{}> failed with exit status {}: {}'.format(*arguments)) self.report('Action taken: {}'.format(action)) - -@register_error_handler(RaspaBaseWorkChain, 570) -def _handle_timeout(self, calculation): - """Error handler that restarts calculation finished with TIMEOUT ExitCode.""" - if calculation.exit_status == RaspaCalculation.spec().exit_codes.TIMEOUT.status: + @process_handler(priority=570, exit_codes=RaspaCalculation.exit_codes.TIMEOUT, enabled=True) + def handle_timeout(self, calculation): + """Error handler that restarts calculation finished with TIMEOUT ExitCode.""" self.report_error_handled(calculation, "Timeout handler. Adding remote folder as input to use binary restart.") self.ctx.inputs.parent_folder = calculation.outputs.remote_folder + return ProcessHandlerReport(False) + + @process_handler(priority=400, enabled=False) + def check_widom_convergence(self, calculation): + """Checks whether a Widom particle insertion is converged. 
The check is based on the + error bar of the Henry coefficient.""" + + conv_threshold = 0.1 + additional_cycle = 2000 + + output_widom = calculation.outputs.output_parameters.get_dict() + structure_label = list(calculation.get_incoming().nested()['framework'].keys())[0] + conv_stat = [] + + for comp in calculation.inputs.parameters['Component']: + kh_average_comp = output_widom[structure_label]["components"][comp]["henry_coefficient_average"] + kh_dev_comp = output_widom[structure_label]["components"][comp]["henry_coefficient_dev"] + + error = round((kh_dev_comp / kh_average_comp), 2) + if error <= conv_threshold: + conv_stat.append(True) + else: + conv_stat.append(False) + + if not all(conv_stat): + + self.report("Widom particle insertion calculationulation is NOT converged: repeating with more trials...") + self.ctx.inputs.retrieved_parent_folder = calculation.outputs['retrieved'] + self.ctx.inputs.parameters = modify_number_of_cycles(self.ctx.inputs.parameters, + additional_init_cycle=Int(0), + additional_prod_cycle=Int(additional_cycle)) + return ProcessHandlerReport(False) + + return None + + @process_handler(priority=410, enabled=False) + def check_gcmc_convergence(self, calc): + """Checks whether a GCMC calc is converged. Checking is based on the error bar on average loading.""" + conv_threshold = 0.1 + additional_init_cycle = 2000 + additional_prod_cycle = 2000 + + output_gcmc = calc.outputs.output_parameters.get_dict() + structure_label = list(calc.get_incoming().nested()['framework'].keys())[0] + conv_stat = [] + + for comp in calc.inputs.parameters['Component']: + + loading_average_comp = output_gcmc[structure_label]["components"][comp]["loading_absolute_average"] + loading_dev_comp = output_gcmc[structure_label]["components"][comp]["loading_absolute_dev"] + + # It can happen for weakly adsorbed species. + # We need to think about a better way to handle it. + # Currently, if it happens for five iterations, the work chain will not continue.
+ if loading_average_comp == 0: + conv_stat.append(False) + else: + error = round((loading_dev_comp / loading_average_comp), 2) + if error <= conv_threshold: + conv_stat.append(True) + else: + conv_stat.append(False) + + if not all(conv_stat): + self.report("GCMC calculation is NOT converged: continuing from restart...") + self.ctx.inputs.retrieved_parent_folder = calc.outputs['retrieved'] + self.ctx.inputs.parameters = modify_number_of_cycles(self.ctx.inputs.parameters, + additional_init_cycle=Int(additional_init_cycle), + additional_prod_cycle=Int(additional_prod_cycle)) + return ProcessHandlerReport(False) + + return None + + @process_handler(priority=410, enabled=False) + def check_gemc_convergence(self, calc): + """Checks whether a GEMC calc is converged. Checking is based on the error bar on average loading which is + average number of molecules in each simulation box.""" + + conv_threshold = 0.1 + additional_init_cycle = 2000 + additional_prod_cycle = 2000 + + output_gemc = calc.outputs.output_parameters.get_dict() + conv_stat = [] + + for comp in calc.inputs.parameters['Component']: + molec_per_box1_comp_average = output_gemc['box_one']["components"][comp]["loading_absolute_average"] + molec_per_box2_comp_average = output_gemc['box_two']["components"][comp]["loading_absolute_average"] + molec_per_box1_comp_dev = output_gemc['box_one']["components"][comp]["loading_absolute_dev"] + molec_per_box2_comp_dev = output_gemc['box_two']["components"][comp]["loading_absolute_dev"] + + error_box1 = round((molec_per_box1_comp_dev / molec_per_box1_comp_average), 2) + error_box2 = round((molec_per_box2_comp_dev / molec_per_box2_comp_average), 2) + + if (error_box1 <= conv_threshold) and (error_box2 <= conv_threshold): + conv_stat.append(True) + else: + conv_stat.append(False) + + if not all(conv_stat): + self.report("GEMC calculation is NOT converged: continuing from restart...") + self.ctx.inputs.retrieved_parent_folder = calc.outputs['retrieved'] + 
self.ctx.inputs.parameters = modify_number_of_cycles(self.ctx.inputs.parameters, + additional_init_cycle=Int(additional_init_cycle), + additional_prod_cycle=Int(additional_prod_cycle)) + return ProcessHandlerReport(False) + + return None + + @process_handler(priority=400, enabled=False) + def check_gemc_box(self, calc): + """Checks whether each simulation box still satisfies minimum image convention.""" + + output_gemc = calc.outputs.output_parameters.get_dict() + cutoff = calc.inputs.parameters['GeneralSettings']['CutOff'] + box_one_stat = [] + box_two_stat = [] + + box_one_length_current = [] + box_two_length_current = [] + + for box_len_ave in ["box_ax_average", "box_by_average", "box_cz_average"]: + if output_gemc["box_one"]["general"][box_len_ave] > 2 * cutoff: + box_one_stat.append(True) + else: + box_one_stat.append(False) + box_one_length_current.append(output_gemc["box_one"]["general"][box_len_ave]) + + if output_gemc["box_two"]["general"][box_len_ave] > 2 * cutoff: + box_two_stat.append(True) + else: + box_two_stat.append(False) + box_two_length_current.append(output_gemc["box_two"]["general"][box_len_ave]) + + if not all(box_one_stat and box_two_stat): + self.report("GEMC box is NOT converged: repeating with increase box...") + # Fixing the issue. 
+ if not all(box_one_stat): + self.ctx.inputs.parameters = increase_box_lenght(self.ctx.inputs.parameters, Str("box_one"), + Float(box_one_length_current[0])) + + if not all(box_two_stat): + self.ctx.inputs.parameters = increase_box_lenght(self.ctx.inputs.parameters, Str("box_two"), + Float(box_two_length_current[0])) + + return ProcessHandlerReport(True) - return ErrorHandlerReport(True, False, None) + return None diff --git a/examples/workchains/example_base_restart_timeout.py b/examples/workchains/example_base_restart_timeout.py index da5a520..7fd25a7 100644 --- a/examples/workchains/example_base_restart_timeout.py +++ b/examples/workchains/example_base_restart_timeout.py @@ -6,7 +6,7 @@ import click from aiida.common import NotExistent -from aiida.engine import run +from aiida.engine import run_get_node from aiida.orm import CifData, Code, Dict, SinglefileData, Int from aiida_raspa.workchains import RaspaBaseWorkChain @@ -86,14 +86,15 @@ def example_base_restart_timeout(raspa_code): "num_mpiprocs_per_machine": 1, }, "max_wallclock_seconds": 1 * 30 * 60, # 30 min - "withmpi": True, - "mpirun_extra_params": ["timeout", "5"], # kill the calculation after 5 seconds, to test restart + "withmpi": True, # A trick to put the timeout command below before the raspa command. + "mpirun_extra_params": ["timeout", "5"], # Kill the calculation after 5 seconds, to test restart.
} # Specify RaspaBaseWorkChain options builder.max_iterations = Int(8) # number of maximum iterations: prevent for infinite restart (default: 5) - run(builder) + _, node = run_get_node(builder) + assert node.exit_status == 0 @click.command('cli') diff --git a/examples/workchains/example_base_workchain_gcmc_2comp.py b/examples/workchains/example_base_workchain_gcmc_2comp.py index d8c3ef6..2d6c8f9 100644 --- a/examples/workchains/example_base_workchain_gcmc_2comp.py +++ b/examples/workchains/example_base_workchain_gcmc_2comp.py @@ -6,7 +6,7 @@ import click from aiida.common import NotExistent -from aiida.engine import run +from aiida.engine import run_get_node from aiida.orm import CifData, Code, Dict, SinglefileData from aiida_raspa.workchains import RaspaBaseWorkChain @@ -89,10 +89,8 @@ def example_base_workchain_gcmc(raspa_code): "irmof_1_xenon": block_pocket_node2, } - # Add fixers that could handle physics-related problems. - builder.fixers = { - 'fixer_001': ('aiida_raspa.utils', 'check_gcmc_convergence', 0.10, 2000, 2000), - } + builder.handler_overrides = Dict(dict={'check_gcmc_convergence': True + }) # Enable gcmc convergence handler disabled by default. 
# Specifying the scheduler options builder.raspa.metadata.options = { @@ -104,7 +102,8 @@ def example_base_workchain_gcmc(raspa_code): "withmpi": False, } - run(builder) + _, node = run_get_node(builder) + assert node.exit_status == 0 @click.command('cli') diff --git a/examples/workchains/example_base_workchain_widom_1comp.py b/examples/workchains/example_base_workchain_widom_1comp.py index 5d1da79..6caae7d 100644 --- a/examples/workchains/example_base_workchain_widom_1comp.py +++ b/examples/workchains/example_base_workchain_widom_1comp.py @@ -6,7 +6,7 @@ import click from aiida.common import NotExistent -from aiida.engine import run +from aiida.engine import run_get_node from aiida.orm import CifData, Code, Dict, SinglefileData from aiida_raspa.workchains import RaspaBaseWorkChain @@ -69,10 +69,9 @@ def example_base_workchain_widom(raspa_code): "block_tcc1rs_xenon": block_pocket_node1, } - # Add fixers that could handle physics-related problems. - builder.fixers = { - 'fixer_001': ('aiida_raspa.utils', 'check_widom_convergence', 0.10, 2000), - } + # Add handlers that could handle physics-related problems. + builder.handler_overrides = Dict(dict={'check_widom_convergence': True + }) # Enable widom convergence handler disabled by default. 
# Specifying the scheduler options builder.raspa.metadata.options = { @@ -84,7 +83,8 @@ def example_base_workchain_widom(raspa_code): "withmpi": False, } - run(builder) + _, node = run_get_node(builder) + assert node.exit_status == 0 @click.command('cli') diff --git a/examples/workchains/example_base_workchain_widom_2comp.py b/examples/workchains/example_base_workchain_widom_2comp.py index f13db44..6e150ac 100644 --- a/examples/workchains/example_base_workchain_widom_2comp.py +++ b/examples/workchains/example_base_workchain_widom_2comp.py @@ -6,7 +6,7 @@ import click from aiida.common import NotExistent -from aiida.engine import run +from aiida.engine import run_get_node from aiida.orm import CifData, Code, Dict, SinglefileData from aiida_raspa.workchains import RaspaBaseWorkChain @@ -78,10 +78,8 @@ def example_base_workchain_widom_2(raspa_code): "block_tcc1rs_xenon": block_pocket_node2, } - # Add fixers that could handle physics-related problems. - builder.fixers = { - 'fixer_001': ('aiida_raspa.utils', 'check_widom_convergence', 0.1, 2000), - } + builder.handler_overrides = Dict(dict={'check_widom_convergence': True + }) # Enable widom convergence handler disabled by default. 
# Specifying the scheduler options builder.raspa.metadata.options = { @@ -93,7 +91,8 @@ def example_base_workchain_widom_2(raspa_code): "withmpi": False, } - run(builder) + _, node = run_get_node(builder) + assert node.exit_status == 0 @click.command('cli') diff --git a/examples/workchains/example_base_workchain_gemc_1comp.py b/examples/workchains/too_long_base_workchain_gemc_1comp.py similarity index 88% rename from examples/workchains/example_base_workchain_gemc_1comp.py rename to examples/workchains/too_long_base_workchain_gemc_1comp.py index 4fba88a..5f65923 100644 --- a/examples/workchains/example_base_workchain_gemc_1comp.py +++ b/examples/workchains/too_long_base_workchain_gemc_1comp.py @@ -5,7 +5,7 @@ import click from aiida.common import NotExistent -from aiida.engine import run +from aiida.engine import run_get_node from aiida.orm import Code, Dict from aiida_raspa.workchains import RaspaBaseWorkChain @@ -66,11 +66,11 @@ def example_base_workchain_gemc(raspa_code): # Specifying the input parameters builder.raspa.parameters = parameters - # Add fixers that could handle physics-related problems. - builder.fixers = { - 'fixer_001': ('aiida_raspa.utils', 'check_gemc_box'), - 'fixer_002': ('aiida_raspa.utils', 'check_gemc_convergence', 0.8, 200, 200), - } + # Add handlers that could handle physics-related problems. + builder.handler_overrides = Dict(dict={ + 'check_gemc_box': True, + 'check_gemc_convergence': True, + }) # Enable gemc handlers disabled by default. 
# Specifying the scheduler options builder.raspa.metadata.options = { @@ -82,7 +82,8 @@ def example_base_workchain_gemc(raspa_code): "withmpi": False, } - run(builder) + _, node = run_get_node(builder) + assert node.exit_status == 0 @click.command('cli') From 32fb4ff5c8eb8ba2492c3395c7931d67494b1291 Mon Sep 17 00:00:00 2001 From: Aliaksandr Yakutovich Date: Tue, 8 Sep 2020 17:45:48 +0200 Subject: [PATCH 2/2] Prepare release 1.2.0 --- aiida_raspa/__init__.py | 2 +- setup.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aiida_raspa/__init__.py b/aiida_raspa/__init__.py index 656be5f..42a0d49 100644 --- a/aiida_raspa/__init__.py +++ b/aiida_raspa/__init__.py @@ -9,4 +9,4 @@ ############################################################################## """AiiDA-RASPA plugins, parsers, workflows, etc ...""" -__version__ = "1.1.1" +__version__ = "1.2.0" diff --git a/setup.json b/setup.json index efd3586..e903ebf 100644 --- a/setup.json +++ b/setup.json @@ -1,6 +1,6 @@ { "name": "aiida-raspa", - "version": "1.1.1", + "version": "1.2.0", "author": "Aliaksandr Yakutovich", "author_email": "aliaksandr.yakutovich@epfl.ch", "description": "AiiDA plugin for RASPA code",