diff --git a/arc/job/adapters/ts/heuristics.py b/arc/job/adapters/ts/heuristics.py index 78b8f4499d..82a7fd584c 100644 --- a/arc/job/adapters/ts/heuristics.py +++ b/arc/job/adapters/ts/heuristics.py @@ -18,49 +18,52 @@ import datetime import itertools import math -import subprocess import os -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import re +import subprocess +import time +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union import numpy as np import pandas as pd - +from arkane.statmech import is_linear from rmgpy.exceptions import ActionError from rmgpy.molecule.molecule import Molecule from rmgpy.reaction import Reaction from rmgpy.species import Species -from arkane.statmech import is_linear from arc.common import almost_equal_coords, get_logger, is_angle_linear, key_by_val from arc.imports import settings from arc.job.adapter import JobAdapter from arc.job.adapters.common import _initialize_adapter, ts_adapters_by_rmg_family from arc.job.factory import register_job_adapter -from arc.plotter import save_geo -from arc.species.converter import (compare_zmats, - relocate_zmat_dummy_atoms_to_the_end, - zmat_from_xyz, - zmat_to_xyz, - str_to_xyz, - xyz_to_str, - str_to_str, - xyz_to_str, - xyz_to_dmat) +from arc.job.local import check_job_status, submit_job from arc.mapping.engine import map_arc_rmg_species, map_two_species +from arc.plotter import save_geo +from arc.species.converter import ( + compare_zmats, + relocate_zmat_dummy_atoms_to_the_end, + str_to_str, + str_to_xyz, + xyz_to_dmat, + xyz_to_str, + zmat_from_xyz, + zmat_to_xyz, +) from arc.species.species import ARCSpecies, TSGuess, colliding_atoms from arc.species.zmat import get_parameter_from_atom_indices, remove_1st_atom, up_param if TYPE_CHECKING: from rmgpy.data.kinetics.family import KineticsFamily + from arc.level import Level from arc.reaction import ARCReaction try: - CREST_PATH = settings['CREST_PATH'] - CREST_ENV_PATH = settings['CREST_ENV_PATH'] + CREST_PATH = settings["CREST_PATH"] + CREST_ENV_PATH = settings["CREST_ENV_PATH"] HAS_CREST = True - + SERVERS = settings["servers"] except KeyError: HAS_CREST = False @@ -118,101 +121,109 @@ class HeuristicsAdapter(JobAdapter): xyz (dict, optional): The 3D coordinates to use. If not give, species.get_xyz() will be used. """ - def __init__(self, - project: str, - project_directory: str, - job_type: Union[List[str], str], - args: Optional[dict] = None, - bath_gas: Optional[str] = None, - checkfile: Optional[str] = None, - conformer: Optional[int] = None, - constraints: Optional[List[Tuple[List[int], float]]] = None, - cpu_cores: Optional[str] = None, - dihedral_increment: Optional[float] = None, - dihedrals: Optional[List[float]] = None, - directed_scan_type: Optional[str] = None, - ess_settings: Optional[dict] = None, - ess_trsh_methods: Optional[List[str]] = None, - execution_type: Optional[str] = None, - fine: bool = False, - initial_time: Optional[Union['datetime.datetime', str]] = None, - irc_direction: Optional[str] = None, - job_id: Optional[int] = None, - job_memory_gb: float = 14.0, - job_name: Optional[str] = None, - job_num: Optional[int] = None, - job_server_name: Optional[str] = None, - job_status: Optional[List[Union[dict, str]]] = None, - level: Optional['Level'] = None, - max_job_time: Optional[float] = None, - run_multi_species: bool = False, - reactions: Optional[List['ARCReaction']] = None, - rotor_index: Optional[int] = None, - server: Optional[str] = None, - server_nodes: Optional[list] = None, - queue: Optional[str] = None, - attempted_queues: Optional[List[str]] = None, - species: Optional[List[ARCSpecies]] = None, - testing: bool = False, - times_rerun: int = 0, - torsions: Optional[List[List[int]]] = None, - tsg: Optional[int] = None, - xyz: Optional[dict] = None, - ): + def __init__( + self, + project: str, + project_directory: str, + job_type: Union[List[str], str], + args: Optional[dict] = None, + bath_gas: Optional[str] = None, + checkfile: Optional[str] = None, + conformer: Optional[int] = None, + constraints: Optional[List[Tuple[List[int], float]]] = None, + cpu_cores: Optional[str] = None, + dihedral_increment: Optional[float] = None, + dihedrals: Optional[List[float]] = None, + directed_scan_type: Optional[str] = None, + ess_settings: Optional[dict] = None, + ess_trsh_methods: Optional[List[str]] = None, + execution_type: Optional[str] = None, + fine: bool = False, + initial_time: Optional[Union["datetime.datetime", str]] = None, + irc_direction: Optional[str] = None, + job_id: Optional[int] = None, + job_memory_gb: float = 14.0, + job_name: Optional[str] = None, + job_num: Optional[int] = None, + job_server_name: Optional[str] = None, + job_status: Optional[List[Union[dict, str]]] = None, + level: Optional["Level"] = None, + max_job_time: Optional[float] = None, + run_multi_species: bool = False, + reactions: Optional[List["ARCReaction"]] = None, + rotor_index: Optional[int] = None, + server: Optional[str] = None, + server_nodes: Optional[list] = None, + queue: Optional[str] = None, + attempted_queues: Optional[List[str]] = None, + species: Optional[List[ARCSpecies]] = None, + testing: bool = False, + times_rerun: int = 0, + torsions: Optional[List[List[int]]] = None, + tsg: Optional[int] = None, + xyz: Optional[dict] = None, + ): self.incore_capacity = 50 - self.job_adapter = 'heuristics' + self.job_adapter = "heuristics" self.command = None - self.execution_type = execution_type or 'incore' + self.execution_type = execution_type or "incore" if reactions is None: - raise ValueError('Cannot execute TS Heuristics without ARCReaction object(s).') - - if dihedral_increment is not None and (dihedral_increment < 0 or dihedral_increment > 360): - raise ValueError(f'dihedral_increment should be between 0 to 360, got: {dihedral_increment}') + raise ValueError( + "Cannot execute TS Heuristics without ARCReaction object(s)." + ) + + if dihedral_increment is not None and ( + dihedral_increment < 0 or dihedral_increment > 360 + ): + raise ValueError( + f"dihedral_increment should be between 0 to 360, got: {dihedral_increment}" + ) dihedral_increment = dihedral_increment or DIHEDRAL_INCREMENT - _initialize_adapter(obj=self, - is_ts=True, - project=project, - project_directory=project_directory, - job_type=job_type, - args=args, - bath_gas=bath_gas, - checkfile=checkfile, - conformer=conformer, - constraints=constraints, - cpu_cores=cpu_cores, - dihedral_increment=dihedral_increment, - dihedrals=dihedrals, - directed_scan_type=directed_scan_type, - ess_settings=ess_settings, - ess_trsh_methods=ess_trsh_methods, - fine=fine, - initial_time=initial_time, - irc_direction=irc_direction, - job_id=job_id, - job_memory_gb=job_memory_gb, - job_name=job_name, - job_num=job_num, - job_server_name=job_server_name, - job_status=job_status, - level=level, - max_job_time=max_job_time, - run_multi_species=run_multi_species, - reactions=reactions, - rotor_index=rotor_index, - server=server, - server_nodes=server_nodes, - queue=queue, - attempted_queues=attempted_queues, - species=species, - testing=testing, - times_rerun=times_rerun, - torsions=torsions, - tsg=tsg, - xyz=xyz, - ) + _initialize_adapter( + obj=self, + is_ts=True, + project=project, + project_directory=project_directory, + job_type=job_type, + args=args, + bath_gas=bath_gas, + checkfile=checkfile, + conformer=conformer, + constraints=constraints, + cpu_cores=cpu_cores, + dihedral_increment=dihedral_increment, + dihedrals=dihedrals, + directed_scan_type=directed_scan_type, + ess_settings=ess_settings, + ess_trsh_methods=ess_trsh_methods, + fine=fine, + initial_time=initial_time, + irc_direction=irc_direction, + job_id=job_id, + job_memory_gb=job_memory_gb, + job_name=job_name, + job_num=job_num, + job_server_name=job_server_name, + job_status=job_status, + level=level, + max_job_time=max_job_time, + run_multi_species=run_multi_species, + reactions=reactions, + rotor_index=rotor_index, + server=server, + server_nodes=server_nodes, + queue=queue, + attempted_queues=attempted_queues, + species=species, + testing=testing, + times_rerun=times_rerun, + torsions=torsions, + tsg=tsg, + xyz=xyz, + ) def write_input_file(self) -> None: """ @@ -254,46 +265,64 @@ def execute_incore(self): Execute a job incore. """ self._log_job_execution() - self.initial_time = self.initial_time if self.initial_time else datetime.datetime.now() + self.initial_time = ( + self.initial_time if self.initial_time else datetime.datetime.now() + ) - supported_families = [key for key, val in ts_adapters_by_rmg_family.items() if 'heuristics' in val] + supported_families = [ + key for key, val in ts_adapters_by_rmg_family.items() if "heuristics" in val + ] - self.reactions = [self.reactions] if not isinstance(self.reactions, list) else self.reactions + self.reactions = ( + [self.reactions] if not isinstance(self.reactions, list) else self.reactions + ) for rxn in self.reactions: family_label = rxn.family.label if family_label not in supported_families: - logger.warning(f'The heuristics TS search adapter does not support the {family_label} reaction family.') + logger.warning( + f"The heuristics TS search adapter does not support the {family_label} reaction family." + ) continue if any(spc.get_xyz() is None for spc in rxn.r_species + rxn.p_species): - logger.warning(f'The heuristics TS search adapter cannot process a reaction if 3D coordinates of ' - f'some/all of its reactants/products are missing.\nNot processing {rxn}.') + logger.warning( + f"The heuristics TS search adapter cannot process a reaction if 3D coordinates of " + f"some/all of its reactants/products are missing.\nNot processing {rxn}." + ) continue if rxn.ts_species is None: # Mainly used for testing, in an ARC run the TS species should already exist. - rxn.ts_species = ARCSpecies(label='TS', - is_ts=True, - charge=rxn.charge, - multiplicity=rxn.multiplicity, - ) + rxn.ts_species = ARCSpecies( + label="TS", + is_ts=True, + charge=rxn.charge, + multiplicity=rxn.multiplicity, + ) rxn.arc_species_from_rmg_reaction() - reactants, products = rxn.get_reactants_and_products(arc=True, return_copies=True) - reactant_mol_combinations = list(itertools.product(*list(reactant.mol_list for reactant in reactants))) - product_mol_combinations = list(itertools.product(*list(product.mol_list for product in products))) + reactants, products = rxn.get_reactants_and_products( + arc=True, return_copies=True + ) + reactant_mol_combinations = list( + itertools.product(*list(reactant.mol_list for reactant in reactants)) + ) + product_mol_combinations = list( + itertools.product(*list(product.mol_list for product in products)) + ) reaction_list = list() for reactants in list(reactant_mol_combinations): for products in list(product_mol_combinations): - rxns = react(reactants=list(reactants), - products=list(products), - family=rxn.family, - arc_reaction=rxn, - ) + rxns = react( + reactants=list(reactants), + products=list(products), + family=rxn.family, + arc_reaction=rxn, + ) if rxns is not None: reaction_list.extend(rxns) xyzs = list() tsg = None - if family_label == 'H_Abstraction': + if family_label == "H_Abstraction": # Todo: train guess params # r1_stretch_, r2_stretch_, a2_ = get_training_params( # family='H_Abstraction', @@ -301,47 +330,60 @@ def execute_incore(self): # atom_symbol_key=tuple(sorted([atom_a.element.symbol, atom_b.element.symbol])), # ) # r1_stretch_, r2_stretch_, a2_ = 1.2, 1.2, 170 # general guesses - tsg = TSGuess(method='Heuristics') + tsg = TSGuess(method="Heuristics") tsg.tic() - xyzs = h_abstraction(arc_reaction=rxn, - rmg_reactions=reaction_list, - dihedral_increment=self.dihedral_increment, - path=self.local_path, - ) + xyzs = h_abstraction( + arc_reaction=rxn, + rmg_reactions=reaction_list, + dihedral_increment=self.dihedral_increment, + path=self.local_path, + ) tsg.tok() for method_index, xyz in enumerate(xyzs): unique = True for other_tsg in rxn.ts_species.ts_guesses: if almost_equal_coords(xyz, other_tsg.initial_xyz): - if 'heuristics' not in other_tsg.method.lower(): - other_tsg.method += ' and Heuristics' + if "heuristics" not in other_tsg.method.lower(): + other_tsg.method += " and Heuristics" unique = False break if unique: - ts_guess = TSGuess(method='Heuristics', - index=len(rxn.ts_species.ts_guesses), - method_index=method_index, - t0=tsg.t0, - execution_time=tsg.execution_time, - success=True, - family=family_label, - xyz=xyz, - ) + ts_guess = TSGuess( + method="Heuristics", + index=len(rxn.ts_species.ts_guesses), + method_index=method_index, + t0=tsg.t0, + execution_time=tsg.execution_time, + success=True, + family=family_label, + xyz=xyz, + ) rxn.ts_species.ts_guesses.append(ts_guess) - save_geo(xyz=xyz, - path=self.local_path, - filename=f'Heuristics_{method_index}', - format_='xyz', - comment=f'Heuristics {method_index}, family: {family_label}', - ) + save_geo( + xyz=xyz, + path=self.local_path, + filename=f"Heuristics_{method_index}", + format_="xyz", + comment=f"Heuristics {method_index}, family: {family_label}", + ) if len(self.reactions) < 5: - successes = len([tsg for tsg in rxn.ts_species.ts_guesses if tsg.success and 'heuristics' in tsg.method]) + successes = len( + [ + tsg + for tsg in rxn.ts_species.ts_guesses + if tsg.success and "heuristics" in tsg.method + ] + ) if successes: - logger.info(f'Heuristics successfully found {successes} TS guesses for {rxn.label}.') + logger.info( + f"Heuristics successfully found {successes} TS guesses for {rxn.label}." + ) else: - logger.info(f'Heuristics did not find any successful TS guesses for {rxn.label}.') + logger.info( + f"Heuristics did not find any successful TS guesses for {rxn.label}." + ) self.final_time = datetime.datetime.now() @@ -353,23 +395,24 @@ def execute_queue(self): self.execute_incore() -def combine_coordinates_with_redundant_atoms(xyz_1: Union[dict, str], - xyz_2: Union[dict, str], - mol_1: 'Molecule', - mol_2: 'Molecule', - reactant_2: ARCSpecies, - h1: int, - h2: int, - c: Optional[int] = None, - d: Optional[int] = None, - r1_stretch: float = 1.2, - r2_stretch: float = 1.2, - a2: float = 180, - d2: Optional[float] = None, - d3: Optional[float] = None, - keep_dummy: bool = False, - reactants_reversed: bool = False, - ) -> dict: +def combine_coordinates_with_redundant_atoms( + xyz_1: Union[dict, str], + xyz_2: Union[dict, str], + mol_1: "Molecule", + mol_2: "Molecule", + reactant_2: ARCSpecies, + h1: int, + h2: int, + c: Optional[int] = None, + d: Optional[int] = None, + r1_stretch: float = 1.2, + r2_stretch: float = 1.2, + a2: float = 180, + d2: Optional[float] = None, + d3: Optional[float] = None, + keep_dummy: bool = False, + reactants_reversed: bool = False, +) -> dict: """ Combine two coordinates that share an atom. For this redundant atom case, only three additional degrees of freedom (here ``a2``, ``d2``, and ``d3``) @@ -439,88 +482,117 @@ def combine_coordinates_with_redundant_atoms(xyz_1: Union[dict, str], before returning the final cartesian coordinates """ is_a2_linear = is_angle_linear(a2) - is_mol_1_linear = is_linear(np.array(xyz_1['coords'])) + is_mol_1_linear = is_linear(np.array(xyz_1["coords"])) d2 = d2 if not is_a2_linear else None num_atoms_mol_1, num_atoms_mol_2 = len(mol_1.atoms), len(mol_2.atoms) if num_atoms_mol_1 == 1 or num_atoms_mol_2 == 1: raise ValueError( - f'The molecule arguments to combine_coordinates_with_redundant_atoms must each have more than 1 ' - f'atom (including the abstracted hydrogen atom in each), got {len(mol_1.atoms)} atoms in mol_1 ' - f'and {len(mol_2.atoms)} atoms in mol_2.') + f"The molecule arguments to combine_coordinates_with_redundant_atoms must each have more than 1 " + f"atom (including the abstracted hydrogen atom in each), got {len(mol_1.atoms)} atoms in mol_1 " + f"and {len(mol_2.atoms)} atoms in mol_2." + ) if d2 is None and not is_a2_linear and num_atoms_mol_1 > 2: - raise ValueError('The d2 parameter (the B-H-A-C dihedral) must be given if the a2 angle (B-H-A) is not close ' - 'to 180 degrees, got None.') + raise ValueError( + "The d2 parameter (the B-H-A-C dihedral) must be given if the a2 angle (B-H-A) is not close " + "to 180 degrees, got None." + ) if d3 is None and not is_a2_linear and num_atoms_mol_2 > 2: - raise ValueError('The d3 parameter (the A-H-B-D dihedral) must be given if the a2 angle (B-H-A) is not linear ' - 'and mol_2 has 3 or more atoms, got None.') + raise ValueError( + "The d3 parameter (the A-H-B-D dihedral) must be given if the a2 angle (B-H-A) is not linear " + "and mol_2 has 3 or more atoms, got None." + ) if c is None and num_atoms_mol_1 > 2: - raise ValueError('The c parameter (the index of atom C in xyz1) must be given if mol_1 has 3 or more atoms, ' - 'got None.') + raise ValueError( + "The c parameter (the index of atom C in xyz1) must be given if mol_1 has 3 or more atoms, " + "got None." + ) if d is None and num_atoms_mol_2 > 2: - raise ValueError('The d parameter (the index of atom D in xyz2) must be given if mol_2 has 3 or more atoms, ' - 'got None.') + raise ValueError( + "The d parameter (the index of atom D in xyz2) must be given if mol_2 has 3 or more atoms, " + "got None." + ) if d3 is None and d is not None and num_atoms_mol_1 > 2 and not is_mol_1_linear: - raise ValueError('The d3 parameter (dihedral D-B-H-A) must be given if mol_1 has 3 or more atoms, got None.') + raise ValueError( + "The d3 parameter (dihedral D-B-H-A) must be given if mol_1 has 3 or more atoms, got None." + ) a = mol_1.atoms.index(list(mol_1.atoms[h1].edges.keys())[0]) b = mol_2.atoms.index(list(mol_2.atoms[h2].edges.keys())[0]) - - #log if a and b are different from prev step - logger.info(f'Combining coordinates with redundant atoms for atoms A ({a}) and B ({b})') + + # log if a and b are different from prev step + logger.info( + f"Combining coordinates with redundant atoms for atoms A ({a}) and B ({b})" + ) if c == a: - raise ValueError(f'The value for c ({c}) is invalid (it represents atom A, not atom C)') + raise ValueError( + f"The value for c ({c}) is invalid (it represents atom A, not atom C)" + ) if d == b: - raise ValueError(f'The value for d ({d}) is invalid (it represents atom B, not atom D)') + raise ValueError( + f"The value for d ({d}) is invalid (it represents atom B, not atom D)" + ) - zmat_1, zmat_2 = generate_the_two_constrained_zmats(xyz_1, xyz_2, mol_1, mol_2, h1, h2, a, b, c, d) + zmat_1, zmat_2 = generate_the_two_constrained_zmats( + xyz_1, xyz_2, mol_1, mol_2, h1, h2, a, b, c, d + ) # Stretch the A--H1 and B--H2 bonds. stretch_zmat_bond(zmat=zmat_1, indices=(h1, a), stretch=r1_stretch) stretch_zmat_bond(zmat=zmat_2, indices=(b, h2), stretch=r2_stretch) - add_dummy = is_a2_linear and len(zmat_1['symbols']) > 2 and not is_mol_1_linear - glue_params = determine_glue_params(zmat=zmat_1, - add_dummy=add_dummy, - a=a, - h1=h1, - c=c, - d=d, - ) - new_symbols, new_coords, new_vars, new_map = get_modified_params_from_zmat_2(zmat_1=zmat_1, - zmat_2=zmat_2, - reactant_2=reactant_2, - add_dummy=add_dummy, - glue_params=glue_params, - a=a, - h1=h1, - c=c, - a2=a2, - d2=d2, - d3=d3, - reactants_reversed=reactants_reversed, - ) - combined_zmat = {'symbols': new_symbols, 'coords': new_coords, 'vars': new_vars, 'map': new_map} - for i, coords in enumerate(combined_zmat['coords']): + add_dummy = is_a2_linear and len(zmat_1["symbols"]) > 2 and not is_mol_1_linear + glue_params = determine_glue_params( + zmat=zmat_1, + add_dummy=add_dummy, + a=a, + h1=h1, + c=c, + d=d, + ) + new_symbols, new_coords, new_vars, new_map = get_modified_params_from_zmat_2( + zmat_1=zmat_1, + zmat_2=zmat_2, + reactant_2=reactant_2, + add_dummy=add_dummy, + glue_params=glue_params, + a=a, + h1=h1, + c=c, + a2=a2, + d2=d2, + d3=d3, + reactants_reversed=reactants_reversed, + ) + combined_zmat = { + "symbols": new_symbols, + "coords": new_coords, + "vars": new_vars, + "map": new_map, + } + for i, coords in enumerate(combined_zmat["coords"]): if i > 2 and None in coords: - raise ValueError(f'Could not combine zmats, got a None parameter beyond the 3rd row:\n' - f'{coords} in:\n{combined_zmat}') + raise ValueError( + f"Could not combine zmats, got a None parameter beyond the 3rd row:\n" + f"{coords} in:\n{combined_zmat}" + ) ts_xyz = zmat_to_xyz(zmat=combined_zmat, keep_dummy=keep_dummy) return ts_xyz -def generate_the_two_constrained_zmats(xyz_1: dict, - xyz_2: dict, - mol_1: 'Molecule', - mol_2: 'Molecule', - h1: int, - h2: int, - a: int, - b: int, - c: Optional[int], - d: Optional[int], - ) -> Tuple[dict, dict]: +def generate_the_two_constrained_zmats( + xyz_1: dict, + xyz_2: dict, + mol_1: "Molecule", + mol_2: "Molecule", + h1: int, + h2: int, + a: int, + b: int, + c: Optional[int], + d: Optional[int], +) -> Tuple[dict, dict]: """ Generate the two constrained zmats required for combining coordinates with a redundant atom. @@ -539,24 +611,28 @@ def generate_the_two_constrained_zmats(xyz_1: dict, Returns: Tuple[dict, dict]: The two zmats. """ - zmat1 = zmat_from_xyz(xyz=xyz_1, - mol=mol_1, - is_ts=True, - constraints={'A_group': [(h1, a, c)]} if c is not None else {'R_atom': [(h1, a)]}, - consolidate=False, - ) - zmat2 = zmat_from_xyz(xyz=xyz_2, - mol=mol_2, - is_ts=True, - constraints={'A_group': [(d, b, h2)]} if d is not None else {'R_group': [(b, h2)]}, - consolidate=False, - ) + zmat1 = zmat_from_xyz( + xyz=xyz_1, + mol=mol_1, + is_ts=True, + constraints=( + {"A_group": [(h1, a, c)]} if c is not None else {"R_atom": [(h1, a)]} + ), + consolidate=False, + ) + zmat2 = zmat_from_xyz( + xyz=xyz_2, + mol=mol_2, + is_ts=True, + constraints=( + {"A_group": [(d, b, h2)]} if d is not None else {"R_group": [(b, h2)]} + ), + consolidate=False, + ) return zmat1, zmat2 -def stretch_zmat_bond(zmat: dict, - indices: Tuple[int, int], - stretch: float): +def stretch_zmat_bond(zmat: dict, indices: Tuple[int, int], stretch: float): """ Stretch a bond in a zmat. @@ -565,17 +641,20 @@ def stretch_zmat_bond(zmat: dict, indices (tuple): A length 2 tuple with the 0-indices of the xyz (not zmat) atoms representing the bond to stretch. stretch (float): The factor by which to multiply (stretch/shrink) the bond length. """ - param = get_parameter_from_atom_indices(zmat=zmat, indices=indices, xyz_indexed=True) - zmat['vars'][param] *= stretch - - -def determine_glue_params(zmat: dict, - add_dummy: bool, - h1: int, - a: int, - c: Optional[int], - d: Optional[int], - ) -> Tuple[str, str, str]: + param = get_parameter_from_atom_indices( + zmat=zmat, indices=indices, xyz_indexed=True + ) + zmat["vars"][param] *= stretch + + +def determine_glue_params( + zmat: dict, + add_dummy: bool, + h1: int, + a: int, + c: Optional[int], + d: Optional[int], +) -> Tuple[str, str, str]: """ Determine glue parameters ``a2``, ``d2``, and ``d3`` for combining two zmats. Modifies the ``zmat`` argument if a dummy atom needs to be added. @@ -591,49 +670,60 @@ def determine_glue_params(zmat: dict, Returns: Tuple[str, str, str]: The a2, d2, and d3 zmat glue parameters. """ - num_atoms_1 = len(zmat['symbols']) # The number of atoms in zmat1, used to increment the atom indices in zmat2. + num_atoms_1 = len( + zmat["symbols"] + ) # The number of atoms in zmat1, used to increment the atom indices in zmat2. # zh = num_atoms_1 - 1 # The atom index of H in the combined zmat. - za = key_by_val(zmat['map'], a) # The atom index of A in the combined zmat. - h1 = key_by_val(zmat['map'], h1) # The atom index of H1 in the combined zmat. - zb = num_atoms_1 + int(add_dummy) # The atom index of B in the combined zmat, if a2=180, consider the dummy atom. - zc = key_by_val(zmat['map'], c) if c is not None else None - zd = num_atoms_1 + 1 + int(add_dummy) if d is not None else None # The atom index of D in the combined zmat. + za = key_by_val(zmat["map"], a) # The atom index of A in the combined zmat. + h1 = key_by_val(zmat["map"], h1) # The atom index of H1 in the combined zmat. + zb = num_atoms_1 + int( + add_dummy + ) # The atom index of B in the combined zmat, if a2=180, consider the dummy atom. + zc = key_by_val(zmat["map"], c) if c is not None else None + zd = ( + num_atoms_1 + 1 + int(add_dummy) if d is not None else None + ) # The atom index of D in the combined zmat. if add_dummy: # Add a dummy atom. - zmat['map'][len(zmat['symbols'])] = f"X{len(zmat['symbols'])}" + zmat["map"][len(zmat["symbols"])] = f"X{len(zmat['symbols'])}" zx = num_atoms_1 - zmat['symbols'] = tuple(list(zmat['symbols']) + ['X']) - r_str = f'RX_{zx}_{h1}' - a_str = f'AX_{zx}_{h1}_{za}' - d_str = f'DX_{zx}_{h1}_{za}_{zc}' if zc is not None else None # X-H-A-C - zmat['coords'] = tuple(list(zmat['coords']) + [(r_str, a_str, d_str)]) # The coords of the dummy atom. - zmat['vars'][r_str] = 1.0 - zmat['vars'][a_str] = 90.0 + zmat["symbols"] = tuple(list(zmat["symbols"]) + ["X"]) + r_str = f"RX_{zx}_{h1}" + a_str = f"AX_{zx}_{h1}_{za}" + d_str = f"DX_{zx}_{h1}_{za}_{zc}" if zc is not None else None # X-H-A-C + zmat["coords"] = tuple( + list(zmat["coords"]) + [(r_str, a_str, d_str)] + ) # The coords of the dummy atom. + zmat["vars"][r_str] = 1.0 + zmat["vars"][a_str] = 90.0 if d_str is not None: - zmat['vars'][d_str] = 0 - param_a2 = f'A_{zb}_{h1}_{za}' # B-H-A - param_d2 = f'D_{zb}_{h1}_{zx}_{za}' # B-H-X-A - param_d3 = f'D_{zd}_{zb}_{za}_{zc if c is not None else zx}' if d is not None else None # D-B-A-C/X + zmat["vars"][d_str] = 0 + param_a2 = f"A_{zb}_{h1}_{za}" # B-H-A + param_d2 = f"D_{zb}_{h1}_{zx}_{za}" # B-H-X-A + param_d3 = ( + f"D_{zd}_{zb}_{za}_{zc if c is not None else zx}" if d is not None else None + ) # D-B-A-C/X else: - param_a2 = f'A_{zb}_{h1}_{za}' # B-H-A - param_d2 = f'D_{zb}_{h1}_{za}_{zc}' if zc is not None else None # B-H-A-C - param_d3 = f'D_{zd}_{zb}_{h1}_{za}' if d is not None else None # D-B-H-A + param_a2 = f"A_{zb}_{h1}_{za}" # B-H-A + param_d2 = f"D_{zb}_{h1}_{za}_{zc}" if zc is not None else None # B-H-A-C + param_d3 = f"D_{zd}_{zb}_{h1}_{za}" if d is not None else None # D-B-H-A return param_a2, param_d2, param_d3 -def get_modified_params_from_zmat_2(zmat_1: dict, - zmat_2: dict, - reactant_2: ARCSpecies, - add_dummy: bool, - glue_params: Tuple[str, str, str], - h1: int, - a: int, - c: Optional[int], - a2: float, - d2: Optional[float], - d3: Optional[float], - reactants_reversed: bool = False, - ) -> Tuple[tuple, tuple, dict, dict]: +def get_modified_params_from_zmat_2( + zmat_1: dict, + zmat_2: dict, + reactant_2: ARCSpecies, + add_dummy: bool, + glue_params: Tuple[str, str, str], + h1: int, + a: int, + c: Optional[int], + a2: float, + d2: Optional[float], + d3: Optional[float], + reactants_reversed: bool = False, +) -> Tuple[tuple, tuple, dict, dict]: """ Generate a modified zmat2 (in parts): Remove the first atom, change the indices of all existing parameter, and add "glue" parameters. @@ -657,27 +747,29 @@ def get_modified_params_from_zmat_2(zmat_1: dict, Tuple[tuple, tuple, dict, dict]: new_symbols, new_coords, new_vars, new_map. """ # Remove the redundant H from zmat_2, it's the first atom. No need for further sorting, the zmat map will do that. - new_symbols = tuple(zmat_1['symbols'] + zmat_2['symbols'][1:]) + new_symbols = tuple(zmat_1["symbols"] + zmat_2["symbols"][1:]) new_coords, new_vars = list(), dict() param_a2, param_d2, param_d3 = glue_params - num_atoms_1 = len(zmat_1['symbols']) - for i, coords in enumerate(zmat_2['coords'][1:]): + num_atoms_1 = len(zmat_1["symbols"]) + for i, coords in enumerate(zmat_2["coords"][1:]): new_coord = list() for j, param in enumerate(coords): if param is not None: if i == 0: # Atom B should refer to H1. - new_param = f'R_{num_atoms_1}_{h1}' + new_param = f"R_{num_atoms_1}_{h1}" elif i == 1 and j == 0: # Atom D should refer to atom B. - new_param = f'R_{num_atoms_1 + 1}_{num_atoms_1}' + new_param = f"R_{num_atoms_1 + 1}_{num_atoms_1}" elif i == 1 and j == 1: - new_param = f'A_{num_atoms_1 + 1}_{num_atoms_1}_{a}' + new_param = f"A_{num_atoms_1 + 1}_{num_atoms_1}_{a}" else: new_param = up_param(param=param, increment=num_atoms_1 - 1) new_coord.append(new_param) - new_vars[new_param] = zmat_2['vars'][param] # Keep the original parameter R/A/D values. + new_vars[new_param] = zmat_2["vars"][ + param + ] # Keep the original parameter R/A/D values. else: if i == 0 and j == 1: # This is a2. @@ -694,21 +786,23 @@ def get_modified_params_from_zmat_2(zmat_1: dict, else: new_coord.append(None) new_coords.append(tuple(new_coord)) - new_map = get_new_zmat_2_map(zmat_1=zmat_1, - zmat_2=zmat_2, - reactant_2=reactant_2, - reactants_reversed=reactants_reversed, - ) - new_coords = tuple(list(zmat_1['coords']) + new_coords) - new_vars = {**zmat_1['vars'], **new_vars} + new_map = get_new_zmat_2_map( + zmat_1=zmat_1, + zmat_2=zmat_2, + reactant_2=reactant_2, + reactants_reversed=reactants_reversed, + ) + new_coords = tuple(list(zmat_1["coords"]) + new_coords) + new_vars = {**zmat_1["vars"], **new_vars} return new_symbols, new_coords, new_vars, new_map -def get_new_zmat_2_map(zmat_1: dict, - zmat_2: dict, - reactant_2: Optional[ARCSpecies], - reactants_reversed: bool = False, - ) -> Dict[int, Union[int, str]]: +def get_new_zmat_2_map( + zmat_1: dict, + zmat_2: dict, + reactant_2: Optional[ARCSpecies], + reactants_reversed: bool = False, +) -> Dict[int, Union[int, str]]: """ Get the map of the combined zmat ignoring the redundant H in ``zmat_2``. @@ -729,26 +823,34 @@ def get_new_zmat_2_map(zmat_1: dict, Returns: Dict[int, Union[int, str]]: The combined zmat map element. """ - new_map = get_new_map_based_on_zmat_1(zmat_1=zmat_1, zmat_2=zmat_2, reactants_reversed=reactants_reversed) + new_map = get_new_map_based_on_zmat_1( + zmat_1=zmat_1, zmat_2=zmat_2, reactants_reversed=reactants_reversed + ) zmat_2_mod = remove_1st_atom(zmat_2) - zmat_2_mod['map'] = relocate_zmat_dummy_atoms_to_the_end(zmat_2_mod['map']) - spc_from_zmat_2 = ARCSpecies(label='spc_from_zmat_2', xyz=zmat_2_mod) - atom_map = map_two_species(spc_1=spc_from_zmat_2, spc_2=reactant_2, consider_chirality=False) - new_map = update_new_map_based_on_zmat_2(new_map=new_map, - zmat_2=zmat_2_mod, - num_atoms_1=len(zmat_1['symbols']), - atom_map=atom_map, - reactants_reversed=reactants_reversed, - ) + zmat_2_mod["map"] = relocate_zmat_dummy_atoms_to_the_end(zmat_2_mod["map"]) + spc_from_zmat_2 = ARCSpecies(label="spc_from_zmat_2", xyz=zmat_2_mod) + atom_map = map_two_species( + spc_1=spc_from_zmat_2, spc_2=reactant_2, consider_chirality=False + ) + new_map = update_new_map_based_on_zmat_2( + new_map=new_map, + zmat_2=zmat_2_mod, + num_atoms_1=len(zmat_1["symbols"]), + atom_map=atom_map, + reactants_reversed=reactants_reversed, + ) if len(list(new_map.values())) != len(set(new_map.values())): - raise ValueError(f'Could not generate a combined zmat map with no repeating values.\n{new_map}') + raise ValueError( + f"Could not generate a combined zmat map with no repeating values.\n{new_map}" + ) return new_map -def get_new_map_based_on_zmat_1(zmat_1: dict, - zmat_2: dict, - reactants_reversed: bool = False, - ) -> dict: +def get_new_map_based_on_zmat_1( + zmat_1: dict, + zmat_2: dict, + reactants_reversed: bool = False, +) -> dict: """ Generate an initial map for the combined zmats, here only consider ``zmat_1``. @@ -761,21 +863,22 @@ def get_new_map_based_on_zmat_1(zmat_1: dict, dict: The initial map for the combined zmats. """ new_map = dict() - val_inc = len(zmat_2['symbols']) - 1 if reactants_reversed else 0 - for key, val in zmat_1['map'].items(): - if isinstance(val, str) and 'X' in val: - new_map[key] = f'X{int(val[1:]) + val_inc}' + val_inc = len(zmat_2["symbols"]) - 1 if reactants_reversed else 0 + for key, val in zmat_1["map"].items(): + if isinstance(val, str) and "X" in val: + new_map[key] = f"X{int(val[1:]) + val_inc}" else: new_map[key] = val + val_inc return new_map -def update_new_map_based_on_zmat_2(new_map: dict, - zmat_2: dict, - num_atoms_1, - atom_map: dict, - reactants_reversed: bool = False, - ): +def update_new_map_based_on_zmat_2( + new_map: dict, + zmat_2: dict, + num_atoms_1, + atom_map: dict, + reactants_reversed: bool = False, +): """ Update the map for the combined zmats, here only consider the modified version of ``zmat_2``. This function assumes that all dummy atoms are located at the end of the respective Cartesian coordinates @@ -794,18 +897,20 @@ def update_new_map_based_on_zmat_2(new_map: dict, key_inc = num_atoms_1 val_inc = 0 if reactants_reversed else num_atoms_1 dummy_atom_counter = 0 - num_of_non_x_atoms_in_zmat_2 = len([val for val in zmat_2['map'].values() if isinstance(val, int)]) - for key, val in zmat_2['map'].items(): + num_of_non_x_atoms_in_zmat_2 = len( + [val for val in zmat_2["map"].values() if isinstance(val, int)] + ) + for key, val in zmat_2["map"].items(): # Atoms in zmat_2 always come after atoms in zmat_1 in the new zmat, regardless of the reactants/products # order on each side of the given reaction. new_key = key + key_inc # Use the atom_map to map atoms in zmat_2 (i.e., values in zmat_2's 'map') to atoms in the R(*3) # **reactant** (at least for H-Abstraction reactions), since zmat_2 was built based on atoms in the R(*3)-H(*2) # **product** (at least for H-Abstraction reactions). - if isinstance(val, str) and 'X' in val: + if isinstance(val, str) and "X" in val: # A dummy atom is not in the atom_map. new_val = num_of_non_x_atoms_in_zmat_2 + val_inc + dummy_atom_counter - new_val = f'X{new_val}' + new_val = f"X{new_val}" dummy_atom_counter += 1 else: new_val = atom_map[val] + val_inc @@ -813,11 +918,12 @@ def update_new_map_based_on_zmat_2(new_map: dict, return new_map -def react(reactants: List[Union[Molecule, Species]], - products: List[Union[Molecule, Species]], - family: 'KineticsFamily', - arc_reaction: 'ARCReaction', - ) -> Optional[List[Reaction]]: +def react( + reactants: List[Union[Molecule, Species]], + products: List[Union[Molecule, Species]], + family: "KineticsFamily", + arc_reaction: "ARCReaction", +) -> Optional[List[Reaction]]: """ React molecules to give the requested products via an RMG family, resulting in a reaction with RMG's atom labels for the reactants and products. @@ -834,35 +940,51 @@ def react(reactants: List[Union[Molecule, Species]], # Assure Molecule object instances: reactant_mols, product_mols = list(), list() for reactant in reactants: - reactant_mols.append(reactant.copy(deep=True) if isinstance(reactant, Molecule) - else reactant.molecule[0].copy(deep=True)) + reactant_mols.append( + reactant.copy(deep=True) + if isinstance(reactant, Molecule) + else reactant.molecule[0].copy(deep=True) + ) for product in products: - product_mols.append(product.copy(deep=True) if isinstance(product, Molecule) - else product.molecule[0].copy(deep=True)) - reactants_copy, products_copy = [r.copy(deep=True) for r in reactant_mols], [p.copy(deep=True) for p in product_mols] + product_mols.append( + product.copy(deep=True) + if isinstance(product, Molecule) + else product.molecule[0].copy(deep=True) + ) + reactants_copy, products_copy = [r.copy(deep=True) for r in reactant_mols], [ + p.copy(deep=True) for p in product_mols + ] try: - reactions = family.generate_reactions(reactants=reactants_copy, - products=products_copy, - prod_resonance=False, - delete_labels=False, - relabel_atoms=False, - ) + reactions = family.generate_reactions( + reactants=reactants_copy, + products=products_copy, + prod_resonance=False, + delete_labels=False, + relabel_atoms=False, + ) except (ActionError, ValueError): return None for reaction in reactions: try: - family.add_atom_labels_for_reaction(reaction=reaction, - output_with_resonance=False, - save_order=True, - ) + family.add_atom_labels_for_reaction( + reaction=reaction, + output_with_resonance=False, + save_order=True, + ) except (ActionError, ValueError): continue for i in range(len(reactions)): - r_map, p_map = map_arc_rmg_species(arc_reaction=arc_reaction, rmg_reaction=reactions[i], concatenate=False) - ordered_rmg_reactants = [reactions[i].reactants[r_map[j]] for j in range(len(reactions[i].reactants))] - ordered_rmg_products = [reactions[i].products[p_map[j]] for j in range(len(reactions[i].products))] + r_map, p_map = map_arc_rmg_species( + arc_reaction=arc_reaction, rmg_reaction=reactions[i], concatenate=False + ) + ordered_rmg_reactants = [ + reactions[i].reactants[r_map[j]] for j in range(len(reactions[i].reactants)) + ] + ordered_rmg_products = [ + reactions[i].products[p_map[j]] for j in range(len(reactions[i].products)) + ] reactions[i].reactants = ordered_rmg_reactants reactions[i].products = ordered_rmg_products @@ -871,31 +993,41 @@ def react(reactants: List[Union[Molecule, Species]], for reaction in reactions: for index in range(len(reaction.reactants)): # The RMG molecule will get a random 3D conformer, don't consider chirality when mapping. - atom_map = map_two_species(spc_1=reactant_mols[index], - spc_2=reaction.reactants[index], - consider_chirality=False, - ) + atom_map = map_two_species( + spc_1=reactant_mols[index], + spc_2=reaction.reactants[index], + consider_chirality=False, + ) if atom_map is None: break new_atoms_list = list() for i in range(len(reactant_mols[index].atoms)): - reactant_mols[index].atoms[i].id = reaction.reactants[index].molecule[0].atoms[atom_map[i]].id - reactant_mols[index].atoms[i].label = reaction.reactants[index].molecule[0].atoms[atom_map[i]].label + reactant_mols[index].atoms[i].id = ( + reaction.reactants[index].molecule[0].atoms[atom_map[i]].id + ) + reactant_mols[index].atoms[i].label = ( + reaction.reactants[index].molecule[0].atoms[atom_map[i]].label + ) new_atoms_list.append(reactant_mols[index].atoms[i]) reaction.reactants[index].molecule[0].atoms = new_atoms_list else: for index in range(len(reaction.products)): # The RMG molecule will get a random 3D conformer, don't consider chirality when mapping. - atom_map = map_two_species(spc_1=product_mols[index], - spc_2=reaction.products[index], - consider_chirality=False, - ) + atom_map = map_two_species( + spc_1=product_mols[index], + spc_2=reaction.products[index], + consider_chirality=False, + ) if atom_map is None: break new_atoms_list = list() for i in range(len(product_mols[index].atoms)): - product_mols[index].atoms[i].id = reaction.products[index].molecule[0].atoms[atom_map[i]].id - product_mols[index].atoms[i].label = reaction.products[index].molecule[0].atoms[atom_map[i]].label + product_mols[index].atoms[i].id = ( + reaction.products[index].molecule[0].atoms[atom_map[i]].id + ) + product_mols[index].atoms[i].label = ( + reaction.products[index].molecule[0].atoms[atom_map[i]].label + ) new_atoms_list.append(product_mols[index].atoms[i]) reaction.products[index].molecule[0].atoms = new_atoms_list else: @@ -913,9 +1045,10 @@ def react(reactants: List[Union[Molecule, Species]], return output -def find_distant_neighbor(rmg_mol: 'Molecule', - start: int, - ) -> Optional[int]: +def find_distant_neighbor( + rmg_mol: "Molecule", + start: int, +) -> Optional[int]: """ Find the 0-index of a distant neighbor (2 steps away) if possible from the starting atom. Preferably, a heavy atom will be returned. @@ -944,14 +1077,15 @@ def find_distant_neighbor(rmg_mol: 'Molecule', # Family-specific heuristics functions: -def h_abstraction(arc_reaction: 'ARCReaction', - rmg_reactions: List['Reaction'], - r1_stretch: float = 1.2, - r2_stretch: float = 1.2, - a2: float = 180, - dihedral_increment: Optional[int] = None, - path: str = '', - ) -> List[dict]: +def h_abstraction( + arc_reaction: "ARCReaction", + rmg_reactions: List["Reaction"], + r1_stretch: float = 1.2, + r2_stretch: float = 1.2, + a2: float = 180, + dihedral_increment: Optional[int] = None, + path: str = "", +) -> List[dict]: """ Generate TS guesses for reactions of the RMG ``H_Abstraction`` family. @@ -971,7 +1105,9 @@ def h_abstraction(arc_reaction: 'ARCReaction', Entries are Cartesian coordinates of TS guesses for all reactions. """ if not len(rmg_reactions): - logger.warning(f'Cannot generate TS guesses for {arc_reaction} without an RMG Reaction object instance.') + logger.warning( + f"Cannot generate TS guesses for {arc_reaction} without an RMG Reaction object instance." + ) return list() xyz_guesses = list() @@ -982,39 +1118,58 @@ def h_abstraction(arc_reaction: 'ARCReaction', # The expected RMG atom labels are: R(*1)-H(*2) + R(*3)j <=> R(*1)j + R(*3)-H(*2). reactants_reversed, products_reversed = False, False for atom in rmg_reactions[0].reactants[1].molecule[0].atoms: - if atom.label == '*2': + if atom.label == "*2": reactants_reversed = True break for atom in rmg_reactions[0].products[0].molecule[0].atoms: - if atom.label == '*2': + if atom.label == "*2": products_reversed = True break - arc_reactants, arc_products = arc_reaction.get_reactants_and_products(arc=True, return_copies=False) + arc_reactants, arc_products = arc_reaction.get_reactants_and_products( + arc=True, return_copies=False + ) arc_reactant = arc_reactants[int(reactants_reversed)] # Get R(*1)-H(*2). arc_product = arc_products[int(not products_reversed)] # Get R(*3)-H(*2). - if any([is_linear(coordinates=np.array(arc_reactant.get_xyz()['coords'])), - is_linear(coordinates=np.array(arc_product.get_xyz()['coords']))]) and is_angle_linear(a2): + if any( + [ + is_linear(coordinates=np.array(arc_reactant.get_xyz()["coords"])), + is_linear(coordinates=np.array(arc_product.get_xyz()["coords"])), + ] + ) and is_angle_linear(a2): # Don't modify dihedrals for an attacking H (or other linear radical) at a linear angle, C ~ A -- H1 - H2 -- H. dihedral_increment = 360 for rmg_reaction in rmg_reactions: rmg_reactant_mol = rmg_reaction.reactants[int(reactants_reversed)].molecule[0] rmg_product_mol = rmg_reaction.products[int(not products_reversed)].molecule[0] - h1 = rmg_reactant_mol.atoms.index([atom for atom in rmg_reactant_mol.atoms if atom.label == '*2'][0]) - h2 = rmg_product_mol.atoms.index([atom for atom in rmg_product_mol.atoms if atom.label == '*2'][0]) - a = rmg_reactant_mol.atoms.index(list(rmg_reactant_mol.atoms[h1].edges.keys())[0]) + h1 = rmg_reactant_mol.atoms.index( + [atom for atom in rmg_reactant_mol.atoms if atom.label == "*2"][0] + ) + h2 = rmg_product_mol.atoms.index( + [atom for atom in rmg_product_mol.atoms if atom.label == "*2"][0] + ) + a = rmg_reactant_mol.atoms.index( + list(rmg_reactant_mol.atoms[h1].edges.keys())[0] + ) b = rmg_product_mol.atoms.index(list(rmg_product_mol.atoms[h2].edges.keys())[0]) c = find_distant_neighbor(rmg_mol=rmg_reactant_mol, start=h1) d = find_distant_neighbor(rmg_mol=rmg_product_mol, start=h2) # d2 describes the B-H-A-C dihedral, populate d2_values if C exists and the B-H-A angle (a2) is not linear. - d2_values = list(range(0, 360, dihedral_increment)) if len(rmg_reactant_mol.atoms) > 2 \ - and not is_angle_linear(a2) else list() + d2_values = ( + list(range(0, 360, dihedral_increment)) + if len(rmg_reactant_mol.atoms) > 2 and not is_angle_linear(a2) + else list() + ) # d3 describes the D-B-H-A dihedral, populate d3_values if D exists. - d3_values = list(range(0, 360, dihedral_increment)) if len(rmg_product_mol.atoms) > 2 else list() + d3_values = ( + list(range(0, 360, dihedral_increment)) + if len(rmg_product_mol.atoms) > 2 + else list() + ) if len(d2_values) and len(d3_values): d2_d3_product = list(itertools.product(d2_values, d3_values)) @@ -1034,7 +1189,9 @@ def h_abstraction(arc_reaction: 'ARCReaction', xyz_2=arc_product.get_xyz(), mol_1=rmg_reactant_mol, mol_2=rmg_product_mol, - reactant_2=arc_reaction.get_reactants_and_products()[0][int(not reactants_reversed)], + reactant_2=arc_reaction.get_reactants_and_products()[0][ + int(not reactants_reversed) + ], h1=h1, h2=h2, c=c, @@ -1047,7 +1204,9 @@ def h_abstraction(arc_reaction: 'ARCReaction', reactants_reversed=reactants_reversed, ) except ValueError as e: - logger.error(f'Could not generate a guess using Heuristics for H abstraction reaction, got:\n{e}') + logger.error( + f"Could not generate a guess using Heuristics for H abstraction reaction, got:\n{e}" + ) if xyz_guess is not None and not colliding_atoms(xyz_guess): zmat_guess = zmat_from_xyz(xyz_guess, is_ts=True) @@ -1074,96 +1233,100 @@ def h_abstraction(arc_reaction: 'ARCReaction', try: h_abs_atoms_dict = get_h_abs_atoms(df_dmat) - a = h_abs_atoms_dict['A'] - h = h_abs_atoms_dict['H'] - b = h_abs_atoms_dict['B'] - xyz_guess = crest_ts_conformer_search(xyz_guess_crest, a, h, b, path=path, xyz_crest_int=i) - if xyz_guess is not None: - logger.info('Successfully completed crest conformer search:' - f' {xyz_to_str(xyz_guess)}') - xyz_guesses.append(xyz_guess) - except (ValueError or KeyError) as e: - logger.error(f'Could not determine the H abstraction atoms, got:\n{e}') + crest_path = crest_ts_conformer_search( + xyz_guess_crest, + h_abs_atoms_dict["A"], + h_abs_atoms_dict["H"], + h_abs_atoms_dict["B"], + path=path, + ) + crest_paths.append(crest_path) + except (ValueError, KeyError) as e: + logger.error(f"Could not determine the H abstraction atoms, got:\n{e}") + + if crest_paths: + crest_jobs = submit_crest_jobs(crest_paths) + monitor_crest_jobs(crest_jobs) # Keep checking job statuses until complete + process_completed_jobs(crest_jobs, xyz_guesses) + else: + logger.error("No CREST paths found") return xyz_guesses -def crest_ts_conformer_search(xyz_guess: dict, a_atom: int, h_atom: int, b_atom: int, path: str = '', xyz_crest_int: int = 0 - ) -> None: +def crest_ts_conformer_search( + xyz_guess: dict, + a_atom: int, + h_atom: int, + b_atom: int, + path: str = "", + xyz_crest_int: int = 0, +) -> None: """ Perform a conformer search for the TS guess using CREST. - - + + # TODO: Check if CREST is available # TODO: Pass the first xyz guess from h_abstraction to CREST # TODO: Pass the atom A - H - B for constraints - + """ - path = os.path.join(path, 'crest') + path = os.path.join(path, f"crest_{xyz_crest_int}") if not os.path.exists(path): os.makedirs(path) # Write coords to coords.ref file - symbols = xyz_guess['symbols'] - coords = xyz_guess['coords'] - angstrom_to_bohr = 1.8897259886 - - - with open(os.path.join(path, 'coords.ref'), 'w') as f: - # Format - XYZ in Bohr: - # - # $coord - # x y z atom1 - # x y z atom2 - # ... - # $end - - f.write('$coord\n') - for i, (x, y, z) in enumerate(coords): - # Convert to Bohr from Angstrom - x_bohr = x * angstrom_to_bohr - y_bohr = y * angstrom_to_bohr - z_bohr = z * angstrom_to_bohr - f.write(f' {x_bohr:>18.14f} {y_bohr:>18.14f} {z_bohr:>18.14f} {symbols[i]}\n') - f.write('$end\n') - + symbols = xyz_guess["symbols"] + + converted_coords = str_to_str( + xyz_str=xyz_to_str(xyz_guess), reverse_atoms=True, convert_to="bohr" + ) + coords_ref_content = f"$coord\n{converted_coords}\n$end\n" + coords_ref_path = os.path.join(path, "coords.ref") + + try: + with open(coords_ref_path, "w") as f: + f.write(coords_ref_content) + except IOError as e: + print(f"Failed to write to {coords_ref_path}: {e}") + return None + # Get count of atoms - remember to add 1 to all atom numbers num_atoms = len(symbols) a_atom += 1 h_atom += 1 b_atom += 1 - list_of_atoms_numbers_not_participating_in_reaction = [i for i in range(1, num_atoms + 1) if i not in [a_atom, h_atom, b_atom]] + list_of_atoms_numbers_not_participating_in_reaction = [ + i for i in range(1, num_atoms + 1) if i not in [a_atom, h_atom, b_atom] + ] - with open(os.path.join(path, 'constraints.inp'), 'w') as f: + with open(os.path.join(path, "constraints.inp"), "w") as f: # Header - f.write('$constrain\n') - f.write(f' atoms: {a_atom}, {h_atom}, {b_atom}\n') - f.write(' force constant: 0.5\n') - f.write(' reference=coords.ref\n') - + f.write("$constrain\n") + f.write(f" atoms: {a_atom}, {h_atom}, {b_atom}\n") + f.write(" force constant: 0.5\n") + f.write(" reference=coords.ref\n") + # Distance constraints - f.write(f' distance: {a_atom}, {h_atom}, auto\n') - f.write(f' distance: {h_atom}, {b_atom}, auto\n') - + f.write(f" distance: {a_atom}, {h_atom}, auto\n") + f.write(f" distance: {h_atom}, {b_atom}, auto\n") + # Metadynamics setup - f.write('$metadyn\n') - f.write(f' atoms: {", ".join(map(str, list_of_atoms_numbers_not_participating_in_reaction))}\n') - f.write('$end\n') - # Run CREST - # Prepare the command + f.write("$metadyn\n") + f.write( + f' atoms: {", ".join(map(str, list_of_atoms_numbers_not_participating_in_reaction))}\n' + ) + f.write("$end\n") + commands = [ - f'{CREST_PATH}', - 'coords.ref', - '--cinp constraints.inp', - '--noreftopo', + f"{CREST_PATH}", + "coords.ref", + "--cinp constraints.inp", + "--noreftopo", f' -T {SERVERS["local"].get("cpus", 8)}', ] - command = ' '.join(commands) - command = f"{CREST_ENV_PATH} && {command}" if CREST_ENV_PATH else command - # Run the command using Popen - process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=path, executable='/bin/bash') - stdout, stderr = process.communicate() + command = " ".join(commands) if process.returncode == 0: print("Command completed successfully.") @@ -1173,28 +1336,27 @@ def crest_ts_conformer_search(xyz_guess: dict, a_atom: int, h_atom: int, b_atom: xyz_guess = str_to_xyz(content) return xyz_guess else: - print(f"Command failed with return code {process.returncode}") - print(f"Standard Output: {stdout.decode()}") - print(f"Standard Error: {stderr.decode()}") - return None + activation_line = "" - if SERVERS.get('local') is not None: - if SERVERS['local']['cluster_soft'].lower() in ['condor', 'htcondor']: + if SERVERS.get("local") is not None: + if SERVERS["local"]["cluster_soft"].lower() in ["condor", "htcondor"]: # Write job submission scripts - sub_job = submit_scripts['local']['crest'] + sub_job = submit_scripts["local"]["crest"] format_params = { "name": f"crest_{xyz_crest_int}", - "cpus": SERVERS['local'].get('cpus', 8), - "memory": SERVERS['local'].get('memory', 32.0) * 1024, + "cpus": SERVERS["local"].get("cpus", 8), + "memory": SERVERS["local"].get("memory", 32.0) * 1024, } sub_job = sub_job.format(**format_params) - with open(os.path.join(path, settings['submit_filenames']['HTCondor']), 'w') as f: + with open( + os.path.join(path, settings["submit_filenames"]["HTCondor"]), "w" + ) as f: f.write(sub_job) # Write the crest job - crest_job = submit_scripts['local']['crest_job'] + crest_job = submit_scripts["local"]["crest_job"] format_params = { "path": path, "activation_line": activation_line, @@ -1202,29 +1364,45 @@ def crest_ts_conformer_search(xyz_guess: dict, a_atom: int, h_atom: int, b_atom: } crest_job = crest_job.format(**format_params) - with open(os.path.join(path, 'job.sh'), 'w') as f: + with open(os.path.join(path, "job.sh"), "w") as f: f.write(crest_job) # Need to make the job.sh file 777 - os.chmod(os.path.join(path, 'job.sh'), 0o777) - - elif SERVERS['local']['cluster_soft'].lower() == 'pbs': + os.chmod(os.path.join(path, "job.sh"), 0o777) + + #### I am not sure why this happens but getting error out.txt and err.txt not found (I would have assumed that the job.sh would create these files) + with open(os.path.join(path, "out.txt"), "w") as f: + f.write("") + with open(os.path.join(path, "err.txt"), "w") as f: + f.write("") + + os.chmod(os.path.join(path, "out.txt"), 0o777) + os.chmod(os.path.join(path, "err.txt"), 0o777) + + elif SERVERS["local"]["cluster_soft"].lower() == "pbs": # Write job submission scripts - sub_job = submit_scripts['local']['crest'] + sub_job = submit_scripts["local"]["crest"] format_params = { "queue": "alon_q", "name": f"crest_{xyz_crest_int}", - "cpus": SERVERS['local'].get('cpus', 8), - "memory": SERVERS['local'].get('memory', 32.0) if SERVERS['local'].get('memory', 32.0) < 60.0 else 40.0, + "cpus": SERVERS["local"].get("cpus", 8), + "memory": ( + SERVERS["local"].get("memory", 32.0) + if SERVERS["local"].get("memory", 32.0) < 60.0 + else 40.0 + ), "activation_line": activation_line, "commands": command, } sub_job = sub_job.format(**format_params) - with open(os.path.join(path, settings['submit_filenames']['PBS']), 'w') as f: + with open( + os.path.join(path, settings["submit_filenames"]["PBS"]), "w" + ) as f: f.write(sub_job) return path + def submit_crest_jobs(crest_paths: List[str]) -> None: """ Submit CREST jobs to the server. @@ -1265,6 +1443,7 @@ def monitor_crest_jobs(crest_jobs: dict, check_interval: int = 300) -> None: break time.sleep(min(check_interval, 100)) + def process_completed_jobs(crest_jobs: dict, xyz_guesses: list) -> None: """ Process the completed CREST jobs and update XYZ guesses. @@ -1289,6 +1468,7 @@ def process_completed_jobs(crest_jobs: dict, xyz_guesses: list) -> None: return xyz_guesses + def extract_digits(s: str) -> int: """ Extract the first integer from a string @@ -1398,4 +1578,4 @@ def get_h_abs_atoms(dataframe: pd.DataFrame) -> dict: return {"H": hydrogen_with_min_distance, "A": min_distance_column, "B": second_closest_atom} -register_job_adapter('heuristics', HeuristicsAdapter) +register_job_adapter("heuristics", HeuristicsAdapter)